In [None]:
# Import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestRegressor
import doubleml as dml
import patsy

# Define random state
np.random.seed(123)
rs = 123

In [None]:
# Import fullly preprocessed dataset
filepath = "../data_processing/processed_merged_data/full_dataset.csv"
df = pd.read_csv(filepath)

In [None]:
# Log transform GVA_Real_Density and Tax_D_Real
df['GVA_Real_Density'] = np.log(df['GVA_Real_Density'])
df['Tax_D_Real'] = np.log(df['Tax_D_Real'])

In [None]:
# Define the cutoff values based on the propensity score analysis
tax_cutoff = 7.2 # Justify with generalised propensity score
gva_cutoff = 11 # Justify with scatter plot

# Find LANMs that are outside of range consistently
outliers = df.groupby('LANM').filter(
    lambda x: x['Tax_D_Real'].mean() < tax_cutoff and 
    x['GVA_Real_Density'].mean() > gva_cutoff
)

outliers = outliers['LANM'].unique()
print(f"List of Outlier LAs: {outliers}")

# Remove outliers from the dataset
df = df[~df['LANM'].isin(outliers)]

# Calculate total missing rows
num_years = df['Year'].nunique()
dropped_rows = len(outliers) * num_years
print(f"Number of rows dropped: {dropped_rows}")
print(f"Rows of data remaining: {df.shape[0]}")

In [None]:
# Remove alternate outcome "GVA_Real"
ml_df = df.drop(columns=['GVA_Real'])

# Remove all cols with GDHI in the name (mediators)
ml_df = ml_df.loc[:, ~ml_df.columns.str.contains('GDHI')]

# Remove non time-varying variables
ml_df = ml_df.drop(columns=['Area', 'Region', 'Authority_Type'])

# Remove id name columns
ml_df = ml_df.drop(columns=['LANM'])

# Duplicate year column
ml_df['Year_Group'] = ml_df['Year'].astype('str')

# Duplicate LACD column
ml_df['LACD_Group'] = ml_df['LACD'].astype('str')

# Create one hot encoding for fixed effects
ml_df = pd.get_dummies(ml_df, columns=['LACD'])

# Show columns and number of columns
print(ml_df.columns)
print(len(ml_df.columns))

In [None]:
# Define Nuisance Parameter Learners
ml_l =  RandomForestRegressor(random_state=rs)
ml_m = RandomForestRegressor(random_state=rs)

# Define two-dimensional clustering
clusters = ['LACD_Group', 'Year_Group']

# Create DoubleML cluster data object
obj_dml_data = dml.DoubleMLClusterData(ml_df,
                                       'GVA_Real_Density',
                                       'Tax_D_Real',
                                       clusters)

# Create DoubleML Partially Linear Regression object
dml_plr_obj = dml.DoubleMLPLR(obj_dml_data, ml_l, ml_m)

# Define parameter grids for tuning
par_grids = {'ml_l': {'n_estimators': [300, 400, 600, 800],
                     'max_features': [0.2, 0.5, 0.8],
                     'max_depth': [ None, 30, 40, 50],
                     'min_samples_split': [2, 4, 8],
                     'min_samples_leaf': [1, 2, 5]},
            'ml_m': {'n_estimators': [400, 600, 800, 1000],
                     'max_features': [0.2, 0.5, 0.8],
                     'max_depth': [ None, 30, 40, 50],
                     'min_samples_split': [2, 4, 8],
                     'min_samples_leaf': [1, 2, 5]}}

# Tune hyperparameters
dml_plr_obj.tune(par_grids, search_mode='randomized_search', 
                 n_iter_randomized_search=50)

# Print optimal hyperparameters
print(dml_plr_obj.params)

In [None]:
# Fit DML model
print(dml_plr_obj.fit())

In [None]:
# Create spline basis functions from df
spline_basis = patsy.dmatrix(
    "bs(GDHI_Total_Real_Density, df=5, degree=3)", df)

# Convert to DataFrame for easier handling
spline_basis_df = pd.DataFrame(spline_basis, 
                               columns=spline_basis.design_info.column_names)

# Estimate CATE using previously trained DML model
cate = dml_plr_obj.cate(spline_basis_df)
print(cate)

# Create a new grid for GDHI_Total_Real_Density for evaluation
new_data = {"GDHI_Total_Real_Density": np.linspace(
    df['GDHI_Total_Real_Density'].min(),
    df['GDHI_Total_Real_Density'].max(), 100)}

# Use the same design to create basis functions for the new grid
spline_grid = pd.DataFrame(patsy.build_design_matrices(
    [spline_basis.design_info], new_data)[0])

# Ensure columns in the new grid match the original basis
spline_grid.columns = spline_basis_df.columns

# Calculate confidence intervals via bootstrapping
df_cate = cate.confint(spline_grid, level=0.95, joint=True, n_rep_boot=2000)
print(df_cate)

# Visualise results
plt.rcParams['figure.figsize'] = 10., 7.5
plt.rcParams['font.family'] = 'Times New Roman'
plt.rcParams['font.size'] = 18

df_cate['GDHI_Total_Real_Density'] = new_data['GDHI_Total_Real_Density']
fig, ax = plt.subplots()

ax.set_axisbelow(True)
ax.grid(True, axis='y', linestyle='-', linewidth=0.5, color='lightgray')

# Plot estimated effect with higher zorder
ax.plot(df_cate['GDHI_Total_Real_Density'], 
        df_cate['effect'], 
        label='Estimated Elasticity', 
        color='black', 
        linewidth=2, 
        zorder=3)

ax.fill_between(df_cate['GDHI_Total_Real_Density'], 
                df_cate['2.5 %'], 
                df_cate['97.5 %'], 
                color='cornflowerblue', 
                alpha=.3, 
                label='Confidence Interval (95%)', 
                zorder=2)

# Remove borders
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.spines['bottom'].set_visible(False)

# Adjust tick parameters
ax.tick_params(axis='both', which='major', labelsize=18)
ax.xaxis.set_major_locator(plt.MaxNLocator(5))

# Set labels and title
plt.xlabel('Gross Domestic Household Income Per Capita (£, Real)', fontsize=18)
plt.ylabel('Elasticity', fontsize=18)

# Define legend parameters
plt.legend(frameon=False, loc='upper left')

# Save the plot
plt.savefig('./figures/elasticity_plot.png', dpi=300, bbox_inches='tight')

# Display the plot
plt.show()