In [None]:
import pyspark
from pyspark.sql import types
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [4]:
# Get (or reuse) a local Spark session named 'test' running on master "local[*]".
spark = (
    SparkSession.builder
    .appName('test')
    .master("local[*]")
    .getOrCreate()
)

In [5]:
# Display the Spark version of the active session.
spark.version

'3.3.2'

In [6]:
# Explicit schema for the fhvhv June 2021 trip CSV. Field names, order and
# types mirror what Spark infers from the file (no 'hvfhs_license_num' column,
# and a trailing 'Affiliated_base_number' column).
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True),
])

In [7]:
# Load the June 2021 trip records; the first line is a header and column
# types are inferred by Spark.
df_trip_data = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv('data/fhvhv_tripdata_2021-06.csv')
)

                                                                                

In [6]:
# Inspect the schema Spark inferred for the trip data.
df_trip_data.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [8]:
# Peek at the first five rows as a sanity check on the load.
df_trip_data.head(5)

[Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 2, 41), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 7, 46), PULocationID=174, DOLocationID=18, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 16, 16), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 21, 14), PULocationID=32, DOLocationID=254, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 27, 1), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 42, 11), PULocationID=240, DOLocationID=127, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 46, 8), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 53, 45), PULocationID=127, DOLocationID=235, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02510', pickup_datetime=datetime.d

In [None]:
# Repartition into 12 partitions and persist the trips as parquet.
# mode("overwrite") keeps this cell re-runnable: the default save mode raises
# an error when 'data/2021/06/' already exists from a previous run.
df_trip_data = df_trip_data.repartition(12)
df_trip_data.write.mode("overwrite").parquet('data/2021/06/')

In [None]:
# How many taxi trips were there on June 15?

In [9]:
# Register the trips DataFrame as the temp view 'trips_data' so it can be queried with spark.sql.
df_trip_data.createOrReplaceTempView('trips_data')

In [10]:
# Count the trips whose pickup timestamp falls on 2021-06-15.
df_result = spark.sql("""
SELECT 
   count(*)
FROM
    trips_data
WHERE date(pickup_datetime) = "2021-06-15"
""")
df_result.show()

[Stage 3:>                                                          (0 + 8) / 8]

+--------+
|count(1)|
+--------+
|  452470|
+--------+





In [11]:
# Now calculate the duration for each trip. How long was the longest trip in hours?

In [12]:
from datetime import datetime
def compute_duration(pickup_datetime, dropoff_datetime):
    """Return the whole number of hours between two timestamps.

    The absolute difference is used, so passing the two timestamps in either
    order yields the same result.

    Args:
        pickup_datetime: ``datetime`` at which the trip started, or ``None``.
        dropoff_datetime: ``datetime`` at which the trip ended, or ``None``.

    Returns:
        The elapsed time floored to whole hours, as a ``float`` (for example
        ``2.0`` for 2h 59m), or ``None`` when either timestamp is missing.
    """
    # When used as a Spark UDF on nullable columns the inputs can be None;
    # propagate the missing value instead of raising TypeError in the worker.
    if pickup_datetime is None or dropoff_datetime is None:
        return None
    elapsed_seconds = abs(dropoff_datetime - pickup_datetime).total_seconds()
    # Floor division on a float keeps the float return type (e.g. 0.0, 2.0).
    return elapsed_seconds // 3600
    

In [13]:
# Quick check with timestamps about five minutes apart. The later timestamp is
# passed first; compute_duration takes the absolute difference, so the result
# is still non-negative.
compute_duration(datetime(2021, 6, 1, 0, 7, 46), datetime(2021, 6, 1, 0, 2, 41))

0.0

In [14]:
# Wrap compute_duration as a Spark UDF. F.udf defaults to a StringType return,
# which would turn the float hours into strings and make sorting lexicographic,
# so the return type is declared explicitly.
compute_duration_udf = F.udf(compute_duration, types.DoubleType())

In [15]:
# Longest trip in hours: difference of the two timestamps in epoch seconds,
# divided by 3600, then the maximum over all trips.
(
    df_trip_data
    .withColumn(
        "duration",
        (col("dropoff_datetime").cast("long") - col("pickup_datetime").cast("long")) / 3600,
    )
    .agg({"duration": "max"})
    .collect()[0]
)

                                                                                

Row(max(duration)=66.8788888888889)

In [None]:
# List trips with their duration in whole hours, longest first.
# Arguments follow compute_duration's declared (pickup, dropoff) order, and the
# sort key is cast to double so the ordering is numeric even when the UDF
# column is string-typed.
df_trip_data \
    .withColumn('duration', compute_duration_udf(df_trip_data.pickup_datetime, df_trip_data.dropoff_datetime)) \
    .select('pickup_datetime', 'dropoff_datetime', 'duration', 'PULocationID', 'DOLocationID') \
    .orderBy(col("duration").cast("double").desc()) \
    .show()

In [None]:
# Using the zone lookup data and the fhvhv June 2021 data, what is the name of the most frequent pickup location zone?

In [16]:
# Load the taxi zone lookup table; the first line is a header and column
# types are inferred by Spark.
df_zone_lookup = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv('data/taxi_zone_lookup.csv')
)

In [17]:
# Register the zone lookup DataFrame as the temp view 'zone_lookup'.
df_zone_lookup.createOrReplaceTempView('zone_lookup')

In [18]:
# Peek at the first five zone lookup rows.
df_zone_lookup.head(5)

[Row(LocationID=1, Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID=2, Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID=3, Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID=4, Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID=5, Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone')]

In [19]:
# Show the first five trip rows again, for reference next to the zone lookup rows.
df_trip_data.head(5)

[Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 2, 41), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 7, 46), PULocationID=174, DOLocationID=18, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 16, 16), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 21, 14), PULocationID=32, DOLocationID=254, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 27, 1), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 42, 11), PULocationID=240, DOLocationID=127, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 46, 8), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 53, 45), PULocationID=127, DOLocationID=235, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02510', pickup_datetime=datetime.d

In [26]:
# Attach the zone lookup columns to each trip by matching the trip's pickup
# location id against the lookup's LocationID (inner join).
df_join = df_trip_data.join(
    df_zone_lookup,
    on=df_trip_data["PULocationID"] == df_zone_lookup["LocationID"],
    how="inner",
)

In [29]:
# Peek at three joined rows to confirm the zone columns were attached.
df_join.head(3)

[Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 2, 41), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 7, 46), PULocationID=174, DOLocationID=18, SR_Flag='N', Affiliated_base_number='B02764', LocationID=174, Borough='Bronx', Zone='Norwood', service_zone='Boro Zone'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 16, 16), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 21, 14), PULocationID=32, DOLocationID=254, SR_Flag='N', Affiliated_base_number='B02764', LocationID=32, Borough='Bronx', Zone='Bronxdale', service_zone='Boro Zone'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 27, 1), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 42, 11), PULocationID=240, DOLocationID=127, SR_Flag='N', Affiliated_base_number='B02764', LocationID=240, Borough='Bronx', Zone='Van Cortlandt Park', service_zone='Boro Zone')]

In [37]:
# Count pickups per zone name and list the busiest zones first.
# The view name (spelled 'looup') must match the name used in the FROM clause;
# "GROUP BY 1" groups by the first selected column, Zone.
df_join.createOrReplaceTempView('trips_data_zone_looup')
df_join_result = spark.sql("""
SELECT 
   Zone,
   count(PULocationID)as nb_pickup
FROM
    trips_data_zone_looup
GROUP BY
1
ORDER BY nb_pickup desc
""")
df_join_result.show()

[Stage 42:>                                                         (0 + 8) / 8]

+--------------------+---------+
|                Zone|nb_pickup|
+--------------------+---------+
| Crown Heights North|   231279|
|        East Village|   221244|
|         JFK Airport|   188867|
|      Bushwick South|   187929|
|       East New York|   186780|
|TriBeCa/Civic Center|   164344|
|   LaGuardia Airport|   161596|
|            Union Sq|   158937|
|        West Village|   154698|
|             Astoria|   152493|
|     Lower East Side|   151020|
|        East Chelsea|   147673|
|Central Harlem North|   146402|
|Williamsburg (Nor...|   143683|
|          Park Slope|   143594|
|  Stuyvesant Heights|   141427|
|        Clinton East|   139611|
|West Chelsea/Huds...|   139431|
|             Bedford|   138428|
|         Murray Hill|   137879|
+--------------------+---------+
only showing top 20 rows



                                                                                