In [116]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from operator import add
import sys
import operator
from pyspark.sql.functions import broadcast
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
def parseAvgFlights(avg_flights_df):
    results = avg_flights_df.collect()
    results.sort(key=operator.itemgetter(0))
    for i in range(len(results)):
        print(
            "{} \t {} \t {} \t {} \t {}".format(
                results[i][0],
                results[i][1],
                results[i][2],
                results[i][3],
                results[i][4],
            ),
            file=sys.stdout,
        )

def delayed_flights(spark, flights_file_path, other_files_path, year):
    """

    PARAMETERS
    ----------

    spark: SparkSession
    file:  The data file e.g."s3://air-traffic-dataset/ontimeperformance_flights_test.csv".
    year: 1994-2008 inclusive for the tiny dataset.

    """

    flights_tiny_df = (
        spark.read.format("csv")
        .options(header="true")
        .load(flights_file_path)
    )

    airlines_df = (
        spark.read.format("csv")
        .options(header="true")
        .load("{}/ontimeperformance_airlines.csv".format(other_files_path))
    )

    # flight_cols = ["carrier_code", "tail_number", "delayed_time"]

    flights_tiny_df = flights_tiny_df \
        .filter(
                flights_tiny_df["flight_date"].between(
                    "{}-01-01".format(year), "{}-12-31".format(year)
                )
            ) \
        .withColumn(
            "scheduled_departure_timestamp",
            F.to_timestamp(
                F.when(
                    F.col("scheduled_depature_time") == "24:00:00", "00:00:00"
                ).otherwise(F.col("scheduled_depature_time")),
                "HH:mm:ss",
            ),
        ) \
        .withColumn(
            "actual_departure_timestamp",
            F.to_timestamp(
                F.when(F.col("actual_departure_time") == "24:00:00", "00:00:00").otherwise(
                    F.col("actual_departure_time")
                ),
                "HH:mm:ss",
            ),
        ) \
        .withColumn(
            "delayed_time",
            F.when(
                F.col("actual_departure_timestamp").cast("long")
                - F.col("scheduled_departure_timestamp").cast("long")
                > (60 * 60 * 12),
                (
                    F.col("scheduled_departure_timestamp").cast("long")
                    + (60 * 60 * 24)
                    - F.col("actual_departure_timestamp").cast("long")
                )
                / 60,
            )
            .when(
                F.col("scheduled_departure_timestamp").cast("long")
                - F.col("actual_departure_timestamp").cast("long")
                > (60 * 60 * 12),
                (
                    F.col("actual_departure_timestamp").cast("long")
                    + (60 * 60 * 24)
                    - F.col("scheduled_departure_timestamp").cast("long")
                )
                / 60,
            )
            .otherwise(
                (
                    F.col("actual_departure_timestamp").cast("long")
                    - F.col("scheduled_departure_timestamp").cast("long")
                )
                / 60
            ),
        ) \
        .filter(F.col("actual_departure_timestamp").isNotNull()) \
        .filter(F.col("delayed_time") > 0) \
        .groupBy("carrier_code") \
        .agg(
            F.count("delayed_time").alias("numOfDelays"),
            F.mean("delayed_time").alias("avgDelays"),
            F.min("delayed_time").alias("minDelay"),
            F.max("delayed_time").alias("maxDelay"),
        ) \
        .withColumn("avgDelays", F.round(F.col("avgDelays"), 2)) \
        .select(
            "carrier_code",
            "numOfDelays",
            "avgDelays",
            "minDelay",
            "maxDelay"
        ) \
        .repartition(12, "carrier_code")

    avg_flights = F.broadcast(airlines_df) \
        .join(
            flights_tiny_df,
            flights_tiny_df.carrier_code == airlines_df.carrier_code
        ) \
        .select(
            "name",
            "numOfDelays",
            "avgDelays",
            "minDelay",
            "maxDelay"
        )

    parseAvgFlights(avg_flights)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [121]:
flights_file_path="s3://usyddata3404/ontimeperformance_flights_large.csv"
other_files_path="s3://air-traffic-dataset"
year="1994"

delayed_flights(spark, flights_file_path, other_files_path, year)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Alaska Airlines Inc. 	 1787 	 18.34 	 1.0 	 213.0
American Airlines Inc. 	 81351 	 15.93 	 1.0 	 660.0
Continental Air Lines Inc. 	 49785 	 20.42 	 1.0 	 534.0
Delta Air Lines Inc. 	 113621 	 11.18 	 1.0 	 705.0
Northwest Airlines Inc. 	 25480 	 19.21 	 1.0 	 718.0
Southwest Airlines Co. 	 19731 	 18.55 	 1.0 	 299.0
Sparrow Aviation 	 16055 	 19.53 	 1.0 	 568.0
Tway Air 	 9874 	 25.45 	 1.0 	 441.0
US Airways 	 21393 	 15.36 	 1.0 	 478.0
United Airlines 	 102607 	 13.9 	 1.0 	 487.0

In [135]:
flights_tiny_df = (
    spark.read.format("csv")
    .options(header="true")
    .load(flights_file_path)
)
flights_tiny_df = flights_tiny_df.withColumn("year", F.year("flight_date"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [136]:
flights_tiny_df.select(F.col("year") == 1994).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

13051212

In [143]:
flights_file_path="s3://usyddata3404/ontimeperformance_flights_massive.csv"

ftd = (
    spark.read.format("csv")
    .options(header="true")
    .load(flights_file_path)
)
ftd.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------------+-------------+-----------+------+-----------+-----------+-----------------------+----------------------+---------------------+-------------------+--------+
|  flight_id|carrier_code|flight_number|flight_date|origin|destination|tail_number|scheduled_depature_time|scheduled_arrival_time|actual_departure_time|actual_arrival_time|distance|
+-----------+------------+-------------+-----------+------+-----------+-----------+-----------------------+----------------------+---------------------+-------------------+--------+
|20191000001|          OO|         5657| 2019-01-04|   SBP|        SFO|     N945SW|               14:00:00|              15:10:00|             13:53:00|           14:44:00|     190|
|20191000002|          OO|         5658| 2019-01-04|   IAH|        XNA|     N932SW|               09:35:00|              11:18:00|             09:30:00|           11:19:00|     438|
|20191000003|          OO|         5658| 2019-01-04|   SGF|        IAH|     N932SW|       

In [153]:
ftd.groupBy("year", "carrier_code").count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+------------+------+
|year|carrier_code| count|
+----+------------+------+
|2019|          B6| 45500|
|2017|          OO|695497|
|2015|          MQ|279431|
|2009|          B6|191650|
|2013|          UA|500727|
|2006|          RU|209258|
|2014|          VX| 57173|
|2012|          VX| 54449|
|2007|          US|475583|
|2010|          YV|171285|
|2018|          G4| 95412|
|2019|          OH| 42818|
|2009|          FL|249450|
|2007|          AQ| 45956|
|2009|          HA| 72764|
|2009|          UA|370586|
|2012|          MQ|462600|
|2014|          AA|529093|
|2014|          HA| 74569|
|2011|          HA| 66617|
+----+------------+------+
only showing top 20 rows

In [144]:
ftd = ftd.withColumn("year", F.year("flight_date"))
ftd.select(
    "year",
    "flight_date"
).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+
|year|flight_date|
+----+-----------+
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
|2019| 2019-01-04|
+----+-----------+
only showing top 20 rows

In [155]:
filename="ontimeperformance_flights_massive_T2"

maxRow = 100000
spark.conf.set("spark.sql.files.maxRecordsPerFile", maxRow)

ftd.repartition("year","carrier_code") \
    .write.partitionBy("year", "carrier_code") \
    .option("header", "true") \
    .csv("s3://air-traffic-dataset/data-partitioned/{}".format(filename)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
ftd.groupBy("carrier_code").count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-------+
|carrier_code|  count|
+------------+-------+
|          UA|2238841|
|          AA|2184830|
|          NW| 942386|
|          EV| 386535|
|          B6| 142498|
|          HP| 440904|
|          TW| 129301|
|          DL|1812014|
|          OO| 521115|
|          F9| 105315|
|          YV| 130027|
|          TZ|  18387|
|          US| 395331|
|          MQ| 617290|
|          OH|  92429|
|          HA|   4385|
|          XE| 464612|
|          DH|  54145|
|          AS| 115018|
|          CO|1220729|
+------------+-------+
only showing top 20 rows

In [154]:
ftd.filter(F.col("year") == 1995).groupBy("carrier_code").count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----+
|carrier_code|count|
+------------+-----+
+------------+-----+

In [36]:
ftd.filter(F.col("year") == 1996).groupBy("carrier_code").count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------+
|carrier_code| count|
+------------+------+
|          UA|218187|
|          AA|183627|
|          NW| 84249|
|          HP| 44424|
|          TW| 21811|
|          DL|162980|
|          US| 24258|
|          AS|  8076|
|          CO|108569|
|          WN| 55727|
+------------+------+

In [39]:
ftd.filter(F.col("year") == 1996).groupBy("carrier_code").count().agg({"count": "sum"}).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+
|sum(count)|
+----------+
|    911908|
+----------+

In [37]:
ftd.filter(F.col("year") == 2002).groupBy("carrier_code").count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----+
|carrier_code|count|
+------------+-----+
+------------+-----+

In [107]:
flights_file_path2="s3://air-traffic-dataset/ontimeperformance_flights_tiny.csv"

ftd = (
    spark.read.format("csv")
    .options(header="true")
    .load(flights_file_path2)
#     .option("maxRecordsPerFile", 400000)
)
ftd.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+------------+-------------+-----------+------+-----------+-----------+-----------------------+----------------------+---------------------+-------------------+--------+
|flight_id|carrier_code|flight_number|flight_date|origin|destination|tail_number|scheduled_depature_time|scheduled_arrival_time|actual_departure_time|actual_arrival_time|distance|
+---------+------------+-------------+-----------+------+-----------+-----------+-----------------------+----------------------+---------------------+-------------------+--------+
|  5017626|          UA|          370| 1994-05-23|   ORD|        PIT|       null|               20:15:00|              22:32:00|             20:17:00|           22:42:00|     412|
|  5017876|          UA|          376| 1994-05-12|   ORD|        PHL|       null|               18:00:00|              20:53:00|             18:00:00|           20:37:00|     678|
|  5017887|          UA|          376| 1994-05-26|   ORD|        PHL|       null|               18:0

In [108]:
ftd.printSchema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<bound method DataFrame.printSchema of DataFrame[flight_id: string, carrier_code: string, flight_number: string, flight_date: string, origin: string, destination: string, tail_number: string, scheduled_depature_time: string, scheduled_arrival_time: string, actual_departure_time: string, actual_arrival_time: string, distance: string]>

In [None]:
ftd.show1

In [156]:
flights_file_path="s3://usyddata3404/ontimeperformance_flights_large_T2"

ftd2 = (
    spark.read.format("csv")
    .options(header="true")
    .load(flights_file_path)
)
ftd2.select(

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+------------+-------------+-----------+------+-----------+-----------+-----------------------+----------------------+---------------------+-------------------+--------+
|flight_id|carrier_code|flight_number|flight_date|origin|destination|tail_number|scheduled_depature_time|scheduled_arrival_time|actual_departure_time|actual_arrival_time|distance|
+---------+------------+-------------+-----------+------+-----------+-----------+-----------------------+----------------------+---------------------+-------------------+--------+
|  5017604|          UA|          370| 1994-05-01|   ORD|        PIT|       null|               20:15:00|              22:32:00|             20:19:00|           22:36:00|     412|
|  5017607|          UA|          370| 1994-05-04|   ORD|        PIT|       null|               20:15:00|              22:32:00|             20:14:00|           22:36:00|     412|
|  5017609|          UA|          370| 1994-05-06|   ORD|        PIT|       null|               20:1