In [2]:
import os
import urllib.request

url = "https://huggingface.co/datasets/BigDataUB/synthetic-columnar/resolve/main/orders.csv"
csv_path = "/app/data/orders.csv"

# Reuse a previously downloaded copy so Restart & Run All does not
# re-fetch ~830MB every time.
if os.path.exists(csv_path):
    print(f"Using cached dataset at {csv_path}")
else:
    # urlretrieve does not create parent directories.
    os.makedirs(os.path.dirname(csv_path), exist_ok=True)
    print("Downloading dataset (~830MB, this will take a few minutes)...")
    # Download under a temporary name and rename only on success, so an
    # interrupted download never leaves a truncated file that looks cached.
    tmp_path = csv_path + ".part"
    urllib.request.urlretrieve(url, tmp_path)
    os.replace(tmp_path, csv_path)
    print("Done!")

Downloading dataset (~830MB, this will take a few minutes)...
Done!


In [3]:
import os
import time
import pyarrow as pa
import pyarrow.csv as pcsv
import pyarrow.parquet as pq
import duckdb
import pandas as pd

In [4]:
# Record the pyarrow version used, since I/O timings below depend on it.
pa.__version__

'23.0.1'

Q1

In [5]:
# Load the full CSV into an Arrow table. Use the `pcsv` alias imported above
# (as the timing cells do) rather than relying on `pa.csv` being populated
# as a side effect of importing the submodule.
table = pcsv.read_csv('/app/data/orders.csv')

In [6]:
# Show column names and the types pyarrow inferred from the CSV.
print(f"{table.schema}")

order_id: int64
order_date: date32[day]
customer_id: int64
country: string
category: string
product: string
quantity: int64
unit_price: double
total_amount: double
payment_method: string
status: string


In [7]:
# Row count, with thousands separators.
print("Rows: {:,}".format(table.num_rows))

Rows: 10,000,000


In [8]:
# Number of columns in the table's schema.
print(f"Columns: {table.num_columns}")

Rows: 11


Q2

In [9]:
# On-disk size of the raw CSV, in decimal megabytes (1 MB = 10^6 bytes).
csv_size = os.path.getsize('/app/data/orders.csv')
print("CSV Size: {:.1f} MB".format(csv_size / 1_000_000))

CSV Size: 829.4 MB


Q3


In [21]:
# Time one full CSV parse. perf_counter is a monotonic, high-resolution clock
# meant for measuring durations; time.time() can jump if the system clock changes.
start = time.perf_counter()
pcsv.read_csv('/app/data/orders.csv')
csv_read_time = time.perf_counter() - start
print(f"CSV read time: {csv_read_time:.2f}s")

CSV read time: 12.57s


In [11]:
%timeit table = pcsv.read_csv("/app/data/orders.csv")

8.47 s ± 1 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


Q4

In [13]:
# First 100k rows (a contiguous slice, not a random sample).
sample = table.slice(offset=0, length=100_000)

In [15]:
# Convert the Arrow sample to a pandas DataFrame so it can be written as JSON.
df = sample.to_pandas()

In [17]:
# Newline-delimited JSON: one object per row.
df.to_json(
    path_or_buf="/app/data/orders_sample.json",
    orient="records",
    lines=True,
)

Q5

In [35]:
# Write with pyarrow's default settings (snappy compression by default).
# perf_counter is the monotonic clock intended for measuring durations.
start = time.perf_counter()
pq.write_table(table, "/app/data/orders_default.parquet")
parquet_write_time = time.perf_counter() - start
print(f"Parquet write time: {parquet_write_time:.2f}s")

Parquet write time: 5.96s


Q6

In [38]:
# File-level metadata from the Parquet footer: row groups, row count, writer version.
metadata = pq.read_metadata(where="/app/data/orders_default.parquet")
print(metadata)

<pyarrow._parquet.FileMetaData object at 0x78f408514680>
  created_by: parquet-cpp-arrow version 23.0.1
  num_columns: 11
  num_rows: 10000000
  num_row_groups: 10
  format_version: 2.6
  serialized_size: 14195


In [33]:
# Codec used for the first column chunk (order_id) of the first row group.
(
    metadata
    .row_group(0)
    .column(0)
    .compression
)

'SNAPPY'

In [34]:
# Full metadata for that chunk: encodings, statistics, compressed vs. uncompressed size.
(
    metadata
    .row_group(0)
    .column(0)
)

<pyarrow._parquet.ColumnChunkMetaData object at 0x78f3a1c48360>
  file_offset: 0
  file_path: 
  physical_type: INT64
  num_values: 1048576
  path_in_schema: order_id
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x78f3a1c48270>
      has_min_max: True
      min: 1
      max: 1048576
      null_count: 0
      distinct_count: None
      num_values: 1048576
      physical_type: INT64
      logical_type: None
      converted_type (legacy): NONE
  geo_statistics:
    None
  compression: SNAPPY
  encodings: ('PLAIN', 'RLE', 'RLE_DICTIONARY')
  has_dictionary_page: True
  dictionary_page_offset: 4
  data_page_offset: 525089
  total_compressed_size: 4471417
  total_uncompressed_size: 8663180

Q7

In [43]:
# Compare on-disk sizes. The JSON file holds only the 100k-row sample, so its
# size is scaled up to the full table's row count to be comparable.
json_scale = table.num_rows / sample.num_rows
for name, path, scale in [
    ("CSV", "orders.csv", 1),
    ("JSON (extrapolated from 100k)", "orders_sample.json", json_scale),
    ("Parquet (default)", "orders_default.parquet", 1),
]:
    size = os.path.getsize(f"/app/data/{path}") * scale
    print(f"{name}: {size / 1e6:.1f} MB")

CSV: 829.4 MB
JSON (100k): 2289.4 MB
Parquet (default): 186.0 MB


Q8

In [44]:
# Write the same table with each codec and report file size and write time.
# perf_counter is used because it is monotonic and intended for durations.
for codec in ["snappy", "zstd", "none", "gzip"]:
    out_path = f"/app/data/orders_{codec}.parquet"
    start = time.perf_counter()
    pq.write_table(table, out_path, compression=codec)
    elapsed = time.perf_counter() - start
    size = os.path.getsize(out_path)
    print(f"{codec:8s}: {size / 1e6:8.1f} MB  write: {elapsed:.2f}s")

snappy  :    186.0 MB  write: 6.10s
zstd    :    134.3 MB  write: 6.50s
none    :    262.1 MB  write: 6.05s
gzip    :    136.0 MB  write: 113.53s


In [45]:
# Read back each codec's file and report the full-table read time.
# Note: OS page-cache state after the writes above can skew these numbers.
for codec in ["snappy", "zstd", "none", "gzip"]:
    start = time.perf_counter()
    t = pq.read_table(f"/app/data/orders_{codec}.parquet")
    elapsed = time.perf_counter() - start
    print(f"{codec:8s}: read {elapsed:.2f}s")

snappy  : read 3.54s
zstd    : read 5.25s
none    : read 8.68s
gzip    : read 3.94s


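Snappy (the default codec) gives the best overall trade-off here: the fastest reads and near-fastest writes, at a moderate file size. zstd produces files ~28% smaller at a similar write time. gzip is barely smaller than zstd but ~18× slower to write.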

Q10

In [46]:
# Parquet column pruning: reading 2 of 11 columns should touch only those
# columns' chunks. perf_counter is the monotonic clock meant for durations.
start = time.perf_counter()
t_all = pq.read_table("/app/data/orders_default.parquet")
time_all = time.perf_counter() - start

start = time.perf_counter()
t_pruned = pq.read_table("/app/data/orders_default.parquet",
                          columns=["category", "total_amount"])
time_pruned = time.perf_counter() - start

print(f"All columns: {time_all:.2f}s")
print(f"2 columns:   {time_pruned:.2f}s")

All columns: 4.78s
2 columns:   0.94s


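CSV is not a columnar format. Even when only a few columns are needed, the reader still has to scan and parse every row in full (all columns), so column selection gives little or no benefit. Parquet stores each column separately, so reading 2 of the 11 columns is several times faster than reading all of them.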

Q11

In [48]:
import pyarrow.compute as pc

# Derive each order's calendar year; it becomes the partition key below.
years = pc.year(table["order_date"])
table_with_year = table.append_column("year", years)

In [49]:
# Write one hive-style directory per year (year=2020/, ...).
# The default existing_data_behavior ('overwrite_or_ignore') writes new
# uniquely-named files on every run, so re-running this cell would duplicate
# every row. 'delete_matching' clears each target partition first, which
# makes the cell idempotent.
pq.write_to_dataset(table_with_year, "/app/data/orders_partitioned",
                    partition_cols=["year"],
                    existing_data_behavior="delete_matching")

This creates Hive-style subfolders, one per year (e.g. `year=2020/`), each containing the Parquet files for that year's orders.


Q13

In [50]:
import pyarrow.dataset as ds

# Discover the hive-style year=YYYY directories written above.
dataset = ds.dataset(
    "/app/data/orders_partitioned",
    partitioning="hive",
)
# Filtering on the partition key restricts the scan to the year=2024 partition.
subset = dataset.to_table(
    filter=(ds.field("year") == 2024),
)

In [None]:
import pyarrow.dataset as ds
import pyarrow.compute as pc

# With partition pruning (only reads year=2024 directory).
# perf_counter is the monotonic clock intended for measuring durations.
start = time.perf_counter()
dataset = ds.dataset("/app/data/orders_partitioned", partitioning="hive")
t_pruned = dataset.to_table(filter=ds.field("year") == 2024)
time_pruned = time.perf_counter() - start

# Without pruning (reads everything from the single-file Parquet, filters in memory)
start = time.perf_counter()
t_all = pq.read_table("/app/data/orders_default.parquet")
t_filtered = t_all.filter(pc.equal(pc.year(t_all.column("order_date")), 2024))
time_full = time.perf_counter() - start

print(f"With partition pruning: {time_pruned:.2f}s ({t_pruned.num_rows:,} rows)")
print(f"Without pruning:       {time_full:.2f}s ({t_filtered.num_rows:,} rows)")

# Both paths must select the same rows; a mismatch usually means the
# partitioned directory contains duplicate files from an earlier write.
assert t_pruned.num_rows == t_filtered.num_rows, "partitioned dataset and source disagree"