In [None]:
# imports
# basic
import pandas as pd
import numpy as np
import configparser
# classifiers
# Naive Bayes
from sklearn.naive_bayes import GaussianNB
# KNeighbors
from sklearn.neighbors import KNeighborsClassifier
# Random forest
from sklearn.ensemble import RandomForestClassifier
# SVC
from sklearn.svm import SVC

# metrics
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_validate
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score
from sklearn.metrics import f1_score
from sklearn.metrics import make_scorer

# misc
from matplotlib import pyplot as plt
from matplotlib import dates as plotdates
from matplotlib.figure import Figure
from matplotlib.ticker import PercentFormatter
from datetime import datetime
import pickle
from sklearn.preprocessing import StandardScaler

In [None]:
# paths
# Every input/output location is read from the [PATH] section of the settings
# file, which is resolved relative to the current working directory.
CONFIG_FILE = 'settings.config'

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result means the settings are missing.
if not config.read(CONFIG_FILE):
    raise FileNotFoundError("Configuration file '%s' was not found or could not be read" % CONFIG_FILE)

# dataset locations
RAW_DATA_PATH = config.get('PATH', 'RawDataPath')
PROCESSED_DATA_PATH = config.get('PATH', 'ProcessedDataPath')
FILTERED_DATA_PATH = config.get('PATH', 'FilteredDataPath')

# pickled classifier locations
# NOTE(review): the option name is spelled 'MulticlasClassifierPath' (single "s");
# presumably this matches the key in the settings file - confirm before renaming.
MULTICLASS_CLASSIFIER_PATH = config.get('PATH', 'MulticlasClassifierPath')
BINARY_CLASSIFIER_PATH = config.get('PATH', 'BinaryClassifierPath')
CLASSIFIER_FOLDER_PATH = config.get('PATH', 'ClassifiersPath')

In [None]:
# set values
# Classification type used by the rest of the notebook:
#   "sleep"  - binary classification sleep/awake
#   "staged" - 4-class classification wake/light/deep/rem
VALID_CLASS_TYPES = ("sleep", "staged")
class_type = "sleep"

# Fail fast on a typo: detection_type() leaves the labels untouched for any
# other value, which would only surface later as confusing errors.
if class_type not in VALID_CLASS_TYPES:
    raise ValueError("class_type must be one of %s, got %r" % (VALID_CLASS_TYPES, class_type))

In [None]:
# functions
def load_file(path):
    """Read the CSV file at ``path`` and return its contents as a pandas DataFrame."""
    return pd.read_csv(path)

def detection_type(flag, df):
    """Encode the ``label`` column of ``df`` for the requested classification type.

    Parameters
    ----------
    flag : str
        "sleep"  - binary encoding: "wake", "light", "deep", "rem" (and an
                   already collapsed "sleep") become 0, "awake" becomes 1.
        "staged" - 4-class encoding: "awake" is merged into "wake"; the labels
                   stay strings.
        Any other value leaves the labels unchanged.
    df : pandas.DataFrame
        Frame with a "label" column. The column is overwritten in place.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object that was passed in.
    """
    if flag == "sleep":
        binary_codes = {"wake": 0, "light": 0, "deep": 0, "rem": 0, "sleep": 0, "awake": 1}
        # Values missing from the table are passed through untouched, exactly as
        # Series.replace would leave them.
        encoded = df["label"].map(lambda value: binary_codes.get(value, value))
        # Make the integer dtype explicit instead of relying on the implicit
        # downcasting of Series.replace, which pandas has deprecated.
        df["label"] = encoded.infer_objects()
    elif flag == "staged":
        df["label"] = df["label"].replace(["awake"], "wake")
    return df

def get_histogram_ticks(flag):
    """Return ``(tick_labels, tick_positions)`` for the class histogram.

    Parameters
    ----------
    flag : str
        "sleep" (binary), "staged" (4 classes) or "all" (5 classes).

    Returns
    -------
    tuple of (list of str, list of int)
        Class names and the x positions they are drawn at.

    Raises
    ------
    ValueError
        If ``flag`` is not one of the supported values.
    """
    if flag == "sleep":
        # detection_type() encodes sleep as 0 and awake as 1, so the labels
        # have to be listed in that order.
        return ["sleep", "awake"], [0, 1]
    if flag == "staged":
        return ["wake", "light", "REM", "deep"], [0, 1, 2, 3]
    if flag == "all":
        return ["awake", "light", "wake", "deep", "REM"], [0, 1, 2, 3, 4]
    # Previously an unknown flag fell through and returned None, which only
    # failed later when the caller tried to unpack the result.
    raise ValueError("Unknown histogram flag: %r (expected 'sleep', 'staged' or 'all')" % (flag,))

In [None]:
# load processed dataset
# Read the filtered CSV and encode its labels for the selected classification type.
df = detection_type(class_type, load_file(FILTERED_DATA_PATH))

# Target vector = the "label" column; feature matrix = every remaining column.
y = df["label"]
X = df.drop(columns=["label"])



In [None]:
def make_histogram(y, flag):
    """Plot the relative frequency of every class in ``y`` as an annotated histogram.

    Parameters
    ----------
    y : array-like
        Class labels, either numeric codes or label names.
    flag : str
        Tick-set name forwarded to ``get_histogram_ticks``.

    Raises
    ------
    ValueError
        If ``y`` holds a label name that is not part of the selected tick set.
    """
    xtick_label, xtick = get_histogram_ticks(flag)

    # infer_objects() turns an object column that only holds numbers into a
    # numeric one, so already-encoded labels are plotted as they are.
    values = pd.Series(y).infer_objects()
    if not pd.api.types.is_numeric_dtype(values):
        # String labels would be positioned by matplotlib's category axis in
        # order of first appearance, which need not match the fixed tick labels
        # set below. Map every name to its tick position explicitly
        # (case-insensitive, because the ticks spell "REM" in upper case).
        position_of = {str(name).lower(): pos for name, pos in zip(xtick_label, xtick)}
        keys = values.astype(str).str.lower()
        unknown = sorted(set(keys) - set(position_of))
        if unknown:
            raise ValueError("Labels %s have no histogram tick for flag %r" % (unknown, flag))
        values = keys.map(position_of)

    # The 1/len weights make the bar heights sum to 1 (relative frequencies);
    # bin edges sit at -0.5, 0.5, ... so every bar is centred on its tick.
    n, bins, patches = plt.hist(x=values, bins=np.arange(len(xtick) + 1) - 0.5, color='#0504aa',
                                alpha=1, rwidth=0.9, weights=np.ones(len(values)) / len(values))
    plt.grid(axis='y', alpha=0.75)

    # relative frequency of each bin (bin width * height), rounded to 4 decimals
    widths = np.diff(bins)
    shares = np.around(widths * n, 4)

    # write the percentage slightly above each bar, a third of the way into the bin
    for left, width, height, share in zip(bins[:-1], widths, n, shares):
        plt.text(left + width / 3, height + 0.001, str(np.around(share * 100, 2)) + "%")

    plt.xticks(xtick, xtick_label)
    plt.gca().yaxis.set_major_formatter(PercentFormatter(1))
    plt.xlabel('Klasa')
    plt.ylabel('Ilość [%]')
    plt.title('')
    plt.show()

In [None]:
# class balance of the encoded labels for the selected classification type
make_histogram(y=y, flag=class_type)

In [None]:
# initialize classifier
# Seed for the estimators that use randomness internally, so that
# Restart & Run All reproduces the same fitted models and scores.
MODEL_RANDOM_STATE = 42

# 10-fold shuffled splitter reused by the cross-validation and grid-search cells
cv = KFold(n_splits=10, shuffle=True, random_state=1)

if class_type == "staged":
    # Alphabetical order, which is the order scikit-learn sorts string labels in
    # when it builds a confusion matrix.
    labels_names = ["deep", "light", "rem", "wake"]
    model_KN = KNeighborsClassifier(weights='distance', n_neighbors=42) # multiclass
    model_RN = RandomForestClassifier(criterion='entropy', n_estimators=800,
                                      random_state=MODEL_RANDOM_STATE) # multiclass
    model_NB = GaussianNB(var_smoothing=1.23284674e-01) # multiclass
else:
    # index = encoded class: 0 -> sleep, 1 -> awake
    labels_names = ["sleep", "awake"]
    model_KN = KNeighborsClassifier(weights='distance', n_neighbors=38) # binary
    model_NB = GaussianNB(var_smoothing=0.1873817422860384) # binary
    model_RN = RandomForestClassifier(criterion='entropy', n_estimators=100,
                                      random_state=MODEL_RANDOM_STATE) # binary
    # probability=True enables predict_proba, which the ROC cell needs
    model_SVC = SVC(C=10, probability=True, random_state=MODEL_RANDOM_STATE)

# models and model_names are parallel lists and must stay in the same order
models = [model_NB, model_RN, model_KN]
model_names = ["GaussianNB", "RandomForestClassifier", "KNeighborsClassifier"]
if class_type == "sleep":
    # the SVC is only evaluated for the binary problem
    models.append(model_SVC)
    model_names.append("SVC")

In [None]:
# single split metrics
print("Classifying...")
# hold out 20 % of the samples for testing; the fixed seed keeps the split repeatable
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

for model, name in zip(models, model_names):
    # fit() returns the estimator itself, so clf is the same object as model
    clf = model.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    matrix = confusion_matrix(y_test, y_pred)
    acc = accuracy_score(y_test, y_pred)
    print(name)
    # diagonal / column sums: share of correct predictions per predicted class
    print(matrix.diagonal()/matrix.sum(axis=0))
    print(labels_names)
    print(matrix)
    print(acc)
    # F-score per class (None) followed by the three averaged variants
    for averaging in (None, 'weighted', 'micro', 'macro'):
        print("Fscore(%s): " % averaging, f1_score(y_test, y_pred, average=averaging))

In [None]:
import copy  # no longer used by this cell; kept in case another cell relies on the name
# confusion matrix summed over the cross-validation folds, one panel per model
n_models = len(models)
# One panel per model (the binary setup has four models, the staged one three).
# squeeze=False always returns a 2-D array of axes, even for a single panel.
f, axes = plt.subplots(1, n_models, sharey=False, figsize=(11 / 3 * n_models, 4), squeeze=False)
axes = axes.flatten()

# Plain arrays, so the positional fold indices from cv.split() can never be
# mistaken for index labels.
X_values = X.to_numpy()
y_values = y.to_numpy()
# Fixed, sorted class list: every fold yields a matrix of the same shape even
# when a class is missing from that fold.
class_labels = np.unique(y_values)
# Use the readable names when they match the number of classes in the data.
display_labels = labels_names if len(labels_names) == len(class_labels) else class_labels

for i, (model, name) in enumerate(zip(models, model_names)):
    conf_matrix_list_of_arrays = []
    for train_index, test_index in cv.split(X_values):
        X_train, X_test = X_values[train_index], X_values[test_index]
        y_train, y_test = y_values[train_index], y_values[test_index]
        if name == "GaussianNB":
            # Fit the scaler on the training fold only, so that no statistics
            # of the test fold leak into training.
            trans = StandardScaler().fit(X_train)
            X_train, X_test = trans.transform(X_train), trans.transform(X_test)
        model.fit(X_train, y_train)
        conf_matrix = confusion_matrix(y_test, model.predict(X_test), labels=class_labels)
        conf_matrix_list_of_arrays.append(conf_matrix)
    mean_of_conf_matrix_arrays = np.mean(conf_matrix_list_of_arrays, axis=0)
    sum_of_conf_matrix_arrays = np.sum(conf_matrix_list_of_arrays, axis=0)
    print(name)
    # The summed matrix is the one that is displayed.
    disp = ConfusionMatrixDisplay(confusion_matrix=sum_of_conf_matrix_arrays.astype(int),
                                  display_labels=display_labels)
    disp.plot(ax=axes[i], xticks_rotation=45)
    disp.ax_.set_title(name)
    # per-panel colorbars and axis titles are replaced by shared ones below
    disp.im_.colorbar.remove()
    disp.ax_.set_xlabel('')
    disp.ax_.set_ylabel('')

# shared axis titles for all panels
f.text(0.35, 0.1, 'Predykowane etykiety', ha='left')
f.text(0.05, 0.3, 'Prawdziwe etykiety', rotation="vertical")
plt.subplots_adjust(wspace=0.40, hspace=1)

# single colorbar, taken from the image of the last panel drawn
f.colorbar(disp.im_, ax=axes)
plt.show()

In [None]:
# cross validation
# The plain 'f1' scorer is defined for binary targets only; for the 4-class
# ("staged") problem it cannot be computed, so use the macro average there.
scoring = {'acc': 'accuracy',
           'fscore': 'f1_macro' if class_type == "staged" else 'f1',
           'fscore weighted': 'f1_weighted'}
for model, name in zip(models, model_names):
    # cross_validate returns one "test_<key>" array (a score per fold) for each scorer
    scores = cross_validate(model, X, y, scoring=scoring, cv=cv, n_jobs=-1)
    acc_scores = scores["test_acc"]
    fscore_scores = scores["test_fscore"]
    fscore_w_scores = scores["test_fscore weighted"]
    print("\n#######################################")
    print(name)
    # per-fold values followed by mean (standard deviation)
    print("Acc scores: ", acc_scores)
    print('Accuracy: %.5f (%.5f)' % (np.mean(acc_scores), np.std(acc_scores)))
    print("Fscore scores: ", fscore_scores)
    print('Fscore: %.5f (%.5f)' % (np.mean(fscore_scores), np.std(fscore_scores)))
    print("Fscore_w scores: ", fscore_w_scores)
    print('Fscore_w: %.5f (%.5f)' % (np.mean(fscore_w_scores), np.std(fscore_w_scores)))

In [None]:
# finding best model parameters
# Parameter grids are keyed by model name. Pairing grids with models by list
# position handed the SVC grid ('C') to RandomForestClassifier as soon as one
# of the optional grids was commented out. The `models` list is deliberately
# not redefined here, so the SVC stays available to the cells that follow.
grid_params = {
    "GaussianNB": { 'var_smoothing': np.logspace(0,-9, num=100) },
    # "RandomForestClassifier": { 'n_estimators': [50, 100, 300, 500, 800],
    #                             'criterion': ['gini', 'entropy'] },
    # "KNeighborsClassifier": { 'n_neighbors': np.arange(1,50),
    #                           'weights' : ['distance', 'uniform'] },
    "SVC": { 'C': [0.1,1, 10] },
}

for model, name in zip(models, model_names):
    grid_param = grid_params.get(name)
    if grid_param is None:
        # no grid configured for this model (its entry is commented out above)
        continue
    gd_sr = GridSearchCV(estimator=model,
                     param_grid=grid_param,
                     scoring="accuracy",
                     cv=cv,
                     n_jobs=-1,
                     verbose=10) # print progress
    gd_sr.fit(X, y)
    best_parameters = gd_sr.best_params_
    print(best_parameters)
    print(gd_sr.cv_results_)
    # print(pd.DataFrame(gd_sr.cv_results_)) # print all results

In [None]:
def make_roc_curve_plot(X, y, model, model_name, ax):
    """Fit ``model`` on a 70/30 split and draw its ROC curve on ``ax``.

    Only meaningful for binary targets: the score used for the curve is
    column 1 of ``predict_proba``.

    Parameters
    ----------
    X, y : features and labels; split with a fixed seed (random_state=42).
    model : estimator providing ``fit`` and ``predict_proba``; it is refitted
        in place on the training part of the split.
    model_name : str
        Printed, and used as the legend label of the curve.
    ax : matplotlib Axes to draw on.

    Returns
    -------
    float
        Area under the ROC curve on the held-out 30 %.
    """
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
    model.fit(X_train, y_train)
    # column 1 = probability assigned to the second class
    positive_probs = model.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, positive_probs)
    print(model_name)
    fpr, tpr, _ = roc_curve(y_test, positive_probs)

    ax.plot(fpr, tpr, label=model_name)
    ax.set_xlabel('False Positive Rate')
    # the y axis of a ROC curve is the true positive rate
    ax.set_ylabel('True Positive Rate')
    return auc


In [None]:
# create ROC curve plot
# make_roc_curve_plot() scores with column 1 of predict_proba, which is only
# defined for a two-class problem, so the multiclass ("staged") run is skipped
# instead of failing inside the scikit-learn ROC functions.
if class_type == "staged":
    print("ROC curves are only drawn for the binary ('sleep') classification")
else:
    fig, ax = plt.subplots()
    # one curve per model on shared axes; the legend uses the model names
    for model, name in zip(models, model_names):
        make_roc_curve_plot(X, y, model, name, ax)
    ax.legend()
    plt.show()

In [None]:
# save classifier
# RandomForestClassifier has been chosen because of best accuracy among the rest
clf = model_RN
# refit on the complete dataset before persisting
clf.fit(X, y)

# pick the output file that belongs to the active classification type
if class_type == "sleep":
    classifier_path = BINARY_CLASSIFIER_PATH
elif class_type == "staged":
    classifier_path = MULTICLASS_CLASSIFIER_PATH
else:
    classifier_path = None  # unknown type: nothing is written

if classifier_path is not None:
    # the context manager closes (and flushes) the file even if pickling fails
    with open(classifier_path, 'wb') as classifier_file:
        pickle.dump(clf, classifier_file)

In [None]:
# load pickled classifier
# NOTE: pickle.load can execute arbitrary code, so only load classifier files
# that were written by this notebook or come from another trusted source.
if class_type == "sleep":
    classifier_path = BINARY_CLASSIFIER_PATH
elif class_type == "staged":
    classifier_path = MULTICLASS_CLASSIFIER_PATH
else:
    classifier_path = None  # unknown type: clf is left as it is

if classifier_path is not None:
    # the context manager closes the file handle once the model is loaded
    with open(classifier_path, 'rb') as classifier_file:
        clf = pickle.load(classifier_file)