# Importing Libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.ml.stat import Correlation
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib as mpl
import seaborn as sns
# Show every column when pandas DataFrames are displayed in the notebook.
pd.set_option('display.max_columns', None)

# Create (or reuse) the Spark session that all later cells run against.
spark = (
    SparkSession.builder
    .appName("IndividualAssignment")
    .getOrCreate()
)

# Reading Data

In [None]:
# Explicit schema for Airline_Delay_Cause.csv, listed in file column order.
# Every field is nullable, so unparsable or missing values load as null.
_schema_fields = [
    ("year", IntegerType()),
    ("month", IntegerType()),
    ("carrier", StringType()),
    ("carrier_name", StringType()),
    ("airport", StringType()),
    ("airport_name", StringType()),
    ("arr_flights", IntegerType()),
    ("arr_del15", IntegerType()),
    ("carrier_ct", DoubleType()),
    ("weather_ct", DoubleType()),
    ("nas_ct", DoubleType()),
    ("security_ct", DoubleType()),
    ("late_aircraft_ct", DoubleType()),
    ("arr_cancelled", IntegerType()),
    ("arr_diverted", IntegerType()),
    ("arr_delay", IntegerType()),
    ("carrier_delay", IntegerType()),
    ("weather_delay", IntegerType()),
    ("nas_delay", IntegerType()),
    ("security_delay", IntegerType()),
    ("late_aircraft_delay", IntegerType()),
]
user_defined_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _schema_fields]
)

In [None]:
# Load the raw CSV from HDFS using the explicit schema. A double quote is used
# as both the quote and the escape character, and the literal text "null" is
# read as a missing value.
csv_reader = (
    spark.read.format("csv")
    .option("header", True)
    .option("quote", "\"")
    .option("escape", "\"")
    .option("nullValue", "null")
    .schema(user_defined_schema)
)
df = csv_reader.load("hdfs://namenode:8020/datasets/Airline_Delay_Cause.csv")

# Exploratory Data Analysis

In [None]:
# Shape of the DataFrame as (row_count, column_count).
row_count = df.count()
column_count = len(df.columns)
print((row_count, column_count))

In [None]:
df.columns  # column names in schema order (a Python list)

In [None]:
# Schema of the DataFrame: prints each column's name, type and nullability.
df.printSchema()

In [None]:
# Summary statistics (count, mean, stddev, min, max) per column, converted to
# pandas so the notebook renders them as a table.
df.describe().toPandas()

# 1. Cleaning Data

In [None]:
# Clean airport_name: keep only the text after the last ":" and strip the
# surrounding whitespace.
name_parts = split(col("airport_name"), ":")
df = df.withColumn("airport_name", trim(element_at(name_parts, -1)))

In [None]:
# Build a Date column from year and month: the string "YYYY-M" is cast to a
# date (day defaults to the first of the month).
year_month = concat_ws(
    '-',
    col("year").cast("string"),
    col("month").cast("string"),
)
df = df.withColumn('Date', year_month.cast('date'))

In [None]:
# Numerical (non-string, non-date) columns used for the outlier boxplot and
# the correlation matrices. The calendar columns year/month are excluded by
# name rather than by list position, so reordering the schema cannot silently
# drop a real feature instead.
_non_feature_numeric_columns = {"year", "month"}
list_of_numerical_columns = [
    name
    for name, dtype in df.dtypes
    if dtype not in ("string", "date") and name not in _non_feature_numeric_columns
]

In [None]:
# Total number of delayed arrivals across all five per-cause *_ct columns
# (the same five causes shown in the delay-reason pie chart), so security_ct
# is included. The *_ct columns are doubles, so the sum is rounded before
# the integer cast instead of being truncated.
df = df.withColumn(
    'total_number_delays',
    F.round(
        col("carrier_ct")
        + col("weather_ct")
        + col("nas_ct")
        + col("security_ct")
        + col("late_aircraft_ct")
    ).cast('integer'),
)

In [None]:
# Adding Busy_type column based on arrival flights
def busytypefunction(row):
    """Return the busy-type bucket for one (carrier, year) row.

    ``row`` is any mapping (e.g. a pandas row Series) with an ``arr_flights``
    entry; the caller fills it with the carrier's average monthly arriving
    flights for that year. The value is read directly from ``row``, so no
    global lookup table is needed.

    Returns:
        0 for [0, 40000), 1 for [40000, 80000), 2 for >= 80000, and -1 for
        anything else (negative, ``None`` or NaN).
    """
    value = row["arr_flights"]
    if value is None:
        return -1
    if 0 <= value < 40000:
        return 0
    if 40000 <= value < 80000:
        return 1
    if value >= 80000:
        return 2
    # Negative values, and NaN (which fails every comparison above).
    return -1

# Average monthly arriving flights per (carrier, year). The aggregation runs
# in Spark, so only the small carrier x year summary is pulled to the driver
# instead of the whole dataset. Null keys are dropped and an all-null sum
# counts as 0, matching what a pandas groupby().sum() would produce.
# NOTE(review): dividing by 12 assumes a full year of data; years with fewer
# months present will look less busy than they are — confirm this is intended.
temp_df = (
    df.na.drop(subset=["carrier", "year"])
    .groupBy("carrier", "year")
    .agg(F.coalesce(F.sum("arr_flights"), F.lit(0)).alias("arr_flights"))
    .toPandas()
)
temp_df["arr_flights"] = (temp_df["arr_flights"] / 12).astype(int)

temp_df["busy_type"] = temp_df.apply(lambda row: busytypefunction(row), axis=1)
temp_df = temp_df.drop(['arr_flights'], axis=1)
temp_df = spark.createDataFrame(temp_df)
df = df.join(temp_df, ["carrier", "year"])

# 2. Removing Duplicates

In [None]:
# Drop exact duplicate rows, then report the new (rows, columns) shape.
df = df.distinct()
deduplicated_shape = (df.count(), len(df.columns))
print(deduplicated_shape)

# 3. Imputing Missing Values

In [None]:
# Count and percentage of nulls in each column, shown as a two-row table.
# count(when(isNull, ...)) only counts the rows where the column is null.
null_count = df.select(
    *(count(when(col(i).isNull(), i)).cast("string").alias(i) for i in df.columns)
).toPandas()

null_percent = df.select(
    [(count(when(col(i).isNull(), i)) / count(lit(1)) * 100).alias(i) for i in df.columns]
).toPandas()

# DataFrame.append was removed in pandas 2.0; pd.concat stacks the two rows.
null_df = pd.concat([null_count, null_percent])
null_df.index = ["Count", "Percentage"]
null_df

In [None]:
# Impute nulls in every numeric (non-string, non-date) column with that
# column's mean.
# - All means come from one aggregation instead of one Spark job per column.
# - Fractional columns are filled with the exact mean; integral columns with
#   the mean truncated to an int.
# - Columns that are entirely null (mean is None) are left as they are
#   instead of crashing on int(None).
_integral_types = {"tinyint", "smallint", "int", "bigint"}
numeric_dtypes = {name: dtype for name, dtype in df.dtypes if dtype not in ("string", "date")}

if numeric_dtypes:
    column_means = df.select([F.avg(name).alias(name) for name in numeric_dtypes]).first()
    fill_values = {
        name: int(column_means[name]) if dtype in _integral_types else column_means[name]
        for name, dtype in numeric_dtypes.items()
        if column_means[name] is not None
    }
    if fill_values:
        df = df.na.fill(fill_values)

# Re-check: null count per column after imputation.
null_df = df.select(*(count(when(col(i).isNull(), i)).cast("string").alias(i) for i in df.columns)).toPandas()
null_df.index = ["Count"]
null_df

# 4. Removing outliers

In [None]:
# Boxplot of each numerical column for outlier detection. Only those columns
# are converted to pandas. DataFrame.boxplot draws directly onto the Axes it
# returns, so no extra .plot() call is needed.
outlier_boxplot = df.select(list_of_numerical_columns).toPandas().boxplot(
    column=list_of_numerical_columns, figsize=(15, 5)
)
plt.xticks(rotation=90)
plt.show()

In [None]:
# Remove outliers from the delay-minute columns using 1.5 * IQR fences.
# All quartiles are computed on the same (unfiltered) data in a single exact
# quantile pass (relativeError=0). This way a column's fences do not depend on
# which columns were filtered before it. One combined filter is then applied.
outlier_columns = ["arr_delay", "carrier_delay", "nas_delay", "late_aircraft_delay"]
quartiles = df.approxQuantile(outlier_columns, [0.25, 0.75], 0)

keep_condition = F.lit(True)
for i, (q1, q3) in zip(outlier_columns, quartiles):
    iqr = q3 - q1
    upper = q3 + (iqr * 1.5)
    lower = q1 - (iqr * 1.5)

    print(i + " :: upper: " + str(upper) + ", lower: " + str(lower))
    keep_condition = keep_condition & df[i].between(lower, upper)

df = df.filter(keep_condition)

# 5. Correlation Matrix Heatmap

In [None]:
# Correlation matrix of the numerical columns, drawn as an annotated heatmap.
numeric_pdf = df.select(list_of_numerical_columns).toPandas()
corrMatrix = numeric_pdf.corr()

plt.figure(figsize=(15,8))
sns.heatmap(corrMatrix, annot=True)
plt.show()

# Big Data Queries using Spark SQL & Visualizations

In [None]:
# Register the DataFrame as a temporary view named "df" so the Spark SQL
# queries below can reference it. registerTempTable is deprecated;
# createOrReplaceTempView is its replacement.
df.createOrReplaceTempView("df")

In [None]:
# Total number of delays per year, drawn as a line chart.
year_delays_query = "select year, sum(total_number_delays) as number_of_delays from df group by year order by year"
year_delays = spark.sql(year_delays_query).toPandas()

years = year_delays["year"]
plt.figure(figsize=(15,5))
plt.plot(
    years,
    year_delays["number_of_delays"],
    linewidth=2,
    color="#13b026",
    marker="o",
    markeredgecolor="#000000",
)
plt.xticks(years)
plt.xlabel("YEAR", labelpad=15)
plt.ylabel("Number of Delays", labelpad=15)
plt.title("Number of Delays for Each Year")
plt.grid()
plt.show()

In [None]:
# Average monthly delays per carrier for 2021 (yearly total / 12), highest
# first. The query orders by the same 4-decimal value it displays, so the bars
# are sorted exactly by their plotted heights.
monthly_delays = spark.sql("select carrier_name, round(sum(total_number_delays)/12, 4) as average_monthly_delays from df where year = 2021 group by carrier_name order by average_monthly_delays desc").toPandas()

plt.figure(figsize=(15, 5))
sns.barplot(x = monthly_delays['carrier_name'], y = monthly_delays['average_monthly_delays'], edgecolor = "#000000", palette = 'Wistia_r')
plt.title("Average Monthly Delays for each Airline")
plt.xlabel("Carrier", labelpad = 15)
plt.ylabel("Average Monthly Delays", labelpad = 15)
plt.xticks(rotation = 90)
plt.show()

In [None]:
# Arriving vs cancelled flights per carrier from 2021 onwards (year >= 2021),
# ordered by cancellation ratio. Arrivals are bars on the left axis;
# cancellations are a line on a twin right axis.
arrive_cancelled = spark.sql("select carrier_name, sum(arr_flights) as arriving_flights, sum(arr_cancelled) as cancelled_flights, round(sum(arr_cancelled)/sum(arr_flights), 4) as ratio from df where year >= 2021 group by carrier_name order by ratio").toPandas()
arrive_cancelled.index = arrive_cancelled["carrier_name"]

fig, ax1 = plt.subplots(figsize=(15, 5))
ax1.set_xlabel('Carriers')
ax1.set_ylabel('Number of flights', labelpad = 20)
ax1.bar(arrive_cancelled["carrier_name"], arrive_cancelled["arriving_flights"], edgecolor = "#000000", color = "#2399fa")

ax2 = ax1.twinx()
ax2.set_ylabel("Number of cancelled flights", labelpad = 20, rotation = 270)
ax2.plot(arrive_cancelled["carrier_name"], arrive_cancelled["cancelled_flights"], linewidth = 2, color = "#db0f1d", marker = "o", markeredgecolor = "#000000")

# Rotate only the carrier names on the x axis; without axis="x" the y-axis
# tick numbers of ax1 are rotated too.
ax1.tick_params(axis = "x", labelrotation = 90)
fig.legend(["Number of Flights", "Number of Cancelled Flights"])
plt.title("Number of Flights to Number of Cancelled Flights")
plt.show()

In [None]:
# Arriving flights served by each carrier from 2021 onwards, drawn as a
# horizontal bar chart with each bar's value printed at its end.
carrier_flights = spark.sql("select carrier_name, sum(arr_flights) as number_of_flights from df where year >= 2021 group by carrier_name order by sum(arr_flights) desc").toPandas()

plt.figure(figsize=(15, 8))
sns.barplot(
    y=carrier_flights['carrier_name'],
    x=carrier_flights['number_of_flights'],
    orient="h",
    edgecolor="#000000",
    palette='GnBu_r',
)
plt.ylabel("Carrier", labelpad=15)
plt.xlabel("Flights served", labelpad=15)

flight_totals = carrier_flights["number_of_flights"].reset_index(drop=True)
for bar_position, flights in flight_totals.items():
    plt.text(flights, bar_position, "  " + str(flights))

plt.title("Number of Flights for each Airline in year 2021-present")
plt.show()

In [None]:
# Arriving flights and delays per carrier from 2021 onwards, ordered by the
# delays-to-arrivals ratio, drawn as stacked bars.
# NOTE(review): delays are a subset of arrivals, so stacking them makes each
# bar's total exceed the arrivals — consider stacked=False.
ratio_arr_delay = spark.sql("select carrier_name, sum(arr_flights) as number_of_arriving_flights, sum(total_number_delays) as number_of_delays, round(sum(total_number_delays)/sum(arr_flights), 4) as ratio from df where year >= 2021 group by carrier_name order by round(sum(total_number_delays)/sum(arr_flights), 4)").toPandas()

stacked_columns = ["carrier_name", "number_of_arriving_flights", "number_of_delays"]
ratio_arr_delay[stacked_columns].plot(
    x="carrier_name",
    kind='bar',
    stacked=True,
    edgecolor="#000000",
    figsize=(15, 5),
)
plt.xlabel("Carrier", labelpad=15)
plt.ylabel("Count", labelpad=15)
plt.title("Arranged in ascending ratio of delays to total arrivals")
plt.show()

In [None]:
# Per carrier (2021 onwards): sum of arr_del15 vs total_number_delays and
# their ratio, highest ratio first, printed as a table.
del15_ratio_query = "select carrier_name, sum(arr_del15) as delayed_upto_15_min, sum(total_number_delays) as total_delays, round(sum(arr_del15)/sum(total_number_delays), 4) as ratio from df where year >= 2021 group by carrier_name order by round(sum(arr_del15)/sum(total_number_delays), 4) desc"
del15_ratio = spark.sql(del15_ratio_query)
del15_ratio.show()

In [None]:
# Average monthly NAS (National Aviation System) delay per airport from 2021
# onwards, converted from minutes to hours. Only airports averaging more than
# 24 hours of NAS delay per month are plotted.
nas_query = "select airport_name, round(round(sum(nas_delay)/12)/60) as average_monthly_hours from df where year >= 2021 group by airport_name order by round(sum(nas_delay)/12)"
nas_delay = spark.sql(nas_query).toPandas()
nas_delay = nas_delay.loc[nas_delay["average_monthly_hours"] > 24]

nas_delay.plot.bar(
    x='airport_name',
    y='average_monthly_hours',
    mark_right=True,
    figsize=(15, 5),
    edgecolor="#000000",
    color="#fc9403",
)
plt.ylabel("Average Monthly Hours", labelpad=15)
plt.xlabel("Airport Name", labelpad=15)
plt.title("Average Hours of Delay Monthly due to National Aviation Systems")
plt.show()

In [None]:
# Total cancelled flights per carrier for each year, one line per carrier.
yearly_cancelled = spark.sql("select year, carrier_name, sum(arr_cancelled) as cancelled_flights from df group by year, carrier_name order by year, carrier_name").toPandas()

plt.figure(figsize=(20, 10))

for carrier in yearly_cancelled["carrier_name"].unique():
    carrier_rows = yearly_cancelled.loc[yearly_cancelled["carrier_name"] == carrier]
    plt.plot(carrier_rows["year"], carrier_rows["cancelled_flights"], label=carrier, linewidth=2)

plt.legend(ncol=2)
plt.xticks(yearly_cancelled["year"].unique())
plt.ylabel("Number of Cancelled Flights", labelpad=15)
plt.xlabel("YEAR", labelpad=15)
plt.title("Total Number of Cancelled Flights for each Airline Yearly")
plt.show()

In [None]:
# Mean arriving flights per carrier for each calendar month (across all
# years), one line per carrier, with the legend outside the plot area.
average_flights = spark.sql("select carrier_name, month, cast(round(avg(arr_flights)) as int) as number_of_flights from df group by carrier_name, month order by carrier_name, month").toPandas()

plt.figure(figsize=(20, 10))

for carrier in average_flights["carrier_name"].unique():
    carrier_rows = average_flights.loc[average_flights["carrier_name"] == carrier]
    plt.plot(carrier_rows["month"], carrier_rows["number_of_flights"], label=carrier, linewidth=2)

plt.legend(bbox_to_anchor=(1.04, 1), loc="upper left")
plt.xticks(average_flights["month"].unique())
plt.ylabel("Average number of Flights every month", labelpad=15)
plt.xlabel("Month", labelpad=15)
plt.title("Average number of Flights each Month for each Airline")
plt.show()

In [None]:
# Top 15 airports by ratio of security-caused delays (security_ct) to
# arriving flights. Keep the DataFrame itself: .show() only prints and
# returns None, so chaining it left most_dangerous as None.
print("Top Airports with most security breaches:")
most_dangerous = spark.sql("select airport_name, round(sum(security_ct)/sum(arr_flights), 4) as ratio_of_security_breaches_to_flights from df group by airport_name order by round(sum(security_ct)/sum(arr_flights), 4) desc limit 15")
most_dangerous.show()

In [None]:
# Monthly total of security-caused delays (security_ct) across all carriers
# and airports, plotted against a first-of-month date.
security_breaches = spark.sql("select year, month, cast(concat_ws('-', year, month, 1)as date) as Date, cast(round(sum(security_ct)) as int) as count_of_security_breaches from df group by year, month order by year, month").toPandas()

breach_dates = security_breaches["Date"]
breach_counts = security_breaches["count_of_security_breaches"]

plt.figure(figsize=(15, 5))
plt.plot(
    breach_dates,
    breach_counts,
    linestyle=":",
    linewidth=3,
    color="#db0f1d",
    marker="*",
    markeredgecolor="#000000",
)
plt.ylabel("Number of Security Breaches", labelpad=15)
plt.xlabel("YEARS", labelpad=15)
plt.title("Number of Security Breaches over the YEARS")
plt.show()

In [None]:
# Per carrier (2021 onwards): number of late-aircraft delays vs the average
# late-aircraft delay in minutes, as a scatter plot with a regression line.
number_late_returns_delays = spark.sql("select carrier_name, cast(sum(late_aircraft_ct) as int) as number_of_late_returns, round(avg(late_aircraft_delay), 4) as average_delay_min from df where year >= 2021 group by carrier_name order by 2 desc, 3 desc").toPandas()

# seaborn's ci is a percentage on a 0-100 scale; 95 gives a 95% confidence
# band (0.95 would draw a 0.95% band, effectively invisible).
sns.lmplot(x = "number_of_late_returns", y = "average_delay_min", data = number_late_returns_delays, ci = 95, markers = 'x', line_kws={'color': 'red'})
plt.ylabel("Average Delays in minutes", labelpad = 15)
plt.xlabel("Number of Late returns", labelpad = 15)
plt.title("Scatter Plot")
plt.xticks(range(0, 11000, 1000))
plt.show()
print(number_late_returns_delays)

In [None]:
# Share of delays attributed to each cause over the whole dataset, as a pie.
delayed_reasons = spark.sql("select sum(carrier_ct), sum(weather_ct), sum(nas_ct), sum(late_aircraft_ct), sum(security_ct) from df").toPandas()
cause_totals = delayed_reasons.iloc[0].tolist()

plt.figure(figsize=(15, 8))
labels = ["Carrier (staff issues)", "Weather", "Air traffic", "Aircrafts late arrival", "security"]
slice_style = {
    "edgecolor": "black",
    'linewidth': 1,
    'antialiased': True,
}
plt.pie(cause_totals, labels=labels, autopct='%1.1f%%', wedgeprops=slice_style)

plt.title("Percentage of Reasons Delay")
plt.legend(bbox_to_anchor=(1.04, 1), loc="upper left")
plt.show()

In [None]:
# Average monthly diversions per airport from 2021 onwards, shown as a
# histogram of how many airports fall into each integer diversion count.
diverted_df = spark.sql("select airport_name, cast(sum(arr_diverted)/12 as int) as average_monthly_diverted from df where year >= 2021 group by airport_name order by sum(arr_diverted)/12 desc").toPandas()

# Unit-wide bins from 0. The upper edge is at least 7 and grows with the data,
# so airports with more diversions are not silently left out of the histogram.
observed_diversions = diverted_df["average_monthly_diverted"].dropna()
largest_diversion = int(observed_diversions.max()) if len(observed_diversions) else 0
diversion_bins = list(range(0, max(7, largest_diversion + 1) + 1))

plt.figure(figsize=(15, 5))
plt.hist(diverted_df["average_monthly_diverted"], bins=diversion_bins, edgecolor = "#000000", color = "#008080")
plt.ylabel("Number of Airports", labelpad = 15)
plt.xlabel("Number of Diversions from the Airport", labelpad = 15)
plt.title("Histogram for Number of Airport for Diversions occured")
plt.show()

In [None]:
# Average monthly delays caused by carrier issues (carrier_ct) per carrier
# from 2021 onwards, highest first.
carrier_delay = spark.sql("select carrier_name, cast(sum(carrier_ct)/12 as int) as monthly_number_of_delays from df where year >= 2021 group by carrier_name order by sum(carrier_ct)/12 desc").toPandas()

plt.figure(figsize=(15, 5))
sns.barplot(
    x=carrier_delay['carrier_name'],
    y=carrier_delay['monthly_number_of_delays'],
    edgecolor="#000000",
    palette='Blues_r',
)
plt.xlabel("Carrier", labelpad=15)
plt.ylabel("Monthly Delays due to Carrier Issues", labelpad=15)
plt.title("Monthly Delays because of Airline Issues")
plt.xticks(rotation=90)
plt.show()

# Linear Regression to Predict Arrival Delay (arr_delay)

In [None]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
# Correlation of every numerical column with arr_delay. Columns with a
# correlation of at least 0.5 (excluding arr_delay itself) become features.
# NOTE(review): if the per-cause *_delay columns add up to arr_delay, this is
# target leakage and the reported R^2 will be inflated — confirm.
corrMatrix = df[list_of_numerical_columns].toPandas().corr().loc['arr_delay'].to_frame()
sns.heatmap(corrMatrix, square=True, cmap='Blues')

# Select by label; corrMatrix.loc[i][0] relied on the deprecated positional
# fallback of Series.__getitem__.
target_correlation = corrMatrix['arr_delay']
correlation_columns = [
    column
    for column, value in target_correlation.items()
    if value >= 0.5 and column != 'arr_delay'
]

In [None]:
# Pack the selected feature columns into one vector column, keep it next to
# the arr_delay label, and preview 10 rows.
assembler = VectorAssembler(inputCols=correlation_columns, outputCol='features')
lr_df = assembler.transform(df).select('features', 'arr_delay')
lr_df.show(10)

In [None]:
# 75/25 train/test split with a fixed seed so the split, and every metric computed from it, is reproducible.
train_df, test_df = lr_df.randomSplit([0.75, 0.25], seed=42)

In [None]:
# Fit a linear regression of arr_delay on the selected features.
lnr = LinearRegression(featuresCol='features', labelCol='arr_delay')
model = lnr.fit(train_df)

# Training metrics only show how well the model fits the data it has seen;
# held-out test metrics show how well it generalises.
results = model.evaluate(train_df)
test_results = model.evaluate(test_df)

for split_name, split_results in (("Train", results), ("Test", test_results)):
    print(f'{split_name} R Squared :', split_results.r2)
    print(f'{split_name} Mean Squared Error :', split_results.meanSquaredError)
    print(f'{split_name} Mean Absolute Error :', split_results.meanAbsoluteError)

# Residuals on the training split; values outside [-400, 400) fall outside the bins.
plt.figure(figsize=(15, 5))
plt.hist(results.residuals.toPandas(), bins=range(-400, 400, 25), edgecolor = "#000000", color = "#00ffae")
plt.ylabel("Number of Occurances", labelpad = 15)
plt.xlabel("Range of values", labelpad = 15)
plt.title("Plotted Residuals")
plt.show()

In [None]:
test_features = test_df.select('features')  # held-out features only; the label column is dropped
predictions = model.transform(test_features).toPandas()

In [None]:
# Overlay histograms of actual (test) vs predicted arr_delay using 250-wide
# bins with edges from 0 to 4750.
delay_bins = range(0, 5000, 250)

plt.figure(figsize=(15,5))
plt.hist(test_df.toPandas()["arr_delay"], bins=delay_bins, edgecolor="#000000", color="#213adb")
plt.hist(predictions["prediction"], bins=delay_bins, edgecolor="#000000", color="#ff0044", alpha=0.5)
plt.legend(["Test", "Predicted"])
plt.ylabel("Count")
plt.xlabel("Bins of Value")
plt.title("Test VS Predicted")
plt.show()

# K-Means Clustering to Predict the Busy Type of Carriers

In [None]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
# Number of clusters = number of distinct busy_type labels.
k = df.select("busy_type").dropDuplicates().count()
k

In [None]:
# Correlation of each numeric column with busy_type. Only numeric
# (non-string, non-date) columns are converted to pandas: pandas >= 2.0
# raises in DataFrame.corr() on string/date columns instead of skipping them.
numeric_columns = [name for name, dtype in df.dtypes if dtype not in ("string", "date")]
corrMatrix = df.select(numeric_columns).toPandas().corr().loc['busy_type'].to_frame()
sns.heatmap(corrMatrix, annot=True)
plt.show()

In [None]:
# K-Means features: columns with correlation >= 0.15 to busy_type, excluding busy_type itself so the label does not leak into the features.
correlation_columns = [column for column, value in corrMatrix['busy_type'].items() if value >= 0.15 and column != 'busy_type']

In [None]:
# Cluster the rows into k groups on the selected features, then keep the true
# busy_type next to the assigned cluster id.
assembler = VectorAssembler(inputCols=correlation_columns, outputCol="features")
kmeans_df = assembler.transform(df)

kmeans = KMeans(featuresCol='features', k=k, maxIter=100)
model = kmeans.fit(kmeans_df)
predictions = model.transform(kmeans_df).select("busy_type", "prediction")

In [None]:
# Compare the true busy_type labels with the K-Means cluster ids.
# - Both columns are collected in one pass, so each label stays paired with
#   its own prediction; two separate collect() calls have no guaranteed
#   common row order.
# - The Row objects are unwrapped into plain scalars for scikit-learn.
# NOTE: K-Means cluster ids are arbitrary, so cluster 0 need not correspond
# to busy_type 0.
label_rows = predictions.select("busy_type", "prediction").collect()
true_labels = [row["busy_type"] for row in label_rows]
predicted_labels = [row["prediction"] for row in label_rows]
print(classification_report(true_labels, predicted_labels))

In [None]:
# Row count for every (busy_type, cluster) pair, as a pandas table.
predictions_df = (
    predictions.groupBy("busy_type", "prediction")
    .count()
    .orderBy("busy_type", "prediction")
    .toPandas()
)
predictions_df

In [None]:
# Percentage of each busy_type's rows that fall into each cluster, drawn as
# one stacked bar per busy_type.
predictions_df = predictions_df.groupby(['busy_type', 'prediction']).agg({'count': 'sum'})
# transform keeps the (busy_type, prediction) index unchanged, whereas
# groupby().apply prepends an extra busy_type level on pandas >= 2.0.
group_totals = predictions_df.groupby(level=0)['count'].transform('sum')
predictions_df['count'] = 100 * predictions_df['count'] / group_totals
predictions_df.rename(columns = {'count':'percentage'}, inplace = True)

percentage_table = predictions_df.unstack()
# DataFrame.plot creates its own figure, so the size is passed here rather
# than through a separate plt.figure() that would stay empty.
percentage_table.plot(kind='bar', edgecolor = "#000000", stacked=True, figsize = (15,5))
plt.xlabel("Original Values")
plt.ylabel("Percentage")
plt.title("% of predicted values")
# One legend entry per cluster id actually present, instead of a fixed list of three.
cluster_ids = percentage_table.columns.get_level_values(-1)
plt.legend(loc='center left', bbox_to_anchor=(1, 0.5), labels = [f"{cluster}% occurance" for cluster in cluster_ids])
plt.show()

# Logistic Regression to Predict Reported Security Issues

In [None]:
# Binary label: 1 if any security delay was recorded, 0 if none, -1 otherwise (e.g. null).
security_label = (
    F.when(F.col("security_ct") > 0, 1)
    .when(F.col("security_ct") == 0, 0)
    .otherwise(-1)
)
df = df.withColumn('security_issue', security_label)
df.groupBy("security_issue").count().show()

In [None]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
# Correlation of each numeric column with security_issue. Only numeric
# (non-string, non-date) columns are converted to pandas: pandas >= 2.0
# raises in DataFrame.corr() on string/date columns instead of skipping them.
numeric_columns = [name for name, dtype in df.dtypes if dtype not in ("string", "date")]
corrMatrix = df.select(numeric_columns).toPandas().corr().loc["security_issue"].to_frame()
sns.heatmap(corrMatrix, annot=True, cmap="YlGnBu")
plt.show()

In [None]:
# Logistic-regression features: columns with correlation >= 0.1 to
# security_issue, excluding the label itself. Selected by label instead of the
# deprecated positional fallback corrMatrix.loc[i][0].
correlation_columns = [
    column
    for column, value in corrMatrix["security_issue"].items()
    if value >= 0.1 and column != "security_issue"
]

In [None]:
# Assemble the selected columns into one feature vector next to the
# security_issue label.
assembler = VectorAssembler(inputCols=correlation_columns, outputCol="features")
log_df = assembler.transform(df).select("features", "security_issue")

In [None]:
# 75/25 train/test split with a fixed seed so the split, and every metric
# computed from it, is reproducible across runs.
train_df, test_df = log_df.randomSplit([0.75,0.25], seed=42)

model = LogisticRegression(labelCol = "security_issue").fit(train_df)

# Predictions on the training split, previewed and used for the training report.
train_results = model.evaluate(train_df).predictions
train_results.show(10)

In [None]:
# Collect label and prediction in one pass so each pair stays aligned, and unwrap Rows into scalars for scikit-learn.
train_label_rows = train_results.select("security_issue", "prediction").collect()
print(classification_report([row["security_issue"] for row in train_label_rows], [row["prediction"] for row in train_label_rows]))

In [None]:
# Held-out evaluation: number of rows for each (actual, predicted) pair.
results = model.evaluate(test_df).predictions
results.groupBy("security_issue", "prediction").agg(F.count("prediction")).show()

# Conclusion

•	The yearly number of delays decreased steadily before the COVID-19 pandemic but has increased sharply since then.
•	Almost all airlines saw a sharp increase in the number of cancelled flights during the COVID-19 pandemic.
•	Carrier issues, such as staff not being available, accounted for at least 40% of the total delays in 2021.
•	The number of security breaches has dropped sharply over the years.
•	October and November are the months with the most arriving flights.
•	SkyWest Airlines has the highest average monthly delays and the most cancelled flights for the year 2021.
•	The number of diversions to other airports is minimal and has not increased over the years.
•	There is a high correlation between delayed flights and delays caused by airline (carrier) issues.
•	Cold Bay Airport, Alaska, has the highest ratio of security breaches to arriving flights.