In [None]:
import utils

In [None]:
import numpy as np
import importlib
import matplotlib.pyplot as plt
from scipy import stats as st
import random
import pandas as pd

In [None]:
def loadFeatures(dataFolder,winSz,timeStep,idList):
  for k,id in enumerate(idList):
    # Loading the raw data
    xt, xv, yt, yv = utils.loadTrial(dataFolder,id=id)

    # Extracting the time window for which we have values for the measurements and the response
    timeStart = np.max((np.min(xt),np.min(yt)))
    timeEnd = np.min((np.max(xt),np.max(yt)))

    # Extracting the features
    _, feat = utils.extractFeat(xt,xv,winSz,timeStart,timeEnd,timeStep)
    _, lab = utils.extractLabel(yt,yv,winSz,timeStart,timeEnd,timeStep)

    # Storing the features
    if(k==0):
      featList = feat
      labList = lab
    else:
      featList = np.concatenate((featList,feat),axis=0)
      labList = np.concatenate((labList,lab),axis=0)

  return featList, labList

In [None]:
dirTrain = "data/train/"

timeStep = 1
winSz = 2

valIDs = []
while len(valIDs) < 3:
  num = random.randint(1, 26)
  if num != 7 and num not in valIDs:
    valIDs.append(num)

trainIDs = list(set(np.array(range(25))+1).difference(valIDs))

xTrain, yTrain = loadFeatures(dirTrain,winSz,timeStep,trainIDs)
xVal , yVal = loadFeatures(dirTrain, winSz, timeStep, valIDs)

In [None]:
import torch
import torch.nn as nn
import numpy as np

class NetWrapperCNN:
    def __init__(self, model, device, epochs, weights):
        self.device = device
        self.model = model.to(device)  # Move model to the specified device
        weight_tensor = torch.tensor(weights, dtype=torch.float).to(device)  # Ensure weights are on the GPU
        self.loss_fn = nn.CrossEntropyLoss(weight=weight_tensor)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)
        self.epochs = epochs

    def fit(self, X, y):
        if X.ndim == 2:
            X = np.expand_dims(X, axis=-1)  # Add a channel dimension if it's not present

        X = torch.from_numpy(X).float().permute(0, 2, 1).to(self.device)  # Move to GPU after conversion
        y = torch.from_numpy(y).long().to(self.device)  # Ensure labels are also on GPU

        for t in range(self.epochs):
            self.optimizer.zero_grad()
            pred = self.model(X)
            loss = self.loss_fn(pred, y)

            if torch.isnan(loss):
                print("Loss is NaN")
                break

            loss.backward()
            self.optimizer.step()

            if t % 500 == 499:
                print(f"Epoch {t+1}, Loss: {loss.item()}")

    def predict(self, X):
        if X.ndim == 2:
            X = np.expand_dims(X, axis=-1)

        X = torch.from_numpy(X).float().permute(0, 2, 1).to(self.device)  # Adjust and move data

        self.model.eval()
        with torch.no_grad():
            pred = self.model(X)
            pred = pred.cpu().detach().numpy()
            pred = np.argmax(pred, axis=1)

        return pred



INPUTSIZE = 12
OUTPUTSIZE = 4

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

EPOCHS = 7500

WEIGHTS = [0.20306067974248987, 0.9514893558595908, 0.6781497198149791, 0.8505315580874674]


In [None]:
print(DEVICE)

In [None]:
class CNN(nn.Module):
  def __init__(self, input_size, output_size):
    super(CNN, self).__init__()
    self.conv1 = nn.Conv1d(input_size, 128, 1)
    self.conv2 = nn.Conv1d(128, 256, 1)
    self.fc2 = nn.Linear(256, 64)
    self.fc3 = nn.Linear(64, output_size)
    self.relu = nn.ReLU()
    # self.dropout = nn.Dropout(0.5)
    self.maxpool = nn.MaxPool1d(2)

  def forward(self, x):
    x = self.relu(self.conv1(x))
    x = self.maxpool(x)
    x = self.relu(self.conv2(x))
    # x = self.maxpool(x)
    # x = self.relu(self.conv3(x))
    x = torch.mean(x, dim=2)  # Global average pooling
    x = self.relu(self.fc2(x))
    # x = self.relu(self.fc2(x))
    x = self.fc3(x)
    return x

In [None]:
cnn_model = CNN(input_size=1, output_size=OUTPUTSIZE)
net_cnn = NetWrapperCNN(model=cnn_model, device=DEVICE, epochs=EPOCHS, weights=WEIGHTS)

In [None]:
net_cnn.fit(xTrain, yTrain)

In [None]:
yTrainHat = net_cnn.predict(xTrain)
yValHat = net_cnn.predict(xVal)

print('Results for Validation:\n')
utils.summaryPerf(yVal,yValHat,yTrain,yTrainHat)

In [None]:
def getUpdatedPreds(xt, yt, yValHat):
  yPreds = []
  for i in yt:
    win = np.where((xt>=i-0.1)*(xt<=i+0.1))
    # print(win)

    wnd = []
    for j in win[0]:
      if j < len(yValHat):
        wnd.append(j)
    if len(wnd) == 0:
      yPreds.append(st.mode(yWin).mode)
      # yPreds.append(np.max(yWin))
    else:
      yWin = yValHat[wnd]
      yPreds.append(st.mode(yWin).mode)
      # yPreds.append(np.max(yWin))

  return np.array(yPreds)

In [None]:
from sklearn.metrics import balanced_accuracy_score

dirTest = "data/test/"
for id in [1,2,3,4]:
  xt, xv, yt, yv = utils.loadTrial(dirTrain,id)
  timeStart = np.max((np.min(xt),np.min(yt)))
  timeEnd = np.min((np.max(xt),np.max(yt)))
  _, xVal = utils.extractFeat(xt,xv,winSz,timeStart,timeEnd,0.025)
  yValHat = net_cnn.predict(xVal)
  yPreds = getUpdatedPreds(xt, yt, yValHat)
  print("Unique Values:", np.unique(yPreds, return_counts=True), np.unique(yv, return_counts=True))
  print("Balanced Accuracy Score for trial", id, ":", balanced_accuracy_score(yv, yPreds))