# Contents
* [Introduction](#Introduction)
* [Imports and configuration](#Imports-and-configuration)
* [Setup](#Setup)
* [Model](#Model)
* [Test harness](#Test-harness)
* [Metaclassifier](#Metaclassifier)
* [Results](#Results)
* [Discussion](#Discussion)

# Introduction

In this notebook, we build and test a `keras` dense network (MLP) whose configuration was chosen by intuition rather than by a hyperparameter search.

# Imports and configuration

In [None]:
from time import time

notebook_begin_time = time()

# set random seeds

from os import environ
from random import seed as random_seed
from numpy.random import seed as np_seed
from tensorflow.random import set_seed


def reset_seeds(seed: int) -> None:
    """Re-seed the random number generators used in this notebook.

    Exports `seed` as the PYTHONHASHSEED environment variable, then seeds
    Python's `random`, NumPy's global generator and TensorFlow's global
    generator, in that order.

    NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
    setting it here presumably only affects child processes -- confirm.
    """
    environ["PYTHONHASHSEED"] = str(seed)
    for seeder in (random_seed, np_seed, set_seed):
        seeder(seed)


reset_seeds(SEED := 2021)

In [None]:
# extensions
%load_ext autotime
%load_ext lab_black
%load_ext nb_black
%load_ext tensorboard

In [None]:
# core
import numpy as np
import pandas as pd

# utility
from collections import namedtuple
from gc import collect as gc_collect

# typing
from typing import Callable, Dict, List

# faster pandas & sklearn
import swifter
from sklearnex import patch_sklearn

patch_sklearn()
del patch_sklearn

# metrics
from imblearn.metrics import geometric_mean_score
from sklearn.metrics import balanced_accuracy_score, f1_score, roc_auc_score

# keras & tensorflow
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    AlphaDropout,
    BatchNormalization,
    Dense,
)

# visualization
from matplotlib.colors import ListedColormap
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_context("notebook")
sns.set_style("ticks")
%matplotlib inline

# display outputs w/o print calls
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
del InteractiveShell

# hide warnings
import warnings

warnings.filterwarnings("ignore")
del warnings

In [None]:
# Location of cross validation .feather files
FRILL_FEATHERS_FOLDER = "../10.0-mic-prepare_train-test_splits_on_full_data"

# Location where this notebook will output
DATA_OUT_FOLDER = "."

_ = gc_collect()

# Setup

In this section, we define some structures to keep track of models and scores.

The next cell defines a Python dictionary to record the results of cross validation such that the results may be easily cast to a `pandas` dataframe.

In [None]:
# case labels used by this notebook
ALL_CASES = {"negative", "neutral", "ternary", "ternary_negneu"}

# averaged scores and timings kept for every model; this order is also the
# field order of AvgScores
METRICS = (
    "balanced_accuracy",
    "balanced_accuracy_adjusted",
    "f1",
    "geometric_mean",
    "roc_auc",
    "fit_time",
    "predict_time",
)


def make_results() -> Dict[str, List]:
    """Return an empty results mapping, to be cast to pd.DataFrame.

    The mapping has one fresh list per column. The keys are listed in a tuple
    (not a set) so that their order is the same on every run instead of
    depending on string hashing.
    """
    return {key: [] for key in ("model_name", "case", *METRICS)}


results_ = make_results()

# one averaged value per entry of METRICS
AvgScores = namedtuple("AvgScores", METRICS)


def store_result(
    where: Dict[str, List],
    case: str,
    model_name: str,
    avg_scores: AvgScores,
) -> None:
    """Append one model's averaged scores and timings to the results dict.

    `where` must already hold a list under "model_name", under "case" and under
    every field name of `avg_scores`; each list grows by exactly one entry.
    """
    record = {"model_name": model_name, "case": case}
    record.update(avg_scores._asdict())
    for column, entry in record.items():
        where[column].append(entry)


def create_results_df(results_dict: Dict[str, List]) -> pd.DataFrame:
    """Create a sorted results dataframe from a results dictionary.

    Args:
        results_dict: mapping of column name to equally long lists; it must
            provide "model_name", "case" and every score/time column selected
            below.

    Returns:
        One row per stored result with a fresh 0..n-1 index and a categorical
        "case" column. Rows are ordered by case (ascending), then by the scores
        (descending), then by fit and predict time (ascending), and finally by
        model name (ascending).
    """
    # A single multi-key sort replaces the former chain of single-key sorts:
    # those only compose when every sort is stable, which the default sort kind
    # does not promise. Keys are listed from most to least significant.
    sort_order = {
        "case": True,
        "balanced_accuracy": False,
        "balanced_accuracy_adjusted": False,
        "geometric_mean": False,
        "f1": False,
        "roc_auc": False,
        "fit_time": True,
        "predict_time": True,
        "model_name": True,
    }
    column_order = [
        "model_name",
        "case",
        "balanced_accuracy",
        "balanced_accuracy_adjusted",
        "f1",
        "geometric_mean",
        "roc_auc",
        "fit_time",
        "predict_time",
    ]
    return (
        # build from the argument, not from the module-level results_
        pd.DataFrame(results_dict)
        .sort_values(by=list(sort_order), ascending=list(sort_order.values()))
        .reset_index(drop=True)[column_order]
        # astype returns a new frame, so the categorical dtype is kept; writing
        # through df.loc[:, "case"] may set the values in place and keep object
        .astype({"case": "category"})
    )


_ = gc_collect()

In [None]:
# per-fold predictions, one list per column; the keys come from a tuple (not a
# set) so that the field order of Prediction is the same on every run
predictions_ = {key: [] for key in ("model", "case", "fold", "y_pred")}

# one record per (model, case, fold); its fields mirror the keys of predictions_
Prediction = namedtuple("Prediction", tuple(predictions_))


def store_prediction(where: Dict[str, List], prediction: Prediction) -> None:
    """Record a classifier's y_pred on a fold.

    Every field of `prediction` is appended to the list stored in `where` under
    the field's name.
    """
    fields = prediction._asdict()
    for name in fields:
        where[name].append(fields[name])


_ = gc_collect()

# Model

In this section, we set up the MLP.

In [None]:
def make_MLP(case: str, output_bias: float, print_summary: bool = True) -> Sequential:
    """Build and compile the dense network for one classification case.

    The network takes 2048 input features through a batch normalisation layer,
    followed by six SELU dense layers (1024, 1024, 512, 256, 128, 50 units) and
    the output layer, each preceded by AlphaDropout(0.1).

    Args:
        case: cases containing "ternary" get a 3-unit softmax head trained with
            sparse categorical cross-entropy; all others get a 1-unit sigmoid
            head trained with binary cross-entropy.
        output_bias: passed unchanged as the output layer's `bias_initializer`
            (evaluate_model passes a keras Constant initializer).
        print_summary: whether to print the model summary.

    Returns:
        The compiled model.
    """
    # (output units, output activation, loss, accuracy metric)
    if "ternary" in case:
        head = (
            3,
            "softmax",
            "sparse_categorical_crossentropy",
            "sparse_categorical_accuracy",
        )
    else:
        head = (1, "sigmoid", "binary_crossentropy", "binary_accuracy")
    out_nodes, final_activation, loss, accuracy = head

    # layers are created in the same order in which they are stacked
    layers = [BatchNormalization(input_dim=2048)]
    for units in (1024, 1024, 512, 256, 128, 50):
        layers.append(AlphaDropout(0.1, seed=SEED))
        layers.append(
            Dense(
                units,
                kernel_initializer="lecun_normal",
                activation="selu",
                activity_regularizer="l2",
            )
        )
    layers.append(AlphaDropout(0.1, seed=SEED))
    layers.append(
        Dense(
            out_nodes,
            activation=final_activation,
            bias_initializer=output_bias,
        )
    )

    model = Sequential(layers)
    model.compile(
        loss=loss,
        optimizer=tf.keras.optimizers.Nadam(learning_rate=0.0001),
        metrics=accuracy,
    )
    if print_summary:
        model.summary()
    return model


_ = gc_collect()

# Test harness

This section defines functions for evaluating models.

In [None]:
# Label recoders, keyed by case. For integer labels 0, 1 and 2 the binary
# recoders return 1 exactly where the label is 0 ("negative"), 1 ("neutral") or
# 2 ("positive") and 0 elsewhere; "ternary" only drops length-one axes.
recode_y: Dict[str, Callable] = dict(
    negative=lambda y: ((y - 1) // 2) * (-1),
    neutral=lambda y: y % 2,
    positive=lambda y: y // 2,
    ternary=np.squeeze,
)


def evaluate_model(
    model_base: str, case: str, track_fold_time: bool = True
) -> AvgScores:
    """Evaluate the MLP with cross validation on the prepared folds.

    Folds are read from ``{FRILL_FEATHERS_FOLDER}/cv_<n>/`` for n = 0, 1, 2, ...
    until the training files of a fold cannot be found. For every fold a fresh
    model is built with `make_MLP`, fitted on the training split, used to
    predict the test split, and scored. The fold's predictions are appended to
    the module-level `predictions_`.

    The output-layer bias is initialised from the class counts of the fold's
    training labels, so that no information about the test labels reaches the
    model.

    Args:
        model_base: name under which the predictions are recorded; also used in
            the progress messages.
        case: key of `recode_y`; cases containing "ternary" are scored as
            three-class problems, all others as binary problems.
        track_fold_time: whether to print the wall-clock time of every fold.

    Returns:
        The mean over all folds of every score and of the fit/predict times.

    Raises:
        KeyError: if `case` is not a key of `recode_y`.
        FileNotFoundError: if not even the first fold could be loaded, or if
            the test files of a fold with training files are missing.
    """

    def read_feather_cv(fold: int, xy_set: str) -> pd.DataFrame:
        """Read one split of one fold; training splits use the LOF files."""
        path_prefix = f"{FRILL_FEATHERS_FOLDER}/cv_{fold}/{xy_set}"
        suffix = "LOF" if "train" in xy_set else "untransformed"
        return pd.read_feather(f"{path_prefix}_{suffix}.feather")

    # one list of per-fold values for every field of the returned AvgScores
    scores = {field: [] for field in AvgScores._fields}
    recoder = recode_y[case]
    multiclass = "ternary" in case

    fold_num = 0
    while True:
        if track_fold_time:
            fold_begin = time()

        # load training data; a missing fold marks the end of the prepared folds
        try:
            X_train: pd.DataFrame = read_feather_cv(fold_num, "X_train")
            y_train: np.ndarray = read_feather_cv(fold_num, "y_train").values
        except FileNotFoundError:
            break

        # recode/reformat y_train
        y_train = recoder(y_train)

        # load testing data; ternary test labels are only squeezed, binary ones
        # are recoded like the training labels
        X_test = read_feather_cv(fold_num, "X_test")
        y_test = np.squeeze(read_feather_cv(fold_num, "y_test").values).astype(np.int8)
        if not multiclass:
            y_test = recoder(y_test)

        # initialize the output layer bias from the TRAINING class counts
        # (counting the test labels here would leak them into the model)
        class_counts = np.bincount(np.ravel(y_train).astype(np.int64))
        if multiclass:
            zeros, ones, twos = class_counts
            init_bias = tf.keras.initializers.Constant(
                np.log(
                    [
                        zeros / (ones + twos),
                        ones / (zeros + twos),
                        twos / (zeros + ones),
                    ]
                )
            )
        else:
            zeros, ones = class_counts
            init_bias = tf.keras.initializers.Constant(np.log([ones / zeros]))
        del class_counts

        model = make_MLP(case, output_bias=init_bias)
        del init_bias
        _ = gc_collect()

        # fit
        reset_seeds(SEED)
        begin = time()
        history = model.fit(
            X_train,
            y_train,
            batch_size=16,
            epochs=100,
            validation_split=0.1,
            callbacks=EarlyStopping(patience=10, restore_best_weights=True),
        )
        elapsed = time() - begin
        del X_train
        del y_train
        _ = gc_collect()
        scores["fit_time"].append(elapsed)
        print(f"fitted in {elapsed:.2f} s")

        # learning curves of this fold
        pd.DataFrame(history.history).plot(
            cmap=ListedColormap(sns.color_palette("colorblind").as_hex())
        )
        plt.title("accuracy and loss by epochs")
        plt.legend(frameon=False)
        sns.despine(right=True, top=True)
        plt.show()
        del history

        # predict
        begin = time()
        predicted = model.predict(X_test)
        elapsed = time() - begin
        del X_test
        del model
        _ = gc_collect()
        scores["predict_time"].append(elapsed)
        print(f"predicted in {elapsed:.2f} s")

        # hard labels: most probable class, or the rounded sigmoid output
        y_pred = (
            predicted.argmax(axis=1)
            if multiclass
            else pd.Series(np.squeeze(predicted)).swifter.apply(round)
        ).astype(np.int8)
        store_prediction(
            where=predictions_,
            prediction=Prediction(
                model=model_base,
                case=case,
                fold=fold_num,
                y_pred=y_pred,
            ),
        )

        # score the hard labels ...
        label_params = {"y_true": y_test, "y_pred": y_pred}
        scores["balanced_accuracy_adjusted"].append(
            balanced_accuracy_score(**label_params, adjusted=True)
        )
        scores["balanced_accuracy"].append(balanced_accuracy_score(**label_params))
        scores["f1"].append(f1_score(**label_params, average="weighted"))
        scores["geometric_mean"].append(geometric_mean_score(**label_params))
        # ... and the raw model outputs
        scores["roc_auc"].append(
            roc_auc_score(
                y_true=y_test,
                y_score=predicted,
                average="weighted",
                multi_class="ovo",
            )
        )
        del label_params
        del y_pred
        del predicted
        del y_test
        _ = gc_collect()

        if track_fold_time:
            print(
                f"{model_base} fold {fold_num + 1} completed in {time() - fold_begin:.2f} s"
            )

        fold_num += 1

    if fold_num == 0:
        # averaging empty lists would silently store NaN scores
        raise FileNotFoundError(
            f"no cross validation folds found under {FRILL_FEATHERS_FOLDER}/cv_0"
        )

    return AvgScores(**{field: np.mean(values) for field, values in scores.items()})


_ = gc_collect()

The next few cells evaluate the MLP on the negative, neutral, and ternary cases, showing the accumulated results table after each run.

In [None]:
# cross-validate the MLP on the "negative" case and record its averaged scores
case, model = "negative", "keras_MLP_14"
eval_begin = time()

print(f"evaluating {model}...")
_ = gc_collect()
avg_scores = evaluate_model(model, case)
store_result(where=results_, case=case, model_name=model, avg_scores=avg_scores)
print(f"stored {model} for {case} classification in {time() - eval_begin:.2f} s")

# drop the cell's temporaries
del case, model, eval_begin, avg_scores
_ = gc_collect()

create_results_df(results_)

In [None]:
# cross-validate the MLP on the "neutral" case and record its averaged scores
case, model = "neutral", "keras_MLP_14"
eval_begin = time()

print(f"evaluating {model}...")
_ = gc_collect()
avg_scores = evaluate_model(model, case)
store_result(where=results_, case=case, model_name=model, avg_scores=avg_scores)
print(f"stored {model} for {case} classification in {time() - eval_begin:.2f} s")

# drop the cell's temporaries
del case, model, eval_begin, avg_scores
_ = gc_collect()

create_results_df(results_)

In [None]:
# cross-validate the MLP on the "ternary" case and record its averaged scores
case, model = "ternary", "keras_MLP_14"
eval_begin = time()

print(f"evaluating {model}...")
_ = gc_collect()
avg_scores = evaluate_model(model, case)
store_result(where=results_, case=case, model_name=model, avg_scores=avg_scores)
print(f"stored {model} for {case} classification in {time() - eval_begin:.2f} s")

# drop the cell's temporaries
del case, model, eval_begin, avg_scores
_ = gc_collect()

create_results_df(results_)

In [None]:
# save predictions: one row per (model, case, fold) with compact dtypes
predictions_df = pd.DataFrame(predictions_)[["model", "case", "fold", "y_pred"]]
predictions_df = predictions_df.assign(
    case=predictions_df["case"].astype("category"),
    fold=predictions_df["fold"].astype(np.uint8),
    # every cell of y_pred holds a whole fold's labels; np.int8 is applied per cell
    y_pred=predictions_df["y_pred"].swifter.apply(np.int8),
)
predictions_df.to_feather(f"{DATA_OUT_FOLDER}/keras_MLP_14_predictions_CV5.feather")
del predictions_df
_ = gc_collect()

Let's peek at the results.

In [None]:
# snapshot of the results stored so far; the metaclassifier cell below reads the
# fit and predict times of keras_MLP_14 from this dataframe
results_df = create_results_df(results_)
results_df

# Metaclassifier

In [None]:
def prep_y_pred(label: int) -> np.ndarray:
    """Prepare a ternary label for AUROC scoring.

    Returns a length-3 int8 vector that is 1 at index `label` and 0 elsewhere.
    """
    one_hot = [0] * 3
    one_hot[label] = 1
    return np.array(one_hot, dtype=np.int8)


# Hybrid ternary classifier: combine the per-fold predictions of the binary
# "neutral" and "negative" models into ternary labels and score them.
model_name = "keras_MLP_14+keras_MLP_14"
pred_df = pd.DataFrame(predictions_).query("case != 'ternary'")[
    ["case", "fold", "y_pred"]
]
balanced_accuracy, balanced_accuracy_adjusted, f1, geometric_mean, roc_auc = (
    [] for _ in range(5)
)

for fold_num in np.unique(pred_df.fold):
    fold_df = pred_df.query(f"fold == {fold_num}").drop(columns="fold")
    # .item() requires exactly one stored prediction per case and fold
    query_fold = lambda case: fold_df.query(f"case == '{case}'").y_pred.item()
    df = pd.DataFrame(
        {
            "neutral_prediction": query_fold("neutral"),
            "negative_prediction": query_fold("negative"),
        }
    )
    del fold_df
    del query_fold
    _ = gc_collect()

    # "predict" 0: negative, 1: neutral, 2: positive
    # A neutral prediction of 1 is already the neutral label. For the rest, the
    # negative model outputs 1 for negative samples (see recode_y), so 1 must
    # become label 0 and 0 must become label 2. The former "(-1) * p + 2" mapped
    # a predicted negative to 1 (neutral), so label 0 was never predicted.
    df["hybrid_prediction"] = df.loc[:, "neutral_prediction"]
    not_neutral = df["hybrid_prediction"] == 0
    df.loc[not_neutral, "hybrid_prediction"] = (
        2 - 2 * df.loc[not_neutral, "negative_prediction"]
    )
    y_pred: pd.Series = df.hybrid_prediction.astype(np.int8)
    del df
    del not_neutral
    _ = gc_collect()

    # record y_pred
    store_prediction(
        where=predictions_,
        prediction=Prediction(
            model=model_name,
            case="ternary_negneu",
            fold=fold_num,
            y_pred=y_pred,
        ),
    )

    # score
    # NOTE(review): evaluate_model reads y_test_untransformed.feather for the
    # same folds; confirm that y_test_ter.feather holds the same ternary labels.
    y_true = np.squeeze(
        pd.read_feather(
            f"{FRILL_FEATHERS_FOLDER}/cv_{fold_num}/y_test_ter.feather"
        ).values
    )
    score_params = {"y_true": y_true, "y_pred": y_pred}
    balanced_accuracy.append(balanced_accuracy_score(**score_params))
    balanced_accuracy_adjusted.append(
        balanced_accuracy_score(**score_params, adjusted=True)
    )
    f1.append(f1_score(**score_params, average="weighted"))
    geometric_mean.append(geometric_mean_score(**score_params))
    # the hybrid has no class probabilities, so AUROC is computed on one-hot
    # encoded hard predictions
    roc_auc.append(
        roc_auc_score(
            y_true=y_true,
            y_score=np.stack(y_pred.swifter.apply(prep_y_pred).values),
            average="weighted",
            multi_class="ovo",
        )
    )
    del score_params
    del y_true
    del y_pred
    _ = gc_collect()
del pred_df
_ = gc_collect()


# helper functions
def get_time(case: str, fit_predict: str) -> float:
    """Look up the stored fit or predict time of keras_MLP_14 for one case."""
    return results_df.query(f"case == '{case}' & model_name == 'keras_MLP_14'")[
        f"{fit_predict}_time"
    ].item()


def sum_times(time_metric: str) -> float:
    """Add the negative and neutral models' times; the hybrid needs both."""
    return get_time("negative", time_metric) + get_time("neutral", time_metric)


store_result(
    where=results_,
    case="ternary_negneu",
    model_name=model_name,
    avg_scores=AvgScores(
        np.mean(balanced_accuracy),
        np.mean(balanced_accuracy_adjusted),
        np.mean(f1),
        np.mean(geometric_mean),
        np.mean(roc_auc),
        sum_times("fit"),
        sum_times("predict"),
    ),
)

print(f"stored hybrid ternary classification results for {model_name}")

del balanced_accuracy
del balanced_accuracy_adjusted
del f1
del geometric_mean
del roc_auc
del model_name
del get_time
del sum_times
_ = gc_collect()

# Results

In [None]:
results_df = create_results_df(results_)
results_df

In [None]:
# save table of results
results_df.to_csv(f"{DATA_OUT_FOLDER}/initial_MLP_results_CV5.csv")

# Discussion

The results table above has not been analysed yet; the hope is that the MLP performs well on every case.

In [None]:
print(f"Time elapsed since notebook_begin_time: {time() - notebook_begin_time} s")
_ = gc_collect()

[^top](#Contents)