In [21]:
import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import preliz as pz
import pymc as pm
import seaborn as sns

from pymc.gp.util import plot_gp_dist
from sklearn.preprocessing import MaxAbsScaler

# Notebook-wide plotting defaults: start from the "bmh" style sheet, then
# override figure size, resolution and background in a single update.
plt.style.use("bmh")
plt.rcParams.update(
    {
        "figure.figsize": [12, 7],
        "figure.dpi": 100,
        "figure.facecolor": "white",
    }
)

%load_ext autoreload
%autoreload 2
%config InlineBackend.figure_format = "retina"

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [22]:
# Derive a fixed integer seed from the notebook's name (sum of the code points
# of its characters) and build the NumPy generator from it.
seed: int = sum(ord(char) for char in "multilevel_elasticities")
rng: np.random.Generator = np.random.default_rng(seed)

In [41]:
from dataclasses import dataclass
from numpy.typing import NDArray


@dataclass
class Item:
    """Price and sales observations for a single item.

    Parameters
    ----------
    id
        Identifier of the item; written to the ``item_id`` column.
    prices
        Observed prices (array-like). Every entry must be strictly positive.
    sales
        Observed sales (array-like), one per price. Every entry must be
        non-negative.

    Raises
    ------
    ValueError
        If ``prices`` and ``sales`` differ in size, are empty, or contain
        values outside the allowed range (NaN counts as out of range).
    """

    id: int
    # ``np.float_`` was removed in NumPy 2.0; ``np.float64`` is the same type
    # and is available in every NumPy version.
    prices: NDArray[np.float64]
    sales: NDArray[np.float64]

    def __post_init__(self) -> None:
        """Coerce the inputs to arrays and validate them."""
        # Accept any array-like (list, tuple, Series); ndarrays pass through
        # without a copy.
        self.prices = np.asarray(self.prices)
        self.sales = np.asarray(self.sales)

        if self.prices.size != self.sales.size:
            raise ValueError("prices and sales must have the same size")
        if self.prices.size == 0:
            raise ValueError("prices and sales must have at least one element")
        # Element-wise checks instead of ``min()``: comparisons with NaN are
        # always False, so ``min() <= 0`` silently accepted NaN entries.
        if not np.all(self.prices > 0):
            raise ValueError("prices must be positive")
        if not np.all(self.sales >= 0):
            raise ValueError("sales must be non-negative")

    def to_dataframe(self) -> pd.DataFrame:
        """Return one row per observation with columns ``item_id``, ``price``, ``sales``."""
        return pd.DataFrame(
            data={"item_id": self.id, "price": self.prices, "sales": self.sales}
        )


@dataclass
class Store:
    """A store, identified by ``id``, holding a non-empty list of items."""

    id: int
    items: list[Item]

    def __post_init__(self) -> None:
        """Reject a store with no items or with repeated item ids."""
        item_count = len(self.items)
        if item_count == 0:
            raise ValueError("stores must have at least one item")
        distinct_ids = {item.id for item in self.items}
        if len(distinct_ids) != item_count:
            raise ValueError("items must have unique ids")

    def to_dataframe(self) -> pd.DataFrame:
        """Stack the item frames and tag every row with this store's id."""
        item_frames = [item.to_dataframe() for item in self.items]
        return (
            pd.concat(item_frames, axis=0)
            .assign(store_id=self.id)
            .reset_index(drop=True)
        )


@dataclass
class Region:
    """A region, identified by ``id``, holding stores and a region-level covariate.

    Parameters
    ----------
    id
        Identifier of the region; written to the ``region_id`` column.
    stores
        Non-empty list of stores with unique ids.
    median_income
        Region-level covariate (Z_j); must be strictly positive.

    Raises
    ------
    ValueError
        If ``stores`` is empty, store ids repeat, or ``median_income`` is not
        strictly positive (NaN included).
    """

    id: int
    stores: list[Store]
    median_income: float  # Z_j

    def __post_init__(self) -> None:
        """Validate the stores and the income covariate."""
        if len(self.stores) == 0:
            # The message used to say "states", which does not match this class.
            raise ValueError("regions must have at least one store")
        if len({store.id for store in self.stores}) != len(self.stores):
            raise ValueError("stores must have unique ids")
        # ``not (x > 0)`` also rejects NaN, which ``x <= 0`` let through.
        if not self.median_income > 0:
            raise ValueError("median_income must be positive")

    def to_dataframe(self) -> pd.DataFrame:
        """Stack the store frames and tag every row with this region's id."""
        df = pd.concat([store.to_dataframe() for store in self.stores], axis=0)
        df["region_id"] = self.id
        return df.reset_index(drop=True)


@dataclass
class Market:
    """Top-level container grouping all regions."""

    regions: list[Region]

    def __post_init__(self) -> None:
        """Reject an empty market or one with repeated region ids."""
        region_count = len(self.regions)
        if region_count == 0:
            raise ValueError("markets must have at least one region")
        distinct_ids = {region.id for region in self.regions}
        if len(distinct_ids) != region_count:
            raise ValueError("regions must have unique ids")

    def to_dataframe(self) -> pd.DataFrame:
        """Stack the region frames into one table with a fresh 0..n-1 index."""
        region_frames = [region.to_dataframe() for region in self.regions]
        stacked = pd.concat(region_frames, axis=0)
        return stacked.reset_index(drop=True)


In [42]:
n_regions = 5  # J

# Every draw is seeded from the notebook-level ``rng`` so that
# "Restart Kernel -> Run All" reproduces the same simulated market.
n_stores_per_region_dist = pm.NegativeBinomial.dist(mu=10, alpha=2)
# A negative binomial can return 0, and a region without stores is rejected by
# ``Region``; clip to at least one store per region.
n_stores_per_region_draws = np.maximum(
    pm.draw(
        n_stores_per_region_dist,
        draws=n_regions,
        random_seed=int(rng.integers(2**31)),
    ),
    1,
)

median_income_per_region_dist = pm.Gamma.dist(mu=10, sigma=1)
median_income_per_region_draws = pm.draw(
    median_income_per_region_dist,
    draws=n_regions,
    random_seed=int(rng.integers(2**31)),
)

In [43]:
# Number of (price, sales) observations simulated per store.
time_range = 20

# Standard deviation of the noise on log-sales.
epsilon = 0.1

# Intercept: Normal(a_alpha + b_alpha * median_income, sigma_gamma_0j).
a_alpha = 0.5
b_alpha = 0.1
sigma_gamma_0j = 0.02

# Coefficient on log(price): Normal(a_beta + b_beta * median_income, sigma_gamma_1j).
a_beta = 0.1
b_beta = 0.8
sigma_gamma_1j = 0.03

# Prices: Gamma(mu=price_mu, sigma=price_sigma).
price_mu = 1.5
price_sigma = 0.25


def _next_seed() -> int:
    """Return a fresh integer seed taken from the notebook-level ``rng``.

    Passing a new seed to each ``pm.draw`` call keeps the draws distinct from
    each other while making the whole simulation reproducible.
    """
    return int(rng.integers(2**31))


# The price distribution does not depend on region or store: build it once.
prices_dist = pm.Gamma.dist(mu=price_mu, sigma=price_sigma)

regions: list[Region] = []

for j in range(n_regions):
    n_stores_per_region = int(n_stores_per_region_draws[j])
    median_income_per_region = median_income_per_region_draws[j]

    # These distributions depend only on the region, so they are built once
    # per region instead of once per store.
    alpha_j_dist = pm.Normal.dist(
        mu=a_alpha + b_alpha * median_income_per_region, sigma=sigma_gamma_0j
    )
    beta_j_dist = pm.Normal.dist(
        mu=a_beta + b_beta * median_income_per_region, sigma=sigma_gamma_1j
    )

    stores: list[Store] = []

    for i in range(n_stores_per_region):
        # NOTE(review): a separate intercept/coefficient is drawn for every
        # observation of every store, although the names suggest one value per
        # region j. Kept as in the original; confirm this is intended.
        alpha_j_samples = pm.draw(
            alpha_j_dist, draws=time_range, random_seed=_next_seed()
        )
        beta_j_samples = pm.draw(
            beta_j_dist, draws=time_range, random_seed=_next_seed()
        )
        prices_samples = pm.draw(
            prices_dist, draws=time_range, random_seed=_next_seed()
        )

        # log(sales) = alpha + beta * log(price) + Normal(0, epsilon) noise.
        log_sales_dist = pm.Normal.dist(
            mu=alpha_j_samples + beta_j_samples * np.log(prices_samples),
            sigma=epsilon,
        )
        # ``mu`` already has ``time_range`` entries, so one draw gives one
        # log-sales value per observation.
        log_sales_samples = pm.draw(log_sales_dist, draws=1, random_seed=_next_seed())

        sales_samples = np.exp(log_sales_samples)

        # NOTE(review): store ids restart at 0 in every region, so a store is
        # only identified by the (region_id, store_id) pair.
        store = Store(
            id=i, items=[Item(id=0, prices=prices_samples, sales=sales_samples)]
        )

        stores.append(store)

    region = Region(id=j, stores=stores, median_income=median_income_per_region)

    regions.append(region)

market = Market(regions=regions)


In [44]:
# Flatten the simulated market into a single long table: one row per
# observation, with columns item_id, price, sales, store_id and region_id.
market.to_dataframe()


Unnamed: 0,item_id,price,sales,store_id,region_id
0,0,1.414217,100.253391,0,0
1,0,1.399836,108.967686,0,0
2,0,1.003125,6.425430,0,0
3,0,1.560894,315.832370,0,0
4,0,1.500006,198.220018,0,0
...,...,...,...,...,...
515,0,1.531888,86.510828,3,4
516,0,1.704091,218.971820,3,4
517,0,1.514978,99.180346,3,4
518,0,1.276157,26.391290,3,4
