In [7]:
#import pyspark as spark
#import findspark
#import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

Creating a schema for the data using a StructType

In [57]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# No earlier cell in this notebook creates `spark`, so obtain the session explicitly.
# getOrCreate() returns the already-running session when the kernel provides one
# and starts a new one otherwise, so the notebook survives Restart & Run All.
spark = SparkSession.builder.getOrCreate()

# Explicit schema for the flights CSV. Fields are listed in the order they are
# applied to the file's columns; every field is declared nullable (third argument True).
schema = StructType([
    # Date of the flight
    StructField("Year", IntegerType(), True),
    StructField("Month", IntegerType(), True),
    StructField("DayofMonth", IntegerType(), True),
    StructField("DayOfWeek", IntegerType(), True),
    # Departure / arrival times
    StructField("DepTime", IntegerType(), True),
    StructField("CRSDepTime", IntegerType(), True),
    StructField("ArrTime", IntegerType(), True),
    StructField("CRSArrTime", IntegerType(), True),
    # Carrier and aircraft identification
    StructField("UniqueCarrier", StringType(), True),
    StructField("FlightNum", IntegerType(), True),
    StructField("TailNum", StringType(), True),
    # Durations and delays
    StructField("ActualElapsedTime", IntegerType(), True),
    StructField("CRSElapsedTime", IntegerType(), True),
    StructField("AirTime", IntegerType(), True),
    StructField("ArrDelay", IntegerType(), True),
    StructField("DepDelay", IntegerType(), True),
    # Route
    StructField("Origin", StringType(), True),
    StructField("Dest", StringType(), True),
    StructField("Distance", IntegerType(), True),
    StructField("TaxiIn", IntegerType(), True),
    StructField("TaxiOut", IntegerType(), True),
    # Cancellation / diversion flags
    StructField("Cancelled", IntegerType(), True),
    StructField("CancellationCode", StringType(), True),
    StructField("Diverted", IntegerType(), True),
    # Delay breakdown by cause
    StructField("CarrierDelay", IntegerType(), True),
    StructField("WeatherDelay", IntegerType(), True),
    StructField("NASDelay", IntegerType(), True),
    StructField("SecurityDelay", IntegerType(), True),
    StructField("LateAircraftDelay", IntegerType(), True)])

# The first line of the file is a header; the literal string 'NA' is read as null
# so that the integer columns above can be parsed.
flights = spark.read.csv("./data/2008.csv", header=True, schema=schema, nullValue='NA')

Finding the number of flights from JFK to LAX

In [58]:
# Keep only rows that depart from JFK and arrive at LAX, then count them.
flights.filter(
    (col('Origin') == 'JFK')
    & (col('Dest') == 'LAX')
).count()

8078

Finding the sum and average of all arrival delays for all delayed flights
The average could also be found using `describe`, but to include the sum as well we use `select` with aggregate functions.

In [59]:
# describe('ArrDelay') would report the mean but not the sum, so both aggregates
# are requested explicitly in a single select.
# NOTE(review): no filter is applied here, so the aggregates cover every row of
# `flights`, not only delayed flights - confirm this matches the intent.
flights.select(
    avg(col('ArrDelay')),
    sum(col('ArrDelay')),
).show()

+----------------+-------------+
|   avg(ArrDelay)|sum(ArrDelay)|
+----------------+-------------+
|8.16845238729114|     55994978|
+----------------+-------------+



Finding the average departure delay for each state.
To do this, we need the airport data from airports.csv. Instead of defining the schema explicitly as above, for illustration purposes we'll just "infer" the schema, which means asking Spark to figure it out by pre-sampling rows.

In [56]:
# Load the airport lookup table. The first line is a header, column types are
# inferred by Spark, and the literal string 'NA' is read as null.
airports = (
    spark.read
    .options(header=True, inferSchema=True, nullValue='NA')
    .csv("./data/airports.csv")
)
# Peek at the first two rows to check the inferred columns.
airports.show(n=2)

+----+--------------------+-----------+-----+-------+-----------+------------+
|iata|             airport|       city|state|country|        lat|        long|
+----+--------------------+-----------+-----+-------+-----------+------------+
| 00M|            Thigpen |Bay Springs|   MS|    USA|31.95376472|-89.23450472|
| 00R|Livingston Municipal| Livingston|   TX|    USA|30.68586111|-95.01792778|
+----+--------------------+-----------+-----+-------+-----------+------------+
only showing top 2 rows



Now, let's join the dataframes, group the result by state and calculate the average departure delay. To illustrate the "agg" function used with a map, we'll add the average arrival delay as well.

In [68]:
# Attach airport details to each flight via its origin airport code, then
# average the departure and arrival delays per state of the origin airport.
(
    flights
    .join(airports, on=flights.Origin == airports.iata, how="inner")
    .groupBy(airports.state)
    # dict form of agg: column name -> aggregate function name
    .agg({"DepDelay": "avg", "ArrDelay": "avg"})
    .show()
)

+-----+------------------+-------------------+
|state|     avg(DepDelay)|      avg(ArrDelay)|
+-----+------------------+-------------------+
|   AZ| 7.821406634575592| 4.2603182919641105|
|   SC|10.073743016759776|  8.942515845928815|
|   LA|  8.75001528397628|  8.065231382163653|
|   MN| 7.420257912196289| 7.5076487421901925|
|   NJ| 18.28530315230682| 17.073619219183303|
|   OR| 6.988035144205845|  3.913974378255698|
|   VA| 9.741461461852408|  9.015987468487651|
| null| 6.630157701447397|  5.631909820073704|
|   RI|10.345095558668053|  7.284535521603119|
|   KY| 9.408317082603078|  8.848705808601547|
|   WY| 4.837221577113903|  4.455306079220504|
|   NH|10.483407140123559|  7.463268777088934|
|   MI| 8.959508598521376|  9.411726489355809|
|   NV|10.047854928293972|  5.234664517182271|
|   WI| 9.898691052537206| 10.273451327433628|
|   ID| 4.312914217246415|  1.876640912159628|
|   CA| 8.567509354076408|  5.481386483085351|
|   NE| 8.982463876263482|  7.693161316676312|
|   CT|7.5934

In [12]:
# Print the column names and types of `df`.
# NOTE(review): `df` is not assigned in any visible cell, and this cell's execution
# count is lower than the cells above it. The recorded output lists untyped `_c0`,
# `_c1`, ... string columns, so this presumably refers to an earlier read done
# without header/schema - confirm, then remove the cell or point it at `flights`.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (nullable = tru