In [364]:
import spark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc,asc, sum
from pyspark.sql.functions import when, to_timestamp, isnull, col, split, concat_ws, lpad, row_number, count, lag, minute, explode
from pyspark.sql.window import Window

In [366]:
# Create (or attach to) the Spark session named 'Basics' and keep a handle on
# its SparkContext.
session_builder = SparkSession.builder.appName('Basics')
spark = session_builder.getOrCreate()
sc = spark.sparkContext

# Switch CSV parser column pruning off for this session.
spark.conf.set('spark.sql.csv.parser.columnPruning.enabled', False)

In [11]:
pwd

'/home/jovyan'

In [388]:
# Load the flights CSV; the "header" option makes the first line supply the
# column names.
flights_reader = spark.read.option("header", True)
df = flights_reader.csv("/home/jovyan/work/datasets/flights.csv")

In [389]:
# Register the flights DataFrame as the temp view `table`, which the
# spark.sql(...) queries in the following cells read from.
df.createOrReplaceTempView("table")

#### TASK 1

In [370]:
import pyspark.sql.functions as sf

In [371]:
# TASK 1 (SQL variant): ten origin airports with the largest summed departure delay.
# `DataFrame.show()` only prints and returns None, so the DataFrame is bound to
# `res_sql` first and displayed in a separate statement; chaining `.show()` onto
# the assignment would leave `res_sql` set to None.
res_sql = spark.sql('SELECT ORIGIN_AIRPORT, SUM(DEPARTURE_DELAY)  as TOTAL_DELAY FROM table GROUP BY ORIGIN_AIRPORT ORDER BY TOTAL_DELAY DESC LIMIT 10')
res_sql.show()

+--------------+-----------+
|ORIGIN_AIRPORT|TOTAL_DELAY|
+--------------+-----------+
|           ORD|  3930516.0|
|           ATL|  3216786.0|
|           DFW|  2693801.0|
|           DEN|  2297099.0|
|           LAX|  2054876.0|
|           IAH|  1675156.0|
|           SFO|  1633300.0|
|           LAS|  1449151.0|
|           EWR|  1346776.0|
|           MCO|  1324481.0|
+--------------+-----------+



In [372]:
# TASK 1 (DataFrame API variant): same top-10 ranking as the SQL cell above.
a = spark.read.option('header', True).csv("/home/jovyan/work/datasets/flights.csv")

# `sum` and `desc` are the pyspark.sql.functions versions imported at the top of
# the notebook. The result is kept in `b` as a DataFrame and shown separately,
# because `.show()` returns None.
b = (
    a.selectExpr("ORIGIN_AIRPORT", "CAST(DEPARTURE_DELAY AS INT)")
    .groupBy("ORIGIN_AIRPORT")
    .agg(sum("DEPARTURE_DELAY").alias("TOTAL_DELAY"))
    .orderBy(desc("TOTAL_DELAY"))
    .limit(10)
)
b.show()


+--------------+-----------+
|ORIGIN_AIRPORT|TOTAL_DELAY|
+--------------+-----------+
|           ORD|    3930516|
|           ATL|    3216786|
|           DFW|    2693801|
|           DEN|    2297099|
|           LAX|    2054876|
|           IAH|    1675156|
|           SFO|    1633300|
|           LAS|    1449151|
|           EWR|    1346776|
|           MCO|    1324481|
+--------------+-----------+



#### TASK 2
##### QUERY 1

##### SQL

In [407]:
# TASK 2 / QUERY 1 (SQL variant). Builds `res_1` from the `table` temp view; no
# action is triggered in this cell.
#
# Inner sub-select:
#   * departure_date - to_date() of YEAR-MONTH-DAY concatenated with '-';
#     replaced by 2015-01-01 when that conversion yields NULL.
#   * departure_time - second space-separated token of to_timestamp(DEPARTURE_TIME);
#     when DEPARTURE_TIME is NULL the same token of to_timestamp('00:00:00') is used.
# Outer select, per (departure_date, airline) partition:
#   * daily_flight_serial_number    - ROW_NUMBER() ordered by departure_time.
#   * airline_daily_flights_count   - COUNT(airline) over the whole partition.
#   * time_since_previous_departure - lag(minute(departure_time)) ordered by departure_time.
# NOTE(review): the last column is the previous row's minute component, not a
# difference between consecutive departures - confirm this matches the task definition.
res_1=spark.sql("""SELECT  departure_date, departure_time, airline, flight_number, ROW_NUMBER() OVER (PARTITION BY departure_date, airline ORDER BY departure_time ASC) as daily_flight_serial_number,
COUNT(airline)  OVER (PARTITION BY departure_date, airline) airline_daily_flights_count,
lag(minute(departure_time))  OVER (PARTITION BY departure_date, airline ORDER BY departure_time ASC) as time_since_previous_departure
FROM 
(SELECT (CASE WHEN (to_date(CONCAT(CAST(YEAR AS STRING),'-',CAST(MONTH as varchar(2)),'-',CAST(DAY as varchar(2))))) is NULL THEN to_date('2015-01-01') ELSE to_date(CONCAT(CAST(YEAR AS STRING),'-',CAST(MONTH as varchar(2)),'-',CAST(DAY as varchar(2)))) END)as departure_date,
(CASE WHEN  DEPARTURE_TIME is NULL THEN split(to_timestamp('00:00:00'), ' ')[1] ELSE split(to_timestamp(DEPARTURE_TIME), ' ')[1] END) as departure_time, airline, flight_number FROM table) ORDER BY departure_time """)

##### DSL

In [404]:
# TASK 2 / QUERY 1 (DataFrame API variant of the SQL query that builds `res_1`).
#
# The projected column order is departure_date, departure_time, airline,
# flight_number - the same order the SQL variant emits. `DataFrame.subtract`
# matches columns by position, so both variants must share one layout to be
# compared against the same golden dataset.
# NOTE(review): assumes the golden dataset uses the SQL variant's order - confirm.

# All flights of one airline on one day, unordered and ordered by departure time.
airline_day = Window.partitionBy('departure_date', 'airline')
airline_day_by_time = airline_day.orderBy(col('departure_time').asc())

res_2 = (
    df
    # Time part of the parsed DEPARTURE_TIME; '00:00:00' when the value is missing.
    .withColumn(
        'departure_time',
        when(isnull('DEPARTURE_TIME'), '00:00:00')
        .otherwise(split(to_timestamp('DEPARTURE_TIME'), ' ')[1]))
    # Missing date parts default to 2015-01-01.
    .fillna({'YEAR': 2015, 'DAY': 1, 'MONTH': 1})
    .withColumn(
        'departure_date',
        concat_ws('-', lpad('YEAR', 4, '0'), lpad('MONTH', 2, '0'), lpad('DAY', 2, '0')))
    .select('departure_date', 'departure_time', 'airline', 'flight_number')
    .withColumn('daily_flight_serial_number', row_number().over(airline_day_by_time))
    .withColumn('airline_daily_flights_count', count('AIRLINE').over(airline_day))
    .withColumn(
        'time_since_previous_departure',
        lag(minute(to_timestamp('departure_time'))).over(airline_day_by_time))
)

##### Сравнение с golden dataset

In [None]:
import pandas 

# Compare both QUERY 1 variants with the golden dataset.
gd = spark.read.option("header",True).parquet("/home/jovyan/work/datasets/query1/*.parquet")

# Symmetric difference = rows only in the result + rows only in the golden
# dataset. `subtract` compares distinct rows and matches columns by position.
dif_1_1 = res_1.subtract(gd)
dif_1_2 = gd.subtract(res_1)
dif1 = dif_1_1.unionAll(dif_1_2)

dif_2_1 = res_2.subtract(gd)
dif_2_2 = gd.subtract(res_2)
dif2 = dif_2_1.unionAll(dif_2_2)

# Summary statistics of the SQL-variant difference as a legacy .xls sheet.
# NOTE(review): the cell that installs xlwt sits further down the notebook; on a
# fresh kernel it has to run before this export.
dif1.describe().toPandas().to_excel('/home/jovyan/work/datasets/dif_sql_qolden_dataset.xls', sheet_name='Sheet3', index=False)

# Full difference as parquet. The default save mode raises when the target
# directory already exists, which breaks every re-run of this cell; overwrite
# replaces the previous comparison output instead.
dif1.write.mode('overwrite').parquet('/home/jovyan/work/datasets/dif_sql_qolden_dataset_parquet')

In [None]:
# Export the DataFrame-API-variant difference (`dif2`, built in the previous cell).
dif2.describe().toPandas().to_excel('/home/jovyan/work/datasets/dif_dsl_qolden_dataset.xls', sheet_name='Sheet1', index=False)

# Overwrite instead of the default save mode, which raises when the target
# directory is left over from an earlier run.
dif2.write.mode('overwrite').parquet('/home/jovyan/work/datasets/dif_dsl_qolden_dataset_parquet')

In [343]:
pip install xlwt

Collecting xlwt
  Downloading xlwt-1.3.0-py2.py3-none-any.whl (99 kB)
[K     |████████████████████████████████| 99 kB 1.3 MB/s eta 0:00:01
[?25hInstalling collected packages: xlwt
Successfully installed xlwt-1.3.0
Note: you may need to restart the kernel to use updated packages.


#### TASK 2

##### QUERY 2

In [77]:
# Airport statistics JSON (multi-line records), exposed to SQL as `table2`.
airlines_json = spark.read.json('/home/jovyan/work/datasets/airlines.json', multiLine=True)
airlines_json.createOrReplaceTempView("table2")

# Airlines CSV (first line supplies the column names), exposed to SQL as `table3`.
airlines_csv_reader = spark.read.option('header', True)
airlines_csv = airlines_csv_reader.csv('/home/jovyan/work/datasets/airlines.csv')
airlines_csv.createOrReplaceTempView("table3")

In [57]:
# Print the nested schema of the airport statistics JSON; the struct paths shown
# here are the ones addressed in the queries below.
airlines_json.printSchema()

root
 |-- Airport: struct (nullable = true)
 |    |-- Code: string (nullable = true)
 |    |-- Name: string (nullable = true)
 |-- Statistics: struct (nullable = true)
 |    |-- # of Delays: struct (nullable = true)
 |    |    |-- Carrier: long (nullable = true)
 |    |    |-- Late Aircraft: long (nullable = true)
 |    |    |-- National Aviation System: long (nullable = true)
 |    |    |-- Security: long (nullable = true)
 |    |    |-- Weather: long (nullable = true)
 |    |-- Carriers: struct (nullable = true)
 |    |    |-- Names: string (nullable = true)
 |    |    |-- Total: long (nullable = true)
 |    |-- Flights: struct (nullable = true)
 |    |    |-- Cancelled: long (nullable = true)
 |    |    |-- Delayed: long (nullable = true)
 |    |    |-- Diverted: long (nullable = true)
 |    |    |-- On Time: long (nullable = true)
 |    |    |-- Total: long (nullable = true)
 |    |-- Minutes Delayed: struct (nullable = true)
 |    |    |-- Carrier: long (nullable = true)
 |    |  

In [47]:
# Preview the first rows of the JSON data (top-level structs are shown truncated).
airlines_json.show()

+--------------------+--------------------+--------------------+
|             Airport|          Statistics|                Time|
+--------------------+--------------------+--------------------+
|[ATL, Atlanta, GA...|[[1009, 1275, 321...|[2003/06, 6, June...|
|[BOS, Boston, MA:...|[[374, 495, 685, ...|[2003/06, 6, June...|
|[BWI, Baltimore, ...|[[296, 477, 389, ...|[2003/06, 6, June...|
|[CLT, Charlotte, ...|[[300, 472, 735, ...|[2003/06, 6, June...|
|[DCA, Washington,...|[[283, 268, 487, ...|[2003/06, 6, June...|
|[DEN, Denver, CO:...|[[516, 323, 664, ...|[2003/06, 6, June...|
|[DFW, Dallas/Fort...|[[986, 1390, 2147...|[2003/06, 6, June...|
|[DTW, Detroit, MI...|[[376, 371, 570, ...|[2003/06, 6, June...|
|[EWR, Newark, NJ:...|[[322, 519, 1948,...|[2003/06, 6, June...|
|[FLL, Fort Lauder...|[[247, 256, 427, ...|[2003/06, 6, June...|
|[IAD, Washington,...|[[320, 295, 573, ...|[2003/06, 6, June...|
|[IAH, Houston, TX...|[[329, 730, 1405,...|[2003/06, 6, June...|
|[JFK, New York, N...|[[3

*промежуточные вычисления*

In [128]:
# Intermediate step for QUERY 2: one row per (airport, month, airline name) for
# 2015 from the JSON view `table2`, left-joined to the airlines CSV view `table3`
# on the airline name. The airport-level delay count is the sum of the Delayed,
# Cancelled and Diverted flight counters.
# The DataFrame is bound to `a` first and shown separately, because `.show()`
# returns None.
a = spark.sql("""SELECT * from 
               (SELECT Time['Year'] as year,
               Time['Month'] as month,
               Airport['Code'] as airport_code,
               airline_name,
               Statistics['Flights']['Delayed'] + Statistics['Flights']['Cancelled'] + Statistics['Flights']['Diverted'] as number_of_delays_for_airport
               from table2
               LATERAL VIEW EXPLODE(split(Statistics['Carriers']['Names'], ',')) as airline_name
               where TIME['YEAR']=2015) json
               LEFT JOIN table3 on json.airline_name=table3.AIRLINE 
               """)
a.show()

+----+-----+------------+--------------------+----------------------------+---------+--------------------+
|year|month|airport_code|        airline_name|number_of_delays_for_airport|IATA_CODE|             AIRLINE|
+----+-----+------------+--------------------+----------------------------+---------+--------------------+
|2015|    1|         ATL|American Airlines...|                        4651|       AA|American Airlines...|
|2015|    1|         ATL|Alaska Airlines Inc.|                        4651|       AS|Alaska Airlines Inc.|
|2015|    1|         ATL|Delta Air Lines Inc.|                        4651|       DL|Delta Air Lines Inc.|
|2015|    1|         ATL|ExpressJet Airlin...|                        4651|     null|                null|
|2015|    1|         ATL|Frontier Airlines...|                        4651|       F9|Frontier Airlines...|
|2015|    1|         ATL|           Envoy Air|                        4651|     null|                null|
|2015|    1|         ATL|    Spirit A

*промежуточные вычисления*

In [119]:
# Intermediate step for QUERY 2: delays per (year, month, airline, airport) from
# the flights view `table`.
# A flight is attributed to its ORIGIN_AIRPORT when it left and arrived late or
# was cancelled, to its DESTINATION_AIRPORT when it left on time but arrived late
# or was diverted, and to no airport (NULL) otherwise. COUNT(DELAYED_AIRPORT)
# skips NULLs, so the NULL group reports 0.
# The DataFrame is bound to `b` first and shown separately, because `.show()`
# returns None.
b = spark.sql("""SELECT YEAR, MONTH, DELAYED_AIRPORT, AIRLINE,
COUNT(DELAYED_AIRPORT) as number_of_delays_for_airline_in_airport
FROM(SELECT YEAR, MONTH, AIRLINE, CASE WHEN ((DEPARTURE_DELAY > 0 AND ARRIVAL_DELAY > 0) OR (CANCELLED='1')) THEN ORIGIN_AIRPORT
                           WHEN (DEPARTURE_DELAY <= 0 AND ARRIVAL_DELAY > 0) OR (DIVERTED='1') THEN DESTINATION_AIRPORT
                    ELSE NULL END as DELAYED_AIRPORT FROM table) GROUP BY YEAR, MONTH, AIRLINE, DELAYED_AIRPORT""")
b.show()

+----+-----+---------------+-------+---------------------------------------+
|YEAR|MONTH|DELAYED_AIRPORT|AIRLINE|number_of_delays_for_airline_in_airport|
+----+-----+---------------+-------+---------------------------------------+
|2015|    1|            JLN|     MQ|                                     10|
|2015|    1|            CHS|     EV|                                     34|
|2015|    1|            JFK|     US|                                     35|
|2015|    1|            ABQ|     B6|                                      5|
|2015|    1|            JAC|     AA|                                      1|
|2015|    1|           null|     US|                                      0|
|2015|    1|            CLT|     US|                                    713|
|2015|    1|            SEA|     DL|                                     74|
|2015|    1|            RSW|     DL|                                     46|
|2015|    1|            MSP|     MQ|                                      8|

##### Итоговый SQL запрос

In [305]:
# QUERY 2, final SQL: joins the per-airport JSON statistics (sub-query `a`,
# including the airline lookup from `table3`) with the per-airline delay counts
# derived from the flights view (sub-query `b`).
#
# number_of_delays_for_airline_in_airport:
#   * NULL when the airline has no IATA code (no match in `table3`);
#   * 0    when the airline is known but has no delayed flights at that airport;
#   * the counted value otherwise.
# The CASE tests `a.IATA_CODE IS NULL` directly. Comparing the raw column with
# 'N/A' never matches: 'N/A' only exists in the IFNULL-ed output column
# `airline_iata_code`, while `a.IATA_CODE` itself is NULL for unknown airlines.
sql_query = spark.sql("""SELECT
               lpad(a.year, 4,'0')||'-'||lpad(a.month, 2, '0') as year_month,
               a.airport_code,
               a.number_of_delays_for_airport,
               a.airline_name,
               IFNULL(a.IATA_CODE, 'N/A') as airline_iata_code,
               CASE WHEN a.IATA_CODE IS NULL THEN NULL
               ELSE IFNULL(b.number_of_delays_for_airline_in_airport, 0) END as number_of_delays_for_airline_in_airport
               from
               (
               (SELECT Time['Year'] as year,
               Time['Month'] as month,
               Airport['Code'] as airport_code,
               airline_name,
               Statistics['Flights']['Delayed'] + Statistics['Flights']['Cancelled'] + Statistics['Flights']['Diverted'] as number_of_delays_for_airport
               from table2
               LATERAL VIEW EXPLODE(split(Statistics['Carriers']['Names'], ',')) as airline_name
               where TIME['YEAR']=2015) json
               LEFT JOIN table3 on json.airline_name=table3.AIRLINE) a
               LEFT JOIN
               (SELECT YEAR as year, MONTH as month, DELAYED_AIRPORT as airport_code,
               AIRLINE as airline_code,
               COUNT(DELAYED_AIRPORT) as number_of_delays_for_airline_in_airport
               FROM(SELECT YEAR, MONTH, AIRLINE, CASE WHEN ((DEPARTURE_DELAY > 0 AND ARRIVAL_DELAY > 0) OR (CANCELLED='1')) THEN ORIGIN_AIRPORT
               WHEN (DEPARTURE_DELAY <= 0 AND ARRIVAL_DELAY > 0) OR (DIVERTED='1') THEN DESTINATION_AIRPORT
               ELSE NULL END as DELAYED_AIRPORT FROM table) GROUP BY YEAR, MONTH, AIRLINE, DELAYED_AIRPORT) b
               ON a.year=b.year AND a.month=b.month AND a.airport_code=b.airport_code AND a.IATA_CODE=b.airline_code""")

##### DSL query

###### select из flights.csv 

In [256]:
# Per-airline delay counts from the flights data (DataFrame API counterpart of
# sub-query `b` in the final SQL).
departure_delay = col('DEPARTURE_DELAY').cast(IntegerType())
arrived_late = col('ARRIVAL_DELAY') > 0

# Left late and arrived late, or cancelled -> attributed to the origin airport.
origin_at_fault = ((departure_delay > 0) & arrived_late) | (col('CANCELLED') == '1')
# Left on time but arrived late, or diverted -> attributed to the destination.
destination_at_fault = ((departure_delay <= 0) & arrived_late) | (col('DIVERTED') == '1')

# No `otherwise`: flights matching neither rule get a NULL airport.
delayed_airport = (
    when(origin_at_fault, col('ORIGIN_AIRPORT'))
    .when(destination_at_fault, col('DESTINATION_AIRPORT'))
)

d = (
    df.withColumn('DELAYED_AIRPORT', delayed_airport)
    .groupBy('YEAR', 'MONTH', 'AIRLINE', 'DELAYED_AIRPORT')
    .count()
    .select(
        col('YEAR').alias('year'),
        col('MONTH').alias('month'),
        col('AIRLINE').alias('airline_code'),
        col('DELAYED_AIRPORT').alias('airport_code'),
        col('count').alias('number_of_delays_for_airline_in_airport'),
    )
    # Drop the group of flights that were attributed to no airport.
    .where(~isnull('airport_code'))
)

###### select из json-файла и airlines.csv

In [254]:
# Per-airport statistics for 2015, one row per airline name, joined to the
# airlines CSV (DataFrame API counterpart of sub-query `a` in the final SQL).
statistics = col('Statistics')
flight_counters = statistics['Flights']
airport_delay_total = (
    flight_counters['Delayed'] + flight_counters['Cancelled'] + flight_counters['Diverted']
)

# Split the comma-separated carrier names into one row per airline.
airport_airlines_2015 = (
    airlines_json
    .withColumn('airline_name', explode(split(statistics['Carriers']['Names'], ',')))
    .select(
        col('Time')['Year'].alias('year'),
        col('Time')['Month'].alias('month'),
        col('Airport')['Code'].alias('airport_code'),
        'airline_name',
        airport_delay_total.alias('number_of_delays_for_airport'),
    )
    .where('year=2015')
)

# Left join keeps airlines that have no entry in the CSV (their IATA_CODE is NULL).
tmp = airport_airlines_2015.join(
    airlines_csv, col('airline_name') == airlines_csv.AIRLINE, 'left')

###### объединение двух селектов выше и select из него

In [307]:
# QUERY 2, final DataFrame API query: left-join the airport/airline rows (`tmp`)
# with the per-airline delay counts (`d`) and shape the output columns.
same_period_airport_airline = (
    (d.year == tmp.year)
    & (d.month == tmp.month)
    & (d.airport_code == tmp.airport_code)
    & (d.airline_code == tmp.IATA_CODE)
)

airline_delays = d.number_of_delays_for_airline_in_airport
airline_delays_missing = airline_delays.isNull()
iata_code = col('airline_iata_code')

# NULL for airlines without an IATA code, 0 for known airlines with no matching
# delay row, the counted value otherwise.
airline_delay_count = (
    when((iata_code == 'N/A') & airline_delays_missing, None)
    .when((iata_code != 'N/A') & airline_delays_missing, 0)
    .otherwise(airline_delays)
)

dsl_query = (
    tmp.join(d, same_period_airport_airline, 'left')
    .withColumn('year_month', concat_ws('-', lpad(tmp.year, 4, '0'), lpad(tmp.month, 2, '0')))
    .withColumn('airline_iata_code', tmp.IATA_CODE)
    # Airlines missing from the CSV are reported with the placeholder code 'N/A'.
    .fillna(value='N/A', subset=['airline_iata_code'])
    .withColumn('number_of_delays_for_airline_in_airport', airline_delay_count)
    .select(
        'year_month',
        tmp.airport_code,
        tmp.number_of_delays_for_airport,
        tmp.airline_name,
        'airline_iata_code',
        'number_of_delays_for_airline_in_airport',
    )
)

##### Сравнение с golden dataset

In [309]:
# Compare both QUERY 2 variants with the golden dataset.
gd2 = spark.read.option("header",True).parquet("/home/jovyan/work/datasets/query2/*.parquet")

# Symmetric difference = rows only in the result + rows only in the golden
# dataset. `subtract` compares distinct rows and matches columns by position.
dif_1_1 = sql_query.subtract(gd2)
dif_1_2 = gd2.subtract(sql_query)
dif1 = dif_1_1.unionAll(dif_1_2)

dif_2_1 = dsl_query.subtract(gd2)
dif_2_2 = gd2.subtract(dsl_query)
dif2 = dif_2_1.unionAll(dif_2_2)

# Parquet outputs use overwrite mode: the default save mode raises when the
# target directory already exists, which breaks every re-run of this cell.
dif1.describe().toPandas().to_excel('/home/jovyan/work/datasets/dif_sql_qolden_dataset_query2.xls', sheet_name='Sheet1', index=False)
dif1.write.mode('overwrite').parquet('/home/jovyan/work/datasets/dif_sql_qolden_dataset_parquet_query2')

dif2.describe().toPandas().to_excel('/home/jovyan/work/datasets/dif_dsl_qolden_dataset_query.xls', sheet_name='Sheet1', index=False)
dif2.write.mode('overwrite').parquet('/home/jovyan/work/datasets/dif_dsl_qolden_dataset_parquet_query2')

#### TASK 3

In [337]:
# Read the flights CSV again (first line supplies the column names) and list
# the column types.
flights_reader = spark.read.option("header", True)
df = flights_reader.csv("/home/jovyan/work/datasets/flights.csv")
df.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D

In [339]:
# Count NULLs in every string-typed column and print them as "<column>:\t<count>".
columns = df.schema.names
string_columns = [name for name, dtype in df.dtypes if dtype == 'string']

if string_columns:
    # One aggregation over the data instead of one `count()` job per column:
    # `when` without `otherwise` yields NULL for non-matching rows and `count`
    # skips NULLs, so each aggregate is the number of NULLs in that column.
    null_counts = df.select([
        count(when(df[name].isNull(), 1)).alias(name) for name in string_columns
    ]).first()

    for name in string_columns:
        print('{}:\t{}'.format(name, null_counts[name]))
    

YEAR:	0
MONTH:	0
DAY:	0
DAY_OF_WEEK:	0
AIRLINE:	0
FLIGHT_NUMBER:	0
TAIL_NUMBER:	14721
ORIGIN_AIRPORT:	0
DESTINATION_AIRPORT:	0
SCHEDULED_DEPARTURE:	0
DEPARTURE_TIME:	86153
DEPARTURE_DELAY:	86153
TAXI_OUT:	89047
WHEELS_OFF:	89047
SCHEDULED_TIME:	6
ELAPSED_TIME:	105071
AIR_TIME:	105071
DISTANCE:	0
WHEELS_ON:	92513
TAXI_IN:	92513
SCHEDULED_ARRIVAL:	0
ARRIVAL_TIME:	92513
ARRIVAL_DELAY:	105071
DIVERTED:	0
CANCELLED:	0
CANCELLATION_REASON:	5729195
AIR_SYSTEM_DELAY:	4755640
SECURITY_DELAY:	4755640
AIRLINE_DELAY:	4755640
LATE_AIRCRAFT_DELAY:	4755640
WEATHER_DELAY:	4755640
