In [None]:
from sklearn.model_selection import KFold
from sklearn.datasets import load_digits
from sklearn.metrics import confusion_matrix, f1_score
import polars as pl

# Name of the label column. The helpers below skip it when transforming
# features and split it off as `y` in get_x_y.
TARGET = "HeartDiseaseorAttack"

def fetch_data(df, use_split, rbf=False, scale_col=True):
    """Return the (train, test) frames for one fold of a fixed 5-fold split.

    Args:
        df: Frame holding the features and the TARGET column.
        use_split: Index of the fold to use as the test set, 0 through 4.
        rbf: When true, every column except TARGET is passed through
            rbf_column (radial basis function features).
        scale_col: When true, every column except TARGET is passed through
            scale_column before any RBF expansion.

    Returns:
        Tuple ``(df_train, df_test)``.

    Raises:
        AssertionError: If ``use_split`` is outside 0..4.
    """
    assert(0 <= use_split <= 4)
    # Shuffled 5-fold split with a fixed seed, so the folds are reproducible.
    folds = list(KFold(n_splits=5, shuffle=True, random_state=2023).split(df))
    train_idx, test_idx = folds[use_split]
    # Indexing the frame with an index array selects the matching rows.
    df_train, df_test = df[train_idx], df[test_idx]
    if scale_col:
        for feature in [c for c in df_train.columns if c != TARGET]:
            df_train, df_test = scale_column(df_train, df_test, feature)
    if rbf:
        # Snapshot the names first: rbf_column adds and drops columns as it goes.
        for feature in [c for c in df_train.columns if c != TARGET]:
            df_train, df_test = rbf_column(df_train, df_test, feature)  # radial basis function
    return df_train, df_test

def scale_column(df_train, df_test, col_name):
    """Min-max scale one column using statistics from the training frame only.

    The minimum and maximum are taken from ``df_train`` and the same affine
    map is applied to both frames, so no test-set information is used.

    Args:
        df_train: Training frame; source of the min/max statistics.
        df_test: Test frame; transformed with the training statistics.
        col_name: Name of the column to rescale in place.

    Returns:
        Tuple ``(df_train, df_test)`` with ``col_name`` replaced by its
        scaled values.
    """
    train_col = df_train.get_column(col_name)
    lo, hi = train_col.min(), train_col.max()
    span = hi - lo
    if span == 0:
        # A column that is constant in training would be divided by zero and
        # turn into NaN/inf, which then poisons every dot product downstream.
        # It carries no information, so map it to a constant 0.0 instead.
        scaled = pl.lit(0.0).alias(col_name)
    else:
        scaled = (pl.col(col_name) - lo) / span
    return df_train.with_columns(scaled), df_test.with_columns(scaled)

def rbf_column(df_train, df_test, col_name):
    """Replace one column by three Gaussian bumps centred on its quartiles.

    The centres (25%, 50% and 75% quantiles) and the width (standard
    deviation) are computed from ``df_train`` and reused for ``df_test``.
    The new columns are named ``{col_name}_0``, ``{col_name}_1`` and
    ``{col_name}_2``; the original column is dropped from both frames.

    Returns:
        Tuple ``(df_train, df_test)`` with the expanded features.
    """
    train_col = df_train.get_column(col_name)
    std = train_col.std()
    centers = [train_col.quantile(q) for q in (0.25, 0.5, 0.75)]
    # The denominator is (2 * std)**2, i.e. 4 * std**2.
    # NOTE(review): the textbook Gaussian RBF uses 2 * std**2 - confirm this
    # wider bandwidth is intended.
    bumps = [
        (-1.0 * (pl.col(col_name) - center)**2 / (2 * std)**2).exp().alias(f"{col_name}_{i}")
        for i, center in enumerate(centers)
    ]
    expanded_train = df_train.with_columns(bumps).drop(col_name)
    expanded_test = df_test.with_columns(bumps).drop(col_name)
    return expanded_train, expanded_test

def get_x_y(df):
    """Split a frame into a NumPy feature matrix and a NumPy label vector.

    The features are every column except TARGET, plus a trailing column
    named 'constant' filled with 1.0 so that the last weight acts as the
    bias term.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays.
    """
    features = df.drop(TARGET)
    labels = df.get_column(TARGET)
    # Extra all-ones column for the bias term.
    features = features.with_columns(pl.lit(1.0).alias('constant'))
    return features.to_numpy(), labels.to_numpy()

# Load the full dataset; the first row of the CSV holds the column names.
df = pl.read_csv("heart_disease.csv", has_header=True)

# Alternative input kept for reference: scikit-learn's two-class digits data.
# NOTE(review): this variant names its label column 'target', which differs
# from TARGET - adjust one of them before re-enabling it.
# data = load_digits(n_class=2)
# df = pl.DataFrame(data.data)
# df = df.with_columns(pl.Series(name='target', values=data.target))
# for col_id in range(64):
#     col = f"column_{col_id}"
#     df = df.with_columns(pl.col(col) / 8.0)

# Build the train/test arrays for fold 0 with the fetch_data defaults
# (min-max scaling on, no RBF expansion).
df_train, df_test = fetch_data(df, 0)
X_train, y_train = get_x_y(df_train)
X_test, y_test = get_x_y(df_test)
# Last expression of the cell: shows the final rows of the training frame.
df_train.tail()

In [None]:
import numpy as np

def sigmoid(x):
    """Numerically stable logistic function 1 / (1 + exp(-x)).

    Accepts a scalar or an array. Scalars come back as NumPy scalars and
    arrays keep their shape.

    The naive formula evaluates exp(-x), which overflows (and emits a
    RuntimeWarning) for large negative x. Here only exp(-|x|) is evaluated,
    which always lies in (0, 1], and the algebraically equivalent branch is
    selected for each sign of x.
    """
    x = np.asarray(x, dtype=float)
    decay = np.exp(-np.abs(x))
    # x >= 0: 1 / (1 + e^-x);  x < 0: e^x / (1 + e^x)
    out = np.where(x >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))
    # Indexing with () turns a 0-d result back into a scalar and leaves
    # n-d arrays untouched.
    return out[()]

def error(x, y, w):
    """Binary cross-entropy of a single sample under logistic regression.

    Args:
        x: Feature vector of the sample.
        y: Label of the sample (0 or 1).
        w: Weight vector, same length as ``x``.

    Returns:
        The scalar loss ``-(y*log(p) + (1-y)*log(1-p))`` with
        ``p = sigmoid(w . x)``.

    The loss is evaluated directly from the logit ``z = w . x`` as
    ``log(1 + e^z) - y*z``, which is the same quantity but stays finite when
    the sigmoid saturates to exactly 0 or 1. The result is therefore always a
    scalar, so the per-epoch ``np.mean`` over collected losses is well defined.
    """
    z = np.dot(w, x)
    # logaddexp(0, z) == log(e^0 + e^z) without overflowing for large |z|.
    return np.logaddexp(0.0, z) - y * z

def error_gradient(x, y, w, lamb):
    """Gradient of the L2-regularised logistic loss for a single sample.

    Args:
        x: Feature vector of the sample.
        y: Label of the sample (0 or 1).
        w: Current weight vector.
        lamb: Regularisation strength; 0 disables the penalty.

    Returns:
        Array with the same shape as ``w``:
        ``x * (sigmoid(w . x) - y) + lamb * w``.
    """
    residual = sigmoid(np.dot(w, x)) - y
    # The gradient of (lamb / 2) * ||w||^2 is lamb * w. The term must keep the
    # sign of each weight: with |w| instead, a descent step would push
    # negative weights further from zero rather than shrinking them.
    return x * residual + lamb * w

def sgd(X, Y, lr, lamb=0, epochs=2):
    """Fit logistic-regression weights with plain stochastic gradient descent.

    Weights start from uniform random values in [0, 1) and are updated once
    per sample, in the order the samples appear in ``X``.

    Args:
        X: 2-D array of samples, one row each.
        Y: Labels aligned with the rows of ``X``.
        lr: Learning rate.
        lamb: Regularisation strength forwarded to error_gradient.
        epochs: Number of full passes over the data.

    Returns:
        Tuple ``(w, err_avg)``: the final weights, and a list with one mean
        loss per epoch. Each loss is measured right after the update that
        used the same sample.
    """
    w = np.random.random(size=X[0].size)
    err_avg = []

    for _epoch in range(epochs):
        sample_losses = []
        for sample, label in zip(X, Y):
            step = lr * error_gradient(sample, label, w, lamb)
            w = w - step
            sample_losses.append(error(sample, label, w))
        err_avg.append(np.mean(sample_losses))

    return w, err_avg

In [None]:
# Regularisation strengths (lambda) to evaluate. The commented lines are
# earlier candidate grids kept for reference.
# l = np.logspace(-5, 5) # lambda values to test
# l = [0.1, 0.2, 0.3]
l = [3e-4, 3e-3, 3e-2]

def logistic_regression(X, y, lambdas):
    """Train one SGD model per regularisation strength.

    Every model uses a learning rate of 5e-3 and the default epoch count of
    sgd.

    Returns:
        Tuple ``(weights, errors)`` of lists aligned with ``lambdas``: the
        fitted weight vector and the per-epoch mean losses of each run.
    """
    runs = [sgd(X, y, 5e-3, lamb) for lamb in lambdas]
    weights = [fitted for fitted, _ in runs]
    errors = [history for _, history in runs]
    return weights, errors

# Fit one model per lambda on the training arrays built above; later cells
# read `weights` and `errors` for plotting.
weights, errors = logistic_regression(X_train, y_train, l)

In [None]:
# Fraction of training labels equal to 0, i.e. the accuracy of always
# predicting 0 on the training split.
(y_train == 0).sum() / len(y_train)

In [None]:
from sklearn.linear_model import LogisticRegression

class Score:
    """Bundle of classification metrics for one set of predictions.

    Attributes (set in __init__):
        score: fraction of predictions equal to the true label.
        conf: confusion matrix from sklearn's confusion_matrix(y, preds).
        f1: F1 score from sklearn's f1_score(y, preds).
    """

    def __init__(self, preds, y):
        n_correct = np.count_nonzero(preds == y)
        self.score = n_correct / len(y)
        self.conf = confusion_matrix(y, preds)
        self.f1 = f1_score(y, preds)

def score(X, y, w):
    """Evaluate weight vector ``w`` on samples ``X`` against labels ``y``.

    Each row's probability ``sigmoid(row . w)`` is rounded to the nearest
    integer with np.rint to obtain a hard 0/1 prediction.

    Returns:
        A Score built from the hard predictions and ``y``.
    """
    probabilities = np.array([sigmoid(np.dot(row, w)) for row in X])
    hard_preds = np.rint(probabilities)
    return Score(hard_preds, y)

def run_experiment(df, use_split, lambdas, rbf=False):
    """Train on one fold for every lambda and score each model on the test part.

    Args:
        df: Full data frame.
        use_split: Fold index forwarded to fetch_data.
        lambdas: Regularisation strengths forwarded to logistic_regression.
        rbf: Whether fetch_data should apply the RBF feature expansion.

    Returns:
        List of Score objects, one per entry of ``lambdas``.

    Side effect: prints the number of test samples.
    """
    train_part, test_part = fetch_data(df, use_split, rbf=rbf)
    X_train, y_train = get_x_y(train_part)
    X_test, y_test = get_x_y(test_part)
    print(len(y_test))
    fitted_weights, _ = logistic_regression(X_train, y_train, lambdas)
    return [score(X_test, y_test, w) for w in fitted_weights]

def sk_bench(df, use_split, rbf=False):
    """Reference result: scikit-learn LogisticRegression on the same fold.

    The data goes through the same fetch_data / get_x_y pipeline as
    run_experiment.

    Returns:
        The value of ``LogisticRegression.score`` on the test part.
    """
    train_part, test_part = fetch_data(df, use_split, rbf=rbf)
    X_train, y_train = get_x_y(train_part)
    X_test, y_test = get_x_y(test_part)
    classifier = LogisticRegression(solver='newton-cg', random_state=0)
    classifier.fit(X_train, y_train)
    return classifier.score(X_test, y_test)

In [None]:
# run_experiment returns Score objects, which cannot be averaged or compared.
# Pull out the numeric accuracy (`.score`) so the array is float-valued,
# with shape (folds, lambdas).
scores = np.stack([[s.score for s in run_experiment(df, x, l, rbf=False)] for x in range(1)])

# Average over folds, then report the index into `l` of the best lambda and
# the accuracy it reached.
mean_scores = scores.mean(axis=0)
print(f'Best: {mean_scores.argmax()}')
print(f'Best Score: {mean_scores.max()}')

In [None]:
import matplotlib.pyplot as plt

# Training-loss curve for the first lambda. sgd stores one mean loss per
# epoch, so the x-axis is sized from the data; a hard-coded length that
# differs from len(errors[0]) makes plt.plot raise a shape-mismatch error.
plt.plot(np.arange(len(errors[0])), errors[0])
plt.show()

In [None]:
import seaborn
import pandas as pd 
import matplotlib.pyplot as plt

plt.style.use('ggplot')

# Evaluate every lambda on every fold once and read both metrics from the
# resulting Score objects: F1 selects the best lambda (left panel), accuracy
# feeds the bar chart (right panel).
fold_results = [run_experiment(df, x, l, rbf=False) for x in range(5)]
scores = np.stack([[s.f1 for s in fold] for fold in fold_results])
accuracies = np.stack([[s.score for s in fold] for fold in fold_results])

COLORS = seaborn.color_palette()
f, ax = plt.subplots(1, 2, figsize=(12, 4))

# Left panel: norm of the weights fitted earlier for each lambda, with a
# vertical line at the lambda whose mean F1 across folds is highest.
ax[0].plot(l, [np.linalg.norm(x) for x in weights], color=COLORS[0])
ax[0].vlines(l[scores.mean(axis=0).argmax()], ymin=ax[0].get_ylim()[0], ymax=ax[0].get_ylim()[1], color=COLORS[1])
ax[0].set_xscale('log')
ax[0].set_ylabel('Weights Norm')
ax[0].set_xlabel('λ Value')
ax[0].set_title('Weight Decay')

# Right panel: sk_bench returns LogisticRegression.score, which is accuracy,
# so our side of the comparison must be accuracy as well (not F1), and the
# axis is labelled with the metric that is actually plotted.
our_method = accuracies.max(axis=1).tolist()
benchmark = [sk_bench(df, x, rbf=False) for x in range(5)]
method = ['Ours'] * 5 + ['Benchmark'] * 5
plot_df = pd.DataFrame.from_dict({'Accuracy': our_method + benchmark, 'Method': method, 'Trial': list(range(5)) * 2})
ax[1].set_title('5-Fold Cross Validation Accuracy')
seaborn.barplot(data=plot_df, x='Trial', y='Accuracy', hue='Method', ax=ax[1], palette=COLORS)
for container in ax[1].containers:
    ax[1].bar_label(container, fmt="%.2f")

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.linear_model import LogisticRegression
# Fold 3 with RBF features and min-max scaling turned off.
# Note: this rebinds the notebook-level df_train / X_train / ... names.
df_train, df_test = fetch_data(df, 3, rbf=True, scale_col=False)
X_train, y_train = get_x_y(df_train)
X_test, y_test = get_x_y(df_test)
# Fit the scikit-learn reference model and take hard predictions on the test part.
model = LogisticRegression(solver='newton-cg', random_state=0).fit(X_train, y_train)
# model.score(X_test, y_test)
pred = model.predict(X_test)
# confusion_matrix(y_true, y_pred): rows are actual classes, columns predicted.
cf2 = confusion_matrix(y_test, pred)
plt.figure(figsize = (9,6))
ax = seaborn.heatmap(cf2, annot=True, fmt='g')
plt.xlabel('PREDICTED')
plt.ylabel('ACTUAL')
plt.title('CONFUSION MATRIX FOR YOUR ENJOYMENT')
# NOTE(review): presumably set so both rows are fully visible; it also puts
# row 0 at the bottom of the heatmap - confirm the orientation is intended.
ax.set_ylim(0, 2)
plt.show()
print(f1_score(y_test, pred))

In [None]:
import seaborn
import pandas as pd 
import matplotlib.pyplot as plt

plt.style.use('ggplot')

# run_experiment returns Score objects; taking a max over them raises a
# TypeError. Extract the accuracy (`.score`) so `out` is a float array of
# shape (folds, lambdas) that is comparable with sk_bench, which returns
# LogisticRegression.score (accuracy).
out = np.stack([[s.score for s in run_experiment(df, x, l, rbf=True)] for x in range(5)])

COLORS = seaborn.color_palette()
f, ax = plt.subplots(1, 1)

# Best accuracy over the lambda grid for each fold, next to the benchmark.
our_method = out.max(axis=1).tolist()
benchmark = [sk_bench(df, x, rbf=True) for x in range(5)]
method = ['Ours'] * 5 + ['Benchmark'] * 5
plot_df = pd.DataFrame.from_dict({'Accuracy': our_method + benchmark, 'Method': method, 'Trial': list(range(5)) * 2})
ax.set_title('5-Fold Cross Validation Accuracy')
seaborn.barplot(data=plot_df, x='Trial', y='Accuracy', hue='Method', ax=ax, palette=COLORS)
for container in ax.containers:
    ax.bar_label(container, fmt="%.2f")