# Logistic Regression from Scratch

## Logistic Regression
Reference [link](https://github.com/UCB-w261/main/blob/main/HelpfulResources/logistic_regression/Logistic%20regression.ipynb)

Logistic regression is an algorithm that could be used when the dependant variable \\(y\\) has a dichotomous outcome of a negative class or a postive class, \\(y \in\\) {0,1}

Some examples of the classification problem where logistic regression can be used.
* Email classification : Spam or not spam
* Credit card transactions: Fraud or not fraud
* Airline flights: Delayed or not delayed

### Notations

Below are the notations that are used in the explanation of logistic regression<br/>
\\(n\\) → number of features<br/>
\\(m\\) → number of training examples<br/>
\\(X\\) → input data matrix of shape (m x n)<br/>
\\(y\\) → true/ target value (can be 0 or 1 only)<br/>
\\(x(i), y(i)\\) → ith training example<br/>
\\(w\\) → weights (parameters) of shape (n x 1)<br/>
\\(b\\) → bias (parameter), a real number that can be broadcasted<br/>
\\({y}^{-}\\) → hypothesis \\({h}_{\theta}(X)\\) (outputs values between 0 and 1)<br/>

In logistic regression \\(0 \le {h}_{\theta}(X) \le 1 \\)

### Hypothesis

The hypothesis is \\({h}_{\theta}(X)\\) is the estimated probability that \\(y = 1\\) given the set of features \\(X\\) and parameterized by \\(\theta\\).<br/> 
\\(P(y=1 | X;\theta)\\)

In logistic regression, the hypothesis \\({h}_{\theta}(X) \\) is a Sigmoid function \\(g\\) where \\(g(z) = \frac{1}{1+{e}^{-z}}\\) and \\(z\\) = \\({\theta}^{T}X\\).

Hence, the hypothesis can be re-written as \\({h}_{\theta}(X) = \frac{1}{1+{e}^{-{\theta}^{T}X}}\\)

The probability that \\(y = 0\\) can be calculated by \\(1 - P(y=1 | X;\theta)\\), since the probability of both classes will add up to \\(1\\).

###Cost Function
For gradient descent to work, the cost function for logistic regression has to be a convex function. <br/>
Cost function is defined as

\\(J(\theta) = \frac{1}{m} {\Sigma}_{i=1}^{m} Cost(h_\theta(X(i)),y(i))\\)

where \\(y =\\) 0 or 1 and \\(Cost(h_\theta (X),y) = -y log(h_\theta (X)) - (1-y) log(1 - h_\theta(X))\\)

rewriting the cost function

\\(J(\theta) = - \frac{1}{m} [ {\Sigma}_{i=1}^{m} y(i) log(h_\theta (X(i))) + (1-y(i)) log(1 - h_\theta(X(i))) ]\\)

Here, \\(h_\theta(X)\\) is the Sigmoid function \\(\frac{1}{1+{e}^{-{\theta}^{T}X}}\\)

###Gradient Descent

We can use the gradient descent algorithm to fit the parameters \\(\theta\\) that minimizes the cost function \\(J(\theta)\\).

Gradient update step uses \\(\theta_j := \theta_j - \alpha \frac{d}{d\theta_j} J(\theta)\\) where \\(\alpha\\) is the learning rate and all \\(\theta_j\\) are updated simultaneously.

\\(\frac{d}{d\theta_j} J(\theta) = \frac{1}{m} {\Sigma}_{i=1}^{m} (h_\theta(X(i))- y(i)) X_j(i)\\)

The gradient descent step is re-written as

\\(\theta_j := \theta_j - \alpha \frac{1}{m} {\Sigma}_{i=1}^{m} (h_\theta(X(i))- y(i)) X_j(i)\\), all \\(\theta_j\\) are updated simultaneously.

Here, \\(h_\theta(X)\\) is the Sigmoid function \\(\frac{1}{1+{e}^{-{\theta}^{T}X}}\\)

###Make Predictions

To fit the parameters \\(\theta\\) we need to find the \\(\theta\\) such that it minimizes the cost function \\(J(\theta)\\). Using this set of \\(\theta\\) we can make predictions.

For predictions on a new feature set \\(X\\), calculate \\({h}_{\theta}(X) = \frac{1}{1+{e}^{-{\theta}^{T}X}}\\), which will give the probalility that \\(y=1\\).

We can use a threshold value for the probability to predict 0, if less than threshold, and 1, if greater than or equal to threshold probability.

### Data pre-processing

In [None]:
# Load Libraries
from pyspark.sql.functions import col
from pyspark.sql import types
from pyspark.sql.functions import *
from pyspark.sql.types import LongType, StringType, IntegerType, DoubleType
from pyspark.sql.functions import udf
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window
from pyspark import StorageLevel
from pyspark.sql import Row
from pyspark.sql.types import *

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer, MinMaxScaler, PCA
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import mlflow
import mlflow.sklearn

import re
import os
import numpy as np
import pandas as pd
import math
import matplotlib.pyplot as plt
import warnings

##### Set up blob storage

In [None]:
# Parameters for SAS Token. Required for establishing connection to blob storage.

blob_container = "cont" # The name of your container created in https://portal.azure.com
storage_account = "acc" # The name of your Storage account created in https://portal.azure.com
secret_scope = "scope" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "key" # The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/path"

In [None]:
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

####Load data from blob storage

In [None]:
# SMALL DATASET (6M)
joined_data_6M = spark.read.parquet(f"{blob_url}/joined_data")
# LARGE DATASET (2015-2018)
joined_data_traintest = spark.read.parquet(f'{blob_url}/joined_data_traintest')
# LARGE DATASET (2019)
joined_data_holdout = spark.read.parquet(f'{blob_url}/joined_data_holdout')

#### Features
- Columns to keep

In [None]:
keep = ['ELEVATION_ORG', 'ELEVATION_DEST', 'FL_DATE', 'YEAR', 'QUARTER', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_UNIQUE_CARRIER', 'ORIGIN', 'ORIGIN_STATE_FIPS', 'DEST', 'DEST_STATE_FIPS', 'DEP_DEL15', 'DEP_TIME_BLK', 'ARR_TIME_BLK', 'DISTANCE_GROUP', 'BAD_TRAVEL_DAY', 'SLP_ORG', 'VIS_ORG', 'TMP_ORG', 'DEW_ORG', 'CIG_ORG', 'WND_ORG', 'GUST_ORG', 'TCB_ORG', 'OBSC_ORG', 'PREC_ORG', 'SNW_ORG', 'FZFG_ORG', 'FZDZ_ORG', 'FZRA_ORG', 'TSRA_ORG', 'HSN_ORG', 'LSN_ORG', 'BSN_ORG', 'VV_ORG', 'TS_ORG', 'HAIL_ORG', 'FOG_ORG', 'VA_ORG', 'BR_ORG', 'SQ_ORG', 'FC_ORG',  'SLP_DEST', 'VIS_DEST', 'TMP_DEST', 'DEW_DEST', 'CIG_DEST', 'WND_DEST', 'GUST_DEST', 'TCB_DEST', 'OBSC_DEST', 'PREC_DEST', 'SNW_DEST', 'FZFG_DEST', 'FZDZ_DEST', 'FZRA_DEST', 'TSRA_DEST', 'HSN_DEST', 'LSN_DEST', 'BSN_DEST', 'VV_DEST', 'TS_DEST', 'HAIL_DEST', 'FOG_DEST', 'VA_DEST', 'BR_DEST', 'SQ_DEST', 'FC_DEST', 'DEP_TIME_SEGMENT', 'p_rank']

- Columns to drop

In [None]:
drop = ['DISTANCE', 'CRS_ARR_TIME', 'CRS_DEP_TIME', 'OP_CARRIER_AIRLINE_ID', 'OP_CARRIER_FL_NUM', 'DEP_TIMESTAMP', 'DEP_TIMESTAMP_UTC', 'DEP_TIMESTAMP_UTC_MINUS2', 'WX_STATION_ORIG', 'WX_STATION_DEST', 'CALL_SIGN_ORG', 'WX_TIMESTAMP_UTC_START_ORG', 'WX_TIMESTAMP_UTC_END_ORG', 'CALL_SIGN_DEST', 'WX_TIMESTAMP_UTC_START_DEST', 'WX_TIMESTAMP_UTC_END_DEST', 'DEST_AIRPORT_ID', 'DEST_CITY_MARKET_ID', 'ORIGIN_AIRPORT_ID', 'ORIGIN_CITY_MARKET_ID', 'join_year', 'year_start']

In [None]:
#drop = ['DISTANCE', 'CRS_ARR_TIME', 'CRS_DEP_TIME', 'OP_CARRIER_AIRLINE_ID', 'OP_CARRIER_FL_NUM', 'DEP_TIMESTAMP', 'DEP_TIMESTAMP_UTC', 'DEP_TIMESTAMP_UTC_MINUS2', 'WX_STATION_ORIG', 'WX_STATION_DEST', 'CALL_SIGN_ORG', 'WX_TIMESTAMP_UTC_START_ORG', 'WX_TIMESTAMP_UTC_END_ORG', 'CALL_SIGN_DEST', 'WX_TIMESTAMP_UTC_START_DEST', 'WX_TIMESTAMP_UTC_END_DEST', 'DEST_AIRPORT_ID', 'DEST_CITY_MARKET_ID', 'ORIGIN_AIRPORT_ID', 'ORIGIN_CITY_MARKET_ID', 'join_year', 'year_start','FL_DATE', 'YEAR', 'QUARTER', 'DEST_STATE_FIPS', 'DISTANCE_GROUP', 'BAD_TRAVEL_DAY','WND_ORG', 'GUST_ORG', 'TCB_ORG', 'OBSC_ORG', 'SNW_ORG', 'FZFG_ORG', 'FZDZ_ORG', 'FZRA_ORG', 'TSRA_ORG', 'HSN_ORG', 'LSN_ORG', 'BSN_ORG', 'VV_ORG', 'TS_ORG', 'HAIL_ORG', 'FOG_ORG', 'VA_ORG', 'BR_ORG', 'SQ_ORG', 'FC_ORG',  'SLP_DEST', 'VIS_DEST', 'TMP_DEST', 'DEW_DEST','WND_DEST', 'GUST_DEST', 'TCB_DEST', 'OBSC_DEST','SNW_DEST', 'FZFG_DEST', 'FZDZ_DEST', 'FZRA_DEST', 'TSRA_DEST', 'HSN_DEST', 'LSN_DEST', 'BSN_DEST', 'VV_DEST', 'TS_DEST', 'HAIL_DEST', 'FOG_DEST', 'VA_DEST', 'BR_DEST', 'SQ_DEST', 'FC_DEST', 'DEP_TIME_SEGMENT']

- Columns that need one-hot-encoding (and string indexing)

In [None]:
need_ohe = ['YEAR','QUARTER', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_UNIQUE_CARRIER', 'ORIGIN', 'ORIGIN_STATE_FIPS', 'DEST', 'DEST_STATE_FIPS', 'DEP_TIME_BLK', 'ARR_TIME_BLK', 'DISTANCE_GROUP', 'BAD_TRAVEL_DAY', 'DEP_TIME_SEGMENT']

In [None]:
#need_ohe = ['MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_UNIQUE_CARRIER', 'ORIGIN', 'ORIGIN_STATE_FIPS', 'DEST', 'DEP_TIME_BLK', 'ARR_TIME_BLK']

- Weather continuous features that require normalization or standardization

In [None]:
features_numerical = ['ELEVATION_ORG', 'ELEVATION_DEST', 'SNW_DEST', 'SNW_ORG', 'CIG_ORG', 'WND_ORG', 'GUST_ORG',  'VIS_DEST', 'CIG_DEST', 'WND_DEST', 'GUST_DEST', 'p_rank']

In [None]:
#features_numerical = ['ELEVATION_ORG', 'ELEVATION_DEST', 'CIG_ORG', 'CIG_DEST', 'p_rank']

In [None]:
# Null columns
null_cols = ['SLP_ORG', 'VIS_ORG', 'TMP_ORG', 'PREC_ORG', 'DEW_ORG', 'SLP_DEST', 'TMP_DEST', 'DEW_DEST', 'PREC_DEST']

In [None]:
#null_cols = ['TMP_ORG', 'PREC_ORG', 'DEW_ORG', 'PREC_DEST']

- Weather features that don't require one-hot-encoding (already int 1 or 0)

In [None]:
wx_categorical = ['TCB_ORG', 'OBSC_ORG', 'FZFG_ORG', 'FZDZ_ORG', 'FZRA_ORG', 'TSRA_ORG', 'HSN_ORG', 'LSN_ORG', 'BSN_ORG', 'VV_ORG', 'TS_ORG', 'HAIL_ORG', 'FOG_ORG', 'VA_ORG', 'BR_ORG', 'SQ_ORG', 'FC_ORG', 'TCB_DEST', 'OBSC_DEST', 'FZFG_DEST', 'FZDZ_DEST', 'FZRA_DEST', 'TSRA_DEST', 'HSN_DEST', 'LSN_DEST', 'BSN_DEST', 'VV_DEST', 'TS_DEST', 'HAIL_DEST', 'FOG_DEST', 'VA_DEST', 'BR_DEST', 'SQ_DEST', 'FC_DEST']

In [None]:
#wx_categorical = []

### Pipeline Functions

#### Sampling
- Found this function to undersample the majority class
- This step is optional for now
- Source: https://stackoverflow.com/questions/53978683/how-to-undersampling-the-majority-class-using-pyspark

- Example: Passing in a ratio of 2.0 returns a majority class twice as large as the minority class.

In [None]:
def resample(base_features,ratio,class_field,base_class):
    pos = base_features.filter(col(class_field)==base_class)
    neg = base_features.filter(col(class_field)!=base_class)
    total_pos = pos.count()
    total_neg = neg.count()
    fraction = float(total_pos*ratio)/float(total_neg)
    sampled = neg.sample(False, fraction)
    return sampled.union(pos)

#### Train/Test Split for Small Dataset

In [None]:
def get_split_small(df):
    # Splitting the data into training and testing, with 80% of the dates in the training set and the remaining 20% in the testing set
    df = df.drop(*drop)
    joined_data_sorted = df.withColumn("rank_p", percent_rank().over(Window.partitionBy().orderBy("FL_DATE")))
    train_df = joined_data_sorted.where("rank_p <= .8").drop("rank_p", "FL_DATE")
    test_df = joined_data_sorted.where("rank_p > .8").drop("rank_p", "FL_DATE")
    return train_df, test_df

#### Train/Test Split for Large Dataset

In [None]:
def get_split_large(df):
    df = df.drop(*drop)
    train_df = df.where(col('YEAR') != '2018').drop("FL_DATE")
    test_df = df.where(col('YEAR') == '2018').drop("FL_DATE")
    return train_df, test_df

#### Balance Ratio for Logistic Regression

- Creates a new columns called `class_weights` with class percentages that gets passed into the logistic regression as a parameter (`weightCol`)

In [None]:
def balance_ratio(train_df):
    balance_ratio = train_df.select('DEP_DEL15').filter(col('DEP_DEL15') == 0).count() / train_df.count()
    train_df = train_df.withColumn('class_weights', when(train_df.DEP_DEL15 == 1, balance_ratio).otherwise(1 - balance_ratio))
    return train_df

#### Model Evaluation

In [None]:
# Evaluate the model on validation set
def model_evaluation(results):
    # Input: prediction results
    
    predictionAndLabels = results.select(['prediction', 'label']\
                                      ).withColumn('label',col('label').cast(DoubleType())).rdd

    metrics = MulticlassMetrics(predictionAndLabels)
    conf_matrix = metrics.confusionMatrix().toArray()
    
    accuracy = (conf_matrix[0][0] + conf_matrix[1][1]) / conf_matrix.sum()
    precision = (conf_matrix[1][1]) / (conf_matrix[0][1] + conf_matrix[1][1])
    recall = (conf_matrix[1][1]) / (conf_matrix[1][0] + conf_matrix[1][1])
    f1 = (2 * precision * recall)/(precision + recall)
    f05 = 1.25 * ((precision * recall) / ((0.25 * precision) + recall))
    f2 = 5 * ((precision * recall) / ((4 * precision) + recall))
        
    print(f'Accuracy = {accuracy:.4f}')
    print(f'Precision = {precision:.4f}')
    print(f'Recall = {recall:.4f}')
    print(f'F1-Score = {f1:.4f}')
    print(f'F05-Score = {f05:.4f}')
    print(f'F2-Score = {f2:.4f}')
    print(f'TP = {conf_matrix[1][1]:.0f}')
    print(f'FN = {conf_matrix[1][0]:.0f}')
    print(f'TN = {conf_matrix[0][0]:.0f}')
    print(f'FP = {conf_matrix[0][1]:.0f}')
    print(f"Total Records: {conf_matrix[0][0] + conf_matrix[1][1] + conf_matrix[0][1] + conf_matrix[1][0]:.0f}")

#### Pipleline Function

In [None]:
# This pipeline is only used for data preprocessing and the passed model is ignored
def get_pipeline(model, ohe_cols, num_cols, wx_cat, null_col, minmax_std='minmax', pca_k=None):
    
    stages = []
    
    # one-hot-encoding
    stringIndexer = StringIndexer(inputCols=ohe_cols, outputCols=[x + "_Index" for x in ohe_cols], handleInvalid = 'keep')    
    encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "_OHE" for x in need_ohe]) 
    stages += [stringIndexer, encoder]  
    
    # label indexing
    labelToIndex = StringIndexer(inputCol="DEP_DEL15", outputCol="label")
    stages += [labelToIndex]
    
    # Imputation with mean
    imputer = Imputer(strategy='mean', inputCols=null_col, outputCols=[x + "_imp" for x in null_col])
    stages += [imputer]
    
    # Normalization/Standardization   
    if minmax_std == 'minmax':
        scaler_cols = num_cols + [c + "_imp" for c in null_col]
        assembler_1 = VectorAssembler(inputCols=scaler_cols, outputCol='features_a', handleInvalid = 'keep')
        stages += [assembler_1]
        scaler = MinMaxScaler(inputCol='features_a', outputCol='features_scaled', min=0, max=1) 
        stages += [scaler]
    elif minmax_std == 'std':
        scaler_cols = num_cols + [c + "_imp" for c in null_col]
        assembler_1 = VectorAssembler(inputCols=scaler_cols, outputCol='features_a', handleInvalid = 'keep')
        stages += [assembler_1]
        scaler = StandardScaler(inputCol="features_a", outputCol="features_scaled", withStd=True, withMean=False)
        stages += [scaler]
    elif minmax_std == None:
        pass
    else:
        print('Invalid entry in minmax_std parameter')

    # collecting inputs and assemble to one final vector for model
    if minmax_std == None:
        assembler_input = [c + "_Index" for c in ohe_cols] + num_cols + [c + "_imp" for c in null_col] + wx_cat
        assembler_2 = VectorAssembler(inputCols=assembler_input, outputCol="features", handleInvalid = 'keep')
        stages += [assembler_2]
    elif minmax_std == 'minmax' or minmax_std == 'std':
        assembler_input = [c + "_OHE" for c in ohe_cols] + ['features_scaled'] + wx_cat
        assembler_2 = VectorAssembler(inputCols=assembler_input, outputCol="features", handleInvalid = 'keep')
        stages += [assembler_2]
    
    # add pca component
    if pca_k == None:
        pass
    else:
        pca = PCA(k = pca_k, inputCol="features", outputCol="features_pca")
        stages += [pca]
    
    #Not using the passed model as we are going to use our own algorithm from scratch
    #stages += [model]
    
    return Pipeline(stages=stages)

In [None]:
# This model is not used
model = LogisticRegression(featuresCol="features", labelCol="label", regParam=0.0, maxIter=50)


#### Set up the pipeline

**minmax_std parameter:**
- False = returns pipeline without scaling of continuous features
- 'minmax' = scales continuous features with `MinMaxScaler` (min=0, max=1)
- 'std' = scales continuous features with `StandardScaler` (withStd=True, withMean=False)

In [None]:
#need_ohe = ['YEAR', 'QUARTER', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_UNIQUE_CARRIER', 'ORIGIN', 'ORIGIN_STATE_FIPS', 'DEST', 'DEST_STATE_FIPS', 'DEP_TIME_BLK', 'ARR_TIME_BLK', 'DISTANCE_GROUP', 'BAD_TRAVEL_DAY', 'DEP_TIME_SEGMENT']
# need_ohe = ['QUARTER', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_UNIQUE_CARRIER', 'ORIGIN', 'ORIGIN_STATE_FIPS', 'DEST', 'DEST_STATE_FIPS', 'DEP_TIME_BLK', 'ARR_TIME_BLK', 'DISTANCE_GROUP']

#features_numerical = ['ELEVATION_ORG', 'ELEVATION_DEST', 'SNW_DEST', 'SNW_ORG', 'CIG_ORG', 'WND_ORG', 'GUST_ORG',  'VIS_DEST', 'CIG_DEST', 'WND_DEST', 'GUST_DEST', 'p_rank']
# features_numerical = ['ELEVATION_ORG', 'ELEVATION_DEST', 'SNW_DEST', 'SNW_ORG', 'CIG_ORG', 'WND_ORG', 'GUST_ORG',  'VIS_DEST', 'CIG_DEST', 'WND_DEST', 'GUST_DEST']

In [None]:
pipeline = get_pipeline(model, need_ohe, features_numerical, wx_categorical, null_cols, minmax_std='minmax', pca_k=None)

In [None]:
pipeline.getStages()

#### Get Train and Test Set

In [None]:
# train/test split on small dataset
train_df, test_df = get_split_small(joined_data_6M)

# train/test split on large (2015-2018) dataset
# train_df, test_df = get_split_large(joined_data_traintest)

# # get holdout (2019) dataset
# holdout = get_holdout(joined_data_holdout)

- Downsample majority class in training set. The second parameter is the class ratio (e.g. 2 would result in a 2:1 majority class ration)

In [None]:
 train_df = resample(train_df, 1.6, 'DEP_DEL15', 1)

#### Fit the dataset in the pipeline to generate the required features

In [None]:
pipelineModel = pipeline.fit(train_df)

In [None]:
train_df = pipelineModel.transform(train_df)

In [None]:
display(train_df.where(train_df['DAY_OF_MONTH']=='2').limit(2))

YEAR,QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,OP_UNIQUE_CARRIER,ORIGIN,ORIGIN_STATE_FIPS,DEST,DEST_STATE_FIPS,DEP_DEL15,DEP_TIME_BLK,ARR_TIME_BLK,DISTANCE_GROUP,BAD_TRAVEL_DAY,DEP_TIME_SEGMENT,p_rank,ELEVATION_ORG,SLP_ORG,VIS_ORG,TMP_ORG,DEW_ORG,CIG_ORG,WND_ORG,GUST_ORG,TCB_ORG,OBSC_ORG,PREC_ORG,SNW_ORG,FZFG_ORG,FZDZ_ORG,FZRA_ORG,TSRA_ORG,HSN_ORG,LSN_ORG,BSN_ORG,VV_ORG,TS_ORG,HAIL_ORG,FOG_ORG,VA_ORG,BR_ORG,SQ_ORG,FC_ORG,ELEVATION_DEST,SLP_DEST,VIS_DEST,TMP_DEST,DEW_DEST,CIG_DEST,WND_DEST,GUST_DEST,TCB_DEST,OBSC_DEST,PREC_DEST,SNW_DEST,FZFG_DEST,FZDZ_DEST,FZRA_DEST,TSRA_DEST,HSN_DEST,LSN_DEST,BSN_DEST,VV_DEST,TS_DEST,HAIL_DEST,FOG_DEST,VA_DEST,BR_DEST,SQ_DEST,FC_DEST,YEAR_Index,QUARTER_Index,MONTH_Index,DAY_OF_MONTH_Index,DAY_OF_WEEK_Index,OP_UNIQUE_CARRIER_Index,ORIGIN_Index,ORIGIN_STATE_FIPS_Index,DEST_Index,DEST_STATE_FIPS_Index,DEP_TIME_BLK_Index,ARR_TIME_BLK_Index,DISTANCE_GROUP_Index,BAD_TRAVEL_DAY_Index,DEP_TIME_SEGMENT_Index,YEAR_OHE,QUARTER_OHE,MONTH_OHE,DAY_OF_MONTH_OHE,DAY_OF_WEEK_OHE,OP_UNIQUE_CARRIER_OHE,ORIGIN_OHE,ORIGIN_STATE_FIPS_OHE,DEST_OHE,DEST_STATE_FIPS_OHE,DEP_TIME_BLK_OHE,ARR_TIME_BLK_OHE,DISTANCE_GROUP_OHE,BAD_TRAVEL_DAY_OHE,DEP_TIME_SEGMENT_OHE,label,SLP_ORG_imp,VIS_ORG_imp,TMP_ORG_imp,PREC_ORG_imp,DEW_ORG_imp,SLP_DEST_imp,TMP_DEST_imp,DEW_DEST_imp,PREC_DEST_imp,features_a,features_scaled,features
2015,1,1,2,5,DL,ORD,17,ATL,13,0.0,0600-0659,0900-0959,3,0,2,0.0355829397291677,201.8,1023.0,10.0,-7.2,-9.4,72178,6.0,0.0,0,0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,307.8,1024.0,10.0,6.7,5.0,7998,0.0,0.0,0,0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.0,0.0,2.0,16.0,1.0,0.0,1.0,1.0,28.0,12.0,16.0,14.0,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 5, indices -> List(2), values -> List(1.0))","Map(vectorType -> sparse, length -> 31, indices -> List(16), values -> List(1.0))","Map(vectorType -> sparse, length -> 7, indices -> List(1), values -> List(1.0))","Map(vectorType -> sparse, length -> 13, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 2, indices -> List(1), values -> List(1.0))","Map(vectorType -> sparse, length -> 2, indices -> List(1), values -> List(1.0))","Map(vectorType -> sparse, length -> 194, indices -> List(28), values -> List(1.0))","Map(vectorType -> sparse, length -> 51, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 19, indices -> List(16), values -> List(1.0))","Map(vectorType -> sparse, length -> 19, indices -> List(14), values -> List(1.0))","Map(vectorType -> sparse, length -> 10, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 4, indices -> List(1), values -> List(1.0))",0.0,1023.0,10.0,-7.2,0.0,-9.4,1024.0,6.7,5.0,0.0,"Map(vectorType -> dense, length -> 21, values -> List(201.8, 307.8, 0.0, 0.0, 72178.0, 6.0, 0.0, 10.0, 7998.0, 0.0, 0.0, 0.03558293972916778, 1023.0, 10.0, -7.2, 0.0, -9.4, 1024.0, 6.7, 5.0, 0.0))","Map(vectorType -> sparse, length -> 21, indices -> List(1, 4, 5, 7, 8, 12, 13, 14, 16, 17, 18, 19), values -> List(0.13069534172050326, 1.0, 0.2006688963210702, 0.125, 0.11080938790213084, 0.6296296296296295, 1.0, 0.2782931354359926, 0.37627811860940685, 0.5694444444444444, 0.5461309523809524, 0.6377295492487479))","Map(vectorType -> sparse, length -> 416, indices -> List(0, 1, 5, 24, 40, 46, 60, 62, 91, 269, 324, 341, 346, 356, 358, 362, 365, 366, 368, 369, 373, 374, 375, 377, 378, 379, 380), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.13069534172050326, 1.0, 0.2006688963210702, 0.125, 0.11080938790213084, 0.6296296296296295, 1.0, 0.2782931354359926, 0.37627811860940685, 0.5694444444444444, 0.5461309523809524, 0.6377295492487479))"
2015,1,1,2,5,DL,ATL,13,MSP,27,0.0,0800-0859,1000-1059,4,0,2,0.0376777266400135,307.8,1025.0,9.0,6.7,5.0,6499,0.0,0.0,0,0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,265.8,1024.0,10.0,-13.3,-18.3,72178,4.1,0.0,0,0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.0,0.0,2.0,16.0,1.0,0.0,0.0,0.0,5.0,18.0,7.0,5.0,3.0,0.0,1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 5, indices -> List(2), values -> List(1.0))","Map(vectorType -> sparse, length -> 31, indices -> List(16), values -> List(1.0))","Map(vectorType -> sparse, length -> 7, indices -> List(1), values -> List(1.0))","Map(vectorType -> sparse, length -> 13, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 194, indices -> List(5), values -> List(1.0))","Map(vectorType -> sparse, length -> 51, indices -> List(18), values -> List(1.0))","Map(vectorType -> sparse, length -> 19, indices -> List(7), values -> List(1.0))","Map(vectorType -> sparse, length -> 19, indices -> List(5), values -> List(1.0))","Map(vectorType -> sparse, length -> 10, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 4, indices -> List(1), values -> List(1.0))",0.0,1025.0,9.0,6.7,0.0,5.0,1024.0,-13.3,-18.3,0.0,"Map(vectorType -> dense, length -> 21, values -> List(307.8, 265.8, 0.0, 0.0, 6499.0, 0.0, 0.0, 10.0, 72178.0, 4.1, 0.0, 0.037677726640013585, 1025.0, 9.0, 6.7, 0.0, 5.0, 1024.0, -13.3, -18.3, 0.0))","Map(vectorType -> dense, length -> 21, values -> List(1.0, 0.11284427065623939, 0.0, 0.0, 0.0888041065482797, 0.0, 0.0, 0.125, 1.0, 0.10761154855643043, 0.0, 1.0, 0.6666666666666666, 0.9, 0.536178107606679, 0.0, 0.6707566462167688, 0.5694444444444444, 0.24851190476190474, 0.24874791318864778, 0.0))","Map(vectorType -> sparse, length -> 416, indices -> List(0, 1, 5, 24, 40, 46, 59, 61, 68, 275, 315, 332, 349, 356, 358, 361, 362, 365, 368, 369, 370, 372, 373, 374, 375, 377, 378, 379, 380), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.11284427065623939, 0.0888041065482797, 0.125, 1.0, 0.10761154855643043, 1.0, 0.6666666666666666, 0.9, 0.536178107606679, 0.6707566462167688, 0.5694444444444444, 0.24851190476190474, 0.24874791318864778))"


In [None]:
# Select only the features and label
vec_train_df = train_df.select('features','label')

In [None]:
display(vec_train_df.limit(2))

features,label
"Map(vectorType -> sparse, length -> 416, indices -> List(0, 1, 5, 35, 39, 47, 60, 62, 149, 273, 314, 330, 348, 356, 358, 362, 365, 366, 367, 368, 369, 370, 373, 374, 375, 377, 378, 379, 380), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.1000085005100306, 1.0, 0.3712374581939799, 0.44345898004434586, 0.125, 1.0, 0.4488188976377953, 0.537037037037037, 1.0, 0.2374768089053803, 0.27402862985685067, 0.4861111111111111, 0.32291666666666663, 0.313856427378965))",0.0
"Map(vectorType -> sparse, length -> 416, indices -> List(0, 1, 5, 35, 39, 52, 59, 61, 67, 261, 321, 334, 346, 356, 358, 361, 362, 365, 368, 369, 370, 372, 373, 374, 375, 377, 378, 379, 380), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.001147568854131248, 1.0, 0.125, 1.0, 0.2099737532808399, 1.0, 0.7037037037037037, 1.0, 0.463821892393321, 0.5235173824130879, 0.5694444444444444, 0.45535714285714285, 0.41569282136894825))",0.0


In [None]:
# helper function to parse features and labels
def parse(row):
    """
    Map record_string --> (tuple,of,fields)
    """
    return(row.features.toArray(), int(row.label))

In [None]:
# Parse the train set features and label into a tuple rdd
vec_train_rdd = vec_train_df.rdd.map(parse).cache()

In [None]:
# Parse the test set features and label into a tuple rdd
test_df = pipelineModel.transform(test_df)
vec_test_df = test_df.select('features','label')
vec_test_rdd = vec_test_df.rdd.map(parse).cache()

In [None]:
display(vec_test_df.limit(2))

features,label
"Map(vectorType -> sparse, length -> 416, indices -> List(0, 2, 6, 37, 40, 47, 59, 61, 166, 266, 320, 338, 346, 356, 357, 361, 362, 365, 366, 368, 369, 370, 372, 373, 374, 375, 377, 378, 379, 380, 381), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.11195171710302619, 0.07491675915649279, 0.17056856187290967, 0.125, 0.048491230014685915, 0.34120734908136485, 1.0, 0.6296296296296295, 1.0, 0.8868274582560297, 0.9325153374233128, 0.4583333333333333, 0.8258928571428571, 0.8964941569282137, 0.012903225806451613))",1.0
"Map(vectorType -> sparse, length -> 416, indices -> List(0, 2, 6, 37, 40, 46, 59, 61, 63, 259, 324, 342, 349, 356, 358, 361, 362, 365, 366, 368, 369, 370, 372, 373, 374, 375, 377, 378, 379, 380, 396), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0013175790547432847, 0.004189789123196449, 0.2006688963210702, 0.125, 1.0, 0.10761154855643043, 1.0, 0.5740740740740741, 0.1, 0.7829313543599259, 0.936605316973415, 0.5694444444444444, 0.7351190476190476, 0.7495826377295494, 1.0))",0.0


###Logistic Regression Implementation

In [None]:
# Sigmoid function to be used while calculating the loss
def sigmoid(z):
    return 1.0/(1 + np.exp(-z))

In [None]:
# Defining log loss/cost function
def LogLoss(dataRDD, W): 
    """
    This function calculates the log loss error
    
    Args:
        dataRDD - with each rdd record as a tuple of (features_array, y)
        W       - (array) model coefficients with bias at index 0
    """
    
    
    # augmenting the features array with a 1 for handling bias at index 0
    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1]))
    
    # Calculating the loss/cost
    loss = augmentedData.map(lambda x: x[1]*np.log(sigmoid(W.dot(x[0]))) + (1-x[1])*np.log(1 - sigmoid(W.dot(x[0])))).mean()*-1
       
    return loss

In [None]:
# Defining gradient update function with L1 regularization
def GDUpdate (dataRDD, W, learningRate = 0.15, regParam = 0.1):
    """
    Perform one gradient descent step/update with ridge or lasso regularization.
    Args:
        dataRDD - tuple of (features_array, y)
        W       - (array) model coefficients with bias at index 0
        learningRate - (float) defaults to 0.1 
        regParam - (float) L1 regularization parameter
    Returns:
        model   - (array) updated coefficients, bias at index 0
    """
    # augmented data
    M=dataRDD.count()
    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1]))
    
    # Broadcast coeeficients
    sc.broadcast(W)  
     
    #Claculate gradient with L1 regularization    
    grad = augmentedData.map(lambda x: ((sigmoid(W.dot(x[0])) - x[1])*x[0])).sum() + regParam * np.append([0.0], np.sign(W)[1:])
    
    # Perform gradient update
    new_model = W - learningRate * grad/M
    
    return new_model

In [None]:
# Gradient descent function
def GradientDescent(trainRDD, testRDD, wInit, nSteps = 100, learningRate = 0.15, verbose = False):
    """
    Perform nSteps iterations of gradient descent and 
    track loss on a test and train set. Return lists of
    test/train loss and the models themselves.
    """
    # initialize lists to track model performance
    train_history, test_history, model_history = [], [], []
    
    # perform n updates & compute test and train loss after each
    model = wInit
    
    for idx in range(nSteps):  
        # update the model
        model = GDUpdate(trainRDD, model, learningRate)
        
        # keep track of test/train loss for plotting
        train_history.append(LogLoss(trainRDD, model))
        model_history.append(model)
        
        # console output if desired
        if verbose:
            print("----------")
            print(f"STEP: {idx+1}")
            print(f"Model: {[round(w,3) for w in model]}")
            
    # We are not using the test rdd here and the test_history returned will be an empty list
    return train_history, test_history, model_history

In [None]:
# Initialize the weights based on the number of features + 1
wInit = np.random.uniform(0,1,417)
# Call the gradient descent functon to get the model co-fficients
# Note: even though we are passing the test rdd, we will not be utilizing it in the function
lr_results = GradientDescent(vec_train_rdd, vec_test_rdd, wInit, nSteps = 50, learningRate = 0.21)

In [None]:
# Check the last three loss results and see loss is not exploding
print(lr_results[0][-3])
print(lr_results[0][-2])
print(lr_results[0][-1])

In [None]:
# Select the last model if the loss was not exploding
model = lr_results[2][-1]

### Predict and Evaluate

In [None]:
# A function to make predictions using the final model
def makePrediction(testRDD, model, threshold = 0.5):
    """
    Perform predictions on the passed rdd using the model.
    Threshold can be pass to the function to adjust predictions of 1.0 or 0.0
    """
    
    # Augment with 1.0 in the first index to account for bias
    augmentedData = testRDD.map(lambda x: (np.append([1.0], x[0]), x[1]))
    
    # Broadcast model
    sc.broadcast(model)
    
    # Make predictions on the input rdd using the model and the sigmoid function
    predictions = augmentedData.map(lambda x: (float(sigmoid(model.dot(x[0]))), 1.0 if sigmoid(model.dot(x[0])) > threshold else 0.0, x[1]))
    
    return predictions

In [None]:
model

In [None]:
# Make predictions on the test rdd
predrdd = makePrediction(vec_test_rdd, model,threshold = 0.35)

In [None]:
# Define the predicted resuts schema
schema = StructType([
StructField("prob", FloatType(), True),
StructField("prediction", DoubleType(), True),
StructField("label", IntegerType(), True)])

In [None]:
# Convert the predicted results to a dataframe
preddf = sqlContext.createDataFrame(predrdd, schema)

In [None]:
display(preddf.limit(2))

prob,prediction,label
0.146727,0.0,1
0.28866726,0.0,0


###Metrics

In [None]:
# Evaluate metrics on the predcted results dataframe
model_evaluation(preddf)

### Training on a portion of Large dataset

In [None]:
def get_split_large_par(df):
    df = df.drop(*drop)
    train_df = df.where((col('YEAR') == '2018') & (col('MONTH') == '2')).drop("FL_DATE")
    test_df = df.where((col('YEAR') == '2018') & (col('MONTH') == '3')).drop("FL_DATE")
    return train_df, test_df

In [None]:
# Create train and test data and resample
train_df, test_df = get_split_large_par(joined_data_traintest)
train_df = resample(train_df, 1.7, 'DEP_DEL15', 1)

In [None]:
# Perform data processing using the pipeline and fit train data
pipelineModel = pipeline.fit(train_df)
train_df = pipelineModel.transform(train_df)
vec_train_df = train_df.select('features','label')
vec_train_rdd = vec_train_df.rdd.map(parse).cache()

# Transform test data using fitted train data
test_df = pipelineModel.transform(test_df)
vec_test_df = test_df.select('features','label')
vec_test_rdd = vec_test_df.rdd.map(parse).cache()

In [None]:
display(vec_train_df.take(2))

features,label
"Map(vectorType -> sparse, length -> 939, indices -> List(0, 1, 2, 17, 33, 48, 101, 403, 456, 792, 830, 852, 869, 879, 880, 884, 885, 888, 889, 891, 892, 893, 895, 896, 897, 898, 900, 901, 902, 903), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.15572934376062564, 0.11284427065623939, 1.0, 0.25984251968503935, 0.22222222222222224, 1.0, 0.2222222222222222, 0.12332561516426588, 0.6615384615384616, 0.22222222222222224, 0.6720827178729691, 0.47649918962722854, 0.5396825396825397, 0.5809379727685325, 0.4903536977491961))",0.0
"Map(vectorType -> sparse, length -> 939, indices -> List(0, 1, 2, 28, 37, 49, 58, 391, 629, 779, 843, 858, 868, 879, 881, 884, 885, 888, 889, 891, 892, 893, 895, 896, 897, 898, 900, 901, 902, 903, 919, 936), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.07242434546072764, 0.0029751785107106432, 0.047078057025686494, 0.29133858267716534, 0.11111111111111112, 0.0041286818698218295, 0.3611111111111111, 0.831163441543136, 0.5538461538461539, 0.11111111111111112, 0.6070901033973414, 0.6564019448946515, 0.5238095238095237, 0.8245083207261725, 0.9196141479099679, 1.0, 1.0))",0.0


In [None]:
display(vec_test_df.take(2))

features,label
"Map(vectorType -> sparse, length -> 939, indices -> List(0, 1, 16, 33, 53, 173, 393, 724, 798, 838, 864, 870, 879, 881, 884, 885, 888, 889, 891, 892, 893, 894, 895, 896, 897, 898, 900, 901, 902, 903), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0031026861611696704, 0.09626827609656581, 1.0, 0.1837270341207349, 0.22222222222222224, 1.0, 0.275, 0.33125, 0.013377338712258793, 0.4307692307692308, 0.22222222222222224, 0.8360413589364846, 0.9092382495948136, 0.5873015873015872, 0.47806354009077157, 0.4678456591639871))",0.0
"Map(vectorType -> sparse, length -> 939, indices -> List(0, 1, 7, 31, 41, 68, 404, 570, 819, 841, 855, 871, 879, 880, 884, 885, 888, 889, 891, 892, 893, 895, 896, 897, 898, 900, 901, 902, 903), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.11284427065623939, 0.11645698741924516, 0.3463659286763279, 0.31758530183727035, 0.22222222222222224, 1.0, 0.3361111111111111, 0.6783038531349699, 0.7230769230769232, 0.22222222222222224, 0.5657311669128509, 0.3954619124797407, 0.6666666666666666, 0.5128593040847201, 0.527331189710611))",0.0


In [None]:
# Initialize model with number of features + 1 for bias
wInit = np.random.uniform(0,1,940)
# Perform gradient descent on the train set
lr_results = GradientDescent(vec_train_rdd, vec_test_rdd, wInit, nSteps = 50, learningRate = 0.28)

In [None]:
# Check the last three loss results and see loss is not exploding
# The above results are valid if the loss was not exploding
print(lr_results[0][-3])
print(lr_results[0][-2])
print(lr_results[0][-1])

In [None]:
# Pick the last model
model = lr_results[2][-1]

In [None]:
# Make predictions
predrdd = makePrediction(vec_test_rdd, model,threshold = 0.40)
preddf = sqlContext.createDataFrame(predrdd, schema)
# Evaluate results
model_evaluation(preddf)

###Testing on the full dataset
Training on 2015 to 2017 records<br/>
Testing on 2018 records

In [None]:
def get_split_large_par(df):
    df = df.drop(*drop)
    train_df = df.where(col('YEAR') != '2018').drop("FL_DATE")
    test_df = df.where(col('YEAR') == '2018').drop("FL_DATE")
    return train_df, test_df

In [None]:
# 2015 to 2017 data for training and 2018 data for testing
train_df_l, test_df_l = get_split_large_par(joined_data_traintest)

In [None]:
# Undersample the majority class
train_df_l = resample(train_df_l, 1.7, 'DEP_DEL15', 1)

In [None]:
# Fit train dataset for feature processing
pipelineModel = pipeline.fit(train_df_l)
train_df_l = pipelineModel.transform(train_df_l)
vec_train_df_l = train_df_l.select('features','label')
vec_train_rdd_l = vec_train_df_l.rdd.map(parse).cache()

# Transfor the test dataset
test_df_l = pipelineModel.transform(test_df_l)
vec_test_df_l = test_df_l.select('features','label')
vec_test_rdd_l = vec_test_df_l.rdd.map(parse).cache()

In [None]:
display(vec_train_df_l.take(2))

features,label
"Map(vectorType -> sparse, length -> 948, indices -> List(1, 3, 9, 37, 54, 60, 72, 407, 649, 801, 852, 871, 877, 887, 891, 893, 894, 897, 898, 900, 901, 904, 905, 906, 907, 909, 910, 911, 912), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.08564263855831351, 0.12036722203332201, 0.12467787968633101, 0.12359550561797754, 0.1098901098901099, 1.0, 0.8233509086568446, 0.5567010309278351, 0.125, 0.7663157894736842, 0.7982456140350876, 0.5656565656565657, 0.7447698744769876, 0.8338235294117647))",0.0
"Map(vectorType -> sparse, length -> 948, indices -> List(2, 6, 14, 42, 51, 62, 84, 418, 649, 801, 853, 868, 879, 887, 891, 893, 894, 897, 900, 901, 904, 905, 906, 907, 909, 910, 911, 912), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 7.650459027541654E-4, 0.12036722203332201, 1.0, 0.1098901098901099, 0.13854637147053117, 0.4113487722258379, 0.5463917525773195, 0.125, 0.5084210526315789, 0.4809941520467836, 0.5454545454545455, 0.45920502092050214, 0.48235294117647054))",0.0


In [None]:
display(vec_test_df_l.take(2))

features,label
"Map(vectorType -> sparse, length -> 948, indices -> List(5, 15, 33, 50, 86, 413, 649, 801, 852, 858, 877, 887, 891, 893, 894, 897, 900, 901, 902, 904, 905, 906, 907, 909, 910, 911, 912), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.09418565113906835, 0.12036722203332201, 1.0, 0.1098901098901099, 0.05126215744409653, 0.036204744069912614, 0.3230417664908438, 0.5670103092783505, 0.125, 0.8073684210526316, 0.8713450292397661, 0.595959595959596, 0.6631799163179918, 0.7602941176470589))",0.0
"Map(vectorType -> sparse, length -> 948, indices -> List(6, 14, 25, 52, 86, 413, 649, 801, 851, 867, 877, 887, 890, 893, 894, 897, 898, 900, 901, 902, 904, 905, 906, 907, 909, 910, 911, 912), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.09418565113906835, 0.12036722203332201, 1.0, 0.06367041198501873, 0.1098901098901099, 1.0, 0.08739076154806492, 0.3230417664908438, 0.7525773195876289, 0.125, 0.3915789473684211, 0.38304093567251457, 0.7777777777777778, 0.3378661087866109, 0.3191176470588235))",1.0


In [None]:
# Initialize model
wInit = np.random.uniform(0,1,949)
# Perform gradient descent
lr_results_l = GradientDescent(vec_train_rdd_l, vec_test_rdd_l, wInit, nSteps = 125, learningRate = 0.28)
# Pick the last model (the model will be checked if the loss did not explode in next step)
model_l = lr_results_l[2][-1]
#make predictions
predrdd_l = makePrediction(vec_test_rdd_l, model_l,threshold = 0.38)
preddf_l = sqlContext.createDataFrame(predrdd_l, schema)
# Evaluate resuts
model_evaluation(preddf_l)

In [None]:
# Check the last three loss results and see loss is not exploding
# The above results are valid if the loss was not exploding
#lr_results_l[0][-3]
#lr_results_l[0][-2]
lr_results_l[0][-1]

In [None]:
# Make another set of predictions by lowering the threshold value
predrdd_l = makePrediction(vec_test_rdd_l, model_l,threshold = 0.25)

In [None]:
# Convert predictions to dataframe
preddf_l = sqlContext.createDataFrame(predrdd_l, schema)

In [None]:
# Evaluate predictions
model_evaluation(preddf_l)