In [5]:
import numpy as np
import scipy.io
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torch.nn import Module
from torch.nn import Conv2d
from torch.nn import Linear
from torch.nn import MaxPool2d
from torch.nn import ReLU
from torch.nn import LogSoftmax
from torch.nn import Flatten
from torch.nn import Flatten
from torch.nn import Dropout
from torch.nn import Sigmoid

In [6]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cuda device


In [7]:
class DataLoader(Dataset):
    def __init__(self, data_mat, labels):
        self.data_mat = data_mat
        self.labels = labels

    def __len__(self):
        return self.data_mat.shape[2]

    def __getitem__(self, idx):
        mat = np.array(self.data_mat[:,:,idx], dtype=np.float32)
        mat_pad = np.zeros((50,50), dtype=np.float32)
        mat_pad[1:49, 1:49] = mat
        adj_mat = np.expand_dims(mat_pad, 0)
        label = [self.labels[idx], np.abs(1-self.labels[idx])]
        return torch.from_numpy(adj_mat), torch.from_numpy(np.array(label, dtype=np.float32))

In [8]:
class Model(nn.Module):
    def __init__(self, numChannels=1, classes=2):
        super().__init__()
        
        self.layer1 = nn.Sequential(
                            Conv2d(in_channels=numChannels, out_channels=32, kernel_size=(3,3)),
                            ReLU(),
                            MaxPool2d(kernel_size=(2, 2), stride=(2, 2)))
        
        self.layer2 = nn.Sequential(
                            Conv2d(in_channels=32, out_channels=64,kernel_size=(3,3)),
                            ReLU(),
                            MaxPool2d(kernel_size=(2, 2), stride=(2, 2)))

        self.flatten1 = Flatten(0,2)

        self.fc1 = nn.Sequential(
                            Linear(in_features=7744, out_features=128),
                            ReLU())

        self.dropout = Dropout(p=0.2)

        self.fc2 = nn.Sequential(
                            Linear(in_features=128, out_features=classes),
                            Sigmoid())

    def forward(self, x):
      #print("x: ", x.shape)
      x1 = self.layer1(x)
      #print("x1:", x1.shape)
      x2 = self.layer2(x1)
      #print("x2:", x2.shape)
      f1 = self.flatten1(x2)
      #print("f1:", f1.shape)
      f2 = self.fc1(f1)
      #print("f2:", f2.shape)
      f3 = self.dropout(f2)
      #print("f3:", f3.shape)
      out = self.fc2(f3)
      #print("out:", out.shape)
      return out

In [9]:
def train(dataloader, model, loss_fn, optimizer):
    size = dataloader.__len__
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        # Compute prediction error
        pred = model(X)
        #print("pred: ", pred, "y: ", y)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        loss = loss.item()
        #print(f"loss: {loss:>7f}")

In [17]:
def test(dataloader, model, loss_fn):
    size = len(dataloader)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    tp = 0
    fp = 0
    tn = 0
    fn = 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            out = pred
            if(y[0] == 1):
              if(out[0] >= out[1]):
                tp += 1
              elif(out[0] < out[1]):
                #print(out)
                fn += 1
            elif(y[0] == 0):
              if(out[0] >= out[1]):
                fp += 1
              elif(out[0] < out[1]):
                tn += 1
    #print(test_loss, num_batches)
    test_loss /= num_batches
    correct /= size
    #print(f"Test Error: \n Metrics: {[tp, fp, tn, fn]}, Avg loss: {test_loss:>8f} \n")
    return [tp, fp, tn, fn], test_loss

In [12]:
mat = scipy.io.loadmat('/content/theta_A_W_uni10_100ms.mat')["W"]
labels = []

for i in range(250):
  labels.append(0)

for i in range(956):
  labels.append(1)

for i in range(250):
  labels.append(0)

labels = np.array(labels)

a1 = np.arange(0,250)
a2 = np.arange(250,250+956)
a3 = np.arange(250+956,500+956)

t1 = list(np.random.choice(a1, int(len(a1)*0.1), replace=False))
tr1 = list(set(a1)-set(t1))
t2 = list(np.random.choice(a2, int(len(a2)*0.1), replace=False))
tr2 = list(set(a2)-set(t2))
t3 = list(np.random.choice(a3, int(len(a3)*0.1), replace=False))
tr3 = list(set(a3)-set(t3))

t = t1 + t2 + t3
tr = tr1 + tr2 + tr3
test_data = mat[:,:,t]
train_data = mat[:,:,tr]
test_labels = labels[t]
train_labels = labels[tr]

print(len(t), len(tr))

145 1311


In [13]:
model = Model().to(device)
print(model)

Model(
  (layer1): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (layer2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (flatten1): Flatten(start_dim=0, end_dim=2)
  (fc1): Sequential(
    (0): Linear(in_features=7744, out_features=128, bias=True)
    (1): ReLU()
  )
  (dropout): Dropout(p=0.2, inplace=False)
  (fc2): Sequential(
    (0): Linear(in_features=128, out_features=2, bias=True)
    (1): Sigmoid()
  )
)


In [19]:
train_dataloader = DataLoader(train_data, train_labels)
test_dataloader = DataLoader(test_data, test_labels)
model = Model().to(device)
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-6)
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [20], gamma=0.001)

epochs = 50
max_c = -1
max_c_epoch = -1
min_tl = 9999
min_tl_epoch = 9999

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    #test(train_dataloader, model, loss_fn)
    c, tl = test(test_dataloader, model, loss_fn)
    acc = (c[0] + c[2])/np.sum(c)
    pre = c[0]/(c[0]+c[1])
    sen = c[0]/(c[0]+c[3])
    spe = c[2]/(c[1]+c[2])
    f_score = 2*(pre*sen)/(pre+sen)
    print("acc: ", acc, "pre: ", pre, "rec: ", sen, "spe: ", spe, "f_score: ", f_score, "c: ", c)
print("Done!")

Epoch 1
-------------------------------
acc:  0.6551724137931034 pre:  0.6551724137931034 rec:  1.0 spe:  0.0 f_score:  0.7916666666666666 c:  [95, 50, 0, 0]
Epoch 2
-------------------------------
acc:  0.6551724137931034 pre:  0.6551724137931034 rec:  1.0 spe:  0.0 f_score:  0.7916666666666666 c:  [95, 50, 0, 0]
Epoch 3
-------------------------------
acc:  0.6551724137931034 pre:  0.6551724137931034 rec:  1.0 spe:  0.0 f_score:  0.7916666666666666 c:  [95, 50, 0, 0]
Epoch 4
-------------------------------
acc:  0.6551724137931034 pre:  0.6551724137931034 rec:  1.0 spe:  0.0 f_score:  0.7916666666666666 c:  [95, 50, 0, 0]
Epoch 5
-------------------------------
acc:  0.6551724137931034 pre:  0.6551724137931034 rec:  1.0 spe:  0.0 f_score:  0.7916666666666666 c:  [95, 50, 0, 0]
Epoch 6
-------------------------------
acc:  0.6551724137931034 pre:  0.6551724137931034 rec:  1.0 spe:  0.0 f_score:  0.7916666666666666 c:  [95, 50, 0, 0]
Epoch 7
-------------------------------
acc:  0.6551