## Week 5 Homework 

In this homework we'll put what we learned about Spark into practice.

For this homework we will use the FHVHV 2021-06 data: [FHVHV Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz). The download is gzip-compressed; the code below reads fhvhv_tripdata_2021-06.csv, so decompress the file first.


### Question 1: 

**Install Spark and PySpark** 

- Install Spark
- Run PySpark
- Create a local Spark session
- Execute spark.version

What's the output?
- 3.3.2
- 2.1.4
- 1.2.3
- 5.4
</br></br>

R: 3.3.2

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [None]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

spark.version

### Question 2: 

**FHVHV June 2021**

Read it with Spark using the same schema as we did in the lessons.</br> 
We will use this dataset for all the remaining questions.</br>
Repartition it to 12 partitions and save it to parquet.</br>
What is the average size, in MB, of the Parquet files that were created (the files ending with the .parquet extension)? Select the answer that most closely matches.</br>


- 2MB
- 24MB
- 100MB
- 250MB

R: 24MB
</br></br>

In [4]:
# Explicit schema; column names and types match the printSchema() output shown under Question 3.
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [None]:
# Read the CSV with the explicit schema instead of inferring it.
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-06.csv')

# Repartition once, then write the 12 partitions as Parquet.
df = df.repartition(12)

df.write.mode("overwrite").parquet('data/pq/fhvhv/2021/06/')
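
The notebook does not show how the average file size was measured. One way to check it, sketched below in plain Python, is to sum the sizes of the files ending in `.parquet` in the output directory and divide by their count. The helper name `average_parquet_size_mb` is hypothetical, and the sketch assumes Spark wrote its part files directly into `data/pq/fhvhv/2021/06/` and that 1 MB means 1024 × 1024 bytes.

```python
from pathlib import Path


def average_parquet_size_mb(directory):
    """Return the mean size in MB of the *.parquet files directly inside `directory`.

    Hypothetical helper, not part of Spark. Files such as _SUCCESS or *.crc
    are skipped because their names do not end with .parquet.
    """
    sizes = [p.stat().st_size for p in Path(directory).glob("*.parquet")]
    if not sizes:
        raise ValueError(f"no .parquet files found in {directory}")
    # Assumption: 1 MB = 1024 * 1024 bytes.
    return sum(sizes) / len(sizes) / (1024 * 1024)
```

Calling `average_parquet_size_mb('data/pq/fhvhv/2021/06/')` after the write finishes gives the figure to compare with the answer options.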

### Question 3: 

**Count records**  

How many taxi trips were there on June 15?</br></br>
Consider only trips that started on June 15.</br>

- 308,164
- 12,856
- 452,470
- 50,982

R: 452,470
</br></br>

In [23]:
# Parquet files store their own schema, so no header or inferSchema options are needed.
df2 = spark.read.parquet('data/pq/fhvhv/2021/06/')

df2.printSchema()
df2.createOrReplaceTempView('fhvhv_2021_06')

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [21]:
df2.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02510|2021-06-01 16:24:08|2021-06-01 17:25:06|         129|          69|      N|                  null|
|              B02510|2021-06-01 12:48:33|2021-06-01 13:03:00|         229|         100|      N|                  null|
|              B02879|2021-06-01 07:20:27|2021-06-01 08:10:56|          68|         265|      N|                B02879|
|              B02889|2021-06-02 10:00:42|2021-06-02 10:09:50|         164|          79|      N|                B02889|
|              B02872|2021-06-03 11:38:01|2021-06-03 11:59:39|         162|          79|      N|                B02872|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+

In [24]:
spark.sql("""
    SELECT COUNT(*)
    FROM 
        fhvhv_2021_06
    WHERE to_date(pickup_datetime) = '2021-06-15'
""").show()

+--------+
|count(1)|
+--------+
|  452470|
+--------+
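
The `to_date(pickup_datetime) = '2021-06-15'` filter looks only at the calendar date on which a trip started, so a trip picked up late on June 15 and dropped off on June 16 still counts. The plain-Python sketch below illustrates the same rule on a few made-up trips; the function name `started_on` and the sample timestamps are assumptions for illustration, not data from the file.

```python
from datetime import date, datetime


def started_on(pickup, day):
    """True if the trip's pickup timestamp falls on the given calendar day."""
    return pickup.date() == day


# Made-up (pickup, dropoff) pairs, for illustration only.
trips = [
    (datetime(2021, 6, 14, 23, 50), datetime(2021, 6, 15, 0, 10)),  # started June 14
    (datetime(2021, 6, 15, 8, 0), datetime(2021, 6, 15, 8, 30)),    # started June 15
    (datetime(2021, 6, 15, 23, 55), datetime(2021, 6, 16, 0, 20)),  # started June 15
]

# Count the trips whose pickup date is June 15; dropoff times are ignored.
june_15_trips = sum(started_on(pickup, date(2021, 6, 15)) for pickup, _ in trips)
print(june_15_trips)
```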



                                                                                

### Question 4: 

**Longest trip for each day**  

Now calculate the duration for each trip.</br>
How long was the longest trip in Hours?</br>

- 66.87 Hours
- 243.44 Hours
- 7.68 Hours
- 3.32 Hours

R: 66.87 Hours
</br></br>

In [31]:
spark.sql("""
    SELECT
        MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600) AS duration
    FROM 
        fhvhv_2021_06
""").show()

+----------------+
|        duration|
+----------------+
|66.8788888888889|
+----------------+
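
Casting a timestamp to `LONG` yields seconds since the Unix epoch, so the query subtracts two second counts and divides by 3600 to get hours. The sketch below repeats that arithmetic in plain Python for the first row of the `df2.show(5)` output above (pickup 16:24:08, dropoff 17:25:06 on 2021-06-01); the function name `duration_hours` is a hypothetical helper.

```python
from datetime import datetime


def duration_hours(pickup, dropoff):
    """Trip duration in hours, computed from the difference in whole seconds."""
    seconds = int((dropoff - pickup).total_seconds())
    return seconds / 3600


pickup = datetime(2021, 6, 1, 16, 24, 8)
dropoff = datetime(2021, 6, 1, 17, 25, 6)
print(duration_hours(pickup, dropoff))  # 3658 seconds / 3600
```

By the same arithmetic, the reported maximum of 66.8788888888889 hours corresponds to 240,764 seconds, or 66 h 52 min 44 s.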



                                                                                

### Question 5: 

**User Interface**

Which local port does Spark's user interface, which shows the application's dashboard, run on?</br>

- 80
- 443
- 4040
- 8080

R: 4040
</br></br>


### Question 6: 

**Most frequent pickup location zone**

Load the zone lookup data into a temp view in Spark</br>
[Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)</br>

Using the zone lookup data and the fhvhv June 2021 data, what is the name of the most frequent pickup location zone?</br>

- East Chelsea
- Astoria
- Union Sq
- Crown Heights North

R: Crown Heights North
</br></br>


In [None]:
zones_df = spark.read \
    .option("header", "true").option("inferSchema", "true") \
    .csv('taxi_zone_lookup.csv')

In [None]:
df2.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02510|2021-06-01 16:24:08|2021-06-01 17:25:06|         129|          69|      N|                  null|
|              B02510|2021-06-01 12:48:33|2021-06-01 13:03:00|         229|         100|      N|                  null|
|              B02879|2021-06-01 07:20:27|2021-06-01 08:10:56|          68|         265|      N|                B02879|
|              B02889|2021-06-02 10:00:42|2021-06-02 10:09:50|         164|          79|      N|                B02889|
|              B02872|2021-06-03 11:38:01|2021-06-03 11:59:39|         162|          79|      N|                B02872|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+

In [None]:
zones_df.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [None]:
joined_df = df2.join(
    zones_df,
    df2.PULocationID == zones_df.LocationID,
    "inner"
)

joined_df.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|LocationID|  Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|              B02510|2021-06-01 16:24:08|2021-06-01 17:25:06|         129|          69|      N|                  null|       129|   Queens|     Jackson Heights|   Boro Zone|
|              B02510|2021-06-01 12:48:33|2021-06-01 13:03:00|         229|         100|      N|                  null|       229|Manhattan|Sutton Place/Turt...| Yellow Zone|
|              B02879|2021-06-01 07:20:27|2021-06-01 08:10:56|          68|         265|      N|                B02879|      

In [None]:
joined_df.groupBy('Zone').count().orderBy('count', ascending=False).show()

+--------------------+------+
|                Zone| count|
+--------------------+------+
| Crown Heights North|231279|
|        East Village|221244|
|         JFK Airport|188867|
|      Bushwick South|187929|
|       East New York|186780|
|TriBeCa/Civic Center|164344|
|   LaGuardia Airport|161596|
|            Union Sq|158937|
|        West Village|154698|
|             Astoria|152493|
|     Lower East Side|151020|
|        East Chelsea|147673|
|Central Harlem North|146402|
|Williamsburg (Nor...|143683|
|          Park Slope|143594|
|  Stuyvesant Heights|141427|
|        Clinton East|139611|
|West Chelsea/Huds...|139431|
|             Bedford|138428|
|         Murray Hill|137879|
+--------------------+------+
only showing top 20 rows
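
The join and `groupBy('Zone').count()` above amount to looking up each trip's `PULocationID` in the zone table and tallying the resulting zone names. Here is a plain-Python sketch of that logic on a tiny sample. The three zone names come from the `zones_df.show(5)` output above, while the list of pickup IDs is made up for illustration.

```python
from collections import Counter

# LocationID -> Zone, taken from the zones_df.show(5) output above.
zones = {1: "Newark Airport", 2: "Jamaica Bay", 4: "Alphabet City"}

# Made-up pickup location IDs; 999 has no match in the lookup.
pickup_ids = [4, 2, 4, 1, 4, 2, 999]

# Inner join: keep only IDs present in the lookup, then count per zone name.
zone_counts = Counter(zones[i] for i in pickup_ids if i in zones)

# The most frequent zone corresponds to the first row after ordering by count.
top_zone, top_count = zone_counts.most_common(1)[0]
print(top_zone, top_count)
```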



## Submitting the solutions

* Form for submitting: https://forms.gle/EcSvDs6vp64gcGuD8
* You can submit your homework multiple times. In this case, only the last submission will be used. 

Deadline: 06 March (Monday), 22:00 CET


## Solution

We will publish the solution here.