## Week 5 Homework

In this homework we'll put what we learned about Spark
into practice.

We'll use the High Volume For-Hire Vehicles (HVFHV) dataset for that.

## Question 1. Install Spark and PySpark

* Install Spark
* Run PySpark
* Create a local Spark session
* Execute `spark.version`

What's the output?

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

spark.version

'3.0.3'

### Proposed code

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

spark.version

### Question 1 answer (correct)

```
3.0.3
```

## Question 2. HVFHV February 2021

Download the HVFHV data for February 2021:

In [3]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-28 18:28:19--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.90.236
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.90.236|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv.1’


2022-02-28 18:28:39 (35.3 MB/s) - ‘fhvhv_tripdata_2021-02.csv.1’ saved [733822658/733822658]



Read it with Spark using the same schema as we did 
in the lessons. We will use this dataset for all
the remaining questions.

In [4]:
from pyspark.sql import types

In [5]:
schema = types.StructType(
    [
        types.StructField('hvfhs_license_num', types.StringType(), True),
        types.StructField('dispatching_base_num', types.StringType(), True),
        types.StructField('pickup_datetime', types.TimestampType(), True),
        types.StructField('dropoff_datetime', types.TimestampType(), True),
        types.StructField('PULocationID', types.IntegerType(), True),
        types.StructField('DOLocationID', types.IntegerType(), True),
        types.StructField('SR_Flag', types.StringType(), True)
    ]
)

In [6]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

In [7]:
df.show()

                                                                                

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
|           HV0003|              B02872|2021-02-01 00:26:02|2021-02-01 00:42:51|

Repartition it to 24 partitions and save it to
parquet.

In [8]:
df.repartition(24).write.parquet('fhvhv/2021/02/', mode='overwrite')

What's the size of the folder with results (in MB)?

In [9]:
!du -h fhvhv

216M	fhvhv/2021/01
210M	fhvhv/2021/02
426M	fhvhv/2021
426M	fhvhv


In [None]:
# proposed command
!ls -lh fhvhv/2021/02
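
To cross-check the `du` figure from Python, one option is to sum the sizes of the files under the output folder. This is a minimal sketch using only the standard library; `folder_size_mb` is a hypothetical helper name, and it reports apparent file sizes in MiB, which can differ slightly from the disk usage that `du` reports.

```python
from pathlib import Path


def folder_size_mb(folder: str) -> float:
    """Sum the sizes of all regular files under `folder`, in MiB."""
    total_bytes = sum(
        p.stat().st_size for p in Path(folder).rglob('*') if p.is_file()
    )
    return total_bytes / (1024 * 1024)
```

For example, `folder_size_mb('fhvhv/2021/02')` would return the size of the February output written above.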

### Proposed code

In [None]:
!ls -lh fhvhv_tripdata_2021-02.csv

schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

df = df.repartition(24)

# No compression argument: Spark's default Parquet codec is used
df.write.parquet('data/pq/fhvhv/2021/02/')

df = spark.read.parquet('data/pq/fhvhv/2021/02/')

!ls -lh data/pq/fhvhv/2021/02/

### Question 2 answer (correct)

```
210MB

In the proposed answers, the closest one is 208MB
```

## Question 3. Count records 

How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [10]:
from pyspark.sql import functions as F

In [11]:
df = spark.read.parquet('fhvhv/2021/02/*')

In [12]:
df.createOrReplaceTempView('fhvhv')

In [13]:
spark.sql("""
SELECT
    COUNT(*)
FROM
    fhvhv
WHERE
    DATE(pickup_datetime) = '2021-02-15'
""").show()

+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

### Proposed code

In [None]:
from pyspark.sql import functions as F

df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2021-02-15'") \
    .count()

df.createOrReplaceTempView('fhvhv_2021_02')

spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhvhv_2021_02
WHERE
    to_date(pickup_datetime) = '2021-02-15';
""").show()


### Question 3 answer (correct)

```
367170
```
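
The filter is on the pickup timestamp only, so a trip that starts late on February 14 and ends on February 15 is not counted, while one that starts on February 15 and ends on the 16th is. A small plain-Python sketch of the same rule, using made-up timestamps rather than the real dataset:

```python
from datetime import date, datetime

# (pickup, dropoff) pairs; illustrative values, not taken from the dataset
trips = [
    (datetime(2021, 2, 14, 23, 50), datetime(2021, 2, 15, 0, 10)),  # starts on the 14th
    (datetime(2021, 2, 15, 8, 0), datetime(2021, 2, 15, 8, 30)),    # starts on the 15th
    (datetime(2021, 2, 15, 23, 55), datetime(2021, 2, 16, 0, 20)),  # starts on the 15th
]

target = date(2021, 2, 15)

# Keep a trip only if its pickup date matches, mirroring to_date(pickup_datetime)
count = sum(1 for pickup, _dropoff in trips if pickup.date() == target)
print(count)  # 2
```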

## Question 4. Longest trip for each day

Now calculate the duration for each trip.

Trip starting on which day was the longest? 

In [14]:
from pyspark.sql.functions import col, asc, desc

In [15]:
df \
    .withColumn('duration_seconds', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \
    .orderBy(col('duration_seconds').desc()) \
    .show()



+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+----------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|duration_seconds|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+----------------+
|           HV0005|              B02510|2021-02-11 13:40:44|2021-02-12 10:39:44|         247|          41|   null|           75540|
|           HV0004|              B02800|2021-02-17 15:54:53|2021-02-18 07:48:34|         242|         254|   null|           57221|
|           HV0004|              B02800|2021-02-20 12:08:15|2021-02-21 00:22:14|         188|          55|   null|           44039|
|           HV0003|              B02864|2021-02-03 20:24:25|2021-02-04 07:41:58|          51|         147|   null|           40653|
|           HV0003|              B02887|2021-02-19 23:17:44|2021-02-20 09:44

                                                                                

In [16]:
spark.sql("""
SELECT
    pickup_datetime, dropoff_datetime,
    (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) AS duration
FROM
    fhvhv
SORT BY
    duration DESC
""").show()

+-------------------+-------------------+--------+
|    pickup_datetime|   dropoff_datetime|duration|
+-------------------+-------------------+--------+
|2021-02-11 13:40:44|2021-02-12 10:39:44|   75540|
|2021-02-17 15:54:53|2021-02-18 07:48:34|   57221|
|2021-02-25 09:18:18|2021-02-25 18:18:57|   32439|
|2021-02-12 06:16:42|2021-02-12 14:39:10|   30148|
|2021-02-10 15:00:54|2021-02-10 22:49:57|   28143|
|2021-02-09 12:40:43|2021-02-09 20:04:03|   26600|
|2021-02-23 08:02:37|2021-02-23 14:49:56|   24439|
|2021-02-24 10:54:08|2021-02-24 17:28:37|   23669|
|2021-02-05 14:09:56|2021-02-05 20:30:46|   22850|
|2021-02-11 12:36:26|2021-02-11 18:50:42|   22456|
|2021-02-17 10:34:15|2021-02-17 16:39:59|   21944|
|2021-02-26 16:34:50|2021-02-26 22:22:23|   20853|
|2021-02-24 13:20:32|2021-02-24 19:01:48|   20476|
|2021-02-10 15:23:08|2021-02-10 21:01:32|   20304|
|2021-02-08 18:26:37|2021-02-08 23:43:52|   19035|
|2021-02-05 11:02:24|2021-02-05 16:17:28|   18904|
|2021-02-16 17:02:02|2021-02-16

Note that `SORT BY` only orders rows within each partition, so this listing is not a global ranking: its third row differs from the `orderBy` result above. `ORDER BY` gives a total ordering. The longest trip is the same in both outputs.

                                                                                

### Proposed code

In [None]:
df.columns

# method 1
df \
    .withColumn('duration', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .groupBy('pickup_date') \
        .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(5) \
    .show()

# method 2
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60) AS duration
FROM 
    fhvhv_2021_02
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 10;
""").show()


### Question 4 answer (correct)

```
February 11th
```
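
As a sanity check on the duration arithmetic, the top row of the output above can be recomputed with plain Python. Casting a timestamp to `long` yields seconds since the epoch, so the difference of the two casts is the trip length in seconds. The sketch below does the same subtraction with `datetime` and also shows the value in minutes, which is what the SQL variant (dividing by 60) reports.

```python
from datetime import datetime

# Pickup and dropoff of the longest trip shown in the output above
pickup = datetime(2021, 2, 11, 13, 40, 44)
dropoff = datetime(2021, 2, 12, 10, 39, 44)

duration_seconds = int((dropoff - pickup).total_seconds())
duration_minutes = duration_seconds / 60

print(duration_seconds)  # 75540, matching the duration_seconds column
print(duration_minutes)  # 1259.0
```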

## Question 5. Most frequent `dispatching_base_num`

Now find the most frequently occurring `dispatching_base_num` 
in this dataset.

How many stages does this Spark job have?

> Note: the answer may depend on how you write the query,
> so there are multiple correct answers. 
> Select the one you have.

In [17]:
spark.sql("""
SELECT
    dispatching_base_num,
    COUNT(dispatching_base_num)
FROM
    fhvhv
GROUP BY
    dispatching_base_num
ORDER BY
    2 DESC
""").show()



+--------------------+---------------------------+
|dispatching_base_num|count(dispatching_base_num)|
+--------------------+---------------------------+
|              B02510|                    3233664|
|              B02764|                     965568|
|              B02872|                     882689|
|              B02875|                     685390|
|              B02765|                     559768|
|              B02869|                     429720|
|              B02887|                     322331|
|              B02871|                     312364|
|              B02864|                     311603|
|              B02866|                     311089|
|              B02878|                     305185|
|              B02682|                     303255|
|              B02617|                     274510|
|              B02883|                     251617|
|              B02884|                     244963|
|              B02882|                     232173|
|              B02876|         

                                                                                

### Proposed code

In [None]:
spark.sql("""
SELECT
    dispatching_base_num,
    COUNT(1)
FROM 
    fhvhv_2021_02
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()

df \
    .groupBy('dispatching_base_num') \
        .count() \
    .orderBy('count', ascending=False) \
    .limit(5) \
    .show()

### Question 5 answer (correct)

```
2 stages
```

Note that if you use `LIMIT 5` or any other number, an additional stage will be added.


## Question 6. Most common locations pair

Find the most common pickup-dropoff pair. 

For example:

"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash.

If any of the zone names are unknown (missing), use "Unknown". For example, "Unknown / Clinton East". 

In [18]:
df_zones = spark.read.parquet('zones/')

In [56]:
zpu = df_zones \
    .withColumnRenamed('Zone', 'PUzone') \
    .withColumnRenamed('LocationID', 'zPULocationID') \
    .withColumnRenamed('Borough', 'PUBorough') \
    .drop('service_zone')
zdo = df_zones \
    .withColumnRenamed('Zone', 'DOzone') \
    .withColumnRenamed('LocationID', 'zDOLocationID') \
    .withColumnRenamed('Borough', 'DOBorough') \
    .drop('service_zone')

In [57]:
df_join_temp = df.join(zpu, df.PULocationID == zpu.zPULocationID)
df_join = df_join_temp.join(zdo, df_join_temp.DOLocationID == zdo.zDOLocationID)

In [59]:
df_join.drop('PULocationID', 'DOLocationID', 'zPULocationID', 'zDOLocationID').write.parquet('tmp/homework/6')

                                                                                

In [60]:
df_join = spark.read.parquet('tmp/homework/6')
df_join

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, SR_Flag: string, PUBorough: string, PUzone: string, DOBorough: string, DOzone: string]

In [61]:
df_join.createOrReplaceTempView('join_table')

In [87]:
spark.sql("""
SELECT
    CONCAT(coalesce(PUzone, 'Unknown'), '/', coalesce(DOzone, 'Unknown')) AS zone_pair,
    COUNT(1)
FROM
    join_table
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT
    1
;
""").show()



+--------------------+--------+
|           zone_pair|count(1)|
+--------------------+--------+
|East New York/Eas...|   45041|
+--------------------+--------+



                                                                                

### Proposed code

In [None]:
df_zones = spark.read.parquet('zones')

df_zones.columns

df.columns

df_zones.createOrReplaceTempView('zones')

spark.sql("""
SELECT
    CONCAT(COALESCE(pul.Zone, 'Unknown'), ' / ', COALESCE(dol.Zone, 'Unknown')) AS pu_do_pair,
    COUNT(1)
FROM 
    fhvhv_2021_02 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
                      LEFT JOIN zones dol ON fhv.DOLocationID = dol.LocationID
GROUP BY 
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()

### Question 6 answer (correct)

```
East New York / East New York
```
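
The join-then-count logic, including the "Unknown" fallback for missing zones, can be sketched in plain Python. The zone IDs and names below are made up for illustration (they are not the real lookup table); `dict.get` with a default plays the role of `LEFT JOIN` plus `COALESCE`.

```python
from collections import Counter

# Hypothetical zone lookup: LocationID -> Zone name (illustrative only)
zones = {1: 'Zone A', 2: 'Zone B'}

# Hypothetical (PULocationID, DOLocationID) pairs; 99 has no entry in the lookup
trips = [(1, 1), (1, 2), (1, 1), (99, 2), (2, 1)]


def zone_name(location_id: int) -> str:
    """Look up a zone name, falling back to 'Unknown' when the ID is missing."""
    return zones.get(location_id, 'Unknown')


# Build the "pickup / dropoff" label for every trip and count occurrences
pair_counts = Counter(
    f'{zone_name(pu)} / {zone_name(do)}' for pu, do in trips
)

most_common_pair, trips_in_pair = pair_counts.most_common(1)[0]
print(most_common_pair, trips_in_pair)  # Zone A / Zone A 2
```

Without the fallback, a missing zone would make the whole label null in Spark SQL, because `CONCAT` returns null when any argument is null.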

## Bonus question. Join type

(not graded) 

For finding the answer to Q6, you'll need to perform a join.

What type of join is it?

And how many stages does your Spark job have?

### Bonus question answer

```
inner join

The SQL query has 3 stages.
```

### Actual answer

Broadcast join, because we broadcast the zones to the executors and join on the fly.
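
Conceptually, a broadcast join ships a full copy of the small table to every executor, which then joins its own partitions of the large table locally, with no shuffle of the large side. The following is a toy single-process sketch of that idea in plain Python, not Spark's implementation; the partition contents and zone names are hypothetical.

```python
from typing import Dict, Iterator, List, Tuple

# Small table, "broadcast" as an in-memory hash map (hypothetical values)
zones: Dict[int, str] = {1: 'Zone A', 2: 'Zone B'}

# Large table split into partitions; each row is (trip_id, PULocationID)
partitions: List[List[Tuple[int, int]]] = [
    [(100, 1), (101, 2)],
    [(102, 2), (103, 3)],  # LocationID 3 has no match in zones
]


def join_partition(
    rows: List[Tuple[int, int]], broadcast: Dict[int, str]
) -> Iterator[Tuple[int, str]]:
    """Inner-join one partition against the broadcast map, row by row."""
    for trip_id, location_id in rows:
        if location_id in broadcast:  # unmatched rows are dropped (inner join)
            yield trip_id, broadcast[location_id]


# Each partition is joined independently; no rows move between partitions
joined = [row for part in partitions for row in join_partition(part, zones)]
print(joined)  # [(100, 'Zone A'), (101, 'Zone B'), (102, 'Zone B')]
```

This works well here because the zones lookup is tiny compared with the trips table, so copying it everywhere is cheaper than shuffling the trips.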