In [2]:
import plotly.express as px
import plotly.graph_objects as go
import numpy as np
import numba
import pandas as pd
import warnings

warnings.filterwarnings("ignore")

# 5

In [5]:
# read table
#   \hline
#   Confidence positive & Correct class \\ \hline
#   0.95                & +             \\
#   0.85                & +             \\
#   0.8                 & -             \\
#   0.7                 & +             \\
#   0.55                & +             \\
#   0.45                & -             \\
#   0.4                 & +             \\
#   0.3                 & +             \\
#   0.2                 & -             \\
#   0.1                 & -             \\
#   \hline

confs = [0.95, 0.85, 0.8, 0.7, 0.55, 0.45, 0.4, 0.3, 0.2, 0.1]
correct = [1, 1, 0, 1, 1, 0, 1, 1, 0, 0]

# number of positive and negative examples
num_neg, num_pos = len(correct) - sum(correct), sum(correct)
FP, TP = 0, 0
FPR, TPR = [0], [0]

for i in range(len(confs)):
    if correct[i] == 1:
        TP += 1
    else:
        FP += 1
    FPR.append(FP / num_neg)
    TPR.append(TP / num_pos)

# plot
fig = go.Figure()
fig.add_trace(go.Scatter(x=FPR, y=TPR, mode="lines+markers"))
fig.update_layout(
    template="plotly_white",
    title="ROC curve",
    xaxis_title="FPR",
    yaxis_title="TPR",
    height=500,
    width=500,
)
fig.show()
fig.write_image("roc.png")


# 2.1

In [28]:
# read training set
filename = "D2z.txt"
df_train = pd.read_table(filename, sep=" ", header=None)
df_train.columns = ["x1", "x2", "y"]

# generate test set
x1_test = np.arange(-2, 2.1, 0.1)
x2_test = np.arange(-2, 2.1, 0.1)
x1_test, x2_test = np.meshgrid(x1_test, x2_test)
x1_test, x2_test = x1_test.flatten(), x2_test.flatten()
df_test = pd.DataFrame({"x1_test": x1_test, "x2_test": x2_test})
print(len(df_test))

# cross product df and df_test
df_train["key"], df_test["key"] = 0, 0
df = df_test.merge(df_train, on="key")
df = df.drop(columns=["key"])

# calculate distance
df["dist"] = (df["x1_test"] - df["x1"]) ** 2 + (df["x2_test"] - df["x2"]) ** 2
df = df.sort_values(by=["dist"])
df = df.drop_duplicates(subset=["x1_test", "x2_test"], keep="first")
df["label"] = df["y"].apply(lambda x: "Test - 1" if x == 1 else "Test - 0")

# plot
fig = px.scatter(df, x="x1_test", y="x2_test", color="label")
t = df_train.query("y == 1")
fig.add_trace(
    go.Scatter(
        x=t["x1"], y=t["x2"], mode="markers", marker=dict(symbol="x"), name="Train - 1"
    )
)
t = df_train.query("y == 0")
fig.add_trace(
    go.Scatter(
        x=t["x1"],
        y=t["x2"],
        mode="markers",
        marker=dict(symbol="circle"),
        name="Train - 0",
    )
)

fig.update_layout(
    template="plotly_white",
    title="1NN",
    xaxis_title="Feature 1",
    yaxis_title="Feature 2",
    height=500,
    width=600,
)
fig


1681


# 2.2

In [3]:
# read data
filename = "emails.csv"
df = pd.read_csv(filename)

# create word freq matrix
freq_matrix = df.iloc[:, 1:-1].to_numpy()
# convert to float
freq_matrix = freq_matrix.astype(np.float)
df["vector"] = [x for x in freq_matrix]
df["email_id"] = df.index + 1
df = df[["email_id", "vector", "Prediction"]]
df

Unnamed: 0,email_id,vector,Prediction
0,1,"[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, ...",0
1,2,"[8.0, 13.0, 24.0, 6.0, 6.0, 2.0, 102.0, 1.0, 2...",0
2,3,"[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 8.0, 0.0, 0.0, ...",0
3,4,"[0.0, 5.0, 22.0, 0.0, 5.0, 1.0, 51.0, 2.0, 10....",0
4,5,"[7.0, 6.0, 17.0, 1.0, 5.0, 2.0, 57.0, 0.0, 9.0...",0
...,...,...,...
4995,4996,"[20.0, 6.0, 3.0, 1.0, 1.0, 1.0, 34.0, 0.0, 0.0...",0
4996,4997,"[0.0, 7.0, 1.0, 0.0, 0.0, 0.0, 20.0, 1.0, 1.0,...",0
4997,4998,"[6.0, 8.0, 1.0, 3.0, 2.0, 1.0, 64.0, 7.0, 1.0,...",0
4998,4999,"[8.0, 6.0, 2.0, 5.0, 6.0, 1.0, 51.0, 4.0, 0.0,...",0


In [4]:
@numba.jit(nopython=True)
def euclidean_dist(x, y):
    return np.linalg.norm(x - y)


def get_distance_df(df, train_ids, test_ids):
    df_train = df.query("email_id in @train_ids")
    df_test = df.query("email_id in @test_ids")

    # calculate distance
    df_train["key"] = 0
    df_test["key"] = 0
    df_combined = df_test.merge(df_train, on="key", suffixes=("_test", "_train"))
    df_combined = df_combined.drop(columns=["key"])
    df_combined["dist"] = df_combined.apply(
        lambda x: euclidean_dist(x["vector_test"], x["vector_train"]), axis=1
    )
    return df_combined


def knn(df_combined, k=1):
    # find k nearest neighbors
    df_combined = (
        df_combined.groupby("email_id_test")
        .apply(lambda x: x.sort_values(by=["dist"]).head(k))
        .reset_index(drop=True)
    )

    # predict
    df_pred = (
        df_combined.groupby(["email_id_test", "Prediction_test"])["Prediction_train"]
        .apply(lambda x: x.sum() / k)
        .reset_index()
    )
    df_pred.columns = ["email_id_test", "Prediction_test", "prob"]
    df_pred["Prediction"] = df_pred["prob"].apply(lambda x: 1 if x >= 0.5 else 0)

    return df_pred


def evaluate(df_pred):
    # calculate TP, FP, TN, FN
    TP = df_pred.query("Prediction_test == 1 and Prediction == 1").shape[0]
    FP = df_pred.query("Prediction_test == 0 and Prediction == 1").shape[0]
    TN = df_pred.query("Prediction_test == 0 and Prediction == 0").shape[0]
    FN = df_pred.query("Prediction_test == 1 and Prediction == 0").shape[0]

    # calculate accuracy, precision, recall
    accuracy = (TP + TN) / (TP + FP + TN + FN)
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)

    return accuracy, precision, recall

In [50]:
# fold 1- 5
test_folds = [(1, 1001), (1000, 2001), (2000, 3001), (3000, 4001), (4000, 5001)]
accuracies, precisions, recalls = [], [], []

for test_fold in test_folds:
    test_ids = np.arange(test_fold[0], test_fold[1], 1)
    train_ids = [i for i in range(1, 5001) if i not in test_ids]
    df_combined = get_distance_df(df, train_ids, test_ids)
    df_pred = knn(df_combined, k=1)
    accuracy, precision, recall = evaluate(df_pred)
    accuracies.append(accuracy)
    precisions.append(precision)
    recalls.append(recall)

    print(f"Fold {test_fold}:")
    print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")

Fold (1, 1001):
Accuracy: 0.8250, Precision: 0.6536, Recall: 0.8211
Fold (1000, 2001):
Accuracy: 0.8531, Precision: 0.6857, Recall: 0.8664
Fold (2000, 3001):
Accuracy: 0.8591, Precision: 0.7182, Recall: 0.8316
Fold (3000, 4001):
Accuracy: 0.8531, Precision: 0.7207, Recall: 0.8163
Fold (4000, 5001):
Accuracy: 0.7742, Precision: 0.6047, Recall: 0.7549


In [51]:
# make a table
df_res = pd.DataFrame(
    {"Accuracy": accuracies, "Precision": precisions, "Recall": recalls}
)
df_res.index = ["Fold 1", "Fold 2", "Fold 3", "Fold 4", "Fold 5"]
df_res

Unnamed: 0,Accuracy,Precision,Recall
Fold 1,0.825,0.653631,0.821053
Fold 2,0.853147,0.685714,0.866426
Fold 3,0.859141,0.718182,0.831579
Fold 4,0.853147,0.720721,0.816327
Fold 5,0.774226,0.604712,0.754902


# 2.3

In [5]:
@numba.jit(nopython=True)
def sigmoid(x):
    return 1 / (1 + np.exp(-x))


@numba.jit(nopython=True)
def get_grad(vector, Prediction, theta):
    grad = -vector * (Prediction - sigmoid(vector @ theta))
    return grad


@numba.jit(nopython=True)
def get_prob(vector, theta):
    return sigmoid(vector @ theta)


@numba.jit(nopython=True)
def get_cross_entropy(vector, theta, prediction):
    prob = get_prob(vector, theta)
    return -np.log(prob) if prediction == 1 else -np.log(1 - prob)


def logistic(df, train_ids, test_ids, eta=1e-4, max_iter=5000, tol=1e-5):
    # initialize coefficients
    theta = np.zeros(3000)
    theta = theta.reshape(-1, 1)

    # reshape vector
    df["vector"] = df["vector"].apply(lambda x: x.reshape(1, -1))

    # get train and test set
    df_train = df.query("email_id in @train_ids")
    df_test = df.query("email_id in @test_ids")

    # update theta
    cross_entropy = 10000000
    for i in range(max_iter):
        # calculate gradient
        grads = df_train.apply(
            lambda x: get_grad(x["vector"], x["Prediction"], theta),
            axis=1,
        )
        grads_mean = grads.mean(axis=0).reshape(-1, 1)
        theta = theta - eta * grads_mean

        # check convergence of corss entropy
        df["cross_entropy"] = df.apply(
            lambda x: get_cross_entropy(x["vector"], theta, x["Prediction"]),
            axis=1,
        )
        new_cross_entropy = df["cross_entropy"].mean()
        dist = np.abs(new_cross_entropy - cross_entropy)
        if i % 500 == 0:
            print(i, cross_entropy, new_cross_entropy, dist)
        if dist < tol:
            print(f"Converge at iteration {i}")
            break
        cross_entropy = new_cross_entropy

    # predict
    df_test = df_test.rename(columns={"Prediction": "Prediction_test"})
    df_test["prob"] = df_test["vector"].apply(
        lambda x: get_prob(x, theta),
    )

    df_test["Prediction"] = df_test["prob"].apply(
        lambda x: 1 if x > 0.5 else 0)

    return df_test


In [156]:
# fold 1- 5
test_folds = [(1, 1001), (1000, 2001), (2000, 3001),
              (3000, 4001), (4000, 5001)]
accuracies, precisions, recalls = [], [], []

for test_fold in test_folds:
    print(f"Fold {test_fold} training start")
    test_ids = np.arange(test_fold[0], test_fold[1], 1)
    train_ids = [i for i in range(1, 5001) if i not in test_ids]
    df_pred = logistic(df, train_ids, test_ids)
    accuracy, precision, recall = evaluate(df_pred)
    accuracies.append(accuracy)
    precisions.append(precision)
    recalls.append(recall)

    print(f"Fold {test_fold} metrics:")
    print(
        f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")


Fold (1, 1001) training start
0 10000000 [[0.69024492]] [[9999999.30975508]]
500 [[0.45523793]] [[0.4551133]] [[0.00012463]]
1000 [[0.41182358]] [[0.41176066]] [[6.29220861e-05]]
1500 [[0.38617658]] [[0.38613416]] [[4.24150739e-05]]
2000 [[0.36774764]] [[0.36771538]] [[3.22620032e-05]]
2500 [[0.3532485]] [[0.35322233]] [[2.61658313e-05]]
3000 [[0.34125418]] [[0.34123214]] [[2.20408007e-05]]
3500 [[0.33101813]] [[0.33099908]] [[1.90448826e-05]]
4000 [[0.32208604]] [[0.32206927]] [[1.67773387e-05]]
4500 [[0.31415774]] [[0.31414274]] [[1.49982097e-05]]
Fold (1, 1001) metrics:
Accuracy: 0.8980, Precision: 0.8645, Recall: 0.7614
Fold (1000, 2001) training start
0 10000000 [[0.69029825]] [[9999999.30970174]]
500 [[0.45237958]] [[0.45225283]] [[0.00012675]]
1000 [[0.4080134]] [[0.40794894]] [[6.44668939e-05]]
1500 [[0.38188174]] [[0.38183883]] [[4.2908013e-05]]
2000 [[0.36336582]] [[0.36333364]] [[3.21714361e-05]]
2500 [[0.34900357]] [[0.34897783]] [[2.57387312e-05]]
3000 [[0.33727413]] [[0.3

In [157]:
# make a table
df_res = pd.DataFrame(
    {"Accuracy": accuracies, "Precision": precisions, "Recall": recalls}
)
df_res.index = ["Fold 1", "Fold 2", "Fold 3", "Fold 4", "Fold 5"]
df_res

Unnamed: 0,Accuracy,Precision,Recall
Fold 1,0.898,0.864542,0.761404
Fold 2,0.895105,0.864407,0.736462
Fold 3,0.881119,0.895238,0.659649
Fold 4,0.875125,0.831373,0.721088
Fold 5,0.837163,0.787149,0.640523


# 2.4

In [52]:
test_folds = [(1, 1001), (1000, 2001), (2000, 3001),
              (3000, 4001), (4000, 5001)]
ks = [1, 3, 5, 7, 10]
accuracies = {k: [] for k in ks}

for test_fold in test_folds:
    test_ids = np.arange(test_fold[0], test_fold[1], 1)
    train_ids = [i for i in range(1, 5001) if i not in test_ids]
    df_combined = get_distance_df(df, train_ids, test_ids)
    for k in ks:
        df_pred = knn(df_combined, k=k)
        accuracy, precision, recall = evaluate(df_pred)
        accuracies[k].append(accuracy)

df_res = pd.DataFrame(accuracies)
display(df_res)
mean_accuracies = df_res.mean(axis=0)
mean_accuracies


Unnamed: 0,1,3,5,7,10
0,0.825,0.846,0.837,0.837,0.863
1,0.853147,0.85015,0.85015,0.86014,0.869131
2,0.859141,0.855145,0.87013,0.873127,0.877123
3,0.853147,0.878122,0.868132,0.875125,0.887113
4,0.774226,0.771229,0.779221,0.779221,0.781219


1     0.832932
3     0.840129
5     0.840926
7     0.844922
10    0.855517
dtype: float64

In [53]:
# plot
fig = go.Figure()
fig.add_trace(
    go.Scatter(x=ks, y=mean_accuracies, mode="lines+markers", text=mean_accuracies)
)
fig.update_layout(
    template="plotly_white",
    title="kNN 5-fold cross validation",
    xaxis_title="k",
    yaxis_title="Average accuracy",
    height=500,
    width=600,
)
fig

# 2.5

In [6]:
def get_roc_xy(confidence, correct):
    # confidence in descending order
    # number of positive and negative examples
    num_neg, num_pos = len(correct) - sum(correct), sum(correct)
    FP, TP = 0, 0
    FPR, TPR = [0], [0]

    for i in range(len(confidence)):
        if correct[i] == 1:
            TP += 1
        else:
            FP += 1
        FPR.append(FP / num_neg)
        TPR.append(TP / num_pos)

    return FPR, TPR

In [7]:
# Use a single training/test setting
test_fold = (1, 1001)
test_ids = np.arange(test_fold[0], test_fold[1], 1)
train_ids = [i for i in range(1, 5001) if i not in test_ids]

# knn roc curve
k = 5
df_combined = get_distance_df(df, train_ids, test_ids)
df_pred = knn(df_combined, k=k)
df_pred = df_pred.sort_values(by=["prob"], ascending=False)
knn_confidence, knn_correct = (
    df_pred["prob"].to_numpy(),
    df_pred["Prediction_test"].to_numpy(),
)

# roc curve
knn_FPR, knn_TPR = get_roc_xy(knn_confidence, knn_correct)

In [8]:
# logistic roc curve
df_pred = logistic(df, train_ids, test_ids)
df_pred = df_pred.sort_values(by=["prob"], ascending=False)
logit_confidence, logit_correct = (
    df_pred["prob"].to_numpy(),
    df_pred["Prediction_test"].to_numpy(),
)
logit_FPR, logit_TPR = get_roc_xy(logit_confidence, logit_correct)

0 10000000 [[0.69024492]] [[9999999.30975508]]
500 [[0.45523793]] [[0.4551133]] [[0.00012463]]
1000 [[0.41182358]] [[0.41176066]] [[6.29220861e-05]]
1500 [[0.38617658]] [[0.38613416]] [[4.24150739e-05]]
2000 [[0.36774764]] [[0.36771538]] [[3.22620032e-05]]
2500 [[0.3532485]] [[0.35322233]] [[2.61658313e-05]]
3000 [[0.34125418]] [[0.34123214]] [[2.20408007e-05]]
3500 [[0.33101813]] [[0.33099908]] [[1.90448826e-05]]
4000 [[0.32208604]] [[0.32206927]] [[1.67773387e-05]]
4500 [[0.31415774]] [[0.31414274]] [[1.49982097e-05]]


In [9]:
# get AUC
knn_auc = np.trapz(knn_TPR, knn_FPR)
logit_auc = np.trapz(logit_TPR, logit_FPR)

knn_auc, logit_auc


(0.918802600907864, 0.9445663108821004)

In [12]:
# plot
fig = go.Figure()
fig.add_trace(
    go.Scatter(x=knn_FPR, y=knn_TPR, mode="lines", name=f"kNN (AUC={knn_auc:.4f}))")
)
fig.add_trace(
    go.Scatter(
        x=logit_FPR, y=logit_TPR, mode="lines", name=f"Logistic (AUC={logit_auc:.4f})"
    )
)

fig.update_layout(
    template="plotly_white",
    title="ROC curve",
    xaxis_title="False positive rate",
    yaxis_title="True positive rate",
    legend=dict(
        x=0.99,
        xanchor="right",
        y=0.2,
    ),
    height=500,
    width=600,
)

fig.update_xaxes(range=[0, 1])
fig.update_yaxes(range=[0, 1])

fig