In [33]:
import numpy as np
from scipy.stats import beta

# parameters and model set up

sigma = 5               # elasticity of substitution between critical goods
theta = 0.1             # between outside and critical good
kappa = 0.5             # fixed entry cost
z_h = {"A": 0.0, "B": 0.0}  # production subsidies for countries A and B
e_h = {"A": 0.0, "B": 0.0}  # entry subsidy for countries A and B
tau = 0.1                   # generic trade cost

imtax_Fij = {"A": {"A": 0.00, "B": 0.00, "C": 0.00}, # import tariff on critical goods from i to j
             "B": {"A": 0.00, "B": 0.00, "C": 0.00}}

extax_Fij = {"A": {"A": 0.00, "B": 0.00}, # export tax on critical goods from i to j
             "B": {"A": 0.00, "B": 0.00}}

imtax_Ihi = {"A": {"A": 0.00, "B": 0.00}, # import tariff on inputs from h to i
             "B": {"A": 0.00, "B": 0.00}}

extax_Ihi = {"A": {"A": 0.00, "B": 0.00}, # export tax on inputs from h to i
             "B": {"A": 0.00, "B": 0.00}}

alpha_c = {"A": 6, "B": 6}    # alpha for country risk, e.g. 6 (~86%), 9 (90%), 19 (95%), 99 (99%)
beta_c = {"A": 1, "B": 1}     # beta parameter (fix to 1)
alpha_i = 19                  # alpha parameter for idiosyncratic risk
beta_i = 1

cvar_alpha = 0.95  # confidence level for CVaR

In [35]:
# run model
# trade costs for critical goods from A to (A, B)
t_Fij = {
    i: {
        j: 1 if i == j else 1 + imtax_Fij.get(i, {}).get(j, 0) + extax_Fij.get(i, {}).get(j, 0) + tau
        for j in ("A", "B", "C") if j in imtax_Fij.get(i, {})
    }
    for i in ("A", "B", "C") if i in imtax_Fij
}

# trade costs for critical goods from A to (A, B)
t_Ihi = {
    h: {
        i: 1 if h == i else 1 + imtax_Ihi.get(h, {}).get(i, 0) + extax_Ihi.get(h, {}).get(i, 0) + tau
        for i in ("A", "B", "C") if i in imtax_Ihi.get(h, {})
    }
    for h in ("A", "B") if h in imtax_Ihi
}

# expected survival probability
expected_phi_h = {
    "A": (alpha_c["A"]/(alpha_c["A"] + beta_c["A"]))*(alpha_i/(alpha_i + beta_i)),
    "B": (alpha_c["B"]/(alpha_c["B"] + beta_c["B"]))*(alpha_i/(alpha_i + beta_i)),
}

# calculate input prices
p_Ih = {
    "A": (sigma / (sigma - 1)) * (1 / expected_phi_h["A"] - z_h["A"]),
    "B": (sigma / (sigma - 1)) * (1 / expected_phi_h["B"] - z_h["B"]),
}

# calculate price of final goods (p_Fi) = input price index (bar_P_Ii)
p_Fi = {i: np.sum([(p_Ih[h] * t_Ihi[h][i]) ** (1 - sigma) for h in ("A", "B")]) ** (1 / (1 - sigma)) for i in ("A", "B")}

# calculate price index for final goods consumed in country j
bar_P_Fj = {j: np.sum([(p_Fi[i] * t_Fij[i][j]) ** (1 - sigma) for i in ("A", "B")]) ** (1 / (1 - sigma)) for j in ("A", "B", "C")}

# expected demand for final goods
expected_x_Fij = {
    i: {j: (p_Fi[i] * t_Fij[i][j]) ** (-sigma) * bar_P_Fj[j] ** ((sigma * (1 - theta) - 1) / (1 - theta)) for j in ("A", "B", "C")}
    for i in ("A", "B")
}

# expected demand for inputs
expected_X_Ihi = {
    h: {
        i: (p_Ih[h] * t_Ihi[h][i]) ** -sigma
        * sum(
            t_Fij[i][j] ** -sigma
            * bar_P_Fj[j] ** ((sigma * (1 - theta) - 1) / (1 - theta))
            for j in ("A", "B", "C")
        )
        for i in ("A", "B")
    }
    for h in ("A", "B")
}

# n_h (number of entrants) for each h
n_h = {
    h: sum(expected_X_Ihi[h][i] for i in ("A", "B"))
    / ((sigma - 1) * (1 - e_h[h]) * kappa)
    * ((1 / expected_phi_h[h]) - z_h[h])
    for h in ("A", "B")
}

# expected survivors for each h
ns_h = {
    h: n_h[h] * expected_phi_h[h] for h in ("A", "B")
    for h in ("A", "B")
}

# expected tariff revenue
tariff_revenue = {
    j: sum(
        imtax_Fij.get(i, {}).get(j, 0) * expected_x_Fij.get(i, {}).get(j, 0)
        for i in ("A", "B") if i != j
    ) + (sum(
        imtax_Ihi.get(i, {}).get(j, 0) * expected_X_Ihi.get(i, {}).get(j, 0)
        for i in ("A", "B") if i != j
    ) if j in ("A", "B") else 0)
    + sum(
        extax_Fij.get(j, {}).get(i, 0) * expected_x_Fij.get(j, {}).get(i, 0)
        for i in ("A", "B") if j != i
    )
    + (sum(
        extax_Ihi.get(j, {}).get(i, 0) * expected_X_Ihi.get(j, {}).get(i, 0)
        for i in ("A", "B") if j != i
    ) if j in ("A", "B") else 0)
    for j in ("A", "B", "C")
}

# expected production/entry subsidy cost
subsidy_cost = {
    h: ns_h.get(h, 0) * e_h.get(h, 0) + z_h.get(h, 0) * sum(
        expected_X_Ihi.get(h, {}).get(i, 0) for i in ("A", "B")
    )
    for h in ("A", "B")
}

# outside good consumption
x_0j = {
    j: 1
    + tariff_revenue.get(j, 0)
    - subsidy_cost.get(j, 0)
    - bar_P_Fj.get(j, 0) ** (-theta / (1 - theta))
    for j in ("A", "B", "C")
}

# monte carlo setup
num_simulations = 40000
max_iterations = 1000
min_iterations = 5 # min before checking convergence
iteration = 0
results = []
cvar_results = []
tolerance = 0.01  # relative error tolerance
z_alpha = 1.96    # critical z-value for 95% confidence

# run the loop until convergence or max iterations
while iteration < max_iterations:
    iteration += 1   
    
    # generate risk factors
    D_h_A = 1 - beta.rvs(alpha_c["A"], beta_c["A"], size = num_simulations)
    D_h_B = 1 - beta.rvs(alpha_c["B"], beta_c["B"], size = num_simulations)
    fail_all_A = np.random.rand(num_simulations) < D_h_A
    fail_all_B = np.random.rand(num_simulations) < D_h_B
    phi_A = np.where(fail_all_A, 0, beta.rvs(alpha_i, beta_i, size = num_simulations))
    phi_B = np.where(fail_all_B, 0, beta.rvs(alpha_i, beta_i, size = num_simulations))
    
    # compute rho_i directly for all samples
    rho_i = {
        i: (np.sum(
            [
                (
                    ((phi_h / expected_phi_h[h]) ** ((sigma - 1) / sigma))
                    * (
                        (1 / expected_phi_h[h] - z_h[h]) ** (1 - sigma)
                    )
                    * (t_Ihi[h][i] ** (1 - sigma))
                )
                for h, phi_h in zip(["A", "B"], [phi_A, phi_B])
            ],
            axis=0,
        )
        / np.sum(
            [
                (
                    (1 / expected_phi_h[h] - z_h[h]) ** (1 - sigma)
                    * (t_Ihi[h][i] ** (1 - sigma))
                )
                for h in ["A", "B"]
            ],
            axis=0,
        )) ** (sigma / (sigma - 1))
        for i in ["A", "B"]
    }
    
    # compute utility U_j
    sum_term = {
        j: 
        (((rho_i["A"] * expected_x_Fij["A"][j]) ** ((sigma - 1) / sigma) + 
          (rho_i["B"] * expected_x_Fij["B"][j]) ** ((sigma - 1) / sigma)))
        for j in ("A", "B", "C")
    }
    
    U_j = {
        j: x_0j.get(j, 0) +
           (1 / theta) * (sum_term.get(j, 0) ** (theta * sigma / (sigma - 1)))
        for j in ("A", "B")
    }
    
    # Calculate VaR as a dictionary
    var_threshold = {
        j: np.percentile(U_j[j], (1 - cvar_alpha) * 100) for j in U_j.keys()
    }
    
    # Calculate CVaR as a dictionary
    cvar = {
        j: np.mean(U_j[j][U_j[j] <= var_threshold[j]]) for j in U_j.keys()
    }
    
    # append results
    cvar_results.append(cvar)
    results.append({j: np.mean(U_j[j]) for j in U_j.keys()})

    # extract utility and CVaR results for country "A"
    cvar_results_A = [cvar["A"] for cvar in cvar_results]
    results_A = [result["A"] for result in results]
    
    # calculate standard errors for country "A"
    cvar_sem_A = (
        np.std(cvar_results_A, ddof=1) / np.sqrt(len(cvar_results_A))
        if len(cvar_results_A) > 1
        else 0  # Use 0 if variance can't be computed
    )
    mean_sem_A = (
        np.std(results_A, ddof=1) / np.sqrt(len(results_A))
        if len(results_A) > 1
        else 0
    )
    
    # Calculate confidence interval widths for country "A"
    mean_ci_width_A = 2 * z_alpha * mean_sem_A
    cvar_ci_width_A = 2 * z_alpha * cvar_sem_A
    
    # Check convergence for country "A"
    mean_converged_A = (
        len(results_A) > min_iterations
        and (mean_ci_width_A / np.abs(np.mean(results_A))) < tolerance
    )
    cvar_converged_A = (
        len(cvar_results_A) > min_iterations
        and (cvar_ci_width_A / np.abs(np.mean(cvar_results_A))) < tolerance
    )
    
    if mean_converged_A and cvar_converged_A:
        print(f"Converged after {iteration} iterations.")
        break
else:
    print("Reached maximum iterations without convergence.")

# Extract utility and CVaR results for countries A and B
results_A = [result["A"] for result in results]
results_B = [result["B"] for result in results]
cvar_results_A = [cvar["A"] for cvar in cvar_results]
cvar_results_B = [cvar["B"] for cvar in cvar_results]

# Output final results
print(f"Overall mean utility (A): {np.mean(results_A):.3f}")
print(f"Overall mean utility (B): {np.mean(results_B):.3f}")
print(f"Mean CVaR (at α = {cvar_alpha}) (A): {np.mean(cvar_results_A):.3f}")
print(f"Mean CVaR (at α = {cvar_alpha}) (B): {np.mean(cvar_results_B):.3f}")

Converged after 68 iterations.
Overall mean utility (A): 9.579
Overall mean utility (B): 9.579
Mean CVaR (at α = 0.95) (A): 5.368
Mean CVaR (at α = 0.95) (B): 5.368


In [17]:
# tariff loop
# run the top two cells first to retrieve parameters

import pandas as pd

tariff_sequence = np.arange(0, 1.05, 0.05)
loop_results = []

for i in tariff_sequence:

    tariff_AB = i
    
    for j in tariff_sequence:

        tariff_BA = j

        # import tariff on inputs from h to i
        imtax_Ihi = {"A": {"A": 0.00, "B": tariff_AB}, 
                     "B": {"A": tariff_BA, "B": 0.00}}

        # trade costs for critical goods from A to (A, B)
        t_Fij = {
            i: {
                j: 1 if i == j else 1 + imtax_Fij.get(i, {}).get(j, 0) + extax_Fij.get(i, {}).get(j, 0) + tau
                for j in ("A", "B", "C") if j in imtax_Fij.get(i, {})
            }
            for i in ("A", "B", "C") if i in imtax_Fij
        }
        
        # trade costs for critical goods from A to (A, B)
        t_Ihi = {
            h: {
                i: 1 if h == i else 1 + imtax_Ihi.get(h, {}).get(i, 0) + extax_Ihi.get(h, {}).get(i, 0) + tau
                for i in ("A", "B", "C") if i in imtax_Ihi.get(h, {})
            }
            for h in ("A", "B") if h in imtax_Ihi
        }
        
        # calculate price of final goods (p_Fi) = input price index (bar_P_Ii)
        p_Fi = {i: np.sum([(p_Ih[h] * t_Ihi[h][i]) ** (1 - sigma) for h in ("A", "B")]) ** (1 / (1 - sigma)) for i in ("A", "B")}
        
        # calculate price index for final goods consumed in country j
        bar_P_Fj = {j: np.sum([(p_Fi[i] * t_Fij[i][j]) ** (1 - sigma) for i in ("A", "B")]) ** (1 / (1 - sigma)) for j in ("A", "B", "C")}
        
        # expected demand for final goods
        expected_x_Fij = {
            i: {j: (p_Fi[i] * t_Fij[i][j]) ** (-sigma) * bar_P_Fj[j] ** ((sigma * (1 - theta) - 1) / (1 - theta)) for j in ("A", "B", "C")}
            for i in ("A", "B")
        }
        
        # expected demand for inputs
        expected_X_Ihi = {
            h: {
                i: (p_Ih[h] * t_Ihi[h][i]) ** -sigma
                * sum(
                    t_Fij[i][j] ** -sigma
                    * bar_P_Fj[j] ** ((sigma * (1 - theta) - 1) / (1 - theta))
                    for j in ("A", "B", "C")
                )
                for i in ("A", "B")
            }
            for h in ("A", "B")
        }
        
        # n_h (number of entrants) for each h
        n_h = {
            h: sum(expected_X_Ihi[h][i] for i in ("A", "B"))
            / ((sigma - 1) * (1 - e_h[h]) * kappa)
            * ((1 / expected_phi_h[h]) - z_h[h])
            for h in ("A", "B")
        }
        
        # expected survivors for each h
        ns_h = {
            h: n_h[h] * expected_phi_h[h] for h in ("A", "B")
            for h in ("A", "B")
        }
        
        # expected tariff revenue
        tariff_revenue = {
            j: sum(
                imtax_Fij.get(i, {}).get(j, 0) * expected_x_Fij.get(i, {}).get(j, 0)
                for i in ("A", "B") if i != j
            ) + (sum(
                imtax_Ihi.get(i, {}).get(j, 0) * expected_X_Ihi.get(i, {}).get(j, 0)
                for i in ("A", "B") if i != j
            ) if j in ("A", "B") else 0)
            + sum(
                extax_Fij.get(j, {}).get(i, 0) * expected_x_Fij.get(j, {}).get(i, 0)
                for i in ("A", "B") if j != i
            )
            + (sum(
                extax_Ihi.get(j, {}).get(i, 0) * expected_X_Ihi.get(j, {}).get(i, 0)
                for i in ("A", "B") if j != i
            ) if j in ("A", "B") else 0)
            for j in ("A", "B", "C")
        }
        
        # expected production/entry subsidy cost
        subsidy_cost = {
            h: ns_h.get(h, 0) * e_h.get(h, 0) + z_h.get(h, 0) * sum(
                expected_X_Ihi.get(h, {}).get(i, 0) for i in ("A", "B")
            )
            for h in ("A", "B")
        }
        
        # outside good consumption
        x_0j = {
            j: 1
            + tariff_revenue.get(j, 0)
            - subsidy_cost.get(j, 0)
            - bar_P_Fj.get(j, 0) ** (-theta / (1 - theta))
            for j in ("A", "B", "C")
        }
        
        # monte carlo setup
        iteration = 0
        results = []
        cvar_results = []
        
        # run the loop until convergence or max iterations
        while iteration < max_iterations:
            iteration += 1   
            
            # generate risk factors
            D_h_A = 1 - beta.rvs(alpha_c["A"], beta_c["A"], size = num_simulations)
            D_h_B = 1 - beta.rvs(alpha_c["B"], beta_c["B"], size = num_simulations)
            fail_all_A = np.random.rand(num_simulations) < D_h_A
            fail_all_B = np.random.rand(num_simulations) < D_h_B
            phi_A = np.where(fail_all_A, 0, beta.rvs(alpha_i, beta_i, size = num_simulations))
            phi_B = np.where(fail_all_B, 0, beta.rvs(alpha_i, beta_i, size = num_simulations))
            
            # compute rho_i for all samples
            rho_i = {
                i: (np.sum(
                    [
                        (
                            ((phi_h / expected_phi_h[h]) ** ((sigma - 1) / sigma))
                            * (
                                (1 / expected_phi_h[h] - z_h[h]) ** (1 - sigma)
                            )
                            * (t_Ihi[h][i] ** (1 - sigma))
                        )
                        for h, phi_h in zip(["A", "B"], [phi_A, phi_B])
                    ],
                    axis=0,
                )
                / np.sum(
                    [
                        (
                            (1 / expected_phi_h[h] - z_h[h]) ** (1 - sigma)
                            * (t_Ihi[h][i] ** (1 - sigma))
                        )
                        for h in ["A", "B"]
                    ],
                    axis=0,
                )) ** (sigma / (sigma - 1))
                for i in ["A", "B"]
            }
            
            # compute utility U_j
            sum_term = {
                j: 
                (((rho_i["A"] * expected_x_Fij["A"][j]) ** ((sigma - 1) / sigma) + 
                  (rho_i["B"] * expected_x_Fij["B"][j]) ** ((sigma - 1) / sigma)))
                for j in ("A", "B", "C")
            }
            
            U_j = {
                j: x_0j.get(j, 0) +
                   (1 / theta) * (sum_term.get(j, 0) ** (theta * sigma / (sigma - 1)))
                for j in ("A", "B")
            }
            
            # Calculate VaR as a dictionary
            var_threshold = {
                j: np.percentile(U_j[j], (1 - cvar_alpha) * 100) for j in U_j.keys()
            }
            
            # Calculate CVaR as a dictionary
            cvar = {
                j: np.mean([u for u in U_j[j] if u <= var_threshold[j]]) for j in U_j.keys()
            }
            
            # Append results
            cvar_results.append(cvar)
            results.append({j: np.mean(U_j[j]) for j in U_j.keys()})
        
            # Extract utility and CVaR results for country "A"
            cvar_results_A = [cvar["A"] for cvar in cvar_results]
            results_A = [result["A"] for result in results]
            
            # calculate standard errors for country "A"
            cvar_sem_A = (
                np.std(cvar_results_A, ddof=1) / np.sqrt(len(cvar_results_A))
                if len(cvar_results_A) > 1
                else 0  # Use 0 if variance can't be computed
            )
            mean_sem_A = (
                np.std(results_A, ddof=1) / np.sqrt(len(results_A))
                if len(results_A) > 1
                else 0
            )
            
            # calculate confidence interval widths for country A
            mean_ci_width_A = 2 * z_alpha * mean_sem_A
            cvar_ci_width_A = 2 * z_alpha * cvar_sem_A
            
            # check convergence for country A
            mean_converged_A = (
                len(results_A) > min_iterations
                and (mean_ci_width_A / np.abs(np.mean(results_A))) < tolerance
            )
            cvar_converged_A = (
                len(cvar_results_A) > min_iterations
                and (cvar_ci_width_A / np.abs(np.mean(cvar_results_A))) < tolerance
            )
            
            if mean_converged_A and cvar_converged_A:
                # print(f"Converged after {iteration} iterations.")
                break
        else:
            print("Reached maximum iterations without convergence.")

        # after the monte carlo loop
        final_cvar = {k: np.mean([c[k] for c in cvar_results]) for k in cvar.keys()}
        final_utility = {k: np.mean([r[k] for r in results]) for k in results[0].keys()}
        
        # append to loop_results
        loop_results.append({
            "tariff_AB": tariff_AB,
            "tariff_BA": tariff_BA,
            "utility_A": final_utility["A"],
            "utility_B": final_utility["B"],
            "cvar_A": final_cvar["A"],
            "cvar_B": final_cvar["B"],
            "symmetric": 1 if tariff_AB == tariff_BA else 0
        })

# save csv
results_df = pd.DataFrame(loop_results)
results_df.to_csv("output/tariff_loop_asymmetric_risk.csv", index=False)

print("Results saved.")

KeyboardInterrupt: 

In [None]:
# export tax loop
# run the top two cells first to retrieve parameters

import pandas as pd

extax_sequence = np.arange(0, 1.05, 0.05)
ex_loop_results = []

for i in extax_sequence:

    extax_AB = i
    
    for j in extax_sequence:

        extax_BA = j

        # export tax on inputs from h to i
        extax_Ihi = {"A": {"A": 0.00, "B": extax_AB},
                     "B": {"A": extax_BA, "B": 0.00}}

        # trade costs for critical goods from A to (A, B)
        t_Fij = {
            i: {
                j: 1 if i == j else 1 + imtax_Fij.get(i, {}).get(j, 0) + extax_Fij.get(i, {}).get(j, 0) + tau
                for j in ("A", "B", "C") if j in imtax_Fij.get(i, {})
            }
            for i in ("A", "B", "C") if i in imtax_Fij
        }
        
        # trade costs for critical goods from A to (A, B)
        t_Ihi = {
            h: {
                i: 1 if h == i else 1 + imtax_Ihi.get(h, {}).get(i, 0) + extax_Ihi.get(h, {}).get(i, 0) + tau
                for i in ("A", "B", "C") if i in imtax_Ihi.get(h, {})
            }
            for h in ("A", "B") if h in imtax_Ihi
        }
        
        # calculate price of final goods (p_Fi) = input price index (bar_P_Ii)
        p_Fi = {i: np.sum([(p_Ih[h] * t_Ihi[h][i]) ** (1 - sigma) for h in ("A", "B")]) ** (1 / (1 - sigma)) for i in ("A", "B")}
        
        # calculate price index for final goods consumed in country j
        bar_P_Fj = {j: np.sum([(p_Fi[i] * t_Fij[i][j]) ** (1 - sigma) for i in ("A", "B")]) ** (1 / (1 - sigma)) for j in ("A", "B", "C")}
        
        # expected demand for final goods
        expected_x_Fij = {
            i: {j: (p_Fi[i] * t_Fij[i][j]) ** (-sigma) * bar_P_Fj[j] ** ((sigma * (1 - theta) - 1) / (1 - theta)) for j in ("A", "B", "C")}
            for i in ("A", "B")
        }
        
        # expected demand for inputs
        expected_X_Ihi = {
            h: {
                i: (p_Ih[h] * t_Ihi[h][i]) ** -sigma
                * sum(
                    t_Fij[i][j] ** -sigma
                    * bar_P_Fj[j] ** ((sigma * (1 - theta) - 1) / (1 - theta))
                    for j in ("A", "B", "C")
                )
                for i in ("A", "B")
            }
            for h in ("A", "B")
        }
        
        # n_h (number of entrants) for each h
        n_h = {
            h: sum(expected_X_Ihi[h][i] for i in ("A", "B"))
            / ((sigma - 1) * (1 - e_h[h]) * kappa)
            * ((1 / expected_phi_h[h]) - z_h[h])
            for h in ("A", "B")
        }
        
        # expected survivors for each h
        ns_h = {
            h: n_h[h] * expected_phi_h[h] for h in ("A", "B")
            for h in ("A", "B")
        }
        
        # expected tariff revenue
        tariff_revenue = {
            j: sum(
                imtax_Fij.get(i, {}).get(j, 0) * expected_x_Fij.get(i, {}).get(j, 0)
                for i in ("A", "B") if i != j
            ) + (sum(
                imtax_Ihi.get(i, {}).get(j, 0) * expected_X_Ihi.get(i, {}).get(j, 0)
                for i in ("A", "B") if i != j
            ) if j in ("A", "B") else 0)
            + sum(
                extax_Fij.get(j, {}).get(i, 0) * expected_x_Fij.get(j, {}).get(i, 0)
                for i in ("A", "B") if j != i
            )
            + (sum(
                extax_Ihi.get(j, {}).get(i, 0) * expected_X_Ihi.get(j, {}).get(i, 0)
                for i in ("A", "B") if j != i
            ) if j in ("A", "B") else 0)
            for j in ("A", "B", "C")
        }
        
        # expected production/entry subsidy cost
        subsidy_cost = {
            h: ns_h.get(h, 0) * e_h.get(h, 0) + z_h.get(h, 0) * sum(
                expected_X_Ihi.get(h, {}).get(i, 0) for i in ("A", "B")
            )
            for h in ("A", "B")
        }
        
        # outside good consumption
        x_0j = {
            j: 1
            + tariff_revenue.get(j, 0)
            - subsidy_cost.get(j, 0)
            - bar_P_Fj.get(j, 0) ** (-theta / (1 - theta))
            for j in ("A", "B", "C")
        }
        
        # monte carlo setup
        iteration = 0
        results = []
        cvar_results = []
        
        # run the loop until convergence or max iterations
        while iteration < max_iterations:
            iteration += 1   
            
            # generate risk factors
            D_h_A = 1 - beta.rvs(alpha_c["A"], beta_c["A"], size = num_simulations)
            D_h_B = 1 - beta.rvs(alpha_c["B"], beta_c["B"], size = num_simulations)
            fail_all_A = np.random.rand(num_simulations) < D_h_A
            fail_all_B = np.random.rand(num_simulations) < D_h_B
            phi_A = np.where(fail_all_A, 0, beta.rvs(alpha_i, beta_i, size = num_simulations))
            phi_B = np.where(fail_all_B, 0, beta.rvs(alpha_i, beta_i, size = num_simulations))
            
            # compute rho_i for all samples
            rho_i = {
                i: (np.sum(
                    [
                        (
                            ((phi_h / expected_phi_h[h]) ** ((sigma - 1) / sigma))
                            * (
                                (1 / expected_phi_h[h] - z_h[h]) ** (1 - sigma)
                            )
                            * (t_Ihi[h][i] ** (1 - sigma))
                        )
                        for h, phi_h in zip(["A", "B"], [phi_A, phi_B])
                    ],
                    axis=0,
                )
                / np.sum(
                    [
                        (
                            (1 / expected_phi_h[h] - z_h[h]) ** (1 - sigma)
                            * (t_Ihi[h][i] ** (1 - sigma))
                        )
                        for h in ["A", "B"]
                    ],
                    axis=0,
                )) ** (sigma / (sigma - 1))
                for i in ["A", "B"]
            }
            
            # compute utility U_j
            sum_term = {
                j: 
                (((rho_i["A"] * expected_x_Fij["A"][j]) ** ((sigma - 1) / sigma) + 
                  (rho_i["B"] * expected_x_Fij["B"][j]) ** ((sigma - 1) / sigma)))
                for j in ("A", "B", "C")
            }
            
            U_j = {
                j: x_0j.get(j, 0) +
                   (1 / theta) * (sum_term.get(j, 0) ** (theta * sigma / (sigma - 1)))
                for j in ("A", "B")
            }
            
            # Calculate VaR as a dictionary
            var_threshold = {
                j: np.percentile(U_j[j], (1 - cvar_alpha) * 100) for j in U_j.keys()
            }
            
            # Calculate CVaR as a dictionary
            cvar = {
                j: np.mean([u for u in U_j[j] if u <= var_threshold[j]]) for j in U_j.keys()
            }
            
            # Append results
            cvar_results.append(cvar)
            results.append({j: np.mean(U_j[j]) for j in U_j.keys()})
        
            # Extract utility and CVaR results for country "A"
            cvar_results_A = [cvar["A"] for cvar in cvar_results]
            results_A = [result["A"] for result in results]
            
            # calculate standard errors for country "A"
            cvar_sem_A = (
                np.std(cvar_results_A, ddof=1) / np.sqrt(len(cvar_results_A))
                if len(cvar_results_A) > 1
                else 0  # use 0 if variance can't be computed
            )
            mean_sem_A = (
                np.std(results_A, ddof=1) / np.sqrt(len(results_A))
                if len(results_A) > 1
                else 0
            )
            
            # calculate confidence interval widths for country A
            mean_ci_width_A = 2 * z_alpha * mean_sem_A
            cvar_ci_width_A = 2 * z_alpha * cvar_sem_A
            
            # check convergence for country A
            mean_converged_A = (
                len(results_A) > min_iterations
                and (mean_ci_width_A / np.abs(np.mean(results_A))) < tolerance
            )
            cvar_converged_A = (
                len(cvar_results_A) > min_iterations
                and (cvar_ci_width_A / np.abs(np.mean(cvar_results_A))) < tolerance
            )
            
            if mean_converged_A and cvar_converged_A:
                # print(f"Converged after {iteration} iterations.")
                break
        else:
            print("Reached maximum iterations without convergence.")

        # after the monte carlo loop
        final_cvar = {k: np.mean([c[k] for c in cvar_results]) for k in cvar.keys()}
        final_utility = {k: np.mean([r[k] for r in results]) for k in results[0].keys()}
        
        # append to ex_loop_results
        ex_loop_results.append({
            "extax_AB": extax_AB,
            "extax_BA": extax_BA,
            "utility_A": final_utility["A"],
            "utility_B": final_utility["B"],
            "cvar_A": final_cvar["A"],
            "cvar_B": final_cvar["B"],
            "symmetric": 1 if extax_AB == extax_BA else 0
        })

# save csv
results_df = pd.DataFrame(ex_loop_results)
results_df.to_csv("output/export_tax_loop_high_risk.csv", index=False)

print("Results saved as export_tax_loop_high_risk.csv")

In [37]:
# production subsidy loop
# run the top two cells first to retrieve parameters

import pandas as pd

# use a higher number here - variance gets big
max_iterations = 3000

sub_sequence = np.arange(0, 0.90, 0.05)
sub_loop_results = []

for i in sub_sequence:

    sub_A = i
    
    for j in sub_sequence:

        no_convergence = 0
        
        sub_B = j

        z_h = {"A": sub_A, "B": sub_B}
        
        # trade costs for critical goods from A to (A, B)
        t_Fij = {
            i: {
                j: 1 if i == j else 1 + imtax_Fij.get(i, {}).get(j, 0) + extax_Fij.get(i, {}).get(j, 0) + tau
                for j in ("A", "B", "C") if j in imtax_Fij.get(i, {})
            }
            for i in ("A", "B", "C") if i in imtax_Fij
        }
        
        # trade costs for critical goods from A to (A, B)
        t_Ihi = {
            h: {
                i: 1 if h == i else 1 + imtax_Ihi.get(h, {}).get(i, 0) + extax_Ihi.get(h, {}).get(i, 0) + tau
                for i in ("A", "B", "C") if i in imtax_Ihi.get(h, {})
            }
            for h in ("A", "B") if h in imtax_Ihi
        }

        # calculate input prices
        p_Ih = {
            "A": (sigma / (sigma - 1)) * (1 / expected_phi_h["A"] - z_h["A"]),
            "B": (sigma / (sigma - 1)) * (1 / expected_phi_h["B"] - z_h["B"]),
        }
        
        # calculate price of final goods (p_Fi) = input price index (bar_P_Ii)
        p_Fi = {i: np.sum([(p_Ih[h] * t_Ihi[h][i]) ** (1 - sigma) for h in ("A", "B")]) ** (1 / (1 - sigma)) for i in ("A", "B")}
        
        # calculate price index for final goods consumed in country j
        bar_P_Fj = {j: np.sum([(p_Fi[i] * t_Fij[i][j]) ** (1 - sigma) for i in ("A", "B")]) ** (1 / (1 - sigma)) for j in ("A", "B", "C")}
        
        # expected demand for final goods
        expected_x_Fij = {
            i: {j: (p_Fi[i] * t_Fij[i][j]) ** (-sigma) * bar_P_Fj[j] ** ((sigma * (1 - theta) - 1) / (1 - theta)) for j in ("A", "B", "C")}
            for i in ("A", "B")
        }
        
        # expected demand for inputs
        expected_X_Ihi = {
            h: {
                i: (p_Ih[h] * t_Ihi[h][i]) ** -sigma
                * sum(
                    t_Fij[i][j] ** -sigma
                    * bar_P_Fj[j] ** ((sigma * (1 - theta) - 1) / (1 - theta))
                    for j in ("A", "B", "C")
                )
                for i in ("A", "B")
            }
            for h in ("A", "B")
        }
        
        # n_h (number of entrants) for each h
        n_h = {
            h: sum(expected_X_Ihi[h][i] for i in ("A", "B"))
            / ((sigma - 1) * (1 - e_h[h]) * kappa)
            * ((1 / expected_phi_h[h]) - z_h[h])
            for h in ("A", "B")
        }
        
        # expected survivors for each h
        ns_h = {
            h: n_h[h] * expected_phi_h[h] for h in ("A", "B")
            for h in ("A", "B")
        }
        
        # expected tariff revenue
        tariff_revenue = {
            j: sum(
                imtax_Fij.get(i, {}).get(j, 0) * expected_x_Fij.get(i, {}).get(j, 0)
                for i in ("A", "B") if i != j
            ) + (sum(
                imtax_Ihi.get(i, {}).get(j, 0) * expected_X_Ihi.get(i, {}).get(j, 0)
                for i in ("A", "B") if i != j
            ) if j in ("A", "B") else 0)
            + sum(
                extax_Fij.get(j, {}).get(i, 0) * expected_x_Fij.get(j, {}).get(i, 0)
                for i in ("A", "B") if j != i
            )
            + (sum(
                extax_Ihi.get(j, {}).get(i, 0) * expected_X_Ihi.get(j, {}).get(i, 0)
                for i in ("A", "B") if j != i
            ) if j in ("A", "B") else 0)
            for j in ("A", "B", "C")
        }
        
        # expected production/entry subsidy cost
        subsidy_cost = {
            h: ns_h.get(h, 0) * e_h.get(h, 0) + z_h.get(h, 0) * sum(
                expected_X_Ihi.get(h, {}).get(i, 0) for i in ("A", "B")
            )
            for h in ("A", "B")
        }
        
        # outside good consumption
        x_0j = {
            j: 1
            + tariff_revenue.get(j, 0)
            - subsidy_cost.get(j, 0)
            - bar_P_Fj.get(j, 0) ** (-theta / (1 - theta))
            for j in ("A", "B", "C")
        }
        
        # monte carlo setup
        iteration = 0
        results = []
        cvar_results = []
        
        # run the loop until convergence or max iterations
        while iteration < max_iterations:
            iteration += 1   
            
            # generate risk factors
            D_h_A = 1 - beta.rvs(alpha_c["A"], beta_c["A"], size = num_simulations)
            D_h_B = 1 - beta.rvs(alpha_c["B"], beta_c["B"], size = num_simulations)
            fail_all_A = np.random.rand(num_simulations) < D_h_A
            fail_all_B = np.random.rand(num_simulations) < D_h_B
            phi_A = np.where(fail_all_A, 0, beta.rvs(alpha_i, beta_i, size = num_simulations))
            phi_B = np.where(fail_all_B, 0, beta.rvs(alpha_i, beta_i, size = num_simulations))
            
            # compute rho_i for all samples
            rho_i = {
                i: (np.sum(
                    [
                        (
                            ((phi_h / expected_phi_h[h]) ** ((sigma - 1) / sigma))
                            * (
                                (1 / expected_phi_h[h] - z_h[h]) ** (1 - sigma)
                            )
                            * (t_Ihi[h][i] ** (1 - sigma))
                        )
                        for h, phi_h in zip(["A", "B"], [phi_A, phi_B])
                    ],
                    axis=0,
                )
                / np.sum(
                    [
                        (
                            (1 / expected_phi_h[h] - z_h[h]) ** (1 - sigma)
                            * (t_Ihi[h][i] ** (1 - sigma))
                        )
                        for h in ["A", "B"]
                    ],
                    axis=0,
                )) ** (sigma / (sigma - 1))
                for i in ["A", "B"]
            }
            
            # compute utility U_j
            sum_term = {
                j: 
                (((rho_i["A"] * expected_x_Fij["A"][j]) ** ((sigma - 1) / sigma) + 
                  (rho_i["B"] * expected_x_Fij["B"][j]) ** ((sigma - 1) / sigma)))
                for j in ("A", "B", "C")
            }
            
            U_j = {
                j: x_0j.get(j, 0) +
                   (1 / theta) * (sum_term.get(j, 0) ** (theta * sigma / (sigma - 1)))
                for j in ("A", "B")
            }
            
            # Calculate VaR as a dictionary
            var_threshold = {
                j: np.percentile(U_j[j], (1 - cvar_alpha) * 100) for j in U_j.keys()
            }
            
            # Calculate CVaR as a dictionary
            cvar = {
                j: np.mean([u for u in U_j[j] if u <= var_threshold[j]]) for j in U_j.keys()
            }
            
            # Append results
            cvar_results.append(cvar)
            results.append({j: np.mean(U_j[j]) for j in U_j.keys()})
        
            # Extract utility and CVaR results for country "A"
            cvar_results_A = [cvar["A"] for cvar in cvar_results]
            results_A = [result["A"] for result in results]
            
            # calculate standard errors for country "A"
            cvar_sem_A = (
                np.std(cvar_results_A, ddof=1) / np.sqrt(len(cvar_results_A))
                if len(cvar_results_A) > 1
                else 0  # use 0 if variance can't be computed
            )
            mean_sem_A = (
                np.std(results_A, ddof=1) / np.sqrt(len(results_A))
                if len(results_A) > 1
                else 0
            )
            
            # calculate confidence interval widths for country A
            mean_ci_width_A = 2 * z_alpha * mean_sem_A
            cvar_ci_width_A = 2 * z_alpha * cvar_sem_A
            
            # check convergence for country A
            mean_converged_A = (
                len(results_A) > min_iterations
                and (mean_ci_width_A / np.abs(np.mean(results_A))) < tolerance
            )
            cvar_converged_A = (
                len(cvar_results_A) > min_iterations
                and (cvar_ci_width_A / np.abs(np.mean(cvar_results_A))) < tolerance
            )
            
            if mean_converged_A and cvar_converged_A:
                # print(f"Converged after {iteration} iterations.")
                break
        else:
            print("Reached maximum iterations without convergence.")
            no_convergence = 1

        # after the monte carlo loop
        final_cvar = {k: np.mean([c[k] for c in cvar_results]) for k in cvar.keys()}
        final_utility = {k: np.mean([r[k] for r in results]) for k in results[0].keys()}
        
        # append to ex_loop_results
        sub_loop_results.append({
            "sub_A": sub_A,
            "sub_B": sub_B,
            "utility_A": final_utility["A"],
            "utility_B": final_utility["B"],
            "cvar_A": final_cvar["A"],
            "cvar_B": final_cvar["B"],
            "symmetric": 1 if sub_A == sub_B else 0,
            "no_convergence": no_convergence,
        })

# save csv
results_df = pd.DataFrame(sub_loop_results)
results_df.to_csv("output/prod_subsidy_loop_high_risk.csv", index=False)

print("Results saved as prod_subsidy_loop_high_risk.csv")

Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Reached maximum iterations without convergence.
Results saved as prod_subsidy_loop_high_