In [1]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types

In [2]:
# Create (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .appName('test')
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Record the Spark version this notebook was executed with (shown below).
spark.version

'3.5.1'

In [4]:
# Input/output locations, all relative to the notebook's working directory:
#   file_path   - raw FHV trip CSV for October 2019
#   tmp_path    - small head-of-file sample written by a shell cell below
#   output_path - directory for the repartitioned parquet output
#   zones_path  - taxi zone lookup CSV used for the pickup-zone join
(file_path, tmp_path, output_path, zones_path) = (
    './work/fhv_tripdata_2019-10.csv',
    './work/head_tmp.csv',
    './work/output/',
    './work/taxi_zone_lookup.csv',
)

In [5]:
# Count lines in the raw CSV (the total includes the header row).
!wc -l {file_path}

1897494 ./work/fhv_tripdata_2019-10.csv


In [6]:
# Write the header plus the first 100 data rows to a small sample file for pandas to inspect.
!head -n 101 {file_path} > {tmp_path}

In [7]:
# Eyeball the raw rows; note the trailing-space padding on some base numbers (e.g. B00021).
!head -n 10 {tmp_path}

dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264,264,,B00009
B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264,264,,B00013
B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264,264,,B00014
B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264,264,,B00014
B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264,264,,B00014
B00021         ,2019-10-01 00:00:48,2019-10-01 00:07:12,129,129,,B00021         
B00021         ,2019-10-01 00:47:23,2019-10-01 00:53:25,57,57,,B00021         
B00021         ,2019-10-01 00:10:06,2019-10-01 00:19:50,173,173,,B00021         
B00021         ,2019-10-01 00:51:37,2019-10-01 01:06:14,226,226,,B00021         


In [8]:
# Let Spark infer column types from the CSV. inferSchema costs an extra pass over
# the data; the result is used below as the template for an explicit schema.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [9]:
# Load the 100-row sample with pandas to compare its dtype inference against Spark's.
# (pandas is imported in the top import cell.)
df_pandas = pd.read_csv(tmp_path)
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID                int64
DOlocationID                int64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [10]:
# Let Spark map the pandas dtypes: object -> string, int64 -> bigint, float64 -> double
# (see the repr below). The datetimes stay strings because pandas did not parse them.
spark.createDataFrame(df_pandas)

DataFrame[dispatching_base_num: string, pickup_datetime: string, dropOff_datetime: string, PUlocationID: bigint, DOlocationID: bigint, SR_Flag: double, Affiliated_base_number: string]

In [11]:
# Explicit schema matching what inferSchema produced earlier. SR_Flag is kept as a
# string (pandas guessed float64 from an all-empty sample); every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [12]:
# Re-read the full CSV with the explicit schema, so no inference pass is needed.
df = spark.read.schema(schema).csv(file_path, header=True)

In [13]:
# First 10 rows as Row objects (DataFrame.head(n) is a thin wrapper over take(n)).
df.take(10)

[Row(dispatching_base_num='B00009', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 23), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 35), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 29), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 13, 22), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 43), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 37, 20), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 56, 29), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 57, 47), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=

In [14]:
# Strip the trailing-space padding visible in the raw CSV (e.g. 'B00021         ').
# Both base-number columns carry it, so both are trimmed; F.trim leaves nulls as null.
# (pyspark.sql.functions is imported as F in the top import cell.)
df_trimmed = (
    df
    .withColumn('dispatching_base_num', F.trim(F.col('dispatching_base_num')))
    .withColumn('Affiliated_base_number', F.trim(F.col('Affiliated_base_number')))
)
df_trimmed.head(10)

[Row(dispatching_base_num='B00009', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 23), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 35), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 29), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 13, 22), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 43), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 37, 20), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 56, 29), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 57, 47), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=

In [15]:
# Persist the *cleaned* frame from the previous cell. Writing the raw `df` here would
# silently drop the trimming. repartition(6) yields 6 partitions, so the write
# produces up to 6 parquet part files.
df_repartitioned = df_trimmed.repartition(6)
df_repartitioned.write.mode('overwrite').parquet(output_path)


In [16]:
# Reload the persisted parquet and confirm the explicit schema survived the round trip.
df = spark.read.parquet(output_path)
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [17]:
# Count trips whose pickup falls on 2019-10-15, comparing the formatted pickup date
# (rendered in the session time zone) against the target day.
target_day = '2019-10-15'
pickup_day = F.date_format('pickup_datetime', 'yyyy-MM-dd')
count_15th_oct = df.where(pickup_day == target_day).count()
print("Number of records with pickup_datetime on 15th October:", count_15th_oct)

Number of records with pickup_datetime on 15th October: 62610


In [18]:
# Trip duration in hours: casting a timestamp to long gives epoch seconds, and the
# difference divided by 3600 converts to hours.
pickup_epoch = F.col("pickup_datetime").cast("long")
dropoff_epoch = F.col("dropOff_datetime").cast("long")
df_with_trip_len = df.withColumn("trip_len", (dropoff_epoch - pickup_epoch) / 3600)

# The global aggregate has exactly one row; take its single value.
max_trip_len = df_with_trip_len.agg(F.max("trip_len")).first()[0]
print("Maximum value of trip_len column:", max_trip_len)

Maximum value of trip_len column: 631152.5


In [19]:
# Inspect the longest trips; the top rows have drop-off years far in the future,
# which points to bad timestamps rather than real multi-year trips.
(
    df_with_trip_len
    .select('pickup_datetime', 'dropOff_datetime', 'trip_len')
    .orderBy(F.desc("trip_len"))
    .show()
)

+-------------------+-------------------+------------------+
|    pickup_datetime|   dropOff_datetime|          trip_len|
+-------------------+-------------------+------------------+
|2019-10-11 18:00:00|2091-10-11 18:30:00|          631152.5|
|2019-10-28 09:00:00|2091-10-28 09:30:00|          631152.5|
|2019-10-31 23:46:33|2029-11-01 00:13:00| 87672.44083333333|
|2019-10-01 21:43:42|2027-10-01 21:45:23| 70128.02805555555|
|2019-10-17 14:00:00|2020-10-18 00:00:00|            8794.0|
|2019-10-26 21:26:00|2020-10-26 21:36:00| 8784.166666666666|
|2019-10-30 12:30:04|2019-12-30 13:02:08|1464.5344444444445|
|2019-10-25 07:04:57|2019-12-08 07:54:33|1056.8266666666666|
|2019-10-25 07:04:57|2019-12-08 07:21:11|1056.2705555555556|
|2019-10-01 13:47:17|2019-11-03 15:20:28| 793.5530555555556|
|2019-10-01 07:21:12|2019-11-03 08:44:21| 793.3858333333334|
|2019-10-01 13:41:00|2019-11-03 14:58:51|          793.2975|
|2019-10-01 18:43:20|2019-11-03 19:43:13| 792.9980555555555|
|2019-10-01 18:43:46|201

In [20]:
# Load the taxi zone lookup. The file is small, so schema inference is cheap here.
df_zones = spark.read.csv(zones_path, header=True, inferSchema=True)
df_zones.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [21]:
# Attach the pickup zone to each trip. This is an inner join, so trips whose
# PUlocationID is null or absent from the lookup are dropped.
joined_df = df_with_trip_len.join(
    df_zones,
    df_with_trip_len["PUlocationID"] == df_zones["LocationID"],
    how="inner",
)
joined_df.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+----------+-------+----+------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|          trip_len|LocationID|Borough|Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+----------+-------+----+------------+
|              B00536|2019-10-01 15:08:08|2019-10-01 15:19:09|         264|          85|   NULL|                B00536|0.1836111111111111|       264|Unknown|  NV|         N/A|
|              B01984|2019-10-01 09:27:00|2019-10-01 09:42:00|         264|          11|   NULL|                B01984|              0.25|       264|Unknown|  NV|         N/A|
|              B00037|2019-10-01 07:56:01|2019-10-01 08:12:44|         264|          39|   NULL|                B00037|0

In [22]:
# Trips per pickup zone, least frequent first. Count rows rather than non-null Zone
# values: F.count("Zone") would report 0 for a group whose Zone is null.
result_df = joined_df.groupBy("Zone").agg(F.count(F.lit(1)).alias("Count"))
result_df.orderBy('Count').show()

+--------------------+-----+
|                Zone|Count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows

