In [69]:
import requests
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import asc

In [47]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

spark.conf.set("spark.sql.session.timeZone", "Europe/Berlin")

df = spark.read \
    .option("header", "true") \
    .parquet('download/yellow_tripdata_2024-10.parquet')

df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 02:30:44|  2024-10-01 02:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [54]:
spark.version

'3.3.2'

In [33]:
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True), StructField('pickup_date', DateT

In [49]:
df =df.withColumn('pickup_date',F.to_date(df.tpep_pickup_datetime))
df =df.withColumn('dropoff_date',F.to_date(df.tpep_dropoff_datetime))
df =df.withColumn('DiffInSeconds',df.tpep_dropoff_datetime.cast("long") - df.tpep_pickup_datetime.cast("long"))
df =df.withColumn('DiffInHours',F.round(df.DiffInSeconds/3600))
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+-------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_date|dropoff_date|DiffInSeconds|DiffInHours|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+-------------+-----------+
|       2| 2024-10-01 02:30:44|  2

In [3]:
df = df.repartition(4)

In [4]:
df.write.parquet('parquet/10/', mode='overwrite')

In [50]:
df.filter((df.pickup_date=='2024-10-15')).count()

125567

In [53]:
df.sort(df.DiffInSeconds.desc()).first().DiffInHours

163.0

In [56]:
zones = spark.read \
    .option("header", "true") \
    .csv('download/taxi_zone_lookup.csv')

zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [58]:
df_joined = df.join(zones,df.PULocationID==zones.LocationID,"leftouter")
df_joined.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+-------------+-----------+----------+---------+--------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_date|dropoff_date|DiffInSeconds|DiffInHours|LocationID|  Borough|                Zone|service_zone|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+---------

In [None]:
df_zones = df_joined.groupBy(df_joined.Zone).count().sort(asc("count")).show()

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|Governor's Island...|    1|
|       Arden Heights|    2|
|       Rikers Island|    2|
|         Jamaica Bay|    3|
| Green-Wood Cemetery|    3|
|Charleston/Totten...|    4|
|       Port Richmond|    4|
|   Rossville/Woodrow|    4|
|Eltingville/Annad...|    4|
|       West Brighton|    4|
|        Crotona Park|    6|
|         Great Kills|    6|
|Heartland Village...|    7|
|     Mariners Harbor|    7|
|Saint George/New ...|    9|
|             Oakwood|    9|
|New Dorp/Midland ...|   10|
|       Broad Channel|   10|
|         Westerleigh|   12|
|     Pelham Bay Park|   12|
+--------------------+-----+
only showing top 20 rows

