In [None]:
import dask.dataframe as dd
from pathlib import Path

# Root directory that contains one sub-folder per scenario.
base_path = Path("/home/haasehelen/haasehelen/ifwaste/output")

# Columns removed from the merged config/output frame (missing ones are ignored).
EXCLUDE_COLUMNS = ['n_quickcook', 'n_cook', 'n_attempted_cook', 'n_leftovers', 'n_shop',
                   'n_quickshop', 'n_attempted_shop']

# Scenario sub-folders of ``base_path`` to load; each holds ``run_*`` folders.
scenario_folders = [
    "02_scenario0_no_promotions",
    "02_scenario1_bogos_only",
    "02_scenario2_sales_only",
    "02_scenario3_both",
]


def add_run_household_key(df, run_id):
    """Return *df* with an added ``run_household_key`` column.

    The key has the form ``run_<run_id>_<household>`` and is built from the
    frame's ``household`` column.

    The prefix is materialised as a plain string *before* it is handed to the
    (lazy) dataframe expression. The previous implementation used a lambda
    that looked up the loop variable ``run_id`` when the partition was
    actually computed, i.e. after the loop had finished, so every run ended
    up with the last run's id in its key.

    Args:
        df: A dataframe with a ``household`` column (dask or pandas).
        run_id: Identifier of the run the rows belong to.

    Returns:
        A new dataframe of the same kind with the extra key column.
    """
    prefix = f"run_{run_id}_"
    return df.assign(run_household_key=prefix + df["household"].astype(str))


# Maps scenario folder name -> {"general": merged config/output frame,
#                               "bought": bought-log frame}.
scenario_data = {}

for scenario in scenario_folders:
    scenario_path = base_path / scenario
    combined_rows = []
    combined_bought = []  # Collect all bought DataFrames per run
    print(scenario_path)

    # Sort so the concatenation order does not depend on filesystem order.
    for run_folder in sorted(scenario_path.glob("run_*")):
        run_id = run_folder.name.split("_")[1]

        config_file = run_folder / "log_hh_config.csv"
        output_file = run_folder / "aggregated_outputs.csv"
        bought_file = run_folder / "log_bought.csv"

        if not config_file.exists() or not output_file.exists() or not bought_file.exists():
            print(f"Skipping {run_folder} due to missing files.")
            continue

        # Best effort: a run whose files cannot be opened is reported and skipped.
        try:
            df_config = dd.read_csv(config_file)
            df_config["household"] = df_config["household"].astype(int)
            df_output = dd.read_csv(output_file)
            df_output["household"] = df_output["household"].astype(int)
            df_bought = dd.read_csv(bought_file)
            df_bought["household"] = df_bought["household"].astype(int)
        except Exception as e:
            print(f"Error reading files in {run_folder}: {e}")
            continue

        # One row per household present in both the config and the output file.
        merged_df = dd.merge(df_config, df_output, on="household", how="inner")

        # Tag rows with a key that is unique across runs.
        merged_df = add_run_household_key(merged_df, run_id)
        merged_df = merged_df.drop(columns=EXCLUDE_COLUMNS, errors='ignore')
        combined_rows.append(merged_df)

        # The bought log gets the same key so it can be matched to the rows above.
        df_bought = add_run_household_key(df_bought, run_id)
        combined_bought.append(df_bought)

    if combined_rows:
        scenario_data[scenario] = {}
        scenario_data[scenario]["general"] = dd.concat(combined_rows, interleave_partitions=True)
        scenario_data[scenario]["bought"] = dd.concat(combined_bought, interleave_partitions=True)
        print(f"{scenario}: Combined {len(combined_rows)} runs.")
    else:
        print(f"{scenario}: No valid data found.")
