The code below downloads the libraries needed for this project.

In [1]:
%pip install -U pandas pyarrow "modin[ray]" "modin[dask]" "dask[complete]" memory_profiler pyyaml

Note: you may need to restart the kernel to use updated packages.


The code below imports the libraries needed for this project.

In [2]:
import os
import time
import yaml
from memory_profiler import memory_usage
import pandas as pd
import dask.dataframe as dd
import ray
import modin.pandas as mpd
import modin.config as modin_cfg

The code below provides functions to run on each framework to test its computational efficiency and perform basic validation on data columns.

In [3]:
# Runs test on time taken and peak memory usage
def performance(func, *args, **kwargs):
    start = time.time()
    memUsage, result = memory_usage((func, args, kwargs), retval = True, max_iterations = 1, interval = 0.1)
    end = time.time()

    runtime = end - start
    peakMemory = max(memUsage)

    print(f"Time taken: {runtime:.2f} seconds")
    print(f"Peak memory usage: {peakMemory:.2f} MB\n")

    return result

# Cleans dataframe by removing special characters, null values,
# and white spaces from the column name
def cleanData(df):
    df.columns = df.columns.str.strip().str.lower()
    df.columns = df.columns.str.replace('[^a-zA-Z0-9]+', '_', regex = True)
    df.dropna(axis = 1, how = "all", inplace = True)

    return df

# Converts CSV file to gz zipped file using pipe (|) as separator
def parquetToGz(file):
    df = pd.read_parquet(file)
    df.to_csv(file.replace(".parquet", ".gz"), sep = "|", index = False, compression = "gzip")

The code below analyzes Pandas' framework and validates the data.

In [4]:
# Prints Pandas' Framework Performance
print("Pandas Framework Performance: ")
pandasDf = performance(pd.read_parquet, "yellow_tripdata_2025-01.parquet")

# Cleans Pandas' Framework Dataframe
cleanData(pandasDf)

Pandas Framework Performance: 
Time taken: 0.34 seconds
Peak memory usage: 1261.55 MB



Unnamed: 0,vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1.0,1.60,1.0,N,229,237,1,10.00,3.5,0.5,3.00,0.0,1.0,18.00,2.5,0.0,0.00
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1.0,0.50,1.0,N,236,237,1,5.10,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.00
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1.0,0.60,1.0,N,141,141,1,5.10,3.5,0.5,2.00,0.0,1.0,12.10,2.5,0.0,0.00
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3.0,0.52,1.0,N,244,244,2,7.20,1.0,0.5,0.00,0.0,1.0,9.70,0.0,0.0,0.00
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3.0,0.66,1.0,N,244,116,2,5.80,1.0,0.5,0.00,0.0,1.0,8.30,0.0,0.0,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3475221,2,2025-01-31 23:01:48,2025-01-31 23:16:29,,3.35,,,79,237,0,15.85,0.0,0.5,0.00,0.0,1.0,20.60,,,0.75
3475222,2,2025-01-31 23:50:29,2025-02-01 00:17:27,,8.73,,,161,116,0,28.14,0.0,0.5,0.00,0.0,1.0,32.89,,,0.75
3475223,2,2025-01-31 23:26:59,2025-01-31 23:43:01,,2.64,,,144,246,0,14.91,0.0,0.5,0.00,0.0,1.0,19.66,,,0.75
3475224,2,2025-01-31 23:14:34,2025-01-31 23:34:52,,3.16,,,142,107,0,17.55,0.0,0.5,0.00,0.0,1.0,22.30,,,0.75


The code below analyzes Ray's framework and validates the data.

In [5]:
# Shutdown Ray Runtime
ray.shutdown()

# Initialize Ray to Start the Ray Runtime
ray.init(ignore_reinit_error = True)

# Performance Test on Ray
print("Ray Performance: ")
rayDf = performance(ray.data.read_parquet, "yellow_tripdata_2025-01.parquet")

# Convert Ray Dataset to Pandas DataFrame
rayDf = rayDf.to_pandas()

# Cleans Ray's Framework Dataframe
cleanData(rayDf)

2025-05-09 20:56:36,599	INFO worker.py:1888 -- Started a local Ray instance.


Ray Performance: 


Parquet Files Sample 0:   0%|          | 0.00/1.00 [00:00<?, ? file/s]

2025-05-09 20:56:38,244	INFO logging.py:290 -- Registered dataset logger for dataset dataset_0_0
2025-05-09 20:56:38,258	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_0_0. Full logs are in /tmp/ray/session_2025-05-09_20-56-35_783462_3033/logs/ray-data
2025-05-09 20:56:38,258	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_0_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet]


Time taken: 0.96 seconds
Peak memory usage: 1415.42 MB



Running 0: 0.00 row [00:00, ? row/s]

- ReadParquet->SplitBlocks(50) 1: 0.00 row [00:00, ? row/s]

2025-05-09 20:56:38,914	INFO streaming_executor.py:220 -- ✔️  Dataset dataset_0_0 execution finished in 0.66 seconds


Unnamed: 0,vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1.0,1.60,1.0,N,229,237,1,10.00,3.5,0.5,3.00,0.0,1.0,18.00,2.5,0.0,0.00
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1.0,0.50,1.0,N,236,237,1,5.10,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.00
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1.0,0.60,1.0,N,141,141,1,5.10,3.5,0.5,2.00,0.0,1.0,12.10,2.5,0.0,0.00
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3.0,0.52,1.0,N,244,244,2,7.20,1.0,0.5,0.00,0.0,1.0,9.70,0.0,0.0,0.00
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3.0,0.66,1.0,N,244,116,2,5.80,1.0,0.5,0.00,0.0,1.0,8.30,0.0,0.0,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3475221,2,2025-01-31 23:01:48,2025-01-31 23:16:29,,3.35,,,79,237,0,15.85,0.0,0.5,0.00,0.0,1.0,20.60,,,0.75
3475222,2,2025-01-31 23:50:29,2025-02-01 00:17:27,,8.73,,,161,116,0,28.14,0.0,0.5,0.00,0.0,1.0,32.89,,,0.75
3475223,2,2025-01-31 23:26:59,2025-01-31 23:43:01,,2.64,,,144,246,0,14.91,0.0,0.5,0.00,0.0,1.0,19.66,,,0.75
3475224,2,2025-01-31 23:14:34,2025-01-31 23:34:52,,3.16,,,142,107,0,17.55,0.0,0.5,0.00,0.0,1.0,22.30,,,0.75


The code below analyzes Dask's framework and validates the data.

In [6]:
# Performance Test on Dask prior to Computation
print("Dask Lazy Performance: ")
daskDf = performance(dd.read_parquet, "yellow_tripdata_2025-01.parquet")

# Performance Test on Dask after Computation
print("Dask Computed Performance: ")
daskDfComputed = performance(lambda: daskDf.head())

# Cleans Dask's Framework Dataframe
DaskDfComputed = daskDf.compute()
cleanData(daskDfComputed)

Dask Lazy Performance: 
Time taken: 0.25 seconds
Peak memory usage: 1017.14 MB

Dask Computed Performance: 
Time taken: 1.29 seconds
Peak memory usage: 1539.92 MB



Unnamed: 0,vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1.0,1.6,1.0,N,229,237,1,10.0,3.5,0.5,3.0,0.0,1.0,18.0,2.5,0.0,0.0
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1.0,0.5,1.0,N,236,237,1,5.1,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.0
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1.0,0.6,1.0,N,141,141,1,5.1,3.5,0.5,2.0,0.0,1.0,12.1,2.5,0.0,0.0
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3.0,0.52,1.0,N,244,244,2,7.2,1.0,0.5,0.0,0.0,1.0,9.7,0.0,0.0,0.0
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3.0,0.66,1.0,N,244,116,2,5.8,1.0,0.5,0.0,0.0,1.0,8.3,0.0,0.0,0.0


The code below analyzes Modin Ray's framework and validates the data.

In [7]:
# Shutdown Ray Runtime
ray.shutdown()

# Wait for Ray Runtime to Shutdown
time.sleep(5)

# Switch Engine to Ray
modin_cfg.Engine.put("ray")

# Initialize Ray to Start the Ray Runtime
ray.init(ignore_reinit_error = True)

# Performance Test on Modin Ray
print("Modin Ray's Performance: ")
modinRayDf = performance(mpd.read_parquet, "yellow_tripdata_2025-01.parquet")

# Cleans Modin Ray's Framework Dataframe
cleanData(modinRayDf)

2025-05-09 20:56:50,054	INFO worker.py:1888 -- Started a local Ray instance.


Modin Ray's Performance: 
Time taken: 3.12 seconds
Peak memory usage: 848.33 MB



Unnamed: 0,vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1.0,1.60,1.0,N,229,237,1,10.00,3.5,0.5,3.00,0.0,1.0,18.00,2.5,0.0,0.00
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1.0,0.50,1.0,N,236,237,1,5.10,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.00
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1.0,0.60,1.0,N,141,141,1,5.10,3.5,0.5,2.00,0.0,1.0,12.10,2.5,0.0,0.00
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3.0,0.52,1.0,N,244,244,2,7.20,1.0,0.5,0.00,0.0,1.0,9.70,0.0,0.0,0.00
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3.0,0.66,1.0,N,244,116,2,5.80,1.0,0.5,0.00,0.0,1.0,8.30,0.0,0.0,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3475221,2,2025-01-31 23:01:48,2025-01-31 23:16:29,,3.35,,,79,237,0,15.85,0.0,0.5,0.00,0.0,1.0,20.60,,,0.75
3475222,2,2025-01-31 23:50:29,2025-02-01 00:17:27,,8.73,,,161,116,0,28.14,0.0,0.5,0.00,0.0,1.0,32.89,,,0.75
3475223,2,2025-01-31 23:26:59,2025-01-31 23:43:01,,2.64,,,144,246,0,14.91,0.0,0.5,0.00,0.0,1.0,19.66,,,0.75
3475224,2,2025-01-31 23:14:34,2025-01-31 23:34:52,,3.16,,,142,107,0,17.55,0.0,0.5,0.00,0.0,1.0,22.30,,,0.75


The code below analyzes Modin Dask's framework and validates the data.

In [8]:
# Shutdown Ray Runtime
ray.shutdown()

# Switch Engine to Dask
modin_cfg.Engine.put("dask")

# Performance Test on Modin Dask
print("Modin Dask's Performance: ")
modinDaskDf = performance(mpd.read_parquet, "yellow_tripdata_2025-01.parquet")

# Cleans Modin Dask's Framework Dataframe
cleanData(modinDaskDf)

Modin Dask's Performance: 
Time taken: 2.33 seconds
Peak memory usage: 217.28 MB



Unnamed: 0,vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1.0,1.60,1.0,N,229,237,1,10.00,3.5,0.5,3.00,0.0,1.0,18.00,2.5,0.0,0.00
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1.0,0.50,1.0,N,236,237,1,5.10,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.00
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1.0,0.60,1.0,N,141,141,1,5.10,3.5,0.5,2.00,0.0,1.0,12.10,2.5,0.0,0.00
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3.0,0.52,1.0,N,244,244,2,7.20,1.0,0.5,0.00,0.0,1.0,9.70,0.0,0.0,0.00
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3.0,0.66,1.0,N,244,116,2,5.80,1.0,0.5,0.00,0.0,1.0,8.30,0.0,0.0,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3475221,2,2025-01-31 23:01:48,2025-01-31 23:16:29,,3.35,,,79,237,0,15.85,0.0,0.5,0.00,0.0,1.0,20.60,,,0.75
3475222,2,2025-01-31 23:50:29,2025-02-01 00:17:27,,8.73,,,161,116,0,28.14,0.0,0.5,0.00,0.0,1.0,32.89,,,0.75
3475223,2,2025-01-31 23:26:59,2025-01-31 23:43:01,,2.64,,,144,246,0,14.91,0.0,0.5,0.00,0.0,1.0,19.66,,,0.75
3475224,2,2025-01-31 23:14:34,2025-01-31 23:34:52,,3.16,,,142,107,0,17.55,0.0,0.5,0.00,0.0,1.0,22.30,,,0.75


The code below creates the yaml file for this dataset.

In [None]:
with open('schema.yaml', 'r') as file:
    schema = yaml.safe_load(file)

readSeparator = schema["separator"]
writeSeparator = schema["write_separator"]
expectedColumns = schema["columns"]

df = pd.read_parquet("yellow_tripdata_2025-01.parquet")

actualColumns = df.columns.tolist()

if len(actualColumns) != len(expectedColumns):
    print(f"Error: Expected {len(expectedColumns)} columns, but found {len(actualColumns)} columns.")
else:
    print("Number of columns matched.")

missingColumns = set(expectedColumns) - set(actualColumns)
extraColumns = set(actualColumns) - set(expectedColumns)

if missingColumns:
    print(f"Missing columns: {', '.join(missingColumns)}")
if extraColumns:
    print(f"Extra columns: {', '.join(extraColumns)}")

if not missingColumns and not extraColumns:
    print("Column names match the expected schema.")

    parquetToGz("yellow_tripdata_2025-01.parquet")

Number of columns matched.
Column names match the expected schema.


The code below prints the summary of the file.

In [None]:
numRows = len(df)
numColumns = len(df.columns)
fileSize = os.path.getsize("yellow_tripdata_2025-01.gz")

print(f"Total Number of Rows: {numRows}")
print(f"Total Number of Columns: {numColumns}")
print(f"File Size: {fileSize} bytes")

Total Number of Rows: 3475226
Total Number of Columns: 20
File Size: 64542254 bytes


My findings and conclusion are below.

Average Findings over 10 Trials:
- Pandas Framework:
    - Time taken: 0.401 seconds
    - Peak memory usage: 1223.963 MB
- Ray Framework:
    - Time taken: 1.001 seconds
    - Peak memory usage: 984.904 MB
- Dask Framework:
    - Dask Lazy Performance: 
        - Time taken: 0.287 seconds
        - Peak memory usage: 1004.858 MB
    - Dask Computed Performance: 
        - Time taken: 1.254 seconds
        - Peak memory usage: 1438.356 MB
- Modin Ray Framework:
    - Time taken: 3.621 seconds
    - Peak memory usage: 756.368 MB
- Modin Dask Framework:
    - Time taken: 2.316 seconds
    - Peak memory usage: 230.119 MB

Conclusion:
We can see that the lowest time taken to compute is from the Pandas Framework, we disclude the Dask Lazy Performance since it's a lazy dataframe. The lowest peak memory usage is Modin Dask's Framework, yet it has the second highest time in computing time. Pandas Framework had the second highest peak memory usage, therefore it's about even for both. Looking at Ray's Framework, it's about split between both since it computes faster than Modin Dask's Framework and uses less memory than Pandas. For faster computations, Pandas would be the best choice with the fastest time. For less memory usage, Modin Dask would be the best choice with the lowest peak memory usage. For the best of both worlds, Ray is better since it's split evenly between the computation speed and memory efficiency.