## Week 5 Homework

In this homework, we'll put what we learned about Spark
into practice.

We'll use the High Volume For-Hire Vehicles (HVFHV) dataset for that.

## Question 1. Install Spark and PySpark

* Install Spark
* Run PySpark
* Create a local Spark session
* Execute `spark.version`

What's the output?

In [1]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

spark.version

22/02/28 04:09:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


'3.0.3'

## Question 2. HVFHV February 2021

Download the HVFHV data for February 2021:

```bash
wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
```

Read it with Spark using the same schema as we did 
in the lessons. We will use this dataset for all
the remaining questions.

Repartition it to 24 partitions and save it to
parquet.

What's the size of the folder with results (in MB)?

In [38]:
!mkdir -p data/raw/fhvhv/ data/pq/fhvhv/2021/02
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv -O data/raw/fhvhv/2021-02.csv

--2022-02-28 03:26:10--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.206.89
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.206.89|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘data/raw/fhvhv/2021-02.csv’


2022-02-28 03:26:21 (64.7 MB/s) - ‘data/raw/fhvhv/2021-02.csv’ saved [733822658/733822658]



In [2]:
!head data/raw/fhvhv/2021-02.csv

hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
HV0003,B02764,2021-02-01 00:10:40,2021-02-01 00:21:09,35,39,
HV0003,B02764,2021-02-01 00:27:23,2021-02-01 00:44:01,39,35,
HV0005,B02510,2021-02-01 00:28:38,2021-02-01 00:38:27,39,91,
HV0005,B02510,2021-02-01 00:43:37,2021-02-01 01:23:20,91,228,
HV0003,B02872,2021-02-01 00:08:42,2021-02-01 00:17:57,126,250,
HV0003,B02872,2021-02-01 00:26:02,2021-02-01 00:42:51,208,243,
HV0003,B02872,2021-02-01 00:45:50,2021-02-01 01:02:50,243,220,
HV0003,B02764,2021-02-01 00:06:42,2021-02-01 00:31:50,49,37,
HV0003,B02764,2021-02-01 00:34:34,2021-02-01 00:58:13,37,76,


In [3]:
!wc -l data/raw/fhvhv/2021-02.csv

11613943 data/raw/fhvhv/2021-02.csv


In [4]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

df = (spark.read 
    .option("header", "true")
    .schema(schema)
    .csv('data/raw/fhvhv/2021-02.csv')   
)

In [5]:
df = df.repartition(24)

In [6]:
df.write.parquet('data/pq/fhvhv/2021/02', mode='overwrite')

                                                                                

In [12]:
!du -h data/pq/fhvhv/2021/02

210M	data/pq/fhvhv/2021/02
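
The `du -h` figure above can be cross-checked from Python. The following is a minimal sketch, not part of the original solution: `folder_size_mb` is a hypothetical helper that sums the sizes of all files under a directory. Note that `du` reports allocated disk space while this sums file lengths, so the two numbers may differ slightly.

```python
import os


def folder_size_mb(path: str) -> float:
    """Sum the sizes of all regular files under `path`, in MiB (1024 * 1024 bytes)."""
    total_bytes = 0
    # os.walk visits the directory and every subdirectory beneath it
    for root, _dirs, files in os.walk(path):
        for name in files:
            total_bytes += os.path.getsize(os.path.join(root, name))
    return total_bytes / (1024 * 1024)
```

Calling `folder_size_mb('data/pq/fhvhv/2021/02')` after the write step should give a value close to the one printed by `du -h`.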


## Question 3. Count records 

How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [16]:
df = spark.read.parquet('data/pq/fhvhv/2021/02/*')
df = (df
   .withColumn('pickup_date', F.to_date('pickup_datetime'))
)

In [7]:
df.filter(df.pickup_date == '2021-02-15').count()

                                                                                

367170

## Question 4. Longest trip for each day

Now calculate the duration for each trip.

On which day did the longest trip start?

In [21]:
df = (df
        .withColumn('trip_duration', df.dropoff_datetime.cast("long") - df.pickup_datetime.cast("long"))
     )

df.orderBy('trip_duration', ascending=False).select('pickup_date').show(1)

+-----------+
|pickup_date|
+-----------+
| 2021-02-11|
+-----------+
only showing top 1 row
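
Casting a Spark timestamp to `long` yields seconds since the Unix epoch, so `trip_duration` in the cell above is measured in seconds. The same arithmetic can be illustrated in plain Python on a few rows from the `head` output shown earlier. This is only a sketch: `trip_duration_seconds` is a hypothetical helper, and it treats the timestamps as naive local times with no time zone handling.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def trip_duration_seconds(pickup: str, dropoff: str) -> int:
    """Return the trip length in whole seconds (dropoff minus pickup)."""
    start = datetime.strptime(pickup, FMT)
    end = datetime.strptime(dropoff, FMT)
    return int((end - start).total_seconds())


# (pickup_datetime, dropoff_datetime) pairs copied from the CSV sample
sample = [
    ("2021-02-01 00:10:40", "2021-02-01 00:21:09"),
    ("2021-02-01 00:27:23", "2021-02-01 00:44:01"),
    ("2021-02-01 00:43:37", "2021-02-01 01:23:20"),
]

durations = [trip_duration_seconds(p, d) for p, d in sample]

# The longest trip is the pair with the largest duration; its pickup date
# is the first 10 characters of the pickup timestamp.
longest = max(sample, key=lambda trip: trip_duration_seconds(*trip))
longest_pickup_date = longest[0][:10]
```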





## Question 5. Most frequent `dispatching_base_num`

Now find the most frequently occurring `dispatching_base_num` 
in this dataset.

How many stages does this Spark job have?

> Note: the answer may depend on how you write the query,
> so there are multiple correct answers. 
> Select the one you have.

In [26]:
df.groupBy("dispatching_base_num").count().orderBy('count', ascending=False).show(5)



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
+--------------------+-------+
only showing top 5 rows



                                                                                

## Question 6. Most common locations pair

Find the most common pickup-dropoff pair. 

For example:

"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash.

If any of the zone names are unknown (missing), use "Unknown". For example, "Unknown / Clinton East". 
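
The pair counting and the "Unknown" fallback can be sketched in plain Python before writing the Spark query. Everything here is illustrative: the location IDs, the zone names, and the helper `most_common_pair` are made up and do not come from the real zone lookup table. The lookup with a default mirrors a join that keeps trips whose location ID has no match.

```python
from collections import Counter

# Toy lookup table: hypothetical IDs and names, NOT the real taxi zone data
zones = {1: "Zone A", 2: "Zone B"}

# Toy trips as (PULocationID, DOLocationID); ID 99 has no entry in `zones`
trips = [(1, 2), (1, 2), (2, 1), (1, 99), (1, 2)]


def most_common_pair(trips, zones):
    """Return ("Pickup / Dropoff", count) for the most frequent zone pair."""
    counts = Counter(
        # Missing IDs fall back to "Unknown" instead of dropping the trip
        (zones.get(pu, "Unknown"), zones.get(do, "Unknown"))
        for pu, do in trips
    )
    (pickup, dropoff), n = counts.most_common(1)[0]
    return f"{pickup} / {dropoff}", n


pair, count = most_common_pair(trips, zones)
```

In Spark, the same shape would be a `groupBy` on the two location columns followed by lookups against the zones table, with missing names replaced by "Unknown".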


## Bonus question. Join type

(not graded) 

For finding the answer to Q6, you'll need to perform a join.

What type of join is it?

And how many stages does your Spark job have?


## Submitting the solutions

* Form for submitting: https://forms.gle/dBkVK9yT8cSMDwuw7
* You can submit your homework multiple times. In this case, only the last submission will be used. 

Deadline: 02 March (Wednesday), 22:00 CET