<a href="https://colab.research.google.com/github/zhenghaojiang/rl_dsge/blob/main/Solver_Class.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from abc import ABC, abstractmethod
from sympy import *
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt

In [None]:
class Solver(ABC):
    """Abstract base shared by the solvers in this notebook.

    Stores the model constants and structural parameters and exposes the
    three model formulas (``pi``, ``psi``, ``modeldefs``). Concrete solvers
    must implement ``dsge_solve``.
    """

    def __init__(self, params, struc_params):
        """Store the model parameters on the instance.

        params: mapping that provides the keys "r" and "delta".
        struc_params: indexable whose first three entries are read as
            theta, rho and sigma, in that order.
        """
        # Constants, looked up by key.
        self.r, self.delta = params["r"], params["delta"]
        # Structural parameters, looked up by position.
        self.theta = struc_params[0]
        self.rho = struc_params[1]
        self.sigma = struc_params[2]

    def pi(self, k, z):
        """Return z * k**theta."""
        scaled_capital = k ** self.theta
        return z * scaled_capital

    def psi(self, I, k):
        """Return the quadratic term 0.01 * I**2 / (2 * k)."""
        squared = I ** 2
        return 0.01 * squared / (2 * k)

    def modeldefs(self, k, kp):
        """Return I = kp - (1 - delta) * k, the investment linking k to kp."""
        return kp - (1 - self.delta) * k

    @abstractmethod
    def dsge_solve(self):
        """Solve the model; each subclass defines what is returned."""
        pass

In [None]:
class Linear(Solver):
    """Solver that finds two coefficients, A and B, from a symbolic system.

    The system is built around the steady state ``k_st`` / ``I_st``
    (``I_st = delta * k_st`` is the investment that keeps capital constant
    under ``modeldefs``). Judging by the class name this is the linearised
    version of the model -- confirm against the derivation.
    """

    def __init__(self, params, struc_params):
        """Store the parameters and precompute k_st, I_st, m1 and m2."""
        super().__init__(params, struc_params)
        r, delta, theta = self.r, self.delta, self.theta

        # Steady-state capital and the investment that sustains it.
        numerator = (r + delta) * (1 + 0.01 * delta) - 0.005 * (delta ** 2)
        self.k_st = (numerator / theta) ** (1 / (theta - 1))
        self.I_st = delta * self.k_st
        k_st, I_st = self.k_st, self.I_st

        # Coefficients reused by both equations in dsge_solve.
        curvature = theta * (theta - 1) * (k_st ** (theta - 2))
        quad_part = 0.01 * (I_st ** 2) / (k_st ** 3)
        lin_part = 0.01 * (1 - delta) * I_st / (k_st ** 2)
        self.m1 = curvature - quad_part - lin_part
        self.m2 = 0.01 * (I_st / (k_st ** 2) + (1 - delta) / k_st)

    def dsge_solve(self):
        """Solve the two equations for (A, B) and return the first solution.

        Returns the first entry of sympy's solution list wrapped in a numpy
        array, i.e. one (A, B) pair.
        """
        A = Symbol('A')
        B = Symbol('B')
        k_st, I_st = self.k_st, self.I_st

        # First equation involves A only; the second is linear in B given A.
        eq_a = 0.01*(1+self.r)*(A/k_st-I_st/(k_st**2)) \
            - (1-self.delta+A)*(self.m1+self.m2*A)
        eq_b = 0.01*(1+self.r)*B/k_st-self.theta*(k_st**(self.theta-1))*self.rho \
            - self.m1*B-self.m2*B*(A+self.rho)

        roots = solve([eq_a, eq_b], [A, B])
        return np.array(roots)[0]

In [None]:
class Loglinear(Solver):
    """Solver that finds two coefficients, A and B, from a symbolic system.

    The system is written in terms of the steady state ``k`` / ``I``
    (``I = delta * k``). Judging by the class name this is the log-linearised
    version of the model -- confirm against the derivation.
    """

    def __init__(self, params, struc_params):
        """Store the parameters and precompute k, I, m1 and m2."""
        super().__init__(params, struc_params)
        r, delta, theta = self.r, self.delta, self.theta

        # Steady-state capital and the investment that sustains it.
        numerator = (r + delta) * (1 + 0.01 * delta) - 0.005 * (delta ** 2)
        self.k = (numerator / theta) ** (1 / (theta - 1))
        self.I = delta * self.k
        k, I = self.k, self.I

        # The same two terms appear in m1 (subtracted) and m2 (added).
        ratio_sq_part = 0.01 * ((I / k) ** 2)
        ratio_lin_part = 0.01 * (1 - delta) * I / k
        self.m1 = theta * (theta - 1) * (k ** (theta - 1)) - ratio_sq_part - ratio_lin_part
        self.m2 = ratio_sq_part + ratio_lin_part

    def dsge_solve(self):
        """Solve the two equations for (A, B) and return the first solution.

        Returns the first entry of sympy's solution list wrapped in a numpy
        array, i.e. one (A, B) pair.
        """
        A = Symbol('A')
        B = Symbol('B')
        k, I = self.k, self.I

        eq_a = 0.01*(1+self.r)*I*(A-1)/k \
            - (1-self.delta+A*I/k)*(self.m1+self.m2*A)
        eq_b = 0.01*(1+self.r)*I*B/k-self.theta*(k**(self.theta-1))*self.rho \
            - self.m1*B*I/k-self.m2*(A*B*I/k+self.rho)

        roots = solve([eq_a, eq_b], [A, B])
        return np.array(roots)[0]

In [None]:
# Rouwenhorst (1995) discretisation of an AR(1) process into a Markov chain
def rouwen(rho, mu, step, num):
    """Build a Rouwenhorst (1995) Markov-chain approximation.

    Parameters
    ----------
    rho : float
        Persistence parameter; sets p = q = (1 + rho) / 2.
    mu : float
        Centre of the state grid.
    step : float
        Distance between adjacent grid points.
    num : int
        Number of grid points, at least 1.

    Returns
    -------
    tuple or None
        ``(P, grid)`` where ``P`` is a (num, num) array with the current
        state in columns and the next state in rows (each column sums to
        one), and ``grid`` is a 1-D array of ``num`` evenly spaced points
        centred on ``mu``. If the sanity check on the probabilities fails,
        a message is printed and ``None`` is returned.

    Raises
    ------
    ValueError
        If ``num`` is smaller than 1.
    """
    if num < 1:
        raise ValueError("rouwen requires num >= 1")

    # discrete state space
    half_width = (num - 1) / 2 * step
    dscSp = np.linspace(mu - half_width, mu + half_width, num)

    # transition probability matrix, built with rows = current state
    q = p = (rho + 1) / 2.
    if num == 1:
        # A single state always transitions to itself.
        transP = np.ones((1, 1))
    elif num == 2:
        # Two-state base case of the Rouwenhorst recursion.
        transP = np.array([[p, 1 - p],
                           [1 - q, q]])
    else:
        # Three-state matrix; the recursion below extends it to num states.
        transP = np.array([[p**2, p*(1-q), (1-q)**2],
                           [2*p*(1-p), p*q+(1-p)*(1-q), 2*q*(1-q)],
                           [(1-p)**2, (1-p)*q, q**2]]).T

    while transP.shape[0] <= num - 1:
        # see Rouwenhorst 1995: embed the n x n matrix in each corner of an
        # (n+1) x (n+1) matrix and mix the four copies.
        len_P = transP.shape[0]
        zero_col = np.zeros((len_P, 1))
        zero_row = np.zeros((1, len_P + 1))
        pad_right = np.hstack((transP, zero_col))
        pad_left = np.hstack((zero_col, transP))
        transP = p * np.vstack((pad_right, zero_row)) \
            + (1 - p) * np.vstack((pad_left, zero_row)) \
            + (1 - q) * np.vstack((zero_row, pad_right)) \
            + q * np.vstack((zero_row, pad_left))

        # Interior rows received two contributions, so halve them.
        transP[1:-1] /= 2.

    # ensure every row of transP (every column of the returned matrix)
    # sums to 1
    row_sums = np.sum(transP, axis=1)
    if np.max(np.abs(row_sums - 1.0)) >= 1e-12:
        print('Problem in rouwen routine!')
        return None
    return transP.T, dscSp

In [None]:
class VI(Solver):
    """Solver that runs value-function iteration on a discrete (k, z) grid."""

    def __init__(self, params, struc_params):
        """Store the parameters and precompute the steady state k_st, I_st."""
        super().__init__(params, struc_params)
        # Constants
        self.k_st = (((self.r+self.delta)*(1+0.01*self.delta)-0.005*(self.delta**2))/self.theta) ** (1/(self.theta-1))
        self.I_st = self.delta*self.k_st

    def dsge_solve(self):
        """Iterate on the value function and return the resulting policy.

        The capital grid has 51 points between 1% and 199% of ``k_st``; the
        shock grid has 16 points produced by ``rouwen``. Iteration stops
        when the sup-norm change of the value function is at most 1e-2 or
        after 1000 sweeps. Progress is printed on every sweep.

        Returns
        -------
        callable
            ``policy(k, z)``: looks up the grid points nearest to ``k`` and
            ``z`` and returns the chosen next-period capital for that cell.
        """
        # set up grid for k
        keps = .01
        klow = keps*self.k_st
        khigh = (2-keps)*self.k_st
        knpts = 51
        kgrid = np.linspace(klow, khigh, num=knpts)

        # set up Markov approximation of AR(1) process using Rouwenhorst method
        spread = 2.  # number of standard deviations above and below 0
        znpts = 16
        zstep = 2*spread*self.sigma/(znpts-1)
        # Markov transition probabilities, current z in cols, next z in rows
        Pimat, lnzgrid = rouwen(self.rho, 0., zstep, znpts)
        zgrid = np.exp(lnzgrid)

        # Current-period payoff for every (k_i, z_j, k'_m) triple. It does
        # not depend on the value function, so it is computed once, outside
        # the iteration loop.
        k_now = kgrid[:, np.newaxis]
        invest = self.modeldefs(k_now, kgrid[np.newaxis, :])   # [i, m]
        profit = self.pi(k_now, zgrid[np.newaxis, :])          # [i, j]
        adjust = self.psi(invest, k_now)                       # [i, m]
        flow = profit[:, :, np.newaxis] \
            - adjust[:, np.newaxis, :] \
            - invest[:, np.newaxis, :]                         # [i, j, m]

        VF = np.zeros((knpts, znpts))
        PF = np.zeros((knpts, znpts))

        ccrit = 1.0E-2
        maxit = 1000
        damp = 1.
        dist = 1.0E+99
        iters = 0

        while (dist > ccrit) and (iters < maxit):
            iters = iters + 1
            # Discounted expected value of ending at k'_m given current z_j:
            # cont[m, j] = sum_n VF[m, n] * Pimat[n, j] / (1 + r)
            cont = VF @ Pimat / (1 + self.r)
            total = flow + cont.T[np.newaxis, :, :]            # [i, j, m]
            # argmax returns the first maximiser, i.e. the lowest k' on ties.
            best = np.argmax(total, axis=2)
            VFnew = np.max(total, axis=2)
            PF = kgrid[best]
            dist = np.amax(np.abs(VF - VFnew))
            print('iteration: ', iters, 'distance: ', dist)
            VF = damp*VFnew + (1-damp)*VF

        # Only claim convergence when the tolerance was actually met.
        if dist > ccrit:
            print('Did not converge after', iters, 'iterations; last distance:', dist)
        else:
            print('Converged after', iters, 'iterations')
        print('Policy function at (', int((knpts-1)/2), ',', int((znpts-1)/2), ') should be', \
            kgrid[int((knpts-1)/2)], 'and is', PF[int((knpts-1)/2), int((znpts-1)/2)])

        def policy(k, z):
            # Nearest-grid-point lookup; no interpolation between cells.
            i = np.argmin(abs(kgrid-k))
            j = np.argmin(abs(zgrid-z))
            k_new = PF[i, j]
            return k_new
        return policy

In [None]:
# Baseline constants, keyed by the names Solver.__init__ looks up.
basic_params = dict(r=1/24, delta=0.15)

# Baseline structural parameters in the positional order Solver.__init__
# reads them: theta, rho, sigma.
basic_struc_params = [
    0.70,
    0.70,
    0.15,
]

In [None]:
# Solve the baseline calibration by value-function iteration; plc is the
# policy callable (k, z) -> next-period k returned by dsge_solve.
res = VI(basic_params, basic_struc_params)
plc = res.dsge_solve()

iteration:  1 distance:  170.01999552952066
iteration:  2 distance:  12.022597147589465
iteration:  3 distance:  9.512392664027658
iteration:  4 distance:  7.906627149013929
iteration:  5 distance:  6.848901069758568
iteration:  6 distance:  6.114907804248247
iteration:  7 distance:  5.578300985196108
iteration:  8 distance:  5.166690090654129
iteration:  9 distance:  4.836893166585213
iteration:  10 distance:  4.562295917637073
iteration:  11 distance:  4.3260424451007395
iteration:  12 distance:  4.117223482872191
iteration:  13 distance:  3.9286560225285996
iteration:  14 distance:  3.755540077158088
iteration:  15 distance:  3.594622757519261
iteration:  16 distance:  3.443667177458167
iteration:  17 distance:  3.3011097112145364
iteration:  18 distance:  3.165836195032739
iteration:  19 distance:  3.0370345195038055
iteration:  20 distance:  2.9140969104506667
iteration:  21 distance:  2.7965548345159164
iteration:  22 distance:  2.6840354725121074
iteration:  23 distance:  2.5762