# 4) Neural Network

In [3]:
pip install torch numpy scipy matplotlib


Defaulting to user installation because normal site-packages is not writeable
Collecting torch
  Downloading torch-2.5.1-cp39-none-macosx_11_0_arm64.whl (63.9 MB)
[K     |████████████████████████████████| 63.9 MB 1.9 MB/s eta 0:00:01
[?25hCollecting numpy
  Downloading numpy-2.0.2-cp39-cp39-macosx_14_0_arm64.whl (5.3 MB)
[K     |████████████████████████████████| 5.3 MB 2.3 MB/s eta 0:00:01
[?25hCollecting scipy
  Downloading scipy-1.13.1-cp39-cp39-macosx_12_0_arm64.whl (30.3 MB)
[K     |████████████████████████████████| 30.3 MB 4.2 MB/s eta 0:00:01
[?25hCollecting matplotlib
  Downloading matplotlib-3.9.4-cp39-cp39-macosx_11_0_arm64.whl (7.8 MB)
[K     |████████████████████████████████| 7.8 MB 3.4 MB/s eta 0:00:01
[?25hCollecting sympy==1.13.1
  Downloading sympy-1.13.1-py3-none-any.whl (6.2 MB)
[K     |████████████████████████████████| 6.2 MB 4.5 MB/s eta 0:00:01
Collecting fsspec
  Downloading fsspec-2024.12.0-py3-none-any.whl (183 kB)
[K     |██████████████████████████████

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt
from scipy.optimize import root

# Set random seeds for reproducibility
torch.manual_seed(1234)
np.random.seed(1234)

# Model parameters
BETA = 0.97  # discount factor
ALPHA = 0.33  # capital share
DELTA = 0.1   # depreciation rate
RHO = 0.95    # AR(1) persistence
SIGMA = 0.007 # AR(1) shock std
EPSILON = 1e-8  # Small epsilon to avoid division by zero


def steady_state(ALPHA, BETA, DELTA):
    """
    Compute steady state given parameters α (alpha), β (beta), and δ (delta).
    The function solves three steady state equations:
    1. From (2_ss): β(αk^(α-1)l^(1-α) + 1-δ) = 1
    2. From (1_ss): (1/c)k^α(1-α)l^(-α) = l
    3. From (3_ss): c + δk = k^αl^(1-α)
    
    Args:
        α (float): Capital share parameter
        β (float): Discount factor
        δ (float): Depreciation rate
    
    Returns:
        numpy.ndarray: Steady state values [k, l, c]
    """
    def ss_equations(x):
        k, l, c = x
        
        # The three steady state equations
        eq1 = BETA * (ALPHA * k**(ALPHA-1) * l**(1-ALPHA) + 1-DELTA) - 1
        eq2 = (1/c) * k**ALPHA * (1-ALPHA) * l**(-ALPHA) - l
        eq3 = c + DELTA*k - k**ALPHA * l**(1-ALPHA)
        
        return [eq1, eq2, eq3]
    
    # Initial guess
    x0 = np.array([1.0, 0.3, 0.5])
    
    # Solve system of equations using scipy.optimize.root
    result = root(ss_equations, x0)
    
    if not result.success:
        raise ValueError("Failed to find steady state solution")
        
    return result.x


class DSGENet(nn.Module):
    def __init__(self):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(2, 25),
            nn.ReLU(),
            nn.Linear(25, 25),
            nn.ReLU(),
            nn.Linear(25, 25),
            nn.ReLU(),
            nn.Linear(25, 3)
        )
    
    def forward(self, x):
        return self.network(x)

def tauchen(rho=RHO, sigma=SIGMA, n=3, m=3):
    """Tauchen method for discretizing AR(1) process"""
    sigma_y = sigma / np.sqrt(1 - rho**2)
    y_max = m * sigma_y
    y_min = -y_max
    step = (y_max - y_min) / (n - 1)
    y = np.linspace(y_min, y_max, n)
    
    P = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            if j == 0:
                P[i,j] = norm.cdf((y[j] + step/2 - rho*y[i])/sigma)
            elif j == n-1:
                P[i,j] = 1 - norm.cdf((y[j] - step/2 - rho*y[i])/sigma)
            else:
                P[i,j] = (norm.cdf((y[j] + step/2 - rho*y[i])/sigma) - 
                         norm.cdf((y[j] - step/2 - rho*y[i])/sigma))
    
    return torch.tensor(y, dtype=torch.float32), torch.tensor(P, dtype=torch.float32)

def production(k, l, z):
    """Production function"""
    return torch.exp(z) * k**ALPHA * l**(1 - ALPHA)

def euler_errors(model, k, z, z_grid, P):
    """Calculate Euler equation errors"""
    with torch.set_grad_enabled(True):
        # Get current policy
        state = torch.tensor([k, z], dtype=torch.float32)
        output = model(state)
        c = torch.nn.functional.softplus(output[0])
        l = torch.nn.functional.softplus(output[1])
        k_next = torch.nn.functional.softplus(output[2])
        
        # Current period production
        y = production(k, l, z)
        
        # Labor-leisure condition error
        ll_error = ((1 / c) * torch.exp(z) * k**ALPHA * (1 - ALPHA) * l**(-ALPHA) - l)**2
        
        # Euler equation error
        ee_error = 0.0
        z_index = (z_grid == z).nonzero(as_tuple=True)[0].item()  # Get the index of the current z
        for i, z_next in enumerate(z_grid):
            state_next = torch.tensor([k_next, z_next], dtype=torch.float32)
            output_next = model(state_next)
            c_next = torch.nn.functional.softplus(output_next[0])
            l_next = torch.nn.functional.softplus(output_next[1])
            
            mpc_next = torch.exp(z_next) * ALPHA * k_next**(ALPHA - 1) * l_next**(1 - ALPHA) + (1 - DELTA)
            ee_error += P[z_index, i] * BETA * (c / (c_next + EPSILON)) * mpc_next
        ee_error = (1 - ee_error)**2
        
        # Resource constraint error
        rc_error = (c + k_next - y - (1 - DELTA) * k)**2
        
        return ll_error + ee_error + rc_error

def train_model(epochs=1000):
    """Train the neural network"""
    model = DSGENet()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    
    # Create grid points
    z_grid, P = tauchen()
    k_grid = torch.linspace(1.0, 10.0, 100, dtype=torch.float32)
    
    # Training loop
    for epoch in range(epochs):
        total_loss = 0.0
        for k in k_grid:
            for z in z_grid:
                optimizer.zero_grad()
                loss = euler_errors(model, k, z, z_grid, P)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
        
        if (epoch + 1) % 100 == 0:
            avg_loss = total_loss / (len(k_grid) * len(z_grid))
            print(f"Epoch {epoch+1}: Average Loss = {avg_loss:.6f}")
    
    return model

def generate_policy_functions(model):
    """Generate policy functions using the trained model"""
    k_grid = torch.linspace(1.0, 10.0, 100, dtype=torch.float32)
    z_grid, _ = tauchen()
    
    c_policy = np.zeros((len(k_grid), len(z_grid)))
    l_policy = np.zeros((len(k_grid), len(z_grid)))
    k_policy = np.zeros((len(k_grid), len(z_grid)))
    
    with torch.no_grad():
        for i, k in enumerate(k_grid):
            for j, z in enumerate(z_grid):
                state = torch.tensor([k, z], dtype=torch.float32)
                output = model(state)
                c_policy[i,j] = torch.nn.functional.softplus(output[0]).item()
                l_policy[i,j] = torch.nn.functional.softplus(output[1]).item()
                k_policy[i,j] = torch.nn.functional.softplus(output[2]).item()
    
    return c_policy, l_policy, k_policy, k_grid.numpy()

def plot_policy_functions(k_grid, c_policy, l_policy, k_policy):
    """Plot the policy functions for all states with 2D, 3D, and contour visualizations"""
    z_grid, _ = tauchen()
    z_labels = [f'z = {z:.3f}' for z in z_grid]
    
    # Create meshgrid for contour plots
    K, Z = np.meshgrid(k_grid, z_grid)
    
    # 2D plots for all states
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 15))
    
    for i in range(len(z_grid)):
        ax1.plot(k_grid, c_policy[:,i], label=z_labels[i])
    ax1.set_title('Consumption Policy')
    ax1.set_xlabel('Capital')
    ax1.set_ylabel('Consumption')
    ax1.legend()
    ax1.grid(True)
    
    for i in range(len(z_grid)):
        ax2.plot(k_grid, l_policy[:,i], label=z_labels[i])
    ax2.set_title('Labor Policy')
    ax2.set_xlabel('Capital')
    ax2.set_ylabel('Labor')
    ax2.legend()
    ax2.grid(True)
    
    for i in range(len(z_grid)):
        ax3.plot(k_grid, k_policy[:,i], label=z_labels[i])
    ax3.set_title('Next Period Capital Policy')
    ax3.set_xlabel('Capital')
    ax3.set_ylabel('Next Period Capital')
    ax3.legend()
    ax3.grid(True)
    
    plt.tight_layout()
    plt.savefig('q4fin.png')
    plt.close()

    print("Policy functions plotted and saved.")

def main():
    print("Training model...")
    model = train_model()
    
    print("Generating policy functions...")
    c_policy, l_policy, k_policy, k_grid = generate_policy_functions(model)
    
    print("Plotting results...")
    plot_policy_functions(k_grid, c_policy, l_policy, k_policy)
    
    return model, c_policy, l_policy, k_policy

# Run the solution
if __name__ == "__main__":
    model, c_policy, l_policy, k_policy = main()

Matplotlib is building the font cache; this may take a moment.


Training model...
Epoch 100: Average Loss = 0.001151
Epoch 200: Average Loss = 0.002608
Epoch 300: Average Loss = 0.001933
Epoch 400: Average Loss = 0.000882
Epoch 500: Average Loss = 0.001500
Epoch 600: Average Loss = 0.001575
Epoch 700: Average Loss = 0.002580
Epoch 800: Average Loss = 0.001468
Epoch 900: Average Loss = 0.001727
Epoch 1000: Average Loss = 0.002155
Generating policy functions...
Plotting results...
Policy functions plotted and saved.
