# **Week 5 - Homework**

In this homework we'll put what we learned about Spark
into practice.

We'll use the High Volume For-Hire Vehicles (HVFHV) dataset for that.


## **Question 1. Install Spark and PySpark**

* Install Spark
* Run PySpark
* Create a local Spark session
* Execute `spark.version`

What's the output?

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('test')\
        .getOrCreate()

22/03/02 15:48:41 WARN Utils: Your hostname, ns3273592 resolves to a loopback address: 127.0.1.1; using 5.39.86.183 instead (on interface eno0)
22/03/02 15:48:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/02 15:48:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/02 15:48:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
spark.version

'3.2.1'

> **answer**  
3.2.1

## **Question 2. HVFHV February 2021**

Download the HVFHV data for February 2021:

```bash
wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
```

Read it with Spark using the same schema as we did 
in the lessons. 

Repartition it to 24 partitions and save it to
parquet.

What's the size of the folder with results (in MB)?


In [None]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

In [11]:
!wc -l fhvhv_tripdata_2021-02.csv

11613943 fhvhv_tripdata_2021-02.csv


In [12]:
!head -n 3 fhvhv_tripdata_2021-02.csv






In [13]:
from pyspark.sql import types

In [14]:
schema = types.StructType([
    types.StructField('hvfhs_license_num',types.StringType(),True),
    types.StructField('dispatching_base_num',types.StringType(),True),
    types.StructField('pickup_datetime',types.TimestampType(),True),
    types.StructField('dropoff_datetime',types.TimestampType(),True),
    types.StructField('PULocationID',types.IntegerType(),True),
    types.StructField('DOLocationID',types.IntegerType(),True),
    types.StructField('SR_Flag',types.StringType(),True)
])

In [15]:
df = spark.read\
    .option("header","true")\
    .schema(schema)\
    .csv('fhvhv_tripdata_2021-02.csv')


In [16]:
df.show(5)



+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+

In [18]:
df = df.repartition(24)

In [19]:
df.write.parquet('fhvhv/2021/02/', mode='overwrite')



In [20]:
!du -h fhvhv

421M	fhvhv/2021/02
421M	fhvhv/2021
421M	fhvhv
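The folder size can also be computed from Python. The sketch below is a minimal example, assuming a hypothetical helper named `folder_size_mb` that sums file sizes in units of 10^6 bytes; it runs on a throwaway directory rather than on the real `fhvhv` folder.

```python
import os
import tempfile


def folder_size_mb(path):
    """Sum the sizes of all regular files under `path`, in MB (10**6 bytes)."""
    total_bytes = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total_bytes += os.path.getsize(os.path.join(root, name))
    return total_bytes / 1_000_000


# Demo on a temporary directory that mimics the nested output layout.
with tempfile.TemporaryDirectory() as tmp:
    nested = os.path.join(tmp, "2021", "02")
    os.makedirs(nested)
    # Hypothetical file name; real Spark output uses part-*.parquet files.
    with open(os.path.join(nested, "part-0.bin"), "wb") as f:
        f.write(b"\0" * 2_500_000)
    demo_size_mb = folder_size_mb(tmp)

print(f"{demo_size_mb:.1f} MB")
```

Note that `du -h` reports allocated disk blocks and uses powers of 1024, so its figure can differ somewhat from a plain sum of file sizes.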


> **answer**  
~ 208MB

## **Question 3. Count records** 

How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [34]:
from pyspark.sql import functions as F

In [35]:
df = spark.read.parquet('fhvhv/2021/02/*')



In [36]:
# registerTempTable is deprecated; createOrReplaceTempView is the current API
df.createOrReplaceTempView('fhvhv')

In [None]:
query = """
            SELECT
                COUNT(*)
            FROM
                fhvhv
            WHERE
                date_format(pickup_datetime, 'yyyyMMdd') = '20210215'
"""


spark.sql(query).show()

> **answer**  
367170

## **Question 4. Longest trip for each day**

Now calculate the duration for each trip.

On which day did the longest trip start?

In [38]:
from pyspark.sql.functions import col, asc, desc

In [44]:
timeFmt = "yyyy-MM-dd HH:mm:ss"
timeDiff = (F.unix_timestamp('dropoff_datetime', format=timeFmt)
            - F.unix_timestamp('pickup_datetime', format=timeFmt))/60
fhvhv_bis = df.withColumn("trip_duration", timeDiff)

# registerTempTable is deprecated; createOrReplaceTempView is the current API
fhvhv_bis.createOrReplaceTempView('fhvhv_bis')
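The expression above subtracts two Unix timestamps (in seconds) and divides by 60, so `trip_duration` is in minutes. As a sanity check, here is the same arithmetic in plain Python for the first row printed by `df.show(5)`; the helper name `trip_duration_minutes` is hypothetical.

```python
from datetime import datetime

# Python equivalent of the Spark pattern "yyyy-MM-dd HH:mm:ss"
TIME_FMT = "%Y-%m-%d %H:%M:%S"


def trip_duration_minutes(pickup, dropoff):
    """Return the trip duration in minutes for two timestamp strings."""
    start = datetime.strptime(pickup, TIME_FMT)
    end = datetime.strptime(dropoff, TIME_FMT)
    return (end - start).total_seconds() / 60


# First row of the df.show(5) output: 10 min 29 s, i.e. 629 seconds.
first_trip = trip_duration_minutes("2021-02-01 00:10:40", "2021-02-01 00:21:09")
print(round(first_trip, 2))
```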

In [45]:
query = """
        SELECT date_format(pickup_datetime, 'yyyy-MM-dd')
        FROM fhvhv_bis
        WHERE trip_duration = (SELECT MAX(trip_duration) FROM fhvhv_bis) 
"""

spark.sql(query).show()



+----------------------------------------+
|date_format(pickup_datetime, yyyy-MM-dd)|
+----------------------------------------+
|                              2021-02-11|
|                              2021-02-11|
+----------------------------------------+



> **answer**
2021-02-11

## **Question 5. Most frequent `dispatching_base_num`**

Now find the most frequently occurring `dispatching_base_num` 
in this dataset.

How many stages does this Spark job have?

> Note: the answer may depend on how you write the query,
> so there are multiple correct answers. 
> Select the one you have.

In [46]:
query = """
            SELECT
                dispatching_base_num,
                COUNT(dispatching_base_num) AS nb
            FROM
                fhvhv_bis
            GROUP BY
                dispatching_base_num
            ORDER BY
                nb DESC
"""

spark.sql(query).show()



+--------------------+-------+
|dispatching_base_num|     nb|
+--------------------+-------+
|              B02510|6467328|
|              B02764|1931136|
|              B02872|1765378|
|              B02875|1370780|
|              B02765|1119536|
|              B02869| 859440|
|              B02887| 644662|
|              B02871| 624728|
|              B02864| 623206|
|              B02866| 622178|
|              B02878| 610370|
|              B02682| 606510|
|              B02617| 549020|
|              B02883| 503234|
|              B02884| 489926|
|              B02882| 464346|
|              B02876| 431386|
|              B02879| 420274|
|              B02867| 401060|
|              B02877| 397876|
+--------------------+-------+
only showing top 20 rows





> **answer**
B02510

## **Question 6. Most common locations pair**

Find the most common pickup-dropoff pair. 

For example:

"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash.

If any of the zone names are unknown (missing), use "Unknown". For example, "Unknown / Clinton East". 

In [48]:
query_1 = """
            (SELECT concat(PULocationID, '|', DOLocationID) AS location_pair,
            COUNT(concat(PULocationID, '|', DOLocationID)) as count_location_pair
            FROM fhvhv_bis 
            GROUP BY concat(PULocationID, '|', DOLocationID) )
"""
spark.sql(query_1).createOrReplaceTempView("count_of_location_pair")


query_2 = """
            SELECT location_pair 
            FROM count_of_location_pair 
            WHERE count_location_pair = (
                                            SELECT MAX(count_location_pair)
                                            FROM count_of_location_pair
                                        )
"""
spark.sql(query_2).show()



+-------------+
|location_pair|
+-------------+
|        76|76|
+-------------+





> **answer**  
East New York / East New York

## **Bonus question. Join type**

(not graded) 

For finding the answer to Q6, you'll need to perform a join.

What type of join is it?

And how many stages does your Spark job have?

> **answer**  
inner join  
The SQL query has 3 stages.