<a href="https://colab.research.google.com/github/m-mehdi/Python101/blob/master/ApacheSpark_03_PNB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<img src="images/cads-logo.png" style="height: 100px;padding-top:5px" align=left> <img src="images/apache_spark.png" style="height: 20%;width:20%; padding-top:0px" align=right>

# Manipulating Data using Apache Spark

In this notebook, we are going to get our hands dirty with the Spark DataFrame API and perform common data operations.

In [1]:
!pip install pyspark

Successfully installed py4j-0.10.9 pyspark-3.0.1


In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

If you take a look at the `dataset` folder, you will see `flights.csv`, which contains a row for every flight that left Portland International Airport (PDX) or Seattle-Tacoma International Airport (SEA) in 2014 and 2015.

As a first step, we create a DataFrame from the `flights.csv` file and then register it as a table (a temporary view) so that we can query the flights with SQL commands. Let's do it.

In [4]:
import os
MAIN_DIRECTORY = os.getcwd()
file_path = os.path.join(MAIN_DIRECTORY, "dataset", "flights.csv")

In [5]:
df_flights = spark.read.format("csv").option("header","true").option('inferSchema','true').load(file_path)

In [None]:
# A simpler, equivalent way to create a DataFrame in Spark
# df_flights = spark.read.csv(file_path, header=True, inferSchema=True)

In [6]:
df_flights.createOrReplaceTempView('flights')

### Exercise 1: Use SQL to get the first five rows of the flights table, save the result to `flights5`, and show the results.

In [8]:
query ="SELECT * FROM flights LIMIT 5"
flights5 = spark.sql(query)
flights5.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+

### Exercise 2: Write a query that counts the number of flights to each airport from SEA and PDX.

In [9]:
query ="SELECT DISTINCT origin FROM flights"
spark.sql(query).show()

+------+
|origin|
+------+
|   SEA|
|   PDX|
+------+



In [11]:
query = "SELECT origin, dest, count(*) FROM flights GROUP BY origin, dest"
flight_counts = spark.sql(query)
flight_counts.show()

+------+----+--------+
|origin|dest|count(1)|
+------+----+--------+
|   SEA| RNO|       8|
|   SEA| DTW|      98|
|   SEA| CLE|       2|
|   SEA| LAX|     450|
|   PDX| SEA|     144|
|   SEA| BLI|       5|
|   PDX| IAH|      57|
|   PDX| PHX|     209|
|   SEA| SLC|     225|
|   SEA| SBA|      23|
|   SEA| BWI|      29|
|   PDX| IAD|      23|
|   PDX| SFO|     305|
|   SEA| KOA|      40|
|   PDX| MCI|      15|
|   SEA| SJC|     213|
|   SEA| ABQ|      43|
|   SEA| SAT|      18|
|   PDX| ONT|      57|
|   SEA| LAS|     364|
+------+----+--------+
only showing top 20 rows



### Exercise 3: Write a piece of code to create a DataFrame from `airports.csv`. This file contains information about airports all over the world.

In [12]:
file_path = os.path.join(MAIN_DIRECTORY, 'dataset', 'airports.csv')
df_airports = spark.read.csv(file_path, header=True, inferSchema=True)
df_airports.show(10)

+---+--------------------+----------+------------+----+---+---+
|faa|                name|       lat|         lon| alt| tz|dst|
+---+--------------------+----------+------------+----+---+---+
|04G|   Lansdowne Airport|41.1304722| -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722| -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408| -88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912| -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722| -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|36.3712222| -82.1734167|1593| -4|  A|
|0G6|Williams County A...|41.4673056| -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|42.8835647| -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|39.7948244| -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|48.0538086|-122.8106436| 108| -8|  A|
+---+--------------------+----------+------------+----+---+---+
only showing top 10 rows



In [13]:
df_airports.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)



Let's look at performing column-wise operations. In Apache Spark, you can do this with the `.withColumn(colName, col)` method, which returns a new DataFrame by adding a column or replacing the existing column that has the same name.

*Parameters*:  
- **colName** – string, name of the new column.
- **col** – a Column expression for the new column. 

The `col` argument must be an object of class `Column`. Creating one of these is as easy as extracting a column from your DataFrame using `df.colName`.
Spark DataFrames are **immutable**: once created, they can't be changed, so columns can't be updated in place. Instead, `.withColumn()` returns a new DataFrame, which we usually assign back to a variable.
For example:
```python
df = df.withColumn("newCol", df.oldCol + 1)
```
The above code creates a DataFrame with the same columns as df plus a new column, `newCol`, where every entry is equal to the corresponding entry from `oldCol`, plus one.

Sometimes we need to convert a column to a different data type. In this case, we can use the column's `.cast()` method:
```python
from pyspark.sql.functions import col
df_name = df_name.withColumn("columnName", col("columnName").cast("DataType"))
```

### Exercise 4: Update the `flights` DataFrame to include a new column called `duration_hrs` that contains the duration of each flight in hours.

In [14]:
df_flights = df_flights.withColumn("duration_hrs", df_flights.air_time/60)

In [15]:
df_flights.columns

['year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'tailnum',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute',
 'duration_hrs']

In [16]:
df_flights.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
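|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|2.1166666666666667|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
only showing top 5 rows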

In [18]:
# Note: importing round this way shadows Python's built-in round()
from pyspark.sql.functions import round

In [20]:
df_flights.select(['origin','dest', 'flight', 'air_time', round(df_flights.duration_hrs, 2).alias('Duration hrs')]).show(5)

+------+----+------+--------+------------+
|origin|dest|flight|air_time|Duration hrs|
+------+----+------+--------+------------+
|   SEA| LAX|  1780|     132|         2.2|
|   SEA| HNL|   851|     360|         6.0|
|   SEA| SFO|   755|     111|        1.85|
|   PDX| SJC|   344|      83|        1.38|
|   SEA| BUR|   522|     127|        2.12|
+------+----+------+--------+------------+
only showing top 5 rows



### Exercise 5: Write a query using the `.filter()` method to find all the flights that flew over 1000 miles.

In [21]:
long_flights = df_flights.filter(df_flights.distance > 1000)

In [22]:
long_flights.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-----------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|     duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-----------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|              6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|             2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|              3.3|
|2014|    8|  3|    1120|        0|    1415|        2|     AS| N305AS|   656|   SEA| PHX|     154|    1107|  11|    20|2.566666666666667|
|2014|   11| 12|    2346|       -4

### Exercise 6: Write a query using the `.filter()` method to keep only flights from SEA to PDX. The query should return only the `tailnum`, `origin`, and `dest` columns.

In [23]:
# Solution 1
df_flights.filter(df_flights.origin =='SEA').filter(df_flights.dest =='PDX')\
          .select(['tailnum','origin','dest']).show(5)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N810SK|   SEA| PDX|
| N822SK|   SEA| PDX|
| N586SW|   SEA| PDX|
| N223SW|   SEA| PDX|
| N580SW|   SEA| PDX|
+-------+------+----+
only showing top 5 rows



In [24]:
#Solution 2
df_flights.filter((df_flights.origin =='SEA') & (df_flights.dest =='PDX'))\
          .select(['tailnum','origin','dest']).show(5)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N810SK|   SEA| PDX|
| N822SK|   SEA| PDX|
| N586SW|   SEA| PDX|
| N223SW|   SEA| PDX|
| N580SW|   SEA| PDX|
+-------+------+----+
only showing top 5 rows



We can also perform column-wise operations with the `.select()` method. Inside `.select()`, we can apply any column operation to a column referenced with the `df.colName` notation, and the result is a DataFrame containing the transformed column.
For example, the following command returns a column of flight durations in hours instead of minutes.
```python
df_flights.select(df_flights.air_time/60)
```
We can use the `alias()` method to rename a column we've selected. The following example shows how we can do that.
```python
df_flights.select((df_flights.air_time/60).alias("duration_hrs"))
```
If we want to stick to SQL syntax, we can use the `.selectExpr()` method instead. The following command is equivalent to the previous code.

```python
df_flights.selectExpr("air_time/60 as duration_hrs")
```

### Exercise 7: Write a query that returns the `origin`, `dest`, and `tailnum` columns and the average speed of each flight in miles per hour.

In [25]:
#Solution 1
avg_speed = (df_flights.distance/(df_flights.air_time/60)).alias('avg_speed')

In [26]:
df_flights.select('origin','dest','tailnum',avg_speed).show(5)

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
+------+----+-------+------------------+
only showing top 5 rows



In [27]:
#Solution 2
df_flights.selectExpr('origin','dest','tailnum','distance/(air_time/60) as avg_speed').show(5)

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
+------+----+-------+------------------+
only showing top 5 rows



### Exercise 8: Find the shortest (in terms of distance) flight that left PDX by first filtering and then using the `.min()` method. Perform the filtering by referencing the column directly, not by passing a SQL string.

In [28]:
#Solution 1
df_flights.filter(df_flights.origin =='PDX').groupBy(df_flights.origin)\
.agg({'distance':'min'}).show()

+------+-------------+
|origin|min(distance)|
+------+-------------+
|   PDX|          106|
+------+-------------+



In [32]:
# Solution 2
# Note: this import shadows Python's built-in min()
from pyspark.sql.functions import min
df_flights.filter(df_flights.origin =='PDX').groupBy(df_flights.origin)\
.agg(min('distance').alias('Shortest Flight Distance')).show()

+------+------------------------+
|origin|Shortest Flight Distance|
+------+------------------------+
|   PDX|                     106|
+------+------------------------+



In [30]:
# Solution 3
df_flights.filter(df_flights.origin =='PDX').groupBy(df_flights.origin)\
.min('distance').show()

+------+-------------+
|origin|min(distance)|
+------+-------------+
|   PDX|          106|
+------+-------------+



### Exercise 9: Find the longest (in terms of time) flight that left SEA by filtering and then using the `.max()` method. Perform the filtering by referencing the column directly, not by passing a SQL string.

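As the schema below shows, `air_time` was read as a string column. If we aggregate it as it is, `GroupedData.max()` raises an error because `air_time` is not numeric, and `pyspark.sql.functions.max()` would compare the values as text. So we first cast it to an integer column.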

In [33]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [34]:
from pyspark.sql.functions import col
df_flights = df_flights.withColumn('air_time', col('air_time').cast('integer'))

In [35]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [36]:
# Note: this import shadows Python's built-in max()
from pyspark.sql.functions import max

In [37]:
#Solution 1
df_flights.filter(df_flights.origin =='SEA').groupBy(df_flights.origin)\
          .agg(max('air_time').alias('Longest Flight Time')).show()

+------+-------------------+
|origin|Longest Flight Time|
+------+-------------------+
|   SEA|                409|
+------+-------------------+



In [38]:
#Solution 2
df_flights.filter(df_flights.origin =='SEA').groupBy(df_flights.origin)\
          .max('air_time').show()

+------+-------------+
|origin|max(air_time)|
+------+-------------+
|   SEA|          409|
+------+-------------+



### Exercise 10: Write a query that uses the `.avg()` method to get the average air time of Delta Air Lines flights (where the `carrier` column value is "DL") that left SEA.

In [39]:
from pyspark.sql.functions import avg
df_flights.filter((df_flights.carrier =='DL') & (df_flights.origin =='SEA')).groupBy(df_flights.carrier)\
          .agg(round(avg('air_time'),2).alias('Avg. Flight Time')).show()

+-------+----------------+
|carrier|Avg. Flight Time|
+-------+----------------+
|     DL|          188.21|
+-------+----------------+



### Exercise 11: Write a query that uses the `.sum()` method to get the total number of hours all planes spent in the air by creating a column called `duration_hrs` from the column `air_time`.

In [40]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [41]:
new_df = df_flights.groupBy().sum('duration_hrs')
new_df.show()

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



In [42]:
new_df.first()[0]

25289.600000000126

### Exercise 12: Write a query that uses the `tailnum` column to count the number of flights each plane made.

In [44]:
df_flights.groupBy('tailnum').count().orderBy('count', ascending=False).show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N612AS|   51|
| N219AG|   50|
| N224AG|   50|
| N218AG|   49|
| N223AG|   49|
| N611AS|   46|
| N225AG|   45|
| N435AS|   45|
| N607AS|   45|
| N615AS|   45|
| N794AS|   42|
| N644AS|   41|
| N627AS|   41|
| N216AG|   41|
| N215AG|   41|
| N413AS|   40|
| N227AG|   40|
| N622AS|   40|
| N626AS|   40|
| N619AS|   40|
+-------+-----+
only showing top 20 rows



### Exercise 13: Write a query that returns the average duration of flights from `PDX` and `SEA`.

In [45]:
df_flights.groupBy('origin').avg('air_time').show()

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



### Exercise 14: Write a query that returns the average departure delay (`dep_delay`) in each month for each destination. Then import the `stddev()` function from `pyspark.sql.functions` and use it to calculate the standard deviation of `dep_delay`.

In [46]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [47]:
df_flights=df_flights.withColumn('dep_delay', col('dep_delay').cast('integer'))

In [48]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [49]:
grouped_by_month_dest = df_flights.groupBy('month','dest')

In [50]:
type(grouped_by_month_dest)

pyspark.sql.group.GroupedData

In [51]:
grouped_by_month_dest.avg('dep_delay').orderBy('month').show(5)

+-----+----+--------------+
|month|dest|avg(dep_delay)|
+-----+----+--------------+
|    1| ABQ|          -3.0|
|    1| SLC|        6.5625|
|    1| MSP|          11.2|
|    1| RDM|        -1.625|
|    1| JNU|           2.6|
+-----+----+--------------+
only showing top 5 rows



In [52]:
from pyspark.sql.functions import stddev
grouped_by_month_dest.agg(stddev('dep_delay')).show(5)

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|    4| PHX|    15.003380033491737|
|    1| RDM|     8.830749846821778|
|    5| ONT|    18.895178691342874|
|    7| OMA|    2.1213203435596424|
|    8| MDW|    14.467659032985843|
+-----+----+----------------------+
only showing top 5 rows



### Exercise 15: Write a query that performs a left outer join on the flights and airports DataFrames.
- The flights and airports DataFrames are already in the workspace. 
- First, examine the airports DataFrame by calling the `.show()` method. 
- Note which key column will let you join these two DataFrames.
- Before joining these two DataFrames, rename the `faa` column in `airports` to `dest`, and then convert this DataFrame to a temporary view (table).
- Use `spark.sql` to perform a left outer join on these two tables. 


In [53]:
df_airports.show(5)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 5 rows



In [54]:
df_flights.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
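|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|2.1166666666666667|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
only showing top 5 rows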

In [55]:
df_airports = df_airports.withColumnRenamed('faa','dest')

In [56]:
df_airports.show(3)

+----+--------------------+----------+-----------+----+---+---+
|dest|                name|       lat|        lon| alt| tz|dst|
+----+--------------------+----------+-----------+----+---+---+
| 04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
| 06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
| 06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
+----+--------------------+----------+-----------+----+---+---+
only showing top 3 rows



In [57]:
df_flights.createOrReplaceTempView('flights')
df_airports.createOrReplaceTempView('airports')


In [58]:
query = ("SELECT f.origin, f.dest, a.name, f.flight "
         "FROM flights AS f LEFT JOIN airports AS a ON f.dest = a.dest")
df_joined = spark.sql(query)
df_joined.show()

+------+----+--------------------+------+
|origin|dest|                name|flight|
+------+----+--------------------+------+
|   SEA| LAX|    Los Angeles Intl|  1780|
|   SEA| HNL|       Honolulu Intl|   851|
|   SEA| SFO|  San Francisco Intl|   755|
|   PDX| SJC|Norman Y Mineta S...|   344|
|   SEA| BUR|            Bob Hope|   522|
|   PDX| DEN|         Denver Intl|    48|
|   PDX| OAK|Metropolitan Oakl...|  1520|
|   SEA| SFO|  San Francisco Intl|   755|
|   SEA| SAN|      San Diego Intl|   490|
|   SEA| ORD|  Chicago Ohare Intl|    26|
|   SEA| LAX|    Los Angeles Intl|   448|
|   SEA| PHX|Phoenix Sky Harbo...|   656|
|   SEA| LAS|      Mc Carran Intl|   608|
|   SEA| ANC|Ted Stevens Ancho...|   121|
|   SEA| SFO|  San Francisco Intl|   306|
|   PDX| SFO|  San Francisco Intl|  1458|
|   SEA| SMF|     Sacramento Intl|   368|
|   SEA| MDW| Chicago Midway Intl|   827|
|   SEA| BOS|General Edward La...|    24|
|   PDX| BUR|            Bob Hope|  3488|
+------+----+--------------------+------+
only showing top 20 rows

### Exercise 16: Rewrite the previous query using the DataFrame API's `.join()` method.

In PySpark, we can use the `.join()` method to perform joins. It is called on the first (left) DataFrame and takes three arguments:
- The first argument is the second DataFrame, the one we want to join with the first.
- The second argument, `on`, is the name of the key column(s), as a string or a list of strings. When given this way, the key column(s) must have the same name in both DataFrames.
- The third argument, `how`, specifies the kind of join to perform.

To perform a left outer join, set `how` to `"leftouter"`.

In [59]:
df_flights.join(df_airports, 'dest', 'leftouter').select('origin','dest','name','flight').show()

+------+----+--------------------+------+
|origin|dest|                name|flight|
+------+----+--------------------+------+
|   SEA| LAX|    Los Angeles Intl|  1780|
|   SEA| HNL|       Honolulu Intl|   851|
|   SEA| SFO|  San Francisco Intl|   755|
|   PDX| SJC|Norman Y Mineta S...|   344|
|   SEA| BUR|            Bob Hope|   522|
|   PDX| DEN|         Denver Intl|    48|
|   PDX| OAK|Metropolitan Oakl...|  1520|
|   SEA| SFO|  San Francisco Intl|   755|
|   SEA| SAN|      San Diego Intl|   490|
|   SEA| ORD|  Chicago Ohare Intl|    26|
|   SEA| LAX|    Los Angeles Intl|   448|
|   SEA| PHX|Phoenix Sky Harbo...|   656|
|   SEA| LAS|      Mc Carran Intl|   608|
|   SEA| ANC|Ted Stevens Ancho...|   121|
|   SEA| SFO|  San Francisco Intl|   306|
|   PDX| SFO|  San Francisco Intl|  1458|
|   SEA| SMF|     Sacramento Intl|   368|
|   SEA| MDW| Chicago Midway Intl|   827|
|   SEA| BOS|General Edward La...|    24|
|   PDX| BUR|            Bob Hope|  3488|
+------+----+--------------------+------+
only showing top 20 rows

#### Awesome