In [None]:
# Static variables
BUCKET='elite-caster-125113'

In [None]:
# Init spark session
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
spark = SparkSession\
  .builder \
  .appName("Logistic regression w/ Spark ML") \
  .getOrCreate()

In [None]:
# Import modules 
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.rdd import PipelinedRDD
import numpy as np
from collections import namedtuple
from matplotlib import pyplot as plt
from google.cloud import storage
from typing import Tuple

## Creating a Training Dataset

In [None]:
# CSV to Dataframe
traindays: DataFrame = spark.read \
  .option("header", "true") \
  .csv('gs://{}/flights/trainday.csv'.format(BUCKET))
    
traindays.printSchema()

In [None]:
# Register the dataframe as TempView for spark sql
traindays.createOrReplaceTempView('traindays')

spark.sql("SELECT * FROM traindays LIMIT 5").show()

In [None]:
# Define the schema map from CSV to DataFrame
from pyspark.sql.types import StringType, FloatType, StructType, StructField

header = 'FL_DATE,OP_UNIQUE_CARRIER,OP_CARRIER_AIRLINE_ID,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN_AIRPORT_ID,'
header += 'ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,DEST_AIRPORT_ID,'
header += 'DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,CRS_DEP_TIME,DEP_TIME,'
header += 'DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,'
header += 'CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,'
header += 'CANCELLATION_CODE,DIVERTED,DISTANCE,DEP_AIRPORT_LAT,'
header += 'DEP_AIRPORT_LON,DEP_AIRPORT_TZOFFSET,ARR_AIRPORT_LAT,ARR_AIRPORT_LON,'
header += 'ARR_AIRPORT_TZOFFSET,EVENT,NOTIFY_TIME'

print(header)

def get_structfield(colname: str) -> StructField:
    if colname in ['ARR_DELAY', 'DEP_DELAY', 'DISTANCE', 'TAXI_OUT']:
        return StructField(colname, FloatType(), True)
    else:
        return StructField(colname, StringType(), True)


schema = StructType([get_structfield(colname) for colname in header.split(',')])

In [None]:
# Load data from google storage to spark
inputs = 'gs://{}/flights/tzcorr/flights-00000-*'.format(BUCKET)
# inputs = 'gs://{}/flights/tzcorr/flights-*'.format(BUCKET)

flights: DataFrame = spark.read \
  .schema(schema) \
  .csv(inputs)
    
flights.createOrReplaceTempView('flights')

In [None]:
# Load training dataset
trainquery: str = """
SELECT
  f.*
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
 t.is_train_day == 'True'
"""

traindata: DataFrame = spark.sql(trainquery)
traindata.head(2)

## Dealing with Corner Cases

In [None]:
# In order to procede to training data should be consistent.
# In this case, 'count' should be equivalent across all features.
# The data I want is the ones that taxied out and arrived at the airport.
traindata[["DEP_DELAY", "TAXI_OUT", "ARR_DELAY", "DISTANCE"]].describe().show()

In [None]:
# I will clean the odd values.
# Revise query by putting NULL fields into account
# Flights that were scheduled but 
#   never left the gate (DEP_DELAY is null)
#   never take off (TAXI_OUT is null) 
# Flights took off but diverted and do not have an ARR_DELAY (This includes TAXI_OUT)
# 'count' will now be all even.
trainquery: str = """
SELECT
  DEP_DELAY, TAXI_OUT, ARR_DELAY, DISTANCE
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
 t.is_train_day == 'True' AND
 f.DEP_DELAY IS NOT NULL AND
 f.ARR_DELAY IS NOT NULL
"""
    
traindata: DataFrame = spark.sql(trainquery)
traindata.describe().show()


In [None]:
# However, excluding NULL is fixing the problem at surface.
# It is recommended to fix the ROOT CAUSE of the context.
# In this case the root cause is "canceled flights" and "divereted flights"

trainquery: str = """
SELECT
  DEP_DELAY, TAXI_OUT, ARR_DELAY, DISTANCE
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
 t.is_train_day == 'True' AND
 f.CANCELLED == '0.00' AND
 f.DIVERTED == '0.00'
 """
    
traindata: DataFrame = spark.sql(trainquery)
traindata.describe().show()

In [None]:
# Looks like there are still NULLs although we have excluded CACELLED and DIVERTED flights.
# Note: In the book it says that counts will be the same but in my case it was not so I still needed to exclude the NULLs.
trainquery: str = """
SELECT
  DEP_DELAY, TAXI_OUT, ARR_DELAY, DISTANCE
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
 t.is_train_day == 'True' AND
 f.CANCELLED == '0.00' AND
 f.DIVERTED == '0.00' AND
 f.DEP_DELAY IS NOT NULL AND
 f.ARR_DELAY IS NOT NULL
"""
    
traindata: DataFrame = spark.sql(trainquery)
traindata.describe().show()


## Creating Training Examples

In [None]:
# To use Logistic Regression (https://bit.ly/3HGBYpw)
# I first need labled training sets for binary outcomes.
# In this case , positive lable (1) and negative label(0)
# Note: https://spark.apache.org/docs/3.1.1/mllib-linear-methods.html#loss-functions
# Note that, in the mathematical formulation above, a binary label y is denoted as either +1 (positive) or −1 (negative), 
# which is convenient for the formulation. 
# However, the negative label is represented by 0 in spark.mllib instead of −1, to be consistent with multiclass labeling.
def to_example(raw_data_point: DataFrame) -> LabeledPoint:
    return LabeledPoint(\
            float(raw_data_point['ARR_DELAY'] < 15),  # on-time? \
            [ \
                raw_data_point['DEP_DELAY'], \
                raw_data_point['TAXI_OUT'], \
                raw_data_point['DISTANCE'], \
            ])

examples: PipelinedRDD = traindata.rdd.map(to_example)

## Training

In [None]:
# Creating a model means finding out the weights
# w0*x0 + w1*x1 + w2*x2 + b
# intercept is set to True because prediction should not be 0 if x = 0
lrmodel: LogisticRegressionModel = LogisticRegressionWithLBFGS.train(examples, intercept=True)
    
print(lrmodel.weights,lrmodel.intercept)

In [None]:
# Example for positive
# DEP_DELAY, TAXI_OUT, DISTANCE
lrmodel.predict([6.0, 12.0, 594.0])

In [None]:
# Example for negative
lrmodel.predict([36.0, 12.0, 594.0])

In [None]:
# I want to see how probability is working out. To do that, I will need to clear the threshold.
lrmodel.clearThreshold()

In [None]:
# Predict probability with fixed dep delay and taxi-out
dist: np.ndarray = np.arange(10, 2000, 10)
prob: list = [lrmodel.predict([20,10,d]) for d in dist]
plt.plot(dist, prob)

In [None]:
# Predict probability with fixed taxi-out  and distance
delay: np.ndarray = np.arange(-20, 60, 1)
prob= list = [lrmodel.predict([d, 10, 500]) for d in delay]
ax = plt.plot(delay, prob)

In [None]:
# Okay. I now saw probability is predicted by changing the parameters.
# I will now set threshold to my goal and do the prediction.
# Set threshold for the probability to our goal
# Cancel flights if the probability of arriving flights are less than 70% 
lrmodel.setThreshold(0.7)

# Predicting by Using a Model

In [None]:
# Save model to cloud stroage for future use
PREFIX = "flights/sparkmloutput/model"
MODEL_FILE: str = f"gs://{BUCKET}/{PREFIX}"

client = storage.Client()

blobs = [ f for f in client.list_blobs(BUCKET, prefix=PREFIX)]

# Remove if any old model exist
if len(blobs) > 0:
    for blob in blobs:
        blob.delete()
        
lrmodel.save(sc, MODEL_FILE)

# Predict from saved model in google storage
from pyspark.mllib.classification import LogisticRegressionModel
lrmodel: LogisticRegressionModel = LogisticRegressionModel.load(sc, MODEL_FILE)

# Set decision threshold to 70%
lrmodel.setThreshold(0.7)

# Try out sample
print(lrmodel.predict([36.0, 12.0, 594.0]))

# Evaluating a Model

In [None]:
# Evaluation will be done using the test data instead of training data
# Use non test days for evaluation
testquery: str = trainquery.replace("t.is_train_day == 'True'", "t.is_train_day == 'False'")
    
# Label the dataset
testdata: DataFrame = spark.sql(testquery)
examples: PipelinedRDD = testdata.rdd.map(to_example)
    
# Predict
labelpred: PipelinedRDD = examples.map(lambda lp: (lp.label, lrmodel.predict(lp.features)))

# Print first 10 elements for confirmation
labelpred.take(10)

In [None]:
# Layout statistics for evaluation
def eval(labelpred: PipelinedRDD):
    """
    labelpred[0]: Actual
    labelpred[1]: Prediction
    """

    # Predicted as positive
    cancel: PipelinedRDD = labelpred.filter(lambda lp: lp[1] == 1)

    # Predicted as negative
    nocancel: PipelinedRDD = labelpred.filter(lambda lp: lp[1] == 0)

    # Out of all positive predicted results , count the ones that are actually positive
    corr_cancel = cancel.filter(lambda lp: lp[0] == lp[1]).count()

    # Out of all positive predicted results , count the ones that are actually negative
    corr_nocancel = nocancel.filter(lambda lp: lp[0] == lp[1]).count()  
    
    print(f"Predicted as canceled: {cancel.count()}")
    print(f"Predicted as noncancel: {nocancel.count()}")
    print(f"Correctly labeled canceled: {corr_cancel}")
    print(f"Correctly labeled not canceled: {corr_nocancel}")
    
    return {
        'total_cancel': cancel.count(),
        'correct_cancel': float(corr_cancel)/cancel.count(),
        'total_nocancel': nocancel.count(),
        'correct_nocancel': float(corr_nocancel)/nocancel.count()
    }

print(eval(labelpred))

In [None]:
# Revised evaluation. Based on porbability instead of categorical prediction.
def eval_revised(labelpred: namedtuple):
    """
    labelpred[0]: Actual
    labelpred[1]: Prediction
    """
    # Canceled which probability is below 0.7
    cancel: PipelinedRDD = labelpred.filter(lambda lp: lp[1] < 0.7)

    # No cacelation which probability is greater than equal 0.7
    nocancel: PipelinedRDD = labelpred.filter(lambda lp: lp[1] >= 0.7)

    corr_cancel = cancel.filter(lambda lp: lp[0] == int(lp[1] < 0.7)).count()
    corr_nocancel = nocancel.filter(lambda lp: lp[0] == int(lp[1] >= 0.7)).count()
    
    # RMSE
    totsqe: PipelinedRDD = labelpred.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum()

    print(f"Predicted as canceled: {cancel.count()}")
    print(f"Predicted as noncancel: {nocancel.count()}")
    print(f"Correctly labeled canceled: {corr_cancel}")
    print(f"Correctly labeled not canceled: {corr_nocancel}")
    
    return {
        'total_cancel': cancel.count(),
        'correct_cancel': float(corr_cancel)/cancel.count(),
        'total_nocancel': nocancel.count(),
        'correct_nocancel': float(corr_nocancel)/nocancel.count(),
        'rmse': np.sqrt(totsqe/float(corr_cancel + corr_nocancel))
    }

In [None]:
lrmodel.clearThreshold() # so it returns probabilities

# Predict
labelpred: PipelinedRDD = examples.map(lambda lp: (lp.label, lrmodel.predict(lp.features)))
print(eval_revised(labelpred))

# keep only those examples near the decision threshold
labelpred: PipelinedRDD = labelpred.filter(lambda p: p[1] > 0.65 and p[1] < 0.75)
print(eval_revised(labelpred))

## Experimental framework

So far the probability is looks okay if 71% chance of arriving at least 15 min early.
I will verfiy this by doing feature selection. 
But first I will set RMSE to choose between models. Model that has better RMSE will be chosen.
As the book says, unless 0.5 % decrease with RMSE, feature will not be changed.

## Creating the Held-Out Dataset

First prepare three types of data set

| Dataset type | Conditional |
| ------------ | ----------- |
| Training   | is_train_day == True, holdout == False |
| Held out   | is_train_dat == True, holdout == True |
| Test | is_train_day == False |

In [None]:
# Mark random 20 % of training dataset as held out dataset
from pyspark.sql.functions import rand

SEED: int = 13
traindays = traindays.withColumn("holdout", rand(SEED) > 0.8)
traindays.createOrReplaceTempView('traindays')

traindays.show()

trainquery: str = """
SELECT
  DEP_DELAY, TAXI_OUT, ARR_DELAY, DISTANCE
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
 t.is_train_day == 'True' AND
 t.holdout == False AND
 f.CANCELLED == '0.00' AND
 f.DIVERTED == '0.00' AND
 f.DEP_DELAY IS NOT NULL AND
 f.ARR_DELAY IS NOT NULL
"""
    
traindata: DataFrame = spark.sql(trainquery)
    
# This will be the query to fetch data for feature selection
heldout_query: str = trainquery.replace("t.holdout == False", "t.holdout == True")
heldout_data: DataFrame = spark.sql(heldout_query)

## Feature Selection

In [None]:
# 1st seletion : Excluding TAXI_OUT
def to_example(raw_data_point: dict):
    return LabeledPoint(
        float(raw_data_point['ARR_DELAY'] < 15), # ontime
        [
            raw_data_point['DEP_DELAY'], # DEP_DELAY
            raw_data_point['DISTANCE'] # DEP_DISTANCE,
        ]
    )

# Train
examples: PipelinedRDD = traindata.rdd.map(to_example)
lrmodel: LogisticRegressionModel = LogisticRegressionWithLBFGS.train(examples, intercept=True)
lrmodel.clearThreshold() # so it returns probabilities

# Predict
examples: PipelinedRDD = heldout_data.rdd.map(to_example)
labelpred: PipelinedRDD = examples.map(lambda lp: (lp.label, lrmodel.predict(lp.features)))
labelpred: PipelinedRDD = labelpred.filter(lambda p: p[1] > 0.65 and p[1] < 0.75)

# Evaluate
print(eval_revised(labelpred))

In [None]:
# 2nd seletion : Excluding DEP_DELAY
def to_example(raw_data_point: dict):
    return LabeledPoint(
        float(raw_data_point['ARR_DELAY'] < 15), # ontime
        [
                raw_data_point['TAXI_OUT'], \
                raw_data_point['DISTANCE'], \
        ]
    )

# Train
examples: PipelinedRDD = traindata.rdd.map(to_example)
lrmodel: LogisticRegressionModel = LogisticRegressionWithLBFGS.train(examples, intercept=True)
lrmodel.clearThreshold() # so it returns probabilities
    
# Predict
examples: PipelinedRDD = heldout_data.rdd.map(to_example)
labelpred: PipelinedRDD = examples.map(lambda lp: (lp.label, lrmodel.predict(lp.features)))
labelpred: PipelinedRDD = labelpred.filter(lambda p: p[1] > 0.65 and p[1] < 0.75)
    
# Evaluate
print(eval_revised(labelpred))

In [None]:
# 3rd seletion : Excluding DISTANCE
def to_example(raw_data_point: dict):
    return LabeledPoint(
        float(raw_data_point['ARR_DELAY'] < 15), # ontime
        [
                raw_data_point['DEP_DELAY'], \
                raw_data_point['TAXI_OUT'],
        ]
    )

# Train
examples: PipelinedRDD = traindata.rdd.map(to_example)
lrmodel: LogisticRegressionModel = LogisticRegressionWithLBFGS.train(examples, intercept=True)
lrmodel.clearThreshold() # so it returns probabilities
    
# Predict
examples: PipelinedRDD = heldout_data.rdd.map(to_example)
labelpred: PipelinedRDD = examples.map(lambda lp: (lp.label, lrmodel.predict(lp.features)))
labelpred: PipelinedRDD = labelpred.filter(lambda p: p[1] > 0.65 and p[1] < 0.75)
    
# Evaluate
print(eval_revised(labelpred))

Result

Looks like RMSE was not reduced in any of the pattern.

| Pattern | RMSE | Percent increase |
| ------- | ---- | ---------------- |
| Full set | 0.252742459141375 | |
| Removed TAXI_OUT | 0.3488991171205433 | 38% |
| Removed DEP_DELAY | 0.5200556625529862 | 105% | 
| Removed DISTANCE | 0.31710244140962074 | 25% |

## Scaling

In [None]:
# ToDO Different RMSE from initial experiment with 3 features
# Scailng is to scale the feature variables to similar magnitude.
# This will help the regression library to choose initial weight effective manner.

def to_example(raw_data_point):
    return LabeledPoint(
        float(raw_data_point['DEP_DELAY'] < 15), #ontime
        [
            raw_data_point['DEP_DELAY'] / 30,
            (raw_data_point['DISTANCE'] / 1000) - 1,
            (raw_data_point['TAXI_OUT'] / 10) - 1
        ]
    )

# Train
traindata: DataFrame = spark.sql(trainquery)
examples: PipelinedRDD = traindata.rdd.map(to_example)
lrmodel: LogisticRegressionModel = LogisticRegressionWithLBFGS.train(examples, intercept=True)
lrmodel.clearThreshold() # so it returns probabilities
   
# Predict
testquery: str = trainquery.replace("t.is_train_day == 'True'", "t.is_train_day == 'False'")
    
# Label the dataset
testdata: DataFrame = spark.sql(testquery)
examples: PipelinedRDD = testdata.rdd.map(to_example)
labelpred: PipelinedRDD = examples.map(lambda lp: (lp.label, lrmodel.predict(lp.features)))
#labelpred: PipelinedRDD = labelpred.filter(lambda p: p[1] > 0.65 and p[1] < 0.75)
    
# Evaluate
print(eval_revised(labelpred))