In [1]:
###-----------------
### Import Libraries
###-----------------

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import (accuracy_score, classification_report)

import torch
import torch.nn as nn
from torch.utils.data import DataLoader , Dataset

#from utils.helper import fn_plot_confusion_matrix

In [2]:
def fn_plot_tf_hist(hist_df : pd.DataFrame):
    '''
    Note this function is specifically designed to plot Tensorflow training output
    Args:
      hist_df : pandas DataFrame with four columns
                For 'x' values, we will use index
                first column is accuracy
                Second column is loss
                third column is val_accuracy
                fourth column is val_loss
    '''
    fig, axes = plt.subplots(1,2 , figsize = (15,6)) # instantiate plot

    # properties  matplotlib.patch.Patch 
    props = dict(boxstyle='round', facecolor='aqua', alpha=0.4)
    facecolor = 'cyan'
    fontsize=12
    
    # Get columns by index to eliminate any column naming error
    y1 = hist_df.columns[0]
    y2 = hist_df.columns[1]
    y3 = hist_df.columns[2]
    y4 = hist_df.columns[3]

    # Where was min loss
    best = hist_df[hist_df[y4] == hist_df[y4].min()]
 
    ax = axes[0]

    hist_df.plot(y = [y2,y4], ax = ax, colormap=CMAP)


    # little beautification
    txtFmt = "Loss: \n  train: {:6.4f}\n   test: {:6.4f}"
    txtstr = txtFmt.format(hist_df.iloc[-1][y2],
                           hist_df.iloc[-1][y4]) #text to plot
    
    # place a text box in upper middle in axes coords
    ax.text(0.3, 0.95, txtstr, transform=ax.transAxes, fontsize=fontsize,
            verticalalignment='top', bbox=props)

    # Mark arrow at lowest
    ax.annotate(f'Min: {best[y4].to_numpy()[0]:6.4f}', # text to print
                xy=(best.index.to_numpy(), best[y4].to_numpy()[0]), # Arrow start
                xytext=(best.index.to_numpy()-1, best[y4].to_numpy()[0]), # location of text 
                fontsize=fontsize, va='bottom', ha='right',bbox=props, # beautification of text
                arrowprops=dict(facecolor=facecolor, shrink=0.05)) # arrow

    # Draw vertical line at best value
    ax.axvline(x = best.index.to_numpy(), color = 'green', linestyle='-.', lw = 3);

    ax.set_xlabel("Epochs")
    ax.set_ylabel(y2.capitalize())
    ax.set_title('Errors')
    ax.legend(loc = 'upper left') # model legend to upper left

    ax = axes[1]

    hist_df.plot( y = [y1, y3], ax = ax, colormap=CMAP)
    
    # little beautification
    txtFmt = "Accuracy: \n  train: {:6.4f}\n  test:  {:6.4f}"
    txtstr = txtFmt.format(hist_df.iloc[-1][y1],
                           hist_df.iloc[-1][y3]) #text to plot

    # place a text box in upper middle in axes coords
    ax.text(0.3, 0.2, txtstr, transform=ax.transAxes, fontsize=fontsize,
            verticalalignment='top', bbox=props)

    # Mark arrow at lowest
    ax.annotate(f'Best: {best[y3].to_numpy()[0]:6.4f}', # text to print
                xy=(best.index.to_numpy(), best[y3].to_numpy()[0]), # Arrow start
                xytext=(best.index.to_numpy()-1, best[y3].to_numpy()[0]), # location of text 
                fontsize=fontsize, va='bottom', ha='right',bbox=props, # beautification of text
                arrowprops=dict(facecolor=facecolor, shrink=0.05)) # arrow
    
    
    # Draw vertical line at best value
    ax.axvline(x = best.index.to_numpy(), color = 'green', linestyle='-.', lw = 3);

    ax.set_xlabel("Epochs")
    ax.set_ylabel(y1.capitalize())
    ax.legend(loc = 'lower left')
    
    plt.tight_layout()


def fn_plot_confusion_matrix(y_true, y_pred, labels):
    '''
    Args:
        y_true: Ground Truth 
        y_pred : Predictions
        labels : dictionary 
                  {0: 'Goal Keeper', 
                  1: 'Defender', 
                  2: 'Mid-Fielder', 
                  3: 'Forward'}
    
    '''
    
    cm  = confusion_matrix(y_true, y_pred)
    
    disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                                  display_labels=labels.values())
    
    fig, ax = plt.subplots(figsize = (4,4))
    
    disp.plot(ax = ax, cmap = 'Blues', xticks_rotation = 'vertical', colorbar=False)
    # Disable the grid
    ax.grid(False)

    plt.show()


In [3]:
# Some basic parameters
inpDir = '../../../input' # location where input data is stored
outDir = '../output' # location to store outputs
subDir = 'fashion_MNIST' # location of the images
modelDir = '../models'
altName = 'all_in'

RANDOM_STATE = 24 # for initialization ----- REMEMBER: to remove at the time of promotion to production
BATCH_SIZE = 32 # batch size for training   
TRAIN_SIZE = BATCH_SIZE * 9

EPOCHS = 100 # number of cycles to run
ALPHA = 0.001 # learning rate
WEIGHT_DECAY =0.001
LR_FACTOR = 0.1
LR_PATIENCE = 10
PATIENCE = 20

# Set parameters for decoration of plots
params = {'legend.fontsize' : 'large',
          'figure.figsize'  : (9,9),
          'axes.labelsize'  : 'x-large',
          'axes.titlesize'  :'x-large',
          'xtick.labelsize' :'large',
          'ytick.labelsize' :'large',
         }

plt.rcParams.update(params) # update rcParams
CMAP = plt.cm.coolwarm
plt.style.use('seaborn-v0_8-darkgrid') # plt.style.use('ggplot')

In [4]:
data_df = pd.read_csv('ionosphere.data', header=None)
data_df.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,25,26,27,28,29,30,31,32,33,34
0,1,0,0.99539,-0.05889,0.85243,0.02306,0.83398,-0.37708,1.0,0.0376,...,-0.51171,0.41078,-0.46168,0.21266,-0.3409,0.42267,-0.54487,0.18641,-0.453,g
1,1,0,1.0,-0.18829,0.93035,-0.36156,-0.10868,-0.93597,1.0,-0.04549,...,-0.26569,-0.20468,-0.18401,-0.1904,-0.11593,-0.16626,-0.06288,-0.13738,-0.02447,b
2,1,0,1.0,-0.03365,1.0,0.00485,1.0,-0.12062,0.88965,0.01198,...,-0.4022,0.58984,-0.22145,0.431,-0.17365,0.60436,-0.2418,0.56045,-0.38238,g
3,1,0,1.0,-0.45161,1.0,1.0,0.71216,-1.0,0.0,0.0,...,0.90695,0.51613,1.0,1.0,-0.20099,0.25682,1.0,-0.32382,1.0,b
4,1,0,1.0,-0.02401,0.9414,0.06531,0.92106,-0.23255,0.77152,-0.16399,...,-0.65158,0.1329,-0.53206,0.02431,-0.62197,-0.05707,-0.59573,-0.04608,-0.65697,g


In [5]:
train_df, test_df = train_test_split(data_df,
                                     train_size=TRAIN_SIZE,
                                     stratify= data_df[data_df.columns[-1]],
                                     random_state=RANDOM_STATE)

train_df.shape , test_df.shape

((288, 35), (63, 35))

In [6]:
device = 'code'  if torch.cuda.is_available() else 'cpu'
print(f'Using {device} device')

Using cpu device


In [7]:
""" Using Sinleton class Pattenn """

class Transformers:
    _instance = None

    def __init__ (self):
        if Transformers._instance is not None:
            raise Exception ("Global Scaler claass is singleton")

        self.scaler = StandardScaler()
        self.encoder = LabelEncoder()

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = Transformers()
        return cls. _instance

In [8]:
class  IonoDS(Dataset):
    transformers = Transformers.get_instance()

    def __init__(self,
                dataframe: pd.DataFrame,
                device: str = device,
                is_train= True,
                label_col =None):
        super(IonoDS, self).__init__()

        self.df = dataframe
        self.device = device
        self.is_train = is_train
        self.scaler = self.transformers.scaler
        self.encoder = self.transformers.encoder
        self.label_col = label_col

        y = self.df[label_col].to_numpy()
        x = self.df.drop(label_col, axis = 1)

        if self.is_train:
            self.labels = self.encoder.fit_transform(y)
            self.features = self.scaler.fit_transform(x)
        else:
            self.labels = self.encoder.transform(y)
            self.features = self.scaler.transform(x)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        #Extract features and labels from the dataframe row
        features = self.features[index]
        label = self.labels[index]

        features = torch.tensor(features, dtype = torch.float32, device = self.device)
        label = torch.tensor(label, dtype= torch.int64, device = self.device)

        return features, label

In [9]:
class Model(nn.Module):
    def __init__(self, input_dim):
        super(Model, self).__init__()

        dor1 = 0.05
        dor2 = 0.15
        dor3 = 0.25

        self.layer1 = nn.Linear(input_dim,26)
        self.bm1 = nn.BatchNorm1d(26)
        self.do1 = nn.Dropout(dor1)
        self.act1 = nn.ReLU()

        self.layer2 = nn.Linear(26,18)
        self.bm2 = nn.BatchNorm1d(18)
        self.do2 = nn.Dropout(dor2)
        self.act2 = nn.ReLU()

        self.layer3 = nn.Linear(18, 10)
        self.bm3 = nn.BatchNorm1d(10)
        self.do3 = nn.Dropout(dor3)
        self.act3 = nn.ReLU()

        self.layer4 = nn.Linear(10,2)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):

        x = self.do1(self.act1 (self.bm1 (self.layer1(x))))
        x = self.do2(self.act2 (self.bm2 (self.layer2(x))))
        x = self.do3(self.act3 (self.bm3 (self.layer3(x))))

        output = self.softmax(self.layer4(x))

        return output

input_dim = 34
model = Model(input_dim).to(device)
    

In [10]:
label_col = 34
train_ds = IonoDS(train_df, is_train = True,  label_col = label_col)
test_ds = IonoDS(test_df, is_train = False,  label_col = label_col)

In [11]:
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=True)

In [12]:
# Define the loss function and optimizer
loss_fn = nn.CrossEntropyLoss()

# Initialize lists to track losses, accuracies, and epochs
loss, tloss = [], []
acc, tacc = [], []
n_epoch = []

# Define the optimizer and learning rate scheduler
optimizer = torch.optim.Adam(model.parameters(),
                             lr=ALPHA,
                             weight_decay=0.1e-5)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer,
                                                       mode='min',
                                                       factor=LR_FACTOR,
                                                       patience=LR_PATIENCE,
                                                       min_lr=1e-5)

minLoss = float('inf')

# Define the path to save the model
savePath = os.path.join(modelDir, subDir, "iono.path")

# Training loop
for epoch in range(EPOCHS):
    train_loss = 0.0
    train_acc = 0.0

    # Training phase
    for i, data in enumerate(train_loader):
        inputs, labels = data

        optimizer.zero_grad()
        outputs = model(inputs)
        preds = torch.argmax(outputs, dim=1)
        batch_loss = loss_fn(outputs, labels)
        batch_acc = accuracy_score(labels.cpu().numpy(), preds.cpu().numpy())
        batch_loss.backward()
        optimizer.step()

        # Update training loss and accuracy
        train_loss += batch_loss.item() * inputs.size(0)
        train_acc += batch_acc * inputs.size(0)

    # Average training loss and accuracy
    train_loss /= len(train_ds)
    train_acc /= len(train_ds)

    # Store training loss and accuracy
    loss.append(train_loss)
    acc.append(train_acc)

    # Test phase
    test_loss = 0.0
    test_acc = 0.0

    with torch.no_grad():
        model.eval()
        for data in test_loader:
            inputs, labels = data
            outputs = model(inputs)
            preds = torch.argmax(outputs, dim=1)
            batch_loss = loss_fn(outputs, labels)
            batch_acc = accuracy_score(labels.cpu().numpy(), preds.cpu().numpy())
            test_loss += batch_loss.item() * inputs.size(0)
            test_acc += batch_acc * inputs.size(0)

        # Average test loss and accuracy
        test_loss /= len(test_ds)
        test_acc /= len(test_ds)

        # Store test loss and accuracy
        tloss.append(test_loss)
        tacc.append(test_acc)

    # Store the current epoch
    n_epoch.append(epoch)

    # Step the learning rate scheduler based on test loss
    scheduler.step(test_loss)

    # If the current test loss is lower, save the model
    if test_loss < minLoss:
        minLoss = test_loss
        counter = 0
        # Save the model checkpoint
        torch.save({
            "epoch": epoch + 1,
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "loss": loss_fn
        }, savePath)
    else:
        counter += 1
        # Stop training if the loss hasn't improved for `PATIENCE` epochs
        if counter > PATIENCE:
            break

    # Print status every 10 epochs
    if epoch % 10 == 0:
        print(f'At epoch {epoch:3d} | Loss: {train_loss:4f}/{test_loss:4f} | Acc: {train_acc:4f}/{test_acc:4f}')


At epoch   0 | Loss: 0.837359/0.765338 | Acc: 0.347222/0.365079
At epoch  10 | Loss: 0.330214/0.425065 | Acc: 0.885417/0.809524
At epoch  20 | Loss: 0.090054/0.258848 | Acc: 0.968750/0.873016
At epoch  30 | Loss: 0.037765/0.216319 | Acc: 0.989583/0.888889
At epoch  40 | Loss: 0.019894/0.191054 | Acc: 0.996528/0.920635
At epoch  50 | Loss: 0.010879/0.186821 | Acc: 0.996528/0.936508
At epoch  60 | Loss: 0.005417/0.194012 | Acc: 1.000000/0.936508
At epoch  70 | Loss: 0.003596/0.204367 | Acc: 1.000000/0.936508
