In [1]:
#!pip install findspark



In [1]:
# findspark.init() is called before the pyspark imports in the next cell.
# NOTE(review): presumably this is what makes the local Spark installation importable -- confirm for your environment.
import findspark
findspark.init()

In [2]:
# Import SparkSession from pyspark.sql
import configparser
from datetime import datetime
import os
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.window import Window
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date

In [3]:
# Build (or reuse) the SparkSession; the hadoop-aws package is requested
# through the spark.jars.packages setting.
session_builder = SparkSession.builder.config(
    "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
)
spark = session_builder.getOrCreate()

In [7]:
#spark = SparkSession \
#    .builder \
#    .appName("Spark SQL") \
#    .getOrCreate()

In [4]:
# Verify the session object exists (spark is a SparkSession, as the printed repr shows)
print(spark)

<pyspark.sql.session.SparkSession object at 0x121523550>


In [5]:
# Print the Spark version reported by the session
print(spark.version)

2.4.5


#### You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.

In [6]:
# Print the tables in the catalog (nothing has been registered yet, so the list is empty)
print(spark.catalog.listTables())

[]


In [7]:
# Define input file schema: every column of the flights file is read as a string
flight_column_names = [
    "year", "month", "day",
    "dep_time", "dep_delay", "arr_time", "arr_delay",
    "carrier", "tailnum", "flight",
    "origin", "dest",
    "air_time", "distance",
    "hour", "minute",
]
FlightsSchema = R([Fld(column_name, Str()) for column_name in flight_column_names])

In [8]:
# Load the flights CSV (comma separated, with a header row) using the explicit schema
flight_file = "datasets/flights_small.csv"
flight = (
    spark.read
    .schema(FlightsSchema)
    .option("sep", ",")
    .option("header", True)
    .csv(flight_file)
)
# Alternative: let Spark guess the column types with inferSchema=True instead of passing a schema
flight.printSchema()

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [9]:
# Register the DataFrame as the temporary view "flights" so it can be queried by name with spark.sql()
flight.createOrReplaceTempView("flights")

In [10]:
# Don't change this query
# Selects every column of the "flights" view, limited to 10 rows
query = "SELECT * FROM flights LIMIT 10"

In [11]:
# Get the first 10 rows of flights by running the SQL query through the session
flights10 = spark.sql(query)

In [12]:
#flights10.printSchema()

In [13]:
# Show the results as a text table
flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

### Pandafy a Spark DataFrame

In [14]:
# Don't change this query
# Counts the flights for each (origin, dest) pair; the count column is named N
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"

In [15]:
# Run the query; flight_counts is converted to pandas in the next cell
flight_counts = spark.sql(query)

In [16]:
# Convert the results to a pandas DataFrame with .toPandas()
pd_counts = flight_counts.toPandas()

In [17]:
# Print the head (first rows) of pd_counts as plain text
print(pd_counts.head())

  origin dest    N
0    SEA  RNO    8
1    SEA  DTW   98
2    SEA  CLE    2
3    SEA  LAX  450
4    PDX  SEA  144


### Convert a pandas DataFrame to a Spark DataFrame

In [18]:
# Create pandas pd_temp dataframe: one column of 10 random floats in [0, 1).
# The values come from a locally seeded generator so that a fresh
# Restart & Run All reproduces the same data; the global NumPy random
# state is left untouched.
pd_temp = pd.DataFrame(np.random.RandomState(42).random_sample(10))
pd_temp.head()

Unnamed: 0,0
0,0.207101
1,0.147619
2,0.896143
3,0.493121
4,0.516901


In [19]:
# The .createDataFrame() method takes a pandas DataFrame and returns a Spark DataFrame.
# Create spark_temp from pd_temp.
# This does not add anything to the catalog; the catalog listing in the next cell still shows only "flights".
spark_temp = spark.createDataFrame(pd_temp)

In [20]:
# Examine the tables in the catalog: only the "flights" temporary view is listed so far
print(spark.catalog.listTables())

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [21]:
# Add spark_temp to the catalog under the name "temp".
# .createOrReplaceTempView() safely creates a new temporary table if nothing was there before,
# or updates an existing table if one was already defined.
# Using this method avoids running into problems with duplicate tables.
spark_temp.createOrReplaceTempView("temp")

In [22]:
# Examine the tables in the catalog again: "temp" now appears next to "flights"
print(spark.catalog.listTables())

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


### Read the airports.csv file

In [23]:
# Don't change this file path
# Relative path of the airports CSV read in the next cell
file_path = "datasets/airports.csv"

In [24]:
# Read in the airports data; the first line of the file is used as the header
# NOTE(review): no schema or inferSchema is passed here -- presumably every column loads as a string;
# confirm with airports.printSchema()
airports = spark.read.csv(file_path, header=True)

In [25]:
# Show the data as a text table
airports.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [26]:
# .withColumn() method, which takes two arguments. First, a string with the name of your new column, and second the new column itself.
# Create the DataFrame flights from the "flights" view and add duration_hrs
# (air_time / 60) right away. The saved output of the show() cell that follows
# already contains duration_hrs, so the column must exist before that cell runs
# for a top-to-bottom execution to reproduce it.
flights = spark.table("flights").withColumn("duration_hrs", col("air_time") / 60)

In [29]:
# Show the head; the saved output below includes the duration_hrs column
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
|2014|    3|  9|     754|  

In [28]:
# Add duration_hrs: air_time divided by 60 (minutes -> hours).
# The result of withColumn() is assigned back to flights, so later cells see the new column.
flights = flights.withColumn("duration_hrs",flights.air_time/60)

### Filtering Data
The .filter() method takes either an expression that would follow the WHERE clause of a SQL expression as a string, or a Spark Column of boolean (True/False) values.

For example, the following two expressions will produce the same output:
flights.filter("air_time > 120").show()
flights.filter(flights.air_time > 120).show()

In [30]:
# Filter flights by passing a SQL-style condition as a string
distance_condition = "distance > 1000"
long_flights1 = flights.filter(distance_condition)

In [32]:
# Print the data of the string-based filter so it can be compared with long_flights2
long_flights1.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|              2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|               3.3|
|2014|    8|  3|    1120|        0|    1415|        2|     AS| N305AS|   656|   SEA| PHX|     154|    1107|  11|    20| 2.566666666666667|
|2014|   11| 12|    2346|  

In [31]:
# Filter flights by passing a column of boolean values
is_long_flight = flights.distance > 1000
long_flights2 = flights.filter(is_long_flight)

In [33]:
# Print the result of the column-based filter; the saved rows match those of long_flights1
long_flights2.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|              2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|               3.3|
|2014|    8|  3|    1120|        0|    1415|        2|     AS| N305AS|   656|   SEA| PHX|     154|    1107|  11|    20| 2.566666666666667|
|2014|   11| 12|    2346|  

### Selecting - Approach I
The Spark variant of SQL's SELECT is the .select() method. This method takes multiple arguments - one for each column you want to select. These arguments can either be the column name as a string (one for each column) or a column object (using the df.colName syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it, much like inside .withColumn().

The difference between .select() and .withColumn() methods is that .select() returns only the columns you specify, while .withColumn() returns all the columns of the DataFrame in addition to the one you defined. 

In [41]:
# Create the DataFrame flights again from the "flights" view.
# This replaces the earlier flights variable, so the duration_hrs column is gone
# (the printed schema lists only the original columns, all strings).
flights = spark.table("flights")
flights.printSchema()

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [36]:
# Select the first set of columns, referring to each column by its name
first_column_names = ["tailnum", "origin", "dest"]
selected1 = flights.select(*first_column_names)
selected1.show(5)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
| N559AS|   SEA| HNL|
| N847VA|   SEA| SFO|
| N360SW|   PDX| SJC|
| N612AS|   SEA| BUR|
+-------+------+----+
only showing top 5 rows



In [42]:
# Select the second set of columns, this time passing column objects
temp = flights.select(flights["origin"], flights["dest"], flights["carrier"])

# Boolean column expressions used as filters
filterA = flights["origin"] == "SEA"  # first filter: departures from SEA
filterB = flights["dest"] == "PDX"    # second filter: arrivals at PDX

# Filter the data, first by filterA then by filterB
from_sea = temp.filter(filterA)
selected2 = from_sea.filter(filterB)
selected2.show(5)

+------+----+-------+
|origin|dest|carrier|
+------+----+-------+
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
+------+----+-------+
only showing top 5 rows



### Selecting - Approach II
Similar to SQL, you can also use the .select() method to perform column-wise operations. When you're selecting a column using the df.colName notation, you can perform any column operation and the .select() method will return the transformed column. For example,
flights.select(flights.air_time/60)

returns a column of flight durations in hours instead of minutes. You can also use the .alias() method to rename a column you're selecting. So if you wanted to .select() the column duration_hrs (which isn't in your DataFrame) you could do
flights.select((flights.air_time/60).alias("duration_hrs"))

In [54]:
# Define avg_speed column: distance divided by the flight duration in hours
duration_in_hours = flights.air_time / 60
avg_speed = (flights.distance / duration_in_hours).alias("avg_speed")

In [55]:
# Select the correct columns: three by name plus the computed avg_speed column object
speed1_columns = ["origin", "dest", "tailnum", avg_speed]
speed1 = flights.select(*speed1_columns)
speed1.show(5)

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
+------+----+-------+------------------+
only showing top 5 rows



In [56]:
# .selectExpr() is the DataFrame counterpart that takes SQL expressions as strings, e.g.
#     flights.selectExpr("air_time/60 as duration_hrs")
# The SQL `as` keyword plays the role of .alias(). Several columns are selected
# by passing several strings.
speed_expressions = [
    "origin",
    "dest",
    "tailnum",
    "distance/(air_time/60) as avg_speed",
]
speed2 = flights.selectExpr(*speed_expressions)
speed2.show(5)

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
+------+----+-------+------------------+
only showing top 5 rows



### Aggregating
All of the common aggregation methods, like .min(), .max(), and .count(), are GroupedData methods. GroupedData objects are created by calling the .groupBy() DataFrame method.
For example, to find the minimum value of a column, col, in a DataFrame, df, you could do
df.groupBy().min("col").show()

In [87]:
# Create the DataFrame flights from the "flights" view, casting dep_delay,
# arr_time, air_time and distance to int; the remaining columns are selected unchanged.
flights = spark.sql(
    "select year,month,day,dep_time,"
    "cast(dep_delay as int) as dep_delay,"
    "cast(arr_time as int) as arr_time,"
    "arr_delay,carrier,tailnum,flight,origin,dest,"
    "cast(air_time as int) as air_time,"
    "cast(distance as int) as distance,"
    "hour,minute from flights"
)

# Find the shortest flight from PDX in terms of distance
pdx_departures = flights.filter("origin == 'PDX'")
pdx_departures.groupBy().min("distance").show()

+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+



In [67]:
# Find the longest flight from SEA in terms of air time
sea_departures = flights.filter("origin == 'SEA'")
sea_departures.groupBy().max("air_time").show()

+-------------+
|max(air_time)|
+-------------+
|          409|
+-------------+



### Aggregating II

In [73]:
# Average duration of Delta flights

# Average air time of Delta Airlines flights (carrier column equal to "DL")
# that left SEA (the place of departure is stored in the origin column).
delta_flights = flights.filter(flights.carrier == "DL")
delta_from_sea = delta_flights.filter(flights.origin == 'SEA')
delta_from_sea.groupBy().avg("air_time").show()


+------------------+
|     avg(air_time)|
+------------------+
|188.20689655172413|
+------------------+



In [75]:
# Total hours in the air

# Derive duration_hrs from air_time (air_time / 60), then sum it over the
# whole dataset to get the total number of hours all planes spent in the air.
flights_with_hours = flights.withColumn("duration_hrs", flights.air_time / 60)
flights_with_hours.groupBy().sum("duration_hrs").show()

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



### Grouping and Aggregating I
PySpark has a whole class devoted to grouped data frames: pyspark.sql.GroupedData, which you saw in the last two exercises.

You've learned how to create a grouped DataFrame by calling the .groupBy() method on a DataFrame with no arguments.

Now you'll see that when you pass the name of one or more columns in your DataFrame to the .groupBy() method, the aggregation methods behave like when you use a GROUP BY statement in a SQL query!

In [78]:
# Group by tailnum: by_plane is the flights data grouped by the tailnum column
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made: .count() with no arguments counts the rows per group
flights_per_plane = by_plane.count()
flights_per_plane.show(5)

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
+-------+-----+
only showing top 5 rows



In [82]:
# Group by origin: by_origin is the flights data grouped by the origin column
by_origin = flights.groupBy("origin")

# Average duration of flights from PDX and SEA: .avg() of the air_time column per origin
avg_air_time_by_origin = by_origin.avg("air_time")
avg_air_time_by_origin.show()

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



### Grouping and Aggregating II
In addition to the GroupedData methods you've already seen, there is also the .agg() method. This method lets you pass an aggregate column expression that uses any of the aggregate functions from the pyspark.sql.functions submodule.

This submodule contains many useful functions for computing things like standard deviations. All the aggregation functions in this submodule take the name of a column in a GroupedData table.

In [83]:
# Import pyspark.sql.functions as F
# Import the submodule pyspark.sql.functions as F

import pyspark.sql.functions as F

In [88]:
# Group by month and dest
# by_month_dest is a GroupedData table grouped by both the month and dest columns;
# the two column names are passed as separate string arguments.
by_month_dest = flights.groupBy("month","dest")

In [90]:
# Average departure delay by month and destination:
# .avg() on by_month_dest gives the mean dep_delay for each (month, dest) group.
avg_dep_delay = by_month_dest.avg("dep_delay")
avg_dep_delay.show(5)

+-----+----+-------------------+
|month|dest|     avg(dep_delay)|
+-----+----+-------------------+
|   11| TUS|-2.3333333333333335|
|   11| ANC|  7.529411764705882|
|    1| BUR|              -1.45|
|    1| PDX|-5.6923076923076925|
|    6| SBA|               -2.5|
+-----+----+-------------------+
only showing top 5 rows



In [91]:
# Standard deviation of departure delay:
# pass the aggregate expression F.stddev("dep_delay") to .agg() on the grouped data.
dep_delay_stddev = F.stddev("dep_delay")
by_month_dest.agg(dep_delay_stddev).show(5)

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|   11| TUS|    3.0550504633038935|
|   11| ANC|    18.604716401245316|
|    1| BUR|     15.22627576540667|
|    1| PDX|     5.677214918493858|
|    6| SBA|     2.380476142847617|
+-----+----+----------------------+
only showing top 5 rows

