In [1]:
# data reading, processing before join tables

# imports
import sys
import numpy as np
import matplotlib.pyplot as plt
import sklearn
import pdb
import pandas as pd
from functools import reduce as rd
from sklearn.decomposition import PCA
%reload_ext autoreload
%autoreload 2

from pyspark.sql.functions import avg, min, max, col, sum

In [2]:
# set up constants for ease of directory use
FINAL_DIR = 'gs://final-bucket-jy'
DATA_DIR = f'{FINAL_DIR}/data/'

In [3]:
# start a spark session
from pyspark.sql import SparkSession

app_name = "final_project"
master = "local[50]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
        .config("spark.sql.legacy.timeParserPolicy", "CORRECTED") \
        .getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/15 04:56:00 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
24/04/15 04:56:00 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
24/04/15 04:56:00 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
24/04/15 04:56:00 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [4]:
sc.setLogLevel('OFF')

In [5]:
#sc.stop()

In [6]:
# read from google cloud using spark

fdwind = spark.read.csv(DATA_DIR + 'training2_fdwind.csv',inferSchema=True,header=True)
fdwindairport = spark.read.csv(DATA_DIR + 'training2_fdwindairport.csv',inferSchema=True,header=True)
fdwindaltitude = spark.read.csv(DATA_DIR + 'training2_fdwindaltitude.csv',inferSchema=True,header=True)
flighthistory = spark.read.csv(DATA_DIR + 'training2_flighthistory.csv',inferSchema=True,header=True)
asdiflightplan = spark.read.csv(DATA_DIR + 'training2_asdiflightplan.csv',inferSchema=True,header=True)

                                                                                

# Join tables

## 1. fdwind_all.csv

1. outer join ( training2_fdwindaltitude.csv & training2_fdwindairport.csv ) with column “fbwindreportid”
2. outer join ( 1 & training2_fdwind.csv ) with column “fbwindairportid”, “ordinal”

Column names: (8 columns)  
fbwindairportid, fbwindreportid, ordinal, airportcode, altitude, bearing, knots, temperature


In [7]:
# 1. outer join ( training2_fdwindaltitude.csv & training2_fdwindairport.csv ) with column “fbwindreportid”

fdwind_al_ap = fdwindaltitude.join(fdwindairport, 'fbwindreportid', 'outer')
#fdwind_al_ap.printSchema()

In [8]:
# 2. outer join ( 1 & training2_fdwind.csv ) with column “fbwindairportid”, “ordinal”
# AND drop two id columns that will not be used later

fdwind_all = fdwind_al_ap.join(fdwind, ['fbwindairportid', 'ordinal'], 'outer')\
                            .drop('fbwindairportid','fbwindreportid')
#dwind_all.printSchema()

In [9]:
# Data processing: Aggregate 
# Steps:
# 1. group by airportcode
# 2. calculate average altitude, bearing, knots, temperature [ignore null value]

fdwind_all_agg = fdwind_all.groupBy('airportcode')\
                            .agg(avg('altitude').alias('avg_altitude'),\
                                 avg('bearing').alias('avg_bearing'),\
                                 avg('knots').alias('avg_knots'),\
                                 avg('temperature').alias('avg_temperature'),\
                                 min('altitude').alias('min_altitude'),\
                                 min('bearing').alias('min_bearing'),\
                                 min('knots').alias('min_knots'),\
                                 min('temperature').alias('min_temperature'),\
                                 max('altitude').alias('max_altitude'),\
                                 max('bearing').alias('max_bearing'),\
                                 max('knots').alias('max_knots'),\
                                 max('temperature').alias('max_temperature'))

In [10]:
# there are total 403 airport code

#fdwind_all_agg.count() 

In [11]:
# Those two line is to check how spark handle null value in the groupBy

#fdwind_all_agg.where(fdwind_all_agg.airportcode=='PNI').show()
#fdwind_all_agg.where(fdwind_all_agg.airportcode=='BFF').show()

In [12]:
# to cache fdwind_all in rdd for later join

#fdwind_all_rdd = fdwind_all.cache()
fdwind_all_agg_rdd = fdwind_all_agg.cache()

## 2. asdi_flighthistory.csv:

outer join ( training2_flighthistory.csv & training2_asdiflightplan.csv )
    with column “id” == “flighthistory_id”


In [13]:
# outer join ( training2_flighthistory.csv & training2_asdiflightplan.csv ) with column “flighthistory_id”

# Drop columns in flighthistory:
# 1. For useless columns: flighthistory_id, id, actual_aircraft_type(cuz missing all)
# 2. For columns that are duplicate: airline_icao_code, departure_airport_icao_code,
#           arrival_airport_icao_code,
# 3. Columns that are not similar to published departure/arrival time:
#           scheduled_gate_departure, actual_gate_departure,
#           scheduled_gate_arrival, actual_gate_arrival,
#           scheduled_runway_departure, actual_runway_departure,
#           scheduled_runway_arrival, actual_runway_arrival,
# 4. Others not important columns:
#           departure_airport_timezone_offset, arrival_airport_timezone_offset,          

# Drop other columns in asdiflightplan:
# 1. useless: update_time_utc

asdi_flighthistory = flighthistory.withColumnRenamed('id','flighthistory_id').join(asdiflightplan, 'flighthistory_id', 'outer')\
                                    .drop('flighthistory_id', 'id', 
                                          'actual_aircraft_type',
                                          'airline_icao_code', 'departure_airport_icao_code', 'arrival_airport_icao_code',
                                          'scheduled_gate_departure', 'actual_gate_departure',
                                          'scheduled_gate_arrival', 'actual_gate_arrival',
                                          'scheduled_runway_departure', 'actual_runway_departure',
                                          'scheduled_runway_arrival', 'actual_runway_arrival',
                                          'departure_airport_timezone_offset', 'arrival_airport_timezone_offset',
                                          'update_time_utc',
                                          'estimated_departure_utc', 'estimated_arrival_utc' # those two are not for sure to drop
                                         )
#asdi_flighthistory.printSchema()

#flighthistory.count() # 711576
#asdiflightplan.count() # 7954958
#asdi_flighthistory.count() # 8034562

In [14]:
# pyspark dataframe after join and drop

#asdi_flighthistory.show()

In [15]:
# In case after we drop some column, there is duplicates in the data

asdi_fh_unique = asdi_flighthistory.dropDuplicates()
#asdi_fh_unique.count() # 3240421

In [16]:
# cache as rdd

#asdi_flighthistory_rdd = asdi_flighthistory.cache()
asdi_fh_unique_rdd = asdi_fh_unique.cache() # we should use data that has no duplicate 

## 3. fdwind_asdi_final.csv:

1. outer join ( fdwind_all.csv & asdi_flighthistory.csv ) with column “airportcode” == “arrival_airport_code”   
(remember to rename the columns in fdwind_all.csv to show it is for arrival airport)

2. outer join ( fdwind_all.csv & 1 ) with column “airportcode” == “departure_airport_code”   
(remember to rename the columns in fdwind_all.csv to show it is for departure airport)


In [17]:
#fdwind_all_agg_rdd.printSchema()

In [18]:
#asdi_fh_unique_rdd.printSchema()

In [19]:
# TODO-Join: This will help us get the final data
## For arrival airport:
# 1. fdwind: rename airportcode to arrival_airport_code
# 2. fdwind: rename all the rest column names with arrival in it
# 3. join with asdi_fh with arrival_airport_code
## For departure airport:
# 1. fdwind: rename airportcode to departure_airport_code
# 2. fdwind: rename all the rest column names with departure in it
# 3. join with asdi_fh with departure_airport_code


# 1. outer join ( fdwind_all.csv & asdi_flighthistory.csv ) with column “airportcode” == “arrival_airport_code”

fdwind_asdi_arrival = fdwind_all_agg_rdd\
    .withColumnRenamed('airportcode','arrival_airport_code')\
    .withColumnRenamed('avg_altitude','arrival_airport_avg_altitude')\
    .withColumnRenamed('avg_bearing','arrival_airport_avg_bearing')\
    .withColumnRenamed('avg_knots','arrival_airport_avg_knots')\
    .withColumnRenamed('avg_temperature','arrival_airport_avg_temperature')\
    .withColumnRenamed('min_altitude','arrival_airport_min_altitude')\
    .withColumnRenamed('min_bearing','arrival_airport_min_bearing')\
    .withColumnRenamed('min_knots','arrival_airport_min_knots')\
    .withColumnRenamed('min_temperature','arrival_airport_min_temperature')\
    .withColumnRenamed('max_altitude','arrival_airport_max_altitude')\
    .withColumnRenamed('max_bearing','arrival_airport_max_bearing')\
    .withColumnRenamed('max_knots','arrival_airport_max_knots')\
    .withColumnRenamed('max_temperature','arrival_airport_max_temperature')\
    .join(asdi_fh_unique_rdd, 
          'arrival_airport_code',
          'outer')
#fdwind_asdi_arrival.printSchema()

In [20]:
#fdwind_asdi_arrival.count() #3240678 Now it is much lesser, and the useful information is denser

# The original output without limit the columns we need has 3001895001 rows, which is too many.
# Solution:
#    We would like to go back to the pyspark dataframe fdwind_all, and asdi_flighthistory
#    to choose columns that we think we really need it to predict the flight delay. 

In [21]:
# 2. outer join ( fdwind_all.csv & 1 ) with column “airportcode” == “departure_airport_code”  

fdwind_asdi_final = fdwind_all_agg_rdd\
    .withColumnRenamed('airportcode','departure_airport_code')\
    .withColumnRenamed('avg_altitude','departure_airport_avg_altitude')\
    .withColumnRenamed('avg_bearing','departure_airport_avg_bearing')\
    .withColumnRenamed('avg_knots','departure_airport_avg_knots')\
    .withColumnRenamed('avg_temperature','departure_airport_avg_temperature')\
    .withColumnRenamed('min_altitude','departure_airport_min_altitude')\
    .withColumnRenamed('min_bearing','departure_airport_min_bearing')\
    .withColumnRenamed('min_knots','departure_airport_min_knots')\
    .withColumnRenamed('min_temperature','departure_airport_min_temperature')\
    .withColumnRenamed('max_altitude','departure_airport_max_altitude')\
    .withColumnRenamed('max_bearing','departure_airport_max_bearing')\
    .withColumnRenamed('max_knots','departure_airport_max_knots')\
    .withColumnRenamed('max_temperature','departure_airport_max_temperature')\
    .join(fdwind_asdi_arrival, 
          'departure_airport_code',
          'outer')
#fdwind_asdi_final.printSchema()

In [22]:
#fdwind_asdi_final.count() # the number of rows are 3240934

## 4. flight_full.csv

In this dataset, we only keep those records that are having both published_arrival and original_arrival_utc.   
*published_arrival: represent the official/actuall arrival time*  
*original_arrival_utc: represent the original time that the flight is been scheduled*  

The reason is that we want to use those two columns to help us decide whether the flight is delayed or not.  
Either of them missing will not helping us to label whether the flight is delayed so we would like to filter out:  
1. Rows that has "published_arrival" missing;  
2. Rows that has "original_arrival_utc" missing.

In [23]:
flight = fdwind_asdi_final.filter(fdwind_asdi_final['published_arrival'].isNotNull() & fdwind_asdi_final['original_arrival_utc'].isNotNull())

In [24]:
#flight.count() # There are 3228608, so we filtered out about 12000 rows

In [25]:
#flight.show()

# Data Engineering

## 1. column name: delay

If original_arrival_utc < published_arrival, label it as 1 (delayed).  
Otherwise, label it as 0 (not delayed).

In [26]:
#flight.printSchema()

In [27]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import TimestampType

flight = flight.withColumn("published_departure_timestamp", to_timestamp("published_departure"))
flight = flight.withColumn("published_arrival_timestamp", to_timestamp("published_arrival"))
flight = flight.withColumn("original_departure_utc_timestamp", to_timestamp("original_departure_utc"))
flight = flight.withColumn("original_arrival_timestamp", to_timestamp("original_arrival_utc"))

In [28]:
# outdated code

# from pyspark.sql.functions import to_timestamp
# from pyspark.sql.types import TimestampType

# flight = flight.withColumn("published_departure_timestamp", to_timestamp(flight["published_departure"], "yyyy-MM-dd HH:mm:ss"))
# flight = flight.withColumn("published_arrival_timestamp", to_timestamp(flight["published_arrival"], "yyyy-MM-dd HH:mm:ss"))
# flight = flight.withColumn("original_departure_utc_timestamp", to_timestamp(flight["original_departure_utc"], "yyyy-MM-dd HH:mm:ss"))
# flight = flight.withColumn("original_arrival_timestamp", to_timestamp(flight["original_arrival_utc"], "yyyy-MM-dd HH:mm:ss"))

In [29]:
flight = flight.drop("published_departure", "published_arrival", "original_departure_utc", "original_arrival_utc")

In [30]:
#flight.printSchema()

In [31]:
#flight.show()

In [32]:
# missing_values_count = flight.select([sum(col(column).isNull().cast("integer")).alias(column) for column in flight.columns])
# missing_values_count = missing_values_count.toPandas()
# missing_values_count

In [33]:
#flight.count()

In [30]:
flight = flight.dropna().cache()

24/04/15 04:57:08 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [35]:
#flight.count()

In [36]:
#flight.show()

In [31]:
from pyspark.sql.functions import unix_timestamp, col, when

# Calculate the time difference in seconds
flight = flight.withColumn(
    "time_difference_seconds",
    (unix_timestamp("published_arrival_timestamp") - unix_timestamp("original_arrival_timestamp"))
)

# Check if delay for more than 1 min, if yes, return 1, if no, return 0
flight = flight.withColumn(
    "Delay",
    when(col("time_difference_seconds") > 0, 1).otherwise(0)
)

#flight.select("published_arrival_timestamp", "original_arrival_timestamp", "Delay").show()

In [38]:
# grace delay time:  1 min

# from pyspark.sql.functions import unix_timestamp, col, when

# # Calculate the time difference in seconds
# flight = flight.withColumn(
#     "time_difference_seconds",
#     (unix_timestamp("published_arrival_timestamp") - unix_timestamp("original_arrival_timestamp"))
# )

# # Check if delay for more than 1 min, if yes, return 1, if no, return 0
# flight = flight.withColumn(
#     "Delay",
#     when(col("time_difference_seconds") > (1 * 60), 1).otherwise(0)
# )

# #flight.select("published_arrival_timestamp", "original_arrival_timestamp", "Delay").show()

In [32]:
flight = flight.cache()

In [40]:
# counts = flight.groupBy('Delay').count()
# counts.show()

In [33]:
# Balance data with the same number of label counts

# Get the number of rows for the minority label
# minority_count = counts.filter(col('Delay') == 1).collect()[0]['count']

# Sample the majority class to match the number of rows in the minority class
downsampled_majority = flight.filter(col('Delay') != 1).sample(withReplacement=False, fraction=27000/618706, seed=42)

# Combine the downsampled majority class with the minority class
downsampled_df = downsampled_majority.union(flight.filter(col('Delay') == 1))

# Show the downsampled DataFrame
#downsampled_df.show()

In [None]:
# counts = downsampled_df.groupBy('Delay').count()
# counts.show()

In [34]:
flight = downsampled_df.cache()

In [None]:
#flight.printSchema()

In [None]:
# import matplotlib.pyplot as plt

# # Aggregate the data to count
# counts = flight.groupBy('Delay').count()

# # Convert to a Pandas DataFrame
# counts_pandas = counts.toPandas()
# counts_pandas.sort_values('Delay', inplace=True)

# # Plotting the bar plot
# plt.figure(figsize=(8, 6))
# plt.bar(counts_pandas['Delay'].astype(str), counts_pandas['count'], color=['red', 'green'])
# plt.xlabel('Labels')
# plt.ylabel('Count')
# plt.title('Delay VS Not Delay')
# plt.xticks([str(x) for x in counts_pandas['Delay'].unique()])
# plt.show()

In [None]:
# # Convert to a Pandas DataFrame
# pandas_df = flight.select("arrival_airport_avg_temperature").toPandas()

# plt.figure(figsize=(10, 6))
# plt.hist(pandas_df['arrival_airport_avg_temperature'], bins=30, alpha=0.7)
# plt.title('Distribution of Arrival Airport Average Temperature')
# plt.xlabel('Temperature')
# plt.ylabel('Frequency')
# plt.grid(True)
# plt.show()

In [None]:
# # Convert to a Pandas DataFrame
# pandas_df = flight.select("arrival_airport_min_temperature").toPandas()

# plt.figure(figsize=(10, 6))
# plt.hist(pandas_df['arrival_airport_min_temperature'], bins=30, alpha=0.7)
# plt.title('Distribution of Arrival Airport Min Temperature')
# plt.xlabel('Temperature')
# plt.ylabel('Frequency')
# plt.grid(True)
# plt.show()

In [None]:
# # Convert to a Pandas DataFrame
# pandas_df = flight.select("arrival_airport_max_temperature").toPandas()

# plt.figure(figsize=(10, 6))
# plt.hist(pandas_df['arrival_airport_max_temperature'], bins=30, alpha=0.7)
# plt.title('Distribution of Arrival Airport Max Temperature')
# plt.xlabel('Temperature')
# plt.ylabel('Frequency')
# plt.grid(True)
# plt.show()

In [None]:
# # arrival_airport_avg_altitude
# pandas_df = flight.select("arrival_airport_avg_altitude").toPandas()

# plt.figure(figsize=(10, 6))
# plt.hist(pandas_df['arrival_airport_avg_altitude'], bins=30, alpha=0.7)
# plt.title('Distribution of Arrival Airport Average Altitude')
# plt.xlabel('Altitude')
# plt.ylabel('Frequency')
# plt.grid(True)
# plt.show()

In [None]:
# # arrival_airport_avg_knots
# pandas_df = flight.select("arrival_airport_avg_knots").toPandas()

# plt.figure(figsize=(10, 6))
# plt.hist(pandas_df['arrival_airport_avg_knots'], bins=30, alpha=0.7)
# plt.title('Distribution of Arrival Airport Average Knots')
# plt.xlabel('Knots')
# plt.ylabel('Frequency')
# plt.grid(True)
# plt.show()

In [None]:
# # arrival_airport_avg_bearing
# pandas_df = flight.select("arrival_airport_avg_bearing").toPandas()

# plt.figure(figsize=(10, 6))
# plt.hist(pandas_df['arrival_airport_avg_bearing'], bins=30, alpha=0.7)
# plt.title('Distribution of Arrival Airport Average Bearing')
# plt.xlabel('Bearing')
# plt.ylabel('Frequency')
# plt.grid(True)
# plt.show()

## Encoding

In [None]:
# We will focus on 10 columns 
# 1，departure_airport_code: string
# 2，arrival_airport_code: string
# 3，airline_code: string 
# 4，creator_code: string ？？
# 5, scheduled_aircraft_type: string 
# 6, icao_aircraft_type_actual: string
# 7, departure_airport: string 
# 8, arrival_airport: string 
# 9, aircraft_id: string 
#10, legacy_route: string 


In [35]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline  # Corrected import here
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType
import math

columns_to_encode = [
    "departure_airport_code", "arrival_airport_code", "airline_code", 
    "creator_code", "scheduled_aircraft_type", "icao_aircraft_type_actual",
    "departure_airport", "arrival_airport", "aircraft_id", "legacy_route"
]

# Create a StringIndexer for each column and collect them in a list
indexers = [
    StringIndexer(inputCol=column, outputCol=f"{column}_Index")
    for column in columns_to_encode
]

# Use a Pipeline to apply all indexers
pipeline = Pipeline(stages=indexers)
flight_indexed = pipeline.fit(flight).transform(flight)

                                                                                

In [None]:
#flight.show()

In [36]:
# Define a function to convert index to binary string
def index_to_binary(index, length):
    binary_str = bin(int(index))[2:]  # Convert to binary string
    return binary_str.zfill(length)  # Pad with zeros

#Define a function to perform binary encoding
def apply_binary_encoding(df, column):
    max_index = df.agg({f"{column}_Index": "max"}).collect()[0][0]
    #print(f"Max index: {max_index} type: {type(max_index)}")
    num_bits = math.ceil(math.log(max_index + 1, 2))
    #print(f"Num bits: {num_bits} type: {type(num_bits)}")
    
    udf_convert = F.udf(lambda x: index_to_binary(x, num_bits), StringType())
    df = df.withColumn(f"{column}_binary", udf_convert(F.col(f"{column}_Index")))
    
    for bit in range(num_bits):
        bit_col_name = f"{column}_bit_{bit}"
        df = df.withColumn(bit_col_name, F.substring(F.col(f"{column}_binary"), num_bits - bit, 1).cast(IntegerType()))
    
    return df, num_bits

# Apply binary encoding to each indexed column
for column in columns_to_encode:
    flight_indexed, _ = apply_binary_encoding(flight_indexed, column)

# Optionally drop intermediate columns
columns_to_drop = [f"{column}_Index" for column in columns_to_encode] + [f"{column}_binary" for column in columns_to_encode]
flight_encoded = flight_indexed.drop(*columns_to_drop)

24/04/15 05:00:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 2.8 MiB
                                                                                

In [None]:
# import math
# from pyspark.sql import functions as F
# from pyspark.sql.types import StringType, IntegerType

# def apply_binary_encoding(df, column):
#     max_index_row = df.agg({f"{column}_Index": "max"}).collect()[0]
#     print(f"Max index row {max_index_row} {type(max_index_row)}")
#     max_index = max_index_row[0] if max_index_row is not None else 0  # Use 0 if max_index_row is None
#     print(f"Max index for column {column}: {max_index}")
#     num_bits = math.ceil(math.log(max_index + 1, 2))
#     print(f"Number of bits for column {column}: {num_bits}")
    
#     udf_convert = F.udf(lambda x: index_to_binary(x, num_bits), StringType())
#     df = df.withColumn(f"{column}_binary", udf_convert(F.col(f"{column}_Index")))
    
#     for bit in range(num_bits):
#         bit_col_name = f"{column}_bit_{bit}"
#         df = df.withColumn(bit_col_name, F.substring(F.col(f"{column}_binary"), num_bits - bit, 1).cast(IntegerType()))
    
#     return df, num_bits

# # Apply binary encoding to each indexed column
# for column in columns_to_encode:
#     print(f"Applying binary encoding to column: {column}")
#     flight_indexed, _ = apply_binary_encoding(flight_indexed, column)

# # Optionally drop intermediate columns
# columns_to_drop = [f"{column}_Index" for column in columns_to_encode] + [f"{column}_binary" for column in columns_to_encode]
# flight_encoded = flight_indexed.drop(*columns_to_drop)


In [None]:
# import math
# from pyspark.sql import functions as F
# from pyspark.sql.types import StringType, IntegerType

# def apply_binary_encoding(df, column):
#     max_index_row = df.agg({f"{column}_Index": "max"}).collect()[0]
#     max_index = max_index_row[0] if max_index_row is not None else 0  # Use 0 if max_index_row is None
#     num_bits = math.ceil(math.log(max_index + 1, 2))
    
#     udf_convert = F.udf(lambda x: index_to_binary(x, num_bits), StringType())
#     df = df.withColumn(f"{column}_binary", udf_convert(F.col(f"{column}_Index")))
    
#     for bit in range(num_bits):
#         bit_col_name = f"{column}_bit_{bit}"
#         df = df.withColumn(bit_col_name, F.substring(F.col(f"{column}_binary"), num_bits - bit, 1).cast(IntegerType()))
    
#     return df, num_bits


In [None]:
# # Define a function to convert index to binary string
# def index_to_binary(index, length):
#     binary_str = bin(int(index))[2:]  # Convert to binary string
#     return binary_str.zfill(length)  # Pad with zeros

# #Define a function to perform binary encoding
# def apply_binary_encoding(df, column):
#     max_index = df.agg({f"{column}_Index": "max"}).collect()[0][0]
#     num_bits = math.ceil(math.log(max_index + 1, 2))
    
#     udf_convert = F.udf(lambda x: index_to_binary(x, num_bits), StringType())
#     df = df.withColumn(f"{column}_binary", udf_convert(F.col(f"{column}_Index")))
    
#     for bit in range(num_bits):
#         bit_col_name = f"{column}_bit_{bit}"
#         df = df.withColumn(bit_col_name, F.substring(F.col(f"{column}_binary"), num_bits - bit, 1).cast(IntegerType()))
    
#     return df, num_bits

# # Apply binary encoding to each indexed column
# for column in columns_to_encode:
#     flight_indexed, _ = apply_binary_encoding(flight_indexed, column)

# # Optionally drop intermediate columns
# columns_to_drop = [f"{column}_Index" for column in columns_to_encode] + [f"{column}_binary" for column in columns_to_encode]
# flight_encoded = flight_indexed.drop(*columns_to_drop)


In [37]:
from pyspark.sql.functions import expr, udf
from pyspark.sql.types import ArrayType, IntegerType

# Assuming your DataFrame is named df and the column with letters is named "letter"

# Step 1: Convert letters to numeric indices (Label Encoding)
flight_encoded = flight_encoded.withColumn("numeric_index", expr("ascii(lower(creator_code)) - ascii('a')"))

# Step 2: Convert numeric indices to binary form
def index_to_binary_array(value):
    # Assuming 5 bits for 26 letters
    binary_str = format(value, '05b')  
    return [int(bit) for bit in binary_str]

index_to_binary_array_udf = udf(index_to_binary_array, ArrayType(IntegerType()))

# Step 3: Apply UDF to create a binary representation column
flight_encoded = flight_encoded.withColumn("binary_encoded", index_to_binary_array_udf("numeric_index"))

# Step 4: Optionally, expand the binary_encoded array into separate columns
num_bits = 5
for bit in range(num_bits):
    flight_encoded = flight_encoded.withColumn(f"bit_{bit}", 
flight_encoded["binary_encoded"].getItem(bit))

#flight_encoded.printSchema()


In [38]:
from pyspark.sql import SparkSession


columns_to_drop = ['binary_encoded', 'element', 'numeric_index']

flight_encoded = flight_encoded.drop(*columns_to_drop)

#flight_encoded.printSchema()


In [39]:
# creator_code_numeric_index
columns_to_drop = ['creator_code_numeric_index']

flight_encoded = flight_encoded.drop(*columns_to_drop)

#flight_encoded.printSchema()

In [40]:
columns_to_drop = ['departure_airport_code',
'arrival_airport_code',
'airline_code',
'creator_code',
'scheduled_aircraft_type',
'icao_aircraft_type_actual',
'departure_airport',
'arrival_airport',
'aircraft_id',
'legacy_route',
'published_departure_timestamp',
'published_arrival_timestamp',
'original_departure_utc_timestamp',
'original_arrival_timestamp',
'time_difference_seconds']

flight_encoded = flight_encoded.drop(*columns_to_drop)

#flight_encoded.printSchema()

In [58]:
flight_encoded.show()

24/04/14 20:19:14 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.2 MiB
24/04/14 20:19:15 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.2 MiB


+------------------------------+-----------------------------+---------------------------+---------------------------------+------------------------------+-----------------------------+---------------------------+---------------------------------+------------------------------+-----------------------------+---------------------------+---------------------------------+----------------------------+---------------------------+-------------------------+-------------------------------+----------------------------+---------------------------+-------------------------+-------------------------------+----------------------------+---------------------------+-------------------------+-------------------------------+-------------+------------------+--------------------+-----+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------

In [41]:
flight_encoded_rdd = flight_encoded.cache() 

In [None]:
flight_encoded_rdd.write.option("header", "true").csv("gs://final-bucket-jy/flight_encoded_rdd.csv")



In [42]:
##### with balanced data

flight_encoded_rdd.write.option("header", "true").csv("gs://final-bucket-jy/balanced_flight_encoded.csv")

24/04/15 05:01:22 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.4 MiB
----------------------------------------                                        
Exception happened during processing of request from ('127.0.0.1', 34766)
Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/usr/lib/spark/python/pyspark/accumulators.py", 

In [24]:
flight_dd = spark.read.csv("gs://final-bucket-jy/flight_encoded_rdd.csv",inferSchema=True,header=True)



In [80]:
#flight_dd.take(2)

In [57]:
#flight_encoded_rdd.show()

In [55]:
#flight_dd = spark.sparkContext.textFile('gs://final-bucket-jy/flight_encoded_rdd.csv/part-00000-3352d399-ed5f-488e-919a-8e54de70f4ee-c000.csv')

In [61]:
flight_dd = spark.read.csv("gs://final-bucket-jy/flight_encoded_rdd.csv/part-00000-3352d399-ed5f-488e-919a-8e54de70f4ee-c000.csv", header=True, inferSchema=True)

In [81]:
########### try read data in rdd
###### Set up session & data ######
from pyspark.sql import SparkSession, Row

# Initialize Spark session
spark = SparkSession.builder.appName("DecisionTreeJob").getOrCreate()

# Import the data as an RDD
csv_rdd = spark.sparkContext.textFile("gs://final-bucket-jy/flight_encoded_rdd.csv/part-00000-3352d399-ed5f-488e-919a-8e54de70f4ee-c000.csv")

In [82]:
# Convert the RDD to a DataFrame
csv_rows = csv_rdd.map(lambda line: line.split(","))
header = csv_rows.first()  # Extract header
csv_data = csv_rows.filter(lambda row: row != header).map(lambda row: Row(**{header[i]: float(row[i]) for i in range(len(header))}))

In [83]:
# Create a DataFrame from the RDD of Rows
sampled_df = spark.createDataFrame(csv_data)

In [62]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

#sampled_df = flight_dd.limit(30)

In [84]:
feature_columns = sampled_df.columns[:-1] #[col for col in sampled_df.columns if col != 'Delay']

In [85]:
# the target column
target_column = 'Delay'

# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [86]:
# A simple pipeline
pipeline = Pipeline(stages=[assembler])

# Fit the pipeline
pipelineModel = pipeline.fit(sampled_df)

# Transform the sampled data
sampled_df_transformed = pipelineModel.transform(sampled_df)

In [87]:
# Select only the features and target column for modeling
sampled_df_model = sampled_df_transformed.select("features", target_column)

# Split the sampled DataFrame into training and test sets
train_df, test_df = sampled_df_model.randomSplit([0.8, 0.2], seed=42)

In [31]:
train_df.show()

+--------------------+-----+
|            features|Delay|
+--------------------+-----+
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
|(114,[0,1,2,3,4,5...|    0|
+--------------------+-----+
only showing top 20 rows



# Decision tree

In [40]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize the DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Delay", featuresCol="features")

# Train the decision tree model
dt_model = dt.fit(train_df)

# Make predictions on the test data
predictions = dt_model.transform(test_df)

24/04/09 07:00:52 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 23 (= number of training instances)


In [41]:
# Evaluate the model using accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="Delay", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

Accuracy: 0.8571428571428571


In [49]:
###### Modeling ######
# Initialize the DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol=target_column, featuresCol="features")

# Train the decision tree model
dt_model = dt.fit(train_df)

# Evaluate the model on the training set by accuracy & AUC
train_predictions = dt_model.transform(train_df)
# by accuracy
evaluator_acc = MulticlassClassificationEvaluator(labelCol=target_column, predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_acc.evaluate(train_predictions)
print(f"Training Accuracy: {accuracy}")

# Evaluate the model on the test set by accuracy & AUC
predictions = dt_model.transform(test_df)
# by accuracy
accuracy = evaluator_acc.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")

24/04/09 07:32:31 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 23 (= number of training instances)


Training Accuracy: 1.0
Test Accuracy: 0.8571428571428571


## Hyperparameter tuning

In [42]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define a grid of hyperparameters to search
param_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 15]) \
    .addGrid(dt.maxBins, [20, 30, 40]) \
    .build()

# Set up the cross-validation
evaluator = MulticlassClassificationEvaluator(labelCol="Delay", predictionCol="prediction", metricName="accuracy")
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)  # You can adjust the number of folds as needed

# Run cross-validation to find the best hyperparameters
cv_model = crossval.fit(train_df)

# Make predictions on the test data using the best model
best_model = cv_model.bestModel
predictions = best_model.transform(test_df)

# Evaluate the best model's performance
accuracy = evaluator.evaluate(predictions)
print(f"Best Model Accuracy: {accuracy}")

# Optional: You can also view the best model's parameters
best_maxDepth = best_model._java_obj.getMaxDepth()
best_maxBins = best_model._java_obj.getMaxBins()
print(f"Best Max Depth: {best_maxDepth}, Best Max Bins: {best_maxBins}")

24/04/09 07:00:58 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 20 to 12 (= number of training instances)
24/04/09 07:00:59 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 30 to 12 (= number of training instances)
24/04/09 07:00:59 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 40 to 12 (= number of training instances)
24/04/09 07:01:00 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 20 to 12 (= number of training instances)
24/04/09 07:01:00 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 30 to 12 (= number of training instances)
24/04/09 07:01:00 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 40 to 12 (= number of training instances)
24/04/09 07:01:01 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree 

Best Model Accuracy: 0.8571428571428571
Best Max Depth: 5, Best Max Bins: 20


In [64]:
# Train a logistic regression model on the subset of data
lr = LogisticRegression(featuresCol='features', labelCol=target_column)
lrModel = lr.fit(train_df)

24/04/08 17:04:37 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 29.9 MiB
24/04/08 17:06:10 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 29.9 MiB
24/04/08 17:06:42 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 29.9 MiB
24/04/08 17:06:44 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 29.9 MiB
24/04/08 17:06:44 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/04/08 17:06:44 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/04/08 17:06:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 29.9 MiB
24/04/08 17:06:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 29.9 MiB
24/04/08 17:06:46 WARN org.apache.spark.scheduler.DAGSc

In [65]:
# Evaluate on the training set
train_predictions = lrModel.transform(train_df)
evaluator = BinaryClassificationEvaluator(labelCol=target_column, rawPredictionCol="rawPrediction")
train_auc = evaluator.evaluate(train_predictions, {evaluator.metricName: "areaUnderROC"})
print(f"Training AUC: {train_auc}")

# Final evaluation on the blind test set
blind_test_predictions = lrModel.transform(test_df)
blind_test_auc = evaluator.evaluate(blind_test_predictions, {evaluator.metricName: "areaUnderROC"})
print(f"Blind Test AUC: {blind_test_auc}")

24/04/08 17:08:41 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 29.9 MiB
24/04/08 17:09:12 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 29.9 MiB
                                                                                

Training AUC: 1.0


24/04/08 17:09:14 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 29.9 MiB
24/04/08 17:09:51 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 29.9 MiB
[Stage 497:>                                                        (0 + 1) / 1]

Blind Test AUC: 0.0


                                                                                

In [196]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Assemble features
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Define logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol=target_column)

In [197]:
# Create a pipeline
pipeline = Pipeline(stages=[assembler, lr])

# Define parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

In [198]:
# Define evaluator
evaluator = BinaryClassificationEvaluator(labelCol=target_column, metricName="areaUnderROC")

# Set up 5-fold cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

In [None]:
# Split the data
train_df, test_df = sampled_df.randomSplit([0.8, 0.2], seed=42)

# Run cross-validation, and choose the best set of parameters
cvModel = crossval.fit(train_df)

In [None]:
# Fetch the best model
bestModel = cvModel.bestModel

# Make predictions on the test set
test_predictions = bestModel.transform(test_df)

In [None]:
# Evaluate the best model on the test set
test_auc = evaluator.evaluate(test_predictions)

print(f"Test AUC: {test_auc}")