In [1]:
from pandas import DataFrame
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from mitoolspro.project import Project
from mitoolspro.utils.objects import StringMapper
from mitoolspro.regressions.wrappers.linear_models import QuantilesRegressionSpecs
from mitoolspro.regressions.managers import QuantilesRegression
import mitoolspro as mtp

In [None]:
pr = Project.load(auto_load=True)
str_mapper = StringMapper.load_mappings(pr.get_path("string_map"))

In [None]:
data_name = pr.get_path('final_data')
data = pd.read_parquet(data_name).reset_index()
data = data.set_index(['Year', 'Country', 'Continent', 'Income Group', 'Current Income Group'])
data

In [None]:
sci_cols = [c for c in data.columns if 'SCI' in c and '_square' not in c]
sci_cols

In [None]:
data[sci_cols].loc[data.index.get_level_values('Year') == 2020, "Textiles & Wearing Apparel SCI"].sort_values(ascending=False).head(10)

In [6]:
regression_type = 'ols'
regression_degree = 'quadratic'
eci_tag = 'SCI'
levels_var = 'Income Group'
id_var = 'Country'
dependent_variable = 'CO2 emissions (metric tons per capita)'
independent_variables = [v for v in data.columns if v.endswith(f' {eci_tag}')]
if regression_degree == 'quadratic':
    independent_variables = [f'{v}_square' for v in independent_variables] + independent_variables
control_variables = [
     'Economic Globalisation',
     'Energy Consumption per Capita',
     'Environmental Patents per Capita',
     'Renewable energy consumption (% of total final energy consumption)',
     'Total natural resources rents (% of GDP)',
     'Urban population (% of total population)',
 ]

In [None]:
independent_variables

In [8]:
transformations = {
    'CO2 emissions (metric tons per capita)': 'log',
}

In [9]:
def get_transformed_variable(data: DataFrame, var: str, transformation: str | None = None) -> str:
    if transformation in ["log", "square", "boxcox"]:
        if f"{var}_{transformation}" in data.columns:
            return f"{var}_{transformation}"
        else:
            raise ValueError(f"{var}_{transformation} not in data columns")
    if transformation is None:
        return var
    else:
        raise NotImplementedError(f"{transformation} not implemented")

In [10]:
regression_dep_var = get_transformed_variable(data, dependent_variable, transformations.get(dependent_variable, None))
regression_indep_vars = independent_variables
regression_indep_vars.sort()
regression_control_vars = [get_transformed_variable(data, v, transformations.get(v, None)) for v in control_variables]
regression_control_vars.sort()

In [11]:
regression_data = data[[regression_dep_var] + regression_indep_vars + regression_control_vars].dropna(axis=0, how='any')
regression_data_groups_mask = regression_data.index.to_frame().isna().any(axis=1)
regression_data = regression_data[~regression_data_groups_mask]

In [None]:
regression_data

In [None]:
n_axes = len(regression_control_vars)
fig, axes = plt.subplots(3, 2, figsize=(8 * 2, 8 * 3))
colors = sns.color_palette('husl', n_axes)
for x, ax in zip(regression_control_vars, axes.flat):
    ax = regression_data[[x, regression_dep_var]].plot(kind='scatter', x=x, y=regression_dep_var, ax=ax, s=1.5, color=colors.pop(0))
    ax.set_title(x)

In [14]:
regression_data.columns = str_mapper.uglify_strs(regression_data.columns)
dep_var = str_mapper.uglify_str(regression_dep_var)
indep_vars = str_mapper.uglify_strs(regression_indep_vars)
control_vars = str_mapper.uglify_strs(regression_control_vars)

In [15]:
income_levels = ['Low income', 'Lower middle income', 'Upper middle income', 'High income', 'All income']
quantiles = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]

In [None]:
regression_data

In [None]:
t_values = True
error_method = "robust"
max_iter = 10_000
regressions_info = {}
regressions = {}
regressions_coefficients = {}
regressions_stats = {}
regressions_resids = {}

for group in income_levels:
    print(group)

    group_data = regression_data.loc[regression_data.index.get_level_values('Income Group') == group] if group != 'All income' else regression_data

    regression_info = QuantilesRegressionSpecs(
        dependent_variable=dep_var,
        independent_variables=indep_vars,
        quantiles=quantiles,
        quadratic=True,
        data=group_data,
        group=group,
        control_variables=control_vars,
    )
    regressions_info[group] = regression_info

    regression = QuantilesRegression(specs=regression_info, max_iter=max_iter, error_method=error_method, t_values=t_values, str_mapper=str_mapper)
    regressions[group] = regression
    regressions_coefficients[group] = regression.coeffs
    regressions_stats[group] = regression.stats
    regressions_resids[group] = regression.residuals

In [18]:
regression_id = regressions_info[list(regressions_info.keys())[0]].regression_id
mtp.files.store_pkl(regressions_info, pr.get_path('regressions') / f'{regression_id}_regressions_info.pkl')
mtp.files.store_pkl(regressions, pr.get_path('regressions') / f'{regression_id}_regressions.pkl')
mtp.files.store_pkl(regressions_coefficients, pr.get_path('regressions') / f'{regression_id}_regressions_coefficients.pkl')
mtp.files.store_pkl(regressions_stats, pr.get_path('regressions') / f'{regression_id}_regressions_stats.pkl')
mtp.files.store_pkl(regressions_resids, pr.get_path('regressions') / f'{regression_id}_regressions_resids.pkl')

In [None]:
for group, reg in regressions.items():
    print(f"{group}:\n")
    reg.coefficients_quantiles_latex_table()

In [None]:
coefficients = pd.concat([reg.coefficients_quantiles_table().stack("Quantile", future_stack=True) for reg in regressions.values()], axis=1)
coefficients.index = coefficients.index.map(lambda x: (*x[:2], 'quadratic', *x[3:]))

with pd.option_context('display.max_rows', None):
    display(coefficients)

In [None]:
all_income_coeff = coefficients[['All income']]
all_income_coeff = all_income_coeff.droplevel([0, 1, 2, 3, 4])
variables_order = all_income_coeff.index.get_level_values('Independent Vars').unique()
all_income_coeff = all_income_coeff.unstack('Quantile')
all_income_coeff = all_income_coeff.loc[variables_order, :]
all_income_coeff.to_excel(pr.get_path('results') / f'{eci_tag}_All_income_Coeffs_results.xlsx')
all_income_coeff

In [22]:
for income_level in ['High income', 'Upper middle income', 'Lower middle income', 'Low income']:
    income_coeff = coefficients[[income_level]]
    income_coeff = income_coeff.droplevel([0, 1, 2, 3, 4])
    variables_order = income_coeff.index.get_level_values('Independent Vars').unique()
    income_coeff = income_coeff.unstack('Quantile')
    income_coeff = income_coeff.loc[variables_order, :]
    income_coeff.to_excel(pr.get_path('results') / f'{eci_tag}_{income_level}_Coeffs_results.xlsx')    

In [None]:
regression_id = coefficients.index.get_level_values(0).unique()[0]
indep_vars = coefficients.loc[(coefficients.index.get_level_values(0) == regression_id) & (coefficients.index.get_level_values(4) == 'Exog')].index.get_level_values(-2).unique().tolist()
indep_vars = [var for var in indep_vars if '_square' not in var]
indep_vars

In [24]:
regression_data.columns = str_mapper.prettify_strs(regression_data.columns)

In [None]:
regression_data

In [26]:
regression_data.to_parquet(pr.get_path('data') / f'{regression_id}_regression_data.parquet')

#### Make Plot

In [27]:
color_palette = sns.color_palette('Paired')
colors = {
    'Agriculture': color_palette[2],
    'Electronics & Instruments': color_palette[6],
    'Fishing': color_palette[1],
    'Food & Beverages': color_palette[5],
    'Iron & Steel': '#484a49',
    'Machinery': color_palette[8],
    'Metal Products': color_palette[7],
    'Mining & Quarrying': color_palette[11],
    'Other Manufacturing': color_palette[10],
    'Petroleum, Chemicals & Non-Metals': color_palette[9],
    'Textiles & Wearing Apparel': color_palette[4],
    'Transport Equipment': color_palette[0],
    'Wood & Paper': color_palette[3]
}

income_plot_kwargs = {
    'All income': {
        'color': 'k',
        'linestyle': '--',
        'linewidth': 2.0,
    },
    'High income': {
        'color': 'darkgrey',
        'linestyle': ':',
        'linewidth': 2.0,
    },
    'Upper middle income': {
        'color': 'darkgrey',
        'linestyle': ':',
        'linewidth': 2.0,
    },
    'Lower middle income': {
        'color': 'darkgrey',
        'linestyle': '-',
        'linewidth': 2.0,
    },
    'Low income': {
        'color': 'darkgrey',
        'linestyle': '-.',
        'linewidth': 2.0,
    }
}

significance_plot_kwargs = {
 '***,***': {
    'color': 'k',
    'linestyle': '-',
    'linewidth': 2.0,
        },
 '***,**': {
    'color': 'g',
    'linestyle': '--',
    'linewidth': 1.5,
        },
 '**,***': {
     'color': 'r',
     'linestyle': '--',
     'linewidth': 1.5,
     },
 '**,**': {
     'color': 'g',
     'linestyle': '-.',
     'linewidth': 1.5,
        },
 '***,*': {
     'color': 'b',
     'linestyle': (0, (3, 5, 1, 5)),
     'linewidth': 1.5,
     },
 '*,***': {
     'color': 'r',
     'linestyle': (0, (3, 5, 1, 5)),
     'linewidth': 1.5,
     },
 '***': {
     'color': 'k',
     'linestyle': '-',
     'linewidth': 2.0,
     },
 '**': {
     'color': 'k',
     'linestyle': '-.',
     'linewidth': 1.5,
     },
 '*': {
     'color': 'k',
     'linestyle': ':',
     'linewidth': 1.5,
        },
 '***,-': {
     'color': 'k',
     'linestyle': ':',
     'linewidth': 1.0
     },
 '**,*': {
     'color': 'k',
     'linestyle': ':',
     'linewidth': 1.0,
     },
 '**,-': {
     'color': 'k',
     'linestyle': ':',
     'linewidth': 1.0,
     },
 '*,*': {
     'color': 'b',
     'linestyle': ':',
     'linewidth': 1.0,
     },
 '*,**': {
     'color': 'g',
     'linestyle': ':',
     'linewidth': 1.0,
     },
 '*,-': {
     'color': 'k',
     'linestyle': ':',
     'linewidth': 1.0,
     },
 '-': {
     'color': 'k',
     'linestyle': ':',
     'linewidth': 1.0,
     },
 '-,*': {
     'color': 'b',
     'linestyle': ':',
     'linewidth': 1.0,
     },
 '-,**': {
     'color': 'g',
     'linestyle': ':',
     'linewidth': 1.0,
     },
 '-,***': {
     'color': 'r',
     'linestyle': ':',
     'linewidth': 1.0,
     },
 '-,-': {
     'color': 'k',
     'linestyle': ':',
     'linewidth': 1.0,
     },
 }

income_colors = {
            'High income': 'white',
            'Upper middle income': (175/255.0, 175/255.0, 175/255.0),
            'Lower middle income': (90/255.0, 90/255.0, 90/255.0),
            'Low income': 'k',
        }

In [None]:
mt.reg.create_regression_plots(
    data = regression_data,
    regression_coeffs = coefficients,
    regression_id = regression_id,
    dependent_variable = regression_dep_var,
    independent_variables = indep_vars,
    name_tag = 'Regression Results',
    groups = income_levels,
    all_groups = 'All income',
    folder = pr.vars['figures'],
    groups_col = "Income Group",
    entity_col = "Country",
    time_col = "Year",
    figsize = (12, 10),
    marker_kwargs = None,
    annotation_kwargs = None,
    text_x_offset = 0.0025,
    adjust_axes_lims_kwargs = None,
    significance_plot_kwargs = significance_plot_kwargs,
    labels_fontsize = 16,
    indep_vars_colors = list(colors.values()),
    groups_colors = income_colors,
    quantiles = None,
    recalculate = True,
)

***