In [None]:
# general
import time
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import warnings
from importlib import reload

# modeling
import statsmodels
import statsmodels.api as sm
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics import log_loss
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
import shap

# custom
from scripts import HyperparameterTuning as HT
from scripts import Plots, Metrics

# Load and pre-process data

In [None]:
# options
env_path = 'data/environmental_data.csv'
bat_path = 'data/bat-level_data.csv'
random_state = 1337
dataset = 'env' # 'env', 'bat'
target = 'shortage'

# load configs: one feature name per line; blank lines are ignored so that a
# stray empty line does not turn into an empty feature name
with open('config/env_features.txt', 'r') as f:
    env_features = [line.strip() for line in f if line.strip()]
with open('config/bat_features.txt', 'r') as f:
    bat_features = [line.strip() for line in f if line.strip()]

# display names: tab-separated "<column name>\t<label>" pairs
rename = {}
with open('config/rename.tsv', 'r') as f:
    for line in f:
        fields = line.strip().split('\t')
        if fields == ['']:
            continue  # blank line
        rename[fields[0]] = fields[1]
rename['Intercept'] = 'Intercept'

# book keeping
features = env_features if dataset == 'env' else bat_features

# load both datasets, sort, and merge
df_env = pd.read_csv(env_path).sort_values(
    by=['cal_year', 'cal_month'], ascending=[True, True])
df_bat = pd.read_csv(bat_path).sort_values(
    by=['cal_year', 'cal_month'], ascending=[True, True])
df = pd.merge(df_env, df_bat, on=['cal_year', 'cal_month'], how='outer')

# convert categories to binary and drop missing
# (the replacement is applied to every column of the merged frame, not only `target`)
df = df.replace('shortage', 1).replace('not_shortage', 0)
df = df.dropna(subset=[target])
df[target] = df[target].astype(int)

# include season as additional feature (months grouped in threes: 12-2, 3-5, 6-8, 9-11)
to_season = {
    12: 1,  1: 1,  2: 1,
     3: 2,  4: 2,  5: 2,
     6: 3,  7: 3,  8: 3,
     9: 4, 10: 4, 11: 4,}
df['cal_season'] = df['cal_month'].apply(lambda x: to_season[x]).astype(float)

# drop rows with >50% missing features
# (i.e. keep rows where at least half of the selected features are present)
df = df.dropna(thresh=0.5*len(features), subset=features)
df_missing = df.copy()  # snapshot taken before imputation

# impute missing values
# NOTE(review): the imputer is fitted on every remaining row, before any
# train/validation/test split exists -- confirm this is intended.
n_missing = df[features].isna().sum().sum()
n_total = len(df) * len(features)
imp = IterativeImputer(max_iter=100, random_state=random_state)
df[features] = imp.fit_transform(df[features])

# start integer features from 0 and convert to categorical
for feature in ['cal_year', 'cal_month']:
    df[feature] = df[feature].astype(int)
for feature in ['cal_season', 'cal_month']:
    df[feature] = (df[feature] - df[feature].min()).astype('category')

# reset df index starting from 0
df = df.reset_index(drop=True)

# summary
print('Missing proportion: {:.2f}% values'.format(n_missing / n_total * 100))
print('Data shape: {} months, {} features'.format(*df[features].shape))
print('Food shortages: {} / {} ({:.2f}%)'.format(
    df[target].sum(), len(df), df[target].sum() / len(df) * 100))

# Split data into train/val/test sets

In [None]:
# options
rolling_forecast = True
window_size = 4  # years: initial training span and width of the sliding window
start_year = df['cal_year'].min() + window_size
test_year = 2018
split_size = 2

# convert dataframes to numpy arrays for modeling
X, y = df[features].values, df[target].values
years = df['cal_year'].values

# first year of every validation window
split_years = np.arange(start_year, test_year, split_size)

# final training set (everything before the test period, or only the last
# `window_size` years when not using a rolling forecast)
if rolling_forecast:
    train_idx = np.argwhere(years < test_year).flatten()
else:
    train_idx = np.argwhere(
        (years >= test_year - window_size) & (years < test_year)).flatten()

# get indices of training and validation sets
train_idxs, val_idxs = [], []
for sy in split_years:
    if rolling_forecast:
        train_idxs.append(np.argwhere(years < sy).flatten())
    else:
        train_idxs.append(
            np.argwhere((years >= sy - window_size) & (years < sy)).flatten())
    # cap the window at test_year so that the last validation split can never
    # reach into the test period when the year range is not a multiple of split_size
    val_end = min(sy + split_size, test_year)
    val_idxs.append(np.argwhere((years >= sy) & (years < val_end)).flatten())
test_idx = np.argwhere(years >= test_year).flatten()

# compute train/val score weights (number of training rows per split)
weights = [len(t_idx) for t_idx in train_idxs]

# plot train/val/test splits
Plots.cv_splits(
    years, 
    cv_splits=[train_idxs, val_idxs, test_idx], 
    dataset=dataset)

# Train GLM using hyperparameter grid search

In [None]:
# keep statsmodels' PerfectSeparationWarning out of the notebook output
_perfect_separation = statsmodels.tools.sm_exceptions.PerfectSeparationWarning
warnings.filterwarnings(action="ignore", category=_perfect_separation)

# custom GLM classifier mimicking LightGBMClassifier
class GLMClassifier(BaseEstimator, ClassifierMixin):
    """Binomial GLM (statsmodels) wrapped in a scikit-learn / LightGBM-like API.

    Parameters
    ----------
    add_intercept : bool, default True
        Prepend a constant column to the design matrix before fitting and
        predicting.
    max_iter : int, default 100
        Forwarded to the statsmodels fit as ``maxiter``.
    random_state : int, default 42
        Stored for API compatibility; not used by any method of this class.

    Attributes
    ----------
    result_ : statsmodels results object or None
        Result of the most recent ``fit`` call.
    evals_result_ : dict
        ``{'valid_0': {'binary_logloss': [...]}}``; one entry is appended per
        ``fit`` call (``nan`` when no ``eval_set`` was given).
    eps : float
        Probabilities are clipped to ``[eps, 1 - eps]``.
    """

    def __init__(
        self,
        add_intercept=True,
        max_iter=100,
        random_state=42
    ):

        # initialize
        self.add_intercept = add_intercept
        self.max_iter = max_iter
        self.random_state = random_state
        self.result_ = None
        self.eps = np.sqrt(np.finfo(float).eps)

        # copy lgb evaluation metric for hyperparameter tuning script
        self.evals_result_ = {'valid_0': {'binary_logloss': []}}

    def _design_matrix(self, X):
        """Return X with the intercept column prepended when ``add_intercept`` is set."""
        if not self.add_intercept:
            return X
        # has_constant='add' always prepends the column. The default ('skip')
        # returns X unchanged whenever X already contains a non-zero constant
        # column, which happens for single-row inputs or for a feature that is
        # constant within one split, and then the column count no longer
        # matches the fitted parameters.
        return sm.add_constant(X, prepend=True, has_constant='add')

    def fit(self, X, y, eval_set=None, **kwargs):
        """Fit the GLM and record the validation log loss.

        Parameters
        ----------
        X, y : array-like
            Training features and binary labels.
        eval_set : (X_val, y_val) or [(X_val, y_val)], optional
            Validation data; its log loss is appended to ``evals_result_``.
        **kwargs
            Forwarded to the statsmodels ``fit`` call.

        Returns
        -------
        self
        """

        # logistic regression GLM
        model = sm.GLM(
            y,
            self._design_matrix(X),
            family=sm.families.Binomial()
        )

        # fit GLM
        self.result_ = model.fit(maxiter=self.max_iter, **kwargs)

        # validation eval (if provided)
        if eval_set is not None:
            # also accept the LightGBM-style single-element list [(X_val, y_val)]
            if len(eval_set) == 1:
                eval_set = eval_set[0]
            X_val, y_val = eval_set
            # predict_proba adds the intercept itself
            val_loss = log_loss(y_val, self.predict_proba(X_val)[:, 1], labels=[0, 1])
        else:
            val_loss = np.nan
        self.evals_result_['valid_0']['binary_logloss'].append(val_loss)

        return self

    def predict_proba(self, X):
        """Return an (n_samples, 2) array with columns ``[P(y=0), P(y=1)]``."""

        # logistic regression returns p = P(y=1)
        p = self.result_.predict(self._design_matrix(X))
        p = np.clip(p, self.eps, 1 - self.eps)  # numeric stability
        return np.column_stack([1 - p, p])

    def predict(self, X, threshold=0.5):
        """Return 0/1 labels: 1 where ``P(y=1) >= threshold``."""
        probs = self.predict_proba(X)[:, 1]
        return (probs >= threshold).astype(int)

    def evaluate(self, X, y):
        """Return the model's log loss.

        If ``fit`` recorded at least one non-NaN validation loss, the smallest
        recorded value is returned and ``X``/``y`` are ignored; otherwise the
        log loss is computed on the given data.
        """

        # check previously stored validation losses
        val_losses = self.evals_result_['valid_0']['binary_logloss']
        valid_numeric_losses = [v for v in val_losses if not np.isnan(v)]

        # compute log loss on given data if no valid logs are found
        if len(valid_numeric_losses) > 0:
            return min(valid_numeric_losses)
        # predict_proba adds the intercept itself
        p = self.predict_proba(X)[:, 1]
        return log_loss(y, p, labels=[0, 1])

In [None]:
#
# tune hyperparameters
#

# estimator class handed to the tuning helper
model_class = GLMClassifier

# candidate model settings: intercept on/off and iteration caps 0..99
model_hyper = dict(
    add_intercept=[True, False],
    max_iter=list(np.arange(100)),
)
# model settings shared by every candidate
model_fixed = dict(random_state=42)

# no extra arguments are passed to fit(); both dicts stay empty
train_hyper = dict()
train_fixed = dict()

# NOTE(review): `weights` is computed in the split cell but `weights=None` is
# passed here -- confirm that unweighted scoring is intended.
model_best, train_best, model_list, score_list = HT.hyperparameter_tuning(
    model_class=model_class,
    model_hyper=model_hyper, model_fixed=model_fixed,
    train_hyper=train_hyper, train_fixed=train_fixed,
    x=X, y=y,
    train_indices=train_idxs, val_indices=val_idxs,
    weights=None,
    verbose=True,
)

print("Best model hyperparameters:", model_best)
print("Best training hyperparameters:", train_best)

# Plot model predictions

In [None]:
reload(Plots)

def predict(train_idx, val_idx):
    """Fit the tuned model on one split and score both parts of it.

    Parameters
    ----------
    train_idx, val_idx : array of int
        Row indices into the global ``X`` / ``y`` used for fitting and for
        validation respectively.

    Returns
    -------
    (y_pred_train, y_pred_val, model)
        Predicted P(y=1) on the training rows, on the validation rows, and the
        fitted model.
    """

    # fit model
    model = model_class(
        **model_best, 
        **model_fixed,
    ).fit(
        X=X[train_idx],
        y=y[train_idx],
        eval_set=(X[val_idx], y[val_idx]),
        **train_best,
        **train_fixed,
    )

    # predict probabilities
    y_pred_train = model.predict_proba(X[train_idx])[:, 1]
    y_pred_val = model.predict_proba(X[val_idx])[:, 1]

    return y_pred_train, y_pred_val, model

# compute predictions for each set; every split is fitted exactly once and
# both its train and validation predictions are taken from that single fit
cv_preds = [predict(t, v) for t, v in zip(train_idxs, val_idxs)]
# final model: trained on `train_idx` (the designated pre-test training set,
# which also honours the sliding window when rolling_forecast is False)
final_preds = predict(train_idx, test_idx)
y_prob_train = [p[0] for p in cv_preds] + [final_preds[0]]
y_prob_val = np.concatenate([p[1] for p in cv_preds])
y_prob_test = final_preds[1]
# one probability per row: first training block, then all validation blocks, then test
y_prob = np.concatenate([y_prob_train[0], y_prob_val, y_prob_test])

# create dates from years and months
dates = df['cal_year'].astype(str) + '-' + df['cal_month'].astype(int).astype(str)
dates = pd.Series(dates.values, index=np.arange(len(dates)))

# compute optimal probability threshold based on f1 score
val_idx = np.concatenate(val_idxs)
threshold = Metrics.get_threshold(y[val_idx], y_prob_val)
y_pred = (y_prob > threshold).astype(int)

# figure panel suffix: 'b' for the env dataset, 'd' for the bat dataset
bd = 'bd'[dataset == 'bat']

# plot predictions over time
Plots.predictions(
    y_true=y, 
    y_probs=[y_prob_train[0], y_prob_val, y_prob_test],
    cv_splits=[train_idxs, val_idxs, test_idx],
    dates=dates, 
    threshold=threshold, 
    dataset=dataset,
    save_name=f'figures/Fig1{bd}.pdf',
)

# plot train/val/test splits separately
Plots.predictions_subplots(
    y_true=y, 
    y_probs=[y_prob_train, y_prob_val, y_prob_test],
    cv_splits=[train_idxs+[train_idx], val_idxs, test_idx],
    dates=dates, 
    threshold=threshold, 
    dataset=dataset,
    save_name=f'figures/SI_Fig4{bd}.pdf' if rolling_forecast else None,
)

# Print model performance metrics

In [None]:
# pick up any edits made to scripts/Metrics.py since it was imported
reload(Metrics)

# report metrics for the stitched train/val/test probabilities
Metrics.print_metrics(
    y, y_prob, dates.values,
    val_idx=val_idx,
    test_idx=test_idx,
)

# Plot feature importance

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# options
keep = 11 if dataset == 'env' else 5   # number of rows shown per panel
# keep = len(features) + 1
ab = 'ab'[dataset == 'bat']            # figure panel suffix: 'a' for env, 'b' for bat

# add intercept to features if it was optimal
if model_best['add_intercept']:
    _features = ['Intercept'] + features
else:
    _features = features
labels = [rename[feature] for feature in _features]

# train model on the designated pre-test training set
model = predict(train_idx, test_idx)[-1]

# extract values
params = model.result_.params     # coefficient estimates
tvalues = model.result_.tvalues   # t-values for each coefficient

# compute odds ratios
odds_ratios = np.exp(params)
or_df = pd.DataFrame({
    'Feature': labels,
    'OddsRatio': odds_ratios,
    'tValue': tvalues
}).sort_values('OddsRatio', ascending=True)

# compute marginal effects
marg_eff = model.result_.get_margeff()
marg_eff_df = marg_eff.summary_frame()  # includes dy/dx, std err, z, etc.
# The summary frame leaves out constant columns (e.g. the intercept), so it can
# have fewer rows than `_features`. Look the labels up through the design-matrix
# column names that index the frame instead of assigning them positionally.
label_of = dict(zip(model.result_.model.exog_names, labels))
marg_eff_df['Feature'] = [label_of.get(name, name) for name in marg_eff_df.index]
marg_eff_df = marg_eff_df.reset_index(drop=True).sort_values('dy/dx', ascending=True)

# initialize figure (the style has to be set before the axes are created,
# otherwise it only takes effect when the cell is run a second time)
sns.set_style('whitegrid')
fig, axs = plt.subplots(1, 2, figsize=(10, 0.5*keep))

# plot odds ratios (no xerr is passed, so only the markers are drawn)
axs[0].errorbar(
    x=or_df['OddsRatio'].values[-keep:],
    y=or_df['Feature'].values[-keep:],
    fmt='o',
    color='blue',
    ecolor='gray',
    capsize=3
)
axs[0].axvline(x=1, color='red', linestyle='--')  # Reference line at OR=1
axs[0].set_xlabel('Odds ratio', fontsize=12)
axs[0].tick_params(axis='x', labelsize=10)
axs[0].tick_params(axis='y', labelsize=10)

# plot marginal effects (no xerr is passed, so only the markers are drawn)
axs[1].errorbar(
    x=marg_eff_df['dy/dx'].values[-keep:],
    y=marg_eff_df['Feature'].values[-keep:],
    fmt='o',
    color='blue',
    ecolor='gray',
    capsize=3
)
axs[1].axvline(x=0, color='red', linestyle='--')  # 0 line for marginal effects
axs[1].set_xlabel('Marginal effect on probability', fontsize=12)
# array_equal also copes with panels of different length
if np.array_equal(
        marg_eff_df['Feature'].values[-keep:], or_df['Feature'].values[-keep:]):
    axs[1].set_yticklabels([]) # hide feature names if they match
axs[1].tick_params(axis='x', labelsize=10)

plt.tight_layout()
plt.savefig(f'figures/SI_Fig6{ab}.pdf')
plt.show()