In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models
from sklearn.metrics import balanced_accuracy_score as BACC
from sklearn.model_selection import train_test_split
import eval_scores as eval
import cv2 as cv
import random

ModuleNotFoundError: No module named 'torch'

In [None]:
def augment_data(xtrain, ytrain):
    """Double an image set by appending one rotated and mirrored copy per image.

    The first half of the returned arrays is the input unchanged; the second
    half holds, for every input image, a copy rotated by a random whole-degree
    angle drawn from [-90, 90] around the image centre and then flipped with
    OpenCV flip code 1. Labels are copied alongside their images.

    Args:
        xtrain: array of shape (n, height, width) holding single-channel images.
        ytrain: array whose first dimension has length n (one label per image).

    Returns:
        (aug_xtrain, aug_ytrain): float64 arrays of shape (2n, height, width)
        and (2n,).

    Raises:
        ValueError: if ``xtrain`` is not 3-dimensional or the number of labels
            does not match the number of images.

    Note:
        Angles come from the global ``random`` module, so seed it
        (``random.seed(...)``) beforehand for reproducible augmentation.
    """
    xtrain = np.asarray(xtrain)
    ytrain = np.asarray(ytrain)

    if xtrain.ndim != 3:
        raise ValueError(
            "xtrain must have shape (n, height, width), got %s" % (xtrain.shape,)
        )

    n_samples = xtrain.shape[0]
    if ytrain.shape[:1] != (n_samples,):
        raise ValueError(
            "xtrain and ytrain disagree on sample count: %s vs %s"
            % (xtrain.shape, ytrain.shape)
        )

    # Take the image size from the data instead of assuming 50x50.
    height, width = xtrain.shape[1:]

    aug_xtrain = np.zeros((n_samples * 2, height, width))
    aug_ytrain = np.zeros(n_samples * 2)

    # First half: the originals, untouched.
    aug_xtrain[:n_samples] = xtrain
    aug_ytrain[:n_samples] = ytrain

    # The rotation centre is the same for every image, so compute it once.
    center = (int(width / 2), int(height / 2))

    # Second half: one randomly rotated, mirrored copy per original.
    for idx in range(n_samples):
        angle = int(random.uniform(-90, 90))
        M = cv.getRotationMatrix2D(center, angle, 1)
        rotated = cv.warpAffine(xtrain[idx], M, (width, height))

        aug_xtrain[n_samples + idx] = cv.flip(rotated, 1)
        aug_ytrain[n_samples + idx] = ytrain[idx]

    return aug_xtrain, aug_ytrain

In [None]:
# Load the raw training images and their labels from disk.
xtrain = np.load("Xtrain_Classification_Part1.npy")
ytrain = np.load("Ytrain_Classification_Part1.npy")
xtrain_len, ytrain_len = len(xtrain), len(ytrain)

# View every sample as a 50x50 image.
xtrain = xtrain.reshape(xtrain_len, 50, 50)

# Standardise with the mean/std taken over all pixels of all images.
mean = xtrain.mean(axis=(0, 1, 2))
std = xtrain.std(axis=(0, 1, 2))
xtrain = (xtrain - mean) / std

# Append one rotated + mirrored copy of every image (doubles the set).
# Note: xtrain_len / ytrain_len still refer to the pre-augmentation sizes.
xtrain, ytrain = augment_data(xtrain, ytrain)

In [None]:
# Visual sanity check of a single sample from the augmented array.
# NOTE(review): index 6479 presumably lands in the augmented half
# (the original set appears to hold 6470 images) -- confirm.
plt.imshow(xtrain[6479])
plt.show()

In [None]:
class ImageDataset(Dataset):
    """Training dataset of standardised, augmented 50x50 single-channel images.

    On construction the training images are reshaped to (n, 50, 50),
    standardised with their global mean/std, doubled by ``augment_data`` and
    finally stored channel-first as (2n, 1, 50, 50). Indexing yields
    ``(image, label)`` pairs taken from the training arrays only.

    Attributes:
        xtrain: augmented training images, shape (2n, 1, 50, 50).
        ytrain: matching labels, shape (2n,).
        xtest: test images reshaped to (m, 1, 50, 50); NOT standardised.
        mean, std: statistics used to standardise ``xtrain``; reuse them to
            apply identical preprocessing to any other data fed to the model.

    Args:
        xtrain_path: ``.npy`` file with the training images.
        ytrain_path: ``.npy`` file with the training labels.
        xtest_path: ``.npy`` file with the test images.
    """

    def __init__(self,
                 xtrain_path="Xtrain_Classification_Part1.npy",
                 ytrain_path="Ytrain_Classification_Part1.npy",
                 xtest_path="Xtest_Classification_Part1.npy"):
        xtrain = np.load(xtrain_path)
        ytrain = np.load(ytrain_path)
        xtest = np.load(xtest_path)

        # Reshape the training samples into 50x50 images.
        xtrain = xtrain.reshape((len(xtrain), 50, 50))

        # Keep the statistics so the same scaling can be applied elsewhere.
        self.mean = xtrain.mean(axis=(0, 1, 2))
        self.std = xtrain.std(axis=(0, 1, 2))
        xtrain = (xtrain - self.mean) / self.std

        # Doubles the number of samples (originals + rotated/mirrored copies).
        xtrain, ytrain = augment_data(xtrain, ytrain)

        # Add the channel axis expected by Conv2d.
        self.xtrain = xtrain.reshape((len(xtrain), 1, 50, 50))
        self.ytrain = ytrain.reshape(len(ytrain))

        # NOTE(review): xtest is stored raw, unlike xtrain. Standardise it with
        # (xtest - self.mean) / self.std before feeding it to a model trained
        # on this dataset, unless the consumer already does so.
        self.xtest = xtest.reshape((len(xtest), 1, 50, 50))

    def __len__(self):
        """Number of (augmented) training samples."""
        return len(self.xtrain)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair at ``idx``; image is (1, 50, 50)."""
        return self.xtrain[idx], self.ytrain[idx]
    

In [None]:
class ClassifyNet(nn.Module):
    """Small CNN producing one sigmoid probability per input image.

    Pipeline: conv 5x5 (1 -> 10 channels) -> ReLU -> 2x2 max-pool ->
    conv 5x5 (10 -> 20 channels) -> ReLU -> 2x2 max-pool -> flatten ->
    dropout -> linear (1620 -> 1) -> sigmoid.

    The 1620 input features of the linear layer equal 20 * 9 * 9, which is
    what a 50x50 input is reduced to by the two conv/pool stages.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5, stride=1)
        self.dropout = nn.Dropout(p=0.2)
        # fc1 is registered (so it shows up in the state dict) but forward()
        # never applies it; only fc2 is used as the classification head.
        self.fc1 = nn.Linear(1620, 800)
        self.fc2 = nn.Linear(1620, 1)

    def forward(self, x):
        """Map a (batch, 1, H, W) tensor to (batch, 1) probabilities in [0, 1]."""
        # Two identical conv -> ReLU -> pool stages.
        x = F.max_pool2d(F.relu(self.conv1(x)), kernel_size=2, stride=2)
        x = F.max_pool2d(F.relu(self.conv2(x)), kernel_size=2, stride=2)

        # Collapse channels and spatial dims into one feature vector per sample.
        flat = x.view(x.size(0), -1)
        flat = self.dropout(flat)

        return torch.sigmoid(self.fc2(flat))

In [None]:
# Training hyper-parameters read by the data loaders and the SGD optimizer.
config = dict(
    batch_size=64,   # samples per mini-batch
    lr=0.001,        # SGD learning rate
    momentum=0.9,    # SGD momentum
)

In [None]:
def train_model(model, num_epochs=200, checkpoint_path="ClassifyNet.pth",
                train_batches=None, val_batches=None, opt=None, run_device=None):
    """Train ``model`` with binary cross-entropy and checkpoint the best epoch.

    After every epoch the model is evaluated on the validation batches; when
    the mean validation loss improves on the best seen so far, the whole model
    object is written to ``checkpoint_path`` with ``torch.save``.

    Args:
        model: network returning one probability in [0, 1] per sample.
        num_epochs: number of passes over the training batches.
        checkpoint_path: file that receives the best model.
        train_batches: iterable of ``(images, labels)`` batches; defaults to
            the notebook-level ``train_loader``.
        val_batches: sized iterable of validation batches; defaults to the
            notebook-level ``test_loader``.
        opt: optimizer bound to ``model``'s parameters; defaults to the
            notebook-level ``optimizer``.
        run_device: device the images are moved to; defaults to the
            notebook-level ``device``.

    Returns:
        (train_loss, val_loss): lists with one 0-dim tensor per processed
        batch. The tensors carry no autograd graph.
    """
    # Fall back to the objects defined in other notebook cells.
    if train_batches is None:
        train_batches = train_loader
    if val_batches is None:
        val_batches = test_loader
    if opt is None:
        opt = optimizer
    if run_device is None:
        run_device = device

    criterion = nn.BCELoss()

    # Start from +inf so the first finite validation loss always produces a
    # checkpoint, even when it is above 1.0.
    best_score = float("inf")

    train_loss = []
    val_loss = []

    for epoch in range(num_epochs):
        print(epoch)

        model.train()
        for data in train_batches:
            image, label = data[0].float().to(run_device), data[1].float()

            opt.zero_grad()
            classify_output = model(image)

            # The loss is evaluated on CPU with both tensors flattened to 1-D.
            y_true = torch.reshape(label.cpu(), (-1,))
            y_pred = torch.reshape(classify_output.cpu(), (-1,))

            loss = criterion(y_pred, y_true)
            loss.backward()
            opt.step()

            # Detach before storing: keeping the attached tensor would hold on
            # to the autograd graph of every batch for the whole run.
            train_loss.append(loss.detach())

        model.eval()
        epoch_val_loss = 0.0

        with torch.no_grad():
            for data in val_batches:
                image, label = data[0].float().to(run_device), data[1].float()

                classify_output = model(image)

                y_true = torch.reshape(label.cpu(), (-1,))
                y_pred = torch.reshape(classify_output.cpu(), (-1,))

                batch_loss = criterion(y_pred, y_true)
                val_loss.append(batch_loss)
                epoch_val_loss += batch_loss.item()

        # Mean of the per-batch losses (a plain float).
        epoch_val_loss /= len(val_batches)

        if epoch_val_loss < best_score:
            torch.save(model, checkpoint_path)
            best_score = epoch_val_loss

    return train_loss, val_loss

In [None]:
# Build the normalised + augmented dataset once.
data_set = ImageDataset()

# Reproducible 80/20 hold-out split of the (image, label) pairs.
# NOTE(review): ImageDataset augments before this split, so a rotated copy of
# a training image can end up in the held-out part -- consider splitting first.
train_set, test_set = train_test_split(
    data_set,
    test_size=0.2,
    shuffle=True,
    random_state=1,
)

# Mini-batch iterators; both are reshuffled on every pass.
train_loader = DataLoader(train_set, shuffle=True, batch_size=config['batch_size'])
test_loader = DataLoader(test_set, shuffle=True, batch_size=config['batch_size'])

In [None]:
# Ask PyTorch to release cached, currently unused CUDA memory before training.
# NOTE(review): a later cell forces device to CPU, so this presumably has no
# practical effect in this notebook -- confirm.
torch.cuda.empty_cache()

In [None]:
# Training is pinned to the CPU. Flip FORCE_CPU to False to use the first
# CUDA device whenever one is available.
FORCE_CPU = True

cuda_usable = torch.cuda.is_available() and not FORCE_CPU
device = torch.device('cuda:0' if cuda_usable else 'cpu')

model = ClassifyNet().to(device)

# Plain SGD with momentum, hyper-parameters taken from config.
optimizer = optim.SGD(
    model.parameters(),
    lr=config['lr'],
    momentum=config['momentum'],
)

In [None]:
# Show the layers registered on the network as a quick sanity check.
print(model)

In [None]:
# Run the training loop; the returned lists hold one loss value per processed
# training batch and per processed validation batch, across all epochs.
train_loss, val_loss = train_model(model)

In [None]:
# Evaluate the best checkpoint on the held-out split (test_loader).

# The checkpoint is a full pickled model object, so only load files you trust.
# NOTE(review): recent PyTorch releases may require weights_only=False to load
# a whole-model checkpoint like this one -- confirm against the installed version.
model = torch.load("ClassifyNet.pth", map_location=device)
model.eval()  # make sure dropout is disabled during evaluation

true_batches = []
pred_batches = []

with torch.no_grad():
    for data in test_loader:
        image, label = data[0].float().to(device), data[1].float()

        classify_output = model(image)

        # Threshold labels and probabilities at 0.5 to obtain boolean classes.
        true_batches.append(torch.reshape(label.cpu(), (-1,)) > 0.5)
        pred_batches.append(torch.reshape(classify_output.cpu(), (-1,)) > 0.5)

y_true = torch.cat(true_batches).numpy()
y_pred = torch.cat(pred_batches).numpy()

# Balanced accuracy is computed once over the pooled predictions: averaging
# per-batch scores is biased, and a batch may lack one of the classes.
# Argument order is (y_true, y_pred); the metric is not symmetric.
test_score = BACC(y_true, y_pred)

print(test_score)