In [1]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Start (or reuse) a Spark session running locally on all available cores
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/06 10:41:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [1]:
# download the parquet data
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-06 13:32:57--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.239.238.119, 18.239.238.212, 18.239.238.133, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.239.238.119|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-06 13:33:04 (8.20 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [7]:
# Read the Parquet file into a Spark DataFrame.
# No "header" option here: it is a CSV-reader setting, and Parquet files
# carry their own column names and types.
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

In [15]:
import pandas as pd

In [17]:
# Convert the downloaded Parquet file to CSV via pandas
# (the whole file is loaded into memory; the index is not written out)
parquet_path = "yellow_tripdata_2024-10.parquet"
csv_path = "yellow_tripdata_2024-10.csv"

df = pd.read_parquet(parquet_path)
df.to_csv(csv_path, index=False)

In [23]:
# Copy the first 1001 lines of the CSV (the header row plus 1,000 records)
# into head.csv to use as a small sample
!head -n 1001 yellow_tripdata_2024-10.csv > head.csv

In [25]:
# Load the head.csv sample into a pandas DataFrame
sample_csv = 'head.csv'
df_pandas = pd.read_csv(sample_csv)

In [27]:
# Show the dtype pandas inferred for each column of the sample
df_pandas.dtypes

VendorID                   int64
tpep_pickup_datetime      object
tpep_dropoff_datetime     object
passenger_count          float64
trip_distance            float64
RatecodeID               float64
store_and_fwd_flag        object
PULocationID               int64
DOLocationID               int64
payment_type               int64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
improvement_surcharge    float64
total_amount             float64
congestion_surcharge     float64
Airport_fee              float64
dtype: object

In [29]:
# Let Spark infer a schema from the pandas sample and display it
sample_sdf = spark.createDataFrame(df_pandas)
sample_sdf.schema

StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', StringType(), True), StructField('tpep_dropoff_datetime', StringType(), True), StructField('passenger_count', DoubleType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [31]:
from pyspark.sql import types

In [37]:
# Hand-written schema for the trip data: (column name, Spark type) pairs,
# listed in column order. Every field is declared nullable.
column_types = [
    ("VendorID", types.IntegerType),
    ("tpep_pickup_datetime", types.TimestampType),
    ("tpep_dropoff_datetime", types.TimestampType),
    ("passenger_count", types.DoubleType),
    ("trip_distance", types.DoubleType),
    ("RatecodeID", types.DoubleType),
    ("store_and_fwd_flag", types.StringType),
    ("PULocationID", types.IntegerType),
    ("DOLocationID", types.IntegerType),
    ("payment_type", types.IntegerType),
    ("fare_amount", types.DoubleType),
    ("extra", types.DoubleType),
    ("mta_tax", types.DoubleType),
    ("tip_amount", types.DoubleType),
    ("tolls_amount", types.DoubleType),
    ("improvement_surcharge", types.DoubleType),
    ("total_amount", types.DoubleType),
    ("congestion_surcharge", types.DoubleType),
    ("Airport_fee", types.DoubleType),
]

schema = types.StructType([
    types.StructField(col_name, col_type(), True)
    for col_name, col_type in column_types
])

In [39]:
# Load the full CSV into Spark, applying the hand-written schema
# and treating the first line as the header row
df = spark.read.csv(
    'yellow_tripdata_2024-10.csv',
    schema=schema,
    header=True,
)

In [43]:
# Reshuffle the DataFrame into a fixed number of partitions
num_partitions = 4
df = df.repartition(num_partitions)

In [45]:
# Write the repartitioned DataFrame out as Parquet files.
# mode("overwrite") replaces any previous output; with the default save mode
# the write raises when the directory already exists, so re-running this
# cell (or the whole notebook) would fail.
output_path = 'data/pq/yellow_tripdata_2024-10/'
df.write.mode("overwrite").parquet(output_path)

                                                                                

In [47]:
# Show how many partitions the DataFrame currently has
df.rdd.getNumPartitions()



4

In [None]:
# Report the total and average size of the Parquet files in the output directory
import glob
import os

BYTES_PER_MB = 1024 * 1024

# Directory the Parquet output was written to
output_path = 'data/pq/yellow_tripdata_2024-10/'

# Every *.parquet file directly inside that directory, and the size of each
parquet_files = glob.glob(os.path.join(output_path, "*.parquet"))
file_sizes = [os.path.getsize(path) for path in parquet_files]

total_size_bytes = sum(file_sizes)
num_files = len(parquet_files)

# Mean size per file in MB; report 0 when nothing matched rather than dividing by zero
if num_files > 0:
    average_size_mb = (total_size_bytes / num_files) / BYTES_PER_MB
else:
    average_size_mb = 0

print(f"Total Size: {total_size_bytes / BYTES_PER_MB:.2f} MB")
print(f"Number of Parquet Files: {num_files}")
print(f"Average Parquet File Size: {average_size_mb:.2f} MB")