# Flight Data Fusion

In [1]:
import sys
sys.path.append("..")
# Spark libs
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col, count, expr, row_number, when
from pyspark.sql.types import TimestampType
from pyspark.sql.window import Window
# helpers
from helpers.data_prep_and_print import print_df
from helpers.path_translation import translate_to_file_string

### Input Files

In [2]:
# Location and date of the three raw feeds; change these to process another day.
DATA_DIR = "../data"
DATA_DATE = "2011-12-01"

input_file_aaa = translate_to_file_string(f"{DATA_DIR}/{DATA_DATE}-dataaa.csv")
input_file_airtravelcenter = translate_to_file_string(f"{DATA_DIR}/{DATA_DATE}-dataairtravelcenter.csv")
input_file_dfw = translate_to_file_string(f"{DATA_DIR}/{DATA_DATE}-datadfw.csv")

### Spark Session Creation

In [3]:
# Get the active Spark session (or create one) for this notebook.
spark = SparkSession.builder.appName("Flight Data Fusion").getOrCreate()

# Only show errors from Spark so that cell outputs stay readable.
spark.sparkContext.setLogLevel("ERROR")

### Load the input files into DataFrames

In [4]:
def read_flight_csv(path):
    """Read one flight feed into a DataFrame.

    The file is expected to have a header row and ';' as the delimiter;
    Spark infers the column types.
    """
    return (spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .option("delimiter", ";")
            .csv(path))


dfaaa = read_flight_csv(input_file_aaa)
dfatc = read_flight_csv(input_file_airtravelcenter)
# The dfw feed's actual departure/arrival times are cast explicitly to
# timestamps. Presumably schema inference does not produce timestamps for
# them in this file; the cast makes them match the other feeds.
dfdfw = (read_flight_csv(input_file_dfw)
         .withColumn("depact", col("depact").cast(TimestampType()))
         .withColumn("arrivalact", col("arrivalact").cast(TimestampType())))


### Union all DataFrames

In [6]:
# Show each feed's schema to confirm they share the same columns before stacking.
# printSchema() prints by itself and returns None, so it is not wrapped in print().
dfaaa.printSchema()
dfatc.printSchema()
dfdfw.printSchema()

# Append the feeds row-wise. unionByName matches columns by name rather than by
# position, so a feed with a different column order cannot silently put values
# into the wrong column.
df_all = dfaaa.unionByName(dfatc).unionByName(dfdfw)
df_all.show()

root
 |-- website: string (nullable = true)
 |-- flightno: string (nullable = true)
 |-- depsched: timestamp (nullable = true)
 |-- depact: timestamp (nullable = true)
 |-- depgate: string (nullable = true)
 |-- arrivalsched: timestamp (nullable = true)
 |-- arrivalact: timestamp (nullable = true)
 |-- arrivalgat: string (nullable = true)

None
root
 |-- website: string (nullable = true)
 |-- flightno: string (nullable = true)
 |-- depsched: timestamp (nullable = true)
 |-- depact: timestamp (nullable = true)
 |-- depgate: string (nullable = true)
 |-- arrivalsched: timestamp (nullable = true)
 |-- arrivalact: timestamp (nullable = true)
 |-- arrivalgat: string (nullable = true)

None
root
 |-- website: string (nullable = true)
 |-- flightno: string (nullable = true)
 |-- depsched: timestamp (nullable = true)
 |-- depact: timestamp (nullable = true)
 |-- depgate: string (nullable = true)
 |-- arrivalsched: timestamp (nullable = true)
 |-- arrivalact: timestamp (nullable = true)
 |-- ar

### Remove Duplicates

In [26]:
# Keep exactly one record per flight number.
#
# depsched is deliberately not part of the key. The airtravelcenter feed has
# many NULL depsched values (see the null counts further down), so its rows
# would not match the other feeds' rows for the same flight.
#
# dropDuplicates() would keep an arbitrary row per key, which may be the
# sparsest one. Instead, each flight's rows are ranked by how many non-NULL
# fields they carry, and the most complete row is kept. The remaining columns
# act as tie-breakers, so the same row is chosen on every run.
non_null_fields = sum(
    when(df_all[c].isNotNull(), 1).otherwise(0) for c in df_all.columns
)
most_complete_first = Window.partitionBy("flightno").orderBy(
    non_null_fields.desc(), *df_all.columns
)
droped_df = (
    df_all
    .withColumn("_rank", row_number().over(most_complete_first))
    .filter(col("_rank") == 1)
    .drop("_rank")
)
droped_df.show()

+-------+---------------+-------------------+-------------------+-------+-------------------+-------------------+----------+
|website|       flightno|           depsched|             depact|depgate|       arrivalsched|         arrivalact|arrivalgat|
+-------+---------------+-------------------+-------------------+-------+-------------------+-------------------+----------+
|     aa|  AA-10-LAX-JFK|2011-12-01 21:20:00|               NULL|    47B|2011-12-02 05:30:00|               NULL|        42|
|     aa|AA-1007-MIA-PHX|2011-12-01 16:55:00|2011-12-01 17:08:00|     D5|2011-12-01 20:05:00|2011-12-01 19:55:00|         4|
|     aa|AA-1007-TPA-MIA|2011-12-01 13:55:00|2011-12-01 14:07:00|    F78|2011-12-01 15:00:00|2011-12-01 14:57:00|        D5|
|     aa|AA-1011-MIA-GCM|2011-12-01 19:15:00|2011-12-01 19:29:00|    D17|2011-12-01 20:40:00|2011-12-01 20:36:00|      NULL|
|     aa|AA-1039-ATL-MIA|2011-12-01 13:25:00|2011-12-01 13:46:00|    T11|2011-12-01 15:20:00|2011-12-01 15:14:00|       D48|


### Aggregation of the instances

In [None]:
# TODO make useful data fusions

# Find out which feed is most complete, i.e. has the fewest NULLs in each column.
def null_counts(df):
    """Return a dict mapping each column name of ``df`` to its number of NULL values.

    All columns are counted in a single aggregation (one Spark job) instead of
    one filter + count job per column.
    """
    # when(...) yields 1 for NULL cells and NULL otherwise; count() skips NULLs,
    # so the result is the number of NULL cells per column.
    summary = df.select(
        [count(when(df[c].isNull(), 1)).alias(c) for c in df.columns]
    ).first()
    return summary.asDict()


Dict_Null_aaa = null_counts(dfaaa)
print(Dict_Null_aaa)

Dict_Null_atc = null_counts(dfatc)
print(Dict_Null_atc)

Dict_Null_dfw = null_counts(dfdfw)
print(Dict_Null_dfw)


{'website': 0, 'flightno': 0, 'depsched': 0, 'depact': 47, 'depgate': 3, 'arrivalsched': 0, 'arrivalact': 87, 'arrivalgat': 14}
{'website': 0, 'flightno': 0, 'depsched': 736, 'depact': 473, 'depgate': 1209, 'arrivalsched': 736, 'arrivalact': 473, 'arrivalgat': 1209}
{'website': 0, 'flightno': 0, 'depsched': 24, 'depact': 72, 'depgate': 24, 'arrivalsched': 48, 'arrivalact': 72, 'arrivalgat': 48}


In [25]:
# TODO fill NULLs from the other feeds, matching rows on flightno (and depsched).
# First check how many rows each flight number has in the combined feeds.
flight_row_counts = df_all.groupBy("flightno").count()
flight_row_counts.show()

+---------------+-----+
|       flightno|count|
+---------------+-----+
|AA-4180-ORD-CLT|    2|
|AA-2495-BDL-DFW|    3|
|AA-2493-DFW-LAX|    3|
|AA-4388-ORD-DTW|    2|
|AA-4064-ORD-CLE|    2|
|AA-2049-DFW-DEN|    2|
|AA-4010-CLT-ORD|    2|
|AA-1775-ORD-LAS|    2|
|AA-1201-ORD-STL|    2|
| AA-692-MIA-EWR|    2|
|AA-3804-PHL-ORD|    2|
|AA-1687-MIA-SJU|    2|
|AA-1044-DFW-CMH|    3|
|AA-3823-LAX-DEN|    2|
|  AA-75-IAD-LAX|    2|
|  AA-18-SFO-JFK|    2|
| AA-202-LAX-MIA|    2|
|AA-3739-ORD-EWR|    2|
|AA-3533-MIA-CLT|    2|
|AA-4310-CLT-ORD|    2|
+---------------+-----+
only showing top 20 rows



In [None]:
# Stop the Spark session created at the top of the notebook.
spark.stop()