## Question 1. Install Spark and PySpark


Execute `spark.version`

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

22/02/27 11:09:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
spark.version

'3.0.3'

## Question 2. HVFHV February 2021

Download the HVFHV data for February 2021:

In [4]:
!wget -P data/raw/fhvhv/ https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-26 20:52:00--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.172.169
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.172.169|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘data/raw/fhvhv/fhvhv_tripdata_2021-02.csv’


2022-02-26 20:52:25 (28.2 MB/s) - ‘data/raw/fhvhv/fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [5]:
df = spark.read \
    .option("header", "true") \
    .csv('data/raw/fhvhv/fhvhv_tripdata_2021-02.csv')

In [6]:
!head -n 11 data/raw/fhvhv/fhvhv_tripdata_2021-02.csv > head.csv

In [7]:
import pandas as pd

In [8]:
df_pandas = pd.read_csv('head.csv')

In [9]:
spark.createDataFrame(df_pandas).schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(SR_Flag,DoubleType,true)))

In [10]:
from pyspark.sql import types

In [11]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [12]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('data/raw/fhvhv/fhvhv_tripdata_2021-02.csv')

In [13]:
!ls -lh data/raw/fhvhv

total 700M
-rw-rw-r-- 1 oseghalepatrick53 oseghalepatrick53 700M Oct 29 18:53 fhvhv_tripdata_2021-02.csv


In [14]:
df \
    .repartition(24) \
    .write.parquet('data/pq/fhvhv/', mode='overwrite')

`What's the size of the folder with results (in MB)?`

In [15]:
!ls -lh data/pq/fhvhv/

total 208M
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53    0 Feb 27 11:14 _SUCCESS
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 8.7M Feb 27 11:13 part-00000-94f70641-8e2a-444f-bfd7-35e21cc82b4b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 8.7M Feb 27 11:13 part-00001-94f70641-8e2a-444f-bfd7-35e21cc82b4b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 8.7M Feb 27 11:13 part-00002-94f70641-8e2a-444f-bfd7-35e21cc82b4b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 8.7M Feb 27 11:13 part-00003-94f70641-8e2a-444f-bfd7-35e21cc82b4b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 8.7M Feb 27 11:13 part-00004-94f70641-8e2a-444f-bfd7-35e21cc82b4b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 8.7M Feb 27 11:13 part-00005-94f70641-8e2a-444f-bfd7-35e21cc82b4b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 8.7M Feb 27 11:13 part-00006-94f70641-8e2a-444
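
The `ls -lh` total above answers the question directly. `repartition(24)` should produce 24 part files, and at about 8.7 MB each that gives roughly 24 × 8.7 ≈ 209 MB, in line with the reported 208M. As a cross-check, the folder size can also be computed in Python. The sketch below is a minimal example; the helper name `folder_size_mb` is an assumption, not part of the original notebook. It sums file sizes, so the result may differ slightly from the block-based total printed by `ls`.

```python
import os


def folder_size_mb(path: str) -> float:
    """Return the combined size of all files under `path`, in MiB."""
    total_bytes = 0
    # os.walk visits the directory itself and every subdirectory.
    for root, _dirs, files in os.walk(path):
        for name in files:
            total_bytes += os.path.getsize(os.path.join(root, name))
    return total_bytes / (1024 * 1024)
```

Calling `folder_size_mb('data/pq/fhvhv/')` after the write step would report the size of the Parquet output folder.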

In [16]:
df_fhvhv = spark.read.parquet('data/pq/fhvhv/*')

In [17]:
df_fhvhv.createOrReplaceTempView('fhvhv')

## Question 3. Count records
`How many taxi trips were there on February 15?`

Consider only trips that started on February 15.

In [18]:
spark.sql("""
SELECT 
    COUNT(1) AS trips
FROM 
    fhvhv
WHERE 
    pickup_datetime = '2021-02-15 00:00:00'
""").show()

+-----+
|trips|
+-----+
|    5|
+-----+
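
Note that the `WHERE` clause above compares `pickup_datetime` against a single instant, so it matches only trips picked up at exactly midnight, which is why the count is 5. To count every trip that started on February 15, the filter would need to compare the date part instead, for example `to_date(pickup_datetime) = '2021-02-15'` in Spark SQL. The difference can be illustrated in plain Python; the timestamps below are made up for the illustration and are not taken from the dataset.

```python
from datetime import date, datetime

# Made-up sample pickup timestamps (not taken from the dataset).
pickups = [
    datetime(2021, 2, 14, 23, 59, 59),
    datetime(2021, 2, 15, 0, 0, 0),
    datetime(2021, 2, 15, 8, 30, 0),
    datetime(2021, 2, 15, 23, 59, 59),
    datetime(2021, 2, 16, 0, 0, 0),
]

target_instant = datetime(2021, 2, 15, 0, 0, 0)
target_day = date(2021, 2, 15)

# Equality against a timestamp matches one instant only.
exact_matches = sum(1 for ts in pickups if ts == target_instant)

# Comparing the date part matches every pickup during that day.
same_day_matches = sum(1 for ts in pickups if ts.date() == target_day)

print(exact_matches, same_day_matches)  # 1 3
```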





## Question 4. Longest trip for each day
Now calculate the duration for each trip.

`Trip starting on which day was the longest?`

In [19]:
spark.sql("""
SELECT 
    date_format(pickup_datetime, "yyyy-MM-dd") AS Date,
    max(unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) as max_seconds
FROM 
    fhvhv
GROUP BY
    1
ORDER BY
    2 DESC
""").show(28)



+----------+-----------+
|      Date|max_seconds|
+----------+-----------+
|2021-02-11|      75540|
|2021-02-17|      57221|
|2021-02-20|      44039|
|2021-02-03|      40653|
|2021-02-19|      37577|
|2021-02-25|      35010|
|2021-02-18|      34612|
|2021-02-10|      34169|
|2021-02-21|      32223|
|2021-02-09|      32087|
|2021-02-06|      31447|
|2021-02-02|      30913|
|2021-02-05|      30511|
|2021-02-12|      30148|
|2021-02-08|      30106|
|2021-02-14|      29777|
|2021-02-22|      28278|
|2021-02-27|      27170|
|2021-02-15|      25874|
|2021-02-04|      25592|
|2021-02-16|      25441|
|2021-02-23|      24439|
|2021-02-26|      24422|
|2021-02-24|      23669|
|2021-02-13|      21442|
|2021-02-01|      20638|
|2021-02-28|      19850|
|2021-02-07|      17672|
+----------+-----------+
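
The longest trip started on 2021-02-11 and lasted 75540 seconds. Converting that to hours and minutes makes the figure easier to read; the helper below is a small sketch, and the name `format_duration` is an assumption introduced for this example.

```python
def format_duration(seconds: int) -> str:
    """Format a duration given in seconds as hours, minutes and seconds."""
    hours, remainder = divmod(seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours}h {minutes:02d}m {secs:02d}s"


print(format_duration(75540))  # 20h 59m 00s
```

So the longest trip in the table ran for just under 21 hours.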





## Question 5. Most frequent `dispatching_base_num`

Now find the most frequently occurring `dispatching_base_num` in this dataset.

How many stages does this Spark job have?

In [20]:
spark.sql("""
SELECT 
    dispatching_base_num
FROM (
    SELECT 
        dispatching_base_num,
        count(1)
    FROM 
        fhvhv
    GROUP BY
        1
    ORDER BY
        2 DESC
    LIMIT 
        1
)
""").show()



+--------------------+
|dispatching_base_num|
+--------------------+
|              B02510|
+--------------------+

In [83]:
df_zones = spark.read.parquet('zones/')

In [85]:
df_zones.createOrReplaceTempView('zones')

In [99]:
spark.sql("""
SELECT
    pickup_dropoff_zone
FROM (
    SELECT 
        CONCAT(COALESCE(puzone.zone, "Unknown"),'/', COALESCE(dozone.zone, "Unknown")) AS pickup_dropoff_zone,
        COUNT(1) AS trips
    FROM 
        fhvhv
    LEFT JOIN 
        zones AS puzone
        ON fhvhv.PULocationID = puzone.LocationID
    LEFT JOIN
        zones AS dozone
        ON fhvhv.DOLocationID = dozone.LocationID
    GROUP BY
        1
    ORDER BY 
        2 DESC
    LIMIT 1
)
""").show(truncate=False)

+---------------------------+
|pickup_dropoff_zone        |
+---------------------------+
|East New York/East New York|
+---------------------------+