In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('homework') \
    .getOrCreate()

22/02/28 18:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Question 1
pyspark.__version__

'3.0.3'

In [4]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-28 18:55:36--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.88.228
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.88.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-02-28 18:55:59 (30.6 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [5]:
# Pandas is used to get the schema
import pandas as pd

In [6]:
df_pandas = pd.read_csv('fhvhv_tripdata_2021-02.csv', nrows=100)

In [7]:
spark.createDataFrame(df_pandas).schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(SR_Flag,DoubleType,true)))

In [9]:
from pyspark.sql import types

In [12]:
schema = types.StructType([
    types.StructField("hvfhs_license_num",types.StringType(),True),
    types.StructField("dispatching_base_num",types.StringType(),True),
    types.StructField("pickup_datetime",types.TimestampType(),True),
    types.StructField("dropoff_datetime",types.TimestampType(),True),
    types.StructField("PULocationID",types.IntegerType(),True),
    types.StructField("DOLocationID",types.IntegerType(),True),
    types.StructField("SR_Flag",types.StringType(),True)
])

In [13]:
df_spark = spark.read \
    .schema(schema) \
    .option("header", "true") \
    .csv("fhvhv_tripdata_2021-02.csv")

In [17]:
df_spark

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string]

In [14]:
df_spark.show()

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
|           HV0003|              B02872|2021-02-01 00:26:02|2021-02-01 00:42:51|

In [16]:
df_spark.repartition(24).write.parquet('data/pq/hvfhv/2021/02/')

                                                                                

In [18]:
df_spark.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [19]:
from pyspark.sql import functions as F

In [25]:
df_spark = df_spark \
    .withColumn('pickup_date', F.to_date(df_spark.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df_spark.dropoff_datetime))

In [26]:
df_spark.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|dropoff_date|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+------------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null| 2021-02-01|  2021-02-01|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null| 2021-02-01|  2021-02-01|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null| 2021-02-01|  2021-02-01|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null| 2021-02-01|  2021-02-01|
|           HV0003| 

In [42]:
# Question 3 using CSV data

df_spark \
    .select('pickup_date') \
    .filter(df_spark.pickup_date == '2021-02-15') \
    .count()

                                                                                

367170

In [43]:
# Reading the parquet data for the remaining questions:
df = spark.read.parquet('data/pq/hvfhv/2021/02/')

In [44]:
df

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string]

In [45]:
df.registerTempTable('hvfhv_tripdata')

In [None]:
F.datediff()