In [1]:
import pyspark
import os
import sys
from pyspark.sql import SparkSession
from pyspark import SparkContext
# Point both the PySpark driver and its Python workers at this kernel's
# interpreter so they run the same Python executable.
os.environ.update(dict.fromkeys(('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'), sys.executable))

In [2]:
# Local Spark session on all available cores ("local[*]").
# getOrCreate() returns an already-running session if there is one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('homework_module5')
    .getOrCreate()
)

In [7]:
# Display the Spark version this session is running.
spark.version

'3.5.4'

In [4]:
from pyspark.sql import types

# Explicit schema for the yellow-taxi trip file; every column is nullable.
# passenger_count, RatecodeID and payment_type are declared LongType: the
# author switched them from IntegerType, presumably because the file stores
# them as 64-bit integers (NOTE(review): confirm against the parquet metadata).
yellow_schema = types.StructType([
    types.StructField(column_name, column_type, nullable=True)
    for column_name, column_type in [
        ("VendorID", types.IntegerType()),
        ("tpep_pickup_datetime", types.TimestampType()),
        ("tpep_dropoff_datetime", types.TimestampType()),
        ("passenger_count", types.LongType()),
        ("trip_distance", types.DoubleType()),
        ("RatecodeID", types.LongType()),
        ("store_and_fwd_flag", types.StringType()),
        ("PULocationID", types.IntegerType()),
        ("DOLocationID", types.IntegerType()),
        ("payment_type", types.LongType()),
        ("fare_amount", types.DoubleType()),
        ("extra", types.DoubleType()),
        ("mta_tax", types.DoubleType()),
        ("tip_amount", types.DoubleType()),
        ("tolls_amount", types.DoubleType()),
        ("improvement_surcharge", types.DoubleType()),
        ("total_amount", types.DoubleType()),
        ("congestion_surcharge", types.DoubleType()),
    ]
])


In [5]:
# Load the October 2024 yellow-taxi trips using the explicit yellow_schema.
# No "header" option: that is a CSV reader option and has no effect on parquet.
YELLOW_TRIPS_PATH = 'yellow_tripdata_2024-10.parquet'

df = (
    spark.read
    .schema(yellow_schema)
    .parquet(YELLOW_TRIPS_PATH)
)


In [None]:

# Redistribute the rows into a fixed number of partitions; the parquet write
# that follows emits one part file per non-empty partition.
N_PARTITIONS = 4
df = df.repartition(N_PARTITIONS)


In [8]:

# Write the repartitioned trips. Spark's default save mode fails when the
# target already exists, so use "overwrite" to keep the notebook re-runnable.
REPARTITIONED_PATH = 'yellow_tripdata_2024-10_repartitioned.parquet'
df.write.mode('overwrite').parquet(REPARTITIONED_PATH)

In [None]:


# Path to the output Parquet directory written by the cell above
output_dir = 'yellow_tripdata_2024-10_repartitioned.parquet'

# Keep only the data part files (one per non-empty partition). The directory
# may also hold bookkeeping files (e.g. _SUCCESS, *.crc) that this suffix
# filter skips.
parquet_files = [f for f in os.listdir(output_dir) if f.endswith('.parquet')]
if not parquet_files:
    # Fail with a clear message instead of a bare ZeroDivisionError below.
    raise FileNotFoundError(
        f"No .parquet part files found in {output_dir!r}; run the write cell first."
    )

# Size of each part file in bytes
file_sizes = [os.path.getsize(os.path.join(output_dir, f)) for f in parquet_files]

# Average size, converted to MB using 1 MB = 1024 * 1024 bytes
total_size = sum(file_sizes)
num_files = len(file_sizes)
average_size_mb = total_size / num_files / (1024 * 1024)

print(f"Average size of each partitioned Parquet file: {average_size_mb:.2f} MB")


Average size of each partitioned Parquet file: 24.08 MB


In [12]:
from pyspark.sql import functions as F

In [14]:
# Count trips whose pickup falls on 2024-10-15, using a derived date column
# and a SQL-expression filter; the count is the cell's displayed output.
(
    df
    .withColumn('tpep_pickup_date', F.to_date(F.col('tpep_pickup_datetime')))
    .filter("tpep_pickup_date = '2024-10-15'")
    .count()
)

119103

In [None]:

# Imported here because `to_date` was never imported, and `col` only came from
# a later cell, so this cell raised NameError on a fresh kernel.
from pyspark.sql.functions import col, to_date

# Same count as the previous cell, expressed as a Column comparison: truncate
# the pickup timestamp to a date and keep trips on October 15, 2024.
oct_15_trips = df.filter(to_date(col("tpep_pickup_datetime")) == "2024-10-15")

# Count the number of trips that started on the 15th of October
num_trips = oct_15_trips.count()

print(f"Number of taxi trips on 15th October 2024: {num_trips}")


Number of taxi trips on 15th October 2024: 119103


In [16]:
# Show the loaded column names; they match the fields declared in yellow_schema.
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [17]:
from pyspark.sql.functions import col, unix_timestamp

# Calculate the trip duration in seconds
# Trip duration: unix_timestamp gives whole seconds since the epoch, so the
# difference is the duration in seconds; dividing by 3600 converts to hours.
df_with_duration = (
    df
    .withColumn(
        "trip_duration_seconds",
        unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime")),
    )
    .withColumn("trip_duration_hours", col("trip_duration_seconds") / 3600)
)

# The aggregation always yields exactly one row. Its value is None when no row
# has a non-null duration (e.g. empty input), which the ":.2f" format below
# cannot handle.
longest_trip_hours = df_with_duration.agg({"trip_duration_hours": "max"}).first()[0]

if longest_trip_hours is None:
    print("No trips with both pickup and dropoff timestamps; cannot compute the longest trip.")
else:
    # NOTE(review): a multi-day maximum likely reflects bad records; consider
    # filtering implausible durations before drawing conclusions.
    print(f"The length of the longest trip is {longest_trip_hours:.2f} hours.")


The length of the longest trip is 162.62 hours.
