# Data preaparation and xgboost regressor training

# Spark set up

In [61]:
import platform
local_os = platform.system()
if local_os == 'Linux':
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
    !tar zxvf /content/spark-3.3.1-bin-hadoop3.tgz
    !pip install -q findspark
    import findspark
    findspark.init()
print(local_os)

Windows


In [62]:
from pyspark.sql import SparkSession

In [63]:
import os
if local_os == 'Linux':
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"
elif local_os == 'Windows':
    os.environ["JAVA_HOME"] = "C:/Program Files/Java/jdk-19/"
else:
    os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk/"


In [64]:
spark = SparkSession.builder\
        .master("local")\
        .appName("flights")\
        .getOrCreate()

In [65]:
spark

# Libs imports

In [1]:
from pyspark.sql.functions import desc, isnan, when, count, col, isnull
from collections import Counter

# Data loading

In [2]:
from pathlib import Path

if local_os == "Windows":
    source = "D:/data_sets/airlines/"
elif local_os == 'Linux':
    source = "/content/drive/MyDrive/datasets/flights_kaggle_split"
else:
    source = "../data/"
print(source)
source_path = Path(source).glob('*.parquet')
file_names = sorted(list(source_path))
file_names

NameError: name 'local_os' is not defined

In [68]:
def merge_data(file_names):
  first_file = file_names.pop(0)
  data = spark.read.parquet(first_file.as_posix())
  for file_name in file_names:
    temp_data = spark.read.parquet(file_name.as_posix())
    data = data.union(temp_data)
    print(file_name.as_posix())
  return data
data = merge_data(file_names)

D:/data_sets/airlines/Combined_Flights_2019.parquet
D:/data_sets/airlines/Combined_Flights_2020.parquet
D:/data_sets/airlines/Combined_Flights_2021.parquet
D:/data_sets/airlines/Combined_Flights_2022.parquet


In [69]:
data.printSchema()

root
 |-- FlightDate: timestamp (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: long (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- DayOfWeek: long (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |-- DOT_ID_Mar

# EDA

In [70]:
data.count(), len(data.columns)

(29193782, 62)

In [71]:
data.show(5)

+-------------------+-----------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+-----------------+
|         FlightDate|          Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|D

## Data types exploration
Handling categorical values to train model

In [72]:
data = data.drop(data.__index_level_0__)

In [73]:
dtypes = set()
[dtypes.add(item[1]) for item in data.dtypes]
dtypes

{'bigint', 'boolean', 'double', 'string', 'timestamp'}

### Boolean exploration

In [74]:
[item[0] for item in data.dtypes if item[1] == 'boolean']

['Cancelled', 'Diverted']

In [75]:
data.groupBy('Cancelled').count().orderBy('count').collect()

[Row(Cancelled=True, count=777267), Row(Cancelled=False, count=28416515)]

In [76]:
data.groupBy('Diverted').count().orderBy('count').collect()

[Row(Diverted=True, count=68349), Row(Diverted=False, count=29125433)]

### String exploration

In [77]:
str_columns = [item[0] for item in data.dtypes if item[1] == 'string']

In [78]:
data.select(str_columns).show(5)

+-----------------+------+----+-------------------------+---------------------------------------+---------------------------+-----------------+---------------------------+-----------+--------------+-----------+---------------+------------+---------+-------------+----------+----------+
|          Airline|Origin|Dest|Marketing_Airline_Network|Operated_or_Branded_Code_Share_Partners|IATA_Code_Marketing_Airline|Operating_Airline|IATA_Code_Operating_Airline|Tail_Number|OriginCityName|OriginState|OriginStateName|DestCityName|DestState|DestStateName|DepTimeBlk|ArrTimeBlk|
+-----------------+------+----+-------------------------+---------------------------------------+---------------------------+-----------------+---------------------------+-----------+--------------+-----------+---------------+------------+---------+-------------+----------+----------+
|Endeavor Air Inc.|   ABY| ATL|                       DL|                           DL_CODESHARE|                         DL|               9E

### Timestamp exploration

In [79]:
timestamp_columns = [item[0] for item in data.dtypes if item[1] == 'timestamp']

In [80]:
data.select(timestamp_columns).show(5)

+-------------------+
|         FlightDate|
+-------------------+
|2018-01-22 18:00:00|
|2018-01-23 18:00:00|
|2018-01-24 18:00:00|
|2018-01-25 18:00:00|
|2018-01-26 18:00:00|
+-------------------+
only showing top 5 rows



## Null values handling

In [81]:
null_count = data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns])
null_count.show()

+----------+-------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrDelayMinutes|AirTime|

In [82]:
null_values = null_count.collect()[0].asDict()

In [83]:
only_nulls = null_count.select([key for key in null_values if null_values[key] != 0])
only_nulls.show()

+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+-----------+--------+--------------------+-------+---------+--------+------+--------+--------+------------------+------------------+
|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrDelayMinutes|AirTime|CRSElapsedTime|ActualElapsedTime|Tail_Number|DepDel15|DepartureDelayGroups|TaxiOut|WheelsOff|WheelsOn|TaxiIn|ArrDelay|ArrDel15|ArrivalDelayGroups|DivAirportLandings|
+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+-----------+--------+--------------------+-------+---------+--------+------+--------+--------+------------------+------------------+
| 761652|         763084|  763084| 786177|         846183| 852561|            22|           845637|     267613|  763084|              763084| 780561|   780551|  793133|793143|  846183|  846183|            846183|                90|
+-------+---------------+--------+-------+---------------+-------+------

## Drop all null values

In [84]:
data_no_na = data.dropna()

# Feature selection

In [85]:
#Drop timestamp data
data_no_na = data_no_na.drop('FlightDate')

In [86]:
data_no_na.count(), len(data_no_na.columns)

(28339510, 60)

In [87]:
cols_to_encode = ['Airline', 'Origin', 'Dest']
[str_columns.remove(x) for x in cols_to_encode]
None

In [88]:
data_no_na = data_no_na.drop(*str_columns)
data_no_na.count(), len(data_no_na.columns)

(28339510, 46)

In [89]:
data_no_na.printSchema()

root
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: long (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- DayOfWeek: long (nullable = true)
 |-- DOT_ID_Marketing_Airline: long (nullable = true)
 |-- Flight_Number_Marketing_Airline: long (nullable = true)
 |-- DOT_ID_Operating_Airline: long (nullable = true)
 |-- Flight_Number_

In [90]:
dep_cols = [x for x in data_no_na.columns if "Dep" in x]
arr_cols = [x for x in data_no_na.columns if "Arr" in x]

In [91]:
time_columns = dep_cols + arr_cols
time_columns

['CRSDepTime',
 'DepTime',
 'DepDelayMinutes',
 'DepDelay',
 'DepDel15',
 'DepartureDelayGroups',
 'ArrTime',
 'ArrDelayMinutes',
 'CRSArrTime',
 'ArrDelay',
 'ArrDel15',
 'ArrivalDelayGroups']

In [92]:
time_columns.remove('DepDelay')
# time_columns.remove('ArrTime')
time_columns

['CRSDepTime',
 'DepTime',
 'DepDelayMinutes',
 'DepDel15',
 'DepartureDelayGroups',
 'ArrTime',
 'ArrDelayMinutes',
 'CRSArrTime',
 'ArrDelay',
 'ArrDel15',
 'ArrivalDelayGroups']

In [93]:
data_no_na.select('DepDelay').describe().show()

+-------+-----------------+
|summary|         DepDelay|
+-------+-----------------+
|  count|         28339510|
|   mean| 9.23847367156313|
| stddev|47.10140749050439|
|    min|          -1280.0|
|    max|           7223.0|
+-------+-----------------+



In [94]:
data_no_na.select('ArrDelay').describe().show()

+-------+------------------+
|summary|          ArrDelay|
+-------+------------------+
|  count|          28339510|
|   mean|3.6081859213514984|
| stddev| 49.28063347282263|
|    min|           -1290.0|
|    max|            7232.0|
+-------+------------------+



In [95]:
data_no_na = data_no_na.drop(*time_columns)

In [96]:
data_no_na.printSchema()

root
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- DayOfWeek: long (nullable = true)
 |-- DOT_ID_Marketing_Airline: long (nullable = true)
 |-- Flight_Number_Marketing_Airline: long (nullable = true)
 |-- DOT_ID_Operating_Airline: long (nullable = true)
 |-- Flight_Number_Operating_Airline: long (nullable = true)
 |-- OriginAirportID: long (nullable = true)
 |-- OriginAirportSeqID: long (nullable = true)
 |-- OriginCityMarketID: long (nullable = true)
 |-- OriginStateFips: long (n

## One hot enconding

In [97]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

### Vectorized implementation of one hot encoding

In [98]:
numerical_cols = [item[0] for item in data_no_na.dtypes if item[1] not in ['string', 'boolean']]

bigint represented as long by spark

In [99]:
col_dict = dict(data_no_na.dtypes).values()
c = Counter(col_dict)
c.most_common()

[('bigint', 20), ('double', 10), ('string', 3), ('boolean', 2)]

In [100]:
indexed_out_names = [x + '_num' for x in cols_to_encode]
vector_out_names = [x + '_vec' for x in cols_to_encode]

In [101]:
idxer = StringIndexer(inputCols=cols_to_encode, outputCols=indexed_out_names)
enconder = OneHotEncoder(inputCols=idxer.getOutputCols(), outputCols=vector_out_names)

In [102]:
# TODO: Design Transformer to convert vectorized columns to one hot encoded columns

In [103]:
pipeline = Pipeline(stages=[idxer, enconder])

In [104]:
preprocess = pipeline.fit(data_no_na)

In [105]:
df_transformed = preprocess.transform(data_no_na)
df_transformed.first()

Row(Airline='Endeavor Air Inc.', Origin='ABY', Dest='ATL', Cancelled=False, Diverted=False, DepDelay=-5.0, AirTime=38.0, CRSElapsedTime=62.0, ActualElapsedTime=59.0, Distance=145.0, Year=2018, Quarter=1, Month=1, DayofMonth=23, DayOfWeek=2, DOT_ID_Marketing_Airline=19790, Flight_Number_Marketing_Airline=3298, DOT_ID_Operating_Airline=20363, Flight_Number_Operating_Airline=3298, OriginAirportID=10146, OriginAirportSeqID=1014602, OriginCityMarketID=30146, OriginStateFips=13, OriginWac=34, DestAirportID=10397, DestAirportSeqID=1039707, DestCityMarketID=30397, DestStateFips=13, DestWac=34, TaxiOut=14.0, WheelsOff=1211.0, WheelsOn=1249.0, TaxiIn=7.0, DistanceGroup=1, DivAirportLandings=0.0, Airline_num=8.0, Origin_num=271.0, Dest_num=0.0, Airline_vec=SparseVector(27, {8: 1.0}), Origin_vec=SparseVector(387, {271: 1.0}), Dest_vec=SparseVector(387, {0: 1.0}))

In [106]:
col_dict = dict(df_transformed.dtypes).values()
c = Counter(col_dict)
c.most_common()

[('bigint', 20), ('double', 13), ('string', 3), ('vector', 3), ('boolean', 2)]

In [107]:
from pyspark.ml.functions import vector_to_array

In [108]:
df_transformed.first()

Row(Airline='Endeavor Air Inc.', Origin='ABY', Dest='ATL', Cancelled=False, Diverted=False, DepDelay=-5.0, AirTime=38.0, CRSElapsedTime=62.0, ActualElapsedTime=59.0, Distance=145.0, Year=2018, Quarter=1, Month=1, DayofMonth=23, DayOfWeek=2, DOT_ID_Marketing_Airline=19790, Flight_Number_Marketing_Airline=3298, DOT_ID_Operating_Airline=20363, Flight_Number_Operating_Airline=3298, OriginAirportID=10146, OriginAirportSeqID=1014602, OriginCityMarketID=30146, OriginStateFips=13, OriginWac=34, DestAirportID=10397, DestAirportSeqID=1039707, DestCityMarketID=30397, DestStateFips=13, DestWac=34, TaxiOut=14.0, WheelsOff=1211.0, WheelsOn=1249.0, TaxiIn=7.0, DistanceGroup=1, DivAirportLandings=0.0, Airline_num=8.0, Origin_num=271.0, Dest_num=0.0, Airline_vec=SparseVector(27, {8: 1.0}), Origin_vec=SparseVector(387, {271: 1.0}), Dest_vec=SparseVector(387, {0: 1.0}))

In [109]:
df_transformed.select('Airline_num').filter(col('Airline_num') == 0).show(1)

+-----------+
|Airline_num|
+-----------+
|        0.0|
+-----------+
only showing top 1 row



In [110]:
df_col_onehot = df_transformed.select('*', vector_to_array('Airline_vec').alias('airline_one_hot'), vector_to_array('Origin_vec').alias('origin_one_hot'), vector_to_array('Dest_vec').alias('dest_one_hot'))
df_col_onehot.first()

Row(Airline='Endeavor Air Inc.', Origin='ABY', Dest='ATL', Cancelled=False, Diverted=False, DepDelay=-5.0, AirTime=38.0, CRSElapsedTime=62.0, ActualElapsedTime=59.0, Distance=145.0, Year=2018, Quarter=1, Month=1, DayofMonth=23, DayOfWeek=2, DOT_ID_Marketing_Airline=19790, Flight_Number_Marketing_Airline=3298, DOT_ID_Operating_Airline=20363, Flight_Number_Operating_Airline=3298, OriginAirportID=10146, OriginAirportSeqID=1014602, OriginCityMarketID=30146, OriginStateFips=13, OriginWac=34, DestAirportID=10397, DestAirportSeqID=1039707, DestCityMarketID=30397, DestStateFips=13, DestWac=34, TaxiOut=14.0, WheelsOff=1211.0, WheelsOn=1249.0, TaxiIn=7.0, DistanceGroup=1, DivAirportLandings=0.0, Airline_num=8.0, Origin_num=271.0, Dest_num=0.0, Airline_vec=SparseVector(27, {8: 1.0}), Origin_vec=SparseVector(387, {271: 1.0}), Dest_vec=SparseVector(387, {0: 1.0}), airline_one_hot=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,

In [111]:
# vecs_to_encode_lens = [len(df_col_onehot.first()["airline_one_hot"]), len(df_col_onehot.first()["origin_one_hot"]), len(df_col_onehot.first()["dest_one_hot"])]
vecs_to_encode= ["airline_one_hot", "origin_one_hot", "dest_one_hot"]
cols_expanded = [(col(col_name)[i].alias(f'{col_name.split("_")[0] + preprocess.stages[0].labelsArray[idx][i].replace(" ", "").replace(".", "")}')) for idx, col_name in enumerate(vecs_to_encode) for i in range(len(df_col_onehot.first()[col_name]))]
df_transformed =  df_col_onehot.select('*', *cols_expanded)
df_transformed = df_transformed.drop(*vecs_to_encode + vector_out_names + cols_to_encode + indexed_out_names)

In [112]:
df_transformed.printSchema()

root
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- DayOfWeek: long (nullable = true)
 |-- DOT_ID_Marketing_Airline: long (nullable = true)
 |-- Flight_Number_Marketing_Airline: long (nullable = true)
 |-- DOT_ID_Operating_Airline: long (nullable = true)
 |-- Flight_Number_Operating_Airline: long (nullable = true)
 |-- OriginAirportID: long (nullable = true)
 |-- OriginAirportSeqID: long (nullable = true)
 |-- OriginCityMarketID: long (nullable = true)
 |-- OriginStateFips: long (nullable = true)
 |-- OriginWac: long (nullable = true)
 |-- DestAirportID: long (nullable = true)
 |-- DestAirpor

In [113]:
col_dict = dict(df_transformed.dtypes).values()
c = Counter(col_dict)
c.most_common()

[('double', 811), ('bigint', 20), ('boolean', 2)]

## Generate feature vector from columns

In [114]:
all_columns = list(set(df_transformed.columns).difference(idxer.getOutputCols()))
vectorizer = VectorAssembler(inputCols=df_transformed.columns, outputCol='features').transform(df_transformed)
vectorizer.printSchema()

root
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- DayOfWeek: long (nullable = true)
 |-- DOT_ID_Marketing_Airline: long (nullable = true)
 |-- Flight_Number_Marketing_Airline: long (nullable = true)
 |-- DOT_ID_Operating_Airline: long (nullable = true)
 |-- Flight_Number_Operating_Airline: long (nullable = true)
 |-- OriginAirportID: long (nullable = true)
 |-- OriginAirportSeqID: long (nullable = true)
 |-- OriginCityMarketID: long (nullable = true)
 |-- OriginStateFips: long (nullable = true)
 |-- OriginWac: long (nullable = true)
 |-- DestAirportID: long (nullable = true)
 |-- DestAirpor

In [118]:
# all_columns.append('features')
vectorizer.select('features', 'DepDelay').show(1)

+--------------------+--------+
|            features|DepDelay|
+--------------------+--------+
|(833,[2,3,4,5,6,7...|    -5.0|
+--------------------+--------+
only showing top 1 row



## Data preparation

In [119]:
# Maybe implement standard scaler

## 

# Train Boosted GBTRegressor

In [None]:
# from spark.ml.re

# Train XGBoost model

In [120]:
from xgboost.spark import SparkXGBRegressor

In [124]:
label = "DepDelay"
feature_names = 'features'

In [125]:
regressor = SparkXGBRegressor(
  features_col=feature_names,
  label_col=label,
  num_workers=1,
  use_gpu=True,
)

In [126]:
model = regressor.fit(vectorizer)

You enabled use_gpu in spark local mode. Please make sure your local node has at least 1 GPUs
