<img src="images/cads-logo.png" style="height: 100px;padding-top:5px" align=left> <img src="images/apache_spark.png" style="height: 20%;width:20%; padding-top:0px" align=right>

# Manipulating Data using Apache Spark

In this notebook, we are going to get our hands dirty with Spark DataFrame API to perform common data operations.

In [148]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [149]:
import findspark
findspark.init()

If you take a look at dataset folder, you will see `flights.csv` that contains a row for every flight that left Portland International Airport (PDX) or Seattle-Tacoma International Airport (SEA) in 2014 and 2015.

In the first step, we should create a DataFrame using `flights.csv` file and then create a table (temporaray view) for querying flights by using SQL commands. Let's do it.

In [112]:
import os
MAIN_DIRECTORY = os.getcwd()

In [152]:
MAIN_DIRECTORY

'C:\\Users\\Syaidatul Syafira\\OneDrive - studentupmedumy.onmicrosoft.com\\Desktop\\DA\\Big Data Analytics with Apache Spark\\Apache Spark SC'

In [153]:
file_path = MAIN_DIRECTORY+"/dataset/flights.csv"

In [154]:
#df_flights = spark.read.format("csv").option("header","true").option('inferSchema','true').load(file_path)

In [155]:
# A simple way to create a dataframe in Spark
df_flights = spark.read.csv(file_path, header=True, inferSchema = True)

In [156]:
df_flights.createOrReplaceTempView('flights')

### Exercise 1: Use SQL to get the first five rows of the flights table and save the result to flights5, finally show the results. 

In [117]:
flights5 = spark.sql("SELECT * FROM flights LIMIT 5")

In [118]:
flights5.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+-----

### Exercise 2: Write a query that counts the number of flights to each airport from SEA and PDX.

In [119]:

spark.sql("SELECT origin, dest, count(*) FROM flights \
          GROUP BY origin, dest").show()
        

+------+----+--------+
|origin|dest|count(1)|
+------+----+--------+
|   SEA| RNO|       8|
|   SEA| DTW|      98|
|   SEA| CLE|       2|
|   SEA| LAX|     450|
|   PDX| SEA|     144|
|   SEA| BLI|       5|
|   PDX| IAH|      57|
|   PDX| PHX|     209|
|   SEA| SLC|     225|
|   SEA| SBA|      23|
|   SEA| BWI|      29|
|   PDX| IAD|      23|
|   PDX| SFO|     305|
|   SEA| KOA|      40|
|   PDX| MCI|      15|
|   SEA| SJC|     213|
|   SEA| ABQ|      43|
|   SEA| SAT|      18|
|   PDX| ONT|      57|
|   SEA| LAS|     364|
+------+----+--------+
only showing top 20 rows



### Exercise 3: Write a piece of code to create a DataFrame using `airports.csv`, this file contains information about different airports all over the world. 

In [120]:
file_path = MAIN_DIRECTORY + "/dataset/airports.csv"

In [121]:
df_airports = spark.read.csv(file_path, header=True, inferSchema = True)

In [122]:
df_airports.show(5)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 5 rows



Let's look at performing column-wise operations. In Apache Spark, you can do this using the `.withColumn(colName, col)`  which returns a new DataFrame by adding a column or replacing the existing column that has the same name.

*Parameters*:  
- **colName** – string, name of the new column.
- **col** – a Column expression for the new column. 

The new `column` must be an object of class Column. Creating one of these is as easy as extracting a column from your DataFrame using `df.colName`.
Apache Spark DataFrame is **immutable**. Immutable means that it can't be changed, and so columns can't be updated in place.
For example:
```python
df = df.withColumn("newCol", df.oldCol + 1)
```
The above code creates a DataFrame with the same columns as df plus a new column, `newCol`, where every entry is equal to the corresponding entry from `oldCol`, plus one.

Sometimes we have to change a column data type to another one, in this case, we can use the following code:
```python
from pyspark.sql.functions import col
df_name = df_name.withColumn("columnName", col("columnName").cast("DataType"))
```

### Exercise 4: Update `flights` DataFrame to include a new column called `duration_hrs`, that contains the duration of each flight in hours.

In [91]:
from pyspark.sql.functions import hours

In [92]:
from pyspark.sql.functions import col

In [123]:
df_flights = df_flights.withColumn("duration_hrs", df_flights.air_time/60)

In [124]:
df_flights.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
|2014|    3|  9|     754|  

### Exercise 5: Write a query using the `.filter()` method to find all the flights that flew over 1000 miles.

In [18]:
#spark.sql("SELECT origin, dest, count(*) FROM flights \
    #      GROUP BY origin, dest").show()

In [125]:
df_flights.select('flight', 'distance').filter(df_flights['distance'] > 1000).show()

+------+--------+
|flight|distance|
+------+--------+
|   851|    2677|
|   490|    1050|
|    26|    1721|
|   656|    1107|
|   121|    1448|
|   827|    1733|
|    24|    2496|
|   616|    2378|
|    29|    2640|
|   488|    1050|
|   907|    1448|
|   815|    2701|
|    18|    2554|
|  1598|    2182|
|    99|    1448|
|   794|    1180|
|  1212|    1874|
|   500|    1107|
|  2164|    1426|
|   593|    1107|
+------+--------+
only showing top 20 rows



In [126]:
df_flights.select('flight', 'distance').filter(df_flights['distance'] > 1000).count()

4883

### Exercise 6: Write a query using `.filter()` method, to only keep flights from SEA to PDX. This query should only return `tailnum`, `origin`, and `dest` columns.

In [127]:
# Solution 1
df_flights.select('tailnum', 'origin','dest').filter(df_flights.origin == 'SEA' ).filter(df_flights.dest == 'PDX').show(5)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N810SK|   SEA| PDX|
| N822SK|   SEA| PDX|
| N586SW|   SEA| PDX|
| N223SW|   SEA| PDX|
| N580SW|   SEA| PDX|
+-------+------+----+
only showing top 5 rows



In [None]:
#Solution 2


We can perform column-wise operations using `.select()` method. When we select a column using the `df.colName` notation. In `.select()` method, we can perform any column operation and it will return the transformed column. 
For example, the following command returns a column of flight durations in hours instead of minutes.
```python
df_flights.select(df_flights.air_time/60)
```
We can use the `alias()` method to rename a column we've selected. The following example shows how we can do that.
```python
df_flights.select((df_flights.air_time/60).alias("duration_hrs")
```
If we want to stick to the SQL syntax, we can use `.selectExpr()` method as well. The following commad is equivalent to the previous code.

```python
df_flights.selectExpr("air_time/60 as duration_hrs")
```

### Exercise 7: Write a query that return these columns, `origin`, `dest`, `tailnum`, and average speed in KM per hour.

In [128]:
#Solution 1
df_flights.select("origin","dest", "tailnum", (df_flights.distance / df_flights.hour)).alias("average_speed").show(5)

+------+----+-------+------------------+
|origin|dest|tailnum| (distance / hour)|
+------+----+-------+------------------+
|   SEA| LAX| N846VA|             159.0|
|   SEA| HNL| N559AS|             267.7|
|   SEA| SFO| N847VA|              48.5|
|   PDX| SJC| N360SW|33.470588235294116|
|   SEA| BUR| N612AS|133.85714285714286|
+------+----+-------+------------------+
only showing top 5 rows



In [129]:
#Solution 1
df_flights.select("origin","dest", "tailnum", (df_flights.distance / (df_flights.air_time/60))).alias("average_speed").show(5)

+------+----+-------+----------------------------+
|origin|dest|tailnum|(distance / (air_time / 60))|
+------+----+-------+----------------------------+
|   SEA| LAX| N846VA|           433.6363636363636|
|   SEA| HNL| N559AS|           446.1666666666667|
|   SEA| SFO| N847VA|          367.02702702702703|
|   PDX| SJC| N360SW|           411.3253012048193|
|   SEA| BUR| N612AS|           442.6771653543307|
+------+----+-------+----------------------------+
only showing top 5 rows



In [None]:
#Solution 2


### Exercise 8: Find the the shortest (in terms of distance) flight that left PDX by first filtering and using the `.min()` method. Perform the filtering by referencing the column directly, not passing a SQL string.

In [130]:
#Solution 1
df_flights.filter(df_flights.origin == 'PDX' ).groupBy(df_flights.origin).agg({'distance':'min'}).show()

+------+-------------+
|origin|min(distance)|
+------+-------------+
|   PDX|          106|
+------+-------------+



In [None]:
#Solutin 2


In [None]:
# Solution 3


### Exercise 9: Find the the longest (in terms of time) flight that left SEA by filtering and using the `.max()` method. Perform the filtering by referencing the column directly, not passing a SQL string.

if we run the following code, we will get an error, because `air_time` data type is string, first we should cast it to an integer column.

In [131]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [159]:
from pyspark.sql.functions import col

df_flights = df_flights.withColumn("air_time", col("air_time").cast("integer"))


In [160]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [134]:
#Solution 1
df_flights.filter(df_flights.origin == 'SEA' ).groupBy(df_flights.origin).agg({'air_time':'max'}).alias("Longest time").show()

+------+-------------+
|origin|max(air_time)|
+------+-------------+
|   SEA|          409|
+------+-------------+



In [None]:
#Solution 2


### Exercise 10: Write a query that uses the `.avg()` method to get the average air time of Delta Airlines flights ( the carrier column value is "DL") that left SEA.

In [135]:
#Solution 1
df_flights.filter(df_flights.carrier == 'DL' ).filter(df_flights.origin =="SEA").groupBy("origin","carrier").agg({'air_time':'avg'}).show()

+------+-------+------------------+
|origin|carrier|     avg(air_time)|
+------+-------+------------------+
|   SEA|     DL|188.20689655172413|
+------+-------+------------------+



### Exercise 11: Write a query that uses the `.sum()` method to get the total number of hours all planes spent in the air by creating a column called `duration_hrs` from the column `air_time`.

In [136]:
from pyspark.sql.functions import sum

df_flights = df_flights.withColumn("duration_hrs", df_flights.air_time/60).groupBy().sum("duration_hrs").show()

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



### Exercise 12: Write a query that uses `tailnum` column to count the number of flights each plane made.

In [137]:
spark.sql("SELECT tailnum, count(*) FROM flights \
            GROUP BY tailnum").show()

+-------+--------+
|tailnum|count(1)|
+-------+--------+
| N442AS|      38|
| N102UW|       2|
| N36472|       4|
| N38451|       4|
| N73283|       4|
| N513UA|       2|
| N954WN|       5|
| N388DA|       3|
| N567AA|       1|
| N516UA|       2|
| N927DN|       1|
| N8322X|       1|
| N466SW|       1|
|  N6700|       1|
| N607AS|      45|
| N622SW|       4|
| N584AS|      31|
| N914WN|       4|
| N654AW|       2|
| N336NW|       1|
+-------+--------+
only showing top 20 rows



### Exercise 13: Write a query that returns the average duration of flights from `PDX` and `SEA`.

In [161]:
df_flights.groupBy('origin').avg('air_time').show()

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



### Exercise 14: Write a query that returns the average departure delay (`dep_delay`) in each month for each destination. Then import PySpark functions to calculate the standard deviation of `dep_delay` by using `stddev()` function.

In [157]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [162]:
df_flights = df_flights.withColumn("dep_delay", col("dep_delay").cast("integer"))

In [163]:
df_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [164]:
month_dest = df_flights.groupBy("month","dest").agg({'dep_delay':'avg'})

In [165]:
month_dest.show(5)

+-----+----+------------------+
|month|dest|    avg(dep_delay)|
+-----+----+------------------+
|    4| PHX|1.6833333333333333|
|    1| RDM|            -1.625|
|    5| ONT|3.5555555555555554|
|    7| OMA|              -6.5|
|    8| MDW|              7.45|
+-----+----+------------------+
only showing top 5 rows



In [167]:
from pyspark.sql.functions import stddev

In [170]:
month_dest = df_flights.groupBy("month","dest").agg({'dep_delay':'stddev'}).show()

+-----+----+------------------+
|month|dest| stddev(dep_delay)|
+-----+----+------------------+
|    4| PHX|15.003380033491737|
|    1| RDM| 8.830749846821778|
|    5| ONT|18.895178691342874|
|    7| OMA|2.1213203435596424|
|    8| MDW|14.467659032985843|
|    6| DEN|13.536905534420026|
|    5| IAD|3.8078865529319543|
|   12| COS|1.4142135623730951|
|   11| ANC|18.604716401245316|
|    5| AUS| 4.031128874149275|
|    5| COS| 33.38163167571851|
|    2| PSP| 4.878524367060187|
|    4| ORD|11.593882803741764|
|   10| DFW| 45.53019017606675|
|   10| DCA|0.7071067811865476|
|    8| JNU| 40.79368823727514|
|   11| KOA|1.8708286933869707|
|   10| OMA|5.8594652770823155|
|    6| ONT| 25.98316762829351|
|    3| MSP|21.556779370817555|
+-----+----+------------------+
only showing top 20 rows



### Exercise 15: Write a query that performs left outer join on the flights and airports DataFrames.
- The flights and airports DataFrames are already in the workspace. 
- First, examine the airports DataFrame by calling .show() method. 
- Note which key column will let you join these two DataFrames.
- Before joining these two DataFrames, rename the `faa` column in `airports` to `dest`, and then convert this DataFrame to a temporary view (table).
- Use `spark.sql` to perform left outer join on these two tables. 


In [174]:
df_airports.columns

['faa', 'name', 'lat', 'lon', 'alt', 'tz', 'dst']

In [173]:
df_flights.columns

['year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'tailnum',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute']

In [175]:
df_airports = df_airports.withColumnRenamed('faa', 'dest')

In [176]:
df_airports.columns

['dest', 'name', 'lat', 'lon', 'alt', 'tz', 'dst']

In [177]:
df_airports.createOrReplaceTempView("airports")

In [179]:
df_join = spark.sql("SELECT * \
                     FROM flights \
                     LEFT OUTER JOIN airports \
                     ON flights.dest = airports.dest")
df_join.show() 

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+----+--------------------+---------+-----------+----+---+---+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|dest|                name|      lat|        lon| alt| tz|dst|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+----+--------------------+---------+-----------+----+---+---+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58| LAX|    Los Angeles Intl|33.942536|-118.408075| 126| -8|  A|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40| HNL|       Honolulu Intl|21.318681|-157.922428|  13|-10|  N|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     1

### Exercise 16: Rewrite the previous query by using DataFrame API `.join` method. 

In PySpark, we can use `.join` method to perform joins. This method takes three arguments. 
- The first argument is the second DataFrame that we want to join with the first one. 
- The second argument, `on`, is the name of the key column(s) as a string. The names of the key column(s) must be the same in each table. 
- The third argument, `how`, specifies the kind of join to perform. 

To perform left outer join set the value of `how` to `"leftouter"`.

In [180]:
df1_join = df_flights.join(df_airports, on='dest', how='leftouter')

In [181]:
df1_join.show(5)

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+--------------------+---------+-----------+---+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|                name|      lat|        lon|alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+--------------------+---------+-----------+---+---+---+
| LAX|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA|     132|     954|   6|    58|    Los Angeles Intl|33.942536|-118.408075|126| -8|  A|
| HNL|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA|     360|    2677|  10|    40|       Honolulu Intl|21.318681|-157.922428| 13|-10|  N|
| SFO|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA|     111|     679|  14|    43|  San 

#### Awesome