In [None]:
from collections import Counter
from typing import List, Tuple, Optional
import os

import numpy as np
import torch
import pandas as pd
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score

# Looking into the dataset

In [None]:
# Relative path to the sonar data file; it is read as a header-less CSV in the cells below
data_filename = "../data/sonar.all-data"

In [None]:
# header=None: the file has no header row, so the columns get integer names
df = pd.read_csv(data_filename, header=None)
df.head()

In [None]:
# Column dtypes and non-null counts
df.info()

In [None]:
# Summary statistics of every column except the last one (the last column holds the class label)
df.iloc[:, :-1].describe()

In [None]:
# Number of samples per class label (last column)
df.iloc[:,-1].value_counts()

In [None]:
# Plot the first sample of each class to get a feel for what the readings look like
for label in ['R', 'M']:
    cur_label_only = df[df.iloc[:, -1] == label]
    row_index = 0
    # Drop the trailing label column; the mixed-type row is object dtype, so make it numeric
    sample = cur_label_only.iloc[row_index, :-1].astype(float)

    # Explicit figure/axes instead of the implicit pyplot state, with labelled axes
    fig, ax = plt.subplots()
    ax.plot(sample.index, sample.values)
    ax.set_title(f'example of {label} frequencies')
    ax.set_xlabel('feature index')
    ax.set_ylabel('feature value')
    plt.show()

# Dataset preparation for the model

In [None]:
# Hold out 20% of the samples for testing; the fixed seed makes the split repeatable
test_size = 0.2
random_state = 42

# Features are all columns but the last; the last column holds the class label
X = df.iloc[:, :-1].values
y = df.iloc[:, -1].values

# stratify=y keeps the class proportions the same in both splits.
# X_train is reused further down to compute the normalization statistics.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=test_size, random_state=random_state, stratify=y)

In [None]:
class SonarDataset(torch.utils.data.Dataset):
    """
    In-memory dataset of feature rows and binary class labels.

    Labels are given as the strings listed in `i2label` and are stored as
    float32 class indices of shape (n_samples, 1), so they can be compared
    directly with a single-output model.

    Args:
        X: 2-D features, either a pandas DataFrame or anything `np.asarray`
            can turn into a numeric array (e.g. a NumPy array).
        labels: iterable of label strings, one per row of `X`.

    Raises:
        KeyError: if a label is not one of `i2label`.
        ValueError: if `X` and `labels` have different lengths.
    """

    # Class index -> label string, and the reverse lookup
    i2label = ['R', 'M']
    label2i = {label: i for i, label in enumerate(i2label)}

    def __init__(self, X, labels):
        y_list = [self.label2i[label] for label in labels]
        self.y = torch.tensor(y_list, dtype=torch.float32).view(-1, 1)
        # np.asarray accepts DataFrames and plain arrays alike (the original
        # `X.values` only worked for pandas objects)
        self.X = torch.tensor(np.asarray(X), dtype=torch.float32)

        if len(self.X) != len(self.y):
            raise ValueError(
                f"X has {len(self.X)} rows but {len(self.y)} labels were given")

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        """Return the (features, label) pair stored at `idx`."""
        return self.X[idx], self.y[idx]

    def get_classes_distribution(self):
        """Return a Counter mapping each label string to its number of samples."""
        return Counter(self.i2label[int(i)] for i in self.y.view(-1).tolist())

In [None]:
def get_train_test_datasets(path_to_csv: str, test_size: float = 0.2, random_state: int = 42):
    """
    Read a header-less CSV and build stratified train/test `SonarDataset`s.

    The last column is taken as the class label, all other columns as features.

    Args:
        path_to_csv: path of the CSV file to read.
        test_size: share of the samples placed in the test dataset.
        random_state: seed forwarded to `train_test_split`.

    Returns:
        A `(train_dataset, test_dataset)` tuple.
    """
    frame = pd.read_csv(path_to_csv, index_col=False, header=None)
    features, targets = frame.iloc[:, :-1], frame.iloc[:, -1]

    # Stratify on the label so both parts keep the class proportions
    split = train_test_split(
        features,
        targets,
        test_size=test_size,
        random_state=random_state,
        stratify=targets,
    )
    train_features, test_features, train_targets, test_targets = split

    return (
        SonarDataset(train_features, train_targets),
        SonarDataset(test_features, test_targets),
    )

In [None]:
# Build both datasets with the function's default test_size / random_state
train_dataset, test_dataset = get_train_test_datasets(data_filename)
# Phase name -> dataset; the same phase names are used as keys of `dataloaders` and of the histories
datasets = {'train': train_dataset, 'test': test_dataset}

In [None]:
# Sanity check of the stratified split: label counts per phase
for phase, dataset in datasets.items():
    print(f"{phase} classes distribution: {dataset.get_classes_distribution()}")

In [None]:
# One mini-batch loader per phase. Only the training split is shuffled:
# the evaluation metrics do not depend on sample order, so shuffling the
# test split would only consume random numbers.
dataloaders = {
    phase: torch.utils.data.DataLoader(dataset, batch_size=16, shuffle=(phase == 'train'))
    for phase, dataset in datasets.items()
}

# Train script

In [None]:
def train_binary_classifier(model, criterion, optimizer,
                       num_epochs=3, phases = ['train', 'test'],
                       history = None, device = None, threshold = 0.5,
                       loaders = None):
    """
    Run `num_epochs` epochs over every phase, updating the weights in the
    'train' phase and only measuring in any other phase.

    A model output strictly greater than `threshold` counts as class 1.
    After each phase the progress bar shows the per-sample loss, the macro
    F1 score and the accuracy of that phase.

    Args:
        model: torch module mapping a batch of inputs to outputs of the same
            shape as the labels.
        criterion: loss, called as `criterion(outputs, labels)`. A criterion
            whose `reduction` attribute is 'sum' is accumulated as is; any
            other is treated as a per-sample mean.
        optimizer: optimizer stepped once per batch in the 'train' phase.
        num_epochs: number of epochs to run.
        phases: phase names, run in this order within every epoch.
        history: optional dict with one entry per phase, each a dict with the
            keys 'loss', 'accuracy' and 'f_score' holding lists of floats.
            One value per epoch is appended to each list.
        device: device the batches are sent to. If None, the device of the
            model's parameters is used; if given, the model is moved there.
        threshold: decision threshold applied to the model outputs.
        loaders: optional dict phase name -> DataLoader. If None, the
            notebook-level `dataloaders` dict is used.
    """
    if loaders is None:
        # Fall back to the mapping built in an earlier notebook cell
        loaders = dataloaders

    if device is None:
        # Follow the model instead of guessing: sending batches to 'cuda'
        # while the model still lives on the CPU makes the forward pass fail.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')
    else:
        model.to(device)

    # With 'sum' reduction the criterion already returns a per-batch total;
    # otherwise the value is a per-sample mean and must be re-weighted by the
    # batch size before dividing by the number of samples.
    loss_is_batch_sum = getattr(criterion, 'reduction', 'mean') == 'sum'

    with tqdm(range(num_epochs)) as pbar:
        for _ in pbar:
            for phase in phases:
                is_training = phase == 'train'
                # train(False) is the same as eval()
                model.train(is_training)

                running_loss = 0.0
                running_corrects = 0
                samples_seen = 0
                all_true_labels = []
                all_preds = []

                with torch.set_grad_enabled(is_training):
                    for inputs, labels in loaders[phase]:
                        inputs = inputs.to(device)
                        labels = labels.to(device)

                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                        if is_training:
                            optimizer.zero_grad()
                            loss.backward()
                            optimizer.step()

                        # Honour the caller's threshold (it used to be reset to 0.5 here)
                        preds = outputs > threshold
                        batch_size = labels.size(0)

                        batch_loss = loss.item()
                        running_loss += batch_loss if loss_is_batch_sum else batch_loss * batch_size
                        running_corrects += torch.sum(preds == labels).item()
                        samples_seen += batch_size

                        # Flat integer class ids are what f1_score expects
                        all_true_labels.extend(labels.reshape(-1).long().tolist())
                        all_preds.extend(preds.reshape(-1).long().tolist())

                # Guard against an empty loader
                denominator = max(samples_seen, 1)
                epoch_loss = running_loss / denominator
                epoch_acc = running_corrects / denominator
                epoch_f_score = f1_score(all_true_labels, all_preds, average = 'macro')

                if history is not None:
                    history[phase]["f_score"].append(epoch_f_score)
                    history[phase]["loss"].append(epoch_loss)
                    history[phase]["accuracy"].append(epoch_acc)

                pbar.set_description(f"{phase} loss: {epoch_loss:.4f}, f_score: {epoch_f_score:.4f}, accuracy: {epoch_acc}")

# Models

In [None]:
class NormalizedModel(torch.nn.Module):
    """
    Wraps a model so that its input is standardized with precomputed
    statistics before the forward pass: `(x - mean) / sqrt(var + eps)`.

    Args:
        model: the wrapped torch module.
        mean: precomputed mean, broadcastable against the input batch.
        var: precomputed variance, broadcastable against the input batch.
        eps: small constant added to the variance so that a constant feature
            (zero variance) does not cause a division by zero.
    """

    def __init__(self, model, mean, var, eps=1e-8):
        super().__init__()
        self.model = model
        self.eps = eps
        # Buffers (unlike plain attributes) follow the module through
        # .to(device) and are saved in state_dict(); they are not trained.
        self.register_buffer('mean', torch.as_tensor(mean, dtype=torch.float32))
        self.register_buffer('var', torch.as_tensor(var, dtype=torch.float32))

    def forward(self, x):
        # Standardize with the standard deviation, not the raw variance
        x = (x - self.mean) / torch.sqrt(self.var + self.eps)
        return self.model(x)

In [None]:
# Baseline classifier: 60 input features -> 40 hidden units (ReLU) -> 1 sigmoid output
model = torch.nn.Sequential(*[
    torch.nn.Linear(in_features=60, out_features=40),
    torch.nn.ReLU(),
    torch.nn.Linear(in_features=40, out_features=1),
    torch.nn.Sigmoid(),
])

criterion = torch.nn.BCELoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

# One empty list per metric and phase; the training function appends one value per epoch
history = {
    phase_name: {metric_name: [] for metric_name in ('loss', 'accuracy', 'f_score')}
    for phase_name in ('train', 'test')
}

model

In [None]:
# Per-feature statistics of the training split. X_train is a NumPy array, so
# axis=0 is required: without it .mean()/.var() collapse to one global scalar.
# The (1, n_features) shape broadcasts over a batch of samples.
X_train_mean = torch.tensor(X_train.mean(axis=0), dtype=torch.float32).view(1, -1)
X_train_var = torch.tensor(X_train.var(axis=0), dtype=torch.float32).view(1, -1)

# Same architecture as the baseline model, fed with normalized inputs
model_normalized = NormalizedModel(
    torch.nn.Sequential(
        torch.nn.Linear(60, 40),
        torch.nn.ReLU(),
        torch.nn.Linear(40, 1),
        torch.nn.Sigmoid()
    ),
    X_train_mean,
    X_train_var)

optimizer_normalized = torch.optim.Adam(model_normalized.parameters(), lr=0.001)
criterion_normalized = torch.nn.BCELoss()

history_normalized = {'train': {'loss': [], 'accuracy': [], 'f_score': []}, 'test': {'loss': [], 'accuracy': [], 'f_score': []}}

model_normalized

In [None]:
# Baseline architecture with dropout (p=0.5) applied to the hidden activations
model_dropout = torch.nn.Sequential(*[
    torch.nn.Linear(in_features=60, out_features=40),
    torch.nn.ReLU(),
    torch.nn.Dropout(p=0.5),
    torch.nn.Linear(in_features=40, out_features=1),
    torch.nn.Sigmoid(),
])

criterion_dropout = torch.nn.BCELoss()
optimizer_dropout = torch.optim.Adam(params=model_dropout.parameters(), lr=1e-3)

# One empty list per metric and phase; the training function appends one value per epoch
history_dropout = {
    phase_name: {metric_name: [] for metric_name in ('loss', 'accuracy', 'f_score')}
    for phase_name in ('train', 'test')
}

model_dropout

# Training

In [None]:
# Train the normalized-input model for 60 epochs; per-epoch metrics are appended to history_normalized
train_binary_classifier(model_normalized, criterion_normalized, optimizer_normalized, num_epochs=60, history = history_normalized)

In [None]:
# Train the baseline model for 60 epochs; per-epoch metrics are appended to history
train_binary_classifier(model, criterion, optimizer, num_epochs=60, history = history)

In [None]:
# Train the dropout model for 60 epochs; per-epoch metrics are appended to history_dropout
train_binary_classifier(model_dropout, criterion_dropout, optimizer_dropout, num_epochs=60, history = history_dropout)

# Results

In [None]:
from typing import List, Tuple, Dict
import matplotlib.pyplot as plt


def get_phases_and_metric_names_from_hirtory_list(history_list: List[Dict[str, Dict[str, List[float]]]]) -> Tuple[List[str], List[str]]:
    """
    Read the phase names and metric names off the first history of the list.

    A history maps phase name -> metric name -> list of values. The metric
    names are taken from the first phase of the first history; the other
    histories are not inspected.

    Returns:
        A `(phases, metric_names)` tuple of lists, in dict insertion order.
    """
    first_history = history_list[0]
    phases = list(first_history)
    metric_names = list(first_history[phases[0]])
    return phases, metric_names


def get_epoches_num_from_history(history: Dict[str, Dict[str, List[float]]], phases: List[str], metric_names: List[str]) -> int:
    """
    Return the number of recorded epochs of a history.

    The count is the length of the value list of the first metric of the
    first phase; the other lists are not inspected.
    """
    first_phase, first_metric = phases[0], metric_names[0]
    return len(history[first_phase][first_metric])


def plot_history(history_list: List[Dict[str, Dict[str, List[float]]]],
                 history_names: Optional[List[str]] = None,
                 omit_first_epoch: bool = False,
                 force_legend: bool = False,
                 img_name: Optional[str] = None) -> None:
    """
    Plot several histories on the same grid of subplots.

    history_list is a list of histories. A history is a dict with phase
    names as keys and an inner dict as values. The inner dict has metric
    names as keys and lists of metric values (one per epoch) as values.
    Here is an example of a history:
        {
            'train': {
                'accuracy': [0.1, 0.2, 0.3, ...],
                'loss': [0.1, 0.2, 0.3, ...],
                'f_score': [0.1, 0.2, 0.3, ...],
            },
            'test': {
                'accuracy': [0.1, 0.2, 0.3, ...],
                'loss': [0.1, 0.2, 0.3, ...],
                'f_score': [0.1, 0.2, 0.3, ...],
            }
        }

    The phase and metric names are read from the first history; every
    history in the list must contain them.

    The resulting plot is a grid of subplots with (number of phases) rows
    and (number of metrics) columns. Each subplot contains the values of one
    metric in one phase for all histories. A single phase or a single metric
    is supported as well.

    The legend is present if len(history_list) > 1 or force_legend is True.
    If history_names is not None, it is used for the legend labels.
    Otherwise the indexes of history_list are used.

    Args:
        history_list: non-empty list of histories.
        history_names: optional list of history names, one per history.
            Defaults to None.
        omit_first_epoch: if True, the first epoch is not plotted.
            Defaults to False.
        force_legend: if True, the legend is shown even if
            len(history_list) == 1. Defaults to False.
        img_name: if not None, the figure is saved to this path.
            Defaults to None.

    Raises:
        AssertionError: if history_list is empty or history_names has a
            different length than history_list.
    """

    assert len(history_list) > 0, "history_list is empty"

    phases, metric_names = get_phases_and_metric_names_from_hirtory_list(history_list)

    if history_names is None:
        history_names = range(len(history_list))
    else:
        assert len(history_list) == len(history_names), "len(history_list) != len(history_names)"

    # Histories may have different lengths; the ticks cover the longest one
    max_epochs_num = max(
        get_epoches_num_from_history(history, phases, metric_names)
        for history in history_list)

    # squeeze=False keeps `axs` two-dimensional even with one phase or one
    # metric, so the [row][column] indexing below always works
    fig, axs = plt.subplots(len(phases), len(metric_names), squeeze=False)

    fig.set_figheight(10)
    fig.set_figwidth(20)

    start = 1 if omit_first_epoch else 0

    for phase_index, phase in enumerate(phases):
        for metric_index, metric_name in enumerate(metric_names):
            ax = axs[phase_index][metric_index]
            ax.set_title(f"{phase} {metric_name}")
            # Tick positions are 0-based offsets into the plotted slice;
            # the labels show the 1-based epoch numbers
            ax.set_xticks(range(max_epochs_num - start))
            ax.set_xticklabels(range(start + 1, max_epochs_num + 1))

            for history, name in zip(history_list, history_names):
                ax.plot(history[phase][metric_name][start:], label = name)

            if force_legend or len(history_list) > 1:
                ax.legend()

    if img_name is not None:
        fig.savefig(img_name)
    plt.show()

In [None]:
# Compare the three training runs on one grid; the second list gives the legend labels in the same order
plot_history([history, history_normalized, history_dropout], ['model', 'with normalized data', 'model_dropout'])

In [None]:
# Spot check of the baseline model on one test sample: predicted probability next to the true label
index = 1
# Outputs above the threshold are read as class 1, i.e. SonarDataset.i2label[1] == 'M'
threshold = 0.5

# Features and label must come from the same split and the same sample
sample_features, sample_label = datasets['test'][index]

with torch.no_grad():
    # Send the sample to wherever the model's parameters live
    model_device = next(model.parameters()).device
    predicted_probability = model(sample_features.to(model_device)).item()

predicted_probability, sample_label.item()