In [1]:
import pyspark

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200509195732-0000
KERNEL_ID = 13729671-8cb1-448e-ae76-c4f2bf53816f


In [2]:
# #### Check local file system

import os
# Show the kernel's current working directory. The data file is downloaded
# further down using a relative filename, so this is the directory that
# relative path is taken against.
print(os.getcwd())

/home/spark/shared


In [3]:
# #### Import data

import ibmos2spark
import pandas as pd
# @hidden_cell
# Connection settings for the IBM Cloud Object Storage bucket that holds the
# input CSV.
#
# The API key is a secret and must not live in the notebook: it is read from
# the IBM_API_KEY_ID environment variable. The other entries are identifiers
# and endpoint URLs and stay inline.
# NOTE(review): the API key that was previously hardcoded here has been
# exposed to anyone with access to this notebook and should be revoked/rotated.
import os

credentials = {
    'IAM_SERVICE_ID': 'iam-ServiceId-57d72783-fe0f-4185-a9e1-f31abc5fb319',
    'IBM_API_KEY_ID': os.environ.get('IBM_API_KEY_ID'),
    'ENDPOINT': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'IBM_AUTH_ENDPOINT': 'https://iam.ng.bluemix.net/oidc/token',
    'BUCKET': 'myfirstsparkproject-donotdelete-pr-a7oeziabdfgdet',
    'FILE': 'combined-csv-files-data-prepping.csv'
}

# Surface a missing key here, next to the configuration, rather than leaving
# it to show up only when the storage client is used.
if not credentials['IBM_API_KEY_ID']:
    print('WARNING: environment variable IBM_API_KEY_ID is not set; '
          'set it before creating the Cloud Object Storage client.')

from ibm_botocore.client import Config
import ibm_boto3

# Create the S3-style client for IBM Cloud Object Storage. Every connection
# parameter comes from the `credentials` dict; requests are signed in
# 'oauth' mode.
oauth_config = Config(signature_version='oauth')
cos = ibm_boto3.client(
    service_name='s3',
    endpoint_url=credentials['ENDPOINT'],
    ibm_auth_endpoint=credentials['IBM_AUTH_ENDPOINT'],
    ibm_api_key_id=credentials['IBM_API_KEY_ID'],
    ibm_service_instance_id=credentials['IAM_SERVICE_ID'],
    config=oauth_config,
)

# Download the CSV object from the bucket to a local file of the same name.
# The download is best-effort: a failure is reported and the cell carries on
# to the read step. The report names the concrete exception type (the
# previous message printed the `Exception` base class itself, which hid it).
data_file = credentials['FILE']
try:
    cos.download_file(Bucket=credentials['BUCKET'], Key=data_file, Filename=data_file)
except Exception as e:
    print('Data file download failed: {}: {}'.format(type(e).__name__, e))
else:
    print('Data File Downloaded')

from pyspark.sql import SparkSession

# Obtain the Spark session (reusing an existing one when available), then
# read the local CSV with a header row and inferred column types.
spark = SparkSession.builder.getOrCreate()
local_csv = 'combined-csv-files-data-prepping.csv'
data_start = spark.read.csv(
    local_csv,
    header="true",
    inferSchema="true",
    sep=",",
)

# Preview the first five rows.
data_start.show(5)

File Downloaded
+-------+--------------+-----------------+---------------+------------+--------+---------+-------------+---------+---------------+--------+----------+---------+-------+------------+--------+---------+-------------+---------+---------------+----------------+-------------------+--------+--------+
|FL_DATE|UNIQUE_CARRIER|ORIGIN_AIRPORT_ID|DEST_AIRPORT_ID|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|DEP_DELAY_NEW|DEP_DEL15|DEP_DELAY_GROUP|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|ARR_DELAY_NEW|ARR_DEL15|ARR_DELAY_GROUP|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|
+-------+--------------+-----------------+---------------+------------+--------+---------+-------------+---------+---------------+--------+----------+---------+-------+------------+--------+---------+-------------+---------+---------------+----------------+-------------------+--------+--------+
| 1/1/12|            AA|            12478|          12892|         900|     855|       -5|      

In [6]:
# #### Process categorical columns

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
from pyspark.sql.functions import udf

# Filter out cancelled flights (no positive ARR_TIME) and rows with no label
# (ARR_DEL15 not >= 0). The second filter must be applied to the result of the
# first; filtering `data_start` again would silently drop the ARR_TIME filter.
data1 = data_start.filter(data_start["ARR_TIME"] > 0)
data1 = data1.filter(data1["ARR_DEL15"] >= 0)
# Convert date to Unix timestamp for OneHotEncoding - NOT NEEDED
#data = data1.withColumn("FL_DATE", unix_timestamp($"FL_DATE", "%d/%m/%Y"))

# Convert departure time to minutes
name = 'CRS_DEP_TIME'
@udf("int")
def get_minutes(t):
    """Convert an hhmm-style integer (e.g. 900, 1345) to a number of minutes.

    The hundreds part is treated as hours and the last two digits as minutes:
    ``(t // 100) * 60 + t % 100``.

    Returns None when ``t`` is None, so a null cell stays null instead of
    raising a TypeError inside the UDF.
    """
    if t is None:
        return None
    return ((t//100)*60) + t%100

# withColumn with an existing column name replaces that column and leaves the
# column order unchanged, which is what the earlier select-comprehension did.
data = data1.withColumn(name, get_minutes(name))

# Categorical columns: the first four columns of `data`. Each one gets a
# StringIndexer stage (-> "<col>_string_encoded") followed by a OneHotEncoder
# stage (-> "<col>_one_hot").
categorical_columns = data.columns[:4]

stage_string = []
stage_one_hot = []
for col_name in categorical_columns:
    indexed_name = col_name + "_string_encoded"
    stage_string.append(StringIndexer(inputCol=col_name, outputCol=indexed_name))
    stage_one_hot.append(OneHotEncoder(inputCol=indexed_name, outputCol=col_name + "_one_hot"))

# All indexers run before all encoders.
ppl = Pipeline(stages=stage_string + stage_one_hot)
fitted_ppl = ppl.fit(data)
df = fitted_ppl.transform(data)

# Show how each carrier code maps to its index and one-hot vector.
carrier_encoding = df.select(
    'UNIQUE_CARRIER',
    'UNIQUE_CARRIER_one_hot',
    'UNIQUE_CARRIER_string_encoded',
).distinct()
carrier_encoding.sort(F.asc("UNIQUE_CARRIER_string_encoded")).show()

+--------------+----------------------+-----------------------------+
|UNIQUE_CARRIER|UNIQUE_CARRIER_one_hot|UNIQUE_CARRIER_string_encoded|
+--------------+----------------------+-----------------------------+
|            WN|        (14,[0],[1.0])|                          0.0|
|            AA|        (14,[1],[1.0])|                          1.0|
|            DL|        (14,[2],[1.0])|                          2.0|
|            EV|        (14,[3],[1.0])|                          3.0|
|            OO|        (14,[4],[1.0])|                          4.0|
|            UA|        (14,[5],[1.0])|                          5.0|
|            MQ|        (14,[6],[1.0])|                          6.0|
|            US|        (14,[7],[1.0])|                          7.0|
|            B6|        (14,[8],[1.0])|                          8.0|
|            FL|        (14,[9],[1.0])|                          9.0|
|            AS|       (14,[10],[1.0])|                         10.0|
|            YV|    

In [7]:
# #### Build VectorAssembler stage

# Inputs to the model, in the order they are packed into the feature vector:
# the four one-hot encoded categoricals plus DISTANCE and CRS_DEP_TIME.
feature_columns = [
    'FL_DATE_one_hot',
    'UNIQUE_CARRIER_one_hot',
    'DISTANCE',
    'CRS_DEP_TIME',
    'ORIGIN_AIRPORT_ID_one_hot',
    'DEST_AIRPORT_ID_one_hot',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Add the assembled "features" column and expose ARR_DEL15 as "label".
assembled = assembler.transform(df)
data_df = assembled.withColumn('label', F.col('ARR_DEL15'))

# Preview the model inputs, then the full rows.
data_df.select("features", "label").show()

data_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(678,[51,91,104,1...|    0|
|(678,[1,91,104,10...|    0|
|(678,[17,91,104,1...|    0|
|(678,[25,91,104,1...|    0|
|(678,[7,91,104,10...|    0|
|(678,[6,91,104,10...|    0|
|(678,[54,91,104,1...|    0|
|(678,[47,91,104,1...|    0|
|(678,[23,91,104,1...|    0|
|(678,[39,91,104,1...|    0|
|(678,[34,91,104,1...|    0|
|(678,[31,91,104,1...|    0|
|(678,[13,91,104,1...|    0|
|(678,[58,91,104,1...|    0|
|(678,[49,91,104,1...|    0|
|(678,[24,91,104,1...|    0|
|(678,[45,91,104,1...|    0|
|(678,[37,91,104,1...|    0|
|(678,[29,91,104,1...|    0|
|(678,[42,91,104,1...|    0|
+--------------------+-----+
only showing top 20 rows

+-------+--------------+-----------------+---------------+------------+--------+---------+-------------+---------+---------------+--------+----------+---------+-------+------------+--------+---------+-------------+---------+---------------+----------------+-------------------+-

In [8]:
# #### Split data into training and test datasets
# 70/30 random split; the fixed seed is passed so the split uses the same
# seed on every run.
split_weights = [0.7, 0.3]
split_seed = 51423
training, test = data_df.randomSplit(split_weights, seed=split_seed)

In [9]:
# #### Build Logistic Regression model

from pyspark.ml.regression import GeneralizedLinearRegression
# Logistic regression expressed as a GLM: binomial family, logit link,
# regParam set to 0.0.
logr = GeneralizedLinearRegression(regParam=0.0, link="logit", family="binomial")

# Fit on the training split.
logr_Model = logr.fit(training)

# Print the diagnostics exposed by the training summary, one line each:
# coefficient standard errors, t-values and p-values.
summary = logr_Model.summary
for title, attribute in (
    ("Coefficient Standard Errors", "coefficientStandardErrors"),
    ("T Values", "tValues"),
    ("P Values", "pValues"),
):
    print(title + ": " + str(getattr(summary, attribute)))

# #### Prediction on training data
pred_training_cv = logr_Model.transform(training)
pred_training_cv.show(5, truncate=False)

# #### Prediction on test data
pred_test_cv = logr_Model.transform(test)
pred_test_cv.show(5, truncate=False)

Coefficient Standard Errors: [0.06536559984563783, 0.06405651918969593, 0.064812108292438, 0.06681301527641431, 0.06632306829843422, 0.06303227369041435, 0.0711157805010364, 0.06915894096111863, 0.06630196499575987, 0.06532602935407057, 0.07033605534333301, 0.0661874810020931, 0.06893356156045888, 0.0646439537109742, 0.06515455708894047, 0.067844656692197, 0.06519323930531197, 0.06526021531713944, 0.0648547183006187, 0.06429567613039631, 0.06532738573516989, 0.0699164399403504, 0.0679203202104353, 0.06564137413543619, 0.0662403193662525, 0.06746951605230395, 0.0664353810586309, 0.06744271959626359, 0.06583220915276666, 0.06548338611286136, 0.0633490086660836, 0.06443518580888166, 0.06926393148189743, 0.07006935416261136, 0.06599677067477594, 0.06952329344319992, 0.06767766460154769, 0.06614664854971361, 0.06790623973080379, 0.06937384059311975, 0.07328504385793314, 0.06984024672556916, 0.06433254659374064, 0.06939717507595589, 0.06537313701198394, 0.06502583613316157, 0.066545193602401

+-------+--------------+-----------------+---------------+------------+--------+---------+-------------+---------+---------------+--------+----------+---------+-------+------------+--------+---------+-------------+---------+---------------+----------------+-------------------+--------+--------+----------------------+-----------------------------+--------------------------------+------------------------------+---------------+----------------------+-------------------------+-----------------------+-----------------------------------------------------------+-----+-------------------+
|FL_DATE|UNIQUE_CARRIER|ORIGIN_AIRPORT_ID|DEST_AIRPORT_ID|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|DEP_DELAY_NEW|DEP_DEL15|DEP_DELAY_GROUP|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|ARR_DELAY_NEW|ARR_DEL15|ARR_DELAY_GROUP|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|FL_DATE_string_encoded|UNIQUE_CARRIER_string_encoded|ORIGIN_AIRPORT_ID_string_encoded|DEST_AIRPORT_ID_string_encoded|

In [12]:
# #### Confusion Matrix

from sklearn.metrics import confusion_matrix
# Collect the label and the thresholded prediction together in ONE Spark
# action. Collecting them with two separate toPandas() calls runs the job
# twice and relies on both jobs returning rows in the same order, which Spark
# does not promise; a single collect keeps each label paired with its own
# prediction.
scored = pred_test_cv.select(
    "label",
    # A predicted value above 0.5 counts as class 1, anything else as class 0.
    F.when(F.col('prediction') > 0.5, 1).otherwise(0).alias('pred_label'),
).toPandas()

# Kept as single-column DataFrames, the same shape the two variables had before.
y_true = scored[["label"]]
y_pred = scored[["pred_label"]]

# labels=[0, 1] pins the matrix to 2x2 (rows = true class, columns = predicted
# class) even if one class is missing from the data, so it can always be
# unpacked into tn, fp, fn, tp.
cnf_matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])
cnf_matrix

array([[265834,    733],
       [ 43413,    615]])

In [13]:
# #### Model Accuracy

# Flatten the 2x2 confusion matrix into its four cells.
tn, fp, fn, tp = cnf_matrix.ravel()

# Accuracy = correctly classified rows / all rows.
correct_count = tp + tn
total_count = tn + fp + fn + tp
model_acc = correct_count / total_count

accuracy_message = "Prediction accuracy: {:0.2f}%".format(model_acc*100)
print(accuracy_message)

Prediction accuracy: 85.79%


In [17]:
# #### Model Evaluation

from pyspark.ml.evaluation import RegressionEvaluator
# Root mean squared error between the "prediction" column and the "label"
# column of the test-set predictions.
# NOTE(review): this measures the numeric prediction, not the thresholded
# class; confirm RMSE is the intended metric for this classifier.
evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="label",
)

rmse = evaluator.evaluate(pred_test_cv)
rmse_message = "Root Mean Squared Error (RMSE) for test data = {:0.4f}".format(rmse)
print(rmse_message)

Root Mean Squared Error (RMSE) for test data = 0.3383
