# Predicting flight delays and cancellations with PySpark
Description:
- Purpose
- Steps
- Model

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 40 kB/s              
[?25h  Preparing metadata (setup.py) ... [?25l- \ done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ done
[?25h  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=ac28b6e3f9ae47005c98333a23bf6d83b0dedc22be590be3025ad31124ced0db
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.2.1


# Import libraries

In [2]:
import numpy as np
import pandas as pd 
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import round

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator 

In [3]:
# All three input CSV files sit in the same dataset directory.
_input_dir = "../input/flight-delays"

airlines_path = f"{_input_dir}/airlines.csv"
airports_path = f"{_input_dir}/airports.csv"
flights_path = f"{_input_dir}/flights.csv"

# Create Spark Session
Initializing PySpark application

In [4]:
# Build the session in local mode under a descriptive application name;
# getOrCreate() hands back an already-running session if one exists.
spark = (
    SparkSession.builder
    .master('local')
    .appName('Predict flight delays and cancellation')
    .getOrCreate()
)

# Report the Spark version the session is running on.
print(spark.version)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/22 20:04:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


3.2.1


# Exploratory Data Analysis
The time columns below are stored as integers in HHMM (hour, minute) format, without leading zeros (e.g. 1536 means 3:36 pm, 345 means 3:45 am, 16 means 12:16 am).
- **YEAR**: Year of the Flight Trip | 1 distinct value
- **MONTH**: Month of the Flight Trip | 12 distinct values
- **DAY**: Day of the Flight Trip | 31
- **DAY_OF_WEEK**: Day of week of the Flight Trip | 7
- **AIRLINE**: Airline Identifier | 14 
- **FLIGHT_NUMBER**: Flight Identifier | 9855
- **TAIL_NUMBER**: Aircraft Identifier | 4897
- **ORIGIN_AIRPORT**: Starting Airport | 628
- **DESTINATION_AIRPORT**: Destination Airport | 629
- **SCHEDULED_DEPARTURE**: Planned Departure Time |
- **DEPARTURE_TIME**: WHEELS_OFF - TAXI_OUT
- **DEPARTURE_DELAY**: Total delay on departure (a negative value means the flight departed early)
- **TAXI_OUT**: The time duration elapsed between departure from the origin airport gate and wheels off.
- **WHEELS_OFF**: The time point that the aircraft's wheels leave the ground.
- **SCHEDULED_TIME**: Planned time amount needed for the flight trip
- **ELAPSED_TIME**: AIR_TIME+TAXI_IN+TAXI_OUT
- **AIR_TIME**: The time duration between wheels_off and wheels_on time
- **DISTANCE**: Distance between two airports
- **WHEELS_ON**: The time point that the aircraft's wheels touch the ground.
- **TAXI_IN**: The time duration elapsed between wheels-on and gate arrival at the destination airport
- **SCHEDULED_ARRIVAL**: Planned arrival time
- **ARRIVAL_TIME**: WHEELS_ON+TAXI_IN
- **ARRIVAL_DELAY**: ARRIVAL_TIME-SCHEDULED_ARRIVAL
- **DIVERTED**: Aircraft landed at an airport other than the scheduled destination | 2
- **CANCELLED**: Flight Cancelled (1 = cancelled) | 2
- **CANCELLATION_REASON**: Reason for Cancellation of flight: A - Airline/Carrier; B - Weather; C - National Air System; D - Security | 5
- **AIR_SYSTEM_DELAY**: Delay caused by air system
- **SECURITY_DELAY**: Delay caused by security
- **AIRLINE_DELAY**: Delay caused by the airline
- **LATE_AIRCRAFT_DELAY**: Delay caused by a late-arriving aircraft
- **WEATHER_DELAY**: Delay caused by weather

## Read data

In [5]:
# Reader options: the first line holds the column names, column types are
# inferred from the data, and the literal string "NA" is read as null.
flights_csv_options = dict(header=True, inferSchema=True, nullValue="NA")
flights_df = spark.read.csv(flights_path, **flights_csv_options)

# Basic size summary: column count first, then the row count.
print("Number of columns %d" % len(flights_df.columns))
record_total = flights_df.count()
print("Number of records: %d" % record_total)

# A single record is too wide for a horizontal table, so print it vertically.
flights_df.show(1, vertical=True)

                                                                                

Number of columns 31


22/03/22 20:05:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Number of records: 5819079
-RECORD 0---------------------
 YEAR                | 2015   
 MONTH               | 1      
 DAY                 | 1      
 DAY_OF_WEEK         | 4      
 AIRLINE             | AS     
 FLIGHT_NUMBER       | 98     
 TAIL_NUMBER         | N407AS 
 ORIGIN_AIRPORT      | ANC    
 DESTINATION_AIRPORT | SEA    
 SCHEDULED_DEPARTURE | 5      
 DEPARTURE_TIME      | 2354   
 DEPARTURE_DELAY     | -11    
 TAXI_OUT            | 21     
 WHEELS_OFF          | 15     
 SCHEDULED_TIME      | 205    
 ELAPSED_TIME        | 194    
 AIR_TIME            | 169    
 DISTANCE            | 1448   
 WHEELS_ON           | 404    
 TAXI_IN             | 4      
 SCHEDULED_ARRIVAL   | 430    
 ARRIVAL_TIME        | 408    
 ARRIVAL_DELAY       | -22    
 DIVERTED            | 0      
 CANCELLED           | 0      
 CANCELLATION_REASON | null   
 AIR_SYSTEM_DELAY    | null   
 SECURITY_DELAY      | null   
 AIRLINE_DELAY       | null   
 LATE_AIRCRAFT_DELAY | null   
 WEATHER_DEL

In [6]:
# Print the schema as a tree: one line per column with its inferred type
# and nullability.
flights_df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

In [7]:
# Enable the spark.sql.repl.eagerEval.enabled configuration for eager
# evaluation, so that a bare DataFrame expression ending a cell is evaluated
# and rendered as a table of rows.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
# Last expression of the cell: shown through the notebook's display mechanism.
flights_df

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
2015,1,1,4,AS,98,N407AS,ANC,SEA,5,2354,-11,21,15,205,194,169,1448,404,4,430,408,-22,0,0,,,,,,
2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10,2,-8,12,14,280,279,263,2330,737,4,750,741,-9,0,0,,,,,,
2015,1,1,4,US,840,N171US,SFO,CLT,20,18,-2,16,34,286,293,266,2296,800,11,806,811,5,0,0,,,,,,
2015,1,1,4,AA,258,N3HYAA,LAX,MIA,20,15,-5,15,30,285,281,258,2342,748,8,805,756,-9,0,0,,,,,,
2015,1,1,4,AS,135,N527AS,SEA,ANC,25,24,-1,11,35,235,215,199,1448,254,5,320,259,-21,0,0,,,,,,
2015,1,1,4,DL,806,N3730B,SFO,MSP,25,20,-5,18,38,217,230,206,1589,604,6,602,610,8,0,0,,,,,,
2015,1,1,4,NK,612,N635NK,LAS,MSP,25,19,-6,11,30,181,170,154,1299,504,5,526,509,-17,0,0,,,,,,
2015,1,1,4,US,2013,N584UW,LAX,CLT,30,44,14,13,57,273,249,228,2125,745,8,803,753,-10,0,0,,,,,,
2015,1,1,4,AA,1112,N3LAAA,SFO,DFW,30,19,-11,17,36,195,193,173,1464,529,3,545,532,-13,0,0,,,,,,
2015,1,1,4,DL,1173,N826DN,LAS,ATL,30,33,3,12,45,221,203,186,1747,651,5,711,656,-15,0,0,,,,,,


In [8]:
# Outcome-related columns: the diverted/cancelled flags, the cancellation
# reason, and the per-cause delay columns.
res_columns = [
    "DIVERTED",
    "CANCELLED",
    "CANCELLATION_REASON",
    "AIR_SYSTEM_DELAY",
    "SECURITY_DELAY",
    "AIRLINE_DELAY",
    "LATE_AIRCRAFT_DELAY",
    "WEATHER_DELAY",
]

# Preview the first two rows restricted to those columns.
outcome_preview = flights_df.select(res_columns)
outcome_preview.show(2)

+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|       0|        0|               null|            null|          null|         null|               null|         null|
|       0|        0|               null|            null|          null|         null|               null|         null|
+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
only showing top 2 rows



In [9]:
# NOTE(review): this cell contains no executable code. It lists the same names
# as res_columns — presumably a reminder of the columns still to be analysed.
# TODO confirm, then either fill in the analysis or remove the cell.
# DIVERTED
# CANCELLED
# CANCELLATION_REASON
# AIR_SYSTEM_DELAY
# SECURITY_DELAY
# AIRLINE_DELAY
# LATE_AIRCRAFT_DELAY
# WEATHER_DELAY

In [10]:
# Load the airline and airport tables from airlines_path / airports_path.
# Both names were previously bound to the integer 0, so the *_df variables
# were not DataFrames and the two paths were never read.
# The reader options mirror the flights_df read so all three files are parsed
# the same way.
# NOTE(review): assumes both CSV files start with a header row, like
# flights.csv — confirm against the dataset.
airlines_df = spark.read.csv(airlines_path,
                             header=True,
                             inferSchema=True,
                             nullValue="NA")
airports_df = spark.read.csv(airports_path,
                             header=True,
                             inferSchema=True,
                             nullValue="NA")