In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as func


In [7]:
!mkdir -p ./data
!wget -P ./data https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-03-02 14:05:07--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.216.111.67
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.216.111.67|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘./data/fhvhv_tripdata_2021-02.csv’


2022-03-02 14:06:15 (10.5 MB/s) - ‘./data/fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [8]:
# Local Spark session that runs on all available cores of this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [9]:
# First, untyped look at the raw CSV: with only the header option set,
# every column comes back as a string (see the printSchema output below).
fhvhv_df = spark.read.csv("./data/fhvhv_tripdata_2021-02.csv", header=True)

                                                                                

In [10]:
# Show the column types Spark assigned to the untyped read.
fhvhv_df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [11]:
# Preview the first 20 raw rows (long values truncated).
fhvhv_df.show(n=20, truncate=True)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
|           HV0003|              B02872|2021-02-01 00:26:02|2021-02-01 00:42:51|

In [12]:
# Raw StructType repr of the untyped read; it serves as the template for the explicit schema.
fhvhv_df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))

In [13]:
# Explicit schema for the FHVHV CSV. Timestamps and location IDs get real types
# instead of the all-string inference; every column stays nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    ]
])

In [16]:
# Re-read the same CSV, this time applying the explicit schema defined above.
df = spark.read.csv("./data/fhvhv_tripdata_2021-02.csv", header=True, schema=schema)

In [17]:
# Shuffle into 24 partitions so the Parquet write below is spread across them.
partitioned_df = df.repartition(numPartitions=24)

In [18]:
# Persist the typed trips as Parquet, replacing any previous output.
partitioned_df.write.mode('overwrite').parquet("./data/parquet_data/")

                                                                                

In [20]:
# Continue from the Parquet copy; the column types are stored in the files.
df = spark.read.format("parquet").load("./data/parquet_data/")

In [22]:
# Preview the first 20 rows read back from Parquet.
df.show(n=20, truncate=True)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02866|2021-02-05 18:53:32|2021-02-05 19:00:26|          91|         155|   null|
|           HV0003|              B02876|2021-02-03 20:04:21|2021-02-03 20:19:09|          29|         154|   null|
|           HV0003|              B02888|2021-02-04 00:26:03|2021-02-04 00:40:33|         151|         229|   null|
|           HV0003|              B02887|2021-02-05 08:21:19|2021-02-05 08:26:49|         215|         215|   null|
|           HV0003|              B02764|2021-02-01 21:54:16|2021-02-01 22:09:23|          25|          49|   null|
|           HV0003|              B02872|2021-02-06 03:20:12|2021-02-06 03:30:25|

In [21]:
# Check that the typed columns survived the Parquet round trip.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [23]:
# Total number of trip rows in the Parquet copy.
df.count()

11613942

In [25]:
# Expose the trips to Spark SQL under the name used by the queries below.
df.createOrReplaceTempView(name="hvfhw_2021_02")

In [26]:
# Slightly shorter preview: the first 15 rows.
df.show(n=15)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02866|2021-02-05 18:53:32|2021-02-05 19:00:26|          91|         155|   null|
|           HV0003|              B02876|2021-02-03 20:04:21|2021-02-03 20:19:09|          29|         154|   null|
|           HV0003|              B02888|2021-02-04 00:26:03|2021-02-04 00:40:33|         151|         229|   null|
|           HV0003|              B02887|2021-02-05 08:21:19|2021-02-05 08:26:49|         215|         215|   null|
|           HV0003|              B02764|2021-02-01 21:54:16|2021-02-01 22:09:23|          25|          49|   null|
|           HV0003|              B02872|2021-02-06 03:20:12|2021-02-06 03:30:25|

In [27]:
# Number of trips that started on 2021-02-15.
# The half-open range [2021-02-15, 2021-02-16) covers the whole day, including
# any sub-second timestamps after 23:59:59 that a closed "<= 23:59:59" bound
# would silently drop. A plain range on the raw column also keeps the filter
# eligible for Parquet pushdown (unlike wrapping the column in to_date()).
spark.sql("""
select count(1) from hvfhw_2021_02
where pickup_datetime >= '2021-02-15 00:00:00' and pickup_datetime < '2021-02-16 00:00:00'
""").show()

[Stage 11:>                                                         (0 + 4) / 4]

+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

In [28]:
# Pattern matching the datetime text in the source CSV (e.g. "2021-02-01 00:10:40"):
# a space separator and whole seconds, with no 'T' and no milliseconds. It is only
# needed when parsing string columns; the schema already reads these as timestamps.
timeformat = "yyyy-MM-dd HH:mm:ss"

In [29]:
# Longest trips: duration in seconds between pickup and dropoff, longest first.
# pickup_datetime / dropoff_datetime are already TimestampType (explicit schema,
# confirmed by printSchema), so unix_timestamp() needs no string-parsing pattern.
# Passing a pattern here only suggests a parse that never happens, and the previous
# pattern did not even match the data.
(
    df
    .withColumn('pickup_date', func.to_date(df.pickup_datetime))
    .withColumn('dropoff_date', func.to_date(df.dropoff_datetime))
    .withColumn(
        'duration',
        func.unix_timestamp(df.dropoff_datetime) - func.unix_timestamp(df.pickup_datetime),
    )
    .select('pickup_date', 'dropoff_date', 'duration')
    .orderBy('duration', ascending=False)
    .show()
)

                                                                                

+-----------+------------+--------+
|pickup_date|dropoff_date|duration|
+-----------+------------+--------+
| 2021-02-11|  2021-02-12|   75540|
| 2021-02-17|  2021-02-18|   57221|
| 2021-02-20|  2021-02-21|   44039|
| 2021-02-03|  2021-02-04|   40653|
| 2021-02-19|  2021-02-20|   37577|
| 2021-02-25|  2021-02-26|   35010|
| 2021-02-20|  2021-02-20|   34806|
| 2021-02-18|  2021-02-19|   34612|
| 2021-02-18|  2021-02-18|   34555|
| 2021-02-10|  2021-02-11|   34169|
| 2021-02-10|  2021-02-10|   32476|
| 2021-02-25|  2021-02-25|   32439|
| 2021-02-21|  2021-02-22|   32223|
| 2021-02-09|  2021-02-10|   32087|
| 2021-02-06|  2021-02-06|   31447|
| 2021-02-02|  2021-02-02|   30913|
| 2021-02-10|  2021-02-10|   30856|
| 2021-02-09|  2021-02-09|   30732|
| 2021-02-21|  2021-02-22|   30660|
| 2021-02-05|  2021-02-06|   30511|
+-----------+------------+--------+
only showing top 20 rows



In [35]:
# Trips per dispatching base, busiest first.
# Bind the DataFrame and display it in a separate statement: `.show()` returns None,
# so chaining it onto the assignment left df_most_frequent_dispatching = None.
# COUNT(1) counts every trip in a group; COUNT(dispatching_base_num) skips NULLs,
# which would make a NULL-base group report 0.
df_most_frequent_dispatching = spark.sql("""
SELECT dispatching_base_num,
       COUNT(1) AS total_dispatches
FROM hvfhw_2021_02
GROUP BY dispatching_base_num
ORDER BY total_dispatches DESC
""")
df_most_frequent_dispatching.show()



+--------------------+----------------+
|dispatching_base_num|total_dispatches|
+--------------------+----------------+
|              B02510|         3233664|
|              B02764|          965568|
|              B02872|          882689|
|              B02875|          685390|
|              B02765|          559768|
|              B02869|          429720|
|              B02887|          322331|
|              B02871|          312364|
|              B02864|          311603|
|              B02866|          311089|
|              B02878|          305185|
|              B02682|          303255|
|              B02617|          274510|
|              B02883|          251617|
|              B02884|          244963|
|              B02882|          232173|
|              B02876|          215693|
|              B02879|          210137|
|              B02867|          200530|
|              B02877|          198938|
+--------------------+----------------+
only showing top 20 rows



                                                                                

In [36]:
# Taxi zone lookup used to turn location IDs into zone names.
# NOTE(review): unlike the trip CSV, this file is not downloaded anywhere in the
# notebook; it must already exist at ./taxi+_zone_lookup.csv.
# CSV columns arrive as strings, so cast the LocationID join key to int. It then
# matches the IntegerType PULocationID/DOLocationID of the trips instead of relying
# on an implicit string cast inside every join, and the Parquet copy keeps the int type.
df_zones = (
    spark.read
    .option("header", "true")
    .csv("./taxi+_zone_lookup.csv")
    .withColumn("LocationID", func.col("LocationID").cast(types.IntegerType()))
)

In [38]:
# Persist the zone lookup as Parquet, replacing any previous output.
df_zones.write.mode('overwrite').parquet("./data/parquet_zones")

In [39]:
# Reload the zone lookup from its Parquet copy.
df_zones = spark.read.format("parquet").load("./data/parquet_zones")

In [40]:
# Expose the zone lookup to Spark SQL for the pickup/dropoff joins.
df_zones.createOrReplaceTempView(name="common_zones")

In [41]:
# Most frequent pickup/dropoff zone pairs, most common first.
# Each trip is joined to the zone lookup twice: pz for the pickup zone, dz for the dropoff zone.
# COUNT(1) counts every trip in a group. COUNT(CONCAT(...)) would report 0 for any
# pair in which either Zone is NULL, because CONCAT returns NULL if any argument is NULL.
df_zone_most_common = spark.sql("""
SELECT CONCAT(pz.Zone,'/',dz.Zone) AS pickuplocation_and_droplocation,
       COUNT(1) AS total
FROM hvfhw_2021_02
INNER JOIN common_zones AS pz ON hvfhw_2021_02.PULocationID = pz.LocationID
INNER JOIN common_zones AS dz ON hvfhw_2021_02.DOLocationID = dz.LocationID
GROUP BY pz.Zone, dz.Zone
ORDER BY total DESC
""")

In [42]:
# Materialise the zone-pair counts as Parquet, replacing any previous output.
df_zone_most_common.write.mode("overwrite").parquet("./data/most_common_zones/")

                                                                                

In [44]:
# Reload the materialised zone-pair counts.
df_zone_most_common = spark.read.format("parquet").load("./data/most_common_zones/")

In [46]:
# Top 5 pickup/dropoff zone pairs.
# Re-sort after reading: the ORDER BY used before the write is not guaranteed to
# survive a Parquet round trip, because Spark may read the files in any order.
df_zone_most_common.orderBy('total', ascending=False).head(5)

[Row(pickuplocation_and_droplocation='East New York/East New York', total=45041),
 Row(pickuplocation_and_droplocation='Borough Park/Borough Park', total=37329),
 Row(pickuplocation_and_droplocation='Canarsie/Canarsie', total=28026),
 Row(pickuplocation_and_droplocation='Crown Heights North/Crown Heights North', total=25976),
 Row(pickuplocation_and_droplocation='Bay Ridge/Bay Ridge', total=17934)]