In [1]:
# Show which Spark version the kernel's `sc` object reports (`sc` is not defined
# in this notebook; presumably the PySpark kernel injects it). The bare
# expression is displayed as the cell output.
sc.version

'2.4.5'

In [2]:
import pandas as pd
import numpy as np
from pyspark.sql import *
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import stddev as StdDev

# Reuse the active SparkSession if there is one, otherwise build a new one,
# then display it as the cell output.
spark = (
    SparkSession.builder
    .getOrCreate()
)
spark

In [3]:
# Read the flights CSV line by line and split every line on commas.
# NOTE(review): a plain split does not honour quoted fields; assumes the file has none.
raw_lines = sc.textFile("flights_small.csv")
csvDataRDD = raw_lines.map(lambda row_text: row_text.split(","))
type(csvDataRDD)

pyspark.rdd.PipelinedRDD

In [4]:
# The first record is the CSV header: drop every record equal to it, then
# name the remaining string fields explicitly when converting to a DataFrame.
header = csvDataRDD.first()
csvDataRDD = csvDataRDD.filter(lambda fields: fields != header)
columns = [
    'year', 'month', 'day',
    'dep_time', 'dep_delay', 'arr_time', 'arr_delay',
    'carrier', 'tailnum', 'flight', 'origin', 'dest',
    'air_time', 'distance', 'hour', 'minute',
]
csvDataDF = csvDataRDD.toDF(columns)
csvDataDF.head(5)

[Row(year='2014', month='12', day='8', dep_time='658', dep_delay='-7', arr_time='935', arr_delay='-5', carrier='VX', tailnum='N846VA', flight='1780', origin='SEA', dest='LAX', air_time='132', distance='954', hour='6', minute='58'),
 Row(year='2014', month='1', day='22', dep_time='1040', dep_delay='5', arr_time='1505', arr_delay='5', carrier='AS', tailnum='N559AS', flight='851', origin='SEA', dest='HNL', air_time='360', distance='2677', hour='10', minute='40'),
 Row(year='2014', month='3', day='9', dep_time='1443', dep_delay='-2', arr_time='1652', arr_delay='2', carrier='VX', tailnum='N847VA', flight='755', origin='SEA', dest='SFO', air_time='111', distance='679', hour='14', minute='43'),
 Row(year='2014', month='4', day='9', dep_time='1705', dep_delay='45', arr_time='1839', arr_delay='34', carrier='WN', tailnum='N360SW', flight='344', origin='PDX', dest='SJC', air_time='83', distance='569', hour='17', minute='5'),
 Row(year='2014', month='3', day='9', dep_time='754', dep_delay='-1', ar

In [5]:
# Confirm the object is a Spark DataFrame and inspect its schema
# (every column is still a string at this point).
print(csvDataDF.__class__)
csvDataDF.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [6]:
# Cast the numeric columns from string to integer.
# - All casts happen in one select() rather than eleven chained withColumn()
#   calls. Each withColumn adds its own projection to the logical plan, while
#   select() does the same work in a single projection and keeps column order.
# - With Spark's default (non-ANSI) cast, a value that doesn't parse as an
#   integer becomes null.
# - 'flight' is deliberately left as a string (presumably an identifier, not a quantity).
int_columns = {'year', 'month', 'day', 'hour', 'minute', 'distance',
               'air_time', 'arr_delay', 'dep_delay', 'dep_time', 'arr_time'}
csvDataDF = csvDataDF.select([
    csvDataDF[name].cast(IntegerType()).alias(name) if name in int_columns
    else csvDataDF[name]
    for name in csvDataDF.columns
])

csvDataDF.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



In [7]:
# Persist the typed DataFrame as the managed ORC table "flights" in the metastore.
# mode("overwrite") keeps the cell re-runnable. With the default "errorifexists"
# mode, a second run (e.g. Restart & Run All) fails because the table already exists.
csvDataDF.write.format("ORC").mode("overwrite").saveAsTable("flights")

In [8]:
# Peek at ten rows of the "flights" table via Spark SQL (FROM-first syntax).
# Without ORDER BY, which ten rows come back is not guaranteed.
first_ten_query = 'FROM flights SELECT * LIMIT 10'
flights10 = spark.sql(first_ten_query)
flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    6| 12|    2121|       16|     518|       10|     UA| N489UA|   501|   SEA| EWR|     275|    2402|  21|    21|
|2014|    4| 30|     659|       -1|     952|       -8|     AS| N305AS|   724|   SEA| SLC|      89|     689|   6|    59|
|2014|    8| 30|     542|        7|     902|      -10|     UA| N24212|  1578|   SEA| DEN|     121|    1024|   5|    42|
|2014|    4| 17|     624|       -6|     834|       -5|     OO| N225AG|  3456|   PDX| BUR|     111|     817|   6|    24|
|2014|    9|  8|     705|       -5|    1239|      -21|     AA| N3GEAA|  1507|   SEA| DFW|     185|    1660|   7|     5|
|2014|    5|  1|     709|       -3|    1

In [9]:
# Count flights per (origin, dest) route in Spark, then collect the small
# aggregated result to the driver as a pandas DataFrame and print its first rows.
count_query = 'SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest'
flight_counts = spark.sql(count_query)
pd_counts = flight_counts.toPandas()
print(pd_counts.head())

  origin dest    N
0    SEA  RNO    8
1    SEA  DTW   98
2    SEA  CLE    2
3    SEA  LAX  450
4    SEA  BLI    5


In [10]:
# Create pd_temp: ten uniform random floats in [0, 1). They are drawn from a
# fixed-seed generator so the contents of the temp view are reproducible.
TEMP_SEED = 42
temp_rng = np.random.RandomState(TEMP_SEED)
pd_temp = pd.DataFrame(temp_rng.random_sample(10))

# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(spark.catalog.listTables())

# Register spark_temp as the session-scoped temporary view 'temp'
spark_temp.createOrReplaceTempView('temp')

# Examine the tables in the catalog again; 'temp' now appears as TEMPORARY
print(spark.catalog.listTables())

[Table(name='flights', database='default', description=None, tableType='MANAGED', isTemporary=False)]
[Table(name='flights', database='default', description=None, tableType='MANAGED', isTemporary=False), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [11]:
# Load the "flights" table back as a DataFrame and derive duration_hrs as
# air_time / 60 (air_time is presumably in minutes).
flights = spark.table('flights')
flights = flights.withColumn('duration_hrs', flights['air_time'] / 60)

flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|    6| 12|    2121|       16|     518|       10|     UA| N489UA|   501|   SEA| EWR|     275|    2402|  21|    21| 4.583333333333333|
|2014|    4| 30|     659|       -1|     952|       -8|     AS| N305AS|   724|   SEA| SLC|      89|     689|   6|    59|1.4833333333333334|
|2014|    8| 30|     542|        7|     902|      -10|     UA| N24212|  1578|   SEA| DEN|     121|    1024|   5|    42|2.0166666666666666|
|2014|    4| 17|     624|       -6|     834|       -5|     OO| N225AG|  3456|   PDX| BUR|     111|     817|   6|    24|              1.85|
|2014|    9|  8|     705|  

In [12]:
# Keep flights with distance > 1000, once with a SQL predicate string and once
# with a boolean Column. Printing the first row of each lets you eyeball that
# the two forms agree.
long_flights1 = flights.filter('distance > 1000')
long_flights2 = flights.filter(flights['distance'] > 1000)

for candidate in (long_flights1, long_flights2):
    print(candidate.head())

Row(year=2014, month=6, day=12, dep_time=2121, dep_delay=16, arr_time=518, arr_delay=10, carrier='UA', tailnum='N489UA', flight='501', origin='SEA', dest='EWR', air_time=275, distance=2402, hour=21, minute=21, duration_hrs=4.583333333333333)
Row(year=2014, month=6, day=12, dep_time=2121, dep_delay=16, arr_time=518, arr_delay=10, carrier='UA', tailnum='N489UA', flight='501', origin='SEA', dest='EWR', air_time=275, distance=2402, hour=21, minute=21, duration_hrs=4.583333333333333)


In [13]:
# Two column subsets: one selected by name, one by Column object.
selected1 = flights.select('tailnum', 'origin', 'dest')
temp = flights.select(flights['origin'], flights['dest'], flights['carrier'])

# Boolean predicates for the SEA -> PDX route.
filterA = flights['origin'] == "SEA"
filterB = flights['dest'] == "PDX"

# Apply the origin predicate first, then the destination predicate.
selected2 = temp.filter(filterA).filter(filterB)

In [14]:
# avg_speed = distance per hour of air time (air_time / 60 converts to hours).
hours_in_air = flights.air_time / 60
avg_speed = (flights.distance / hours_in_air).alias("avg_speed")

# The same projection built from Column objects ...
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# ... and from a SQL expression string.
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

In [15]:
# Shortest distance among flights departing PDX.
pdx_departures = flights.filter(flights.origin == "PDX")
pdx_departures.groupBy().min('distance').show()

# Longest air_time among flights departing SEA.
sea_departures = flights.filter(flights.origin == "SEA")
sea_departures.groupBy().max('air_time').show()

+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+

+-------------+
|max(air_time)|
+-------------+
|          409|
+-------------+



In [16]:
# Average air_time (presumably minutes) of Delta ("DL") flights departing SEA.
delta_from_sea = flights.filter(flights.carrier == "DL").filter(flights.origin == "SEA")
delta_from_sea.groupBy().avg('air_time').show()

# Total hours in the air. `flights` already carries duration_hrs (air_time / 60)
# from when it was loaded, so sum that column instead of recomputing it.
flights.groupBy().sum('duration_hrs').show()

+------------------+
|     avg(air_time)|
+------------------+
|188.20689655172413|
+------------------+

+-----------------+
|sum(duration_hrs)|
+-----------------+
|25289.60000000005|
+-----------------+



In [17]:
# Number of flights made by each aircraft (tail number).
by_plane = flights.groupby("tailnum")
by_plane.count().show()

# Mean air_time per departure airport (PDX and SEA in this data).
by_origin = flights.groupby("origin")
by_origin.avg("air_time").show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N927DN|    1|
| N8322X|    1|
| N466SW|    1|
| N73283|    4|
| N954WN|    5|
| N388DA|    3|
| N513UA|    2|
| N38451|    4|
|  N6700|    1|
| N102UW|    2|
| N36472|    4|
| N567AA|    1|
| N516UA|    2|
| N607AS|   45|
| N445WN|    3|
| N584AS|   31|
| N835UA|    2|
| N251WN|    1|
| N669DN|    2|
+-------+-----+
only showing top 20 rows

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



In [18]:
# Departure-delay statistics per (month, destination) pair.
by_month_dest = flights.groupby('month', 'dest')

# Mean departure delay.
by_month_dest.avg('dep_delay').show()

# Sample standard deviation of departure delay (StdDev is pyspark's stddev).
by_month_dest.agg(StdDev('dep_delay')).show()

+-----+----+-------------------+
|month|dest|     avg(dep_delay)|
+-----+----+-------------------+
|    4| PHX| 1.6833333333333333|
|    7| OMA|               -6.5|
|    1| RDM|             -1.625|
|    5| ONT| 3.5555555555555554|
|    6| DEN|  5.418181818181818|
|    8| MDW|               7.45|
|    5| IAD|               -4.0|
|   12| COS|               -1.0|
|   11| ANC|  7.529411764705882|
|    5| COS| 11.666666666666666|
|    2| PSP|                0.6|
|    5| AUS|              -0.75|
|   11| KOA|               -1.0|
|   10| DFW| 18.176470588235293|
|    4| ORD|0.14285714285714285|
|    8| JNU|             18.125|
|   10| DCA|               -1.5|
|   10| OMA|-0.6666666666666666|
|    6| PHX|               10.9|
|    1| JNU|                2.6|
+-----+----+-------------------+
only showing top 20 rows

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|    4| PHX|    15.003380033491737|
|    7| OMA|    2.1213203435596424|
| 

In [19]:
# Load the airport lookup table. Column names come from the CSV's first line;
# inferSchema is not set.
airports_csv = 'airports.csv'
airports = spark.read.option('header', True).csv(airports_csv)
airports.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [20]:
# Rename the airport code column so it lines up with flights.dest.
airports = airports.withColumnRenamed("faa", "dest")

# Left outer join: every flight is kept; flights whose dest has no matching
# airport row get nulls in the airport columns.
flights_with_airports = flights.join(airports, 'dest', 'leftouter')
flights_with_airports.show()

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|      duration_hrs|                name|      lat|        lon| alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+
| EWR|2014|    6| 12|    2121|       16|     518|       10|     UA| N489UA|   501|   SEA|     275|    2402|  21|    21| 4.583333333333333| Newark Liberty Intl|  40.6925| -74.168667|  18| -5|  A|
| SLC|2014|    4| 30|     659|       -1|     952|       -8|     AS| N305AS|   724|   SEA|      89|     689|   6|    59|1.4833333333333334| Salt Lake City Intl|40.788389|-111.977772|4227| -7|  A|
| DEN|2014|    8| 30|    