In [11]:
import subprocess
import os
from pyspark.sql import SparkSession

# Initialiser SparkSession
spark = SparkSession.builder \
    .appName("Generate TPC-H 10GB") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars", "/opt/bitnami/spark/jars/iceberg-spark-runtime-3.4_2.12-1.4.3.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/app/data/iceberg") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

# Générer les données TPC-H
def generate_tpch_data(scale_factor, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    cmd = ["dbgen", "-s", str(scale_factor), "-f", "-o", output_dir]
    subprocess.run(cmd, check=True)
    print(f"Données TPC-H générées pour SF{scale_factor} dans {output_dir}")

# Générer SF10 (10 Go)
sf10_dir = "/app/data/tpch_raw/sf10"
generate_tpch_data(10, sf10_dir)

# Charger les données brutes (exemple : lineitem.tbl)
df = spark.read.option("delimiter", "|").csv(f"{sf10_dir}/lineitem.tbl", inferSchema=True)
df = df.toDF("l_orderkey", "l_partkey", "l_suppkey", "l_linenumber", "l_quantity", 
             "l_extendedprice", "l_discount", "l_tax", "l_returnflag", "l_linestatus", 
             "l_shipdate", "l_commitdate", "l_receiptdate", "l_shipinstruct", 
             "l_shipmode", "l_comment")

# Cacher le DataFrame pour optimiser les écritures multiples
df.cache()

# Partitionner les données
sf_configs = [
    ("sf1", 0.1, 1),   # 10% = 1 Go
    ("sf5", 0.5, 5),   # 50% = 5 Go
    ("sf10", 1.0, 10)  # 100% = 10 Go
]

for sf_name, fraction, size_gb in sf_configs:
    sampled_df = df.sample(fraction=fraction) if fraction < 1.0 else df

    parquet_dir = f"/app/data/parquet/{sf_name}"
    orc_dir = f"/app/data/orc/{sf_name}"
    iceberg_dir = f"/app/data/iceberg/{sf_name}"
    delta_dir = f"/app/data/delta/{sf_name}"

    sampled_df.write.mode("overwrite").format("parquet").partitionBy("l_shipdate").save(parquet_dir)
    print(f"{sf_name} ({size_gb} Go) sauvegardé en Parquet à {parquet_dir}")

    sampled_df.write.mode("overwrite").format("orc").partitionBy("l_shipdate").save(orc_dir)
    print(f"{sf_name} ({size_gb} Go) sauvegardé en ORC à {orc_dir}")

    spark.sql(f"DROP TABLE IF EXISTS local.db.lineitem_{sf_name}")
    sampled_df.write.mode("overwrite").format("iceberg").partitionBy("l_shipdate").saveAsTable(f"local.db.lineitem_{sf_name}")
    print(f"{sf_name} ({size_gb} Go) sauvegardé en Iceberg à {iceberg_dir}")

    sampled_df.write.mode("overwrite").format("delta").partitionBy("l_shipdate").save(delta_dir)
    print(f"{sf_name} ({size_gb} Go) sauvegardé en Delta Lake à {delta_dir}")

df.unpersist()
spark.stop()

ERROR: option 'a' unknown.


dbgen: invalid option -- 'o'
TPC-H Population Generator (Version 2.14.0 build 0)
Copyright Transaction Processing Performance Council 1994 - 2010
USAGE:
dbgen [-{vf}][-T {pcsoPSOL}]
	[-s <scale>][-C <procs>][-S <step>]
dbgen [-v] [-O m] [-s <scale>] [-U <updates>]

Basic Options
-C <n> -- separate data set into <n> chunks (requires -S, default: 1)
-f     -- force. Overwrite existing files
-h     -- display this message
-q     -- enable QUIET mode
-s <n> -- set Scale Factor (SF) to  <n> (default: 1) 
-S <n> -- build the <n>th step of the data/update set (used with -C or -U)
-U <n> -- generate <n> update sets
-v     -- enable VERBOSE mode

Advanced Options
-b <s> -- load distributions for <s> (default: dists.dss)
-d <n> -- split deletes between <n> files (requires -U)
-i <n> -- split inserts between <n> files (requires -U)
-T c   -- generate cutomers ONLY
-T l   -- generate nation/region ONLY
-T L   -- generate lineitem ONLY
-T n   -- generate nation ONLY
-T o   -- generate orders/lineit

CalledProcessError: Command '['dbgen', '-s', '10', '-f', '-o', '/app/data/tpch_raw/sf10']' returned non-zero exit status 1.

ERROR: option 'a' unknown.


dbgen: invalid option -- 'o'
TPC-H Population Generator (Version 2.14.0 build 0)
Copyright Transaction Processing Performance Council 1994 - 2010
USAGE:
dbgen [-{vf}][-T {pcsoPSOL}]
	[-s <scale>][-C <procs>][-S <step>]
dbgen [-v] [-O m] [-s <scale>] [-U <updates>]

Basic Options
-C <n> -- separate data set into <n> chunks (requires -S, default: 1)
-f     -- force. Overwrite existing files
-h     -- display this message
-q     -- enable QUIET mode
-s <n> -- set Scale Factor (SF) to  <n> (default: 1) 
-S <n> -- build the <n>th step of the data/update set (used with -C or -U)
-U <n> -- generate <n> update sets
-v     -- enable VERBOSE mode

Advanced Options
-b <s> -- load distributions for <s> (default: dists.dss)
-d <n> -- split deletes between <n> files (requires -U)
-i <n> -- split inserts between <n> files (requires -U)
-T c   -- generate cutomers ONLY
-T l   -- generate nation/region ONLY
-T L   -- generate lineitem ONLY
-T n   -- generate nation ONLY
-T o   -- generate orders/lineit

CalledProcessError: Command 'dbgen -s 10 -f -o /app/data/tpch_raw/sf10' returned non-zero exit status 1.