In [1]:
import numpy as np
import pandas as pd
import argparse
from gekko import GEKKO
from pathlib import Path
import os

# Raise pandas' display limits so that wide/long tables are printed in full.
pd.set_option('display.max_rows', 999)
pd.set_option('display.max_columns', 99)
 
class CGE():
    '''
    Computable general equilibrium (CGE) models are a class of economic models that use
    actual economic data to estimate how an economy might react to changes in policy,
    technology or other external factors. CGE models are also referred to as AGE
    (applied general equilibrium) models.

    The production factors are read from the row labels of ``production.csv``;
    the model was written with two of them in mind: Capital (K) and Labour (L).

    The model is written completely in Python and is solved through GEKKO.
    '''

    # Default project root. Identical to the path that used to be hard-coded in
    # ``__init__`` so that callers which do not pass ``work_dir`` read the same files.
    DEFAULT_WORK_DIR = 'E:\\nauka\\GitHub\\cge_model_python'

    # Attribute name -> CSV file located in ``<work_dir>/Data/<used_data_folder>``.
    DATA_FILES = {
        'use': 'production.csv',
        'xdem': 'consumption.csv',
        'xsup': 'economy_output.csv',
        'intc': 'intermediate_consumption.csv',
        'enfac': 'endowment.csv',
        'gdem': 'government_consumption.csv',
        'gsav': 'government_savings.csv',
        'tc': 'consumption_taxes.csv',
        'ti': 'income_taxes.csv',
    }

    def __init__(self, capital, labour, used_data_folder, work_dir=None):
        '''
        Load the benchmark tables and create empty calibration containers.

        Parameters:
            capital: currently not used by the model.
            labour: currently not used by the model.
            used_data_folder: name of the sub-folder of ``<work_dir>/Data``
                holding the CSV files listed in ``DATA_FILES``.
            work_dir: project root directory; defaults to ``DEFAULT_WORK_DIR``.
        '''
        if work_dir is None:
            work_dir = self.DEFAULT_WORK_DIR
        data_dir = Path(work_dir) / 'Data' / used_data_folder

        # Every table is read with its first column as the row index.
        for attr, filename in self.DATA_FILES.items():
            setattr(self, attr, pd.read_csv(data_dir / filename, index_col=0))

        self.factors = self.use.index
        self.sectors = self.use.columns
        self.households = self.xdem.columns

        # No factor shock by default.
        self.Shock = {fac: 0 for fac in self.factors}

        self.gamma = {}
        self.XS0 = {}
        self.VA0 = {}
        self.B_leo = {}
        self.beta_leo = pd.DataFrame(index=self.factors, columns=self.sectors)
        self.AIJ = pd.DataFrame(index=self.sectors, columns=self.sectors)
        self.beta_ces = pd.DataFrame(index=self.factors, columns=self.sectors)
        self.B_ces = {}
        self.alpha_gov = {}
        self.INC0 = {}
        self.XD0 = pd.DataFrame(index=self.households, columns=self.sectors)
        self.trans = pd.DataFrame(0, index=self.households, columns=self.households)
        self.W0 = {}
        self.alpha_cb = pd.DataFrame(index=self.sectors, columns=self.households)
        self.A_cb = {}
        self.itax = {}
        self.PW = {}
        self.W = {}
        self.INC = {}
        self.VA = {}
        self.XS = {}
        self.ENDW0 = pd.DataFrame(index=self.enfac.index, columns=self.enfac.columns)

        self.p = {}
        self.pFAC = {}
        self.pVA = {}
        self.tr_in = {}
        self.tr_out = {}

        self.m = GEKKO(remote=False)

    def m_sum(self, array):
        '''
        Return the sum of ``array`` after passing it through ``np.nan_to_num``.

        NOTE(review): ``np.nan_to_num`` only replaces NaN in floating-point
        arrays; lists of GEKKO expressions presumably pass through unchanged -
        confirm.
        '''
        return np.sum(np.nan_to_num(array))

    def m_prod(self, array):
        '''
        Return the product of ``array`` after passing it through ``np.nan_to_num``.
        '''
        return np.prod(np.nan_to_num(array))

    def parameters_extended(self):
        '''
        Calibrate the model parameters and create the GEKKO variables.

        Fills the containers created in ``__init__`` from the loaded tables and
        registers the price/quantity variables and the calibrated constants on
        ``self.m``. Each call registers a new set of GEKKO objects.

        Table cells are written with ``DataFrame.at[row, column]``: chained
        assignment (``frame[column][row] = value``) does not update the frame
        when pandas Copy-on-Write is active.
        '''
        # Recalibration of output including intermediate use (Leontief)
        for sec in self.sectors:
            self.XS0[sec] = sum(self.intc[sec]) + sum(self.use[sec])
            self.VA0[sec] = sum(self.use[sec])
            self.VA[sec] = self.m.Var(lb=0, value=self.VA0[sec])
            self.XS[sec] = self.m.Var(lb=0, value=self.XS0[sec])
            self.pVA[sec] = self.m.Var(lb=0, value=1)

            for fac in self.factors:
                self.beta_leo.at[fac, sec] = self.m.Const(self.use[sec][fac] / self.VA0[sec])

            self.B_leo[sec] = self.m.Const(self.VA0[sec] / self.m_prod([
                self.use[sec][fac]**self.beta_leo.at[fac, sec]
                for fac in self.factors
            ]))

            # Column ``sec`` of AIJ: input of ``sect`` per unit of output of ``sec``.
            for sect in self.sectors:
                self.AIJ.at[sect, sec] = self.m.Const(self.intc[sec][sect] / self.XS0[sec])

        # CES production function calibration
        for sec in self.sectors:
            # gammas
            self.gamma[sec] = self.m.Const(0.5)
            gamma = self.gamma[sec]
            # betas
            # NOTE(review): ``**`` is right-associative, so the denominator is
            # sum(use) ** ((1 / gamma) ** gamma) - confirm this is the intended
            # CES share formula.
            for fac in self.factors:
                self.beta_ces.at[fac, sec] = self.m.Const(
                    self.use[sec][fac]**(1/gamma) / sum(self.use[sec])**(1/gamma)**(gamma)
                )
            # B
            self.B_ces[sec] = self.m.Const(self.VA0[sec] / self.m_sum([
                self.beta_ces.at[fac, sec]**(1/gamma) * self.use[sec][fac]**((gamma - 1)/gamma)
                for fac in self.factors
            ])**(gamma/(gamma - 1)))

        # Calibration of the government, parameters of govt C-D function
        gov = self.gdem.columns[0]
        # WG0
        self.WG0 = sum(self.gdem[gov])
        # alpha_gov
        for sec in self.sectors:
            self.alpha_gov[sec] = self.m.Const(self.gdem[gov][sec] / self.WG0)
        # AG
        self.AG = self.m.Const(self.WG0 / self.m_prod([
            self.gdem[gov][sec]**self.alpha_gov[sec] for sec in self.sectors
        ]))

        # Household incomes
        for hou in self.households:
            self.INC0[hou] = self.m_sum([self.enfac[fac][hou] for fac in self.factors])

        # Demand
        for sec in self.sectors:
            for hou in self.households:
                self.XD0.at[hou, sec] = self.xdem[hou][sec]

        for hou in self.households:
            # Transfers
            self.tr_in[hou] = self.m.Const(self.trans[hou].sum())
            self.tr_out[hou] = self.m.Const(self.trans.loc[hou].sum())
            # benchmark utility
            self.W0[hou] = self.m_sum([self.xdem[hou][sec] for sec in self.sectors])

        # Parameters of the Cobb-Douglas utility function
        for hou in self.households:
            # alpha cobb douglas
            for sec in self.sectors:
                self.alpha_cb.at[sec, hou] = self.m.Const(self.xdem[hou][sec] / self.W0[hou])
            # A
            self.A_cb[hou] = self.m.Const(self.W0[hou] / self.m_prod([
                self.xdem[hou][sec]**self.alpha_cb.at[sec, hou] for sec in self.sectors
            ]))
            # income tax (a rate: taxes paid relative to benchmark income)
            self.itax[hou] = self.m.Const(sum(self.ti[hou]) / self.INC0[hou])

        # Factor tax
        # None
        for sec in self.sectors:
            self.p[sec] = self.m.Var(lb=0, value=1)
        for fac in self.factors:
            self.pFAC[fac] = self.m.Var(lb=0, value=1)

        for hou in self.households:
            self.PW[hou] = self.m.Var(lb=0, value=1)
            self.W[hou] = self.m.Var(value=sum(self.xdem[hou]))
            self.INC[hou] = self.m.Var(lb=0, value=sum(self.xdem[hou]))
            for fac in self.factors:
                # Endowments of factors, reduced by the factor's shock
                self.ENDW0.at[hou, fac] = self.m.Const(
                    self.enfac[fac][hou] * (1 - self.Shock[fac])
                )
        self.PWG = self.m.Var(lb=0, value=1)
        self.WG = self.m.Var(value=self.WG0)
        self.INCG = self.m.Var(lb=0, value=sum(self.ti.iloc[0]))
        self.GSAV = self.m.Var(value=sum(self.gsav.loc[gov]))

    def equilibrium_extended(self):
        '''
        Calibrate the model, build the equilibrium equations and solve them.

        Calls ``parameters_extended()`` itself, stores the equations by name in
        ``self.equations``, passes them to GEKKO and runs the solver selected
        with ``SOLVER=1``.

        NOTE(review): a solve fails with "Invalid element: nan" if a calibrated
        constant is NaN; presumably that happens when a benchmark total used as
        a divisor is zero - verify against the input tables.
        '''
        self.parameters_extended()

        self.equations = {
            'PRF_WG': (
                self.m_prod([
                    (self.p[sec] / self.alpha_gov[sec])**(self.alpha_gov[sec])
                    for sec in self.sectors
                ])
                / self.AG
                == self.PWG
            ),
            'MKT_WG': (
                self.WG * self.PWG
                == self.INCG - self.GSAV
            ),
            'I_INCG': (
                self.m_sum([
                    self.INC[hou] * self.itax[hou]
                    for hou in self.households
                ])
                + self.m_sum([
                    self.m_sum([
                        self.pFAC[fac] * self.beta_ces.at[fac, sec] * self.B_ces[sec]**(self.gamma[sec] - 1)
                        * self.VA[sec] * (self.pVA[sec] / self.pFAC[fac])**self.gamma[sec]
                        for sec in self.sectors
                    ])
                    for fac in self.factors
                ])
                == self.INCG
            )
        }

        for sec in self.sectors:
            # Zero profit conditions (P = MC)
            # The share is summed over the supplying sectors ``sect`` of column
            # ``sec`` (a comprehension variable named ``sec`` would sum the
            # diagonal of AIJ instead) and the whole value-added share
            # multiplies pVA.
            # NOTE(review): only the value-added share enters this price
            # equation; confirm whether intermediate input costs belong here.
            self.equations[f'PRF_{sec}_S'] = (
                (1 - self.m_sum([
                    self.AIJ.at[sect, sec]
                    for sect in self.sectors
                ])) * self.pVA[sec]
                == self.p[sec]
            )
            self.equations[f'PRF_{sec}_VA'] = (
                self.m_sum([
                    self.beta_ces.at[fac, sec] * self.pFAC[fac]**(1 - self.gamma[sec])
                    for fac in self.factors
                ])
                == (self.B_ces[sec] * self.pVA[sec])**(1 - self.gamma[sec])
            )
            # NOTE(review): AIJ[sect, sec] is calibrated as
            # intc[sec][sect] / XS0[sec] but is multiplied by XS[sect] here;
            # confirm the index orientation is intended.
            self.equations[f'MKT_{sec}_S'] = (
                self.m_sum([
                    self.alpha_cb.at[sec, hou] * self.PW[hou] * self.W[hou] / self.p[sec]
                    for hou in self.households
                ])
                + self.alpha_gov[sec] * self.WG * self.PWG / self.p[sec]
                + self.m_sum([
                    self.AIJ.at[sect, sec] * self.XS[sect]
                    for sect in self.sectors
                ])
                == self.XS[sec]
            )
            # Value added is the non-intermediate share of output (a product,
            # not a power).
            self.equations[f'MKT_{sec}_VA'] = (
                (1 - self.m_sum([
                    self.AIJ.at[sect, sec]
                    for sect in self.sectors
                ])) * self.XS[sec]
                == self.VA[sec]
            )
        for hou in self.households:
            self.equations[f'PRF_{hou}_W'] = (
                self.m_prod([
                    (self.p[sec] / self.alpha_cb.at[sec, hou])**(self.alpha_cb.at[sec, hou])
                    for sec in self.sectors
                ])
                / self.A_cb[hou]
                == self.PW[hou]
            )
            # itax is a rate, so spending equals income net of INC * itax
            # (the same product is the tax revenue in I_INCG).
            self.equations[f'MKT_{hou}_W'] = (
                self.PW[hou] * self.W[hou]
                == self.INC[hou] * (1 - self.itax[hou])
            )
            self.equations['INC_' + hou] = (
                self.m_sum([
                    self.pFAC[fac] * self.ENDW0.at[hou, fac]
                    for fac in self.factors
                ])
                + self.tr_in[hou] - self.tr_out[hou]
                == self.INC[hou]
            )
        for fac in self.factors:
            self.equations[f'MKT_{fac}'] = (
                self.m_sum([
                    self.beta_ces.at[fac, sec] * self.B_ces[sec]**(self.gamma[sec] - 1)
                    * self.VA[sec] * (self.pVA[sec] / self.pFAC[fac])**self.gamma[sec]
                    for sec in self.sectors
                ])
                == self.m_sum([
                    self.ENDW0.at[hou, fac]
                    for hou in self.households
                ])
            )

        self.m.Equations(list(self.equations.values()))
        self.m.options.SOLVER=1
        self.m.solve()

In [2]:
# equilibrium_extended() runs parameters_extended() itself, so calling the
# calibration separately beforehand would register every GEKKO variable and
# constant on the model twice.
mod_test = CGE(0, 0, 'SA_SAM_2015')
mod_test.equilibrium_extended()



 ----------------------------------------------------------------
 APMonitor, Version 0.9.2
 APMonitor Optimization Suite
 ----------------------------------------------------------------
 
 @error: Model Expression
 *** Error in syntax of function string: Invalid element: nan

Position: 1                   
 nan
 ?




Exception: @error: Model Expression
 *** Error in syntax of function string: Invalid element: nan

Position: 1                   
 nan
 ?



In [35]:
import numpy as np
import pandas as pd
import argparse
from gekko import GEKKO
from pathlib import Path
import os

# Raise pandas' display limits so that wide/long tables are printed in full.
pd.set_option('display.max_rows', 999)
pd.set_option('display.max_columns', 99)
 
class CGE():
    '''
    Computable general equilibrium (CGE) models are a class of economic models that use
    actual economic data to estimate how an economy might react to changes in policy,
    technology or other external factors. CGE models are also referred to as AGE
    (applied general equilibrium) models.

    The production factors are read from the row labels of ``production.csv``;
    the model was written with two of them in mind: Capital (K) and Labour (L).

    The model is written completely in Python; its variables are GEKKO variables.
    '''

    # Default project root. Identical to the path that used to be hard-coded in
    # ``__init__`` so that callers which do not pass ``work_dir`` read the same files.
    DEFAULT_WORK_DIR = 'E:\\nauka\\GitHub\\cge_model_python'

    # Attribute name -> CSV file located in ``<work_dir>/Data/<used_data_folder>``.
    DATA_FILES = {
        'use': 'production.csv',
        'xdem': 'consumption.csv',
        'xsup': 'economy_output.csv',
        'intc': 'intermediate_consumption.csv',
        'enfac': 'endowment.csv',
        'gdem': 'government_consumption.csv',
        'gsav': 'government_savings.csv',
        'tc': 'consumption_taxes.csv',
        'ti': 'income_taxes.csv',
    }

    def __init__(self, capital, labour, used_data_folder, work_dir=None):
        '''
        Load the benchmark tables and create empty calibration containers.

        Parameters:
            capital: currently not used by the model.
            labour: currently not used by the model.
            used_data_folder: name of the sub-folder of ``<work_dir>/Data``
                holding the CSV files listed in ``DATA_FILES``.
            work_dir: project root directory; defaults to ``DEFAULT_WORK_DIR``.
        '''
        if work_dir is None:
            work_dir = self.DEFAULT_WORK_DIR
        data_dir = Path(work_dir) / 'Data' / used_data_folder

        # Every table is read with its first column as the row index.
        for attr, filename in self.DATA_FILES.items():
            setattr(self, attr, pd.read_csv(data_dir / filename, index_col=0))

        self.factors = self.use.index
        self.sectors = self.use.columns
        self.households = self.xdem.columns

        # No transfers between households and no factor shock by default.
        self.trans = pd.DataFrame(0, index=self.households, columns=self.households)
        self.Shock = {fac: 0 for fac in self.factors}

        self.gamma = {}
        self.XS0 = {}
        self.VA0 = {}
        self.B_leo = {}
        self.beta_leo = pd.DataFrame(index=self.factors, columns=self.sectors)
        self.AIJ = pd.DataFrame(index=self.sectors, columns=self.sectors)
        self.beta_ces = pd.DataFrame(index=self.factors, columns=self.sectors)
        self.B_ces = {}
        self.alpha_gov = {}
        self.INC0 = {}
        self.XD0 = pd.DataFrame(index=self.households, columns=self.sectors)
        self.W0 = {}
        self.alpha_cb = pd.DataFrame(index=self.sectors, columns=self.households)
        self.A_cb = {}
        self.itax = {}
        self.PW = {}
        self.W = {}
        self.INC = {}
        self.VA = {}
        self.XS = {}
        self.ENDW0 = pd.DataFrame(index=self.enfac.index, columns=self.enfac.columns)

        self.p = {}
        self.pFAC = {}
        self.pVA = {}
        self.tr_in = {}
        self.tr_out = {}

        self.m = GEKKO(remote=False)

    def parameters_extended(self):
        '''
        Calibrate the model parameters numerically and create the GEKKO variables.

        Fills the containers created in ``__init__`` from the loaded tables;
        calibrated parameters are stored as plain numbers, prices and
        quantities as GEKKO ``Var`` objects registered on ``self.m``. Each
        call registers a new set of variables.

        Table cells are written with ``DataFrame.at[row, column]``: chained
        assignment (``frame[column][row] = value``) does not update the frame
        when pandas Copy-on-Write is active.
        '''
        # Recalibration of output including intermediate use (Leontief)
        for sec in self.sectors:
            self.XS0[sec] = sum(self.intc[sec]) + sum(self.use[sec])
            self.VA0[sec] = sum(self.use[sec])
            self.VA[sec] = self.m.Var(lb=0, value=self.VA0[sec])
            self.XS[sec] = self.m.Var(lb=0, value=self.XS0[sec])
            self.pVA[sec] = self.m.Var(lb=0, value=1)

            for fac in self.factors:
                self.beta_leo.at[fac, sec] = self.use[sec][fac] / self.VA0[sec]

            self.B_leo[sec] = self.VA0[sec] / np.prod([
                self.use[sec][fac]**self.beta_leo.at[fac, sec]
                for fac in self.factors
            ])

            # Column ``sec`` of AIJ: input of ``sect`` per unit of output of ``sec``.
            for sect in self.sectors:
                self.AIJ.at[sect, sec] = self.intc[sec][sect] / self.XS0[sec]

        # CES production function calibration
        for sec in self.sectors:
            # gammas
            self.gamma[sec] = 0.5
            gamma = self.gamma[sec]
            # betas
            # NOTE(review): ``**`` is right-associative, so the denominator is
            # sum(use) ** ((1 / gamma) ** gamma) - confirm this is the intended
            # CES share formula.
            for fac in self.factors:
                self.beta_ces.at[fac, sec] = (
                    self.use[sec][fac]**(1/gamma) / sum(self.use[sec])**(1/gamma)**(gamma)
                )
            # B
            self.B_ces[sec] = self.VA0[sec] / np.sum(np.nan_to_num([
                self.beta_ces.at[fac, sec]**(1/gamma) * self.use[sec][fac]**((gamma - 1)/gamma)
                for fac in self.factors
            ]))**(gamma/(gamma - 1))

        # Calibration of the government, parameters of govt C-D function
        gov = self.gdem.columns[0]
        # WG0
        self.WG0 = sum(self.gdem[gov])
        # alpha_gov
        for sec in self.sectors:
            self.alpha_gov[sec] = self.gdem[gov][sec] / self.WG0
        # AG
        self.AG = self.WG0 / np.prod([
            self.gdem[gov][sec]**self.alpha_gov[sec] for sec in self.sectors
        ])

        # Household incomes
        for hou in self.households:
            self.INC0[hou] = np.sum([self.enfac[fac][hou] for fac in self.factors])

        # Demand
        for sec in self.sectors:
            for hou in self.households:
                self.XD0.at[hou, sec] = self.xdem[hou][sec]

        for hou in self.households:
            # Transfers
            self.tr_in[hou] = self.trans[hou].sum()
            self.tr_out[hou] = self.trans.loc[hou].sum()
            # benchmark utility
            self.W0[hou] = np.sum([self.xdem[hou][sec] for sec in self.sectors])

        # Parameters of the Cobb-Douglas utility function
        for hou in self.households:
            # alpha cobb douglas
            for sec in self.sectors:
                self.alpha_cb.at[sec, hou] = self.xdem[hou][sec] / self.W0[hou]
            # A
            self.A_cb[hou] = self.W0[hou] / np.prod([
                self.xdem[hou][sec]**self.alpha_cb.at[sec, hou] for sec in self.sectors
            ])
            # income tax (a rate: taxes paid relative to benchmark income)
            self.itax[hou] = sum(self.ti[hou]) / self.INC0[hou]

        # Factor tax
        # None
        for sec in self.sectors:
            self.p[sec] = self.m.Var(lb=0, value=1)
        for fac in self.factors:
            self.pFAC[fac] = self.m.Var(lb=0, value=1)

        for hou in self.households:
            self.PW[hou] = self.m.Var(lb=0, value=1)
            self.W[hou] = self.m.Var(value=sum(self.xdem[hou]))
            self.INC[hou] = self.m.Var(lb=0, value=sum(self.xdem[hou]))
            for fac in self.factors:
                # Endowments of factors, reduced by the factor's shock
                self.ENDW0.at[hou, fac] = self.enfac[fac][hou] * (1 - self.Shock[fac])
        self.PWG = self.m.Var(lb=0, value=1)
        self.WG = self.m.Var(value=self.WG0)
        self.INCG = self.m.Var(lb=0, value=sum(self.ti.iloc[0]))
        self.GSAV = self.m.Var(value=sum(self.gsav.loc[gov]))

In [36]:
# Build the model from the 'SA_SAM_2015' data folder and run the calibration.
mod_test = CGE(capital=0, labour=0, used_data_folder='SA_SAM_2015')
mod_test.parameters_extended()



In [37]:
# Display the per-sector value-added price variables created by the calibration.
mod_test.pVA

{'agri': 1,
 'fore': 1,
 'fish': 1,
 'coal': 1,
 'gold': 1,
 'more': 1,
 'omin': 1,
 'food': 1,
 'bevt': 1,
 'weav': 1,
 'knit': 1,
 'leat': 1,
 'foot': 1,
 'wood': 1,
 'papr': 1,
 'prnt': 1,
 'petr': 1,
 'bchm': 1,
 'ochm': 1,
 'rubb': 1,
 'plas': 1,
 'glss': 1,
 'nmmi': 1,
 'bisc': 1,
 'nfme': 1,
 'fabm': 1,
 'mach': 1,
 'emch': 1,
 'rdtv': 1,
 'mopt': 1,
 'mtvp': 1,
 'otrp': 1,
 'furn': 1,
 'omnf': 1,
 'elcg': 1,
 'watd': 1,
 'cnst': 1,
 'wtrd': 1,
 'rtrd': 1,
 'mtvs': 1,
 'acct': 1,
 'ltrp': 1,
 'wtrp': 1,
 'atrp': 1,
 'trps': 1,
 'post': 1,
 'fins': 1,
 'insp': 1,
 'ofin': 1,
 'real': 1,
 'rent': 1,
 'comp': 1,
 'rsea': 1,
 'obus': 1,
 'puba': 1,
 'educ': 1,
 'heal': 1,
 'wast': 1,
 'morg': 1,
 'recr': 1,
 'oact': 1,
 'nobs': 1,
 'spcm': 1,
 'texm': 1,
 'phar': 1,
 'ftrp': 1,
 'texf': 1,
 'conf': 1,
 'lani': 1,
 'soap': 1,
 'afee': 1,
 'pain': 1,
 'pfis': 1,
 'ptrp': 1,
 'dair': 1,
 'ofoo': 1,
 'wear': 1,
 'star': 1,
 'gear': 1,
 'orub': 1,
 'genm': 1,
 'oche': 1,
 'cats': 1,
 'do