In [1]:
import time
import json
from numpy import array, mean
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.sql.functions import year
from pyspark.sql.functions import from_json, col, udf, schema_of_json, size, explode, collect_set, map_keys
from pyspark.sql.functions import isnan, when, count, col,lower, split, trim, concat_ws
from pyspark.sql.types import BooleanType, MapType, StructType, StructField, StringType

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
#create a Spark application
spark = SparkSession.builder.appName('chicago_crimes').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','4g')])

#set log level
spark.sparkContext.setLogLevel("ERROR")

#review configuration settings
spark.sparkContext.getConf().getAll()

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fa26f46d-f55e-44a6-ac05-d1de5ada96b6;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;4.4.0 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.828 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1.16.8 in central
	found com.google.cloud#google-cloud-storage;2.16.0 in central
	found com.google.guava#guava;31.1-jre in centra

[('spark.stage.maxConsecutiveAttempts', '10'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.submit.pyFiles',
  '/root/.ivy2/jars/com.johnsnowlabs.nlp_spark-nlp_2.12-4.4.0.jar,/root/.ivy2/jars/graphframes_graphframes-0.8.2-spark3.1-s_2.12.jar,/root/.ivy2/jars/com.typesafe_config-1.4.2.jar,/root/.ivy2/jars/org.rocksdb_rocksdbjni-6.29.5.jar,/root/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.828.jar,/root/.ivy2/jars/com.github.universal-automata_liblevenshtein-3.0.0.jar,/root/.ivy2/jars/com.google.cloud_google-cloud-storage-2.16.0.jar,/root/.ivy2/jars/com.navigamez_greex-1.0.jar,/root/.ivy2/jars/com.johnsnowlabs.nlp_tensorflow-cpu_2.12-0.4.4.jar,/root/.ivy2/jars/it.unimi.dsi_fastutil-7.0.12.jar,/root/.ivy2/jars/org.projectlombok_lombok-1.16.8.jar,/root/.ivy2/jars/com.google.guava_guava-31.1-jre.jar,/root/.ivy2/jars/com.google.guava_failureaccess-1.0.1.jar,/root/.ivy2/jars/com.google.guava_listenablefuture-9999.0-empty-to-avoid-conflict-

## Extract datasets

In [3]:
# Path 1
file_path_1_2009 = "gs://msca-bdp-student-gcs/Group4/2009.csv"
df_1_2009 = spark.read.csv(file_path_1_2009, header=True, inferSchema=True)

file_path_1_2010 = "gs://msca-bdp-student-gcs/Group4/2010.csv"
df_1_2010 = spark.read.csv(file_path_1_2010, header=True, inferSchema=True)

file_path_1_2011 = "gs://msca-bdp-student-gcs/Group4/2011.csv"
df_1_2011 = spark.read.csv(file_path_1_2011, header=True, inferSchema=True)

file_path_1_2012 = "gs://msca-bdp-student-gcs/Group4/2012.csv"
df_1_2012 = spark.read.csv(file_path_1_2012, header=True, inferSchema=True)

file_path_1_2013 = "gs://msca-bdp-student-gcs/Group4/2013.csv"
df_1_2013 = spark.read.csv(file_path_1_2013, header=True, inferSchema=True)

file_path_1_2014 = "gs://msca-bdp-student-gcs/Group4/2014.csv"
df_1_2014 = spark.read.csv(file_path_1_2014, header=True, inferSchema=True)

file_path_1_2015 = "gs://msca-bdp-student-gcs/Group4/2015.csv"
df_1_2015 = spark.read.csv(file_path_1_2015, header=True, inferSchema=True)

file_path_1_2016 = "gs://msca-bdp-student-gcs/Group4/2016.csv"
df_1_2016 = spark.read.csv(file_path_1_2016, header=True, inferSchema=True)

file_path_1_2017 = "gs://msca-bdp-student-gcs/Group4/2017.csv"
df_1_2017 = spark.read.csv(file_path_1_2017, header=True, inferSchema=True)

file_path_1_2018 = "gs://msca-bdp-student-gcs/Group4/2018.csv"
df_1_2018 = spark.read.csv(file_path_1_2018, header=True, inferSchema=True)

# Path 2
file_path_2_2019 = "gs://msca-bdp-student-gcs/Group4/Combined_Flights_2019.csv"
df_2_2019 = spark.read.csv(file_path_2_2019, header=True, inferSchema=True)

file_path_2_2020 = "gs://msca-bdp-student-gcs/Group4/Combined_Flights_2020.csv"
df_2_2020 = spark.read.csv(file_path_2_2020, header=True, inferSchema=True)

file_path_2_2021 = "gs://msca-bdp-student-gcs/Group4/Combined_Flights_2021.csv"
df_2_2021 = spark.read.csv(file_path_2_2021, header=True, inferSchema=True)

file_path_2_2022 = "gs://msca-bdp-student-gcs/Group4/Combined_Flights_2022.csv"
df_2_2022 = spark.read.csv(file_path_2_2022, header=True, inferSchema=True)

# Path 3
file_path_3 = "gs://msca-bdp-student-gcs/Group4/flights_sample_3m.csv"
df_3 = spark.read.csv(file_path_2_2019, header=True, inferSchema=True)

                                                                                

## Merge datasets from path1

In [5]:
merged_df_1 = (df_1_2009.union(df_1_2010)
                    .union(df_1_2011)
                    .union(df_1_2012)
                    .union(df_1_2013)
                    .union(df_1_2014)
                    .union(df_1_2015)
                    .union(df_1_2016)
                    .union(df_1_2017)
                    .union(df_1_2018))

In [6]:
merged_df_1.summary().show()

[Stage 32:>                                                         (0 + 1) / 1]

+-------+----------+----------+------------------+--------+--------+------------------+------------------+-----------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+-------------------+-----------------+--------------------+-----------------+-------------------+------------------+-----------------+-----------------+------------------+------------------+-------------------+-------------------+-----------+
|summary|   FL_DATE|OP_CARRIER| OP_CARRIER_FL_NUM|  ORIGIN|    DEST|      CRS_DEP_TIME|          DEP_TIME|        DEP_DELAY|          TAXI_OUT|       WHEELS_OFF|        WHEELS_ON|          TAXI_IN|      CRS_ARR_TIME|          ARR_TIME|         ARR_DELAY|          CANCELLED|CANCELLATION_CODE|            DIVERTED| CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|          AIR_TIME|         DISTANCE|    CARRIER_DELAY|     WEATHER_DELAY|         NAS_DELAY|     SECURITY_DELAY|LATE_AIRCRAFT_DELAY|Unnamed: 27|
+-----

24/12/03 03:15:01 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [7]:
df_2_2020.printSchema()
df_2_2021.printSchema()
df_2_2022.printSchema()

root
 |-- FlightDate: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)


## Unify column names in Path2's dfs

In [8]:
from pyspark.sql.functions import col

# Cast `DivAirportLandings` to `double` in each DataFrame
df1 = df_2_2020.withColumn("DivAirportLandings", col("DivAirportLandings").cast("double"))
df2 = df_2_2021.withColumn("DivAirportLandings", col("DivAirportLandings").cast("double"))
df3 = df_2_2022.withColumn("DivAirportLandings", col("DivAirportLandings").cast("double"))

# Display the first 5 rows to confirm the structure
df1.show(5)
df2.show(5)
df3.show(5)

+----------+-----------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+----------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|    Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrDelayMinute

## Merge df from path2

In [9]:
# Merge the three DataFrames vertically
merged_df_2 = df1.unionAll(df2).unionAll(df3)

# # Display the first 5 rows of the merged DataFrame
# # merged_df_2.show(5)
# merged_df_2.printSchema()
# merged_df_2.summary().show()

In [10]:
merged_df_1.show(5)

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|Unnamed: 27|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|

In [11]:
merged_df_2.show(5)

+----------+-----------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+----------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|    Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrDelayMinute

                                                                                

## Unify Airline names in merged_df 1&2

In [11]:
distinct_airlines_df2 = merged_df_2.select("Airline").distinct()
distinct_airlines_df2.show(truncate=False, n=distinct_airlines_df2.count())

                                                                                

+-----------------------------------------+
|Airline                                  |
+-----------------------------------------+
|GoJet Airlines, LLC d/b/a United Express |
|Endeavor Air Inc.                        |
|Allegiant Air                            |
|SkyWest Airlines Inc.                    |
|Horizon Air                              |
|United Air Lines Inc.                    |
|Air Wisconsin Airlines Corp              |
|Trans States Airlines                    |
|Compass Airlines                         |
|Comair Inc.                              |
|Frontier Airlines Inc.                   |
|Southwest Airlines Co.                   |
|ExpressJet Airlines Inc.                 |
|JetBlue Airways                          |
|Commutair Aka Champlain Enterprises, Inc.|
|Empire Airlines Inc.                     |
|Envoy Air                                |
|Capital Cargo International              |
|Hawaiian Airlines Inc.                   |
|Alaska Airlines Inc.           

In [12]:
from pyspark.sql.functions import col, lit
from pyspark.sql import functions as F

# Dictionary mapping airlines to their abbreviations
airline_dict = {
    "GoJet Airlines, LLC d/b/a United Express": "G7",
    "Endeavor Air Inc.": "9E",
    "Allegiant Air": "G4",
    "SkyWest Airlines Inc.": "OO",
    "Horizon Air": "QX",
    "United Air Lines Inc.": "UA",
    "Air Wisconsin Airlines Corp": "ZW",
    "Trans States Airlines": "AX",
    "Comair Inc.": "OH",
    "Frontier Airlines Inc.": "F9",
    "Compass Airlines": "CP",
    "Southwest Airlines Co.": "WN",
    "ExpressJet Airlines Inc.": "EV",
    "JetBlue Airways": "B6",
    "Commutair Aka Champlain Enterprises, Inc.": "C5",
    "Empire Airlines Inc.": "EM",
    "Capital Cargo International": "PT",
    "Envoy Air": "MQ",
    "Hawaiian Airlines Inc.": "HA",
    "Alaska Airlines Inc.": "AS",
    "Delta Air Lines Inc.": "DL",
    "Mesa Airlines Inc.": "YV",
    "American Airlines Inc.": "AA",
    "Republic Airlines": "YX",
    "Spirit Air Lines": "NK"
}

# Create a broadcast variable for the dictionary to optimize joins
airline_dict_broadcast = spark.sparkContext.broadcast(airline_dict)

# Define a UDF to map airline names to abbreviations
def get_op_carrier(airline):
    return airline_dict_broadcast.value.get(airline, None)

get_op_carrier_udf = F.udf(get_op_carrier)

# Add the OP_CARRIER column to merged_df_2 based on the Airline column
merged_df_2 = merged_df_2.withColumn("OP_CARRIER", get_op_carrier_udf(col("Airline")))

# Show the resulting DataFrame
merged_df_2.show()

[Stage 50:>                                                         (0 + 1) / 1]

+----------+-----------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+----------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+----------+
|FlightDate|    Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|Arr

                                                                                

In [13]:
# Assuming merged_df_2 is a Spark DataFrame
distinct_airlines_df1 = merged_df_1.select("OP_CARRIER").distinct()

# Show all distinct airline names without truncation
distinct_airlines_df1.show(truncate=False, n=distinct_airlines_df2.count())


                                                                                

+----------+
|OP_CARRIER|
+----------+
|UA        |
|NK        |
|AA        |
|NW        |
|EV        |
|B6        |
|DL        |
|OO        |
|F9        |
|YV        |
|US        |
|MQ        |
|OH        |
|HA        |
|XE        |
|G4        |
|YX        |
|AS        |
|FL        |
|CO        |
|VX        |
|WN        |
|9E        |
+----------+



In [21]:
from pyspark.sql.functions import col, lit
from pyspark.sql import functions as F

# Dictionary mapping airlines to their abbreviations
airline_dict = {
    "GoJet Airlines, LLC d/b/a United Express": "G7",
    "Endeavor Air Inc.": "9E",
    "Allegiant Air": "G4",
    "SkyWest Airlines Inc.": "OO",
    "Horizon Air": "QX",
    "United Air Lines Inc.": "UA",
    "Air Wisconsin Airlines Corp": "ZW",
    "Trans States Airlines": "AX",
    "Comair Inc.": "OH",
    "Frontier Airlines Inc.": "F9",
    "Compass Airlines": "CP",
    "Southwest Airlines Co.": "WN",
    "ExpressJet Airlines Inc.": "EV",
    "JetBlue Airways": "B6",
    "Commutair Aka Champlain Enterprises, Inc.": "C5",
    "Empire Airlines Inc.": "EM",
    "Capital Cargo International": "PT",
    "Envoy Air": "MQ",
    "Hawaiian Airlines Inc.": "HA",
    "Alaska Airlines Inc.": "AS",
    "Delta Air Lines Inc.": "DL",
    "Mesa Airlines Inc.": "YV",
    "American Airlines Inc.": "AA",
    "Republic Airlines": "YX",
    "Spirit Air Lines": "NK"
}

# Create a broadcast variable for the dictionary to optimize joins
airline_dict_broadcast = spark.sparkContext.broadcast(airline_dict)

# Define a UDF to map airline names to abbreviations
def get_op_carrier(airline):
    return airline_dict_broadcast.value.get(airline, None)

get_op_carrier_udf = F.udf(get_op_carrier)

# Add the OP_CARRIER column to merged_df_2 based on the Airline column
merged_df_2 = merged_df_2.withColumn("OP_CARRIER", get_op_carrier_udf(col("Airline")))

# Show the resulting DataFrame
merged_df_2.show()


[Stage 71:>                                                         (0 + 1) / 1]

+----------+-----------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+----------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+----------+
|FlightDate|    Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|Arr

                                                                                

## Unify columns name in df1&df2

In [14]:
merged_df_1.printSchema()
merged_df_2.printSchema()

root
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: double (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: double (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- CARRIER_DELAY: double (nullable = true)
 |-- WEATHER_DELAY: doub

In [15]:
# Create new columns in merged_df_2 with names matching merged_df_1, using values from the original columns
merged_df_2 = (
    merged_df_2
    .withColumn("FL_DATE", col("FlightDate"))
    .withColumn("OP_CARRIER", col("Airline"))
    .withColumn("ORIGIN", col("Origin"))
    .withColumn("DEST", col("Dest"))
    .withColumn("CRS_DEP_TIME", col("CRSDepTime"))
    .withColumn("DEP_TIME", col("DepTime"))
    .withColumn("DEP_DELAY", col("DepDelay"))
    .withColumn("CRS_ARR_TIME", col("CRSArrTime"))
    .withColumn("ARR_TIME", col("ArrTime"))
    .withColumn("ARR_DELAY", col("ArrDelay"))
    .withColumn("CANCELLED", col("Cancelled"))
    .withColumn("DIVERTED", col("Diverted"))
    .withColumn("CRS_ELAPSED_TIME", col("CRSElapsedTime"))
    .withColumn("ACTUAL_ELAPSED_TIME", col("ActualElapsedTime"))
    .withColumn("AIR_TIME", col("AirTime"))
    .withColumn("DISTANCE", col("Distance"))
    .withColumn("TAXI_OUT", col("TaxiOut"))
    .withColumn("WHEELS_OFF", col("WheelsOff"))
    .withColumn("WHEELS_ON", col("WheelsOn"))
    .withColumn("TAXI_IN", col("TaxiIn"))
)

# Show the result
merged_df_2.show()

+----------+-----------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+----------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+-----------+----------+------------+--------+---------+------------+--------+---------+----------------+------------------

In [16]:
merged_df_1 = merged_df_1.drop('CANCELLATION_CODE')
merged_df_1 = merged_df_1.drop('CARRIER_DELAY')
merged_df_1 = merged_df_1.drop('WEATHER_DELAY')
merged_df_1 = merged_df_1.drop('NAS_DELAY') 
merged_df_1 = merged_df_1.drop('SECURITY_DELAY')
merged_df_1 = merged_df_1.drop('LATE_AIRCRAFT_DELAY')
merged_df_1 = merged_df_1.drop('Unnamed: 27')

In [59]:
# Assuming df is your PySpark DataFrame
column_names1 = merged_df_2.columns
print(column_names1)

column_names = merged_df_1.columns
print(column_names)

lst = ['FL_DATE', 'OP_CARRIER', 'OP_CARRIER_FL_NUM', 'ORIGIN', 'DEST', 'CRS_DEP_TIME', 'DEP_TIME', 'DEP_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'CANCELLED', 'DIVERTED', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'DISTANCE']
merged_df_2_updated = merged_df_2[lst]
print(merged_df_2_updated.columns)
merged_df_2_updated = merged_df_2_updated.withColumn("CANCELLED", col("CANCELLED").cast("boolean"))

['FlightDate', 'Airline', 'ORIGIN', 'DEST', 'CANCELLED', 'DIVERTED', 'CRSDepTime', 'DepTime', 'DepDelayMinutes', 'DepDelay', 'ArrTime', 'ArrDelayMinutes', 'AirTime', 'CRSElapsedTime', 'ActualElapsedTime', 'DISTANCE', 'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek', 'Marketing_Airline_Network', 'Operated_or_Branded_Code_Share_Partners', 'DOT_ID_Marketing_Airline', 'IATA_Code_Marketing_Airline', 'Flight_Number_Marketing_Airline', 'Operating_Airline', 'DOT_ID_Operating_Airline', 'IATA_Code_Operating_Airline', 'Tail_Number', 'Flight_Number_Operating_Airline', 'OriginAirportID', 'OriginAirportSeqID', 'OriginCityMarketID', 'OriginCityName', 'OriginState', 'OriginStateFips', 'OriginStateName', 'OriginWac', 'DestAirportID', 'DestAirportSeqID', 'DestCityMarketID', 'DestCityName', 'DestState', 'DestStateFips', 'DestStateName', 'DestWac', 'DepDel15', 'DepartureDelayGroups', 'DepTimeBlk', 'TaxiOut', 'WheelsOff', 'WheelsOn', 'TaxiIn', 'CRSArrTime', 'ArrDelay', 'ArrDel15', 'ArrivalDelayGroups'

In [62]:
# Check data types for merged_df_1
merged_df_1.printSchema()

# Check data types for merged_df_2_updated
merged_df_2_updated.printSchema()

from pyspark.sql.functions import col

# Cast columns in merged_df_2_updated to match types in merged_df_1
merged_df_2_updated = merged_df_2_updated \
    .withColumn("CRS_DEP_TIME", col("CRS_DEP_TIME").cast("double")) \
    .withColumn("CRS_ARR_TIME", col("CRS_ARR_TIME").cast("double")) \
    .withColumn("CANCELLED", col("CANCELLED").cast("double")) \
    .withColumn("DIVERTED", col("DIVERTED").cast("double"))

# Now, perform the union
merged_df = merged_df_1.unionByName(merged_df_2_updated)


root
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: double (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: double (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)

root
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer

In [67]:
from pyspark.sql.functions import col, sum

# Count missing values for each column in merged_df
missing_values_1 = merged_df_1.select([sum(col(c).isNull().cast("int")).alias(c) for c in merged_df.columns])
missing_values_2 = merged_df_2_updated.select([sum(col(c).isNull().cast("int")).alias(c) for c in merged_df.columns])

# Show the result
missing_values_1.show()
missing_values_2.show()

                                                                                

+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|      0|         0|                0|     0|   0|           1|  935723|   940675|  963901|    963896|   997016| 997015|           2|  997015|  1121351|        0|       0|              60|            1118754| 1118753|       0|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+-



+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|      0|         0|                0|     0|   0|           0|  528378|   528591|  533445|    533445|   538667| 538667|           0|  538646|   568870|        0|       0|               6|             568870|  568870|       0|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+-

                                                                                

In [72]:
missing_values_2_ = merged_df_2.select([sum(col(c).isNull().cast("int")).alias(c) for c in merged_df.columns])
missing_values_2_.show()



+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|      0|         0|                0|     0|   0|           0|  528378|   528591|  533445|    533445|   538667| 538667|           0|  538646|   568870|        0|       0|               6|             568870|  568870|       0|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+-

                                                                                

In [63]:
merged_df = merged_df_1.unionByName(merged_df_2_updated)

In [64]:
merged_df.show(5)

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|2009-01-01|        XE|             1204|   DCA| EWR|      1100.0|  1058.0|     -2.0|    18.0|    1116.0|   1158.0|    8.0|      1202.0|  1206.0|      4.0|      0.0|     0.0|            62.0|               68.0|    42.0|   199.0|
|2009-01-01|        XE|             1206|   EWR| IAD|      1510.0|  1509.0|     

In [65]:
from pyspark.sql.functions import col, sum

# Count missing values for each column in merged_df
missing_values = merged_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in merged_df.columns])

# Show the result
missing_values.show()



+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|      0|         0|                0|     0|   0|           1| 1464101|  1469266| 1497346|   1497341|  1535683|1535682|           2| 1535661|  1690221|        0|       0|              66|            1687624| 1687623|       0|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+-

24/11/04 03:54:36 WARN org.apache.spark.storage.BlockManagerMasterEndpoint: No more replicas available for broadcast_126_python !


## Flight Delay and Cancellation Dataset

In [79]:
CRSElapsedTimedf_3_col = df_3.columns
print(df_3_col)

# merged_df_col = merged_df.columns
# print(merged_df_col)

merged_df_col_2 = merged_df_2.columns
print(merged_df_col_2)

merged_df_col_1 = merged_df_1.columns
print(merged_df_col_1)

['FlightDate', 'Airline', 'Origin', 'Dest', 'Cancelled', 'Diverted', 'CRSDepTime', 'DepTime', 'DepDelayMinutes', 'DepDelay', 'ArrTime', 'ArrDelayMinutes', 'AirTime', 'CRSElapsedTime', 'ActualElapsedTime', 'Distance', 'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek', 'Marketing_Airline_Network', 'Operated_or_Branded_Code_Share_Partners', 'DOT_ID_Marketing_Airline', 'IATA_Code_Marketing_Airline', 'Flight_Number_Marketing_Airline', 'Operating_Airline', 'DOT_ID_Operating_Airline', 'IATA_Code_Operating_Airline', 'Tail_Number', 'Flight_Number_Operating_Airline', 'OriginAirportID', 'OriginAirportSeqID', 'OriginCityMarketID', 'OriginCityName', 'OriginState', 'OriginStateFips', 'OriginStateName', 'OriginWac', 'DestAirportID', 'DestAirportSeqID', 'DestCityMarketID', 'DestCityName', 'DestState', 'DestStateFips', 'DestStateName', 'DestWac', 'DepDel15', 'DepartureDelayGroups', 'DepTimeBlk', 'TaxiOut', 'WheelsOff', 'WheelsOn', 'TaxiIn', 'CRSArrTime', 'ArrDelay', 'ArrDel15', 'ArrivalDelayGroups'

In [None]:
lst = ['FL_DATE', 'OP_CARRIER', 'OP_CARRIER_FL_NUM', 'ORIGIN', 'DEST', 'CRS_DEP_TIME', 'DEP_TIME', 'DEP_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'CANCELLED', 'DIVERTED', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'DISTANCE']
merged_df_2_updated = merged_df_2[lst]
print(merged_df_2_updated.columns)
merged_df_2_updated = merged_df_2_updated.withColumn("CANCELLED", col("CANCELLED").cast("boolean"))