## Spark version

In [11]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

# Show the installed PySpark version as this cell's output.
pyspark.__version__

'3.5.0'

In [5]:
# Create (or reuse) a Spark session running in local mode; "local[*]" asks for
# one worker thread per available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

## FHV October 2019 partition size

In [15]:
from pyspark.sql import types

# Derive the CSV schema from a small pandas sample instead of letting Spark
# scan the whole file. `DataFrame.schema` already is a StructType, so it is
# used as-is rather than being wrapped in a second StructType.
# NOTE(review): the types are whatever pandas infers from the first rows only;
# confirm they hold for the full file.
SCHEMA_SAMPLE_ROWS = 1000
sample_pdf = pd.read_csv('fhv_tripdata_2019-10.csv', nrows=SCHEMA_SAMPLE_ROWS)
schema = spark.createDataFrame(sample_pdf).schema

In [16]:
# Load the full CSV using the schema computed above; the first line of the
# file is treated as the header row.
df = spark.read.csv('fhv_tripdata_2019-10.csv', header=True, schema=schema)

In [18]:
# Redistribute the data into 6 partitions before writing it out as parquet.
df = df.repartition(6)

# Use overwrite mode so the cell can be re-run: the default save mode raises
# when 'partitions/' already exists, and overwriting also removes part files
# left behind by an earlier run.
df.write.mode("overwrite").parquet('partitions/')

In [24]:
import os

directory_path = 'partitions/'

# os.listdir returns entries in arbitrary order; sort them so the report is
# stable between runs and lists the part files in sequence.
for filename in sorted(os.listdir(directory_path)):
    file_path = os.path.join(directory_path, filename)

    # Skip anything that is not a regular .parquet file.
    if not (os.path.isfile(file_path) and filename.endswith('.parquet')):
        continue

    # Convert bytes to mebibytes (1024 ** 2 bytes).
    file_size_mb = os.path.getsize(file_path) / 1024 ** 2
    print(f"The size of the file '{filename}' is {file_size_mb:.2f} MB")

The size of the file 'part-00005-f24464ca-7ae1-4f6a-b438-f79644d8edc3-c000.snappy.parquet' is 5.66 MB
The size of the file 'part-00000-f24464ca-7ae1-4f6a-b438-f79644d8edc3-c000.snappy.parquet' is 5.66 MB
The size of the file 'part-00002-f24464ca-7ae1-4f6a-b438-f79644d8edc3-c000.snappy.parquet' is 5.66 MB
The size of the file 'part-00003-f24464ca-7ae1-4f6a-b438-f79644d8edc3-c000.snappy.parquet' is 5.65 MB
The size of the file 'part-00004-f24464ca-7ae1-4f6a-b438-f79644d8edc3-c000.snappy.parquet' is 5.67 MB
The size of the file 'part-00001-f24464ca-7ae1-4f6a-b438-f79644d8edc3-c000.snappy.parquet' is 5.66 MB


## Count records

In [39]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [40]:
# Add a date-only version of the pickup time so trips can be matched by day.
pickup_day = col("pickup_datetime").cast("date")
df = df.withColumn("pickup_date", pickup_day)

# Keep the trips whose pickup date is 2019-10-15 and show how many there are.
october_15_trips = df.where(col("pickup_date") == "2019-10-15")
october_15_trips.count()

62610

## Longest trip in the dataset (hours)

In [42]:
from pyspark.sql.functions import col, unix_timestamp

In [47]:
# Convert pickup and drop-off times to Unix seconds, then store their
# difference as the trip duration in seconds.
df = (
    df
    .withColumn("pickup_timestamp", unix_timestamp(col("pickup_datetime")))
    .withColumn("dropOff_timestamp", unix_timestamp(col("dropOff_datetime")))
    .withColumn("trip_duration", col("dropOff_timestamp") - col("pickup_timestamp"))
)

# The ungrouped aggregate yields a single row holding the overall maximum.
# NOTE(review): the recorded result (631152.5 h, roughly 72 years) suggests
# outlier drop-off timestamps in the data; confirm whether they should be
# filtered before taking the maximum.
longest_row = df.agg({"trip_duration": "max"}).first()
max_duration_seconds = longest_row[0]
max_duration_hours = max_duration_seconds / 3600
print(f"Length of the longest trip in hours: {max_duration_hours}")

Length of the longest trip in hours: 631152.5


## Least frequent pickup location zone

In [48]:
# Number of trips per pickup location id.
pickup_counts = df.groupBy("PUlocationID").count()

# Pick the id with the fewest trips. The id itself is a secondary sort key so
# that ties on the count resolve the same way on every run; rows without a
# pickup id sort last so they are not preferred in a tie.
least_frequent_row = (
    pickup_counts
    .orderBy(col("count").asc(), col("PUlocationID").asc_nulls_last())
    .first()
)

# first() returns None for an empty DataFrame; report None in that case
# instead of failing on the row lookup.
least_frequent_pickup_zone = (
    least_frequent_row["PUlocationID"] if least_frequent_row is not None else None
)
print(f"Least frequent pickup location zone: {least_frequent_pickup_zone}")

Least frequent pickup location zone: 2.0


In [53]:
# Read the zone lookup CSV, using its first line as the header and letting
# Spark infer the column types.
zone_lookup_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("taxi_zone_lookup.csv")
)

# Make the table queryable from Spark SQL under the name "taxi_zone_lookup",
# then print its first rows.
zone_lookup_df.createOrReplaceTempView("taxi_zone_lookup")
zone_lookup_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly