# prediction_model data preparation

In [1]:
# 1. Configuration & Connection

import os
from pathlib import Path

import duckdb

# Root of the tankerkoenig data checkout. Set the TANKERKOENIG_DATA_DIR
# environment variable to run this notebook on another machine; the fallback
# below is the original hard-coded location and only exists on that machine.
BASE_DIR = Path(
    os.environ.get(
        "TANKERKOENIG_DATA_DIR",
        r"C:\Users\websi\OneDrive - UT Cloud\Semester\3. WS2025_26\DS500 Data Science Project (12 ECTS)\tankerkoenig_repo\tankerkoenig-data",
    )
)
DB_PATH = BASE_DIR / "fuel_price_preparation.duckdb"
OUTPUT_PARQUET = BASE_DIR / "derived" / "features_sampled_e5_2023_2024.parquet"

# Years covered by this preparation run; one glob per year and data set.
YEARS = ("2023", "2024")
PRICE_GLOBS = [str(BASE_DIR / "prices" / year / "*" / "*-prices.csv") for year in YEARS]
STATION_GLOBS = [str(BASE_DIR / "stations" / year / "*" / "*-stations.csv") for year in YEARS]

# Connect and setup.
# NOTE: an "IO Error: File is already open in ... (PID n)" raised here means a
# different process (e.g. another notebook kernel) still holds the database
# file; shut that process down or close its connection, then re-run this cell.
con = duckdb.connect(str(DB_PATH))
con.execute("PRAGMA threads=8;")
con.execute("SELECT setseed(0.42);")  # seeds random() for this connection
print(f"Connected to: {DB_PATH}")

IOException: IO Error: File is already open in 
C:\Users\websi\anaconda3\python.exe (PID 12888)

In [None]:
# 2. Ingest Prices and Stations
# Rebuilds prices_raw and stations_raw from the CSV globs defined in the
# configuration cell. The cell is idempotent: both tables are dropped first.
con.execute("DROP TABLE IF EXISTS prices_raw; DROP TABLE IF EXISTS stations_raw;")

# Parameters passed to execute() are bound to a single prepared statement, so
# each parameterised CREATE gets its own call with its own one-element
# parameter list (the element being the list of glob patterns).

# Read all price files at once
con.execute(
    """
    CREATE TABLE prices_raw AS
    SELECT * FROM read_csv_auto(?, SAMPLE_SIZE=-1);
    """,
    [PRICE_GLOBS],
)

# Read all station files at once, keeping filename for snapshot logic
con.execute(
    """
    CREATE TABLE stations_raw AS
    SELECT * FROM read_csv_auto(?, SAMPLE_SIZE=-1, filename=true, union_by_name=true);
    """,
    [STATION_GLOBS],
)

# Sanity Check: row counts of both freshly created tables
print("Ingestion Complete.")
con.sql("SELECT 'Prices' as type, count(*) as n FROM prices_raw UNION ALL SELECT 'Stations', count(*) FROM stations_raw").show()

In [None]:
# 3. Station Processing: Snapshot -> Sample -> Brand Grouping
# Builds stations_final_sample: one row per sampled station with its city,
# brand and a coarse brand_group (brands with <= 5 sampled stations -> 'other').
con.execute("DROP TABLE IF EXISTS stations_final_sample;")

con.execute("""
CREATE TABLE stations_final_sample AS
WITH parsed_stations AS (
    SELECT
        *,
        -- Extract date from filename (e.g., 2023-01-01)
        CAST(regexp_extract(filename, '([0-9]{4}-[0-9]{2}-[0-9]{2})', 1) AS DATE) AS snapshot_date
    FROM stations_raw
),
latest_snapshot AS (
    -- Keep the most recent snapshot row per station; filename breaks ties so
    -- the chosen row does not depend on scan order.
    SELECT * EXCLUDE (rn) FROM (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY uuid ORDER BY snapshot_date DESC, filename DESC
        ) AS rn
        FROM parsed_stations
    ) WHERE rn = 1
),
sampled_stations AS (
    -- Pseudo-random but fully deterministic sample of 500 stations:
    -- ordering by a salted hash of the uuid does not depend on the session
    -- seed, on cell execution order, or on multi-threaded row order.
    SELECT * FROM latest_snapshot
    ORDER BY md5(concat(CAST(uuid AS VARCHAR), '42')), CAST(uuid AS VARCHAR)
    LIMIT 500
),
brand_counts AS (
    SELECT brand, COUNT(*) as n FROM sampled_stations GROUP BY brand
)
SELECT
    s.uuid AS station_uuid,
    s.city,
    s.brand,
    -- A NULL brand never matches in the join, so bc.n is NULL and it falls
    -- through to 'other'.
    CASE WHEN bc.n > 5 THEN s.brand ELSE 'other' END AS brand_group
FROM sampled_stations s
LEFT JOIN brand_counts bc USING (brand);
""")

# Distribution of the resulting brand groups
con.sql("SELECT brand_group, count(*) as n FROM stations_final_sample GROUP BY 1 ORDER BY 2 DESC").show()

In [None]:
# 4. Price Gridding and Forward Fill
# Rebuilds grid_sampled_e5_prepared: one row per (station_uuid, ts_30) on a
# regular 30-minute grid, with the E5 price carried forward between updates.
con.execute("DROP TABLE IF EXISTS grid_sampled_e5_prepared;")

# Pipeline implemented by the CTEs below:
#   relevant_prices - rows of prices_raw for stations in stations_final_sample
#                     with a non-NULL, positive e5 value
#   prices_30min    - timestamps floored to :00 / :30, prices averaged per bin
#   station_bounds  - first and last observed bin per station
#   full_grid       - every 30-minute step between those bounds, per station
#   joined_grid     - grid LEFT JOINed to the observed bins (NULL where no update)
# The final SELECT forward-fills the NULLs per station in ts_30 order.
# Each station's grid starts at its own first observed bin, so the first grid
# row always has an observed price to fill from.
# NOTE(review): the flooring runs on prices_raw.date as ingested; its column
# type (and therefore its time-zone handling) is whatever read_csv_auto
# inferred - confirm before relying on local-time semantics.
con.execute("""
CREATE TABLE grid_sampled_e5_prepared AS
WITH 
-- 1. Filter prices to our 500 stations and E5 fuel only
relevant_prices AS (
    SELECT 
        p.station_uuid,
        p.date as ts_raw,
        CAST(p.e5 AS DOUBLE) as price_raw
    FROM prices_raw p
    WHERE p.station_uuid IN (SELECT station_uuid FROM stations_final_sample)
      AND p.e5 IS NOT NULL AND p.e5 > 0
),
-- 2. Round timestamps to 30-minute floor, take average if multiple updates happen in 30 min
prices_30min AS (
    SELECT 
        station_uuid,
        date_trunc('hour', ts_raw) + 
            INTERVAL (CASE WHEN EXTRACT(MINUTE FROM ts_raw) < 30 THEN 0 ELSE 30 END) MINUTE AS ts_30,
        AVG(price_raw) as price_event
    FROM relevant_prices
    GROUP BY 1, 2
),
-- 3. Build the perfect Time Grid per station (min to max timestamp)
station_bounds AS (
    SELECT station_uuid, MIN(ts_30) as min_ts, MAX(ts_30) as max_ts 
    FROM prices_30min GROUP BY 1
),
full_grid AS (
    SELECT sb.station_uuid, gs.ts_30
    FROM station_bounds sb,
    generate_series(sb.min_ts, sb.max_ts, INTERVAL 30 MINUTE) AS gs(ts_30)
),
-- 4. Join Grid with Events and Forward Fill (Optimized)
joined_grid AS (
    SELECT 
        g.station_uuid,
        g.ts_30,
        p.price_event
    FROM full_grid g
    LEFT JOIN prices_30min p ON g.station_uuid = p.station_uuid AND g.ts_30 = p.ts_30
)
-- Final Selection with Forward Fill
SELECT
    station_uuid,
    ts_30,
    -- MAGIC: Forward fill ignoring nulls replaces the complex recursive logic
    LAST_VALUE(price_event IGNORE NULLS) OVER (
        PARTITION BY station_uuid ORDER BY ts_30
    ) AS price
FROM joined_grid;
""")

# Validation: Ensure we didn't lose data
# This only displays the number of grid rows with a non-NULL price; it does
# not assert anything, so the figure has to be checked by eye.
con.sql("SELECT COUNT(*) FROM grid_sampled_e5_prepared WHERE price IS NOT NULL").show()

In [None]:
# 5. Feature Engineering and Export
# Builds features_final (one row per station, local date and half-hour cell)
# from grid_sampled_e5_prepared and stations_final_sample, then writes it to
# OUTPUT_PARQUET.
con.execute("DROP TABLE IF EXISTS features_final;")

# Day offsets for the raw-history lag features. The momentum columns below
# reference the 1-, 2- and 7-day lags, so those three must stay in this tuple.
LAG_DAYS = (1, 2, 3, 7, 14, 21)

# Lags are taken by calendar date (self-join on d - n) rather than by row
# position. A row-based LAG silently shifts around daylight-saving changes,
# where a local half-hour cell occurs twice (autumn) or not at all (spring).
lag_select_sql = ",\n".join(
    f"    l{n}.price AS price_lag_{n}d" for n in LAG_DAYS
)
# INNER joins: a row is kept only if every lag day exists for the same
# station and time cell, so no lag feature in the result is NULL.
lag_join_sql = "\n".join(
    f"JOIN daily_cells l{n}\n"
    f"  ON l{n}.station_uuid = c.station_uuid\n"
    f" AND l{n}.time_cell = c.time_cell\n"
    f" AND l{n}.d = c.d - {n}"
    for n in LAG_DAYS
)

con.execute(f"""
CREATE TABLE features_final AS
WITH prepared_with_time AS (
    SELECT
        g.station_uuid,
        g.ts_30,
        g.price,
        -- Convert to Berlin Time
        timezone('Europe/Berlin', g.ts_30) AS ts_local
    FROM grid_sampled_e5_prepared g
    WHERE g.price IS NOT NULL -- Remove leading nulls where forward fill hadn't started
),
daily_cells AS (
    -- Exactly one row per station, local date and time cell. When a local
    -- half hour occurs twice, the price of the later grid point is kept.
    SELECT
        station_uuid,
        CAST(ts_local AS DATE) AS d,
        -- Time Cell (0-47) as an integer; floor() keeps the result integral
        -- regardless of whether '/' is integer or float division.
        CAST(EXTRACT(HOUR FROM ts_local) * 2
             + floor(EXTRACT(MINUTE FROM ts_local) / 30) AS INTEGER) AS time_cell,
        arg_max(price, ts_30) AS price
    FROM prepared_with_time
    GROUP BY 1, 2, 3
)
SELECT
    c.station_uuid,
    c.d AS date,
    c.time_cell,
    c.price,

    -- 1. Seasonality (dayofweek: 0 = Sunday ... 6 = Saturday)
    dayofweek(c.d) AS dow,
    CASE WHEN dayofweek(c.d) IN (0, 6) THEN 1 ELSE 0 END AS is_weekend,

    -- 2. Lags (Raw History)
{lag_select_sql},

    -- 3. Brand Info
    s.brand_group,

    -- 4. Momentum: "Are we spiking compared to history?"
    l1.price - l2.price AS mom_1d_2d,
    l1.price - l7.price AS mom_1d_7d

FROM daily_cells c
JOIN stations_final_sample s ON s.station_uuid = c.station_uuid
{lag_join_sql};
""")

# Export
output_dir = OUTPUT_PARQUET.parent
output_dir.mkdir(parents=True, exist_ok=True)

print(f"Exporting to {OUTPUT_PARQUET}...")
# The target path is embedded as a SQL string literal, so double any single
# quote it may contain.
output_literal = str(OUTPUT_PARQUET).replace("'", "''")
con.execute(f"COPY features_final TO '{output_literal}' (FORMAT PARQUET, COMPRESSION ZSTD);")
print("Done.")