# San Francisco Airport arrivals from within California

All instructions, code comments, and other content in this notebook are part of a project submission for the OMSCS course CSE6242.


#### PySpark Imports
<span style="color:red">*All the imports required to run the pyspark statements in this notebook*</span>

In [3]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

#### Define Spark Context

In [4]:
sc = pyspark.SparkContext(appName="DVA-CA-Analysis")
sqlContext = SQLContext(sc)

#### Functions to load data

In [5]:
def load_ca_airport_data():
    ca_airports_df = sqlContext.read.option("header",True).csv("CA_Airports.csv")
    return ca_airports_df

In [6]:
def load_ca_covid_data():
    ca_covid_df = sqlContext.read.option("header",True).csv("CA_County_Covid.csv")
    return ca_covid_df

In [7]:
def load_sfo_arrivals_data():
    sfo_arrivals_df = sqlContext.read.option("header",True).csv("SFO_Arrivals.csv")
    return sfo_arrivals_df

In [8]:
ca_airports_df = load_ca_airport_data()
ca_airports_df.printSchema()

root
 |-- City_served: string (nullable = true)
 |-- FAA: string (nullable = true)
 |-- IATA: string (nullable = true)
 |-- ICAO: string (nullable = true)
 |-- Airport_name: string (nullable = true)
 |-- Role: string (nullable = true)
 |-- Enpl.: string (nullable = true)
 |-- COUNTY: string (nullable = true)



In [9]:
ca_covid_df = load_ca_covid_data()
ca_covid_df = ca_covid_df.withColumn("14_DAY_AVERAGE", ca_covid_df["14_DAY_AVERAGE"].cast(IntegerType()))
ca_covid_df.printSchema()

root
 |-- COUNTY: string (nullable = true)
 |-- 14_DAY_AVERAGE: integer (nullable = true)



In [10]:
sfo_arrivals_df = load_sfo_arrivals_data()
sfo_arrivals_df.printSchema()


root
 |-- TIME: string (nullable = true)
 |-- FLIGHT: string (nullable = true)
 |-- FROM_AIRPORT: string (nullable = true)
 |-- IATA: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRCRAFT: string (nullable = true)
 |-- STATUS: string (nullable = true)



## Enrich SFO arrivals with the county of the origin airport

Join the San Francisco Airport (SFO) arrivals with the California airport data on the `IATA` column.
Because this is an inner join, only arrivals from California airports are kept, and each one gains the `COUNTY` of its origin airport.
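The effect of the inner join can be illustrated without Spark. The plain-Python sketch below (the helper names `enrich_with_county` and `unmatched_arrivals` are hypothetical, and it assumes each IATA code appears at most once in the airport table) keeps an arrival only when its IATA code matches a California airport. This is why the enriched DataFrame has fewer rows than the raw arrivals.

```python
# Plain-Python sketch of the inner join on IATA (no Spark needed).
# Rows are dicts; both helper names are hypothetical.


def enrich_with_county(arrivals, airports):
    """Keep arrivals whose IATA code matches an airport and add that airport's COUNTY.

    Mirrors, for non-null keys, sfo_arrivals_df.join(ca_airports_df, on=['IATA'], how='inner')
    followed by dropping every airport column except COUNTY.
    Assumes each IATA code appears at most once in `airports`.
    """
    county_by_iata = {a["IATA"]: a["COUNTY"] for a in airports}
    enriched = []
    for flight in arrivals:
        if flight["IATA"] not in county_by_iata:
            continue  # no matching California airport: the inner join drops this row
        enriched.append({**flight, "COUNTY": county_by_iata[flight["IATA"]]})
    return enriched


def unmatched_arrivals(arrivals, airports):
    """Return the arrivals an inner join would drop (a left anti join)."""
    known = {a["IATA"] for a in airports}
    return [f for f in arrivals if f["IATA"] not in known]


airports = [
    {"IATA": "LAX", "COUNTY": "Los Angeles"},
    {"IATA": "ONT", "COUNTY": "San Bernardino"},
]
arrivals = [
    {"IATA": "LAX", "FLIGHT": "OZ285"},
    {"IATA": "ONT", "FLIGHT": "UA5572"},
    {"IATA": "JFK", "FLIGHT": "XX100"},  # hypothetical out-of-state arrival
]

print([f["FLIGHT"] for f in enrich_with_county(arrivals, airports)])  # ['OZ285', 'UA5572']
print([f["FLIGHT"] for f in unmatched_arrivals(arrivals, airports)])  # ['XX100']
```

In PySpark, the rows dropped by the inner join can be listed with `sfo_arrivals_df.join(ca_airports_df, on=['IATA'], how='left_anti')`, which returns only the left-side rows that have no match.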

In [11]:
def enrich_county_in_sfo_arrivals(sfo_arrivals_df, ca_airports_df):
    '''
    input: sfo_arrivals_df a dataframe, ca_airports_df a dataframe
    output: county_enriched_sfo_arrivals_df a dataframe with county populated into SFO arrivals
    '''
    # Inner join on IATA keeps only arrivals from California airports; then drop
    # the airport columns not needed downstream, keeping only COUNTY.
    county_enriched_sfo_arrivals_df = sfo_arrivals_df.join(ca_airports_df, on=['IATA'], how='inner')\
        .drop('City_served', 'FAA', 'ICAO', 'Airport_name', 'Role', 'Enpl.')
    return county_enriched_sfo_arrivals_df

In [12]:
county_enriched_sfo_arrivals_df = enrich_county_in_sfo_arrivals(sfo_arrivals_df, ca_airports_df)

In [13]:
county_enriched_sfo_arrivals_df.show()

+----+----+------+----------------+------------------+-------------+---------+---------------+
|IATA|TIME|FLIGHT|    FROM_AIRPORT|           AIRLINE|     AIRCRAFT|   STATUS|         COUNTY|
+----+----+------+----------------+------------------+-------------+---------+---------------+
| LAX|0:30| OZ285|    Los Angeles |   Asiana Airlines|         B741|Scheduled|    Los Angeles|
| LAX|0:30| OZ286|    Los Angeles |   Asiana Airlines|         B741|Scheduled|    Los Angeles|
| LAX|1:25| KE214|    Los Angeles |        Korean Air|         B744|Scheduled|    Los Angeles|
| ONT|7:34|UA5572|        Ontario |    United Express|E75L (N119SY)|Scheduled| San Bernardino|
| ACV|7:38|UA5313|         Arcata |    United Express|E75L (N116SY)|Scheduled|       Humboldt|
| BFL|7:38|UA5631|    Bakersfield |    United Express|E75L (N621UX)|Scheduled|           Kern|
| SBP|7:38|UA5882|San Luis Obispo |    United Express|CRJ2 (N652BR)|Scheduled|San Luis Obispo|
| SMF|7:44|UA5324|     Sacramento |    United Expr

In [14]:

sfo_arrivals_df.count()

275

In [15]:
county_enriched_sfo_arrivals_df.count()

74

## Enrich COVID measure type based on county

Join the county-enriched SFO arrivals with the California county COVID data on `COUNTY` to populate each arrival's 14-day average (`14_DAY_AVERAGE`) and COVID measure type (`COVID_MEASURE_TYPE`).
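The cell that defines `enrich_covid_measure_in_sfo_arrivals` does not appear in this notebook, and the cutoffs that turn `14_DAY_AVERAGE` into `COVID_MEASURE_TYPE` are not stated. The plain-Python sketch below assumes the measure type depends only on the 14-day average through two fixed cutoffs. `LOW_MAX = 10` and `HIGH_MIN = 100` are hypothetical values, chosen only so that they agree with the labels visible in the output (Humboldt 2 is Low; San Bernardino 420 and Los Angeles 1434 are High). The helper names are also hypothetical.

```python
# Plain-Python sketch of the county join plus measure classification.
# The cutoffs below are HYPOTHETICAL: the notebook's real thresholds are not shown.
LOW_MAX = 10    # assumed: 14-day average at or below this -> "Low"
HIGH_MIN = 100  # assumed: 14-day average at or above this -> "High"


def classify_covid_measure(avg, low_max=LOW_MAX, high_min=HIGH_MIN):
    """Map a county's 14-day average to "Low", "Medium" or "High" (None if missing)."""
    if avg is None:
        return None
    if avg <= low_max:
        return "Low"
    if avg >= high_min:
        return "High"
    return "Medium"


def enrich_covid_measure(arrivals, county_averages):
    """Inner-join arrivals to {COUNTY: 14_DAY_AVERAGE} and add COVID_MEASURE_TYPE."""
    enriched = []
    for row in arrivals:
        if row["COUNTY"] not in county_averages:
            continue  # county has no COVID row: an inner join drops the arrival
        avg = county_averages[row["COUNTY"]]
        enriched.append({
            **row,
            "14_DAY_AVERAGE": avg,
            "COVID_MEASURE_TYPE": classify_covid_measure(avg),
        })
    return enriched


# 14-day averages taken from the output table in this notebook.
averages = {"Los Angeles": 1434, "San Bernardino": 420, "Humboldt": 2}
arrivals = [
    {"COUNTY": "Los Angeles", "FLIGHT": "OZ285"},
    {"COUNTY": "San Bernardino", "FLIGHT": "UA5572"},
    {"COUNTY": "Humboldt", "FLIGHT": "UA5313"},
]
for row in enrich_covid_measure(arrivals, averages):
    print(row["FLIGHT"], row["COVID_MEASURE_TYPE"])
# OZ285 High
# UA5572 High
# UA5313 Low
```

In Spark, the same rule would typically be written as a `when(...).when(...).otherwise(...)` column expression (`when` is already imported). It would be added with `withColumn('COVID_MEASURE_TYPE', ...)` after `county_enriched_sfo_arrivals_df.join(ca_covid_df, on=['COUNTY'], how='inner')`, which would also explain why `COUNTY` is the first column in the output. With `otherwise('Medium')`, a null average falls through to "Medium" unless it is tested with `isNull()` first.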

In [17]:
covid_measure_enriched_in_sfo_arrivals = enrich_covid_measure_in_sfo_arrivals(county_enriched_sfo_arrivals_df, ca_covid_df)

In [18]:
covid_measure_enriched_in_sfo_arrivals.show()

+---------------+----+----+------+----------------+------------------+-------------+---------+--------------+------------------+
|         COUNTY|IATA|TIME|FLIGHT|    FROM_AIRPORT|           AIRLINE|     AIRCRAFT|   STATUS|14_DAY_AVERAGE|COVID_MEASURE_TYPE|
+---------------+----+----+------+----------------+------------------+-------------+---------+--------------+------------------+
|    Los Angeles| LAX|0:30| OZ285|    Los Angeles |   Asiana Airlines|         B741|Scheduled|          1434|              High|
|    Los Angeles| LAX|0:30| OZ286|    Los Angeles |   Asiana Airlines|         B741|Scheduled|          1434|              High|
|    Los Angeles| LAX|1:25| KE214|    Los Angeles |        Korean Air|         B744|Scheduled|          1434|              High|
| San Bernardino| ONT|7:34|UA5572|        Ontario |    United Express|E75L (N119SY)|Scheduled|           420|              High|
|       Humboldt| ACV|7:38|UA5313|         Arcata |    United Express|E75L (N116SY)|Scheduled|   

In [19]:
covid_measure_enriched_in_sfo_arrivals.select('FLIGHT').distinct().count()

74

In [20]:
covid_measure_enriched_in_sfo_arrivals.count()

74

In [None]:
covid_measure_enriched_in_sfo_arrivals.coalesce(1).write.csv('SFO_Arrivals_From_Within_CA', header=True)
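`write.csv('SFO_Arrivals_From_Within_CA', header=True)` creates a directory with that name, not a single CSV file. After `coalesce(1)`, Spark writes one `part-*.csv` file inside it, next to marker files such as `_SUCCESS`. Rerunning the cell fails if the directory already exists, because the default save mode raises an error; passing `mode='overwrite'` avoids that. The small helper below (`single_part_file` is a hypothetical name) locates the single part file so it can be copied or renamed afterwards.

```python
import glob
import os


def single_part_file(output_dir):
    """Return the path of the single part-*.csv file Spark wrote into output_dir.

    write.csv() always produces a directory; after coalesce(1) it should hold
    exactly one part file next to marker files such as _SUCCESS.
    """
    parts = glob.glob(os.path.join(output_dir, "part-*.csv"))
    if len(parts) != 1:
        raise ValueError(
            f"expected one part file in {output_dir!r}, found {len(parts)}"
        )
    return parts[0]


# Usage after the write cell has run (the directory name comes from the notebook):
# print(single_part_file("SFO_Arrivals_From_Within_CA"))
```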

In [21]:
covid_measure_enriched_in_sfo_arrivals.filter(col('COVID_MEASURE_TYPE') == 'Low').show() 

+--------+----+-----+------+------------+--------------+-------------+---------+--------------+------------------+
|  COUNTY|IATA| TIME|FLIGHT|FROM_AIRPORT|       AIRLINE|     AIRCRAFT|   STATUS|14_DAY_AVERAGE|COVID_MEASURE_TYPE|
+--------+----+-----+------+------------+--------------+-------------+---------+--------------+------------------+
|Humboldt| ACV| 7:38|UA5313|     Arcata |United Express|E75L (N116SY)|Scheduled|             2|               Low|
|Humboldt| ACV|15:23|UA5386|     Arcata |United Express|E75L (N120SY)|Scheduled|             2|               Low|
+--------+----+-----+------+------------+--------------+-------------+---------+--------------+------------------+



In [22]:
df = covid_measure_enriched_in_sfo_arrivals

## Count flights from each county into San Francisco Airport

Count the arrivals into San Francisco Airport per origin county and COVID measure type, sorted from the most flights to the fewest.
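The aggregation can be sketched in plain Python to show what `groupBy(...).agg({'FLIGHT': 'count'})` computes. The helper name `flight_count_by_county` is hypothetical. As with Spark's `count(FLIGHT)`, a null `FLIGHT` is not counted, but its group still appears.

```python
from collections import Counter


def flight_count_by_county(rows):
    """Count flights per (COUNTY, COVID_MEASURE_TYPE), most flights first.

    Like Spark's count(FLIGHT), a null FLIGHT is not counted, but its group
    still appears (with 0 if every FLIGHT in it is null).
    """
    counts = Counter()
    for r in rows:
        key = (r["COUNTY"], r["COVID_MEASURE_TYPE"])
        counts[key] += 0 if r["FLIGHT"] is None else 1
    # most_common() sorts by count, keeping first-seen order for ties.
    return [
        {"COUNTY": county, "COVID_MEASURE_TYPE": measure, "SFO_FLIGHT_COUNT": n}
        for (county, measure), n in counts.most_common()
    ]


rows = [
    {"COUNTY": "Los Angeles", "COVID_MEASURE_TYPE": "High", "FLIGHT": "OZ285"},
    {"COUNTY": "Humboldt", "COVID_MEASURE_TYPE": "Low", "FLIGHT": "UA5313"},
    {"COUNTY": "Los Angeles", "COVID_MEASURE_TYPE": "High", "FLIGHT": "KE214"},
]
for r in flight_count_by_county(rows):
    print(r)
```

Spark's `orderBy` makes no such promise for ties. Rows with equal counts (for example Sacramento and San Bernardino, both 3) may come back in either order. Adding a secondary key, e.g. `.orderBy(col('SFO_FLIGHT_COUNT').desc(), col('COUNTY'))`, makes the output deterministic.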

In [24]:
county_measure_groupedDf = df.groupBy(df['COUNTY'], df['COVID_MEASURE_TYPE'])\
    .agg({'FLIGHT': 'count'})\
    .withColumnRenamed("count(FLIGHT)", "SFO_FLIGHT_COUNT")\
    .orderBy(col('SFO_FLIGHT_COUNT'), ascending=False)
county_measure_groupedDf.show()

+---------------+------------------+----------------+
|         COUNTY|COVID_MEASURE_TYPE|SFO_FLIGHT_COUNT|
+---------------+------------------+----------------+
|    Los Angeles|              High|              30|
|      San Diego|              High|              11|
|         Orange|              High|               8|
|      Riverside|              High|               7|
|     Sacramento|              High|               3|
| San Bernardino|              High|               3|
|San Luis Obispo|            Medium|               2|
|       Humboldt|               Low|               2|
|  Santa Barbara|            Medium|               2|
|         Fresno|              High|               2|
|        Alameda|            Medium|               1|
|           Kern|            Medium|               1|
|    Santa Clara|              High|               1|
|         Shasta|            Medium|               1|
+---------------+------------------+----------------+



In [27]:
county_measure_groupedDf.coalesce(1).write.csv('CA_County_SFO_FlightCount', header=True)