In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType
from pyspark.sql import SQLContext
from pyspark.sql.functions import concat, col, hour, minute, lpad, rpad, substring, year, month, dayofmonth, lit, to_timestamp, expr,split,explode,split
from pyspark.sql.functions import isnan, when, count, col,isnull
from pyspark.mllib import *
from sklearn.metrics import accuracy_score, f1_score
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix

import numpy as np
#spark = SparkSession.builder.getOrCreate()

# Legacy SQLContext handle built from the SparkContext `sc`.
# NOTE(review): `sc` and `spark` are not defined in this notebook; they are presumably
# injected by the Databricks runtime — TODO confirm. `sqlContext` is not referenced in
# the cells shown here (they use `spark` directly), so this line may be removable.
sqlContext = SQLContext(sc)

In [0]:
# Read the prepared modeling dataset (Parquet).
# Parquet files carry their own schema, so CSV-style options such as `header`
# are not needed here; they have no effect on a Parquet read.
DATA_PATH = "dbfs:/mnt/mids-w261/team20SSDK/final_datasets/data_range/"
df = spark.read.parquet(DATA_PATH)



### Adjusting for the Imbalanced Dataset - Class Weighting

##### Handling Imbalance in the dataset
###### Low precision in the minority class (Delay)

As we've seen, our data is highly imbalanced, which results in a model biased towards predicting the majority class (No Delay). This is because the algorithm does not have enough examples to learn the patterns present in the minority class (Delay). As a result, misclassification errors are high for the minority class and its precision is low.

###### Workaround
- Modify the current training algorithm to account for the skewed class distribution by giving different weights to the majority and minority classes.

This difference in weights changes how much each class influences the model during training.
The idea is to penalize misclassification of the minority class more heavily by giving it a higher class weight, while reducing the weight of the majority class.

###### Implementation in LR
This is implemented in LR by modifying the **cost function** as below:

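$$\text{log loss} = -\frac{1}{N}\sum_{i=1}^{N} \left[ w_1\, y_i \log(\hat y_i) + w_0\, (1-y_i) \log(1-\hat y_i) \right]$$

$$ w_0 = \text{weight of class 0 (No Delay)} $$
$$ w_1 = \text{weight of class 1 (Delay)} $$

In this notebook $w_1$ is the fraction of flights that are not delayed and $w_0 = 1 - w_1$, so the minority class (Delay) receives the larger weight.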

In [0]:
# Total number of rows in the full dataset (before any filtering).
dataset_size = df.count()
print('dataset_size = {}'.format(dataset_size))

In [0]:
# Count delayed (DEP_DEL15 == 1) and on-time (DEP_DEL15 == 0) flights in a single
# pass over the data instead of two separate filter+count jobs.
# Rows whose DEP_DEL15 is null fall into neither bucket: `when` without `otherwise`
# yields null for them, and `count` skips nulls.
label_counts = df.agg(
    f.count(when(df['DEP_DEL15'] == 1, True)).alias('num_delayed'),
    f.count(when(df['DEP_DEL15'] == 0, True)).alias('num_not_delayed'),
).first()

num_delayed = label_counts['num_delayed']
print(f'num_delayed = {num_delayed}')


num_not_delayed = label_counts['num_not_delayed']
print(f'num_not_delayed = {num_not_delayed}')

In [0]:

# Fraction of *labelled* rows that are on time (class 0). It is used as the weight
# of the delayed class, and 1 - BalancingRatio as the weight of the on-time class,
# so both classes carry the same total weight in the loss.
# The denominator counts only rows with DEP_DEL15 in {0, 1}. Dividing by the full
# dataset size would skew the ratio whenever some rows have a null label.
num_labelled = num_delayed + num_not_delayed
if num_labelled == 0:
    raise ValueError('No rows with DEP_DEL15 in {0, 1}; cannot compute class weights.')
BalancingRatio = num_not_delayed / num_labelled
print('BalancingRatio = {}'.format(BalancingRatio))

In [0]:
# Per-row class weight, later passed to LogisticRegression(weightCol="classWeights"):
# delayed rows (DEP_DEL15 == 1) get BalancingRatio; every other row (label 0 or
# null) gets 1 - BalancingRatio.
df = df.withColumn(
    "classWeights",
    when(df["DEP_DEL15"] == 1, BalancingRatio)
    .otherwise(1 - BalancingRatio),
)
df.select("classWeights").show(5)

In [0]:
# Column lists used for modeling.

# Columns intended to be excluded from modeling (not referenced in the cells shown
# here). Each name appears once.
DROPPED = [
    'ORIGIN_AIRPORT_SEQ_ID', 'ORIGIN_CITY_MARKET_ID', 'ORIGIN_STATE_ABR', 'ORIGIN_STATE_FIPS',
    'ORIGIN_STATE_NM', 'ORIGIN_WAC', 'DEST_AIRPORT_ID', 'DEST_AIRPORT_SEQ_ID',
    'DEST_CITY_MARKET_ID', 'ORIGIN_CITY_NAME', 'DEST_CITY_NAME', 'DEST_STATE_ABR',
    'DEST_STATE_FIPS', 'DEST_STATE_NM', 'DEST_WAC', 'TAXI_IN', 'CANCELLED', 'DIVERTED',
    'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'origin_max_date', 'dest_max_date',
    'OP_CARRIER', 'ORIGIN_AIRPORT_ID', 'CRS_DEP_TIME', 'WHEELS_ON', 'FLIGHTS',
    'DIV_AIRPORT_LANDINGS', 'ORIGIN_TZ', 'DEST_TZ', 'DEST_STATION', 'DEST_STATION_NAME',
    'ORIGIN_UTC_ADJ', 'TAIL_NUM', 'ORIGIN_TS', 'DEST_TS', 'DEST_UTC', 'ORIGIN_STATION',
    'ORIGIN_STATION_NAME', 'OP_CARRIER_FL_NUM', 'OP_UNIQUE_CARRIER', 'FL_DATE',
]

# Categorical features: string-indexed, then one-hot encoded.
cat_cols = ['DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_CARRIER_AIRLINE_ID', 'ORIGIN', 'DEST',
            'DEP_TIME_BLK', 'DISTANCE_GROUP', 'MONTH']

# Numeric features, fed to the VectorAssembler as-is.
# NOTE(review): in the BTS on-time schema, TAXI_OUT and WHEELS_OFF are only observed
# after the aircraft leaves the gate. Unless they were lagged upstream, using them to
# predict DEP_DEL15 leaks the outcome — TODO confirm.
num_cols = ['TAXI_OUT', 'AVG_WND_SPEED_ORIGIN', 'MIN_CIG_HEIGHT_ORIGIN', 'MIN_VIS_DIS_ORIGIN',
            'AVG_TMP_DEG_ORIGIN', 'AVG_DEW_DEG_ORIGIN', 'AVG_SLP_ORIGIN', 'AVG_WND_SPEED_DEST',
            'MIN_CIG_HEIGHT_DEST', 'DISTANCE', 'MIN_VIS_DIS_DEST', 'AVG_TMP_DEG_DEST',
            'AVG_DEW_DEG_DEST', 'AVG_SLP_DEST', 'WHEELS_OFF', 'PAGERANK', 'ORIGIN_FLIGHT_COUNT',
            'DEST_FLIGHT_COUNT', 'DEP_MIN', 'DEP_HOUR', 'ARR_MIN', 'ARR_HOUR']

# Name of the per-row class-weight column.
weights = 'classWeights'

# Tuple of the two label columns (binary DEP_DEL15 and DEP_DELAY_GROUP).
label = ('DEP_DEL15', 'DEP_DELAY_GROUP')

In [0]:
# Exploratory subset: Q1 2015 flights departing from two of the busiest airports,
# Chicago (ORD) and Atlanta (ATL).
df_Q1_15 = df.filter(
    df['ORIGIN'].isin('ORD', 'ATL')
    & (df['YEAR'] == 2015)
    & (df['QUARTER'] == 1)
)

In [0]:
# Sanity check: number of rows in the Q1 2015 ORD/ATL subset (shown as cell output).
df_Q1_15.count()

### One hot encoder

In [0]:
# Select the columns used downstream:
# - the categorical and numeric features,
# - both label columns and the class weights,
# - ORIGIN_UTC (used for the time-ordered train/validation split),
# - YEAR, so the year-based train/test filters can resolve the column on this frame.
trunc_df = df_Q1_15.select([
    'DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_CARRIER_AIRLINE_ID', 'ORIGIN', 'DEST', 'DEP_TIME_BLK',
    'DISTANCE_GROUP', 'MONTH', 'TAXI_OUT', 'AVG_WND_SPEED_ORIGIN', 'MIN_CIG_HEIGHT_ORIGIN',
    'MIN_VIS_DIS_ORIGIN', 'AVG_TMP_DEG_ORIGIN', 'AVG_DEW_DEG_ORIGIN', 'AVG_SLP_ORIGIN',
    'AVG_WND_SPEED_DEST', 'MIN_CIG_HEIGHT_DEST', 'DISTANCE', 'MIN_VIS_DIS_DEST',
    'AVG_TMP_DEG_DEST', 'AVG_DEW_DEG_DEST', 'AVG_SLP_DEST', 'WHEELS_OFF',
    'DEP_DELAY_GROUP', 'PAGERANK', 'ORIGIN_FLIGHT_COUNT', 'DEST_FLIGHT_COUNT', 'DEP_MIN',
    'DEP_HOUR', 'ARR_MIN', 'ARR_HOUR', 'classWeights', 'DEP_DEL15', 'ORIGIN_UTC', 'YEAR',
])




In [0]:
# First encoding stage: map each categorical column to a numeric index.
# Output columns are named "<col>_StringIndexer". handleInvalid='skip' drops rows
# whose value cannot be indexed (unseen labels or nulls) instead of raising.
from pyspark.ml.feature import StringIndexer,OneHotEncoder
cat_cols_indexed = [
    StringIndexer(inputCol=c, outputCol=c + "_StringIndexer", handleInvalid='skip')
    for c in cat_cols
]
  

In [0]:

# Fit each StringIndexer on the current frame and append its index column.
# NOTE: this cell is not idempotent; re-running it re-adds output columns that
# already exist on trunc_df (re-run from the trunc_df cell instead).
for indexer in cat_cols_indexed:
    fitted_indexer = indexer.fit(trunc_df)
    trunc_df = fitted_indexer.transform(trunc_df)

In [0]:
# Second encoding stage: one-hot encode every StringIndexer output.
# Input names are derived from cat_cols so they always match the
# "<col>_StringIndexer" columns created by the indexers. Each encoder writes
# "<col>_StringIndexer_ohe".
from pyspark.ml.feature import OneHotEncoder
cat_cols_one_hot = [
    OneHotEncoder(inputCol=c + "_StringIndexer", outputCol=c + "_StringIndexer_ohe")
    for c in cat_cols
]
  


In [0]:
# Fit each OneHotEncoder and append its one-hot vector column to trunc_df.
for encoder in cat_cols_one_hot:
    fitted_encoder = encoder.fit(trunc_df)
    trunc_df = fitted_encoder.transform(trunc_df)

In [0]:
# VectorAssembler inputs: the numeric features followed by the one-hot vectors.
# Encoded names are derived from cat_cols so they match the encoders' outputs.
encoded_cols = [c + "_StringIndexer_ohe" for c in cat_cols]

# Concatenate into a new list rather than appending into num_cols. That keeps
# num_cols numeric-only and makes this cell safe to re-run (no duplicated features).
assembler_cols = num_cols + encoded_cols


In [0]:
from pyspark.ml.feature import VectorAssembler
# Assemble the numeric and one-hot columns into a single "features" vector.
# handleInvalid='skip' drops any row with a null/NaN in one of the input columns
# instead of failing, which silently reduces the row count.
assembler = VectorAssembler(inputCols=assembler_cols,outputCol="features",handleInvalid='skip')

In [0]:
# Apply the assembler and preview the resulting feature vectors.
one_hot_encoded_trunc_df =assembler.transform(trunc_df)
# NOTE(review): DataFrame.display() appears to be Databricks-specific — TODO confirm if run elsewhere.
one_hot_encoded_trunc_df.select("features").display(truncate=False)

# Row count after the assembler's 'skip' handling (shown as cell output).
one_hot_encoded_trunc_df.count()

features
"Map(vectorType -> sparse, length -> 278, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 28, 52, 58, 70, 171, 260, 269, 277), values -> List(16.0, 37.22727272727273, 2286.0, 16093.0, -27.136363636363637, -135.13636363636363, 10293.454545454546, 27.666666666666668, 2134.0, 240.0, 16093.0, 11.333333333333334, -88.0, 10295.42857142857, 2200.0, 0.05425050648176137, 1676.0, 13.0, 40.0, 21.0, 48.0, 21.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"Map(vectorType -> sparse, length -> 278, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21, 39, 52, 59, 70, 183, 252, 267, 277), values -> List(13.0, 24.736842105263158, 91.0, 2012.0, 82.26315789473684, 51.421052631578945, 10257.526315789473, 34.6, 1372.0, 646.0, 16093.0, 255.1, 205.5, 10212.8, 2020.0, 0.05425050648176137, 1914.0, 24.0, 10.0, 19.0, 21.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"Map(vectorType -> sparse, length -> 278, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 36, 53, 58, 70, 183, 251, 267, 277), values -> List(12.0, 22.2, 122.0, 4828.0, 13.9, -12.3, 10221.7, 36.0, 91.0, 646.0, 4828.0, 154.72727272727272, 140.36363636363637, 10196.363636363636, 949.0, 0.05425050648176137, 2023.0, 16.0, 40.0, 9.0, 33.0, 11.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"Map(vectorType -> sparse, length -> 278, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 36, 53, 58, 70, 183, 257, 267, 277), values -> List(15.0, 28.133333333333333, 122.0, 805.0, 13.0, -16.6, 10238.2, 31.90909090909091, 61.0, 646.0, 1609.0, 164.0909090909091, 141.36363636363637, 10206.90909090909, 1252.0, 0.05425050648176137, 2023.0, 16.0, 40.0, 12.0, 34.0, 14.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"Map(vectorType -> sparse, length -> 278, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 33, 57, 58, 70, 183, 251, 267, 277), values -> List(13.0, 34.94444444444444, 274.0, 8047.0, 44.111111111111114, 22.444444444444443, 10151.333333333334, 22.333333333333332, 2134.0, 646.0, 12875.0, 154.22222222222223, 142.77777777777777, 10197.333333333334, 948.0, 0.05425050648176137, 1510.0, 16.0, 40.0, 9.0, 33.0, 11.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"Map(vectorType -> sparse, length -> 278, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 25, 52, 58, 70, 183, 257, 267), values -> List(20.0, 28.0, 2134.0, 11265.0, -89.47368421052632, -183.47368421052633, 10317.842105263158, 55.0, 22000.0, 646.0, 16093.0, 82.36363636363636, -54.45454545454545, 10294.90909090909, 1308.0, 0.05425050648176137, 2105.0, 16.0, 48.0, 12.0, 40.0, 14.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"Map(vectorType -> sparse, length -> 278, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 50, 52, 58, 70, 83, 249, 267, 277), values -> List(16.0, 61.84615384615385, 823.0, 14484.0, 43.42307692307692, -48.19230769230769, 10288.807692307691, 42.57142857142857, 22000.0, 594.0, 16093.0, 208.42857142857142, 107.35714285714286, 10233.214285714286, 1352.0, 0.05425050648176137, 2057.0, 428.0, 40.0, 13.0, 36.0, 15.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"Map(vectorType -> sparse, length -> 278, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 50, 52, 62, 83, 257, 272, 277), values -> List(12.0, 42.851851851851855, 762.0, 16093.0, -2.814814814814815, -86.11111111111111, 10336.333333333334, 42.57142857142857, 22000.0, 1197.0, 16093.0, 208.42857142857142, 107.35714285714286, 10233.214285714286, 1213.0, 0.04521768069470927, 1609.0, 428.0, 5.0, 12.0, 13.0, 16.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"Map(vectorType -> sparse, length -> 278, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 34, 56, 62, 83, 261, 272), values -> List(11.0, 70.58333333333333, 244.0, 2012.0, 16.375, -41.875, 10169.916666666666, 50.23076923076923, 975.0, 1197.0, 16093.0, 199.53846153846155, 98.3076923076923, 10154.384615384615, 1552.0, 0.04521768069470927, 1551.0, 407.0, 50.0, 14.0, 57.0, 18.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"Map(vectorType -> sparse, length -> 278, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20, 21, 34, 56, 58, 70, 83, 253, 267), values -> List(21.0, 26.705882352941178, 366.0, 11265.0, 50.8235294117647, -8.411764705882353, 10191.470588235294, 50.23076923076923, 975.0, 594.0, 16093.0, 199.53846153846155, 98.3076923076923, 10154.384615384615, 1525.0, 0.05425050648176137, 1960.0, 407.0, 15.0, 55.0, 16.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"


#### The original dataset size was 24603731 and now we are down to 24603083. We've lost 648 records as a result of the `handleInvalid='skip'` setting in the VectorAssembler.

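### Separating 2019 data as test data
Only 2015-2018 data is used for training and validation; 2019 is held out as the test set.
In the cells below we split the dataset. Note: the exploratory filter above keeps only Q1 2015, so in this run the 2019 test set is empty.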

In [0]:
# Training/validation pool: all rows before 2019.
# NOTE(review): df['YEAR'] is a column reference taken from the parent `df`, not from
# one_hot_encoded_trunc_df. It only resolves if Spark can trace YEAR through this
# frame's lineage — TODO confirm.
df_temp = one_hot_encoded_trunc_df.filter(df['YEAR']<2019)

In [0]:
# Held-out test set: 2019 rows.
# NOTE(review): df_Q1_15 keeps only YEAR == 2015, so while that filter is in place this
# frame is empty. df['YEAR'] again comes from the parent `df`.
df_test = one_hot_encoded_trunc_df.filter(df['YEAR']== 2019)

In [0]:
# Validation: the pre-2019 pool and the 2019 test set should together cover every
# assembled row. A mismatch means rows with YEAR > 2019 or a null YEAR fell through.
split_total = df_temp.count() + df_test.count()
print(split_total)
assert split_total == one_hot_encoded_trunc_df.count(), (
    'train/val pool and test set do not add up to the assembled dataset'
)

In [0]:
# Rename the target to "label", the labelCol passed to LogisticRegression below.
# NOTE(review): only df_temp is renamed; df_test still carries DEP_DEL15.
df_temp = df_temp.withColumnRenamed('DEP_DEL15', 'label')


### Train/validation split based on time ordering (earliest 80% of rows by ORIGIN_UTC for training, latest 20% for validation)

In [0]:
# Inspect the column list of the training/validation pool.
df_temp.columns

In [0]:
from pyspark.sql import Window

# Rank every row on a 0..1 scale by ORIGIN_UTC (presumably the UTC departure time);
# the split below cuts on this rank.
# NOTE: a window with no partitionBy moves all rows into a single partition, which is
# fine for small subsets but costly on the full dataset.
df_temp_order = df_temp.withColumn(
    "time_rank",
    f.percent_rank().over(Window.orderBy("ORIGIN_UTC")),
)

In [0]:
# Row count of the ranked pool; should equal df_temp's count (withColumn adds no rows).
print(df_temp_order.count())

In [0]:
# Time-ordered split: the earliest 80% of rows (by time_rank) train the model and the
# most recent 20% validate it, so validation rows come after training rows in time.
TRAIN_FRACTION = 0.8
train_df = df_temp_order.where(f.col("time_rank") <= TRAIN_FRACTION).drop("time_rank")
val_df = df_temp_order.where(f.col("time_rank") > TRAIN_FRACTION).drop("time_rank")

print("Train size: ", train_df.count())
# This is the validation split, not the 2019 test set.
print("Validation size: ", val_df.count())


### Modeling

In [0]:
from pyspark.ml.classification import LogisticRegression

# Class-weighted logistic regression: each row's contribution to the loss is scaled
# by its "classWeights" value, which up-weights the delayed (minority) class.
lr = LogisticRegression(labelCol="label", featuresCol="features", weightCol="classWeights", maxIter=10)
model = lr.fit(train_df)

# Score both splits. Despite its name, `predict_test` holds predictions on the
# *validation* split (val_df).
predict_train = model.transform(train_df)
predict_test = model.transform(val_df)
predict_test.select("label", "prediction").show(10)

trainScoreAndLabels = predict_train.select(['probability', 'label', f.col("prediction").alias("raw")])
valScoreAndLabels = predict_test.select(['probability', 'label', f.col("prediction").alias("raw")])

# Collect to the driver for scikit-learn metrics.
# NOTE: toPandas() materialises every row on the driver. That is fine for this subset;
# on the full dataset, sample first or use pyspark.ml.evaluation instead.
trainScoreAndLabels_pd = trainScoreAndLabels.toPandas()
valScoreAndLabels_pd = valScoreAndLabels.toPandas()

y_train_true = trainScoreAndLabels_pd["label"]
y_train_pred = trainScoreAndLabels_pd["raw"]
conf_mat_train = confusion_matrix(y_train_true, y_train_pred)

y_val_true = valScoreAndLabels_pd["label"]
y_val_pred = valScoreAndLabels_pd["raw"]
conf_mat_val = confusion_matrix(y_val_true, y_val_pred)

# Confusion matrices: rows = true class, columns = predicted class (sorted labels 0, 1).
print("Train confusion matrix:\n", conf_mat_train)
print("Validation confusion matrix:\n", conf_mat_val)

# Validation metrics. f1_score uses its default pos_label=1, i.e. the delayed class.
print("Accuracy Score: ", accuracy_score(y_val_true, y_val_pred))
print("F1 Score: ", f1_score(y_val_true, y_val_pred))
print(classification_report(y_val_true, y_val_pred))