# DATA 516 Lab 03
##### Mina Nielsen, Fall 2025
The goal of this assignment is to build a data pipeline that establishes a performance baseline using CSV files and eager APIs, for comparison with lazy execution over Parquet.

In [64]:
import polars as pl
import time
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

### Inspect Data

In [65]:
q1 = pl.read_csv("data/orders_2023_Q1.csv")
q2 = pl.read_csv("data/orders_2023_Q2.csv")
q3 = pl.read_csv("data/orders_2023_Q3.csv")
q4 = pl.read_csv("data/orders_2023_Q4.csv")

As shown by running ```ls -lh``` in ```data```, each of the four quarterly files is 30M.
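The same size check can be done from inside the notebook, which makes it easy to compare the CSV inputs with the Parquet output later. This is a minimal sketch; `dir_size_mib` is a hypothetical helper (not part of Polars or PyArrow), and it reports MiB (1024² bytes), the unit behind the `M` suffix of `ls -lh`.

```python
from pathlib import Path


def dir_size_mib(path: str, pattern: str = "*") -> float:
    """Total size in MiB of all files under `path` (searched recursively) matching `pattern`."""
    total_bytes = sum(p.stat().st_size for p in Path(path).rglob(pattern) if p.is_file())
    return total_bytes / (1024 * 1024)


# Hypothetical usage with the directory layout used in this notebook:
# print(dir_size_mib("data", "*.csv"))             # the four quarterly CSV files
# print(dir_size_mib("data/parquet", "*.parquet")) # the partitioned Parquet dataset
```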

In [66]:
print(q1.head(3))
print(q2.schema)
print(q3.schema)
print(len(q4.schema))

shape: (3, 18)
┌──────────┬────────────┬───────────┬──────────┬───┬───────────┬───────────┬───────────┬───────────┐
│ order_id ┆ customer_i ┆ product_i ┆ category ┆ … ┆ blob_a    ┆ blob_b    ┆ blob_c    ┆ optional_ │
│ ---      ┆ d          ┆ d         ┆ ---      ┆   ┆ ---       ┆ ---       ┆ ---       ┆ score     │
│ i64      ┆ ---        ┆ ---       ┆ str      ┆   ┆ str       ┆ str       ┆ str       ┆ ---       │
│          ┆ i64        ┆ i64       ┆          ┆   ┆           ┆           ┆           ┆ f64       │
╞══════════╪════════════╪═══════════╪══════════╪═══╪═══════════╪═══════════╪═══════════╪═══════════╡
│ 1        ┆ 352        ┆ 10027     ┆ cat_02   ┆ … ┆ lorem     ┆ lorem     ┆ lorem     ┆ 73.530998 │
│          ┆            ┆           ┆          ┆   ┆ ipsum     ┆ ipsum     ┆ ipsum     ┆           │
│          ┆            ┆           ┆          ┆   ┆ dolor sit ┆ dolor sit ┆ dolor sit ┆           │
│          ┆            ┆           ┆          ┆   ┆ amet lor… ┆ amet lor… ┆

This confirms that all four files have the same 18 fields.

### Eager CSV Baseline Analysis

Apply a consistent transformation pattern so the results can be compared later. The full process (read, transform, aggregate, write) is kept in one code cell so it can be timed end to end.

In [67]:
start_time = time.time()

# Read each quarterly CSV eagerly and stack them into one DataFrame
q1 = pl.read_csv("data/orders_2023_Q1.csv")
q2 = pl.read_csv("data/orders_2023_Q2.csv")
q3 = pl.read_csv("data/orders_2023_Q3.csv")
q4 = pl.read_csv("data/orders_2023_Q4.csv")
egr = pl.concat([q1, q2, q3, q4])

# Shorten the date column names (the Parquet partitions below use the same names)
egr = egr.rename({
    "order_year": "year",
    "order_month": "month"
})

# Keep delivered orders from Q3 2023 (July through September)
egr = egr.filter(
    (pl.col("year") == 2023)
    & (pl.col("month").is_in([7, 8, 9]))
    & (pl.col("status") == "DELIVERED")
)

# Keep only the columns needed for the aggregation
egr = egr.select(["category", "region", "extended_price", "optional_score"])

# Sum of extended_price, mean optional_score, and row count per (category, region)
egr = egr.group_by(["category", "region"]).agg(
    pl.sum("extended_price").round(2).alias("total_cost"),
    pl.mean("optional_score").alias("avg_optional_score"),
    pl.len().alias("cnt_rows")
)
print(egr.schema)
print(egr.head(3))
egr.write_csv("output_eager.csv")

end_time = time.time()
print(f"Total time spent for Eager/Baseline CSV Analysis : {end_time - start_time} seconds")

Schema({'category': String, 'region': String, 'total_cost': Float64, 'avg_optional_score': Float64, 'cnt_rows': UInt32})
shape: (3, 5)
┌──────────┬────────┬────────────┬────────────────────┬──────────┐
│ category ┆ region ┆ total_cost ┆ avg_optional_score ┆ cnt_rows │
│ ---      ┆ ---    ┆ ---        ┆ ---                ┆ ---      │
│ str      ┆ str    ┆ f64        ┆ f64                ┆ u32      │
╞══════════╪════════╪════════════╪════════════════════╪══════════╡
│ cat_10   ┆ EU     ┆ 3373.9     ┆ 69.927778          ┆ 55       │
│ cat_13   ┆ LATAM  ┆ 836.55     ┆ 74.239738          ┆ 15       │
│ cat_23   ┆ NA     ┆ 10755.97   ┆ 70.254793          ┆ 197      │
└──────────┴────────┴────────────┴────────────────────┴──────────┘
Total time spent for Eager/Baseline CSV Analysis : 0.39859843254089355 seconds


### Convert to Partitioned Parquet and Lazy Execution

Reload the raw CSV files so this pipeline starts from the same unprocessed data.

In [68]:
q1 = pl.read_csv("data/orders_2023_Q1.csv")
q2 = pl.read_csv("data/orders_2023_Q2.csv")
q3 = pl.read_csv("data/orders_2023_Q3.csv")
q4 = pl.read_csv("data/orders_2023_Q4.csv")
# Despite its name, lzy is still an eager DataFrame here; laziness begins with scan_parquet below
lzy = pl.concat([q1, q2, q3, q4])

lzy = lzy.rename({
    "order_year": "year",
    "order_month": "month"
})

Next, the goal is to produce a Hive-style Parquet dataset and demonstrate the benefits of lazy execution with predicate and projection pushdown. Start by converting the DataFrame to partitioned Parquet.

In [69]:
# Convert to Arrow and store year/month as zero-padded strings so the
# partition directories read year=2023/month=01 ... month=12
table = lzy.to_arrow()
table = table.set_column(
    table.schema.get_field_index("year"),
    "year",
    pa.array([f"{int(y):04d}" for y in table.column("year").to_pylist()], pa.string())
)
table = table.set_column(
    table.schema.get_field_index("month"),
    "month",
    pa.array([f"{int(m):02d}" for m in table.column("month").to_pylist()], pa.string())
)

# Polars-native alternative (not used here):
# lzy.write_parquet("./data/parquet", compression='zstd', statistics=True, partition_by=["year", "month"])

# zstd-compressed Parquet files, one directory per year/month partition
pq_format = ds.ParquetFileFormat()
fileops = pq_format.make_write_options(compression="zstd")

ds.write_dataset(
    data=table,
    base_dir="./data/parquet/",
    format="parquet",
    # Both partition fields are strings, matching the converted columns above
    partitioning=ds.HivePartitioning(pa.schema([("year", pa.string()), ("month", pa.string())])),
    existing_data_behavior="overwrite_or_ignore",
    file_options=fileops
)

Now run the lazy analysis, again keeping the full process in one code cell so it can be timed end to end.

In [70]:
lazy_start = time.time()

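# Build a lazy query: scan_parquet discovers the year=/month= Hive partitions,
# but rows are not read until collect() is called
lf = pl.scan_parquet("./data/parquet/")
# Declare the filter and projection before collect() so the optimizer can push them into the scan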
lf = lf.filter(
    (pl.col("year") == 2023)
    & (pl.col("month").is_in([7, 8, 9]))
    & (pl.col("status") == "DELIVERED")
)
lf = lf.select(["category", "region", "extended_price", "optional_score"])
lf = lf.group_by(["category", "region"]).agg(
    pl.sum("extended_price").round(2).alias("total_cost"),
    pl.mean("optional_score").alias("avg_optional_score"),
    pl.len().alias("cnt_rows")
)

lazy_result = lf.collect()  # the optimized plan executes here
lazy_end = time.time()
print(f"Total time spent for Lazy Parquet Analysis : {lazy_end - lazy_start} seconds")

Total time spent for Lazy Parquet Analysis : 0.27909183502197266 seconds


In [71]:
print(lf.explain())


AGGREGATE[maintain_order: false]
  [col("extended_price").sum().round().alias("total_cost"), col("optional_score").mean().alias("avg_optional_score"), len().alias("cnt_rows")] BY [col("category"), col("region")]
  FROM
  simple π 4/4 ["category", "region", ... 2 other columns]
    Parquet SCAN [./data/parquet/year=2023\month=01\part-0.parquet, ... 11 other sources]
    PROJECT 7/18 COLUMNS
    SELECTION: [([([(col("status")) == ("DELIVERED")]) & (col("month").is_in([[7, 8, 9]]))]) & ([(col("year")) == (2023)])]


##### Evidence of Pushdown Optimization
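Polars' `LazyFrame.explain()` prints the optimized query plan, which is read from the bottom up. In the output above, the `PROJECT 7/18 COLUMNS` and `SELECTION` lines are attached to the `Parquet SCAN` node instead of appearing as separate operators above it. This is the evidence of pushdown. Projection pushdown means only 7 of the 18 columns are read: `category`, `region`, `extended_price`, and `optional_score` for the aggregation, plus `status`, `year`, and `month` for the filter. Predicate pushdown means the filter is evaluated during the scan rather than after all the data has been loaded. The 4-column `simple π` projection and the `AGGREGATE` then run on the already-reduced data. Note that the scan still lists all 12 partition files as sources, so this plan by itself does not show whether files outside July to September are skipped.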

### Comparison & Results

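In the runs shown above, the eager CSV pipeline took about 0.40 seconds and the lazy Parquet pipeline about 0.28 seconds. The lazy version should also need less memory, because projection pushdown reads only the required columns and predicate pushdown discards non-matching rows during the scan, instead of materializing all four quarters first. The Parquet files are zstd-compressed when they are written, which shrinks them on disk and reduces the bytes read at scan time; the data is decompressed once loaded, so compression saves disk space and I/O rather than memory. The Parquet dataset is therefore expected to be smaller than the four 30M CSV files, although its size is not reported here.

These numbers are indicative rather than conclusive: each pipeline was timed once, the eager timing includes writing `output_eager.csv` while the lazy timing does not write its result, the lazy timing excludes the one-time CSV-to-Parquet conversion, and memory usage was not measured.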
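Because a single timing is sensitive to noise (for example, the first run may be slower while files are not yet in the operating system's cache), one option is to repeat each pipeline several times and report the median. This is a sketch; `time_runs`, `run_eager_csv`, and `run_lazy_parquet` are hypothetical names, and the pipelines would first need to be wrapped in zero-argument functions.

```python
import statistics
import time
from typing import Callable


def time_runs(fn: Callable[[], object], repeats: int = 5) -> dict[str, float]:
    """Call `fn` `repeats` times and summarize the wall-clock durations in seconds.

    time.perf_counter() is a monotonic, high-resolution clock intended for measuring
    elapsed time, which makes it a better fit for benchmarks than time.time().
    """
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return {
        "min": min(durations),
        "median": statistics.median(durations),
        "max": max(durations),
    }


# Hypothetical usage, assuming each pipeline is wrapped in a function:
# print(time_runs(run_eager_csv))
# print(time_runs(run_lazy_parquet))
```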