<a href="https://colab.research.google.com/github/kentaro-muna/Graduation-Thesis/blob/main/zemi_test1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [60]:
from dataclasses import dataclass
from typing import List, Dict, Tuple
import numpy as np
from collections import defaultdict

@dataclass
class Params:
    """Tunable constants of the simulation; every field has a default.

    The field order is part of the positional constructor signature, so it
    must not be rearranged.
    """

    # --- Horizon and population sizes ---
    T: int = 360  # presumably the number of simulated periods -- TODO confirm
    H: int = 500  # number of households
    F: int = 60  # number of firms
    I: int = 10  # number of distinct goods types
    goods_per_firm: int = 2  # goods types drawn (without replacement) per firm

    # --- Household initial cash: inclusive bounds of the integer draw ---
    cash_h_min: int = 3000
    cash_h_max: int = 5000

    # --- Household `wf` (presumably wage): inclusive bounds of the draw ---
    Wf_min: int = 4000
    Wf_max: int = 5000

    # --- Initial cash of the institutional agents ---
    cash_f0: int = 100_000  # each firm
    cash_g0: int = 100_000  # government (also seeds its tax_rev_prev)
    cash_bank0: int = 40_000_000  # bank
    cash_equip0: int = 1_000_000  # equipment maker

    # --- Initial goods price: inclusive bounds of the integer draw ---
    p0_min: int = 1000
    p0_max: int = 1500

    # --- Production: Q_lim = a * K**beta * L**(1 - beta) ---
    K0: int = 1  # initial K of every firm
    a_min: float = 10.0  # lower bound of the uniform draw for a firm's `a`
    a_max: float = 12.0  # upper bound of the uniform draw for a firm's `a`
    beta: float = 0.2  # exponent on K in the capacity formula

    # --- Production target: mean + sa * std of recent sales, minus stock ---
    ti: int = 10  # length of each per-good sales-history window
    sa: float = 1.65  # multiplier on the sales standard deviation

    # --- Household consumption ---
    bc_min: int = 1000  # inclusive lower bound of the per-period `bc` draw
    bc_max: int = 1500  # inclusive upper bound of the per-period `bc` draw
    mpc: float = 0.5  # share of income above `bc` that is consumed
    wd_min: float = 0.5  # lower bound of the deposit share withdrawn
    wd_max: float = 0.8  # upper bound of the deposit share withdrawn

    # Household M is recomputed whenever t is a multiple of this interval.
    wm_update_interval: int = 12

    # --- M formula: x = gamma * (Y / Y_ave) - gamma;
    #     M = delta / (delta + exp(-epsilon * x)) ---
    gamma: float = 50.0
    delta: float = 20.0
    epsilon: float = -0.02

    # --- Tax rates ---
    ri_tax_rate: float = 0.2  # deducted from `wf` when seeding Y_prev
    rc_tax_rate: float = 0.4  # presumably the corporate rate -- TODO confirm

    # --- Government ---
    grp: float = 0.5  # share of previous tax revenue spent on goods
    grs: float = 0.5  # copied to Government.grs; usage not visible here

    rb: float = 0.5  # not referenced in the code visible here -- TODO document

    # --- Equipment maker ---
    equip_price: int = 500_000  # initial EquipMaker.price
    equip_capacity: int = 10  # initial EquipMaker.capacity

    reply_T: int = 100  # not referenced in the code visible here

    n_pref: int = 3  # goods types each household has a preference for

    seed: int = 42  # base seed for the NumPy random generators

@dataclass
class Household:
    """Mutable per-household state."""

    cash: float  # cash on hand; purchases are paid out of it
    dep: float  # deposit balance; partly withdrawn and topped up each period
    firm_id: int  # index of the firm this household is a member of
    wf: float  # presumably the wage received from the firm -- TODO confirm
    pref_types: np.ndarray  # goods types this household buys
    alpha: np.ndarray  # budget weights, one per entry of pref_types
    Y_prev: float  # income component carried over from the previous period
    M: float  # weight summed over a firm's members to give its input L
    C: float  # consumption budget for the current period
    BI_prev: float  # added to Y_prev when forming income; meaning not shown here
    DI_prev: float  # added to Y_prev when forming income; meaning not shown here
    tax_due: float  # outstanding tax amount


@dataclass
class Firm:
    """Mutable per-firm state; the dict fields are keyed by goods type."""

    cash: float  # cash balance; sales revenue is added to it
    K: int  # capital input of the capacity formula
    a: float  # scale coefficient of the capacity formula
    goods: Tuple[int, int]  # goods types this firm produces
    price: Dict[int, float]  # current unit price per good
    inv: Dict[int, float]  # units in stock per good
    sales_hist: Dict[int, List[float]]  # recent units sold per good, oldest first
    corp_tax_due: float  # presumably corporate tax owed -- TODO confirm
    profit_prev_fy: float  # presumably last fiscal year's profit -- TODO confirm
    prod_plan: Dict[int, float]  # planned output per good for this period


@dataclass
class Government:
    """State of the government agent."""

    cash: float  # cash balance; goods purchases are deducted from it
    tax_rev_prev: float  # previous tax revenue; scales the purchasing budget
    grp: float  # share of tax_rev_prev spent on buying goods
    grs: float  # second spending ratio; its use is not visible here

@dataclass
class Bank:
    """State of the bank agent: a single cash balance."""

    cash: float  # bank's cash balance

@dataclass
class EquipMaker:
    """State of the equipment-maker agent."""

    cash: float  # cash balance
    capacity: int  # presumably units it can supply per period -- TODO confirm
    price: int  # price of one unit of equipment

@dataclass
class State:
    """Complete simulation state at one point in time."""

    t: int  # current period index
    households: List[Household]  # all households; list position is the household index
    firms: List[Firm]  # all firms; list position is the firm index
    gov: Government
    bank: Bank
    equip: EquipMaker
    # firm_members[f] holds the indices into `households` of firm f's members.
    firm_members: List[np.ndarray]






In [61]:
def initialize_state(P: Params) -> State:
    """Build the period-0 simulation state from the parameters in ``P``.

    All randomness comes from one generator seeded with ``P.seed``, so the
    result is reproducible. The order of the random draws below determines
    the generated values and must not be changed.

    Args:
        P: Simulation parameters.

    Returns:
        A ``State`` with ``t == 0``, ``P.H`` households, ``P.F`` firms, and
        the government, bank and equipment maker at their initial cash.
    """
    rng = np.random.default_rng(P.seed)

    # Each household is attached to a uniformly drawn firm; a firm may end
    # up with no members at all.
    assignment = rng.integers(0, P.F, size=P.H)
    members_by_firm = [np.where(assignment == f)[0] for f in range(P.F)]

    households: List[Household] = []
    for h_idx in range(P.H):
        # Draw order per household: preferred goods, weights, cash, wf.
        preferred = rng.choice(P.I, size=P.n_pref, replace=False)
        weights = rng.dirichlet(np.ones(P.n_pref))
        start_cash = float(rng.integers(P.cash_h_min, P.cash_h_max + 1))
        start_wf = float(rng.integers(P.Wf_min, P.Wf_max + 1))

        household = Household(
            cash=start_cash,
            dep=0.0,
            firm_id=int(assignment[h_idx]),
            wf=start_wf,
            pref_types=preferred,
            alpha=weights,
            # Seed the previous income with wf net of the ri_tax_rate share.
            Y_prev=start_wf * (1.0 - P.ri_tax_rate),
            M=1.0,
            C=0.0,
            BI_prev=0.0,
            DI_prev=0.0,
            tax_due=0.0,
        )
        households.append(household)

    firms: List[Firm] = []
    for _ in range(P.F):
        # Draw order per firm: goods, one price per good, then `a`.
        produced = tuple(
            rng.choice(P.I, size=P.goods_per_firm, replace=False).tolist()
        )
        start_price = {
            g: float(rng.integers(P.p0_min, P.p0_max + 1)) for g in produced
        }
        coeff_a = float(rng.uniform(P.a_min, P.a_max))

        firm = Firm(
            cash=float(P.cash_f0),
            K=P.K0,
            a=coeff_a,
            goods=produced,
            price=start_price,
            inv=dict.fromkeys(produced, 0.0),
            # The sales window starts as P.ti zeros per good.
            sales_hist={g: [0.0] * P.ti for g in produced},
            corp_tax_due=0.0,
            profit_prev_fy=0.0,
            prod_plan=dict.fromkeys(produced, 0.0),
        )
        firms.append(firm)

    # tax_rev_prev is seeded with the government's initial cash.
    government = Government(
        cash=float(P.cash_g0),
        tax_rev_prev=float(P.cash_g0),
        grp=P.grp,
        grs=P.grs,
    )
    bank = Bank(cash=float(P.cash_bank0))
    equip_maker = EquipMaker(
        cash=float(P.cash_equip0),
        capacity=P.equip_capacity,
        price=P.equip_price,
    )

    return State(
        t=0,
        households=households,
        firms=firms,
        gov=government,
        bank=bank,
        equip=equip_maker,
        firm_members=members_by_firm,
    )


In [62]:
def bootstrap_sales_hist_A(S: State, P: Params, init_sales: float = 1.0) -> None:
    """Overwrite every firm's sales history with a constant window.

    Each good of each firm gets a fresh list of ``P.ti`` entries, all equal
    to ``float(init_sales)``.

    Args:
        S: Simulation state; ``firm.sales_hist`` is modified in place.
        P: Parameters; only ``P.ti`` (the window length) is read.
        init_sales: Value written into every slot of the window.
    """
    for firm in S.firms:
        for g in firm.goods:
            # A separate list per good, so later updates never alias.
            firm.sales_hist[g] = [float(init_sales)] * P.ti

In [63]:
def step1_decision_budgeting(S: State, P: Params) -> None:
    """Set household budgets, firm production plans and prices for period ``S.t``.

    The step has three parts, all of which modify ``S`` in place:

    1. When ``S.t`` is a multiple of ``P.wm_update_interval``, each
       household's ``M`` is recomputed from its income relative to the mean
       income. The update is skipped when the mean income is exactly zero,
       because the ratio is undefined there.
    2. Each household gets a consumption budget ``C`` and a new deposit
       balance ``dep``; ``tax_due`` is reset to 0.
    3. Each firm gets a production plan per good and, if it has positive
       capacity, a price adjustment of +2 % or -2 % per good.

    Args:
        S: Simulation state, modified in place.
        P: Simulation parameters.
    """
    # The stream depends on the period, so each period draws fresh values.
    rng = np.random.default_rng(P.seed + S.t + 1)

    incomes = [h.Y_prev + h.BI_prev + h.DI_prev for h in S.households]
    Y_ave = float(np.mean(incomes)) if incomes else 1.0

    # --- 1. Periodic update of M ---
    # Y / Y_ave is undefined for a zero mean; keep the previous M then.
    if S.t % P.wm_update_interval == 0 and Y_ave != 0.0:
        for h, Y in zip(S.households, incomes):
            x = P.gamma * (Y / Y_ave) - P.gamma
            h.M = P.delta / (P.delta + np.exp(-P.epsilon * x))

    # --- 2. Household consumption budget and deposits ---
    for h, Y in zip(S.households, incomes):
        bc = float(rng.integers(P.bc_min, P.bc_max + 1))
        wd = float(rng.uniform(P.wd_min, P.wd_max))

        withdraw = h.dep * wd
        surplus = max(Y - bc, 0.0)  # income above the `bc` amount

        # Budget: bc, plus the mpc share of the surplus, plus the withdrawal.
        h.C = bc + surplus * P.mpc + withdraw
        # The rest of the surplus is added to what remains of the deposits.
        h.dep = (h.dep - withdraw) + surplus * (1.0 - P.mpc)
        h.tax_due = 0.0

    # --- 3. Firm production plans and prices ---
    for f_id, firm in enumerate(S.firms):
        members = S.firm_members[f_id]
        L = float(np.sum([S.households[m].M for m in members]))

        # Capacity is zero for a firm without any input L.
        Q_lim = (
            firm.a * (firm.K ** P.beta) * (L ** (1.0 - P.beta)) if L > 0 else 0.0
        )

        for g in firm.goods:
            window = firm.sales_hist[g][-P.ti:]
            if window:
                mean_sales = float(np.mean(window))
                std_sales = float(np.std(window, ddof=0))
            else:
                mean_sales = 0.0
                std_sales = 0.0

            stock = firm.inv[g]

            # Target: mean sales plus an sa * std buffer, less current stock,
            # never negative and never above capacity less stock.
            target = max(mean_sales + P.sa * std_sales - stock, 0.0)
            target = min(target, max(Q_lim - stock, 0.0))
            firm.prod_plan[g] = float(target)

        if Q_lim > 0:
            for g in firm.goods:
                inv_ratio = float(firm.inv[g]) / float(Q_lim)
                # Low stock raises the price by 2 %, high stock lowers it
                # by 2 %; no price floor is applied.
                if inv_ratio <= 0.20:
                    firm.price[g] *= 1.02
                elif inv_ratio >= 0.80:
                    firm.price[g] *= 0.98




In [64]:
def step2_product_manufacturing(S: State, P: Params) -> None:
    """Add each firm's planned production to its inventory.

    A good without an entry in ``prod_plan`` counts as 0, and a negative
    plan is treated as 0, so inventory never shrinks in this step.

    Args:
        S: Simulation state; ``firm.inv`` is modified in place.
        P: Simulation parameters (not read by this step).
    """
    for firm in S.firms:
        for g in firm.goods:
            planned = float(firm.prod_plan.get(g, 0.0))
            firm.inv[g] += max(planned, 0.0)


In [65]:
def step3__market_purchasing(S: State, P: Params) -> None:
    """Run the goods market for period ``S.t``: households buy, then the government.

    Households go through their preferred goods in ``pref_types`` order. For
    each good they take the ``alpha`` share of the budget they have left and
    buy whole units from the cheapest supplier that has stock, with price
    ties broken at random. The government then spends up to
    ``grp * tax_rev_prev``, one unit at a time, cycling over goods slots and
    firms. Finally each firm's sales history is shifted by one period and
    this period's units sold are appended.

    Everything is modified in place: firm ``inv``, ``cash`` and
    ``sales_hist``; household ``cash`` and ``C``; government ``cash``.

    Args:
        S: Simulation state, modified in place.
        P: Simulation parameters; ``seed`` and ``goods_per_firm`` are read.
    """
    rng = np.random.default_rng(P.seed + S.t + 3)

    # Index suppliers by the good they sell.
    suppliers_by_g = defaultdict(list)
    for f_id, firm in enumerate(S.firms):
        for g in firm.goods:
            suppliers_by_g[g].append((f_id, firm))

    # Units sold this period: one {good: units} mapping per firm.
    sales_this_period = [defaultdict(float) for _ in S.firms]

    # --- Household purchases ---
    for h in S.households:
        # A household cannot spend more than the cash it holds.
        remaining = float(min(h.C, h.cash))
        if remaining <= 0:
            continue

        alpha = np.array(h.alpha, dtype=float)
        weight_sum = float(alpha.sum())
        if weight_sum <= 0:
            continue
        alpha = alpha / weight_sum

        for k, g in enumerate(h.pref_types):
            if remaining <= 0:
                break

            # The purchase logic below runs for every preferred good; it
            # must sit at loop level, not under the `break` guard above.
            g = int(g)
            # NOTE(review): the share is taken from the budget still
            # remaining, not the starting budget, so later goods get a
            # smaller amount than alpha alone suggests -- confirm intent.
            budget_g = remaining * float(alpha[k])
            if budget_g <= 0:
                continue

            candidates = [
                (f_id, firm)
                for f_id, firm in suppliers_by_g.get(g, [])
                if firm.inv[g] > 0 and firm.price[g] > 0
            ]
            if not candidates:
                continue

            # Cheapest supplier wins; ties are broken uniformly at random.
            prices = np.array([firm.price[g] for _, firm in candidates], dtype=float)
            cheapest = np.where(prices == prices.min())[0]
            f_id, firm = candidates[int(rng.choice(cheapest))]
            p = float(firm.price[g])

            # Only whole units are traded, limited by budget and by stock.
            q = min(int(budget_g // p), int(firm.inv[g]))
            if q <= 0:
                continue

            spend = q * p
            firm.inv[g] -= q
            firm.cash += spend
            h.cash -= spend
            remaining -= spend
            sales_this_period[f_id][g] += q

        # C carries whatever part of the budget was not spent.
        h.C = float(max(remaining, 0.0))

    # --- Government purchases ---
    # The budget is a share of previous tax revenue; it is deducted from
    # S.gov.cash without checking the cash balance.
    gov_budget = float(S.gov.grp * S.gov.tax_rev_prev)
    n_slots = int(P.goods_per_firm)

    while gov_budget > 0:
        bought_any = False

        for j in range(n_slots):
            for f_id, firm in enumerate(S.firms):
                if j >= len(firm.goods):
                    continue  # this firm has no good in slot j
                g = int(firm.goods[j])

                if firm.inv[g] <= 0:
                    continue
                p = float(firm.price[g])
                if p <= 0 or gov_budget < p:
                    continue

                # Buy exactly one unit per visit.
                firm.inv[g] -= 1
                firm.cash += p
                S.gov.cash -= p
                gov_budget -= p
                sales_this_period[f_id][g] += 1
                bought_any = True

                if gov_budget <= 0:
                    break
            if gov_budget <= 0:
                break

        # A full pass without a purchase means nothing affordable is left.
        if not bought_any:
            break

    # --- Roll the sales history ---
    for f_id, firm in enumerate(S.firms):
        for g in firm.goods:
            g = int(g)
            q_sold = float(sales_this_period[f_id].get(g, 0.0))
            hist = firm.sales_hist[g]
            # Drop the oldest entry so the window keeps its length; an
            # empty history simply grows to one entry.
            if hist:
                del hist[0]
            hist.append(q_sold)

