In [1]:
import os
import pandas as pd
import numpy as np


spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connected to cloud.r-pro0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                       

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-01-29 20:37:36--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.2’


2022-01-29 20:37:36 (6.48 MB/s) - ‘postgresql-42.2.16.jar.2’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session. The PostgreSQL JDBC jar is put on the driver's
# classpath so that the JDBC write at the end can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("Final-Project")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [4]:
from pyspark.sql import *
from pyspark.sql.functions import col, when
from pyspark.sql import SparkSession
from pyspark.sql.functions import lpad
from pyspark.sql.functions import concat
from pyspark.sql.functions import lit
from pyspark.sql.functions import substring

In [5]:
from pyspark import SparkFiles

# 2018 flight records. addFile downloads the CSV into Spark's file cache and
# SparkFiles.get returns the local path of that copy.
# (An alternative copy lives at https://finalprojectstorage10.s3.us-east-2.amazonaws.com/2018.csv)
url = "https://storage.googleapis.com/uftairlinedbbucket/2018.csv"
spark.sparkContext.addFile(url)
flights_csv_path = SparkFiles.get("2018.csv")
df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(flights_csv_path, header=True)
)
#df.show()

In [6]:
from pyspark.sql.functions import *

#Timestamp function to fix formatting
def padTimeStamp(x, y):
    """Combine a flight date and an hhmm clock value into a "YYYY-MM-DD HH:MM" string.

    Parameters
    ----------
    x : str or None
        Flight date, expected as "YYYY-MM-DD" (the FL_DATE column).
    y : str or None
        Clock time in hhmm form as read from the CSV, possibly with a trailing ".0"
        (e.g. "545.0" -> 05:45, "1530.0" -> 15:30, "5" -> 00:05).

    Returns
    -------
    str or None
        "<x> HH:MM", ready to be cast to a Spark timestamp.
        Returns None in two cases, so that Spark stores a null timestamp instead of
        inventing a midnight time:
          * either input is missing (e.g. a flight with no DEP_TIME);
          * the time value cannot be parsed.
        The value 2400 (midnight at the end of the day) is rolled over to 00:00 on the
        next date, because "24:00" is not a valid clock time and would cast to null.
    """
    from datetime import date, timedelta

    if x is None or y is None:
        return None

    try:
        hours, minutes = divmod(int(float(y)), 100)
        if hours == 24 and minutes == 0:
            next_day = date.fromisoformat(x) + timedelta(days=1)
            return f"{next_day.isoformat()} 00:00"
    except ValueError:
        # Unparseable input: the original string formatting also ended up as a null
        # timestamp after the cast, so return None explicitly instead of failing the job.
        return None

    return f"{x} {hours:02d}:{minutes:02d}"

# A plain Python function cannot be applied to Spark columns directly, so register it as a UDF.
# No return type is given, so the UDF produces a string column.
padTimeStampUDF = udf(padTimeStamp)

# Initial dataframe. Columns are picked by NAME rather than by position in df.columns,
# which would silently pick the wrong fields if the CSV's column order ever changed.
# Each hhmm clock column is turned into a "YYYY-MM-DD HH:MM" string anchored on FL_DATE.
def _stamp(time_col):
    """Apply padTimeStampUDF to FL_DATE and the hhmm column `time_col`."""
    return padTimeStampUDF(df["FL_DATE"], df[time_col])

initial_df = df.select(
    "OP_CARRIER",
    "OP_CARRIER_FL_NUM",
    "ORIGIN",
    _stamp("CRS_DEP_TIME").alias("CRS_DEPARTURE_TIMESTAMP"),
    "FL_DATE",
    "DEST",
    _stamp("DEP_TIME").alias("ACTUAL_DEPARTURE_TIMESTAMP"),
    "DEP_DELAY",
    "TAXI_OUT",
    _stamp("WHEELS_OFF").alias("WHEELS_OFF_TIMESTAMP"),
    _stamp("WHEELS_ON").alias("WHEELS_ON_TIMESTAMP"),
    "TAXI_IN",
    _stamp("CRS_ARR_TIME").alias("CRS_ARRIVAL_TIMESTAMP"),
    _stamp("ARR_TIME").alias("ACTUAL_ARRIVAL_TIMESTAMP"),
    "ARR_DELAY",
    "CANCELLED",
    "CANCELLATION_CODE",
    "DIVERTED",
    "CRS_ELAPSED_TIME",
    "ACTUAL_ELAPSED_TIME",
    "AIR_TIME",
    "DISTANCE",
    "CARRIER_DELAY",
    "WEATHER_DELAY",
    "NAS_DELAY",
    "SECURITY_DELAY",
    "LATE_AIRCRAFT_DELAY",
)
#initial_df.printSchema()
#initial_df.show()

# Convert the 0/1 DIVERTED and CANCELLED flags to booleans. The values are compared
# numerically rather than against the literal string '1.0', so "1" is accepted as well.
# Null or any other value becomes False.
for flag_col in ("DIVERTED", "CANCELLED"):
    initial_df = initial_df.withColumn(
        flag_col,
        when(col(flag_col).cast("double") == 1.0, True).otherwise(False),
    )

# Final flight dataframe: rename columns to the target naming scheme and cast each field
# to its type (timestamps, a date, integer minutes/miles, booleans from the previous step).
final_df = initial_df.selectExpr(
    "OP_CARRIER as AIRLINE_CARRIER_CODE",
    "OP_CARRIER_FL_NUM",
    "ORIGIN as ORIGIN_AIRPORT_CODE",
    "cast(CRS_DEPARTURE_TIMESTAMP as timestamp) as CRS_DEPARTURE_TIMESTAMP",
    "cast(FL_DATE as date) as FLIGHT_DT",
    "DEST as DEST_AIRPORT_CODE",
    "cast(ACTUAL_DEPARTURE_TIMESTAMP as timestamp) as ACTUAL_DEPARTURE_TIMESTAMP",
    "cast(DEP_DELAY as integer) as DEPARTURE_DELAY_MINUTES",
    "cast(TAXI_OUT as integer) as TAXI_OUT_MINUTES",
    "cast(WHEELS_OFF_TIMESTAMP as timestamp) as WHEELS_OFF_TIMESTAMP",
    "cast(WHEELS_ON_TIMESTAMP as timestamp) as WHEELS_ON_TIMESTAMP",
    "cast(TAXI_IN as integer) as TAXI_IN_MINUTES",
    "cast(CRS_ARRIVAL_TIMESTAMP as timestamp) as CRS_ARRIVAL_TIMESTAMP",
    # The actual arrival comes from ACTUAL_ARRIVAL_TIMESTAMP (built from ARR_TIME);
    # casting CRS_ARRIVAL_TIMESTAMP here would store the scheduled time as the actual one.
    "cast(ACTUAL_ARRIVAL_TIMESTAMP as timestamp) as ACTUAL_ARRIVAL_TIMESTAMP",
    "cast(ARR_DELAY as integer) as ARRIVAL_DELAY_MINUTES",
    "CANCELLED as CANCELLED_IND",
    "CANCELLATION_CODE",
    "DIVERTED as DIVERTED_IND",
    "cast(CRS_ELAPSED_TIME as integer) as CRS_ELAPSED_TIME_MINUTES",
    "cast(ACTUAL_ELAPSED_TIME as integer) as ACTUAL_ELAPSED_TIME_MINUTES",
    "cast(AIR_TIME as integer) as AIR_TIME_MINUTES",
    "cast(DISTANCE as integer) as DISTANCE_MILES",
    "cast(CARRIER_DELAY as integer) as CARRIER_DELAY_MINUTES",
    "cast(WEATHER_DELAY as integer) as WEATHER_DELAY_MINUTES",
    "cast(NAS_DELAY as integer) as NAS_DELAY_MINUTES",
    "cast(SECURITY_DELAY as integer) as SECURITY_DELAY_MINUTES",
    "cast(LATE_AIRCRAFT_DELAY as integer) as LATE_AIRCRAFT_DELAY_MINUTES",
)

# Missing delay-cause minutes are stored as 0 instead of null.
delay_cause_columns = [
    "CARRIER_DELAY_MINUTES",
    "WEATHER_DELAY_MINUTES",
    "NAS_DELAY_MINUTES",
    "SECURITY_DELAY_MINUTES",
    "LATE_AIRCRAFT_DELAY_MINUTES",
]
final_df = final_df.na.fill(value=0, subset=delay_cause_columns)

#final_df.show()

In [7]:
# Inspect the cleaned flight schema: the renamed columns and the types produced by the casts.
final_df.printSchema()

root
 |-- AIRLINE_CARRIER_CODE: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN_AIRPORT_CODE: string (nullable = true)
 |-- CRS_DEPARTURE_TIMESTAMP: timestamp (nullable = true)
 |-- FLIGHT_DT: date (nullable = true)
 |-- DEST_AIRPORT_CODE: string (nullable = true)
 |-- ACTUAL_DEPARTURE_TIMESTAMP: timestamp (nullable = true)
 |-- DEPARTURE_DELAY_MINUTES: integer (nullable = true)
 |-- TAXI_OUT_MINUTES: integer (nullable = true)
 |-- WHEELS_OFF_TIMESTAMP: timestamp (nullable = true)
 |-- WHEELS_ON_TIMESTAMP: timestamp (nullable = true)
 |-- TAXI_IN_MINUTES: integer (nullable = true)
 |-- CRS_ARRIVAL_TIMESTAMP: timestamp (nullable = true)
 |-- ACTUAL_ARRIVAL_TIMESTAMP: timestamp (nullable = true)
 |-- ARRIVAL_DELAY_MINUTES: integer (nullable = true)
 |-- CANCELLED_IND: boolean (nullable = false)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED_IND: boolean (nullable = false)
 |-- CRS_ELAPSED_TIME_MINUTES: integer (nullable = true)
 |-- A

In [8]:
#final_df.filter((final_df.AIRLINE_CARRIER_CODE == "YX") & (final_df.OP_CARRIER_FL_NUM == "3624") & (final_df.CRS_DEPARTURE_TIMESTAMP == '2018-06-21 20:07:00')).show(100)

## Airport Names 

In [9]:
# Airport reference file. It supplies the IATA code, name and coordinates used to enrich the flights.
url2 = "https://finalprojectstorage10.s3.us-east-2.amazonaws.com/airport.csv"
spark.sparkContext.addFile(url2)
airport_csv_path = SparkFiles.get("airport.csv")
airport_initial_df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(airport_csv_path, header=True)
)
#airport_initial_df.show()

In [10]:
# Keep only the IATA code (the join key), the airport name and its coordinates.
airport_columns = ["iata_code", "name", "latitude_deg", "longitude_deg"]
airport_df = airport_initial_df.select(*airport_columns)
#airport_df.show()
#airport_df.count()

In [11]:
# Drop airports missing any selected field, then keep a single row per IATA code.
# Duplicate codes would otherwise multiply flight rows in the left joins.
# NOTE(review): which duplicate row survives dropDuplicates is not deterministic.
airport_df = airport_df.dropna().dropDuplicates(["iata_code"])
airport_df.count()

9131

In [12]:
# Fix the LAX name, which is blank in the airport file (per this fix's original intent).
# The condition also matches on the IATA code. Matching on a blank name alone would
# relabel every other blank-named airport as "Los Angeles International Airport".
airport_df = airport_df.withColumn(
    "name",
    when(
        (col("iata_code") == "LAX") & (trim(col("name")) == ""),
        "Los Angeles International Airport",
    ).otherwise(col("name")),
)


In [13]:
from pyspark.sql.types import FloatType


def _airport_lookup(prefix):
    """Project airport_df into join-ready columns for one side (origin or destination) of a flight.

    Returns a frame with the following columns:
      * ``<prefix>_IATA_CODE``: the join key, dropped after the join;
      * ``<prefix>_AIRPORT_NAME``;
      * ``<prefix>_LATITUDE_DEG`` and ``<prefix>_LONGITUDE_DEG``, cast to float.
    Aliasing up front avoids renaming the generic ``name``/``latitude_deg`` columns after a
    join, and avoids joining the same ``airport_df`` column references twice.
    """
    return airport_df.select(
        col("iata_code").alias(f"{prefix}_IATA_CODE"),
        col("name").alias(f"{prefix}_AIRPORT_NAME"),
        col("latitude_deg").cast(FloatType()).alias(f"{prefix}_LATITUDE_DEG"),
        col("longitude_deg").cast(FloatType()).alias(f"{prefix}_LONGITUDE_DEG"),
    )


def _join_airport(flights, code_col, prefix):
    """Left-join airport name and coordinates onto `flights`, matching its `code_col` to the IATA code."""
    airports = _airport_lookup(prefix)
    key = f"{prefix}_IATA_CODE"
    return flights.join(airports, flights[code_col] == airports[key], "left").drop(key)


# Enrich each flight with origin and destination airport details. The left joins keep flights
# whose airport code has no match; their airport columns are null.
joined_df = _join_airport(final_df, "ORIGIN_AIRPORT_CODE", "ORIGIN")
joined_df = _join_airport(joined_df, "DEST_AIRPORT_CODE", "DEST")

In [14]:
# Inspect the enriched schema: the flight columns plus the origin/destination airport name and coordinates.
joined_df.printSchema()

root
 |-- AIRLINE_CARRIER_CODE: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN_AIRPORT_CODE: string (nullable = true)
 |-- CRS_DEPARTURE_TIMESTAMP: timestamp (nullable = true)
 |-- FLIGHT_DT: date (nullable = true)
 |-- DEST_AIRPORT_CODE: string (nullable = true)
 |-- ACTUAL_DEPARTURE_TIMESTAMP: timestamp (nullable = true)
 |-- DEPARTURE_DELAY_MINUTES: integer (nullable = true)
 |-- TAXI_OUT_MINUTES: integer (nullable = true)
 |-- WHEELS_OFF_TIMESTAMP: timestamp (nullable = true)
 |-- WHEELS_ON_TIMESTAMP: timestamp (nullable = true)
 |-- TAXI_IN_MINUTES: integer (nullable = true)
 |-- CRS_ARRIVAL_TIMESTAMP: timestamp (nullable = true)
 |-- ACTUAL_ARRIVAL_TIMESTAMP: timestamp (nullable = true)
 |-- ARRIVAL_DELAY_MINUTES: integer (nullable = true)
 |-- CANCELLED_IND: boolean (nullable = false)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED_IND: boolean (nullable = false)
 |-- CRS_ELAPSED_TIME_MINUTES: integer (nullable = true)
 |-- A

## Write to Database

In [15]:
import getpass

# Connection settings for the target PostgreSQL database.
# Credentials are read from the environment, or prompted for, instead of being written into
# the notebook. A password that was ever committed in plain text should be rotated.
# NOTE: "append" inserts every row again on each run, so re-running the write duplicates data.
mode = "append"
jdbc_url = os.environ.get(
    "AIRLINEDB_JDBC_URL", "jdbc:postgresql://34.74.181.190:5432/airlinedb_final"
)
config = {
    "user": os.environ.get("AIRLINEDB_USER", "airlinedb"),
    "password": os.environ.get("AIRLINEDB_PASSWORD") or getpass.getpass("airlinedb password: "),
    "driver": "org.postgresql.Driver",
}


In [17]:
# Write the enriched flights to the project.joined_flight_data table over JDBC,
# using the configured jdbc_url, write mode and connection properties.
flight_table = "project.joined_flight_data"
joined_df.write.jdbc(jdbc_url, flight_table, mode=mode, properties=config)

In [18]:
# Print this runtime's public IP address. Presumably this is so it can be allow-listed on
# the database server's firewall; confirm.
!curl ipecho.net/plain

34.73.85.97