In [1]:
import glob
import os
import pandas as pd
import polars as pl
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

data_dir = "output_data"

os.makedirs(f"{data_dir}", exist_ok=True)

folder_path = "data/*.parquet"
file_paths = glob.glob(folder_path)

### Testing

In [None]:
%%timeit

df = pl.scan_parquet(folder_path)\
    .group_by(["VendorID"])\
    .agg([
        pl.col("passenger_count").sum().alias("total_passengers"),
        pl.col("total_amount").mean().alias("avg_total"),
        pl.col("trip_distance").mean().alias("avg_trip_distance")
    ]).collect()
df.write_parquet(f'{data_dir}/temp_polars.parquet')

peak memory: 132.73 MiB, increment: 0.30 MiB
peak memory: 1696.88 MiB, increment: 0.00 MiB
peak memory: 1747.72 MiB, increment: 0.00 MiB
peak memory: 1783.75 MiB, increment: 0.00 MiB
peak memory: 1829.88 MiB, increment: 0.01 MiB
peak memory: 1789.63 MiB, increment: 0.11 MiB
peak memory: 1774.90 MiB, increment: 0.02 MiB
peak memory: 1760.92 MiB, increment: 0.00 MiB
1.55 s ± 76.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
%%timeit

df = pd.concat([pd.read_parquet(file) for file in file_paths], ignore_index=True)
df_agg = df.groupby("VendorID")[["passenger_count", "total_amount", "trip_distance"]].agg(
    total_passengers=("passenger_count", "sum"),
    avg_total=("total_amount", "mean"),
    avg_trip_distance=("trip_distance", "mean"),
)
df_agg = df_agg.reset_index()
df_agg.to_parquet(f"{data_dir}/temp_pandas.parquet")

peak memory: 1790.34 MiB, increment: 0.02 MiB
peak memory: 2163.10 MiB, increment: 0.00 MiB
peak memory: 2230.88 MiB, increment: 0.00 MiB


KeyboardInterrupt: 

In [4]:
spark = SparkSession.builder.master("local[1]").appName('taxi-example').getOrCreate()

In [5]:
%%timeit

df = spark.read.parquet(file_paths[0])
df = df.groupBy("VendorID").agg(
    F.sum("passenger_count").alias("total_passengers"),
    F.mean("total_amount").alias("avg_total"),
    F.mean("trip_distance").alias("avg_trip_distance")
).na.fill({"total_passengers": 0})
df.write.mode('overwrite').parquet(f'{data_dir}/temp_spark.parquet')

513 ms ± 57.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
agg_pandas = pd.read_parquet(f"{data_dir}/temp_pandas.parquet")
agg_polars = pd.read_parquet(f"{data_dir}/temp_polars.parquet")
agg_spark = pd.read_parquet(f"{data_dir}/temp_spark.parquet")

### Verification

Making sure that the 3 different parquet files we wrote are the same.

In [7]:
agg_pandas.shape, agg_polars.shape, agg_spark.shape

((4, 4), (4, 4), (4, 4))

In [8]:
agg_pandas.sort_values(["VendorID"]).head()

Unnamed: 0,VendorID,total_passengers,avg_total,avg_trip_distance
0,1,13536686.0,20.904553,3.248677
1,2,40110228.0,21.923248,7.031941
2,5,0.0,69.756713,14.742867
3,6,0.0,46.795499,8.835186


In [9]:
agg_polars.sort_values(["VendorID"]).head()

Unnamed: 0,VendorID,total_passengers,avg_total,avg_trip_distance
0,1,13536686.0,20.904553,3.248677
1,2,40110228.0,21.923248,7.031941
2,5,0.0,69.756713,14.742867
3,6,0.0,46.795499,8.835186


In [10]:
agg_spark.sort_values(["VendorID"]).head()

Unnamed: 0,VendorID,total_passengers,avg_total,avg_trip_distance
2,1,856427.0,19.253229,2.987275
3,2,2467740.0,19.070821,6.39503
1,5,0.0,59.997778,14.911111
0,6,0.0,38.115932,8.256171
