In [1]:
import pandas as pd

# Load the three cleaned inputs in one pass: transactions, customers, articles.
tx, customers, articles = (
    pd.read_parquet(parquet_path)
    for parquet_path in (
        "../data/processed/transactions_clean.parquet",
        "../data/processed/customers_clean.parquet",
        "../data/processed/articles_clean.parquet",
    )
)

In [2]:
# === Debug: unique Age / Gender in customers only ===
import pandas as pd

def _find_col(df, candidates):
    for c in candidates:
        if c in df.columns:
            return c
    return None

age_col = _find_col(customers, ["Age","age","customer_age"])
gen_col = _find_col(customers, ["Gender","gender","customer_gender"])

print("--- customers ---")

# Age: coerce to numeric so non-numeric entries are counted as missing.
if not age_col:
    print("Age column: NOT FOUND")
else:
    ages = pd.to_numeric(customers[age_col], errors="coerce")
    print(f"Age column: '{age_col}' dtype={customers[age_col].dtype}")
    print(f" non-null={ages.notna().sum()}, unique={ages.dropna().nunique()}")
    if ages.notna().any():
        print(f" min={float(ages.min())}, max={float(ages.max())}")
        print(" sample uniques:", sorted(ages.dropna().unique().tolist())[:20])

# Gender: compare on trimmed, lower-cased strings so spelling variants collapse.
if not gen_col:
    print("Gender column: NOT FOUND")
else:
    g = customers[gen_col].astype("string[python]").str.strip().str.lower()
    print(f"Gender column: '{gen_col}' dtype={customers[gen_col].dtype}")
    print(" unique (non-null):", sorted(g.dropna().unique().tolist()))
    print("\n value counts:\n", g.value_counts(dropna=True))


--- customers ---
Age column: 'Age' dtype=float64
 non-null=36592, unique=87
 min=17.0, max=103.0
 sample uniques: [17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0]
Gender column: 'Gender' dtype=object
 unique (non-null): ['female', 'male']

 value counts:
 Gender
female    34836
male       1756
Name: count, dtype: Int64


In [3]:
# Quick schema check: show the dtype of every column in the customers frame.
customers.dtypes

shopUserId                  object
invoiceFirstName    string[python]
invoiceLastName     string[python]
invoiceSSN                  object
invoiceZip          string[python]
invoiceCity                 object
invoiceCountryId            object
invoiceEmail                object
_row                         int64
Gender                      object
Age                        float64
Country                     object
dtype: object

## Build JSON for mental mapping

In [4]:
# %% [markdown]
# # Setup & Constants
# Lightweight imports and shared constants.

# %%
import re, json
import numpy as np
import pandas as pd
from pathlib import Path

# Countries exported by this notebook. split_nordics() compares these names to the
# `country` column with ==, so spelling and capitalisation must match the data exactly.
NORDICS = ["Sweden", "Denmark", "Finland", "Norway"]

In [5]:
# %% [markdown]
# # Generic helpers
# Small utilities reused across the pipeline.

# %%
def status_from_orders(n: int) -> str:
    """Classify a customer by order count: up to 1 -> "new", 2-3 -> "returning", more -> "loyal"."""
    if n <= 1:
        return "new"
    if n <= 3:
        return "returning"
    return "loyal"

def mode_or_first(s: pd.Series):
    """
    Return the most frequent value of `s`.

    When several values tie, `Series.mode()` returns them sorted and the first one
    is used. If there is no mode, fall back to the first non-null value, or None
    when the series holds no non-null values at all.
    """
    modes = s.mode()
    if len(modes) > 0:
        return modes.iat[0]
    non_null = s.dropna()
    if non_null.empty:
        return None
    return non_null.iat[0]

def _pick(df: pd.DataFrame, names, default=None) -> pd.Series:
    for n in names:
        if n in df.columns:
            return df[n]
    return pd.Series([default] * len(df), index=df.index)


In [6]:
# %% [markdown]
# # Normalization helpers
# Standardize ids and small categorical fields.

# %%
def _norm_id(s: pd.Series) -> pd.Series:
    # to string, strip, drop trailing ".0" if present (common after Parquet/float cast)
    s = s.astype("string[python]").str.strip()
    return s.str.replace(r"\.0+$", "", regex=True)

def _norm_gender(s: pd.Series):
    if s is None:
        return s
    s = s.astype("string[python]").str.strip().str.lower()
    return s.replace({"f": "female", "m": "male", "kvinnan": "female", "man": "male"})


In [7]:
# %% [markdown]
# # Tx base-frame builder
# Make a clean base transaction frame with unified schema.

# %%
def _build_base_tx(tx: pd.DataFrame) -> pd.DataFrame:
    """
    Build the base transaction frame with a unified schema.

    The result is indexed like `tx` and has the columns:
      country    first of `currency_country` / `Country` present, trimmed;
                 blank or missing values become "Unknown".
      city       first of `invoiceCity` / `city` present; missing -> "Unknown".
      shopUserId customer id normalized by `_norm_id`.
      orderId    order id as a trimmed string.
      rev        `line_total_sek` coerced to numeric (unparseable/missing -> 0);
                 all zeros when the column is absent.
      created    timezone-naive timestamps; unparseable values become NaT.
      type/price passed through, or all-None when the column is absent.

    Raises:
        KeyError: if `tx` lacks `shopUserId`, `orderId` or `created`.

    NOTE(review): this notebook defines `_build_base_tx` in two consecutive cells;
    the later definition is the one in effect, so keep them in sync or delete one.
    """
    country = _pick(tx, ["currency_country", "Country"], "Unknown").astype("string[python]").str.strip()
    country = country.replace({"": pd.NA}).fillna("Unknown")
    city = _pick(tx, ["invoiceCity", "city"], "Unknown").astype(object).fillna("Unknown")
    shop = _norm_id(tx["shopUserId"])
    order_id = tx["orderId"].astype("string[python]").str.strip()

    created = tx["created"]
    # Use the pandas dtype predicate: np.issubdtype raises TypeError on pandas
    # extension dtypes such as tz-aware datetimes or string[python].
    if not pd.api.types.is_datetime64_any_dtype(created):
        created = pd.to_datetime(created, errors="coerce")
    # Drop timezone info (wall-clock time is kept) so downstream code sees naive timestamps.
    if isinstance(created.dtype, pd.DatetimeTZDtype):
        created = created.dt.tz_localize(None)

    # A missing revenue column yields zeros instead of crashing on a scalar result.
    if "line_total_sek" in tx.columns:
        rev = pd.to_numeric(tx["line_total_sek"], errors="coerce").fillna(0)
    else:
        rev = pd.Series(0.0, index=tx.index)
    typ = _pick(tx, ["type"])
    price = _pick(tx, ["price"])

    return pd.DataFrame(
        {
            "country": country,
            "city": city,
            "shopUserId": shop,
            "orderId": order_id,
            "rev": rev,
            "created": created,
            "type": typ,
            "price": price,
        }
    )


In [8]:
# %% [markdown]
# # Tx base-frame builder
# Make a clean base transaction frame with unified schema.

# %%
def _build_base_tx(tx: pd.DataFrame) -> pd.DataFrame:
    """
    Build the base transaction frame with a unified schema.

    The result is indexed like `tx` and has the columns:
      country    first of `currency_country` / `Country` present, trimmed;
                 blank or missing values become "Unknown".
      city       first of `invoiceCity` / `city` present; missing -> "Unknown".
      shopUserId customer id normalized by `_norm_id`.
      orderId    order id as a trimmed string.
      rev        `line_total_sek` coerced to numeric (unparseable/missing -> 0);
                 all zeros when the column is absent.
      created    timezone-naive timestamps; unparseable values become NaT.
      type/price passed through, or all-None when the column is absent.

    Raises:
        KeyError: if `tx` lacks `shopUserId`, `orderId` or `created`.

    NOTE(review): this notebook defines `_build_base_tx` in two consecutive cells;
    the later definition is the one in effect, so keep them in sync or delete one.
    """
    country = _pick(tx, ["currency_country", "Country"], "Unknown").astype("string[python]").str.strip()
    country = country.replace({"": pd.NA}).fillna("Unknown")
    city = _pick(tx, ["invoiceCity", "city"], "Unknown").astype(object).fillna("Unknown")
    shop = _norm_id(tx["shopUserId"])
    order_id = tx["orderId"].astype("string[python]").str.strip()

    created = tx["created"]
    # Use the pandas dtype predicate: np.issubdtype raises TypeError on pandas
    # extension dtypes such as tz-aware datetimes or string[python].
    if not pd.api.types.is_datetime64_any_dtype(created):
        created = pd.to_datetime(created, errors="coerce")
    # Drop timezone info (wall-clock time is kept) so downstream code sees naive timestamps.
    if isinstance(created.dtype, pd.DatetimeTZDtype):
        created = created.dt.tz_localize(None)

    # A missing revenue column yields zeros instead of crashing on a scalar result.
    if "line_total_sek" in tx.columns:
        rev = pd.to_numeric(tx["line_total_sek"], errors="coerce").fillna(0)
    else:
        rev = pd.Series(0.0, index=tx.index)
    typ = _pick(tx, ["type"])
    price = _pick(tx, ["price"])

    return pd.DataFrame(
        {
            "country": country,
            "city": city,
            "shopUserId": shop,
            "orderId": order_id,
            "rev": rev,
            "created": created,
            "type": typ,
            "price": price,
        }
    )


In [9]:
# %% [markdown]
# # Customer normalization & collapse
# Normalize ids, pick Age/Gender variants, and collapse duplicates.

# %%
def _prep_customers(customers: pd.DataFrame) -> pd.DataFrame:
    """
    Collapse the customers table to one row per normalized customer id.

    Returns a frame with columns `shopUserId_norm`, `Age` and `Gender`, where
    Age/Gender are the per-customer `mode_or_first` of whichever Age/Gender
    column variant exists. Rows with a missing id are kept as their own group
    (`dropna=False`). The input frame is not modified.

    NOTE(review): this notebook defines `_prep_customers` in two consecutive cells;
    the later definition is the one in effect.
    """
    age_source = _pick(customers, ["Age", "age", "customer_age"])
    gender_source = _pick(customers, ["Gender", "gender", "customer_gender"])
    prepared = customers.assign(
        shopUserId_norm=_norm_id(customers["shopUserId"]),
        Age=pd.to_numeric(age_source, errors="coerce").astype("Float64"),
        Gender=gender_source,
    )
    collapsed = prepared.groupby("shopUserId_norm", dropna=False).agg(
        Age=("Age", mode_or_first),
        Gender=("Gender", mode_or_first),
    )
    return collapsed.reset_index()


In [10]:
# %% [markdown]
# # Customer normalization & collapse
# Normalize ids, pick Age/Gender variants, and collapse duplicates.

# %%
def _prep_customers(customers: pd.DataFrame) -> pd.DataFrame:
    """
    Collapse the customers table to one row per normalized customer id.

    Returns a frame with columns `shopUserId_norm`, `Age` and `Gender`, where
    Age/Gender are the per-customer `mode_or_first` of whichever Age/Gender
    column variant exists. Rows with a missing id are kept as their own group
    (`dropna=False`). The input frame is not modified.

    NOTE(review): this notebook defines `_prep_customers` in two consecutive cells;
    the later definition is the one in effect.
    """
    age_source = _pick(customers, ["Age", "age", "customer_age"])
    gender_source = _pick(customers, ["Gender", "gender", "customer_gender"])
    prepared = customers.assign(
        shopUserId_norm=_norm_id(customers["shopUserId"]),
        Age=pd.to_numeric(age_source, errors="coerce").astype("Float64"),
        Gender=gender_source,
    )
    collapsed = prepared.groupby("shopUserId_norm", dropna=False).agg(
        Age=("Age", mode_or_first),
        Gender=("Gender", mode_or_first),
    )
    return collapsed.reset_index()


In [11]:
# %% [markdown]
# # Merge tx + customers
# Attach age/gender and apply normalization.

# %%
def _merge_tx_customers(base_df: pd.DataFrame, c_agg: pd.DataFrame) -> pd.DataFrame:
    """
    Attach customer age/gender to every transaction row.

    Args:
        base_df: transaction frame with a `shopUserId` column.
        c_agg: customer frame with `shopUserId_norm`, `Age` and `Gender` columns.

    Returns:
        A new frame with the columns of `base_df` plus `age` (nullable Float64)
        and `gender` (normalized by `_norm_gender`). It has exactly one row per
        row of `base_df`, in the same order and with the same index, so callers
        can still look rows up in the original transaction table by index label.
    """
    # pandas merges match missing keys with each other, so id-less customers are
    # removed from the lookup; duplicates are dropped so the left join can never
    # multiply transaction rows.
    lookup = c_agg[c_agg["shopUserId_norm"].notna()].drop_duplicates("shopUserId_norm")

    df = base_df.merge(lookup, left_on="shopUserId", right_on="shopUserId_norm", how="left")
    # Merging on columns discards the left index; restore it (row count and order
    # are unchanged because the right-hand keys are unique).
    df.index = base_df.index
    df = df.drop(columns=["shopUserId_norm"])

    df["age"] = pd.to_numeric(df["Age"], errors="coerce").astype("Float64")
    df["gender"] = _norm_gender(df["Gender"])
    return df.drop(columns=["Age", "Gender"])


In [12]:
# %% [markdown]
# # Optional backfill of country from customers
# If tx country is Unknown, pull from customers.Country when available.

# %%
def _backfill_country_from_customers(df: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    """
    Fill `country == "Unknown"` rows of `df` from `customers.Country`.

    Rows are matched on the normalized customer id. For each customer the first
    record with a non-blank Country is used; customers without any usable
    Country leave the row as "Unknown". Does nothing when `customers` has no
    `Country` column or no row is "Unknown".

    `df` is updated in place and also returned.
    """
    if "Country" not in customers.columns:
        return df
    mask = df["country"].eq("Unknown")
    if not mask.any():
        return df

    lookup = pd.DataFrame(
        {
            "shopUserId_norm": _norm_id(customers["shopUserId"]),
            "Country": customers["Country"].astype("string[python]").str.strip(),
        }
    )
    # Treat blank countries as missing, and drop unusable rows BEFORE de-duplicating
    # so an empty first record cannot hide a later valid one.
    lookup["Country"] = lookup["Country"].replace({"": pd.NA})
    country_map = (
        lookup.dropna(subset=["shopUserId_norm", "Country"])
        .drop_duplicates("shopUserId_norm")
        .set_index("shopUserId_norm")["Country"]
    )
    df.loc[mask, "country"] = df.loc[mask, "shopUserId"].map(country_map).fillna("Unknown")
    return df


In [13]:
# %% [markdown]
# # Splitter
# Produce a dict of country → dataframe restricted to NORDICS.

# %%
def split_nordics(tx: pd.DataFrame, customers: pd.DataFrame) -> dict:
    """
    Build the enriched transaction frame and split it by Nordic country.

    Returns a dict keyed by every name in `NORDICS`; each value is an independent
    copy of the rows whose `country` equals that name (possibly empty).
    """
    enriched = _merge_tx_customers(_build_base_tx(tx), _prep_customers(customers))
    enriched = _backfill_country_from_customers(enriched, customers)

    per_country = {}
    for cname in NORDICS:
        per_country[cname] = enriched.loc[enriched["country"] == cname].copy()
    return per_country


In [14]:
# %% [markdown]
# # Export helpers: totals & aggregates
# Small builders used by the JSON exporter.
# (UPDATED: added _agg_city_monthly)

# %%
def _country_totals(df: pd.DataFrame):
    total_revenue = int(np.rint(df["rev"].sum()))
    customers_cnt = int(df["shopUserId"].nunique())
    total_orders = int(df["orderId"].nunique())
    aov_country = None if total_orders == 0 else int(round(float(total_revenue) / total_orders))
    return total_revenue, customers_cnt, total_orders, aov_country

def _agg_city(df: pd.DataFrame):
    agg_city = (
        df.groupby("city", dropna=False, sort=False)
        .agg(total_revenue_sek=("rev", "sum"), customers_count=("shopUserId", "nunique"))
    )
    agg_city["total_revenue_sek"] = np.rint(agg_city["total_revenue_sek"]).astype("int64")
    agg_city["customers_count"] = agg_city["customers_count"].astype("int64")
    city_orders = df.groupby("city", dropna=False)["orderId"].nunique().rename("total_orders").astype("int64")
    return agg_city, city_orders

def _agg_customer(df: pd.DataFrame):
    """
    Per-(city, customer) summary for rows that have a customer id.

    Returns a frame indexed by (city, shopUserId), sorted on both levels, with
    `total_spent_sek` (rounded int64), `total_orders` (distinct order ids),
    `first_order` / `last_order` (min / max of `created`) and the customer's
    `age` / `gender` chosen by `mode_or_first`.
    """
    identified = df.loc[df["shopUserId"].notna()]
    per_customer = identified.groupby(["city", "shopUserId"], dropna=False, sort=False).agg(
        total_spent_sek=("rev", "sum"),
        total_orders=("orderId", "nunique"),
        first_order=("created", "min"),
        last_order=("created", "max"),
        age=("age", mode_or_first),
        gender=("gender", mode_or_first),
    )
    per_customer = per_customer.assign(
        total_spent_sek=np.rint(per_customer["total_spent_sek"]).astype("int64")
    )
    return per_customer.sort_index(level=["city", "shopUserId"])

def _agg_order(df: pd.DataFrame):
    """
    Per-(city, customer, order) summary for rows that have a customer id.

    Returns a frame indexed by (city, shopUserId, orderId), sorted on all three
    levels, with `order_total_sek` (rounded int64), `n_items` (row count),
    `created` (earliest timestamp) and `order_type` / `price` chosen by
    `mode_or_first`.
    """
    identified = df.loc[df["shopUserId"].notna()]
    per_order = identified.groupby(["city", "shopUserId", "orderId"], dropna=False, sort=False).agg(
        order_total_sek=("rev", "sum"),
        n_items=("orderId", "size"),
        created=("created", "min"),
        order_type=("type", mode_or_first),
        price=("price", mode_or_first),
    )
    per_order = per_order.assign(
        order_total_sek=np.rint(per_order["order_total_sek"]).astype("int64")
    )
    return per_order.sort_index(level=["city", "shopUserId", "orderId"])

def _agg_city_monthly(df: pd.DataFrame) -> dict[str, dict[str, int]]:
    """
    NEW: sum revenue by city and calendar month (YYYY-MM).
    Returns {city -> { 'YYYY-MM': total_revenue_sek_int, ... }, ...}
    """
    dfm = df.copy()
    # Ensure naive, month string
    ym = dfm["created"]
    if getattr(ym.dt, "tz", None) is not None:
        ym = ym.dt.tz_localize(None)
    dfm["year_month"] = ym.dt.to_period("M").astype(str)

    g = (
        dfm.groupby(["city", "year_month"], dropna=False)["rev"]
        .sum()
        .round()
        .astype("int64")
    )
    out: dict[str, dict[str, int]] = {}
    for (cty, ym), val in g.items():
        ckey = "Unknown" if pd.isna(cty) else str(cty)
        out.setdefault(ckey, {})[ym] = int(val)
    return out


In [15]:
# %% [markdown]
# # Export helpers: items extraction
# Build items_df from original tx aligned to country df indices.

# %%
def _build_items_grouped(df_country: pd.DataFrame, tx: pd.DataFrame):
    item_cols = ["sku", "groupId", "created", "quantity", "price_sek", "name", "line_total_sek", "type", "brand", "category", "price"]
    present = [c for c in item_cols if c in tx.columns]

    items_df = pd.DataFrame({"city": df_country["city"], "shopUserId": df_country["shopUserId"], "orderId": df_country["orderId"]})
    for c in present:
        if c == "created":
            col_created = df_country["created"]
            try:
                if getattr(col_created.dt, "tz", None) is not None:
                    col_created = col_created.dt.tz_localize(None)
            except Exception:
                pass
            items_df[c] = col_created
        else:
            items_df[c] = tx.loc[df_country.index, c] if c in tx.columns else None

    items_df = items_df[df_country["shopUserId"].notna()].copy()
    items_df["city"] = items_df["city"].fillna("Unknown")
    return items_df.groupby(["city", "shopUserId", "orderId"], dropna=False, sort=False)


In [16]:
# %% [markdown]
# # Export helpers: JSON node builders
# Convert rows to JSON-friendly dicts.

# %%
def _item_dict(row: pd.Series):
    cr = row.get("created")
    if isinstance(cr, pd.Timestamp):
        cr = cr.isoformat(sep=" ")
    def nz(v): return None if pd.isna(v) else v
    def to_int(v): return None if pd.isna(v) else int(v)
    def to_float(v): return None if pd.isna(v) else float(v)
    return {
        "sku": nz(row.get("sku")),
        "groupId": nz(row.get("groupId")),
        "created": nz(cr),
        "quantity": to_int(row.get("quantity")),
        "price_sek": to_int(row.get("price_sek")),
        "name": nz(row.get("name")),
        "line_total_sek": to_int(row.get("line_total_sek")),
        "type": nz(row.get("type")),
        "brand": nz(row.get("brand")),
        "category": nz(row.get("category")),
        "price": to_float(row.get("price")),
    }


In [17]:
# %% [markdown]
# # Export: main function
# Assemble the JSON structure and write to disk.
# (UPDATED: inject per-city monthly revenue)

# %%
def export_country_json(df_country: pd.DataFrame, tx: pd.DataFrame, country_name: str, out_dir="/workspace/data/processed"):
    """
    Write one country's transactions as a nested JSON file.

    The file has a single top-level key (`country_name` lower-cased) holding the
    country totals and a `cities` map; each city holds its own totals, a
    `monthly_revenue_sek` map and a `customers` map; each customer holds a
    `summary` and an `orders` map; each order holds its totals and an `items` list.

    Args:
        df_country: enriched transaction rows for one country (not modified).
        tx: the original transaction table; item columns are looked up in it by
            the index labels of `df_country`.
        country_name: used for the top-level key and the file name.
        out_dir: target directory, created if needed.
            NOTE(review): the default is an absolute path, while the flattening
            step of this notebook reads from "../data/processed" — confirm both
            point at the same directory.

    Returns:
        None. Writes `<out_dir>/<country_name>.json` and prints the path.
    """
    df = df_country.copy()
    df["city"] = df["city"].fillna("Unknown")

    # --- totals ---
    total_revenue, customers_cnt, total_orders, aov_country = _country_totals(df)

    # --- city/ customer / order aggregates ---
    # The customer and order aggregates only cover rows that have a customer id.
    agg_city, city_orders = _agg_city(df)
    agg_customer = _agg_customer(df)
    agg_order = _agg_order(df)

    # --- monthly revenue per city ---
    city_monthly_map = _agg_city_monthly(df)

    # --- items ---
    # Built from the unmodified input; the helper fills missing cities itself.
    items_grouped = _build_items_grouped(df_country, tx)

    # ---------- build JSON ----------
    top_key = country_name.lower()
    result = {
        top_key: {
            "total_revenue_sek": int(total_revenue),
            "customers_count": int(customers_cnt),
            "total_orders": int(total_orders),
            "avg_order_value_sek": aov_country,
            "cities": {},
        }
    }

    # cities: one node per city, created before any customer is attached to it
    for cty, row in agg_city.iterrows():
        ckey = "Unknown" if pd.isna(cty) else str(cty)
        orders_c = int(city_orders.get(cty, 0))
        rev_c = int(row["total_revenue_sek"])
        # Average order value is derived from the already-rounded city revenue.
        aov_c = None if orders_c == 0 else int(round(float(rev_c) / orders_c))

        result[top_key]["cities"][ckey] = {
            "total_revenue_sek": rev_c,
            "customers_count": int(row["customers_count"]),
            "total_orders": orders_c,
            "avg_order_value_sek": aov_c,
            # monthly breakdown {"YYYY-MM": revenue}; empty if the city has none
            "monthly_revenue_sek": city_monthly_map.get(ckey, {}),
            "customers": {},
        }

    # customers + orders + items
    for (cty, uid), row in agg_customer.iterrows():
        # Status is based on the order count within this (city, customer) pair.
        status = status_from_orders(int(row["total_orders"]))
        first_iso = row["first_order"].isoformat(sep=" ") if pd.notna(row["first_order"]) else None
        last_iso = row["last_order"].isoformat(sep=" ") if pd.notna(row["last_order"]) else None

        age_val = None
        if "age" in row and pd.notna(row["age"]):
            try:
                # int() truncates a fractional age
                age_val = int(row["age"])
            except Exception:
                age_val = None
        gender_val = None if "gender" not in row or pd.isna(row["gender"]) else str(row["gender"])

        cust_node = {
            "summary": {
                "total_orders": int(row["total_orders"]),
                "total_spent_sek": int(row["total_spent_sek"]),
                "first_order": first_iso,
                "last_order": last_iso,
                "status": status,
                "age": age_val,
                "gender": gender_val,
            },
            "orders": {},
        }

        try:
            # Partial MultiIndex lookup: all orders of this (city, customer) pair.
            cust_orders = agg_order.loc[(cty, uid)]
            if isinstance(cust_orders, pd.Series):
                cust_orders = cust_orders.to_frame().T
            for oid, orow in cust_orders.iterrows():
                try:
                    items_for_order = items_grouped.get_group((cty, uid, oid))
                    items = [_item_dict(r) for _, r in items_for_order.iterrows()]
                except KeyError:
                    # No item rows for this order: export it with an empty list.
                    items = []
                cust_node["orders"][str(oid)] = {
                    "created": orow["created"].isoformat(sep=" ") if pd.notna(orow["created"]) else None,
                    "order_total_sek": int(orow["order_total_sek"]),
                    "n_items": int(orow["n_items"]),
                    "order_type": None if pd.isna(orow["order_type"]) else orow["order_type"],
                    "price": None if pd.isna(orow.get("price")) else float(orow.get("price")),
                    "items": items,
                }
        except KeyError:
            # Customer has no entry in the order aggregate: keep "orders" empty.
            pass

        ckey = "Unknown" if pd.isna(cty) else str(cty)
        result[top_key]["cities"][ckey]["customers"][str(uid)] = cust_node

    # write (ensure_ascii=False keeps non-ASCII characters readable in the file)
    out_path = Path(out_dir) / f"{country_name}.json"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
    print(f"Saved: {out_path}")


In [18]:
# %% [markdown]
# # Example usage
# Split once, export four times.

# %%
# Split once, then export each country to its own JSON file.
countries = split_nordics(tx, customers)
for country_name in ("Sweden", "Denmark", "Finland", "Norway"):
    export_country_json(countries[country_name], tx, country_name)


Saved: /workspace/data/processed/Sweden.json
Saved: /workspace/data/processed/Denmark.json
Saved: /workspace/data/processed/Finland.json
Saved: /workspace/data/processed/Norway.json


## Flatten JSON for quick math

In [19]:
# %% [markdown]
# # Imports & Futures

# %%
from __future__ import annotations
from pathlib import Path
import json
import pandas as pd


In [20]:
# %% [markdown]
# # Paths & Config

# %%
# ---- configure these paths ----
# Directory holding Sweden.json, Denmark.json, Finland.json and Norway.json.
INPUT_DIR = Path("../data/processed")
# Directory that receives the Parquet outputs (six files, including the
# per-city monthly revenue table).
OUTPUT_DIR = Path("../data/parquet_out")

# Country name -> path of its exported JSON file.
COUNTRY_FILES = dict(
    Sweden=INPUT_DIR.joinpath("Sweden.json"),
    Denmark=INPUT_DIR.joinpath("Denmark.json"),
    Finland=INPUT_DIR.joinpath("Finland.json"),
    Norway=INPUT_DIR.joinpath("Norway.json"),
)


In [21]:
# %% [markdown]
# # Stable Column Schemas
# (UPDATED: added CS_CITY_MONTHLY)

# %%
# Metrics shared by the country-level and city-level summaries.
_SUMMARY_METRICS = ["total_revenue_sek", "customers_count", "total_orders", "avg_order_value_sek"]
# Key columns shared by the order-level and item-level tables.
_ORDER_KEYS = ["country", "city", "customer_id", "order_id"]

CS_COUNTRY = ["country", *_SUMMARY_METRICS]
CS_CITY = ["country", "city", *_SUMMARY_METRICS]

# Customer summary, including age and gender.
CS_CUST = [
    "country", "city", "customer_id",
    "total_orders", "total_spent_sek",
    "first_order", "last_order",
    "status", "age", "gender",
]

CS_ORDERS = [*_ORDER_KEYS, "created", "order_total_sek", "n_items", "order_type", "price"]
CS_ITEMS = [
    *_ORDER_KEYS,
    "sku", "groupId", "created", "quantity", "price_sek", "name",
    "line_total_sek", "type", "brand", "category", "price",
]

# Monthly revenue per city.
CS_CITY_MONTHLY = ["country", "city", "year_month", "total_revenue_sek"]

In [22]:
# %% [markdown]
# # I/O Helpers

# %%
def load_json(path: Path) -> dict:
    """Read the UTF-8 encoded JSON file at `path` and return the parsed content."""
    return json.loads(path.read_text(encoding="utf-8"))

def save_parquet(df: pd.DataFrame, path: Path) -> None:
    """Write `df` to `path` as Parquet without its index, creating parent directories first."""
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    df.to_parquet(path, index=False)


In [23]:
# %% [markdown]
# # DataFrame Utilities

# %%
def _ensure(df: pd.DataFrame | None, cols: list[str]) -> pd.DataFrame:
    """Ensure columns exist, preserve dtypes, and reorder."""
    if df is None or df.empty:
        return pd.DataFrame({c: pd.Series([], dtype="object") for c in cols})[cols]
    for c in cols:
        if c not in df.columns:
            df[c] = pd.NA
    return df[cols]


In [24]:
# %% [markdown]
# # JSON Shape Helpers

# %%
def _unwrap(obj: dict, hint: str) -> tuple[str, dict]:
    """
    Accept {"denmark": {...}} or {...}.
    Returns (country_name_capitalized, payload_dict).
    """
    if isinstance(obj, dict) and len(obj) == 1 and isinstance(next(iter(obj.values())), dict):
        k = next(iter(obj.keys()))
        return k.capitalize(), next(iter(obj.values()))
    if not isinstance(obj, dict):
        raise ValueError("Top-level JSON must be an object/dict")
    return hint, obj


In [25]:
# %% [markdown]
# # JSON Shape Helpers

# %%
def _unwrap(obj: dict, hint: str) -> tuple[str, dict]:
    """
    Accept {"denmark": {...}} or {...}.
    Returns (country_name_capitalized, payload_dict).
    """
    if isinstance(obj, dict) and len(obj) == 1 and isinstance(next(iter(obj.values())), dict):
        k = next(iter(obj.keys()))
        return k.capitalize(), next(iter(obj.values()))
    if not isinstance(obj, dict):
        raise ValueError("Top-level JSON must be an object/dict")
    return hint, obj


In [26]:
# %% [markdown]
# # Flatten: Country → Row Buckets
# (UPDATED: collect city_monthly from "monthly_revenue_sek")

# %%
def flatten_country(obj: dict, country_hint: str) -> dict[str, list[dict]]:
    """
    Flatten one country's nested JSON into row buckets.

    Returns a dict with the buckets `country_summary`, `city_summary`,
    `city_monthly`, `customer_summary`, `orders` and `order_items`; each bucket is
    a list of flat dicts that repeat the parent keys (country, city, customer_id,
    order_id) so the rows can be loaded straight into tables. Fields absent from
    the JSON come out as None.
    """
    country, root = _unwrap(obj, country_hint)

    # Field names copied from each JSON level, in output order.
    metric_keys = ("total_revenue_sek", "customers_count", "total_orders", "avg_order_value_sek")
    customer_keys = ("total_orders", "total_spent_sek", "first_order", "last_order", "status", "age", "gender")
    order_keys = ("created", "order_total_sek", "n_items", "order_type", "price")
    item_keys = (
        "sku", "groupId", "created", "quantity", "price_sek", "name",
        "line_total_sek", "type", "brand", "category", "price",
    )

    out = {
        "country_summary": [{"country": country, **{k: root.get(k) for k in metric_keys}}],
        "city_summary": [],
        "city_monthly": [],
        "customer_summary": [],
        "orders": [],
        "order_items": [],
    }

    for city, cnode in (root.get("cities") or {}).items():
        city_ref = {"country": country, "city": city}
        out["city_summary"].append({**city_ref, **{k: cnode.get(k) for k in metric_keys}})

        # monthly map {"YYYY-MM": revenue} -> one row per month
        for ym, rev in (cnode.get("monthly_revenue_sek") or {}).items():
            out["city_monthly"].append({**city_ref, "year_month": ym, "total_revenue_sek": rev})

        for cust_id, cst in (cnode.get("customers") or {}).items():
            summ = cst.get("summary") or {}
            cust_ref = {**city_ref, "customer_id": cust_id}
            out["customer_summary"].append({**cust_ref, **{k: summ.get(k) for k in customer_keys}})

            for order_id, ordn in (cst.get("orders") or {}).items():
                order_ref = {**cust_ref, "order_id": order_id}
                out["orders"].append({**order_ref, **{k: ordn.get(k) for k in order_keys}})

                for it in (ordn.get("items") or []):
                    out["order_items"].append({**order_ref, **{k: it.get(k) for k in item_keys}})
    return out


In [27]:
# %% [markdown]
# # Aggregation Orchestrator
# (UPDATED: include "city_monthly" bucket)

# %%
def collect_buckets(country_files: dict[str, Path]) -> dict[str, list[dict]]:
    """
    Flatten every existing country file and merge the rows into shared buckets.

    Files that do not exist are skipped with a "[warn]" line; each parsed file is
    reported with an "[ok]" line.
    """
    bucket_names = (
        "country_summary", "city_summary", "city_monthly",
        "customer_summary", "orders", "order_items",
    )
    buckets = {bucket: [] for bucket in bucket_names}

    for name, path in country_files.items():
        if path.exists():
            rows = flatten_country(load_json(path), name)
            for bucket, records in rows.items():
                buckets[bucket].extend(records)
            print(f"[ok] parsed {name}")
        else:
            print(f"[warn] missing: {path}")
    return buckets


In [28]:
# %% [markdown]
# # Materialize DataFrames (Fixed Schemas)
# (UPDATED: return df_city_monthly)

# %%
def to_dataframes(buckets: dict[str, list[dict]]) -> dict[str, pd.DataFrame]:
    """Turn each row bucket into a DataFrame with its fixed column schema (via `_ensure`)."""
    layout = (
        ("country_summary", CS_COUNTRY),
        ("city_summary", CS_CITY),
        ("city_monthly", CS_CITY_MONTHLY),
        ("customer_summary", CS_CUST),
        ("orders", CS_ORDERS),
        ("order_items", CS_ITEMS),
    )
    return {
        bucket: _ensure(pd.DataFrame(buckets[bucket]), schema)
        for bucket, schema in layout
    }


In [29]:
# %% [markdown]
# # Persist to Parquet
# (UPDATED: write city_monthly parquet)

# %%
def write_all_parquet(dfs: dict[str, pd.DataFrame], out_dir: Path) -> None:
    """Write every table in `dfs` to its own Parquet file under `out_dir`, then print the directory."""
    targets = (
        ("country_summary", "country_summary.parquet"),
        ("city_summary", "city_summary.parquet"),
        ("city_monthly", "city_monthly_revenue.parquet"),
        ("customer_summary", "customer_summary.parquet"),
        ("orders", "orders.parquet"),
        ("order_items", "order_items.parquet"),
    )
    for table, filename in targets:
        save_parquet(dfs[table], out_dir / filename)
    print(f"[done] wrote Parquet files to {out_dir.resolve()}")

In [30]:
# %% [markdown]
# # Main

# %%
def main():
    """Run the flattening pipeline: parse the country JSON files, build tables, write Parquet."""
    tables = to_dataframes(collect_buckets(COUNTRY_FILES))
    write_all_parquet(tables, OUTPUT_DIR)


if __name__ == "__main__":
    main()


[ok] parsed Sweden
[ok] parsed Denmark
[ok] parsed Finland
[ok] parsed Norway
[done] wrote Parquet files to /workspace/data/parquet_out
