## 1. Imports and Initialization

In [None]:
ls /kaggle/input/airline-delay-and-cancellation-data-2009-2018

In [None]:
pip install pyspark

import altair as alt

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T 
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

import warnings
warnings.filterwarnings('ignore')

In [None]:
# initialize sparkSession
spark = SparkSession.builder.config("spark.executor.memory","2g").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [None]:
spark.catalog.clearCache()

## 2. Loading and Cleaning the Data

In [None]:
file_names_range = list(range(2009, 2016))
file_paths = [f'/kaggle/input/airline-delay-and-cancellation-data-2009-2018/{file}.csv' for file in file_names_range]

In [None]:
schema = T.StructType([
    T.StructField("FL_DATE", T.TimestampType(), nullable=True),
    T.StructField("OP_CARRIER", T.StringType(), nullable=True),
    T.StructField("OP_CARRIER_FL_NUM", T.IntegerType(), nullable=True),
    T.StructField("ORIGIN", T.StringType(), nullable=True),
    T.StructField("DEST", T.StringType(), nullable=True),
    T.StructField("CRS_DEP_TIME", T.DoubleType(), nullable=True),
    T.StructField("DEP_TIME", T.DoubleType(), nullable=True),
    T.StructField("DEP_DELAY", T.DoubleType(), nullable=True),
    T.StructField("TAXI_OUT", T.DoubleType(), nullable=True),
    T.StructField("WHEELS_OFF", T.DoubleType(), nullable=True),
    T.StructField("WHEELS_ON", T.DoubleType(), nullable=True),
    T.StructField("TAXI_IN", T.DoubleType(), nullable=True),
    T.StructField("CRS_ARR_TIME", T.DoubleType(), nullable=True),
    T.StructField("ARR_TIME",T.DoubleType(), nullable=True),
    T.StructField("ARR_DELAY", T.DoubleType(), nullable=True),
    T.StructField("CANCELLED", T.DoubleType(), nullable=True),
    T.StructField("CANCELLATION_CODE", T.StringType(), nullable=True),
    T.StructField("DIVERTED", T.DoubleType(), nullable=True),
    T.StructField("CRS_ELAPSED_TIME", T.DoubleType(), nullable=True),
    T.StructField("ACTUAL_ELAPSED_TIME", T.DoubleType(), nullable=True),
    T.StructField("AIR_TIME", T.DoubleType(), nullable=True),
    T.StructField("DISTANCE", T.DoubleType(), nullable=True),
    T.StructField("CARRIER_DELAY", T.DoubleType(), nullable=True),
    T.StructField("WEATHER_DELAY", T.DoubleType(), nullable=True),
    T.StructField("NAS_DELAY", T.DoubleType(), nullable=True),
    T.StructField("SECURITY_DELAY", T.DoubleType(), nullable=True),
    T.StructField("LATE_AIRCRAFT_DELAY", T.DoubleType(), nullable=True),
    T.StructField("Unnamed: 27", T.StringType(), nullable=True)
])

In [None]:
df = spark.read.schema(schema).format("csv").option("header", "true").load(file_paths)

In [None]:
# remove null values from the cols used for classification:
df = df.dropna(subset= [
    'FL_DATE',
 'OP_CARRIER',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'DEST',
 'CRS_DEP_TIME',
 'CRS_ARR_TIME',
 'CANCELLED',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'DISTANCE'])

# save df for analysis
analysis_df = df

In [None]:
# drop the cols who indirectly indicate if a flight is canelled or not (apart from the column CANCELLED)
# most of those cols contain null values, if the flight is cancelled

classify_df = df.drop("Unnamed: 27", 
                        "CARRIER_DELAY", 
                        "WEATHER_DELAY",
                        "NAS_DELAY",
                        "SECURITY_DELAY",
                        "LATE_AIRCRAFT_DELAY",
                        "CANCELLATION_CODE",
                        "DEP_TIME",
                        "DEP_DELAY",
                        "TAXI_OUT",
                        "WHEELS_OFF",
                        "WHEELS_ON",
                        "TAXI_IN",
                        "ARR_TIME",
                        "ARR_DELAY",
                        "ACTUAL_ELAPSED_TIME", 
                        "AIR_TIME")

In [None]:
# numerical timestamp column
classify_df = classify_df.withColumn("FL_DATE", F.unix_timestamp("FL_DATE"))

In [None]:
classify_df.columns

In [None]:
# Take a subset: either balanced (with subsampling) or unbalanced
# we take a subset, because of memory limitations

# select subsample of positive samples
pos_df = classify_df.filter(F.col('CANCELLED').isin(1)).sample(fraction=0.1)
# select an equal amount of negative samples (number of neg samples == number of pos samples)
neg_df = classify_df.filter(F.col('CANCELLED').isin(0)).orderBy(F.rand()).limit(pos_df.count())


# balanced df - a subset - around 141k
classify_df = pos_df.union(neg_df).sample(fraction=1.0).cache()

# unbalanced df - but a subset - around 215k
#classify_df = classify_df.sample(fraction=0.005).cache() 

In [None]:
#classify_df.rdd.countApprox(timeout = 1000,confidence = 0.90)

## 3. Analysis (on the analysis_df)

In [None]:
# get the most present flight carriers
carriers_flight_count_df = analysis_df.groupBy(F.col('OP_CARRIER')).count().orderBy(F.col('count').desc())
top_10 = carriers_flight_count_df.limit(10).toPandas()
top_10 = top_10.rename(columns={'OP_CARRIER':'Carrier'})
top_10

In [None]:
# visualisation
chart = alt.Chart(top_10).mark_arc(outerRadius=260, innerRadius=75).encode(
    theta = alt.Theta(field="count", type="quantitative", stack=True),
    color = alt.Color('Carrier:N', scale=alt.Scale(scheme='category20'), legend=None),
).properties(
    title='Top 10 Carriers by amount of flights',
    width=600,
    height=300
)

pie = chart.mark_arc(outerRadius=350)
value_text = pie.mark_text(radius=300, size=15).encode(text=alt.Text('count:Q'))

pie2 = chart.mark_arc(outerRadius=250)
text = pie2.mark_text(radius=200, size=15).encode(
    text=alt.Text('Carrier:N'), 
    color=alt.value("#000000")
)

(chart + text + value_text).configure_view(
    strokeWidth=0
).configure_title(
    fontSize=18
)

In [None]:
# count number of cancellations per code/reason
carriers_flight_count_df = analysis_df.filter(F.col('CANCELLATION_CODE').isNotNull()).groupBy(F.col('CANCELLATION_CODE')).count()
cancellation_reasons = carriers_flight_count_df.toPandas()
cancellation_reasons

In [None]:
# rename col values
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'A'] = 'By carrier'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'B'] = 'Due to weather'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'C'] = 'By national air system'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'D'] = 'For security'
cancellation_reasons = cancellation_reasons.rename(columns={'CANCELLATION_CODE':'Reason'})

In [None]:
cancellation_reasons

In [None]:
# visualisation of calcellation reasons
chart = alt.Chart(cancellation_reasons).mark_arc(outerRadius=180, innerRadius=50).encode(
    theta = alt.Theta(field="count", type="quantitative", stack=True),
    color = alt.Color('Reason:N', scale=alt.Scale(scheme='category20'), legend=None),
).properties(
    title='Reasons for flight cancellations',
    width=600,
    height=300
)

pie = chart.mark_arc(outerRadius=250)
value_text = pie.mark_text(radius=220, size=15).encode(text=alt.Text('count:Q'))

pie2 = chart.mark_arc(outerRadius=150)
text = pie2.mark_text(radius=120, size=12).encode(
    text=alt.Text('Reason:N'), 
    color=alt.value("#000000")
)

(chart + text + value_text).configure_view(
    strokeWidth=0
).configure_title(
    fontSize=18
)

## 4. Preprocessing

In [None]:
# define StringIndexer: categorical (string) cols -> to column indices, 
# each category gets a integer based on their frequency (start from 0)

carrier_indexer = StringIndexer(inputCol="OP_CARRIER", outputCol="OP_CARRIER_Index")
origin_indexer = StringIndexer(inputCol="ORIGIN", outputCol="ORIGIN_Index")
dest_indexer = StringIndexer(inputCol="DEST", outputCol="DEST_Index")


In [None]:
# define onehotencoder for a index columns 
onehotencoder_carrier_vector = OneHotEncoder(inputCol="OP_CARRIER_Index", outputCol="OP_CARRIER_vec")
onehotencoder_origin_vector = OneHotEncoder(inputCol="ORIGIN_Index", outputCol="ORIGIN_vec")
onehotencoder_dest_vector = OneHotEncoder(inputCol="DEST_Index", outputCol="DEST_vec")


In [None]:
# Pipelining the preprocessing stages defined above 
pipeline = Pipeline(stages=[carrier_indexer, origin_indexer, dest_indexer,
                            onehotencoder_carrier_vector, onehotencoder_origin_vector,
                            onehotencoder_dest_vector])

transformed_df = pipeline.fit(classify_df).transform(classify_df)

In [None]:
# select columns that are combined to one feature column
feature_columns = transformed_df.columns

# remove cols that whould not be in our feature cols (label col, intermediate preprocessing cols)
for item in ["CANCELLED", "ORIGIN", "DEST", "OP_CARRIER", "OP_CARRIER_Index", "ORIGIN_Index", "DEST_Index"]:
    feature_columns.remove(item)


assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# build feature col
assembled_df = assembler.transform(transformed_df)

In [None]:
# select only feature and label column
final_classify_df = assembled_df.select("features", F.col("CANCELLED").alias("label"))

In [None]:
final_classify_df.printSchema()

In [None]:
train, test = final_classify_df.randomSplit([.7, .3], seed=9) # 70, 30 split on balanced set or on subset of samples

In [None]:
#spark.catalog.clearCache()
# caching data into memory - models run quicker
train = train.repartition(32).cache()
test = test.repartition(32).cache()

## 5. Training Models (on balanced and unbalanced data)

In [None]:
# define the models
log_regress = LogisticRegression(labelCol = 'label', featuresCol = 'features')
decision_tree = DecisionTreeClassifier(labelCol = 'label', featuresCol = 'features')
rand_forest = RandomForestClassifier(labelCol = 'label', featuresCol = 'features')
gbt = GBTClassifier(labelCol = 'label', featuresCol = 'features')

In [None]:
log_regress_model = log_regress.fit(train)
images/README_1200x800.gif

In [None]:
decision_tree_model = decision_tree.fit(train)

In [None]:
rand_forest_model = rand_forest.fit(train)

In [None]:
gbt_model = gbt.fit(train)

## 6. Evaluation

In [None]:
# Predications on test set
log_regress_predictions = log_regress_model.transform(test)
decision_tree_predictions = decision_tree_model.transform(test)
rand_forest_predictions = rand_forest_model.transform(test)
gbt_predictions = gbt_model.transform(test)


In [None]:
# Define metrics to evaluate the models 
# ROC = areaUnderROC = Area under the Receiver Operating Characteristic (ROC) curve, 
# A curve that plots the TPR against the FPR. 
# The area under the ROC curve represents the probability that the model correctly ranks a randomly chosen positive instance higher than a randomly chosen negative instance.
# A higher value of areaUnderROC indicates better model performance, with 1.0 being the maximum achievable value.
evaluator_ROC = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')

# PR = areaUnderPR = Area Under the Precision-Recall curve
# A curve that plots the precision (positive predictive value) against the recall (sensitivity). 
# The area under the precision-recall curve represents the trade-off between precision and recall. 
# A higher value of areaUnderPR indicates better model performance, with 1.0 being the maximum achievable value.
evaluator_PR = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderPR')

# Accuracy
# in pyspark accuracy metrics is for multiclass-classification
evaluator_Acc = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')


In [None]:
# set evaluations

log_regress_ROC = evaluator_ROC.evaluate(log_regress_predictions)
decision_tree_ROC = evaluator_ROC.evaluate(decision_tree_predictions)
rand_forest_ROC = evaluator_ROC.evaluate(rand_forest_predictions)
gbt_ROC = evaluator_ROC.evaluate(gbt_predictions)

log_regress_PR = evaluator_PR.evaluate(log_regress_predictions)
decision_tree_PR = evaluator_PR.evaluate(decision_tree_predictions)
rand_forest_PR = evaluator_PR.evaluate(rand_forest_predictions)
gbt_PR = evaluator_PR.evaluate(gbt_predictions)

log_regress_Acc = evaluator_Acc.evaluate(log_regress_predictions)
decision_tree_Acc = evaluator_Acc.evaluate(decision_tree_predictions)
rand_forest_Acc = evaluator_Acc.evaluate(rand_forest_predictions)
gbt_Acc = evaluator_Acc.evaluate(gbt_predictions)


In [None]:
# Print the metrics of each model - unbalanced dataset
print('Metric esults:')
print('Area under Receiver Operating Characteristic curve:')
print("Logistic Regression ROC: {:.4f}".format(log_regress_ROC))
print("Decision Tree ROC: {:.4f}".format(decision_tree_ROC))
print("Random Forest ROC: {:.4f}".format(rand_forest_ROC))
print("Gradient Boosted Trees ROC: {:.4f}".format(gbt_ROC))

print('Area under Precision Recall curve:')
print("Logistic Regression PR: {:.4f}".format(log_regress_PR))
print("Decision Tree PR: {:.4f}".format(decision_tree_PR))
print("Random Forest PR: {:.4f}".format(rand_forest_PR))
print("Gradient Boosted Trees PR: {:.4f}".format(gbt_PR))

print('Accuracy:')
print("Logistic Regression PR: {:.4f}".format(log_regress_Acc))
print("Decision Tree PR: {:.4f}".format(decision_tree_Acc))
print("Random Forest PR: {:.4f}".format(rand_forest_Acc))
print("Gradient Boosted Trees PR: {:.4f}".format(gbt_Acc))

#### Example Results from 2 previous runs:

##### Unbalanced set:
Area under Receiver Operating Characteristic curve:
- Logistic Regression ROC: 0.6707
- Decision Tree ROC: 0.4989
- Random Forest ROC: 0.6255
- Gradient Boosted Trees ROC: 0.6616

Area under Precision Recall curve:
- Logistic Regression PR: 0.0318
- Decision Tree PR: 0.0187
- Random Forest PR: 0.0283
- Gradient Boosted Trees PR: 0.0371

Accuracy:
- Logistic Regression PR: 0.9834
- Decision Tree PR: 0.9833
- Random Forest PR: 0.9834
- Gradient Boosted Trees PR: 0.9833

##### Balanced set:
Area under Receiver Operating Characteristic curve:
- Logistic Regression ROC: 0.7126
- Decision Tree ROC: 0.5853
- Random Forest ROC: 0.6762
- Gradient Boosted Trees ROC: 0.7304

Area under Precision Recall curve:
- Logistic Regression PR: 0.6907
- Decision Tree PR: 0.5853
- Random Forest PR: 0.6621
- Gradient Boosted Trees PR: 0.7163

Accuracy:
- Logistic Regression PR: 0.6529
- Decision Tree PR: 0.6226
- Random Forest PR: 0.6290
- Gradient Boosted Trees PR: 0.6660