## Question 1. Install Spark and PySpark

In [7]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('test') \
    .getOrCreate()

In [8]:
spark.version

'3.0.3'

## Question 2. HVFHW February 2021

In [9]:
# Downloading the HVFHV data for february 2021

!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet

--2022-07-24 18:31:39--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.67.3.63, 18.67.3.5, 18.67.3.108, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.67.3.63|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 302633211 (289M) [application/x-www-form-urlencoded]
Saving to: ‘fhvhv_tripdata_2021-02.parquet.1’


2022-07-24 18:31:52 (23.4 MB/s) - ‘fhvhv_tripdata_2021-02.parquet.1’ saved [302633211/302633211]



In [10]:
# Defining schema

from pyspark.sql import types

schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('originating_base_num', types.StringType(), True),
    types.StructField('request_datetime', types.TimestampType(), True),
    types.StructField('on_scene_datetime', types.TimestampType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.LongType(), True),
    types.StructField('DOLocationID', types.LongType(), True),
    types.StructField('trip_miles', types.DoubleType(), True),
    types.StructField('trip_time', types.LongType(), True),
    types.StructField('base_passenger_fare', types.DoubleType(), True),
    types.StructField('tolls', types.DoubleType(), True),
    types.StructField('bcf', types.DoubleType(), True),
    types.StructField('sales_tax', types.DoubleType(), True),
    types.StructField('congestion_surcharge', types.DoubleType(), True),
    types.StructField('airport_fee', types.DoubleType(), True),
    types.StructField('tips', types.DoubleType(), True),
    types.StructField('driver_pay', types.DoubleType(), True),
    types.StructField('shared_request_flag', types.StringType(), True),
    types.StructField('shared_match_flag', types.StringType(), True),
    types.StructField('access_a_ride_flag', types.StringType(), True),
    types.StructField('wav_request_flag', types.StringType(), True),
    types.StructField('wav_match_flag', types.StringType(), True)
])

In [11]:
from pyspark.sql import functions as F

# Reading parquet file with a defined schema
df = spark.read \
    .option('header', 'true') \
    .schema(schema) \
    .parquet('fhvhv_tripdata_2021-02.parquet')

# Selecting columns of interest
df = df.select([
    'hvfhs_license_num'
    ,'dispatching_base_num'
    ,'pickup_datetime'
    ,'dropoff_datetime'
    ,'PULocationID'
    ,'DOLocationID'
])

# Casting from long to integer
df = df.withColumn('PULocationID',F.col('PULocationID').cast('integer')) \
       .withColumn('DOLocationID',F.col('DOLocationID').cast('integer'))

In [12]:
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 10, 40), dropoff_datetime=datetime.datetime(2021, 2, 1, 0, 21, 9), PULocationID=35, DOLocationID=39),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 27, 23), dropoff_datetime=datetime.datetime(2021, 2, 1, 0, 44, 1), PULocationID=39, DOLocationID=35),
 Row(hvfhs_license_num='HV0005', dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 28, 38), dropoff_datetime=datetime.datetime(2021, 2, 1, 0, 38, 27), PULocationID=39, DOLocationID=91),
 Row(hvfhs_license_num='HV0005', dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 43, 37), dropoff_datetime=datetime.datetime(2021, 2, 1, 1, 23, 20), PULocationID=91, DOLocationID=228),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02872', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 8, 42), dropoff_datetime

In [13]:
df = df.repartition(24)

In [14]:
df.write.parquet('fhvhv/2021/02/', mode='overwrite')

                                                                                

In [15]:
!ls -lh fhvhv/2021/02/

total 219M
-rw-r--r-- 1 paulooctavio paulooctavio    0 Jul 24 18:32 _SUCCESS
-rw-r--r-- 1 paulooctavio paulooctavio 9.1M Jul 24 18:32 part-00000-e999ead8-050a-477c-bb88-83ada34a2cdf-c000.snappy.parquet
-rw-r--r-- 1 paulooctavio paulooctavio 9.1M Jul 24 18:32 part-00001-e999ead8-050a-477c-bb88-83ada34a2cdf-c000.snappy.parquet
-rw-r--r-- 1 paulooctavio paulooctavio 9.1M Jul 24 18:32 part-00002-e999ead8-050a-477c-bb88-83ada34a2cdf-c000.snappy.parquet
-rw-r--r-- 1 paulooctavio paulooctavio 9.1M Jul 24 18:32 part-00003-e999ead8-050a-477c-bb88-83ada34a2cdf-c000.snappy.parquet
-rw-r--r-- 1 paulooctavio paulooctavio 9.1M Jul 24 18:32 part-00004-e999ead8-050a-477c-bb88-83ada34a2cdf-c000.snappy.parquet
-rw-r--r-- 1 paulooctavio paulooctavio 9.1M Jul 24 18:32 part-00005-e999ead8-050a-477c-bb88-83ada34a2cdf-c000.snappy.parquet
-rw-r--r-- 1 paulooctavio paulooctavio 9.1M Jul 24 18:32 part-00006-e999ead8-050a-477c-bb88-83ada34a2cdf-c000.snappy.parquet
-rw-r--r-- 1 paulooctavio paulooctavio 

## Question 3. Count records

In [16]:
# How many taxi trips were there on February 15?

df = spark.read.parquet('fhvhv/2021/02/')

df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2021-02-15'") \
    .count()

                                                                                

367170

## Question 4. Longest trip for each day

In [47]:
from datetime import datetime

pickup_datetime=datetime(2021, 2, 1, 0, 10, 40)
dropoff_datetime=datetime(2021, 2, 1, 0, 21, 9)
print((dropoff_datetime - pickup_datetime).seconds)

629


In [48]:
# Defining function to calculate trip duration
def trip_duration(pickup_datetime, dropoff_datetime):
    return (dropoff_datetime - pickup_datetime).seconds

# Creating user defined function
trip_duration_udf = F.udf(trip_duration, returnType=types.IntegerType())

In [49]:
df = df.withColumn('trip_duration', trip_duration_udf(df.pickup_datetime, df.dropoff_datetime))

In [50]:
df.registerTempTable("fhvhv_tripdata")

In [51]:
# Trip starting on which day was the longest?

query = """
    SELECT 
        DATE(pickup_datetime) as pickup_date
        ,MAX(trip_duration) as trip_duration
    FROM 
        fhvhv_tripdata 
    GROUP BY 
        pickup_date
    ORDER BY 
        trip_duration DESC 
    LIMIT 5
"""

spark.sql(query).show()



+-----------+-------------+
|pickup_date|trip_duration|
+-----------+-------------+
| 2021-02-11|        75540|
| 2021-02-17|        57221|
| 2021-02-20|        44039|
| 2021-02-03|        40653|
| 2021-02-19|        37577|
+-----------+-------------+



                                                                                

## Question 5. Most frequent dispatching_base_num

In [115]:
# Find the most frequently occurring dispatching_base_num in this dataset.

query = """
    SELECT
         dispatching_base_num
        ,COUNT(1)
    FROM fhvhv_tripdata t1
    GROUP BY dispatching_base_num
    ORDER BY COUNT(1) DESC LIMIT 1
"""

spark.sql(query).show()

[Stage 136:>                                                        (0 + 4) / 4]

+--------------------+--------+
|dispatching_base_num|count(1)|
+--------------------+--------+
|              B02510| 3233664|
+--------------------+--------+



                                                                                

## Question 6. Most common locations pair

In [52]:
# Readind zones lookup table
df_zones = spark.read.parquet('/home/paulooctavio/data-engineering-zoomcamp/week_5_batch_processing/code/zones')
df_zones.registerTempTable("zones")

# Join the zones table to the tripdata table
query = """
    SELECT
        coalesce(pu.Zone, 'Unknown') || ' / ' || coalesce(do.Zone, 'Unknown') as pickup_dropoff
        ,COUNT(1) as count
    FROM 
        fhvhv_tripdata fhv
    LEFT JOIN 
        zones pu ON fhv.PULocationID = pu.LocationID
    LEFT JOIN 
        zones do ON fhv.PULocationID = do.LocationID
    GROUP BY 
        1
    ORDER BY 
        2 DESC 
    LIMIT 5
"""

spark.sql(query).take(5)

                                                                                

[Row(pickup_dropoff='Crown Heights North / Crown Heights North', count=203777),
 Row(pickup_dropoff='East New York / East New York', count=166959),
 Row(pickup_dropoff='Bushwick South / Bushwick South', count=140636),
 Row(pickup_dropoff='East Village / East Village', count=137901),
 Row(pickup_dropoff='Central Harlem North / Central Harlem North', count=137246)]