### Question 1. Install Spark and PySpark

In [22]:
# Run findspark.init() before the pyspark import in the next cell.
# NOTE(review): presumably this is what makes the local Spark install importable — confirm.
import findspark
findspark.init()

In [23]:
import pyspark
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session named 'test' with master "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [24]:
# Display the installed PySpark version.
pyspark.__version__

'3.0.3'

### Question 2. HVFHV February 2021
Download the HVFHV data for February 2021:

`wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv`

Read it with Spark using the same schema as we did in the lessons. We will use this dataset for all the remaining questions.

Repartition it to 24 partitions and save it to parquet.

What's the size of the folder with results (in MB)?

In [None]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

In [59]:
from pyspark.sql import types

# Explicit schema for the trip CSV, built from (column name, Spark type)
# pairs; every field is declared nullable.
taxi_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

In [60]:
# Load the raw CSV with the header row skipped and the explicit schema
# applied instead of schema inference.
df = (
    spark.read
    .option("header", "true")
    .schema(taxi_schema)
    .csv('fhvhv_tripdata_2021-02.csv')
)

# Show the resulting column types.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [None]:
# Repartition the data into 24 partitions and save it as parquet.
# mode='overwrite' keeps this cell re-runnable: with the default save mode
# the write raises an error as soon as the output folder already exists.
df.repartition(24) \
    .write.parquet('fhvhv/2021/02/', mode='overwrite')

22/03/02 08:03:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
22/03/02 08:03:30 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
22/03/02 08:03:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

### Question 3. Count Records
How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [61]:
# Reload the trips from the parquet folder written in Question 2;
# this rebinds `df` to the parquet-backed DataFrame.
df = spark.read.parquet('fhvhv/2021/02/')

In [27]:
# Print the schema of the reloaded DataFrame.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [35]:
from pyspark.sql import functions as F

# Derive the calendar date of each pickup so trips can be filtered by day.
pickup_day = F.to_date(F.col('pickup_datetime'))
df = df.withColumn('pickup_date', pickup_day)


In [38]:
# Keep only the trips whose pickup date is February 15, then count them.
started_on_feb15 = df['pickup_date'] == '2021-02-15'
df_feb15 = df.where(started_on_feb15)
df_feb15.count()

                                                                                

367170

### Question 4. Longest trip for each day
Now calculate the duration for each trip.

On which day did the longest trip start?

In [None]:
# Expose the DataFrame to Spark SQL under the name 'fhv_feb'.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; re-running the cell simply replaces the view.
df.createOrReplaceTempView('fhv_feb')

In [40]:
# List the column names of the DataFrame registered as 'fhv_feb'.
df.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'pickup_date']

In [49]:
# Trip duration in minutes: both timestamps are cast to long and their
# difference is divided by 60. Rows are listed longest trip first.
longest_trips_sql = '''
SELECT
    hvfhs_license_num,
    pickup_datetime,
    dropoff_datetime,
    (CAST(dropoff_datetime AS long) - CAST(pickup_datetime AS long))/60 AS trip_duration
FROM
    fhv_feb
ORDER BY trip_duration DESC
'''

spark.sql(longest_trips_sql).show()

[Stage 16:>                                                         (0 + 8) / 8]

+-----------------+-------------------+-------------------+------------------+
|hvfhs_license_num|    pickup_datetime|   dropoff_datetime|     trip_duration|
+-----------------+-------------------+-------------------+------------------+
|           HV0005|2021-02-11 13:40:44|2021-02-12 10:39:44|            1259.0|
|           HV0004|2021-02-17 15:54:53|2021-02-18 07:48:34| 953.6833333333333|
|           HV0004|2021-02-20 12:08:15|2021-02-21 00:22:14| 733.9833333333333|
|           HV0003|2021-02-03 20:24:25|2021-02-04 07:41:58|            677.55|
|           HV0003|2021-02-19 23:17:44|2021-02-20 09:44:01| 626.2833333333333|
|           HV0003|2021-02-25 17:13:35|2021-02-26 02:57:05|             583.5|
|           HV0003|2021-02-20 01:36:13|2021-02-20 11:16:19|             580.1|
|           HV0005|2021-02-18 15:24:19|2021-02-19 01:01:11| 576.8666666666667|
|           HV0003|2021-02-18 01:31:20|2021-02-18 11:07:15| 575.9166666666666|
|           HV0005|2021-02-10 20:51:39|2021-02-11 06

                                                                                

### Question 5. Most frequent dispatching_base_num

Now find the most frequently occurring dispatching_base_num in this dataset.

How many stages does this Spark job have?

In [52]:
# Count trips per dispatching_base_num and list the most frequent first.
base_frequency_sql = '''
SELECT
    dispatching_base_num,
    count(*) AS total
FROM
    fhv_feb
GROUP BY dispatching_base_num
ORDER BY total DESC
'''

spark.sql(base_frequency_sql).show()

[Stage 29:>                                                         (0 + 8) / 8]

+--------------------+-------+
|dispatching_base_num|  total|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows



                                                                                

### Question 6. Most common locations pair

Find the most common pickup-dropoff pair.

In [53]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2022-03-02 10:33:20--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.165.224
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.165.224|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2022-03-02 10:33:22 (41.4 KB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [64]:
# Load the zone lookup CSV, using the header row for column names and
# letting Spark infer the column types.
df_zone = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv('taxi+_zone_lookup.csv')
)

# Print both schemas to see which columns can serve as join keys.
df.printSchema()
df_zone.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [65]:
# Expose the trips and the zone lookup to Spark SQL.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; re-running the cell simply replaces the views.
df.createOrReplaceTempView('trip_table')
df_zone.createOrReplaceTempView('zone_table')

In [77]:
# Join the zone lookup twice (once for the pickup ID, once for the dropoff
# ID), count trips per (pickup zone, dropoff zone) name pair, and list the
# most frequent pairs first. The LEFT JOINs keep trips whose location IDs
# have no match in the lookup table.
zone_pair_sql = '''
SELECT
    z1.Zone AS pickup_zone,
    z2.Zone AS dropoff_zone,
    count(*) AS frequency
FROM
    trip_table
LEFT JOIN
    zone_table AS z1
ON (trip_table.PULocationID = z1.LocationID)
LEFT JOIN
    zone_table AS z2
ON (trip_table.DOLocationID = z2.LocationID)
GROUP BY 
    z1.Zone, z2.Zone
ORDER BY 3 DESC
'''

spark.sql(zone_pair_sql).show()



+--------------------+--------------------+---------+
|         pickup_zone|        dropoff_zone|frequency|
+--------------------+--------------------+---------+
|       East New York|       East New York|    45041|
|        Borough Park|        Borough Park|    37329|
|            Canarsie|            Canarsie|    28026|
| Crown Heights North| Crown Heights North|    25976|
|           Bay Ridge|           Bay Ridge|    17934|
|             Astoria|             Astoria|    14688|
|     Jackson Heights|     Jackson Heights|    14688|
|Central Harlem North|Central Harlem North|    14481|
|      Bushwick South|      Bushwick South|    14424|
|Flatbush/Ditmas Park|Flatbush/Ditmas Park|    13976|
|    South Ozone Park|    South Ozone Park|    13716|
|         Brownsville|         Brownsville|    12829|
|         JFK Airport|                  NA|    12542|
|Prospect-Lefferts...| Crown Heights North|    11814|
|        Forest Hills|        Forest Hills|    11548|
|      Bushwick North|      

                                                                                