In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Flight Status Prediction").getOrCreate()

# Load the previously saved dataset
intermediate_path = "gs://flight-analysis-ms-bucket/trusted/handled_missing_values.parquet"
flight_data = spark.read.parquet(intermediate_path)

# Verify data loaded
flight_data.show(5)


24/12/10 03:16:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+-------------------+-----------------+------+----+---------+--------+--------+------------------+-------------+-----------+---------+-----------------+----+-------+-----+----------+---------+--------+
|         FlightDate|          Airline|Origin|Dest|Cancelled|Diverted|DepDel15|ArrivalDelayGroups|DistanceGroup|OriginState|DestState|Operating_Airline|Year|Quarter|Month|DayofMonth|DayOfWeek|Distance|
+-------------------+-----------------+------+----+---------+--------+--------+------------------+-------------+-----------+---------+-----------------+----+-------+-----+----------+---------+--------+
|2018-01-23 00:00:00|Endeavor Air Inc.|   ABY| ATL|        0|       0|       0|                -1|            1|         GA|       GA|               9E|2018|      1|    1|        23|        2|     145|
|2018-01-24 00:00:00|Endeavor Air Inc.|   ABY| ATL|        0|       0|       0|                -1|            1|         GA|       GA|               9E|2018|      1|    1|        24|        3|

In [2]:
from pyspark.ml.feature import VectorAssembler

# Select numerical columns to normalize
numerical_cols = ["Distance"]

# Assemble into a single vector column
assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
flight_data = assembler.transform(flight_data)

# Show assembled features
flight_data.select("Distance", "numerical_features").show(5, truncate=False)


[Stage 2:>                                                          (0 + 1) / 1]

+--------+------------------+
|Distance|numerical_features|
+--------+------------------+
|145     |[145.0]           |
|145     |[145.0]           |
|145     |[145.0]           |
|145     |[145.0]           |
|145     |[145.0]           |
+--------+------------------+
only showing top 5 rows



                                                                                

In [3]:
from pyspark.ml.feature import MinMaxScaler

# Apply MinMaxScaler
scaler = MinMaxScaler(inputCol="numerical_features", outputCol="scaled_features")
scaler_model = scaler.fit(flight_data)
flight_data = scaler_model.transform(flight_data)

# Show scaled features
flight_data.select("Distance", "scaled_features").show(5, truncate=False)


                                                                                

+--------+----------------------+
|Distance|scaled_features       |
+--------+----------------------+
|145     |[0.025971411314676868]|
|145     |[0.025971411314676868]|
|145     |[0.025971411314676868]|
|145     |[0.025971411314676868]|
|145     |[0.025971411314676868]|
+--------+----------------------+
only showing top 5 rows



In [6]:
from pyspark.ml.feature import StringIndexer

# Encode ArrivalDelayGroups
indexer = StringIndexer(inputCol="ArrivalDelayGroups", outputCol="ArrivalDelayGroups_indexed")
flight_data = indexer.fit(flight_data).transform(flight_data)

# Show the encoded column
flight_data.select("ArrivalDelayGroups", "ArrivalDelayGroups_indexed").show(5)


                                                                                

+------------------+--------------------------+
|ArrivalDelayGroups|ArrivalDelayGroups_indexed|
+------------------+--------------------------+
|                -1|                       0.0|
|                -1|                       0.0|
|                -1|                       0.0|
|                -1|                       0.0|
|                -1|                       0.0|
+------------------+--------------------------+
only showing top 5 rows



In [7]:
# Count unique values in categorical columns
flight_data.select("Origin").distinct().count()
flight_data.select("Dest").distinct().count()
flight_data.select("Airline").distinct().count()


                                                                                

28

In [8]:
from pyspark.ml.feature import StringIndexer

# Encode Airline
indexer = StringIndexer(inputCol="Airline", outputCol="Airline_indexed")
flight_data = indexer.fit(flight_data).transform(flight_data)

# Show the encoded column
flight_data.select("Airline", "Airline_indexed").show(5)


                                                                                

+-----------------+---------------+
|          Airline|Airline_indexed|
+-----------------+---------------+
|Endeavor Air Inc.|           13.0|
|Endeavor Air Inc.|           13.0|
|Endeavor Air Inc.|           13.0|
|Endeavor Air Inc.|           13.0|
|Endeavor Air Inc.|           13.0|
+-----------------+---------------+
only showing top 5 rows



In [9]:
# Count unique values in Origin
origin_cardinality = flight_data.select("Origin").distinct().count()
print(f"Cardinality of Origin: {origin_cardinality}")

# Count unique values in Dest
dest_cardinality = flight_data.select("Dest").distinct().count()
print(f"Cardinality of Dest: {dest_cardinality}")


                                                                                

Cardinality of Origin: 370
Cardinality of Dest: 370


In [10]:
from pyspark.sql.functions import col, count, lit

# Calculate frequency for Origin
origin_freq = flight_data.groupBy("Origin").agg(count("Origin").alias("Origin_frequency"))

# Join the frequency back to the main DataFrame
flight_data = flight_data.join(origin_freq, on="Origin", how="left")

# Calculate frequency for Dest
dest_freq = flight_data.groupBy("Dest").agg(count("Dest").alias("Dest_frequency"))

# Join the frequency back to the main DataFrame
flight_data = flight_data.join(dest_freq, on="Dest", how="left")

# Show the results
flight_data.select("Origin", "Origin_frequency", "Dest", "Dest_frequency").show(5)


                                                                                

+------+----------------+----+--------------+
|Origin|Origin_frequency|Dest|Dest_frequency|
+------+----------------+----+--------------+
|   ABY|             501| ATL|        223860|
|   ABY|             501| ATL|        223860|
|   ABY|             501| ATL|        223860|
|   ABY|             501| ATL|        223860|
|   ABY|             501| ATL|        223860|
+------+----------------+----+--------------+
only showing top 5 rows



In [11]:
# Define the path for the preprocessed data
preprocessed_path = "gs://flight-analysis-ms-bucket/trusted/preprocessed_with_frequencies.parquet"

# Save the dataset
flight_data.write.mode("overwrite").parquet(preprocessed_path)

print(f"Dataset with frequency encoding saved to {preprocessed_path}")


                                                                                

Dataset with frequency encoding saved to gs://flight-analysis-ms-bucket/trusted/preprocessed_with_frequencies.parquet
