# Benchmark CSV → Parquet

Benchmark complet comparant **Pandas**, **Polars**, **DuckDB** et **PySpark** pour:
1. **Conversion** CSV → Parquet (avec monitoring CPU/RAM)
2. **Requêtes** SELECT sur CSV vs Parquet
3. **Écriture** des résultats

Ce notebook utilise le module `benchmark_lib.py` pour les fonctions de conversion et monitoring.

## 1. Configuration et Imports

In [26]:
import time
import json
import os
from pathlib import Path

import pandas as pd
import polars as pl
import duckdb

# Import du module de benchmark
from benchmark_lib import (
    Config, PerformanceMonitor,
    run_benchmark, cleanup_memory,
    convert_pandas, convert_polars, convert_duckdb, convert_pyspark,
    find_csv_file, get_system_info, save_summary
)

# Configuration
config = Config.from_env()
os.makedirs(config.output_dir, exist_ok=True)
os.makedirs(config.metrics_dir, exist_ok=True)

# Fichiers
CSV_FILE = find_csv_file(config.data_dir)
PARQUET_FILE = f"{config.output_dir}/polars.parquet"
OUTPUT_DIR = Path(config.output_dir) / "query_results"
OUTPUT_DIR.mkdir(exist_ok=True)

# Afficher la config
file_size_gb = os.path.getsize(CSV_FILE) / (1024**3)
sys_info = get_system_info()

print("=" * 60)
print("BENCHMARK CSV → PARQUET")
print("=" * 60)
print(f"Fichier : {CSV_FILE} ({file_size_gb:.2f} GB)")
print(f"Systeme : {sys_info['cpu_physical']} cores / {sys_info['cpu_logical']} threads, {sys_info['ram_gb']:.1f} GB RAM")
print("=" * 60)

BENCHMARK CSV → PARQUET
Fichier : data\StockUniteLegale_utf8.csv (3.84 GB)
Systeme : 6 cores / 12 threads, 63.9 GB RAM


---
## 2. Benchmark Conversion CSV → Parquet
---

In [27]:
# Resultats de conversion
conversion_results = {}

### 2.1 Pandas

In [28]:
conversion_results["Pandas"] = run_benchmark(
    "Pandas",
    lambda: convert_pandas(CSV_FILE, config.output_dir),
    config=config
)
cleanup_memory(config.cooldown_time, "Polars")


[START] Pandas



Columns (2,4,5,6,7,8,9,10,11,12,17,21,22,24,25,26,32) have mixed types. Specify dtype option on import or set low_memory=False.



[OK] Pandas: 243.37s
     CPU: 8.9% avg | 11.6% max
     RAM: 34.4 GB avg | 45.3 GB max

[CLEANUP] Nettoyage memoire... RAM: 28.4% -> 28.4%
[WAIT] Cooldown 5s avant Polars... 5 4 3 2 1 OK


### 2.2 Polars

In [29]:
conversion_results["Polars"] = run_benchmark(
    "Polars",
    lambda: convert_polars(CSV_FILE, config.output_dir),
    config=config
)
cleanup_memory(config.cooldown_time, "DuckDB")


[START] Polars
[OK] Polars: 10.59s
     CPU: 60.8% avg | 78.4% max
     RAM: 6.0 GB avg | 7.7 GB max

[CLEANUP] Nettoyage memoire... RAM: 31.0% -> 31.0%
[WAIT] Cooldown 5s avant DuckDB... 5 4 3 2 1 OK


### 2.3 DuckDB

In [30]:
conversion_results["Duckdb"] = run_benchmark(
    "Duckdb",
    lambda: convert_duckdb(CSV_FILE, config.output_dir),
    config=config
)
cleanup_memory(config.cooldown_time, "PySpark")


[START] Duckdb
[OK] Duckdb: 51.18s
     CPU: 23.6% avg | 67.7% max
     RAM: 8.4 GB avg | 15.6 GB max

[CLEANUP] Nettoyage memoire... RAM: 31.7% -> 31.7%
[WAIT] Cooldown 5s avant PySpark... 5 4 3 2 1 OK


### 2.4 PySpark

In [31]:
conversion_results["Pyspark"] = run_benchmark(
    "Pyspark",
    lambda: convert_pyspark(CSV_FILE, config.output_dir, config),
    config=config,
    use_system_memory=True  # PySpark utilise la JVM, mesure delta RAM systeme
)
cleanup_memory()


[START] Pyspark
     Spark session: 0.12s | Write: 28.75s
[OK] Pyspark: 29.78s
     CPU: 91.8% avg | 100.0% max
     RAM: 2.5 GB avg | 3.0 GB max

[CLEANUP] Nettoyage memoire... RAM: 36.0% -> 35.9%


In [32]:
# Sauvegarder le resume de conversion
summary = save_summary(conversion_results, config, CSV_FILE)
print("\nResume sauvegarde dans", config.metrics_dir)


Resume sauvegarde dans benchmark_metrics


---
## 3. Benchmark Requêtes CSV vs Parquet
---

In [33]:
# Configuration requetes
TARGET_SIREN = 920900271  # TBC CORP => Liiink
query_results = {"read": {}, "write": {}}

print(f"SIREN cible: {TARGET_SIREN}")
print(f"CSV: {CSV_FILE}")
print(f"Parquet: {PARQUET_FILE}")

SIREN cible: 920900271
CSV: data\StockUniteLegale_utf8.csv
Parquet: benchmark_outputs/polars.parquet


### 3.1 Pandas

In [34]:
# Pandas - CSV
start = time.perf_counter()
df = pd.read_csv(CSV_FILE)
result = df[df["siren"] == TARGET_SIREN]
query_results["read"]["Pandas CSV"] = time.perf_counter() - start
print(f"Pandas CSV: {query_results['read']['Pandas CSV']:.3f}s - {len(result)} lignes")

# Pandas - Parquet
start = time.perf_counter()
df = pd.read_parquet(PARQUET_FILE)
result = df[df["siren"] == TARGET_SIREN]
query_results["read"]["Pandas Parquet"] = time.perf_counter() - start
print(f"Pandas Parquet: {query_results['read']['Pandas Parquet']:.3f}s")


Columns (2,4,5,6,7,8,9,10,11,12,17,21,22,24,25,26,32) have mixed types. Specify dtype option on import or set low_memory=False.



Pandas CSV: 159.359s - 1 lignes
Pandas Parquet: 26.212s


### 3.2 Polars (Eager)

In [35]:
# Polars - CSV
start = time.perf_counter()
df = pl.read_csv(CSV_FILE)
result = df.filter(pl.col("siren") == TARGET_SIREN)
query_results["read"]["Polars CSV"] = time.perf_counter() - start
print(f"Polars CSV: {query_results['read']['Polars CSV']:.3f}s")

# Polars - Parquet
start = time.perf_counter()
df = pl.read_parquet(PARQUET_FILE)
result = df.filter(pl.col("siren") == TARGET_SIREN)
query_results["read"]["Polars Parquet"] = time.perf_counter() - start
print(f"Polars Parquet: {query_results['read']['Polars Parquet']:.3f}s")

Polars CSV: 7.874s
Polars Parquet: 2.122s


### 3.3 Polars Lazy (Predicate Pushdown)

In [36]:
# Polars Lazy - CSV
start = time.perf_counter()
result = pl.scan_csv(CSV_FILE).filter(pl.col("siren") == TARGET_SIREN).collect()
query_results["read"]["Polars Lazy CSV"] = time.perf_counter() - start
print(f"Polars Lazy CSV: {query_results['read']['Polars Lazy CSV']:.3f}s")

# Polars Lazy - Parquet (predicate pushdown)
start = time.perf_counter()
result = pl.scan_parquet(PARQUET_FILE).filter(pl.col("siren") == TARGET_SIREN).collect()
query_results["read"]["Polars Lazy Parquet"] = time.perf_counter() - start
print(f"Polars Lazy Parquet: {query_results['read']['Polars Lazy Parquet']:.3f}s")

Polars Lazy CSV: 7.659s
Polars Lazy Parquet: 0.064s


### 3.4 DuckDB

In [37]:
# DuckDB - CSV
start = time.perf_counter()
result = duckdb.query(f"""
    SELECT * FROM read_csv_auto('{CSV_FILE}', ignore_errors=true)
    WHERE siren = {TARGET_SIREN}
""").df()
query_results["read"]["DuckDB CSV"] = time.perf_counter() - start
print(f"DuckDB CSV: {query_results['read']['DuckDB CSV']:.3f}s")

# DuckDB - Parquet
start = time.perf_counter()
result = duckdb.query(f"""
    SELECT * FROM read_parquet('{PARQUET_FILE}')
    WHERE siren = {TARGET_SIREN}
""").df()
query_results["read"]["DuckDB Parquet"] = time.perf_counter() - start
print(f"DuckDB Parquet: {query_results['read']['DuckDB Parquet']:.3f}s")

DuckDB CSV: 6.094s
DuckDB Parquet: 0.127s


### 3.5 PySpark

In [38]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("QueryBenchmark") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

print(f"PySpark initialise: {spark.version}")

PySpark initialise: 3.5.3


In [39]:
# PySpark - CSV
start = time.perf_counter()
df = spark.read.csv(CSV_FILE, header=True, inferSchema=True)
result = df.filter(df["siren"] == TARGET_SIREN)
count = result.count()
query_results["read"]["PySpark CSV"] = time.perf_counter() - start
print(f"PySpark CSV: {query_results['read']['PySpark CSV']:.3f}s - {count} lignes")

# PySpark - Parquet
start = time.perf_counter()
df = spark.read.parquet(PARQUET_FILE)
result = df.filter(df["siren"] == TARGET_SIREN)
count = result.count()
query_results["read"]["PySpark Parquet"] = time.perf_counter() - start
print(f"PySpark Parquet: {query_results['read']['PySpark Parquet']:.3f}s")

PySpark CSV: 21.330s - 1 lignes
PySpark Parquet: 0.180s


---
## 4. Benchmark Écriture
---

In [40]:
# Preparer un dataset pour l'ecriture (10 millions de lignes)
WRITE_ROWS = 10_000_000

start = time.perf_counter()
big_result = pl.scan_parquet(PARQUET_FILE).head(WRITE_ROWS).collect()
actual_rows = len(big_result)
print(f"Dataset: {actual_rows:,} lignes prepare en {time.perf_counter() - start:.2f}s")

if actual_rows < WRITE_ROWS:
    print(f"⚠️ Le fichier source ne contient que {actual_rows:,} lignes (demande: {WRITE_ROWS:,})")

Dataset: 10,000,000 lignes prepare en 0.66s


In [41]:
# Polars
start = time.perf_counter()
big_result.write_csv(OUTPUT_DIR / "result_polars.csv")
query_results["write"]["Polars CSV"] = time.perf_counter() - start

start = time.perf_counter()
big_result.write_parquet(OUTPUT_DIR / "result_polars.parquet")
query_results["write"]["Polars Parquet"] = time.perf_counter() - start

print(f"Polars CSV: {query_results['write']['Polars CSV']:.3f}s")
print(f"Polars Parquet: {query_results['write']['Polars Parquet']:.3f}s")

Polars CSV: 2.167s
Polars Parquet: 2.327s


In [42]:
# Pandas
big_result_pd = big_result.to_pandas()

start = time.perf_counter()
big_result_pd.to_csv(OUTPUT_DIR / "result_pandas.csv", index=False)
query_results["write"]["Pandas CSV"] = time.perf_counter() - start

start = time.perf_counter()
big_result_pd.to_parquet(OUTPUT_DIR / "result_pandas.parquet", index=False)
query_results["write"]["Pandas Parquet"] = time.perf_counter() - start

print(f"Pandas CSV: {query_results['write']['Pandas CSV']:.3f}s")
print(f"Pandas Parquet: {query_results['write']['Pandas Parquet']:.3f}s")

Pandas CSV: 56.742s
Pandas Parquet: 19.387s


In [43]:
# DuckDB
duckdb.register("result_df", big_result_pd)

start = time.perf_counter()
duckdb.query(f"COPY result_df TO '{OUTPUT_DIR}/result_duckdb.csv' (FORMAT CSV, HEADER)")
query_results["write"]["DuckDB CSV"] = time.perf_counter() - start

start = time.perf_counter()
duckdb.query(f"COPY result_df TO '{OUTPUT_DIR}/result_duckdb.parquet' (FORMAT PARQUET)")
query_results["write"]["DuckDB Parquet"] = time.perf_counter() - start

print(f"DuckDB CSV: {query_results['write']['DuckDB CSV']:.3f}s")
print(f"DuckDB Parquet: {query_results['write']['DuckDB Parquet']:.3f}s")

DuckDB CSV: 10.055s
DuckDB Parquet: 21.681s


In [44]:
# PySpark - Lire depuis le fichier parquet ecrit par Polars pour eviter les problemes d'inference de type
# Le fichier result_polars.parquet a ete ecrit plus haut, on le reutilise
polars_parquet_path = str(OUTPUT_DIR / "result_polars.parquet")
big_result_spark = spark.read.parquet(polars_parquet_path)

# Ecriture CSV
start = time.perf_counter()
big_result_spark.write.mode("overwrite").option("header", True).csv(str(OUTPUT_DIR / "result_pyspark_csv"))
query_results["write"]["PySpark CSV"] = time.perf_counter() - start

# Ecriture Parquet
start = time.perf_counter()
big_result_spark.write.mode("overwrite").parquet(str(OUTPUT_DIR / "result_pyspark.parquet"))
query_results["write"]["PySpark Parquet"] = time.perf_counter() - start

print(f"PySpark CSV: {query_results['write']['PySpark CSV']:.3f}s")
print(f"PySpark Parquet: {query_results['write']['PySpark Parquet']:.3f}s")

PySpark CSV: 8.360s
PySpark Parquet: 9.023s


In [45]:
# Sauvegarder les resultats de requetes
with open(f"{config.metrics_dir}/query_benchmark.json", "w") as f:
    json.dump({
        "target_siren": TARGET_SIREN,
        "read_benchmarks": query_results["read"],
        "write_benchmarks": query_results["write"],
    }, f, indent=2)

print("Resultats sauvegardes!")

Resultats sauvegardes!


---
## 5. Visualisation
---

In [46]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Couleurs par librairie
COLORS = {
    "Pandas": "#636EFA",
    "Polars": "#FFDD00",
    "Duckdb": "#00D4AA",
    "DuckDB": "#00D4AA",
    "Pyspark": "#EF553B",
    "PySpark": "#EF553B",
}

def get_color(name):
    for key in COLORS:
        if key in name:
            return COLORS[key]
    return "#888888"

### 5.1 Dashboard Conversion CSV → Parquet

In [47]:
# Charger les metriques de conversion
metrics = {}
for name in ["Pandas", "Polars", "Duckdb", "Pyspark"]:
    metrics_file = Path(config.metrics_dir) / f"{name.lower()}_metrics.json"
    if metrics_file.exists():
        with open(metrics_file) as f:
            metrics[name] = json.load(f)

# Trier par duree
sorted_data = []
for name, data in metrics.items():
    if data.get("success"):
        stats = data["stats"]
        sorted_data.append({
            "name": name,
            "duration": stats["duration_seconds"],
            "memory_max": stats["memory_max_mb"],
            "cpu_avg": stats.get("cpu_avg", 0),
            "color": COLORS.get(name, "#666666"),
            "timeseries": data.get("timeseries", [])
        })

sorted_data.sort(key=lambda x: x["duration"])

# Calculer les ratios
min_duration = min(d["duration"] for d in sorted_data) if sorted_data else 1
ratios = [d["duration"] / min_duration for d in sorted_data]

print(f"Benchmarks charges: {[d['name'] for d in sorted_data]}")

Benchmarks charges: ['Polars', 'Pyspark', 'Duckdb', 'Pandas']


In [51]:
# Dashboard Conversion
names = [d["name"] for d in sorted_data]
durations = [d["duration"] for d in sorted_data]
memory_peaks = [d["memory_max"] for d in sorted_data]
colors = [d["color"] for d in sorted_data]
min_duration = min(durations) if durations else 1
ratios = [d / min_duration for d in durations]

fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=(
        "<b>Duree (CSV → Parquet)</b>",
        "<b>RAM Peak</b>",
        "<b>CPU au fil du temps</b>",
        "<b>RAM au fil du temps</b>"
    ),
    specs=[[{"type": "bar"}, {"type": "bar"}], [{"type": "scatter"}, {"type": "scatter"}]],
    vertical_spacing=0.12,
)

# Duree
fig.add_trace(go.Bar(
    x=names, y=durations, marker_color=colors,
    text=[f"<b>{d:.1f}s</b>" for d in durations],
    textposition="outside", textfont=dict(size=14, color="white"),
), row=1, col=1)

# RAM Peak
fig.add_trace(go.Bar(
    x=names, y=[m/1024 for m in memory_peaks], marker_color=colors,
    text=[f"<b>{m/1024:.1f} GB</b>" for m in memory_peaks],
    textposition="outside", textfont=dict(size=14, color="white"),
), row=1, col=2)

# CPU over time
for d in sorted_data:
    ts = d["timeseries"]
    if ts:
        fig.add_trace(go.Scatter(
            x=[p["timestamp"] for p in ts],
            y=[p.get("cpu_percent", 0) for p in ts],
            mode="lines", name=d["name"],
            line=dict(color=d["color"], width=2),
        ), row=2, col=1)

# RAM over time
for d in sorted_data:
    ts = d["timeseries"]
    if ts:
        fig.add_trace(go.Scatter(
            x=[p["timestamp"] for p in ts],
            y=[p.get("memory_mb", 0) / 1024 for p in ts],
            mode="lines", name=d["name"], showlegend=False,
            line=dict(color=d["color"], width=2),
        ), row=2, col=2)

fig.update_layout(
    title=f"<b>Benchmark Conversion CSV → Parquet</b><br><span style='font-size:12px;color:#888'>{file_size_gb:.2f} GB | {sys_info['cpu_logical']} threads</span>",
    height=700, template="plotly_dark", paper_bgcolor="#111111", plot_bgcolor="#1a1a1a",
    legend=dict(orientation="h", y=1.02, x=0.5, xanchor="center"),
)
fig.update_yaxes(title_text="Secondes", row=1, col=1)
fig.update_yaxes(title_text="GB", row=1, col=2)
fig.update_yaxes(title_text="CPU %", row=2, col=1)
fig.update_yaxes(title_text="GB", row=2, col=2)
fig.update_xaxes(title_text="Temps (s)", row=2, col=1)
fig.update_xaxes(title_text="Temps (s)", row=2, col=2)

fig.show()

### 5.2 Dashboard Requêtes CSV vs Parquet

In [48]:
# Dashboard Requetes
read_data = sorted(query_results["read"].items(), key=lambda x: x[1])
write_data = sorted(query_results["write"].items(), key=lambda x: x[1])

fig = make_subplots(
    rows=1, cols=2,
    subplot_titles=("<b>Lecture (SELECT WHERE siren=X)</b>", "<b>Ecriture</b>"),
    horizontal_spacing=0.15
)

# Lecture
fig.add_trace(go.Bar(
    y=[x[0] for x in read_data],
    x=[x[1] for x in read_data],
    orientation="h",
    marker_color=[get_color(x[0]) for x in read_data],
    text=[f"<b>{x[1]:.3f}s</b>" for x in read_data],
    textposition="outside",
    textfont=dict(size=11, color="white"),
), row=1, col=1)

# Ecriture
fig.add_trace(go.Bar(
    y=[x[0] for x in write_data],
    x=[x[1] for x in write_data],
    orientation="h",
    marker_color=[get_color(x[0]) for x in write_data],
    text=[f"<b>{x[1]:.3f}s</b>" for x in write_data],
    textposition="outside",
    textfont=dict(size=11, color="white"),
), row=1, col=2)

fig.update_layout(
    title="<b>Benchmark Requetes: CSV vs Parquet</b>",
    height=500, showlegend=False,
    template="plotly_dark", paper_bgcolor="#111111", plot_bgcolor="#1a1a1a",
)
fig.update_xaxes(title_text="Secondes", row=1, col=1)
fig.update_xaxes(title_text="Secondes", row=1, col=2)

fig.show()

---
## 6. Tableau Récapitulatif
---

In [49]:
# Recapitulatif Conversion
print("=" * 70)
print("CONVERSION CSV → PARQUET")
print("=" * 70)
print(f"{'Librairie':<12} {'Duree':>10} {'RAM Max':>12} {'Ratio':>12}")
print("-" * 50)
for i, d in enumerate(sorted_data):
    ratio_str = f"{ratios[i]:.1f}x" if ratios[i] > 1 else "FASTEST"
    print(f"{d['name']:<12} {d['duration']:>10.1f}s {d['memory_max']/1024:>10.1f} GB {ratio_str:>12}")

# Recapitulatif Requetes
print("\n" + "=" * 70)
print("LECTURE (SELECT WHERE siren=X)")
print("=" * 70)
min_read = min(query_results["read"].values())
for name, t in sorted(query_results["read"].items(), key=lambda x: x[1]):
    ratio = t / min_read
    status = "FASTEST" if ratio == 1 else f"{ratio:.1f}x"
    print(f"{name:<25} {t:>8.3f}s {status:>12}")

print("\n" + "=" * 70)
print("ECRITURE")
print("=" * 70)
min_write = min(query_results["write"].values())
for name, t in sorted(query_results["write"].items(), key=lambda x: x[1]):
    ratio = t / min_write
    status = "FASTEST" if ratio == 1 else f"{ratio:.1f}x"
    print(f"{name:<25} {t:>8.3f}s {status:>12}")

CONVERSION CSV → PARQUET
Librairie         Duree      RAM Max        Ratio
--------------------------------------------------
Polars             10.6s        7.7 GB      FASTEST
Pyspark            29.8s        3.0 GB         2.8x
Duckdb             51.2s       15.6 GB         4.8x
Pandas            243.4s       45.3 GB        23.0x

LECTURE (SELECT WHERE siren=X)
Polars Lazy Parquet          0.064s      FASTEST
DuckDB Parquet               0.127s         2.0x
PySpark Parquet              0.180s         2.8x
Polars Parquet               2.122s        33.4x
DuckDB CSV                   6.094s        95.9x
Polars Lazy CSV              7.659s       120.5x
Polars CSV                   7.874s       123.9x
PySpark CSV                 21.330s       335.5x
Pandas Parquet              26.212s       412.3x
Pandas CSV                 159.359s      2506.9x

ECRITURE
Polars CSV                   2.167s      FASTEST
Polars Parquet               2.327s         1.1x
PySpark CSV                  8.360s 

In [50]:
# Arreter Spark
try:
    spark.stop()
    print("Spark arrete.")
except:
    pass

Spark arrete.
