In [None]:
'''
Neoclassical Growth model

Finite difference method adapted from https://benjaminmoll.com/codes/ HJB_NGM.m
'''
import os
import time
from typing import Dict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from deep_macrofin import (Comparator, OptimizerType, PDEModel,
                           PDEModelTimeStep, set_seeds)


plt.rcParams["font.size"] = 20
plt.rcParams["lines.linewidth"] = 3
plt.rcParams["lines.markersize"] = 10

In [None]:
def utility(c, gamma):
    if gamma == 1:
        return np.log(c)
    else:
        return c**(1-gamma)/(1-gamma)

def utility_deriv(c, gamma):
    if gamma == 1:
        return 1 / c
    else:
        return c**(-gamma)

def inverse_marginal_deriv(dV, gamma):
    if gamma == 1:
        return 1 / dV
    else:
        return dV**(-1/gamma)

def solve_upwind(gamma, alpha, delta, rho, A, N, max_iter=10000, crit=1e-6):
    '''
    Inputs:
    - gamma: risk aversion
    - alpha: capital share
    - delta: depreciation
    - rho: discount rate
    - A: productivity
    - N: grid count
    - max_iter: maximum number of iterations
    - crit: error threshold
    '''
    kss = (alpha * A / (rho + delta)) ** (1 / (1-alpha))
    print(kss)
    kmin = 0.001 * kss
    kmax = 2 * kss
    k = np.linspace(kmin, kmax, N)
    dk = (kmax - kmin) / (N - 1)

    dVf = np.zeros(N)
    dVb = np.zeros(N)
    c = np.zeros(N)

    # Initial guess
    c0 = A * k ** alpha
    v0 = utility(c0, gamma) / rho
    v = v0.copy()

    dist = np.zeros(max_iter)
    for n in range(max_iter):
        V = v.copy()

        # Forward difference
        dVf[:-1] = (V[1:] - V[:-1]) / dk
        dVf[-1] = 0  # Never used

        # Backward difference
        dVb[1:] = (V[1:] - V[:-1]) / dk
        dVb[0] = 0  # Never used

        I_concave = dVb > dVf

        # Consumption and savings
        cf = inverse_marginal_deriv(dVf, gamma)
        muf = A * k ** alpha - delta * k - cf

        cb = inverse_marginal_deriv(dVb, gamma)
        mub = A * k ** alpha - delta * k - cb

        c0 = A * k ** alpha - delta * k
        dV0 = c0 ** (-gamma)

        # Upwind scheme
        If = muf > 0
        Ib = mub < 0
        I0 = ~(If | Ib)

        Ib[0] = False
        If[0] = True
        Ib[-1] = True
        If[-1] = False

        dV_Upwind = dVf * If + dVb * Ib + dV0 * I0

        c = inverse_marginal_deriv(dV_Upwind, gamma)
        Vchange = utility(c, gamma) + dV_Upwind * (A * k ** alpha - delta * k - c) - rho * V

        # Update
        DeltaT = 0.9 * dk / np.max(A * k ** alpha - delta * k)
        v += DeltaT * Vchange

        dist[n] = np.max(np.abs(Vchange))
        if dist[n] < crit:
            print(f'Value Function Converged, Iteration = {n}')
            break
    return k, v, c

In [None]:
def get_model(params: Dict[str, float], training_config: Dict[str, Dict], model_configs: Dict[str, Dict], seed=0, init_guess={"V": -18, "c": 1.5}):
    kss = (params["alpha"] / (params["rho"] + params["delta"])) ** (1 / (1 - params["alpha"]))
    ckss = kss**params["alpha"] - params["delta"] * kss
    set_seeds(seed)
    model = PDEModelTimeStep("ncg", config=training_config)
    model.set_state(["k"], {"k": [0.01 * kss, 2 * kss]}) #  
    model.add_params(params)
    model.add_endog("V", config=model_configs["V"])
    model.add_endog("c", config=model_configs["c"])
    if params["gamma"] == 1:
        endog_cond = torch.log(torch.tensor(ckss, dtype=torch.float32, device=model.device))/params["rho"]
        utility_eq = "u=log(c)"
    else:
        endog_cond = ckss**(1-params["gamma"]) / ((1-params["gamma"]) * params["rho"])
        utility_eq = "u=c**(1-gamma)/(1-gamma)"
    
    ss_bd = torch.zeros((100, 2), device=model.device)
    ss_bd[:, 0] = kss
    ss_bd[:, 1] = torch.linspace(0, 1, 100, device=model.device)
    model.add_endog_condition("V", 
                                "V(SV)", 
                                {"SV": ss_bd},
                                Comparator.EQ,
                                "ec", {"ec": endog_cond},
                                label="v1")
    model.add_endog_condition("c", 
                                "c(SV)", 
                                {"SV": ss_bd},
                                Comparator.EQ,
                                "kss**alpha - delta * kss", params | {"kss": kss},
                                label="c1")
    model.add_equation("s=k**alpha - delta * k - c")
    model.add_equation(utility_eq)
    model.add_endog_equation("c**(-gamma)=V_k")
    model.add_constraint("c_k", Comparator.GEQ, "0")
    model.add_hjb_equation("V_t + u+ V_k * s-rho*V")
    model.set_initial_guess(init_guess)
    return model

In [None]:
PARAMS = {
        "gamma": 2, # Risk aversion
        "alpha": 0.3, # Returns to scale
        "delta": 0.05, # Capital depreciation
        "rho": 0.05, # Discount rate
        "A": 1, # Productivity
    }
kss = (PARAMS["alpha"] / (PARAMS["rho"] + PARAMS["delta"])) ** (1 / (1 - PARAMS["alpha"]))

TRAINING_CONFIGS = {
    "MLP": {"num_outer_iterations": 20, "num_inner_iterations": 10000,  
            "time_batch_size": 5, "optimizer_type": OptimizerType.Adam},
}

MODEL_CONFIGS = {
    "MLP": {
        "V": {"hidden_units": [64] * 4},
        "c": {"hidden_units": [32] * 4, "positive": True},
    },
}

INIT_GUESSES = {"V": -18, "c": 1.5}

BASE_DIR = "models/ncg_ts/"
os.makedirs(BASE_DIR, exist_ok=True)
print("{0:=^80}".format("loading FD solutions"))
if not os.path.exists(f"{BASE_DIR}/ncg_fd.npz"):
    k, v, c = solve_upwind(PARAMS["gamma"], PARAMS["alpha"], PARAMS["delta"], 
                        PARAMS["rho"], PARAMS["A"], 100)
    np.savez(f"{BASE_DIR}/ncg_fd.npz", k=k, v=v, c=c)
fd_res = np.load(f"{BASE_DIR}/ncg_fd.npz")

In [None]:
model = get_model(PARAMS, TRAINING_CONFIGS, MODEL_CONFIGS, 0)
if not os.path.exists(f"{BASE_DIR}/model.pt"):
    model.train_model(BASE_DIR, f"model.pt", True)
    model.eval_model(True)
else:
    model.load_model(torch.load(f"{BASE_DIR}/model.pt", weights_only=False, map_location=model.device))
    model.eval_model(True)

In [None]:
k_fd = fd_res["k"]
v_fd = fd_res["v"]
c_fd = fd_res["c"]
idx = k_fd > 0.01 * kss
k_fd = k_fd[idx]
v_fd = v_fd[idx]
c_fd = c_fd[idx]

SV = torch.zeros((k_fd.shape[0], 2), device=model.device)
SV[:, 0] = torch.tensor(k_fd, device=model.device, dtype=torch.float32)
for i, sv_name in enumerate(model.state_variables):
    model.variable_val_dict[sv_name] = SV[:, i:i+1]
model.variable_val_dict["SV"] = SV
model.update_variables(SV)
V_model = model.variable_val_dict["V"].detach().cpu().numpy().reshape(-1)
c_model = model.variable_val_dict["c"].detach().cpu().numpy().reshape(-1)

fig, ax = plt.subplots(1, 2, figsize=(20, 10))
ax[0].plot(k_fd, v_fd, linestyle="-.", color="red", label="Finite Difference")
ax[0].plot(k_fd, V_model, color="blue", label=f"Deep-MacroFin")
ax[0].legend()
ax[0].set_xlabel("$k$")
ax[0].set_ylabel("$V(k)$")

ax[1].plot(k_fd, c_fd, linestyle="-.", color="red", label="Finite Difference")
ax[1].plot(k_fd, c_model, color="blue", label=f"Deep-MacroFin")
ax[1].legend()
ax[1].set_xlabel("$k$")
ax[1].set_ylabel("$c(k)$")
plt.tight_layout()
plt.show()