In [0]:
%pip install dbldatagen


In [0]:
import dbldatagen as dg
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta
import random

# Configuration
# Sizing and destination settings for the synthetic refinery dataset.
# NOTE(review): the generator functions visible in this notebook take their own
# size arguments, and generate_all_data passes literals (500, 10, 100, 4), so
# confirm these constants are actually consumed further down.
NUM_REFINERIES = 500
SENSORS_PER_REFINERY = 1000  # equals 10 units x 100 sensors per unit as passed in generate_all_data
DAYS_OF_HISTORY = 365
OUTPUT_PATH = "/mnt/data/crude_oil_pipeline"  # NOTE(review): not referenced in the visible cells
# Catalog/schema names; presumably the values passed to generate_all_data (call not visible here).
CATALOG="wajdi_bounouara"
SCHEMA="parex_resources"


# Dimension Tables

In [0]:
# --- dim_refineries ---
def generate_refineries(spark, num_refineries=500):
    """Build the synthetic refinery dimension (``dim_refineries``).

    Each row gets a sequential ``REF-<n>`` id, a location drawn at random from
    a fixed list of refining hubs (coordinates jittered by up to +/-0.25
    degrees), and randomly generated capacity, complexity, operator and date
    attributes.

    Args:
        spark: Active SparkSession handed to ``dg.DataGenerator``.
        num_refineries: Number of rows to generate.

    Returns:
        Spark DataFrame with one row per refinery; the helper ``city_idx``
        column is dropped before returning.
    """
    # (city, state/province code, country code, latitude, longitude)
    cities = [
        ("Houston", "TX", "USA", 29.76, -95.37),
        ("Beaumont", "TX", "USA", 30.08, -94.10),
        ("Lake Charles", "LA", "USA", 30.21, -93.22),
        ("Baton Rouge", "LA", "USA", 30.45, -91.15),
        ("New Orleans", "LA", "USA", 29.95, -90.07),
        ("Port Arthur", "TX", "USA", 29.90, -93.93),
        ("Corpus Christi", "TX", "USA", 27.80, -97.40),
        ("Philadelphia", "PA", "USA", 39.95, -75.17),
        ("Los Angeles", "CA", "USA", 34.05, -118.24),
        ("San Francisco", "CA", "USA", 37.77, -122.42),
        ("Chicago", "IL", "USA", 41.88, -87.63),
        ("Detroit", "MI", "USA", 42.33, -83.05),
        ("Rotterdam", "ZH", "NLD", 51.92, 4.48),
        ("Singapore", "SG", "SGP", 1.35, 103.82),
        ("Jamnagar", "GJ", "IND", 22.47, 70.07),
    ]

    operators = ["PetroCorp", "GlobalRefining", "CoastalEnergy", "TransOil",
                 "PacificPetroleum", "AtlanticFuels", "MidwestRefinery", "SunsetOil"]

    def city_field(field_idx):
        # SQL snippet selecting one attribute of the randomly chosen city.
        # element_at is 1-based, hence city_idx+1.
        return f"element_at(array{tuple([c[field_idx] for c in cities])}, city_idx+1)"

    # Spark's lpad() *truncates* values longer than the target width, so a fixed
    # width of 3 would turn id 1000 into 'REF-100' and collide with id 100.
    # Widen the padding when more than 1000 refineries are requested.
    id_width = max(3, len(str(max(num_refineries - 1, 0))))

    refinery_spec = (
        dg.DataGenerator(spark, name="refineries", rows=num_refineries, partitions=4)
        .withColumn("refinery_id", "string",
                   expr=f"concat('REF-', lpad(cast(id as string), {id_width}, '0'))")
        .withColumn("city_idx", "int", minValue=0, maxValue=len(cities)-1, random=True)
        # Name = city + a letter A-Z cycling with the row id.
        .withColumn("refinery_name", "string",
                   expr=f"concat({city_field(0)}, ' Refinery ', chr(65 + (id % 26)))")
        .withColumn("location_city", "string", expr=city_field(0))
        .withColumn("location_state", "string", expr=city_field(1))
        .withColumn("location_country", "string", expr=city_field(2))
        .withColumn("latitude", "double",
                   expr=f"{city_field(3)} + (rand() - 0.5) * 0.5")
        .withColumn("longitude", "double",
                   expr=f"{city_field(4)} + (rand() - 0.5) * 0.5")
        .withColumn("capacity_bpd", "int", minValue=50000, maxValue=700000, step=10000, random=True)
        .withColumn("complexity_index", "double", minValue=5.0, maxValue=15.0, random=True)
        .withColumn("commissioned_date", "date",
                   begin="1960-01-01", end="2020-12-31", random=True)
        .withColumn("operator_name", "string", values=operators, random=True)
        .withColumn("is_active", "boolean", expr="rand() > 0.05")  # 95% active
        .withColumn("last_turnaround_date", "date",
                   begin="2020-01-01", end="2024-06-30", random=True)
    )

    df = refinery_spec.build()
    return df.drop("city_idx", "id")

In [0]:
# --- dim_processing_units ---
def generate_processing_units(spark, refineries_df, avg_units_per_refinery=10):
    """Build the synthetic processing-unit dimension (``dim_processing_units``).

    The total row count is ``len(refineries) * avg_units_per_refinery``; each
    unit is then attached to a refinery chosen at random, so the per-refinery
    count is only an average.

    Args:
        spark: Active SparkSession handed to ``dg.DataGenerator``.
        refineries_df: DataFrame with a ``refinery_id`` column; its ids are
            collected to the driver.
        avg_units_per_refinery: Multiplier used to size the table.

    Returns:
        Spark DataFrame with one row per unit; the helper index columns are
        dropped before returning.
    """
    # (unit_type, unit_subtype, design_efficiency)
    unit_types = [
        ("Crude Distillation", "Atmospheric", 0.95),
        ("Crude Distillation", "Vacuum", 0.93),
        ("Fluid Catalytic Cracker", "Resid FCC", 0.90),
        ("Hydrocracker", "Mild", 0.92),
        ("Hydrocracker", "Full Conversion", 0.88),
        ("Reformer", "Continuous", 0.91),
        ("Reformer", "Semi-Regenerative", 0.89),
        ("Alkylation", "HF Process", 0.94),
        ("Alkylation", "Sulfuric Acid", 0.93),
        ("Coker", "Delayed", 0.87),
        ("Coker", "Fluid", 0.85),
        ("Hydrotreater", "Naphtha", 0.96),
        ("Hydrotreater", "Diesel", 0.95),
        ("Sulfur Recovery", "Claus", 0.98),
    ]

    manufacturers = ["Honeywell UOP", "Axens", "Shell Global", "ExxonMobil",
                    "Chevron Lummus", "KBR", "CB&I", "Technip"]

    def sql_array(values):
        # Render a Spark SQL array(...) literal. Interpolating a Python tuple
        # breaks for a single element ("('x',)" has a trailing comma), so the
        # element reprs are joined explicitly instead.
        return "array(" + ", ".join(repr(v) for v in values) + ")"

    refinery_ids = [row.refinery_id for row in refineries_df.select("refinery_id").collect()]
    total_units = len(refinery_ids) * avg_units_per_refinery

    # lpad() truncates values longer than the target width, which would create
    # duplicate unit ids beyond 100000 rows; widen the padding when needed.
    id_width = max(5, len(str(max(total_units - 1, 0))))

    unit_spec = (
        dg.DataGenerator(spark, name="processing_units", rows=total_units, partitions=8)
        .withColumn("unit_id", "string",
                   expr=f"concat('UNIT-', lpad(cast(id as string), {id_width}, '0'))")
        .withColumn("refinery_idx", "int", minValue=0, maxValue=len(refinery_ids)-1, random=True)
        # element_at is 1-based, hence the +1 on every index lookup below.
        .withColumn("refinery_id", "string",
                   expr=f"element_at({sql_array(refinery_ids)}, refinery_idx+1)")
        .withColumn("unit_type_idx", "int", minValue=0, maxValue=len(unit_types)-1, random=True)
        # Type, subtype and efficiency share unit_type_idx so they stay consistent.
        .withColumn("unit_type", "string",
                   expr=f"element_at({sql_array([u[0] for u in unit_types])}, unit_type_idx+1)")
        .withColumn("unit_subtype", "string",
                   expr=f"element_at({sql_array([u[1] for u in unit_types])}, unit_type_idx+1)")
        .withColumn("design_efficiency", "double",
                   expr=f"element_at({sql_array([u[2] for u in unit_types])}, unit_type_idx+1)")
        .withColumn("unit_name", "string",
                   expr="concat(substring(unit_type, 1, 3), '-', (id % 10) + 1)")
        .withColumn("capacity_bpd", "int", minValue=10000, maxValue=200000, step=5000, random=True)
        .withColumn("installation_date", "date", begin="1980-01-01", end="2023-12-31", random=True)
        .withColumn("last_maintenance_date", "date", begin="2023-01-01", end="2025-06-30", random=True)
        .withColumn("maintenance_interval_days", "int", values=[90, 180, 365], random=True)
        .withColumn("manufacturer", "string", values=manufacturers, random=True)
        .withColumn("model_number", "string",
                   expr="concat(substring(manufacturer, 1, 2), '-', cast(floor(rand()*9000+1000) as int))")
        .withColumn("is_active", "boolean", expr="rand() > 0.03")
    )

    df = unit_spec.build()
    return df.drop("refinery_idx", "unit_type_idx", "id")


In [0]:
# --- dim_sensors ---
def generate_sensors(spark, units_df, avg_sensors_per_unit=100):
    """Build the synthetic sensor dimension (``dim_sensors``).

    The total row count is ``len(units) * avg_sensors_per_unit``; each sensor
    is attached to a unit chosen at random and inherits that unit's
    ``refinery_id`` through a join with ``units_df``.

    Args:
        spark: Active SparkSession handed to ``dg.DataGenerator``.
        units_df: DataFrame with ``unit_id`` and ``refinery_id`` columns; the
            unit ids are collected to the driver.
        avg_sensors_per_unit: Multiplier used to size the table.

    Returns:
        Spark DataFrame with one row per sensor; the helper index columns are
        dropped before returning.
    """
    # (sensor_type, measurement_unit, min_value, max_value,
    #  warning_low, warning_high, critical_low, critical_high)
    sensor_types = [
        ("Temperature", "Fahrenheit", 0, 1200, 200, 900, 100, 1000),
        ("Temperature", "Celsius", -50, 600, 50, 500, 0, 550),
        ("Pressure", "PSI", 0, 5000, 50, 3000, 10, 4000),
        ("Pressure", "Bar", 0, 350, 5, 200, 1, 280),
        ("Flow Rate", "BPH", 0, 50000, 1000, 40000, 500, 45000),
        ("Flow Rate", "GPM", 0, 100000, 2000, 80000, 1000, 90000),
        ("Level", "Percent", 0, 100, 10, 95, 5, 98),
        ("Level", "Feet", 0, 50, 5, 45, 2, 48),
        ("Vibration", "mm/s", 0, 50, 0, 20, 0, 35),
        ("Vibration", "mils", 0, 10, 0, 5, 0, 8),
        ("pH", "pH", 0, 14, 4, 10, 2, 12),
        ("Density", "API", 0, 80, 20, 50, 10, 60),
        ("Viscosity", "cSt", 0, 1000, 1, 500, 0.5, 800),
    ]

    manufacturers = ["Emerson", "Honeywell", "Siemens", "ABB", "Yokogawa", "Endress+Hauser"]

    def sql_array(values):
        # Render a Spark SQL array(...) literal. Interpolating a Python tuple
        # breaks for a single element ("('x',)" has a trailing comma), so the
        # element reprs are joined explicitly instead.
        return "array(" + ", ".join(repr(v) for v in values) + ")"

    def type_field(field_idx, as_float=False):
        # SQL snippet selecting one attribute of the randomly chosen sensor type.
        # Numeric fields are rendered as floats so the array has a single type.
        values = [float(s[field_idx]) if as_float else s[field_idx] for s in sensor_types]
        return f"element_at({sql_array(values)}, sensor_type_idx+1)"

    # Only the ids are needed on the driver; refinery_id is attached by the
    # join below, so no unit -> refinery lookup dict is collected.
    unit_ids = [row.unit_id for row in units_df.select("unit_id").collect()]

    total_sensors = len(unit_ids) * avg_sensors_per_unit

    # lpad() truncates values longer than the target width, which would create
    # duplicate sensor ids beyond one million rows; widen the padding when needed.
    id_width = max(6, len(str(max(total_sensors - 1, 0))))

    sensor_spec = (
        dg.DataGenerator(spark, name="sensors", rows=total_sensors, partitions=16)
        .withColumn("sensor_id", "string",
                   expr=f"concat('SENS-', lpad(cast(id as string), {id_width}, '0'))")
        .withColumn("unit_idx", "int", minValue=0, maxValue=len(unit_ids)-1, random=True)
        .withColumn("unit_id", "string",
                   expr=f"element_at({sql_array(unit_ids)}, unit_idx+1)")
        .withColumn("sensor_type_idx", "int", minValue=0, maxValue=len(sensor_types)-1, random=True)
        # All type-dependent columns share sensor_type_idx so they stay consistent.
        .withColumn("sensor_type", "string", expr=type_field(0))
        .withColumn("measurement_unit", "string", expr=type_field(1))
        .withColumn("min_value", "double", expr=type_field(2, as_float=True))
        .withColumn("max_value", "double", expr=type_field(3, as_float=True))
        .withColumn("warning_low", "double", expr=type_field(4, as_float=True))
        .withColumn("warning_high", "double", expr=type_field(5, as_float=True))
        .withColumn("critical_low", "double", expr=type_field(6, as_float=True))
        .withColumn("critical_high", "double", expr=type_field(7, as_float=True))
        .withColumn("sensor_name", "string",
                   expr="concat(substring(sensor_type, 1, 4), '-', lpad(cast((id % 1000) as string), 3, '0'))")
        .withColumn("reading_interval_seconds", "int", values=[5, 10, 30, 60], weights=[0.1, 0.5, 0.3, 0.1], random=True)
        .withColumn("manufacturer", "string", values=manufacturers, random=True)
        .withColumn("model", "string", expr="concat(substring(manufacturer, 1, 3), '-', cast(floor(rand()*900+100) as int))")
        .withColumn("installation_date", "date", begin="2015-01-01", end="2025-01-01", random=True)
        .withColumn("calibration_date", "date", begin="2023-06-01", end="2025-06-30", random=True)
        .withColumn("is_active", "boolean", expr="rand() > 0.02")
    )

    df = sensor_spec.build()

    # Attach each sensor's refinery_id from its parent unit.
    df = df.join(units_df.select("unit_id", "refinery_id"), "unit_id", "left")

    return df.drop("unit_idx", "sensor_type_idx", "id")


In [0]:
# --- dim_crude_types ---
def generate_crude_types(spark):
    """Return the fixed crude-grade reference table (``dim_crude_types``).

    The table is a hand-written list of 15 crude grades; nothing is random.

    Args:
        spark: Active SparkSession used to create the DataFrame.

    Returns:
        Spark DataFrame with one row per crude grade and the columns listed in
        ``column_types`` below.
    """
    # Column name -> Spark type, in the same order as the fields of each row.
    column_types = [
        ("crude_type_id", StringType),
        ("crude_name", StringType),
        ("crude_abbreviation", StringType),
        ("origin_country", StringType),
        ("origin_region", StringType),
        ("api_gravity_typical", DoubleType),
        ("sulfur_content_typical", DoubleType),
        ("classification", StringType),
        ("sweetness", StringType),
        ("benchmark_price_differential", DoubleType),
    ]
    crude_schema = StructType(
        [StructField(col_name, col_type()) for col_name, col_type in column_types]
    )

    crude_rows = [
        ("CRUDE-001", "West Texas Intermediate", "WTI", "USA", "Permian Basin", 39.6, 0.24, "Light", "Sweet", -3.50),
        ("CRUDE-002", "Brent", "BRENT", "UK", "North Sea", 38.3, 0.37, "Light", "Sweet", 0.00),
        ("CRUDE-003", "Dubai", "DUBAI", "UAE", "Persian Gulf", 31.0, 2.00, "Medium", "Sour", -2.00),
        ("CRUDE-004", "Arab Light", "AL", "SAU", "Ghawar", 33.0, 1.77, "Light", "Sour", -1.50),
        ("CRUDE-005", "Arab Heavy", "AH", "SAU", "Safaniya", 27.0, 2.80, "Heavy", "Sour", -5.00),
        ("CRUDE-006", "Bonny Light", "BONNY", "NGA", "Niger Delta", 33.4, 0.16, "Light", "Sweet", 2.00),
        ("CRUDE-007", "Mars", "MARS", "USA", "Gulf of Mexico", 31.0, 1.93, "Medium", "Sour", -4.00),
        ("CRUDE-008", "Maya", "MAYA", "MEX", "Campeche", 22.0, 3.30, "Heavy", "Sour", -10.00),
        ("CRUDE-009", "Urals", "URALS", "RUS", "Western Siberia", 31.7, 1.35, "Medium", "Sour", -15.00),
        ("CRUDE-010", "Tapis", "TAPIS", "MYS", "Malay Basin", 44.3, 0.04, "Light", "Sweet", 5.00),
        ("CRUDE-011", "Saharan Blend", "SAHARAN", "DZA", "Hassi Messaoud", 45.5, 0.09, "Light", "Sweet", 3.00),
        ("CRUDE-012", "Forcados", "FORCADOS", "NGA", "Niger Delta", 29.6, 0.18, "Light", "Sweet", 1.50),
        ("CRUDE-013", "Ekofisk", "EKO", "NOR", "North Sea", 43.4, 0.17, "Light", "Sweet", 1.00),
        ("CRUDE-014", "Louisiana Light", "LLS", "USA", "Gulf Coast", 36.0, 0.40, "Light", "Sweet", -1.00),
        ("CRUDE-015", "Heavy Louisiana Sweet", "HLS", "USA", "Gulf Coast", 32.5, 0.30, "Medium", "Sweet", -2.50),
    ]

    return spark.createDataFrame(crude_rows, crude_schema)


In [0]:
# --- dim_products ---
def generate_products(spark):
    """Return the fixed refined-product reference table (``dim_products``).

    The table is a hand-written list of 14 products; nothing is random. A
    constant ``unit_of_measure`` column ("barrels") is appended to every row.

    Args:
        spark: Active SparkSession used to create the DataFrame.

    Returns:
        Spark DataFrame with one row per product.
    """
    # (product_id, product_name, product_category, product_grade,
    #  density_min, density_max, sulfur_max_ppm, typical_yield_pct)
    # The densities are written as floats on purpose: the schema declares them
    # DoubleType, and PySpark's schema verification rejects Python ints for
    # DoubleType fields ("DoubleType can not accept object 720 in type int").
    product_data = [
        ("PROD-001", "Regular Gasoline", "Gasoline", "87 Octane", 720.0, 775.0, 10, 0.25),
        ("PROD-002", "Premium Gasoline", "Gasoline", "91 Octane", 720.0, 770.0, 10, 0.15),
        ("PROD-003", "Super Premium Gasoline", "Gasoline", "93 Octane", 715.0, 765.0, 10, 0.10),
        ("PROD-004", "Ultra Low Sulfur Diesel", "Diesel", "ULSD", 820.0, 860.0, 15, 0.28),
        ("PROD-005", "Heating Oil", "Distillate", "No. 2", 830.0, 870.0, 500, 0.08),
        ("PROD-006", "Jet Fuel A", "Jet Fuel", "Jet A", 775.0, 840.0, 3000, 0.08),
        ("PROD-007", "Kerosene", "Distillate", "K-1", 780.0, 830.0, 40, 0.03),
        ("PROD-008", "Naphtha", "Feedstock", "Light", 650.0, 720.0, 500, 0.05),
        ("PROD-009", "LPG", "Gas", "Propane/Butane", 500.0, 600.0, 50, 0.04),
        ("PROD-010", "Residual Fuel Oil", "Residual", "No. 6", 950.0, 1010.0, 35000, 0.03),
        ("PROD-011", "Asphalt", "Residual", "Paving Grade", 1010.0, 1060.0, 50000, 0.02),
        ("PROD-012", "Petroleum Coke", "Solid", "Fuel Grade", 1400.0, 1600.0, 80000, 0.04),
        ("PROD-013", "Lubricant Base Oil", "Lubricant", "Group II", 850.0, 890.0, 10, 0.02),
        # NOTE(review): "Eleite" looks like a typo (possibly "Elemental"); left
        # unchanged because the intended grade cannot be confirmed from here.
        ("PROD-014", "Sulfur", "Byproduct", "Eleite", 1800.0, 2100.0, 0, 0.015),
    ]

    schema = StructType([
        StructField("product_id", StringType()),
        StructField("product_name", StringType()),
        StructField("product_category", StringType()),
        StructField("product_grade", StringType()),
        StructField("density_min", DoubleType()),
        StructField("density_max", DoubleType()),
        StructField("sulfur_max_ppm", IntegerType()),
        StructField("typical_yield_pct", DoubleType()),
    ])

    df = spark.createDataFrame(product_data, schema)
    return df.withColumn("unit_of_measure", F.lit("barrels"))


In [0]:
# --- dim_storage_tanks ---
def generate_storage_tanks(spark, refineries_df, avg_tanks_per_refinery=4):
    """Build the synthetic storage-tank dimension (``dim_storage_tanks``).

    The total row count is ``len(refineries) * avg_tanks_per_refinery``; each
    tank is attached to a refinery chosen at random, so the per-refinery count
    is only an average.

    Args:
        spark: Active SparkSession handed to ``dg.DataGenerator``.
        refineries_df: DataFrame with a ``refinery_id`` column; its ids are
            collected to the driver.
        avg_tanks_per_refinery: Multiplier used to size the table.

    Returns:
        Spark DataFrame with one row per tank; the helper ``refinery_idx``
        column is dropped before returning.
    """
    tank_types = ["Floating Roof", "Fixed Roof", "Spherical", "Horizontal"]
    product_types = ["Crude Oil", "Gasoline", "Diesel", "Jet Fuel", "Residual"]

    def sql_array(values):
        # Render a Spark SQL array(...) literal. Interpolating a Python tuple
        # breaks for a single element ("('x',)" has a trailing comma), so the
        # element reprs are joined explicitly instead.
        return "array(" + ", ".join(repr(v) for v in values) + ")"

    refinery_ids = [row.refinery_id for row in refineries_df.select("refinery_id").collect()]
    total_tanks = len(refinery_ids) * avg_tanks_per_refinery

    # lpad() truncates values longer than the target width, which would create
    # duplicate tank ids beyond 100000 rows; widen the padding when needed.
    id_width = max(5, len(str(max(total_tanks - 1, 0))))

    tank_spec = (
        dg.DataGenerator(spark, name="storage_tanks", rows=total_tanks, partitions=4)
        .withColumn("tank_id", "string",
                   expr=f"concat('TANK-', lpad(cast(id as string), {id_width}, '0'))")
        .withColumn("refinery_idx", "int", minValue=0, maxValue=len(refinery_ids)-1, random=True)
        # element_at is 1-based, hence refinery_idx+1.
        .withColumn("refinery_id", "string",
                   expr=f"element_at({sql_array(refinery_ids)}, refinery_idx+1)")
        # Display name cycles through TK-100 .. TK-599, so it is not unique.
        .withColumn("tank_name", "string", expr="concat('TK-', lpad(cast((id % 500) + 100 as string), 3, '0'))")
        .withColumn("tank_type", "string", values=tank_types, weights=[0.4, 0.3, 0.15, 0.15], random=True)
        .withColumn("product_type", "string", values=product_types, weights=[0.3, 0.25, 0.2, 0.15, 0.1], random=True)
        .withColumn("capacity_barrels", "int", values=[100000, 250000, 500000, 750000, 1000000],
                   weights=[0.2, 0.3, 0.25, 0.15, 0.1], random=True)
        .withColumn("min_operating_level", "double", minValue=0.08, maxValue=0.15, random=True)
        .withColumn("max_operating_level", "double", minValue=0.90, maxValue=0.98, random=True)
        .withColumn("diameter_feet", "double", minValue=100, maxValue=350, random=True)
        .withColumn("height_feet", "double", minValue=40, maxValue=65, random=True)
        .withColumn("material", "string", values=["Carbon Steel", "Stainless Steel"], weights=[0.8, 0.2], random=True)
        .withColumn("has_heating", "boolean", expr="rand() > 0.6")
        .withColumn("has_mixer", "boolean", expr="rand() > 0.5")
        .withColumn("last_inspection_date", "date", begin="2022-01-01", end="2025-06-30", random=True)
        .withColumn("is_active", "boolean", expr="rand() > 0.05")
    )

    df = tank_spec.build()
    return df.drop("refinery_idx", "id")


# Fact Tables

In [0]:
# --- fact_sensor_readings (STREAMING) ---
def generate_sensor_readings_batch(spark, sensors_df, start_date, end_date, sample_rate=0.01,
                                   num_sensors=500000, readings_per_sensor_per_day=8640):
    """
    Generate historical sensor readings for ``fact_sensor_readings``.

    Rows are generated independently with a random sensor index, day and
    second-of-day, then inner-joined to ``sensors_df`` on the numeric suffix
    of ``sensor_id`` so each reading is scaled into its sensor's
    ``[min_value, max_value]`` range. Readings whose random index matches no
    sensor in ``sensors_df`` are dropped by that join.

    Args:
        spark: Active SparkSession handed to ``dg.DataGenerator``.
        sensors_df: Sensor dimension with ``sensor_id`` (``SENS-<n>``),
            ``unit_id``, ``refinery_id``, ``sensor_type``, ``min_value``,
            ``max_value`` and ``reading_interval_seconds``. It is broadcast
            for the join.
        start_date: First day of the range (a ``date`` or ``datetime``).
        end_date: End of the range; ``(end_date - start_date).days`` days are
            covered.
        sample_rate: Fraction of sensors x time intervals to generate
            (0.01 = 1%). The original author's estimate for 1.0 is ~70GB+.
        num_sensors: Size of the sensor index space; indices are drawn from
            ``0 .. num_sensors - 1``. Pass ``sensors_df.count()`` when the
            dimension does not hold the default 500K sensors.
        readings_per_sensor_per_day: Readings per sensor per day at full rate
            (8640 corresponds to one reading every 10 seconds).

    Returns:
        Spark DataFrame with columns reading_id, sensor_id, unit_id,
        refinery_id, reading_timestamp, reading_value, reading_quality,
        is_interpolated, event_time, processing_time.

    Raises:
        ValueError: If the date range covers no full day or ``num_sensors`` is
            not positive.
    """
    # Get sensor metadata
    sensor_data = sensors_df.select(
        "sensor_id", "unit_id", "refinery_id", "sensor_type",
        "min_value", "max_value", "reading_interval_seconds"
    )

    # Calculate time range
    days = (end_date - start_date).days
    if days <= 0:
        # day_offset is drawn from [0, days - 1]; an empty range cannot be generated.
        raise ValueError("end_date must be at least one day after start_date")
    if num_sensors <= 0:
        raise ValueError("num_sensors must be positive")

    total_rows = int(num_sensors * readings_per_sensor_per_day * days * sample_rate)

    # Generate readings
    readings_spec = (
        dg.DataGenerator(spark, name="sensor_readings",
                        rows=total_rows,
                        partitions=64)
        .withColumn("reading_id", "string", expr="uuid()")
        .withColumn("sensor_idx", "int", minValue=0, maxValue=num_sensors - 1, random=True)
        .withColumn("day_offset", "int", minValue=0, maxValue=days-1, random=True)
        .withColumn("second_of_day", "int", minValue=0, maxValue=86399, random=True)
        .withColumn("base_value", "double", minValue=0.3, maxValue=0.7, random=True)  # Normalized
        .withColumn("noise", "double", minValue=-0.05, maxValue=0.05, random=True)
        .withColumn("anomaly_flag", "double", expr="rand()")  # For introducing anomalies
        .withColumn("reading_quality", "string",
                   values=["GOOD", "UNCERTAIN", "BAD"],
                   weights=[0.97, 0.025, 0.005], random=True)
        .withColumn("is_interpolated", "boolean", expr="rand() > 0.99")
    )

    df = readings_spec.build()

    # Join with sensor metadata to get proper ranges. substring(sensor_id, 6)
    # strips the 'SENS-' prefix so the numeric suffix can be matched to sensor_idx.
    df = df.join(
        F.broadcast(sensor_data.withColumn("sensor_idx_join",
            F.expr("cast(substring(sensor_id, 6) as int)"))),
        df.sensor_idx == F.col("sensor_idx_join"),
        "inner"
    )

    # Calculate actual reading value based on sensor's valid range.
    # Anomalous readings are shifted up by 30% of the range and may exceed max_value.
    df = df.withColumn(
        "reading_value",
        F.when(F.col("anomaly_flag") > 0.995,  # 0.5% anomalies
               F.col("min_value") + (F.col("base_value") + F.col("noise") + 0.3) * (F.col("max_value") - F.col("min_value")))
        .otherwise(
            F.col("min_value") + (F.col("base_value") + F.col("noise")) * (F.col("max_value") - F.col("min_value"))
        )
    )

    # Calculate timestamps. The epoch for midnight of start_date is computed in
    # the driver's local timezone.
    # NOTE(review): from_unixtime returns a formatted string, so the three time
    # columns are strings rather than timestamps; confirm that is intended.
    start_timestamp = int(datetime.combine(start_date, datetime.min.time()).timestamp())
    df = df.withColumn(
        "reading_timestamp",
        F.from_unixtime(F.lit(start_timestamp) + F.col("day_offset") * 86400 + F.col("second_of_day"))
    ).withColumn(
        "event_time", F.col("reading_timestamp")
    ).withColumn(
        # Processing time trails the event by one second.
        "processing_time",
        F.from_unixtime(F.unix_timestamp(F.col("reading_timestamp")) + F.lit(1))
    )

    # Select final columns
    return df.select(
        "reading_id", "sensor_id", "unit_id", "refinery_id",
        "reading_timestamp", "reading_value", "reading_quality",
        "is_interpolated", "event_time", "processing_time"
    )


In [0]:
# --- fact_crude_shipments ---
def generate_crude_shipments(spark, refineries_df, crude_types_df, tanks_df, num_shipments=10000000):
    """Build the synthetic crude shipment fact table (``fact_crude_shipments``).

    Every attribute is drawn independently at random; arrival dates, cost and
    audit timestamps are then derived from the generated columns.

    Args:
        spark: Active SparkSession handed to ``dg.DataGenerator``.
        refineries_df: DataFrame with a ``refinery_id`` column (collected).
        crude_types_df: DataFrame with a ``crude_type_id`` column (collected).
        tanks_df: DataFrame with ``tank_id`` and ``product_type`` columns;
            tanks whose product type is "Crude Oil" are candidate destinations.
        num_shipments: Number of rows to generate.

    Returns:
        Spark DataFrame with one row per shipment; ``transit_days`` is dropped
        after the arrival dates have been derived from it.
    """
    refinery_ids = [row.refinery_id for row in refineries_df.select("refinery_id").collect()]
    crude_ids = [row.crude_type_id for row in crude_types_df.select("crude_type_id").collect()]

    transport_modes = ["Tanker", "Pipeline", "Barge", "Rail"]
    ports = ["Ras Tanura", "Houston", "Rotterdam", "Singapore", "Fujairah",
             "Corpus Christi", "Louisiana Offshore", "Bonny", "Primorsk"]
    statuses = ["SCHEDULED", "IN_TRANSIT", "DELIVERED", "DELAYED"]
    suppliers = [f"SUPP-{str(i).zfill(3)}" for i in range(1, 51)]
    vessels = [f"MT {name}" for name in ["Pacific Star", "Atlantic Grace", "Gulf Spirit",
               "Ocean Pride", "Coastal Dawn", "Marine King", "Sea Fortune", "Tanker Alpha"]]

    # lpad() truncates values longer than the target width, which would create
    # duplicate shipment ids beyond 10 million rows; widen the padding when needed.
    id_width = max(7, len(str(max(num_shipments - 1, 0))))

    shipment_spec = (
        dg.DataGenerator(spark, name="crude_shipments", rows=num_shipments, partitions=32)
        .withColumn("shipment_id", "string",
                   expr=f"concat('SHIP-', lpad(cast(id as string), {id_width}, '0'))")
        .withColumn("refinery_id", "string", values=refinery_ids, random=True)
        .withColumn("crude_type_id", "string", values=crude_ids, random=True)
        .withColumn("supplier_id", "string", values=suppliers, random=True)
        .withColumn("vessel_name", "string", values=vessels, random=True)
        .withColumn("transport_mode", "string", values=transport_modes,
                   weights=[0.5, 0.3, 0.15, 0.05], random=True)
        .withColumn("origin_port", "string", values=ports, random=True)
        .withColumn("shipment_date", "date", begin="2020-01-01", end="2024-12-31", random=True)
        .withColumn("transit_days", "int", minValue=5, maxValue=45, random=True)
        .withColumn("volume_barrels", "int", minValue=100000, maxValue=2500000, step=50000, random=True)
        .withColumn("api_gravity_actual", "double", minValue=20.0, maxValue=50.0, random=True)
        .withColumn("sulfur_content_actual", "double", minValue=0.05, maxValue=4.0, random=True)
        .withColumn("water_content_pct", "double", minValue=0.0, maxValue=1.0, random=True)
        .withColumn("price_per_barrel", "double", minValue=40.0, maxValue=120.0, random=True)
        .withColumn("status", "string", values=statuses, weights=[0.05, 0.1, 0.8, 0.05], random=True)
    )

    df = shipment_spec.build()

    # Calculate derived columns
    df = df.withColumn("arrival_date", F.date_add("shipment_date", F.col("transit_days")))
    # Expected arrival is 0-2 days earlier than the actual arrival.
    df = df.withColumn("expected_arrival_date",
                       F.date_add("shipment_date", F.col("transit_days") - F.floor(F.rand() * 3).cast("int")))
    df = df.withColumn("total_cost", F.col("volume_barrels") * F.col("price_per_barrel"))
    df = df.withColumn("created_at", F.to_timestamp("shipment_date"))
    df = df.withColumn("updated_at", F.to_timestamp("arrival_date"))

    # Add destination tank: a random pick among (at most) the first 100 crude tanks.
    # NOTE(review): the tank is not restricted to the shipment's refinery_id.
    crude_tank_ids = [row.tank_id for row in tanks_df.filter(F.col("product_type") == "Crude Oil")
                      .select("tank_id").collect()][:100]
    if crude_tank_ids:
        destination_tank = F.element_at(
            F.array([F.lit(t) for t in crude_tank_ids]),
            (F.floor(F.rand() * len(crude_tank_ids)) + 1).cast("int"))
    else:
        # No crude tanks available: indexing an empty array would fail or yield
        # an untyped null, so emit an explicit null string instead.
        destination_tank = F.lit(None).cast("string")
    df = df.withColumn("destination_tank_id", destination_tank)

    return df.drop("transit_days", "id")


In [0]:
# --- fact_quality_tests ---
def generate_quality_tests(spark, refineries_df, shipments_df, tanks_df, num_tests=15000000):
    """Build the synthetic lab quality-test fact table (``fact_quality_tests``).

    All measurements are drawn independently at random. ``octane_number`` and
    ``cetane_number`` are null for roughly 30% of rows, and ``tank_id`` is only
    populated when ``sample_source_type`` is "Tank Sample".

    Args:
        spark: Active SparkSession handed to ``dg.DataGenerator``.
        refineries_df: DataFrame with a ``refinery_id`` column (collected).
        shipments_df: Accepted for interface compatibility; not used.
        tanks_df: Tank dimension; only its row count is used, to bound the
            random ``TANK-<n>`` ids. May be ``None``, in which case 2000 tanks
            are assumed.
        num_tests: Number of rows to generate.

    Returns:
        Spark DataFrame with one row per test.
    """
    refinery_ids = [row.refinery_id for row in refineries_df.select("refinery_id").collect()]

    test_types = ["Full Assay", "Spot Check", "Blend Verification", "Product Certification"]
    source_types = ["Crude Shipment", "Tank Sample", "Process Stream", "Product Output"]
    statuses = ["PASS", "FAIL", "MARGINAL", "PENDING"]
    technicians = [f"TECH-{str(i).zfill(3)}" for i in range(1, 101)]

    # lpad() truncates values longer than the target width, which would create
    # duplicate test ids beyond 100 million rows; widen the padding when needed.
    id_width = max(8, len(str(max(num_tests - 1, 0))))

    # Bound generated tank ids by the real number of tanks instead of a fixed
    # 2000, so tank samples never point at tanks that do not exist.
    # This assumes tank ids are TANK-<0..N-1>, as generate_storage_tanks produces.
    num_tanks = tanks_df.count() if tanks_df is not None else 2000
    tank_id_width = max(5, len(str(max(num_tanks - 1, 0))))

    test_spec = (
        dg.DataGenerator(spark, name="quality_tests", rows=num_tests, partitions=32)
        .withColumn("test_id", "string",
                   expr=f"concat('TEST-', lpad(cast(id as string), {id_width}, '0'))")
        .withColumn("refinery_id", "string", values=refinery_ids, random=True)
        .withColumn("sample_source_type", "string", values=source_types,
                   weights=[0.3, 0.4, 0.2, 0.1], random=True)
        .withColumn("test_datetime", "timestamp", begin="2020-01-01 00:00:00", end="2024-12-31 00:00:00", random=True)
        .withColumn("test_type", "string", values=test_types,
                   weights=[0.2, 0.5, 0.2, 0.1], random=True)
        .withColumn("technician_id", "string", values=technicians, random=True)
        .withColumn("api_gravity", "double", minValue=18.0, maxValue=55.0, random=True)
        .withColumn("sulfur_content_pct", "double", minValue=0.01, maxValue=5.0, random=True)
        .withColumn("water_sediment_pct", "double", minValue=0.0, maxValue=2.0, random=True)
        .withColumn("viscosity_cst", "double", minValue=1.0, maxValue=500.0, random=True)
        .withColumn("pour_point_f", "double", minValue=-40.0, maxValue=60.0, random=True)
        .withColumn("reid_vapor_pressure", "double", minValue=5.0, maxValue=15.0, random=True)
        .withColumn("flash_point_f", "double", minValue=100.0, maxValue=250.0, random=True)
        .withColumn("test_result_status", "string", values=statuses,
                   weights=[0.85, 0.05, 0.08, 0.02], random=True)
        .withColumn("notes", "string", values=["Within spec", "Minor variance", "Requires retest",
                                               "Approved", "See lab report"], random=True)
    )

    df = test_spec.build()

    # Add nullable fields
    df = df.withColumn("octane_number",
                       F.when(F.rand() > 0.7, F.lit(None)).otherwise(F.rand() * 10 + 85))
    df = df.withColumn("cetane_number",
                       F.when(F.rand() > 0.7, F.lit(None)).otherwise(F.rand() * 15 + 40))
    df = df.withColumn("sample_source_id", F.expr("concat('SRC-', lpad(cast(floor(rand()*1000000) as string), 7, '0'))"))

    if num_tanks > 0:
        random_tank_id = F.expr(
            f"concat('TANK-', lpad(cast(floor(rand()*{num_tanks}) as string), {tank_id_width}, '0'))")
    else:
        # No tanks exist, so there is nothing valid to reference.
        random_tank_id = F.lit(None).cast("string")
    df = df.withColumn("tank_id",
                       F.when(F.col("sample_source_type") == "Tank Sample", random_tank_id)
                       .otherwise(F.lit(None)))

    return df.drop("id")


In [0]:
# --- fact_tank_inventory ---
def generate_tank_inventory(spark, tanks_df, days_of_data=365):
    """Build the synthetic tank inventory fact table (``fact_tank_inventory``).

    The row count equals tanks x 96 fifteen-minute slots x ``days_of_data``,
    but each row draws its tank, day and slot independently at random, so the
    result is a random sample of (tank, time) pairs rather than one reading
    per slot. Timestamps start at 2024-01-01 (driver-local midnight).

    Args:
        spark: Active SparkSession handed to ``dg.DataGenerator``.
        tanks_df: Tank dimension with ``tank_id``, ``refinery_id``,
            ``product_type`` and ``capacity_barrels``; tank ids are collected
            and the frame is broadcast for the join.
        days_of_data: Number of days the day offsets span.

    Returns:
        Spark DataFrame with columns inventory_id, tank_id, refinery_id,
        measurement_timestamp, volume_barrels, fill_percentage, temperature_f,
        water_bottom_inches, product_type, crude_type_id, product_id,
        measurement_method, is_verified.
    """
    def sql_array(values):
        # Render a Spark SQL array(...) literal. Interpolating a Python tuple
        # breaks for a single element ("('x',)" has a trailing comma), so the
        # element reprs are joined explicitly instead.
        return "array(" + ", ".join(repr(v) for v in values) + ")"

    # Only the ids are needed on the driver; refinery, product and capacity are
    # attached by the broadcast join below, so no lookup dicts are collected.
    tank_ids = [row.tank_id for row in tanks_df.select("tank_id").collect()]

    # 96 readings per day (every 15 min) × days × tanks
    readings_per_day = 96
    total_readings = len(tank_ids) * readings_per_day * days_of_data

    inv_spec = (
        dg.DataGenerator(spark, name="tank_inventory", rows=total_readings, partitions=32)
        .withColumn("inventory_id", "string", expr="uuid()")
        .withColumn("tank_idx", "int", minValue=0, maxValue=len(tank_ids)-1, random=True)
        # element_at is 1-based, hence tank_idx+1.
        .withColumn("tank_id", "string",
                   expr=f"element_at({sql_array(tank_ids)}, tank_idx+1)")
        .withColumn("day_offset", "int", minValue=0, maxValue=days_of_data-1, random=True)
        .withColumn("time_slot", "int", minValue=0, maxValue=95, random=True)  # 0-95 for 15-min slots
        .withColumn("fill_percentage", "double", minValue=0.15, maxValue=0.92, random=True)
        .withColumn("temperature_f", "double", minValue=60.0, maxValue=120.0, random=True)
        .withColumn("water_bottom_inches", "double", minValue=0.0, maxValue=12.0, random=True)
        .withColumn("measurement_method", "string",
                   values=["Radar Gauge", "Manual Dip", "Float Gauge", "Servo Gauge"],
                   weights=[0.6, 0.1, 0.15, 0.15], random=True)
        .withColumn("is_verified", "boolean", expr="rand() > 0.95")
    )

    df = inv_spec.build()

    # Calculate timestamp: each time slot is 900 seconds (15 minutes).
    # NOTE(review): from_unixtime returns a formatted string, not a timestamp.
    start_date = datetime(2024, 1, 1)
    start_ts = int(start_date.timestamp())
    df = df.withColumn(
        "measurement_timestamp",
        F.from_unixtime(F.lit(start_ts) + F.col("day_offset") * 86400 + F.col("time_slot") * 900)
    )

    # Add refinery_id, product type and capacity from the tank dimension.
    df = df.join(F.broadcast(tanks_df.select("tank_id", "refinery_id", "product_type", "capacity_barrels")),
                "tank_id", "left")

    df = df.withColumn("volume_barrels",
                       (F.col("fill_percentage") * F.col("capacity_barrels")).cast("int"))

    # Add crude_type_id and product_id based on product_type: crude tanks get a
    # random CRUDE-001..015, all other tanks a random PROD-001..014.
    df = df.withColumn("crude_type_id",
                       F.when(F.col("product_type") == "Crude Oil",
                             F.expr("concat('CRUDE-', lpad(cast(floor(rand()*15)+1 as string), 3, '0'))"))
                       .otherwise(F.lit(None)))
    df = df.withColumn("product_id",
                       F.when(F.col("product_type") != "Crude Oil",
                             F.expr("concat('PROD-', lpad(cast(floor(rand()*14)+1 as string), 3, '0'))"))
                       .otherwise(F.lit(None)))

    return df.select(
        "inventory_id", "tank_id", "refinery_id", "measurement_timestamp",
        "volume_barrels", "fill_percentage", "temperature_f", "water_bottom_inches",
        "product_type", "crude_type_id", "product_id", "measurement_method", "is_verified"
    )


In [0]:
# --- fact_production_output ---
def generate_production_output(spark, units_df, products_df, crude_types_df, days_of_data=365):
    """Build the synthetic hourly production fact table (``fact_production_output``).

    The row count is a tenth of units x 24 hours x ``days_of_data``. Each row
    draws its unit, day and hour independently at random; only the first 1000
    collected unit ids are ever assigned. Timestamps start at 2024-01-01
    (driver-local midnight).

    Args:
        spark: Active SparkSession handed to ``dg.DataGenerator``.
        units_df: Unit dimension with ``unit_id`` and ``refinery_id``.
        products_df: DataFrame with a ``product_id`` column (collected).
        crude_types_df: DataFrame with a ``crude_type_id`` column (collected).
        days_of_data: Number of days the day offsets span.

    Returns:
        Spark DataFrame with one row per generated unit-hour record.
    """
    def collect_column(frame, column):
        # Pull a single column to the driver as a plain Python list.
        return [record[column] for record in frame.select(column).collect()]

    all_unit_ids = collect_column(units_df, "unit_id")
    all_product_ids = collect_column(products_df, "product_id")
    all_crude_ids = collect_column(crude_types_df, "crude_type_id")

    # 24 hours × days × units, sampled down to 10%
    hourly_slots = 24 * days_of_data
    row_count = len(all_unit_ids) * hourly_slots // 10

    # The lookup array is capped at 1000 ids to keep the SQL expression small;
    # the modulo folds larger indices back into that range.
    unit_lookup_expr = f"element_at(array{tuple(all_unit_ids[:1000])}, (unit_idx % 1000) + 1)"

    generator = dg.DataGenerator(spark, name="production_output", rows=row_count, partitions=32)
    generator = (
        generator
        .withColumn("output_id", "string", expr="uuid()")
        .withColumn("unit_idx", "int", minValue=0, maxValue=len(all_unit_ids)-1, random=True)
        .withColumn("unit_id", "string", expr=unit_lookup_expr)
        .withColumn("day_offset", "int", minValue=0, maxValue=days_of_data-1, random=True)
        .withColumn("hour", "int", minValue=0, maxValue=23, random=True)
        .withColumn("input_volume_barrels", "int", minValue=3000, maxValue=10000, step=100, random=True)
        .withColumn("input_crude_type_id", "string", values=all_crude_ids, random=True)
        .withColumn("output_product_id", "string", values=all_product_ids, random=True)
        .withColumn("yield_percentage", "double", minValue=0.75, maxValue=0.98, random=True)
        .withColumn("energy_consumed_mmbtu", "double", minValue=500.0, maxValue=2500.0, random=True)
        .withColumn("operating_temperature_f", "double", minValue=400.0, maxValue=900.0, random=True)
        .withColumn("operating_pressure_psi", "double", minValue=20.0, maxValue=100.0, random=True)
        # Repeating 0 five times makes "no downtime" the most likely value.
        .withColumn("downtime_minutes", "int", values=[0, 0, 0, 0, 0, 15, 30, 60], random=True)
        .withColumn("quality_grade", "string",
                   values=["On-Spec", "Off-Spec", "Premium"],
                   weights=[0.85, 0.10, 0.05], random=True)
        .withColumn("shift_id", "string", values=["SHIFT-A", "SHIFT-B", "SHIFT-C"], random=True)
    )

    # Epoch seconds for the first production hour.
    epoch_start = int(datetime(2024, 1, 1).timestamp())
    production_hour = F.from_unixtime(
        F.lit(epoch_start) + F.col("day_offset") * 86400 + F.col("hour") * 3600
    )
    output_volume = (F.col("input_volume_barrels") * F.col("yield_percentage")).cast("int")

    unit_to_refinery = F.broadcast(units_df.select("unit_id", "refinery_id"))

    final_columns = [
        "output_id", "unit_id", "refinery_id", "production_hour",
        "input_volume_barrels", "input_crude_type_id", "output_product_id",
        "output_volume_barrels", "yield_percentage", "energy_consumed_mmbtu",
        "operating_temperature_f", "operating_pressure_psi", "downtime_minutes",
        "quality_grade", "shift_id",
    ]

    return (
        generator.build()
        .withColumn("production_hour", production_hour)
        .withColumn("output_volume_barrels", output_volume)
        # Attach the owning refinery of each unit.
        .join(unit_to_refinery, "unit_id", "left")
        .select(*final_columns)
    )


# Main Execution

In [0]:
def _persist_table(spark, df, table_name):
    """
    Overwrite `table_name` with `df` and return the saved table plus its row count.

    The count is taken from the table that was just written rather than from
    `df`: calling `df.count()` on a lazily generated DataFrame would re-execute
    its whole generation plan a second time.

    Returns:
        (DataFrame, int): a DataFrame reading the saved table, and its row count.
    """
    df.write.mode("overwrite").saveAsTable(table_name)
    persisted = spark.table(table_name)
    return persisted, persisted.count()


def generate_all_data(spark, catalog, schema, sample_rate=0.1):
    """
    Generate every dimension and fact dataset and save each one as a table.

    Args:
        spark: Active SparkSession.
        catalog: Catalog that holds the target schema.
        schema: Schema (created if missing) that receives the tables.
        sample_rate: Controls data volume (0.1 = ~10GB, 1.0 = ~100GB, per the
            original estimate). It scales the shipment and quality-test row
            counts and the sensor-reading sample rate; dimension sizes and the
            365-day inventory/production history are fixed.

    Side effects:
        Creates `{catalog}.{schema}` if needed, makes it the session's current
        schema, and overwrites the 6 `dim_*` and 5 `fact_*` tables in it.

    Raises:
        ValueError: if `sample_rate` is not greater than zero. Checked before
            any table is touched so a bad value cannot leave a half-written schema.
    """
    from datetime import date

    if not sample_rate > 0:
        raise ValueError(f"sample_rate must be greater than 0, got {sample_rate!r}")

    spark.sql(f"create schema if not exists {catalog}.{schema}")
    spark.sql(f"use {catalog}.{schema}")

    print("=" * 60)
    print("CRUDE OIL REFINERY DATA GENERATION")
    print("=" * 60)

    # Each step hands the *saved* table to later generators, so downstream
    # datasets read the persisted rows instead of re-running the upstream
    # generation plan.

    # 1. Generate dimension tables
    print("\n[1/11] Generating dim_refineries...")
    refineries, n_rows = _persist_table(
        spark, generate_refineries(spark, 500), "dim_refineries")
    print(f"       Generated {n_rows} refineries")

    print("\n[2/11] Generating dim_crude_types...")
    crude_types, n_rows = _persist_table(
        spark, generate_crude_types(spark), "dim_crude_types")
    print(f"       Generated {n_rows} crude types")

    print("\n[3/11] Generating dim_products...")
    products, n_rows = _persist_table(
        spark, generate_products(spark), "dim_products")
    print(f"       Generated {n_rows} products")

    print("\n[4/11] Generating dim_processing_units...")
    units, n_rows = _persist_table(
        spark, generate_processing_units(spark, refineries, 10), "dim_processing_units")
    print(f"       Generated {n_rows} processing units")

    print("\n[5/11] Generating dim_storage_tanks...")
    tanks, n_rows = _persist_table(
        spark, generate_storage_tanks(spark, refineries, 4), "dim_storage_tanks")
    print(f"       Generated {n_rows} tanks")

    print("\n[6/11] Generating dim_sensors...")
    sensors, n_rows = _persist_table(
        spark, generate_sensors(spark, units, 100), "dim_sensors")
    print(f"       Generated {n_rows} sensors")

    # 2. Generate fact tables
    print("\n[7/11] Generating fact_crude_shipments...")
    shipments, n_rows = _persist_table(
        spark,
        generate_crude_shipments(spark, refineries, crude_types, tanks,
                                 int(10000000 * sample_rate)),
        "fact_crude_shipments")
    print(f"       Generated {n_rows} shipments")

    print("\n[8/11] Generating fact_quality_tests...")
    _, n_rows = _persist_table(
        spark,
        generate_quality_tests(spark, refineries, shipments, tanks,
                               int(15000000 * sample_rate)),
        "fact_quality_tests")
    print(f"       Generated {n_rows} quality tests")

    print("\n[9/11] Generating fact_tank_inventory...")
    _, n_rows = _persist_table(
        spark, generate_tank_inventory(spark, tanks, 365), "fact_tank_inventory")
    print(f"       Generated {n_rows} inventory records")

    print("\n[10/11] Generating fact_production_output...")
    _, n_rows = _persist_table(
        spark,
        generate_production_output(spark, units, products, crude_types, 365),
        "fact_production_output")
    print(f"       Generated {n_rows} production records")

    print("\n[11/11] Generating fact_sensor_readings (batch - historical)...")
    start_date = date(2024, 1, 1)
    end_date = date(2024, 12, 31)
    # Sensor readings are sampled at a tenth of the overall rate.
    _, n_rows = _persist_table(
        spark,
        generate_sensor_readings_batch(spark, sensors, start_date, end_date,
                                       sample_rate=sample_rate * 0.1),
        "fact_sensor_readings")
    print(f"       Generated {n_rows} sensor readings")

    print("\n" + "=" * 60)
    print("DATA GENERATION COMPLETE!")
    print("=" * 60)

    # Print summary
    print("\nData Location:", f"{catalog}.{schema}")


In [0]:
# Kick off the end-to-end generation against the configured catalog and schema.
# sample_rate=0.01 is a tenth of the function's default (0.1), keeping this run small.
generate_all_data(
    spark,
    catalog=CATALOG,
    schema=SCHEMA,
    sample_rate=0.01,
)
