# PRECONDITIONS

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.app.id', 'local-1616503488489'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.port', '40761'),
 ('spark.driver.host', 'fa6ae6620e7d'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.executor.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.startTime', '1616503486876')]

In [534]:
df_csv = spark.read.option('sep',',').option("header",True).option("inferSchema", True).csv("./flights.csv")
# df_csv = spark.read.option('sep',',').option("header",True).csv("./flights.csv")

In [535]:
df = df_csv

In [536]:
# df.printSchema()

In [537]:
# df.show(1)

# 1.2. Implement each query using Spark Dataframe API

In [None]:
from pyspark.sql.functions import concat_ws
import pyspark.sql.functions as F
from pyspark.sql.window import Window

dfr1 = df\
.select(
    concat_ws(
        "-",
        F.format_string('%04d', df['YEAR']),
        F.format_string('%02d', df['MONTH']),
        F.format_string('%02d', df['DAY'])
    )
    .alias('departure_date'),
#     df['SCHEDULED_DEPARTURE'],
#     df['DEPARTURE_TIME'],
    F.when(F.col("DEPARTURE_TIME").isNull(), '00:00:00')
    .otherwise(
        concat_ws(
            ":",
            F.format_string('%02d', F.floor(df['DEPARTURE_TIME']/100)), 
            F.format_string('%02d', df['DEPARTURE_TIME']%100), 
            F.lit("00")
        )
    )
    .alias('departure_time'),
    df['AIRLINE'].alias('airline'),
    df['FLIGHT_NUMBER'].alias('flight_number'),
)
# .where(df['SCHEDULED_DEPARTURE'].between(59,101) )
# .orderBy(['departure_date', 'departure_time_f'])

# dfr.show(100)

dfr2 = dfr1\
.withColumn(
    'daily_flight_serial_number',
    F.row_number().over(
        Window.partitionBy('departure_date').orderBy('departure_time')
    )
)\
.withColumn(
    'airline_daily_flights_count', 
    F.count('*').over(
        Window.partitionBy('departure_date', 'airline')
    )
)\
.withColumn(
    'time_since_previous_departure',
    F.floor(
        (
            F.unix_timestamp('departure_time', 'HH:mm:ss') - F.lag(F.unix_timestamp('departure_time', 'HH:mm:ss'))
            .over(
                Window.partitionBy('departure_date', 'airline').orderBy('departure_time')
            )
        ) / 60
    )
)\
.orderBy(['departure_date', 'daily_flight_serial_number'])

dfr2.show(10)


In [539]:
df_parquet = spark.read.parquet('golden_dataset/query1')

In [540]:
df = df_parquet
# df.printSchema()
df.show(10)

+--------------+--------------+-------+-------------+--------------------------+---------------------------+-----------------------------+
|departure_date|departure_time|airline|flight_number|daily_flight_serial_number|airline_daily_flights_count|time_since_previous_departure|
+--------------+--------------+-------+-------------+--------------------------+---------------------------+-----------------------------+
|    2015-01-01|      00:00:00|     AA|         1285|                        59|                       1473|                            0|
|    2015-01-01|      00:00:00|     MQ|         3263|                       295|                        929|                            0|
|    2015-01-01|      00:00:00|     OO|         5445|                        33|                       1487|                            0|
|    2015-01-01|      06:05:00|     UA|         1224|                       840|                       1285|                            2|
|    2015-01-01|      06:12

# 1.2.2. Compare query results with golden dataset (to be provided) using Spark Dataframe API. It's allowed to use EXCEPT clause

In [541]:
diff = dfr2.subtract(df_parquet)

In [542]:
# diff.show()
diff.count()

10313

# 1.2.3. Save difference between datasets (if any) to both Excel and Parquet files

In [543]:
diff.toPandas().to_csv('diff.csv')
diff.write.parquet("diff.parquet")

AnalysisException: path file:/home/jovyan/diff.parquet already exists.

In [None]:
import pandas as pd 
import numpy as np 

# Reading the csv file 
df_csv = pd.read_csv('diff.csv') 
  
# saving xlsx file 
xlsx_writer = pd.ExcelWriter('diff.xlsx') 
df_csv.to_excel(GFG, index = False) 
  
xlsx_writer.save() 

# 1.2.4. Create test report consisting of:
	- Spark Dataframe API query
	- datasets comparison query
	- difference between datasets' rows in Excel format
	- difference between datasets' rows in Parquet format

In [None]:
diff.describe().show()

In [None]:
diff.summary().show()

# 1.1. Implement each query in Spark SQL (see org.apache.spark.sql.SparkSession#sql)

In [3]:
flights_csv = spark.read\
.option('multiline', True)\
.option("mode", "FAILFAST")\
.option("header", True)\
.option("inferSchema", "true")\
.csv('flights.csv')

In [4]:
# flights_csv.show()
flights_csv.createOrReplaceTempView('flights_csv')

In [5]:
spark.sql('SELECT YEAR, MONTH, AIRLINE, ORIGIN_AIRPORT, DESTINATION_AIRPORT, DEPARTURE_DELAY, ARRIVAL_DELAY, CANCELLED, DIVERTED FROM flights_csv').show(5)

+----+-----+-------+--------------+-------------------+---------------+-------------+---------+--------+
|YEAR|MONTH|AIRLINE|ORIGIN_AIRPORT|DESTINATION_AIRPORT|DEPARTURE_DELAY|ARRIVAL_DELAY|CANCELLED|DIVERTED|
+----+-----+-------+--------------+-------------------+---------------+-------------+---------+--------+
|2015|    1|     AS|           ANC|                SEA|            -11|          -22|        0|       0|
|2015|    1|     AA|           LAX|                PBI|             -8|           -9|        0|       0|
|2015|    1|     US|           SFO|                CLT|             -2|            5|        0|       0|
|2015|    1|     AA|           LAX|                MIA|             -5|           -9|        0|       0|
|2015|    1|     AS|           SEA|                ANC|             -1|          -21|        0|       0|
+----+-----+-------+--------------+-------------------+---------------+-------------+---------+--------+
only showing top 5 rows



In [None]:
df = spark.sql('''
SELECT 
    count(AIRLINE)
FROM
    flights_csv
WHERE 
    YEAR=2015 and MONTH=1 and AIRLINE="AA" and ORIGIN_AIRPORT = "LAX" and (
        ((DEPARTURE_DELAY > 0 AND ARRIVAL_DELAY > 0) OR (CANCELLED != 0)) 
    )
group by AIRLINE
''')

df.show(5)

In [None]:
df = spark.sql('''
SELECT 
    count(*)
FROM
    flights_csv
WHERE 
    YEAR=2015 and MONTH=1 and AIRLINE="AA" and DESTINATION_AIRPORT = "MIA" and (
        ((DEPARTURE_DELAY <= 0 AND ARRIVAL_DELAY > 0) OR (DIVERTED != 0))
    )
group by AIRLINE
''')

df.show(5)

In [6]:
airlines_csv = spark.read\
.option('multiline', True)\
.option("mode", "FAILFAST")\
.option("header", True)\
.option("inferSchema", "true")\
.csv('airlines.csv')

In [7]:
airlines_csv.show()

+---------+--------------------+
|IATA_CODE|             AIRLINE|
+---------+--------------------+
|       UA|United Air Lines ...|
|       AA|American Airlines...|
|       US|     US Airways Inc.|
|       F9|Frontier Airlines...|
|       B6|     JetBlue Airways|
|       OO|Skywest Airlines ...|
|       AS|Alaska Airlines Inc.|
|       NK|    Spirit Air Lines|
|       WN|Southwest Airline...|
|       DL|Delta Air Lines Inc.|
|       EV|Atlantic Southeas...|
|       HA|Hawaiian Airlines...|
|       MQ|American Eagle Ai...|
|       VX|      Virgin America|
+---------+--------------------+



In [8]:
airlines_csv.createOrReplaceTempView('airlines_iata')

In [9]:
spark.sql('SELECT * FROM airlines_iata').show()

+---------+--------------------+
|IATA_CODE|             AIRLINE|
+---------+--------------------+
|       UA|United Air Lines ...|
|       AA|American Airlines...|
|       US|     US Airways Inc.|
|       F9|Frontier Airlines...|
|       B6|     JetBlue Airways|
|       OO|Skywest Airlines ...|
|       AS|Alaska Airlines Inc.|
|       NK|    Spirit Air Lines|
|       WN|Southwest Airline...|
|       DL|Delta Air Lines Inc.|
|       EV|Atlantic Southeas...|
|       HA|Hawaiian Airlines...|
|       MQ|American Eagle Ai...|
|       VX|      Virgin America|
+---------+--------------------+



In [10]:
df_json = spark.read.option('multiline', True).option("mode", "FAILFAST").option("inferSchema", "true").json('airlines.json')

In [11]:
import pyspark.sql.functions as F

df = df_json.select(
    F.col('Airport.Code').alias('Airport.Code'),
    F.col('Airport.Name').alias('Airport.Name'),

    F.col('Time.Label').alias('Time.Label'),
    F.col('Time.Month').alias('Time.Month'),
    F.col('Time.Month Name').alias('Time.Month Name'),
    F.col('Time.Year').alias('Time.Year'),
    
    F.col('Statistics.# of Delays.Carrier').alias('Statistics.# of Delays.Carrier'),
    F.col('Statistics.# of Delays.Late Aircraft').alias('Statistics.# of Delays.Late Aircraft'),
    F.col('Statistics.# of Delays.National Aviation System').alias('Statistics.# of Delays.National Aviation System'),
    F.col('Statistics.# of Delays.Security').alias('Statistics.# of Delays.Security'),
    F.col('Statistics.# of Delays.Weather').alias('Statistics.# of Delays.Weather'),

    F.col('Statistics.Carriers.Names').alias('Statistics.Carriers.Names'),
    F.col('Statistics.Carriers.Total').alias('Statistics.Carriers.Total'),

    F.col('Statistics.Flights.Cancelled').alias('Statistics.Flights.Cancelled'),
    F.col('Statistics.Flights.Delayed').alias('Statistics.Flights.Delayed'),
    F.col('Statistics.Flights.Diverted').alias('Statistics.Flights.Diverted'),
    F.col('Statistics.Flights.On Time').alias('Statistics.Flights.On Time'),
    F.col('Statistics.Flights.Total').alias('Statistics.Flights.Total'),
    
    F.col('Statistics.Minutes Delayed.Carrier').alias('Statistics.Minutes Delayed.Carrier'),
    F.col('Statistics.Minutes Delayed.Late Aircraft').alias('Statistics.Minutes Delayed.Late Aircraft'),
    F.col('Statistics.Minutes Delayed.National Aviation System').alias('Statistics.Minutes Delayed.National Aviation System'),
    F.col('Statistics.Minutes Delayed.Security').alias('Statistics.Minutes Delayed.Security'),
    F.col('Statistics.Minutes Delayed.Total').alias('Statistics.Minutes Delayed.Total'),
    F.col('Statistics.Minutes Delayed.Weather').alias('Statistics.Minutes Delayed.Weather'),
)

In [None]:
df.show(1)

In [12]:
df.createOrReplaceTempView('airlines_json')

In [13]:
df_json = spark.sql('SELECT * FROM airlines_json')
df_json.show(2)

+------------+--------------------+----------+----------+---------------+---------+------------------------------+------------------------------------+-----------------------------------------------+-------------------------------+------------------------------+-------------------------+-------------------------+----------------------------+--------------------------+---------------------------+--------------------------+------------------------+----------------------------------+----------------------------------------+---------------------------------------------------+-----------------------------------+--------------------------------+----------------------------------+
|Airport.Code|        Airport.Name|Time.Label|Time.Month|Time.Month Name|Time.Year|Statistics.# of Delays.Carrier|Statistics.# of Delays.Late Aircraft|Statistics.# of Delays.National Aviation System|Statistics.# of Delays.Security|Statistics.# of Delays.Weather|Statistics.Carriers.Names|Statistics.Carriers.Total|Sta

In [14]:
df_res = spark.sql('''
SELECT 
    year_month,
    airport_code,
    number_of_delays_for_airport,
    airline_name,
    ifnull(airlines_iata.IATA_CODE, "N/A") as airline_iata_code,
    CASE 
        WHEN airlines_iata.IATA_CODE is null 
        THEN null 
        ELSE (
            SELECT 
                count(*)
            FROM
                flights_csv
            WHERE 
                year_month = CONCAT(`YEAR`, "-", format_number(`MONTH`, "00")) and 
                    AIRLINE=airlines_iata.IATA_CODE and 
                    ORIGIN_AIRPORT = airport_code and (
                        ((DEPARTURE_DELAY > 0 AND ARRIVAL_DELAY > 0) OR (CANCELLED != 0)) 
                    )
            GROUP BY AIRLINE 
            ) + (
            SELECT 
                count(*)
            FROM
                flights_csv
            WHERE 
                year_month = CONCAT(`YEAR`, "-", format_number(`MONTH`, "00")) and
                AIRLINE=airlines_iata.IATA_CODE and
                DESTINATION_AIRPORT = airport_code and (
                    ((DEPARTURE_DELAY <= 0 AND ARRIVAL_DELAY > 0) OR (DIVERTED != 0))
                )
            GROUP BY AIRLINE
            )
    END AS number_of_delays_for_airline_in_airport
FROM (

    SELECT 
        CONCAT(`Time.Year`, "-", format_number(`Time.Month`, "00")) as year_month,
        `Airport.Code` as airport_code,
        (`Statistics.Flights.Cancelled` + `Statistics.Flights.Delayed` + `Statistics.Flights.Diverted`) as number_of_delays_for_airport,
        airline_name
    FROM airlines_json 
        LATERAL VIEW EXPLODE(split(`Statistics.Carriers.Names`, ',')) AS airline_name
    WHERE `Time.Year` = "2015"

)
LEFT JOIN airlines_iata ON airlines_iata.AIRLINE=airline_name
''')
df_res.show(1000)
# df1.count()

+----------+------------+----------------------------+--------------------+-----------------+---------------------------------------+
|year_month|airport_code|number_of_delays_for_airport|        airline_name|airline_iata_code|number_of_delays_for_airline_in_airport|
+----------+------------+----------------------------+--------------------+-----------------+---------------------------------------+
|   2015-01|         ATL|                        4651|American Airlines...|               AA|                                    146|
|   2015-02|         MDW|                        1572|ExpressJet Airlin...|              N/A|                                   null|
|   2015-02|         MDW|                        1572|SkyWest Airlines ...|              N/A|                                   null|
|   2015-02|         DCA|                        1929|      Virgin America|               VX|                                     54|
|   2015-03|         DEN|                        3354|United A

In [15]:
df_res.createOrReplaceTempView('df_res')

In [16]:
df_lve = spark.sql('''
SELECT 
    airline_name
FROM airlines_json
    LATERAL VIEW EXPLODE(split(`Statistics.Carriers.Names`, ',')) AS airline_name
WHERE `Statistics.Minutes Delayed.Carrier` = 61606
''')
# df1.count()
df_lve.show(400)

+--------------------+
|        airline_name|
+--------------------+
|American Airlines...|
|     JetBlue Airways|
|Continental Air L...|
|Delta Air Lines Inc.|
|Atlantic Southeas...|
|AirTran Airways C...|
|America West Airl...|
|Northwest Airline...|
|ExpressJet Airlin...|
|United Air Lines ...|
|     US Airways Inc.|
+--------------------+



# 1.1.2. Compare query results with corresponding golden dataset using Spark SQL. Do not use EXCEPT clause

In [17]:
df_parquet = spark.read.parquet('golden_dataset/query2')
df_parquet.createOrReplaceTempView('df_parquet')

In [18]:
# df_parquet.show()
df_parquet.printSchema()

root
 |-- year_month: string (nullable = true)
 |-- airport_code: string (nullable = true)
 |-- number_of_delays_for_airport: long (nullable = true)
 |-- airline_name: string (nullable = true)
 |-- airline_iata_code: string (nullable = true)
 |-- number_of_delays_for_airline_in_airport: long (nullable = true)



In [19]:
df_res.printSchema()

root
 |-- year_month: string (nullable = true)
 |-- airport_code: string (nullable = true)
 |-- number_of_delays_for_airport: long (nullable = true)
 |-- airline_name: string (nullable = true)
 |-- airline_iata_code: string (nullable = false)
 |-- number_of_delays_for_airline_in_airport: long (nullable = true)



In [20]:
df_diff = spark.sql('''
SELECT *
FROM df_parquet MINUS ALL
    SELECT * FROM df_res
UNION ALL
SELECT *
FROM df_res MINUS ALL
    SELECT * FROM df_parquet
''')
df_diff.createOrReplaceTempView('df_diff')

In [21]:
df_diff.count()

0

# 1.1.3. Save difference between datasets (if any) to both Excel and Parquet files

In [31]:
df_diff.toPandas().to_csv('df_diff_1.csv', header=True)
df_diff.write.parquet("df_diff_1.parquet", 'overwrite')

import pandas as pd 
import numpy as np 

# Reading the csv file 
df_csv = pd.read_csv('df_diff_1.csv') 
  
# saving xlsx file 
xlsx_writer = pd.ExcelWriter('df_diff_1.xlsx') 
df_csv.to_excel(xlsx_writer, index = False) 
  
xlsx_writer.save() 

In [32]:
df_diff.show()

+----------+------------+----------------------------+------------+-----------------+---------------------------------------+
|year_month|airport_code|number_of_delays_for_airport|airline_name|airline_iata_code|number_of_delays_for_airline_in_airport|
+----------+------------+----------------------------+------------+-----------------+---------------------------------------+
+----------+------------+----------------------------+------------+-----------------+---------------------------------------+



# 1.1.4. Create test report consisting of:
	- Spark SQL query
	- datasets comparison query
	- difference between datasets' rows in Excel format
	- difference between datasets' rows in Parquet format

In [33]:
df_diff.describe().show()

+-------+----------+------------+----------------------------+------------+-----------------+---------------------------------------+
|summary|year_month|airport_code|number_of_delays_for_airport|airline_name|airline_iata_code|number_of_delays_for_airline_in_airport|
+-------+----------+------------+----------------------------+------------+-----------------+---------------------------------------+
|  count|         0|           0|                           0|           0|                0|                                      0|
|   mean|      null|        null|                        null|        null|             null|                                   null|
| stddev|      null|        null|                        null|        null|             null|                                   null|
|    min|      null|        null|                        null|        null|             null|                                   null|
|    max|      null|        null|                        null|

In [34]:
df_diff.summary().show()

+-------+----------+------------+----------------------------+------------+-----------------+---------------------------------------+
|summary|year_month|airport_code|number_of_delays_for_airport|airline_name|airline_iata_code|number_of_delays_for_airline_in_airport|
+-------+----------+------------+----------------------------+------------+-----------------+---------------------------------------+
|  count|         0|           0|                           0|           0|                0|                                      0|
|   mean|      null|        null|                        null|        null|             null|                                   null|
| stddev|      null|        null|                        null|        null|             null|                                   null|
|    min|      null|        null|                        null|        null|             null|                                   null|
|    25%|      null|        null|                        null|