### Add Scenario A rows

In [None]:
# Cell 1: Setup & config (Scenario A focused)
import os, json, uuid, hashlib, math, logging, base64, time
from pathlib import Path
from datetime import datetime, timedelta, timezone
import numpy as np
import pandas as pd
from scipy import stats

# Root logger: INFO level, "<time> [LEVEL] message" layout.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# --- File locations -------------------------------------------------------
# Empirical input and synthetic output CSVs (original paths kept as-is).
INPUT_CSV = Path(r"C:\Users\ishaa\OneDrive\Desktop\synthetic_data_final\engine_inference_decoded_json.csv")
OUT_CSV = Path(r"C:\Users\ishaa\OneDrive\Desktop\synthetic_data_final\synthetic_engine_inference_scenarioA.csv")
# Create the output directory up front so later writes cannot fail on a missing folder.
OUT_CSV.parent.mkdir(parents=True, exist_ok=True)

# --- Synthesis parameters (tweakable) ---------------------------------------
RNG_SEED = 42
RNG = np.random.RandomState(RNG_SEED)  # shared, seeded random generator
CHUNK_SIZE = 5000  # default number of rows buffered per CSV write
TOTAL_DAYS = 12  # Scenario A covers days 1..12
# The per-day hours themselves come from the Scenario A mapping built in Cell 4.
USE_COPULA = False  # disabled for tuning stability (switch on later if needed)

# --- Composite score tuning (error-like: higher -> worse) -------------------
# Per-component weights of the weighted sum; insertion order is preserved.
WEIGHTS = dict(
    recon_error_dense=0.6,
    recon_error_lstm=0.25,
    isolation_score=0.08,
    kde_logp=0.04,
    gmm_logp=0.03,
)
COMPOSITE_GAIN = 6.0  # slope of the logistic applied to the clipped weighted sum
COMPOSITE_CLIP = (-6.0, 6.0)  # (min, max) bounds for the weighted sum

# --- Spike control (robust) --------------------------------------------------
SPIKE_PCTILE_LOW = 1.0
SPIKE_PCTILE_HIGH = 99.5
MAX_SPIKE_FACTOR = 1.0  # max fraction of (p90 - p10) used as spike magnitude


In [None]:
# Cell 2: Utilities (hash, b64, empirical helpers, winsorize/log1p transforms)
def sha256_hex_of_row(d: dict, exclude_keys=("row_hash",)):
    """Return the SHA-256 hex digest of a row's canonical JSON form.

    Items whose key is in ``exclude_keys`` are left out. The rest is serialised
    with sorted keys, compact separators and non-ASCII characters unescaped,
    then UTF-8 encoded and hashed.
    """
    payload = {}
    for key, value in d.items():
        if key in exclude_keys:
            continue
        payload[key] = value
    canonical = json.dumps(payload, sort_keys=True, ensure_ascii=False, separators=(",", ":"))
    hasher = hashlib.sha256()
    hasher.update(canonical.encode("utf-8"))
    return hasher.hexdigest()

def to_base64_json(obj):
    """Serialise ``obj`` to compact JSON and return it base64-encoded as an ASCII string."""
    compact = json.dumps(obj, separators=(",", ":"))
    encoded_bytes = base64.b64encode(compact.encode("utf-8"))
    return str(encoded_bytes, "ascii")

def format_timestamp_for_csv(ts: pd.Timestamp) -> str:
    """Render ``ts`` at second resolution as ``YYYY-MM-DDTHH:MM:SS+0000``.

    The ``+0000`` suffix is a literal; no timezone conversion is performed here.
    """
    csv_layout = "%Y-%m-%dT%H:%M:%S+0000"
    return ts.strftime(csv_layout)

def empirical_cdf_interpolator(arr):
    """Build the empirical CDF of ``arr`` as two aligned arrays.

    Returns ``(uvals, cumprob)``: the sorted distinct values (as floats) and
    the cumulative probability reached at each of them. Returns
    ``(None, None)`` when ``arr`` is empty.
    """
    samples = np.asarray(arr)
    if samples.size == 0:
        return None, None
    support, freq = np.unique(samples, return_counts=True)
    total = freq.sum()
    cdf = np.cumsum(freq) / total
    return support.astype(float), cdf

def invert_empirical(u, uvals, cumprob):
    """Map a probability ``u`` to a value by linear interpolation of the empirical CDF.

    ``u`` below the first cumulative probability yields the smallest value in
    ``uvals``; ``u`` above the last yields the largest.
    """
    smallest = uvals[0]
    largest = uvals[-1]
    interpolated = np.interp(u, cumprob, uvals, left=smallest, right=largest)
    return float(interpolated)

def winsorize_series(arr, low_pct=1.0, high_pct=99.5):
    """Clip ``arr`` to its ``low_pct``/``high_pct`` percentiles.

    Returns ``(clipped, lo, hi)`` where ``lo`` and ``hi`` are the percentile
    values that were used as clipping bounds.
    """
    lower_bound = np.percentile(arr, low_pct)
    upper_bound = np.percentile(arr, high_pct)
    clipped = np.clip(arr, lower_bound, upper_bound)
    return clipped, lower_bound, upper_bound

def robust_scale_params(arr):
    """Return ``(median, scale)`` for ``arr`` as plain floats.

    ``scale`` is the p90 - p10 spread, floored at 1e-6 so it is always safe to
    divide by. An empty input yields the neutral pair ``(0.0, 1.0)``.
    """
    data = np.asarray(arr)
    if data.size == 0:
        return 0.0, 1.0
    lower, upper = np.percentile(data, [10, 90])
    spread = upper - lower
    centre = np.median(data)
    return float(centre), float(max(1e-6, spread))


In [None]:
# === battery_inference_decoded_json.csv ===
# ['row_hash', 'timestamp', 'date', 'source_id', 'kafka_key', 'offset', 'source_file', 
#  'battery_voltage_ecu_7ee', 'battery_current', 'battery_state_of_charge_soc_pct', 'battery_state_of_health_soh_pct', 
#  'battery_temperature_cell', 'hv_battery_pack_voltage', 'hv_battery_pack_current', 'internal_resistance_impedance', 
#  'alternator_load_pct', 'alternator_voltage_output', 'energy_consumption_per_km_wh_per_km_per_inr_per_km', 
#  'regenerative_energy_recovered', 'distance_to_empty_km', 'average_trip_speed_while_moving', 'average_trip_speed_overall',
#  'ambient_air_temperature', 'barometer_android_device_mb', 'engine_coolant_temperature', 'charging_power_kw',
#  'charging_efficiency_pct', 'auxiliary_12v_battery_current_draw', 'fuel_consumption_km_per_l_or_l_per_100_km', 
#  'co_emissions_instant_per_avg', 'alcohol_fuel_percentage', 'boost_commanded_per_measured', 'recon_error_dense',
#  'recon_error_lstm', 'lstm_window_id', 'isolation_score', 'kde_logp', 'gmm_logp', 'composite_score', 'anomaly_label',
#  'anomaly_severity', 'inference_run_id', 'inference_timestamp', 'processing_latency_ms', 'dense_per_feature_error_json', 
#  'explain_top_k_json', 'raw_model_outputs_json', 'dense_per_feature_error_json_decoded_json', 'explain_top_k_json_decoded_json', 
#  'raw_model_outputs_json_decoded_json']

# === engine_inference_decoded_json.csv ===
# ['row_hash', 'timestamp', 'date', 'source_id', 'kafka_key', 'offset', 'source_file', 'air_fuel_ratio_commanded_1', 
#  'air_fuel_ratio_measured_1', 'catalyst_temperature_bank_1_sensor_1', 'catalyst_temperature_bank_1_sensor_2', 
#  'engine_kw_at_the_wheels_kw', 'engine_load_absolute', 'engine_oil_temperature', 'engine_rpm_rpm', 'fuel_flow_rate_hour_l_hr', 
#  'fuel_trim_bank_1_long_term', 'fuel_trim_bank_1_short_term', 'mass_air_flow_rate_g_s', 'o2_sensor1_wide_range_current_ma',
#  'o2_bank_1_sensor_2_voltage_v', 'timing_advance', 'turbo_boost_vacuum_gauge_psi', 'voltage_control_module_v', 
#  'volumetric_efficiency_calculated', 'ecu_7ea_engine_coolant_temperature', 'ecu_7ea_intake_air_temperature', 
#  'ecu_7eb_ambient_air_temp', 'ecu_7eb_engine_load', 'ecu_7eb_engine_rpm_rpm', 'ecu_7eb_speed_obd_km_h', 
#  'recon_error_dense', 'recon_error_lstm', 'lstm_window_id', 'isolation_score', 'kde_logp', 'gmm_logp', 
#  'composite_score', 'anomaly_label', 'anomaly_severity', 'inference_run_id', 'inference_timestamp', 
#  'processing_latency_ms', 'dense_per_feature_error_json', 'explain_top_k_json', 'raw_model_outputs_json', 
#  'dense_per_feature_error_json_decoded_json', 'explain_top_k_json_decoded_json', 'raw_model_outputs_json_decoded_json']

# === body_inference_decoded_json.csv ===
# ['row_hash', 'timestamp', 'date', 'source_id', 'kafka_key', 'offset', 'source_file', 'ambient_air_temperature_body', 
#  'cabin_temperature', 'cabin_humidity_pct', 'hvac_blower_speed', 'ac_compressor_load_pct', 'window_open_pct', 
#  'sunroof_position_pct', 'fuel_level_pct', 'distance_since_codes_cleared', 'distance_with_mil_lit', 'odometer_reading', 
#  'recon_error_dense', 'recon_error_lstm', 'lstm_window_id', 'isolation_score', 'kde_logp', 'gmm_logp', 
#  'composite_score', 'anomaly_label', 'anomaly_severity', 'inference_run_id', 'inference_timestamp', 
#  'processing_latency_ms', 'dense_per_feature_error_json', 'explain_top_k_json', 'raw_model_outputs_json', 
#  'dense_per_feature_error_json_decoded_json', 'explain_top_k_json_decoded_json', 'raw_model_outputs_json_decoded_json']

# === tyre_inference_decoded_json.csv ===
# ['row_hash', 'timestamp', 'date', 'source_id', 'kafka_key', 'offset', 'source_file', 
#  'bearing_vehicle_heading', 'longitudinal_acceleration', 'lateral_acceleration', 'yaw_rate', 
#  'stopping_distance', 'steering_angle_sensor', 'steering_torque_applied', 'suspension_height_per_deflection',
#  'suspension_damper_force', 'vertical_acceleration', 'tyre_pressure_fl_psi', 'tyre_pressure_fr_psi', 
#  'tyre_pressure_rl_psi', 'tyre_pressure_rr_psi', 'tyre_temp_fl_c', 'tyre_temp_fr_c', 'tyre_temp_rl_c', 'tyre_temp_rr_c',
#  'wheel_speed_fl_kmh', 'wheel_speed_fr_kmh', 'wheel_speed_rl_kmh', 'wheel_speed_rr_kmh', 'slip_ratio_fl', 'slip_ratio_fr', 
#  'slip_ratio_rl', 'slip_ratio_rr', 'tyre_wear_fl_pct', 'tyre_wear_fr_pct', 'tyre_wear_rl_pct', 'tyre_wear_rr_pct', 'tyre_load_fl_n', 
#  'tyre_load_fr_n', 'tyre_load_rl_n', 'tyre_load_rr_n', 'accel_x_g_tyre', 'accel_y_g_tyre', 'accel_z_g_tyre', 
#  'vibration_f1_hz', 'vibration_f2_hz', 'vibration_f3_hz', 'recon_error_dense', 'recon_error_lstm', 'lstm_window_id', 
#  'isolation_score', 'kde_logp', 'gmm_logp', 'composite_score', 'anomaly_label', 'anomaly_severity', 
#  'inference_run_id', 'inference_timestamp', 'processing_latency_ms', 'dense_per_feature_error_json', 'explain_top_k_json',
#  'raw_model_outputs_json', 'dense_per_feature_error_json_decoded_json', 'explain_top_k_json_decoded_json', 'raw_model_outputs_json_decoded_json']

# === transmission_inference_decoded_json.csv ===
# ['row_hash', 'timestamp', 'date', 'source_id', 'kafka_key', 'offset', 'source_file',
#  'engine_rpm', 'gear_position_actual', 'gear_commanded_target', 'transmission_oil_temperature',
#  'transmission_oil_pressure', 'clutch_engagement_per_slip', 'torque_converter_slip_speed', 
#  'actual_engine_pct_torque', 'driver_demand_engine_pct_torque', 'engine_reference_torque_nm', 
#  'acceleration_sensor_total_g', 'throttle_position_manifold_pct', 'accelerator_pedal_position_d_per_e_per_f_pct',
#  'air_fuel_ratio_commanded', 'air_fuel_ratio_measured', 'boost_pressure_commanded_a_per_b', 
#  'boost_pressure_sensor_a_per_b', 'engine_load_calculated_pct', 'engine_load_absolute_pct', 
#  'egr_commanded_pct', 'egr_error_pct', 'catalyst_temperatures_bank_sensors', 'bearing_heading_per_vehicle_yaw', 
#  'accel_x_g', 'accel_y_g', 'accel_z_g', 'vehicle_speed_kmh', 'recon_error_dense', 'recon_error_lstm', 
#  'lstm_window_id', 'isolation_score', 'kde_logp', 'gmm_logp', 'composite_score', 'anomaly_label', 
#  'anomaly_severity', 'inference_run_id', 'inference_timestamp', 'processing_latency_ms', 
#  'dense_per_feature_error_json', 'explain_top_k_json', 'raw_model_outputs_json', 
#  'dense_per_feature_error_json_decoded_json', 'explain_top_k_json_decoded_json', 'raw_model_outputs_json_decoded_json']

In [None]:
# Cell 3: Load empirical CSV & compute robust marginals/stats
logging.info("Loading empirical CSV...")
df_emp = pd.read_csv(INPUT_CSV, low_memory=False)
n_emp = len(df_emp)
logging.info(f"Empirical rows: {n_emp}, columns: {len(df_emp.columns)}")

# Columns we would like to synthesise: sensor channels first, then the five
# model-output columns. Only those actually present in the CSV are used.
NUMERIC_COLS_CANDIDATE = [
    "air_fuel_ratio_commanded_1",
    "air_fuel_ratio_measured_1",
    "catalyst_temperature_bank_1_sensor_1",
    "catalyst_temperature_bank_1_sensor_2",
    "engine_kw_at_the_wheels_kw",
    "engine_load_absolute",
    "engine_oil_temperature",
    "engine_rpm_rpm",
    "fuel_flow_rate_hour_l_hr",
    "fuel_trim_bank_1_long_term",
    "fuel_trim_bank_1_short_term",
    "mass_air_flow_rate_g_s",
    "o2_sensor1_wide_range_current_ma",
    "o2_bank_1_sensor_2_voltage_v",
    "timing_advance",
    "turbo_boost_vacuum_gauge_psi",
    "voltage_control_module_v",
    "volumetric_efficiency_calculated",
    "ecu_7ea_engine_coolant_temperature",
    "ecu_7ea_intake_air_temperature",
    "ecu_7eb_ambient_air_temp",
    "ecu_7eb_engine_load",
    "ecu_7eb_engine_rpm_rpm",
    "ecu_7eb_speed_obd_km_h",
    "recon_error_dense",
    "recon_error_lstm",
    "isolation_score",
    "kde_logp",
    "gmm_logp",
]

# Candidate order is preserved for the columns that exist in the file.
NUMERIC_COLS = [c for c in NUMERIC_COLS_CANDIDATE if c in df_emp.columns]
logging.info(f"Numeric cols present: {NUMERIC_COLS}")

# Per-column marginal description:
#   uvals / cumprob : empirical CDF of the winsorized values (None if no data)
#   median / scale  : robust centre and p90 - p10 spread of the winsorized values
#   p1 / p995       : the winsorization bounds themselves
empirical_marginals = {}
for c in NUMERIC_COLS:
    arr_raw = df_emp[c].dropna().astype(float).values
    if arr_raw.size > 0:
        # Winsorize first so pathological extremes do not shape the CDF that
        # invert_empirical samples from.
        arr_win, p1, p995 = winsorize_series(arr_raw, SPIKE_PCTILE_LOW, SPIKE_PCTILE_HIGH)
        uvals, cumprob = empirical_cdf_interpolator(arr_win)
        median, scale = robust_scale_params(arr_win)
        empirical_marginals[c] = {
            "uvals": uvals,
            "cumprob": cumprob,
            "median": median,
            "scale": scale,
            "p1": float(p1),
            "p995": float(p995),
        }
    else:
        # Column exists but holds no usable values: store a neutral placeholder.
        empirical_marginals[c] = {
            "uvals": None,
            "cumprob": None,
            "median": 0.0,
            "scale": 1.0,
            "p1": 0.0,
            "p995": 0.0,
        }
logging.info("Prepared robust empirical marginals.")


In [None]:
# Cell 4: Scenario A schedule and helpers
# Hours of data generated for each of the 12 scenario days (calendar days 1..12;
# the row generator indexes the same list as day_idx 0..11).
day_hours = (
    [3] * 6      # days 1-6: baseline
    + [8] * 4    # days 7-10: anomaly windows
    + [9]        # day 11: service day
    + [4]        # day 12: post-service
)
assert len(day_hours) == TOTAL_DAYS

# choose_substate and modulation functions specialized for scenario A
def choose_substate_for_time_within_day_scenarioA(second_of_day, H_seconds):
    """Pick the Scenario A substate for a second within a generated day.

    The position ``second_of_day / max(1, H_seconds)`` is mapped onto fixed
    segments of the day:

        [0.00, 0.50) baseline
        [0.50, 0.80) pre_failure
        [0.80, 0.85) failure
        [0.85, 0.90) service
        [0.90, ...)  post_service

    The same segmentation is applied regardless of the day's length.

    Returns:
        ``(substate, rel_pos)`` where ``rel_pos`` is the relative position
        inside the selected segment (0.0 at its start).
    """
    frac = second_of_day / max(1, H_seconds)
    # (exclusive upper bound, substate, segment start, segment width)
    segments = (
        (0.5, "baseline", 0.0, 0.5),
        (0.8, "pre_failure", 0.5, 0.3),
        (0.85, "failure", 0.8, 0.05),
        (0.90, "service", 0.85, 0.05),
    )
    for upper, name, start, width in segments:
        if frac < upper:
            return name, (frac - start) / width
    # Everything from 0.90 onward is the open-ended final segment.
    return "post_service", (frac - 0.90) / 0.10

def modulation_M_scenarioA(substate, rel_pos, day_idx, rng=None):
    """Draw the scalar modulation ``M`` for one row of Scenario A.

    Args:
        substate: One of "baseline", "pre_failure", "failure", "service",
            "post_service"; any other value gets baseline-like noise.
        rel_pos: Relative position inside the substate segment; only used by
            "pre_failure" to ramp the drift.
        day_idx: Zero-based day index; "pre_failure" drifts more negative for
            every day beyond index 3.
        rng: Optional random generator exposing ``normal`` and ``uniform``
            (e.g. ``np.random.RandomState``). Defaults to the module-level
            ``RNG`` so existing three-argument calls behave as before.

    Returns:
        The modulation value as a float: small noise around 0 for baseline,
        a negative ramp for pre_failure, strongly negative for failure,
        strongly positive for service and slightly positive for post_service.
    """
    if rng is None:
        rng = RNG
    if substate == "baseline":
        return float(rng.normal(loc=0.0, scale=0.02))
    if substate == "pre_failure":
        # Base drift deepens by 0.012 per day once day_idx exceeds 3.
        base = -0.05 - 0.012 * max(0, (day_idx - 3))
        return float(base * (0.2 + rel_pos) + rng.normal(0, 0.02))
    if substate == "failure":
        # Draw order (uniform, then normal) is kept so seeded streams match.
        return float(rng.uniform(-0.75, -0.55) + rng.normal(0, 0.03))
    if substate == "service":
        return float(rng.uniform(0.45, 0.7) + rng.normal(0, 0.03))
    if substate == "post_service":
        return float(0.05 + rng.normal(0, 0.02))
    return float(rng.normal(0, 0.02))


In [None]:
# Cell 5: Row generator (Scenario A only) - marginal sampling + robust spike + composite (error-like)
def pick_alpha_for_col(col_name):
    """Return the modulation sensitivity (alpha) for a column, chosen by name.

    The lower-cased column name is matched against keyword groups in priority
    order; the first group with any keyword contained in the name wins.
    Columns matching no group get 0.03.
    """
    lowered = col_name.lower()
    # (keywords, alpha) in priority order.
    rules = (
        (("recon_error", "dense", "lstm"), 0.9),
        (("isolation",), 0.5),
        (("kde", "gmm", "logp"), 0.35),
        (("battery", "current", "voltage", "temperature", "resistance", "soc", "soh"), 0.08),
    )
    for keywords, alpha in rules:
        if any(keyword in lowered for keyword in keywords):
            return alpha
    return 0.03

# Reconstruction-error columns that have an entry in empirical_marginals.
# compute_composite_score_row normalises exactly these columns in signed-log1p
# space (centre/scale taken from the winsorized empirical values).
RECON_COLS = [c for c in ["recon_error_dense","recon_error_lstm"] if c in empirical_marginals]

def compute_composite_score_row(row_vals):
    """Collapse the five model outputs of one row into a composite score in [0, 1].

    Each component (recon_error_dense, recon_error_lstm, isolation_score,
    kde_logp, gmm_logp) is centred and scaled with robust statistics from
    ``empirical_marginals``, multiplied by its entry in ``WEIGHTS`` and summed.
    The sum is clipped to ``COMPOSITE_CLIP`` and passed through a logistic with
    slope ``COMPOSITE_GAIN``.

    Columns listed in ``RECON_COLS`` are normalised in signed-log1p space. The
    log-space median and p90 - p10 scale are computed from ``df_emp`` on first
    use and cached on the column's metadata dict as ``log_median`` and
    ``log_scale``.

    Args:
        row_vals: Mapping of column name to numeric value. Missing or ``None``
            entries are read as 0.0. NaN entries (the generator emits NaN for
            columns without empirical data) contribute nothing to the weighted
            sum instead of turning the whole score into NaN.

    Returns:
        The composite score as a float in [0, 1]; callers treat higher as worse.
    """
    # NOTE(review): kde_logp / gmm_logp enter with positive weights, so larger
    # values raise the score. If these are log-likelihoods (higher = more
    # typical) their sign may need flipping - confirm against the real models.
    component_keys = ("recon_error_dense", "recon_error_lstm", "isolation_score", "kde_logp", "gmm_logp")

    score_raw = 0.0
    for key in component_keys:
        val = float(row_vals.get(key, 0.0) or 0.0)
        if math.isnan(val):
            # NaN is truthy, so the `or 0.0` default above does not catch it.
            continue

        meta = empirical_marginals.get(key)
        if key in RECON_COLS:
            # Signed log transform tames the heavy right tail of recon errors.
            val_t = math.log1p(abs(val)) * (1.0 if val >= 0 else -1.0)
            median = 0.0
            scale = 1.0
            if meta:
                if "log_median" not in meta:
                    arr = df_emp[key].dropna().astype(float).values
                    if arr.size == 0:
                        # No empirical data: same neutral centre/scale as robust_scale_params.
                        meta["log_median"] = 0.0
                        meta["log_scale"] = 1.0
                    else:
                        arr_win, _, _ = winsorize_series(arr, SPIKE_PCTILE_LOW, SPIKE_PCTILE_HIGH)
                        log_arr = np.log1p(np.abs(arr_win)) * np.sign(arr_win)
                        meta["log_median"] = float(np.median(log_arr))
                        p10, p90 = np.percentile(log_arr, [10, 90])
                        meta["log_scale"] = float(max(1e-6, p90 - p10))
                median = meta["log_median"]
                scale = meta["log_scale"]
            normed = (val_t - median) / max(scale, 1e-6)
        else:
            median = meta["median"] if meta else 0.0
            scale = meta["scale"] if meta else 1.0
            normed = (val - median) / max(scale, 1e-6)

        score_raw += WEIGHTS.get(key, 0.0) * normed

    # Clip before the logistic so the exponent stays bounded.
    score_raw = float(np.clip(score_raw, COMPOSITE_CLIP[0], COMPOSITE_CLIP[1]))
    s = 1.0 / (1.0 + math.exp(-COMPOSITE_GAIN * score_raw))
    return float(np.clip(s, 0.0, 1.0))

def _explain_top_k_scenarioA(row_numeric_dict, feature_pool, rng_local):
    """Build the explain_top_k payload for one row.

    Features in ``feature_pool`` are ranked by absolute robust deviation
    ``|value - median| / scale``. The six largest are kept and up to three of
    them are sampled without replacement, with probability proportional to
    deviation. Each sampled feature gets a random contribution in [0, 1] that
    grows with its deviation.

    Returns:
        ``(decoded_json_str, base64_encoded_str)``. With an empty pool both
        describe an empty list and no random numbers are drawn.
    """
    deviations = []
    for f in feature_pool:
        meta = empirical_marginals.get(f)
        if meta is None:
            deviations.append((f, 0.0))
            continue
        val = row_numeric_dict.get(f, 0.0)
        dev = abs((val - meta["median"]) / max(1e-6, meta["scale"]))
        if math.isnan(dev):
            # A NaN value (column without empirical data) would poison both the
            # ranking and the sampling probabilities; treat it as no deviation.
            dev = 0.0
        deviations.append((f, dev))
    deviations.sort(key=lambda x: x[1], reverse=True)
    top_candidates = deviations[:6]

    result = []
    if top_candidates:
        feats = [t[0] for t in top_candidates]
        weights = np.array([t[1] + 1e-6 for t in top_candidates], dtype=float)
        probs = weights / weights.sum()  # weights are strictly positive here
        dev_by_feature = dict(top_candidates)
        chosen = rng_local.choice(feats, size=min(3, len(feats)), replace=False, p=probs)
        for f in chosen:
            name = str(f)  # rng.choice hands back numpy string scalars
            dev = dev_by_feature[name]
            contrib = float(np.clip(rng_local.uniform(0, 1) * (0.3 + min(dev / 3.0, 1.0)), 0.0, 1.0))
            result.append({"feature": name, "contribution": contrib})

    decoded_json_str = json.dumps(result)
    encoded = base64.b64encode(decoded_json_str.encode()).decode()
    return decoded_json_str, encoded


def generate_scenarioA_rows(day_hours, start_date, rng=RNG):
    """Yield synthetic Scenario A rows, one per second of every scheduled hour.

    For day ``i`` the generator emits ``int(day_hours[i] * 3600)`` rows whose
    timestamps start at 08:00 on ``start_date + i`` days and advance by one
    second. Every numeric column is sampled from its empirical marginal,
    modulated according to the substate of that second, clipped to the
    column's p1/p995 bounds, and then scored.

    Args:
        day_hours: Sequence with the number of hours to generate per day.
        start_date: Date or timestamp of the first day.
        rng: Random generator used for marginal sampling, spikes, noise,
            explanations and latency.

    Yields:
        dict: One row holding identifiers, the numeric columns,
        ``composite_score``, ``anomaly_label``, ``anomaly_severity``,
        inference metadata, the explain_top_k payload (decoded and base64) and
        a ``row_hash`` computed over the finished row. ``offset`` counts rows
        from 0 within this generator.
    """
    inference_ts_now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f+00:00")

    # ---- loop-invariant setup (previously recomputed for every row) ----
    model_output_cols = ("recon_error_dense", "recon_error_lstm", "isolation_score", "kde_logp", "gmm_logp")
    feature_pool = [c for c in NUMERIC_COLS if c not in model_output_cols]
    source_file = str(df_emp["source_file"].iloc[0]) if "source_file" in df_emp.columns else "C:\\source.csv"

    # Per column: (name, marginal metadata or None, alpha, additive-modulation flag)
    col_plans = []
    for col in NUMERIC_COLS:
        meta = empirical_marginals.get(col)
        if meta is not None and meta["uvals"] is None:
            meta = None  # column has no empirical data to sample from
        lowered = col.lower()
        additive = any(tag in lowered for tag in ("recon", "dense", "lstm"))
        col_plans.append((col, meta, pick_alpha_for_col(col), additive))

    offset_counter = 0
    for day_idx, H_hours in enumerate(day_hours):
        H_seconds = int(H_hours * 3600)
        current_date = start_date + pd.Timedelta(days=day_idx)
        day_start_time = pd.Timestamp(current_date) + pd.Timedelta(hours=8)

        for sec in range(H_seconds):
            ts = day_start_time + pd.Timedelta(seconds=sec)
            timestamp_str = format_timestamp_for_csv(pd.Timestamp(ts).tz_localize(None))
            date_str = timestamp_str[:10]

            substate, rel_pos = choose_substate_for_time_within_day_scenarioA(sec, H_seconds)
            # NOTE(review): called without a generator argument, so M is drawn
            # from the module-level RNG rather than from `rng`.
            M = modulation_M_scenarioA(substate, rel_pos, day_idx)
            in_failure = substate == "failure"

            # ---- sample and modulate every numeric column ----
            row_vals = {}
            for col, meta, alpha, additive in col_plans:
                if meta is None:
                    row_vals[col] = float("nan")
                    continue

                # Draw order per column (uniform, optional spike, noise) is
                # kept stable so seeded runs reproduce the same stream.
                val = invert_empirical(rng.uniform(), meta["uvals"], meta["cumprob"])

                # Spike magnitude is bounded by a fraction of the p90 - p10 spread.
                spike_bound = MAX_SPIKE_FACTOR * meta["scale"]
                spike = 0.0
                if in_failure:
                    # NOTE(review): the spike is centred on -spike_bound for
                    # every column, so it lowers recon errors too - confirm
                    # that this direction is intended.
                    spike = rng.normal(loc=-1.0 * spike_bound, scale=0.25 * spike_bound)
                noise = rng.normal(0, meta["scale"] * 0.02)

                if additive:
                    # additive-dominant modulation for recon/dense/lstm columns
                    val_mod = val + alpha * (M * max(1.0, abs(val))) + spike + noise
                else:
                    # small multiplicative drift for everything else
                    val_mod = val * (1.0 + alpha * M) + spike + noise

                # cap to the empirical p1/p995 winsorization bounds
                row_vals[col] = float(np.clip(val_mod, meta["p1"], meta["p995"]))

            # ---- score and label (higher composite = worse) ----
            composite = compute_composite_score_row(row_vals)
            if composite > 0.6:
                label, severity = "anomaly", 2
            elif composite > 0.4:
                label, severity = "suspicious", 1
            else:
                label, severity = "normal", 0

            decoded_explain, encoded_explain = _explain_top_k_scenarioA(row_vals, feature_pool, rng)

            # ---- assemble the row; key order defines the CSV column order ----
            row = {}
            row["row_hash"] = None
            row["timestamp"] = timestamp_str
            row["date"] = date_str
            row["source_id"] = "sim001"
            row["kafka_key"] = "sim001"
            row["offset"] = offset_counter
            row["source_file"] = source_file

            for c in NUMERIC_COLS:
                row[c] = row_vals.get(c, float("nan"))

            row["composite_score"] = composite
            row["anomaly_label"] = label
            row["anomaly_severity"] = severity
            row["inference_run_id"] = "run-" + uuid.uuid4().hex[:24]
            row["inference_timestamp"] = inference_ts_now
            row["processing_latency_ms"] = int(abs(rng.normal(loc=500, scale=200)))
            row["explain_top_k_json_decoded_json"] = decoded_explain
            row["explain_top_k_json"] = encoded_explain
            row["lstm_window_id"] = str(uuid.uuid4())
            row["inference_window_id"] = str(uuid.uuid4())

            # Hash last so it covers every other field of the finished row.
            row["row_hash"] = sha256_hex_of_row(row)
            offset_counter += 1
            yield row


In [None]:
# Cell 6: Streamed chunk write to CSV
from pathlib import Path

def write_synthetic_csv(out_path: Path, row_gen, chunk_size=CHUNK_SIZE, total_estimate=None):
    """Stream rows from ``row_gen`` into a fresh CSV at ``out_path``.

    Rows are buffered and written ``chunk_size`` at a time; the header is
    emitted with the first write only. After each full-chunk write, iteration
    stops once ``total_estimate`` (if truthy) rows have been written. Any
    remaining buffered rows are written at the end.

    Returns:
        The number of rows written.
    """
    written = 0
    header_pending = True
    start_time = time.time()
    with out_path.open("w", newline='', encoding='utf-8') as fout:
        pending = []
        for row in row_gen:
            pending.append(row)
            if len(pending) >= chunk_size:
                pd.DataFrame(pending).to_csv(fout, index=False, header=header_pending)
                header_pending = False
                written += len(pending)
                elapsed = time.time() - start_time
                logging.info(f"WROTE {written} rows (elapsed {elapsed:.1f}s)")
                pending = []
            if total_estimate and written >= total_estimate:
                break
        if pending:
            # Flush the partial final buffer.
            pd.DataFrame(pending).to_csv(fout, index=False, header=header_pending)
            written += len(pending)
            logging.info(f"FINAL WRITE: total rows written {written}")
    return written

# Expected Scenario A size: one row per second over every scheduled hour.
estimated_rows_A = sum(int(h * 3600) for h in day_hours)
logging.info(f"Estimated Scenario A rows: {estimated_rows_A}")


def _scenario_start_date(frame):
    """Midnight of the first non-null ``timestamp`` in ``frame``.

    Falls back to today's midnight when the column is missing, has no
    non-null values, or its first value cannot be parsed.
    """
    if "timestamp" not in frame.columns:
        return pd.Timestamp.now().normalize()
    stamps = frame['timestamp'].dropna()
    if stamps.shape[0] == 0:
        return pd.Timestamp.now().normalize()
    try:
        return pd.to_datetime(stamps.astype(str).iloc[0]).normalize()
    except Exception:
        return pd.Timestamp.now().normalize()


# Anchor the synthetic calendar on the empirical data (first row, not the minimum).
start_date = _scenario_start_date(df_emp)

row_gen = generate_scenarioA_rows(day_hours, start_date, rng=RNG)
logging.info("Starting Scenario A synthesis to CSV ...")
written = write_synthetic_csv(OUT_CSV, row_gen, chunk_size=CHUNK_SIZE, total_estimate=estimated_rows_A)
logging.info(f"Synthesis complete. Rows written: {written}. File: {OUT_CSV}")


### Add Final Rows

In [None]:
# Cell 7: Inspect existing CSV to find last timestamp, last offset, and row count
import shutil
from pathlib import Path
import pandas as pd
import logging

logging.info("Inspecting existing synthetic CSV to compute continuation parameters...")

OUT_CSV = Path(r"C:\Users\ishaa\OneDrive\Desktop\synthetic_data_final\synthetic_engine_inference_scenarioA.csv")
if not OUT_CSV.exists():
    raise FileNotFoundError(f"Existing synthetic CSV not found: {OUT_CSV}")

# Decide which columns to read from the header of the file being inspected
# (not from the empirical frame), so the row-count fallback for a missing
# "offset" column is actually reachable.
header_cols = pd.read_csv(OUT_CSV, nrows=0).columns
has_offset = "offset" in header_cols
usecols = ["timestamp", "offset"] if has_offset else ["timestamp"]

# Stream the file in chunks and keep only the very last row, so memory use
# stays bounded no matter how large the CSV is.
last_row = None
last_offset = None
last_ts = None
rows_count = 0
for chunk in pd.read_csv(OUT_CSV, usecols=usecols, chunksize=200000, low_memory=False):
    rows_count += len(chunk)
    last_row = chunk.iloc[-1]

if last_row is None:
    raise RuntimeError("Could not locate any rows in existing CSV (unexpected).")

last_ts = pd.to_datetime(str(last_row["timestamp"]))
# Without an offset column, fall back to a zero-based row index.
last_offset = int(last_row["offset"]) if has_offset else rows_count - 1

logging.info(f"Existing CSV rows: {rows_count}, last timestamp: {last_ts}, last offset: {last_offset}")


In [None]:
# Cell 8: Backup the existing CSV before appending
from datetime import datetime
import shutil

# Stamp the backup with the current UTC time. datetime.utcnow() is deprecated
# since Python 3.12; the timezone-aware call yields the same formatted string.
backup_ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
# Sibling file: <stem>_backup_<timestamp><suffix> in the same directory.
backup_path = OUT_CSV.with_name(OUT_CSV.stem + f"_backup_{backup_ts}" + OUT_CSV.suffix)
shutil.copy2(OUT_CSV, backup_path)
logging.info(f"Backup created: {backup_path}")


In [None]:
# Cell 9: Generator wrapper to produce exactly N additional rows, cycling the Scenario A day_hours pattern
import itertools
from typing import Iterable

def generate_scenarioA_rows_n(target_rows: int, base_day_hours: Iterable[int], start_date, rng=None, initial_offset=0):
    """Yield exactly ``target_rows`` rows, repeating the day-hours pattern as needed.

    The pattern is generated one whole cycle at a time through
    ``generate_scenarioA_rows``. Within a cycle the generator therefore sees
    the real day index (0 .. len(pattern) - 1) and applies the same
    day-dependent modulation as a regular Scenario A run. Cycle ``k`` starts
    ``k * len(pattern)`` days after ``start_date``.

    Args:
        target_rows: Number of rows to produce; values <= 0 produce nothing.
        base_day_hours: Hours per day for one cycle of the pattern.
        start_date: Date or timestamp of the first generated day.
        rng: Random generator handed to ``generate_scenarioA_rows``; defaults
            to the module-level ``RNG``.
        initial_offset: Last offset already present in the target file; the
            first yielded row gets ``initial_offset + 1``.

    Yields:
        dict: Rows with a continuous ``offset`` and a ``row_hash`` recomputed
        after the offset was rewritten, so the hash matches the emitted row.

    Raises:
        ValueError: If rows are requested but the pattern is empty or contains
            no generated seconds (this would otherwise never terminate).
    """
    if rng is None:
        rng = RNG  # default RNG from notebook
    if target_rows <= 0:
        return

    pattern = list(base_day_hours)
    if not pattern or sum(int(h * 3600) for h in pattern) <= 0:
        raise ValueError("base_day_hours must contain at least one day with a positive number of hours")

    written = 0
    cycle_idx = 0
    while written < target_rows:
        cycle_start = start_date + pd.Timedelta(days=cycle_idx * len(pattern))
        for row in generate_scenarioA_rows(pattern, cycle_start, rng=rng):
            # Continue numbering right after the last offset in the existing file.
            row["offset"] = initial_offset + written + 1
            # The generator hashed the row with its own local offset; refresh it.
            row["row_hash"] = sha256_hex_of_row(row)
            yield row
            written += 1
            if written >= target_rows:
                return
        cycle_idx += 1


In [None]:
# Cell 10: Append new block (28 days / 529200 rows) to existing CSV
from pathlib import Path
import time

TARGET_ROWS_APPEND = 529200  # exact number you specified for 28 days
# NOTE(review): 529200 rows is 147 hours of one-second rows, while one pass of
# the 12-day pattern from Cell 4 is 63 hours. Cycling that pattern therefore
# spans more than 28 calendar days - confirm which figure is the intended one.

# Start the new block at midnight of the day after the last existing row.
start_date_newblock = pd.to_datetime(last_ts).normalize() + pd.Timedelta(days=1)

# Separate seed so the appended block differs from the first one.
# NOTE(review): generate_scenarioA_rows calls modulation_M_scenarioA without a
# generator, so the modulation still draws from the module-level RNG; the
# appended block is only reproducible if that RNG's state is reproduced too.
NEW_RNG_SEED = RNG_SEED + 1
new_rng = np.random.RandomState(NEW_RNG_SEED)

# Row source: the original 12-day pattern from Cell 4, cycled until the target is met.
row_gen_n = generate_scenarioA_rows_n(
    target_rows=TARGET_ROWS_APPEND,
    base_day_hours=day_hours,
    start_date=start_date_newblock,
    rng=new_rng,
    initial_offset=last_offset,
)

# Append in chunks. The file already has its header, so every write omits it.
chunk_size = CHUNK_SIZE
written_append = 0
start_time = time.time()
with OUT_CSV.open("a", newline='', encoding='utf-8') as fout:
    chunk = []
    for row in row_gen_n:
        chunk.append(row)
        if len(chunk) >= chunk_size:
            pd.DataFrame(chunk).to_csv(fout, index=False, header=False)
            written_append += len(chunk)
            elapsed = time.time() - start_time
            logging.info(f"APPENDED {written_append} rows (elapsed {elapsed:.1f}s)")
            chunk = []
    # final remainder
    if chunk:
        pd.DataFrame(chunk).to_csv(fout, index=False, header=False)
        written_append += len(chunk)
        logging.info(f"FINAL APPEND: appended rows {written_append}")

elapsed_total = time.time() - start_time
logging.info(f"Append complete. Rows appended: {written_append}. Time: {elapsed_total:.1f}s")
