In [1]:
df = spark.read.option("header", "true").csv("s3://us-flight-delay-data/raw/Airline_Delay_Cause.csv")
df.printSchema()
df.show(5)


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1749513307316_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- carrier_name: string (nullable = true)
 |-- airport: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- arr_flights: string (nullable = true)
 |-- arr_del15: string (nullable = true)
 |-- carrier_ct: string (nullable = true)
 |-- weather_ct: string (nullable = true)
 |-- nas_ct: string (nullable = true)
 |-- security_ct: string (nullable = true)
 |-- late_aircraft_ct: string (nullable = true)
 |-- arr_cancelled: string (nullable = true)
 |-- arr_diverted: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier_delay: string (nullable = true)
 |-- weather_delay: string (nullable = true)
 |-- nas_delay: string (nullable = true)
 |-- security_delay: string (nullable = true)
 |-- late_aircraft_delay: string (nullable = true)

+----+-----+-------+-----------------+-------+--------------------+-----------+---------+----------+

In [2]:
from pyspark.sql.functions import col

numeric_cols = [
    "arr_flights", "arr_del15", "carrier_ct", "weather_ct", "nas_ct", 
    "security_ct", "late_aircraft_ct", "arr_cancelled", "arr_diverted", 
    "arr_delay", "carrier_delay", "weather_delay", "nas_delay", 
    "security_delay", "late_aircraft_delay"
]

df_casted = df
for c in numeric_cols:
    df_casted = df_casted.withColumn(c, col(c).cast("double"))

df_casted.printSchema()
df_casted.show(5)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- carrier_name: string (nullable = true)
 |-- airport: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- arr_flights: double (nullable = true)
 |-- arr_del15: double (nullable = true)
 |-- carrier_ct: double (nullable = true)
 |-- weather_ct: double (nullable = true)
 |-- nas_ct: double (nullable = true)
 |-- security_ct: double (nullable = true)
 |-- late_aircraft_ct: double (nullable = true)
 |-- arr_cancelled: double (nullable = true)
 |-- arr_diverted: double (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier_delay: double (nullable = true)
 |-- weather_delay: double (nullable = true)
 |-- nas_delay: double (nullable = true)
 |-- security_delay: double (nullable = true)
 |-- late_aircraft_delay: double (nullable = true)

+----+-----+-------+-----------------+-------+--------------------+-----------+---------+----------+

In [3]:
from pyspark.sql.functions import col, when, expr

# 1. Select columns of interest
selected_cols = [
    "year", "month", "carrier", "carrier_name", "airport", "airport_name",
    "arr_flights", "arr_del15", "arr_cancelled", "arr_diverted",
    "arr_delay", "carrier_delay", "weather_delay", "nas_delay", "security_delay", "late_aircraft_delay"
]

df = df.select(*selected_cols)

# 2. Filter to keep rows where flights were not cancelled or diverted
df_filtered = df.filter(
    (col("arr_cancelled").cast("double") == 0) &
    (col("arr_diverted").cast("double") == 0) &
    (col("arr_flights").cast("double") > 0)  # Ensure flights exist to avoid division by zero
)

# 3. Create binary target: flag if more than 10% of flights delayed by 15+ minutes
df_filtered = df_filtered.withColumn(
    "Delayed_Arrival",
    when((col("arr_del15").cast("double") / col("arr_flights").cast("double")) > 0.10, 1).otherwise(0)
)

# 4. (Optional) Create a "Route" column - since you have only one airport per row, just use airport code
df_filtered = df_filtered.withColumn("Route", col("airport"))

# 5. Select final columns for modeling or analysis
df_final = df_filtered.select(
    "year", "month", "carrier", "carrier_name", "Route",
    "arr_flights", "arr_del15", "arr_delay",
    "carrier_delay", "weather_delay", "nas_delay", "security_delay", "late_aircraft_delay",
    "Delayed_Arrival"
)

df_final.printSchema()
df_final.show(5, truncate=False)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- carrier_name: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- arr_flights: string (nullable = true)
 |-- arr_del15: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier_delay: string (nullable = true)
 |-- weather_delay: string (nullable = true)
 |-- nas_delay: string (nullable = true)
 |-- security_delay: string (nullable = true)
 |-- late_aircraft_delay: string (nullable = true)
 |-- Delayed_Arrival: integer (nullable = false)

+----+-----+-------+-----------------+-----+-----------+---------+---------+-------------+-------------+---------+--------------+-------------------+---------------+
|year|month|carrier|carrier_name     |Route|arr_flights|arr_del15|arr_delay|carrier_delay|weather_delay|nas_delay|security_delay|late_aircraft_delay|Delayed_Arrival|
+----+-----+-------+-----------------+-----+-----------+---------+-

In [4]:
df_final.write.mode("overwrite").parquet("s3://us-flight-delay-data/raw/processed/")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
sc.install_pypi_package("numpy")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting numpy
  Downloading numpy-2.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.5 MB)
Installing collected packages: numpy
Successfully installed numpy-2.0.2


In [7]:
train_df = spark.read.parquet("s3://us-flight-delay-data/raw/processed/")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier


# 1. Cast necessary string columns to numeric (DoubleType)
numeric_cols = [
    "month", "arr_delay", "carrier_delay", "weather_delay", 
    "nas_delay", "security_delay", "late_aircraft_delay"
]

for c in numeric_cols:
    train_df = train_df.withColumn(c, col(c).cast("double"))

# 2. Define the categorical columns to be indexed
categorical_cols = ["carrier", "Route"]

# Create StringIndexers for categorical features
indexers = [
    StringIndexer(inputCol=col_name, outputCol=col_name + "_indexed", handleInvalid="keep")
    for col_name in categorical_cols
]

# 3. Assemble all features into a single vector
feature_cols = ["month", "arr_delay", "carrier_delay", "weather_delay", 
                "nas_delay", "security_delay", "late_aircraft_delay"] + \
               [c + "_indexed" for c in categorical_cols]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# 4. Define the classifier (Random Forest here)
rf = RandomForestClassifier(labelCol="Delayed_Arrival", featuresCol="features",maxBins=400)

# 5. Build the pipeline
pipeline = Pipeline(stages=indexers + [assembler, rf])

# 6. Fit the pipeline to training data
model = pipeline.fit(train_df)




VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
test_df= spark.read.parquet("s3://us-flight-delay-data/raw/processed/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Cast numeric columns in test_df as well
for c in numeric_cols:
    test_df = test_df.withColumn(c, col(c).cast("double"))

# Predict
predictions = model.transform(test_df)

# Show some predictions
predictions.select("features", "prediction", "Delayed_Arrival", "probability").show(5)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+---------------+--------------------+
|            features|prediction|Delayed_Arrival|         probability|
+--------------------+----------+---------------+--------------------+
|[2.0,733.0,578.0,...|       1.0|              1|[0.19493893976820...|
|[2.0,803.0,379.0,...|       1.0|              1|[0.12347828225929...|
|[2.0,964.0,101.0,...|       1.0|              1|[0.16388305654942...|
|[2.0,67.0,16.0,0....|       1.0|              1|[0.48347621612690...|
|[2.0,1244.0,797.0...|       1.0|              1|[0.11195560420719...|
+--------------------+----------+---------------+--------------------+
only showing top 5 rows

In [11]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col

# Accuracy using ML evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="Delayed_Arrival", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy = {accuracy:.3f}")

# Prepare RDD for MulticlassMetrics
preds_rdd = predictions.select("prediction", "Delayed_Arrival") \
                       .rdd.map(lambda row: (float(row['prediction']), float(row['Delayed_Arrival'])))
preds_rdd.cache()

# Compute confusion matrix and metrics
metrics = MulticlassMetrics(preds_rdd)

print("Confusion Matrix:")
print(metrics.confusionMatrix().toArray())

print(f"Precision = {metrics.precision(1.0):.3f}")
print(f"Recall = {metrics.recall(1.0):.3f}")
print(f"F1 Score = {metrics.fMeasure(1.0):.3f}")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy = 0.780
Confusion Matrix:
[[10051.  8330.]
 [ 2455. 28258.]]
Precision = 0.772
Recall = 0.920
F1 Score = 0.840

In [12]:
# 1. Access the Random Forest model from the pipeline
rf_model = model.stages[-1]  

# 2. Get feature importances
importances = rf_model.featureImportances

# 3. Define feature names in same order as in assembler
categorical_cols = ["carrier", "Route"]
indexed_cols = [col + "_indexed" for col in categorical_cols]

feature_cols = [
    "month", "arr_delay", "carrier_delay", "weather_delay", 
    "nas_delay", "security_delay", "late_aircraft_delay"
] + indexed_cols

# 4. Zip feature names with importance scores
feature_importance_list = list(zip(feature_cols, importances.toArray()))

# 5. Sort by importance
sorted_importance = sorted(feature_importance_list, key=lambda x: x[1], reverse=True)

# 6. Print nicely
print("Feature Importances (Descending):")
for feature, score in sorted_importance:
    print(f"{feature:25} -> {score:.4f}")



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Feature Importances (Descending):
arr_delay                 -> 0.3904
nas_delay                 -> 0.1912
late_aircraft_delay       -> 0.1607
carrier_delay             -> 0.1502
Route_indexed             -> 0.0484
carrier_indexed           -> 0.0327
weather_delay             -> 0.0212
month                     -> 0.0051
security_delay            -> 0.0001

In [13]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Data
data = {
    "Feature": [
        "arr_delay", "nas_delay", "carrier_delay", "late_aircraft_delay", 
        "Route_indexed", "carrier_indexed", "weather_delay", "month", "security_delay"
    ],
    "Importance": [
        0.3859, 0.1922, 0.1586, 0.1499, 0.0546, 0.0313, 0.0224, 0.0050, 0.0002
    ]
}
df = pd.DataFrame(data).sort_values(by="Importance", ascending=True)

# Plot
plt.figure(figsize=(10, 6))
sns.barplot(x="Importance", y="Feature", data=df, palette="viridis")
plt.title("Feature Importances from Random Forest")
plt.xlabel("Importance Score")
plt.ylabel("Features")
plt.tight_layout()

# Save to /tmp so boto3 can find it
plot_path = "/tmp/feature_importance.png"
plt.savefig(plot_path)
plt.close()
print(f"Plot saved to {plot_path}")




VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
No module named 'pandas'
Traceback (most recent call last):
ModuleNotFoundError: No module named 'pandas'



In [18]:
import boto3

s3 = boto3.client("s3")
s3.upload_file("/tmp/feature_importance.png", "us-flight-delay-data", "visualizations/feature_importance.png")
print("Plot uploaded to s3://us-flight-delay-data/visualizations/feature_importance.png")



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
No module named 'boto3'
Traceback (most recent call last):
ModuleNotFoundError: No module named 'boto3'



In [26]:
sc.install_pypi_package("boto3")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting boto3
  Downloading boto3-1.38.33-py3-none-any.whl (139 kB)
Collecting botocore<1.39.0,>=1.38.33
  Downloading botocore-1.38.33-py3-none-any.whl (13.6 MB)
Collecting s3transfer<0.14.0,>=0.13.0
  Downloading s3transfer-0.13.0-py3-none-any.whl (85 kB)
Installing collected packages: botocore, s3transfer, boto3
Successfully installed boto3-1.38.33 botocore-1.38.33 s3transfer-0.13.0


In [27]:
sc.install_pypi_package("pandas")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
Package already installed for current Spark context!
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1749513307316_0001/container_1749513307316_0001_01_000001/pyspark.zip/pyspark/context.py", line 2614, in install_pypi_package
    raise ValueError("Package already installed for current Spark context!")
ValueError: Package already installed for current Spark context!



In [28]:
sc.install_pypi_package("IPython")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
Package already installed for current Spark context!
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1749513307316_0001/container_1749513307316_0001_01_000001/pyspark.zip/pyspark/context.py", line 2614, in install_pypi_package
    raise ValueError("Package already installed for current Spark context!")
ValueError: Package already installed for current Spark context!



In [29]:
sc.install_pypi_package("matplotlib")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
Package already installed for current Spark context!
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1749513307316_0001/container_1749513307316_0001_01_000001/pyspark.zip/pyspark/context.py", line 2614, in install_pypi_package
    raise ValueError("Package already installed for current Spark context!")
ValueError: Package already installed for current Spark context!



In [30]:
sc.install_pypi_package("seaborn")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
Package already installed for current Spark context!
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1749513307316_0001/container_1749513307316_0001_01_000001/pyspark.zip/pyspark/context.py", line 2614, in install_pypi_package
    raise ValueError("Package already installed for current Spark context!")
ValueError: Package already installed for current Spark context!



In [33]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import boto3

# Step 1: Run model prediction on your test set
predictions = model.transform(test_df)  # model is your PipelineModel

# Step 2: Extract 'probability' column and convert to Pandas
pandas_df = predictions.select("probability").toPandas()

# Step 3: Extract probability of delay (assuming it's the second element)
pandas_df["delay_prob"] = pandas_df["probability"].apply(lambda x: float(x[1]))

# Step 4: Plot the distribution
plt.figure(figsize=(6, 4))
sns.histplot(pandas_df["delay_prob"], bins=20, kde=True, color='orange')
plt.title("Prediction Probability Distribution (Delayed Arrival)")
plt.xlabel("Predicted Probability of Delay")
plt.ylabel("Frequency")
plt.tight_layout()

# Step 5: Save the plot locally
plot_path = "/tmp/probability_distribution.png"
plt.savefig(plot_path)
plt.close()

# Step 6: Upload to S3
s3 = boto3.client("s3")
s3.upload_file(
    Filename=plot_path,
    Bucket="us-flight-delay-data",
    Key="visualizations/probability_distribution.png"
)

print("✅ Plot uploaded to s3://us-flight-delay-data/visualizations/probability_distribution.png")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

? Plot uploaded to s3://us-flight-delay-data/visualizations/probability_distribution.png

In [23]:
sc.install_pypi_package("pandas_df")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


ERROR: Could not find a version that satisfies the requirement pandas_df (from versions: none)
ERROR: No matching distribution found for pandas_df

In [35]:
# Convert Spark DataFrame to pandas DataFrame
pandas_df = predictions.select("probability").toPandas()

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import boto3

# Extract delay probability from the 'probability' column
pandas_df["delay_prob"] = pandas_df["probability"].apply(lambda x: float(x[1]))

# Plot the distribution
plt.figure(figsize=(6, 4))
sns.histplot(pandas_df["delay_prob"], bins=20, kde=True, color='orange')
plt.title("Prediction Probability Distribution (Delayed Arrival)")
plt.xlabel("Predicted Probability of Delay")
plt.ylabel("Frequency")
plt.tight_layout()

# Save the plot locally
plot_path = "/tmp/probability_distribution.png"
plt.savefig(plot_path)
plt.close()

# Upload to S3
s3 = boto3.client("s3")
s3.upload_file(
    Filename=plot_path,
    Bucket="us-flight-delay-data",
    Key="visualizations/probability_distribution.png"
)

print("✅ Plot uploaded to s3://us-flight-delay-data/visualizations2/probability_distribution.png")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

? Plot uploaded to s3://us-flight-delay-data/visualizations2/probability_distribution.png