In [None]:
# CNN for binary image classification (whether a peak exists or not)

import scipy.io
from scipy.sparse import coo_matrix
import pandas as pd
import matplotlib.pyplot as plt
import time
import cv2
import numpy as np

from sys import getsizeof

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data.sampler import SubsetRandomSampler

from MyDataset import MyDataset
import random

from statsmodels.stats.proportion import proportion_confint

# Show which PyTorch version this kernel is running.
print(torch.__version__)

In [None]:
# Make float64 the default floating-point dtype. This must run before the model
# is constructed so that its parameters are created as float64 as well.
torch.set_default_dtype(torch.float64)

# Load the serialized inputs and their labels from the working directory.
# NOTE(review): torch.load unpickles the file contents -- only load files that
# come from a trusted source.
# presumably `images` holds sparse tensors (later cells call .to_dense() on
# every batch) -- TODO confirm against how images.pt was written.
images = torch.load('images.pt')
labels = torch.load('labels.pt')

In [None]:
# Number of passes over the training set; also the length of the per-epoch
# loss lists filled in by the training loop.
n_epochs = 1
# Learning rate handed to the optimizer.
learning_rate = 0.001
# Number of samples per batch for the train, validation and test data loaders.
batch_size = 16
# Dropout probability read by Net when it builds its dropout layer.
dropout = 0.5

In [None]:
# Report how many samples were loaded.
print(len(images))
print(len(labels))

# Images and labels are indexed in lockstep below, so a length mismatch would
# silently pair images with the wrong labels.
if len(images) != len(labels):
    raise ValueError(
        'images and labels must have the same length, got {} and {}'.format(
            len(images), len(labels)))

# Draw ONE seeded permutation of the indices and apply it to both sequences.
# For plain lists this yields exactly the order that shuffling each list in
# place with the same seed would give. Unlike an in-place random.shuffle it is
# also safe when the loaded object is a tensor (where the element swap operates
# on views and duplicates rows), and it does not mutate the loaded objects.
shuffled_order = list(range(len(images)))
random.Random(10).shuffle(shuffled_order)  # shuffling with seed

# Keep only positions 100..599 of the shuffled order (at most 500 samples).
subset_order = shuffled_order[100:600]
images = [images[i] for i in subset_order]
labels = [labels[i] for i in subset_order]

size = len(images)

# Dataset over the whole retained subset.
dataset = MyDataset(images, labels)

# Contiguous 70% / 15% / 15% split into train / validation / test indices.
split_indices = list(range(0, size))

train_idx = split_indices[0:round(0.70 * size)]
val_idx = split_indices[round(0.70 * size):round(0.85 * size)]
test_idx = split_indices[round(0.85 * size):]
print(train_idx)
print(val_idx)
print(test_idx)

train_dataset = MyDataset([images[i] for i in train_idx], [labels[i] for i in train_idx])
val_dataset = MyDataset([images[i] for i in val_idx], [labels[i] for i in val_idx])
test_dataset = MyDataset([images[i] for i in test_idx], [labels[i] for i in test_idx])

# The loaders iterate in dataset order (no shuffle argument is passed).
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size)
valid_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

# Sanity check: print the batch shapes produced by the training loader.
for data, target in train_loader:
    print(data.shape)
    print(target.shape)
    

In [None]:
# define the CNN architecture
class Net(nn.Module):
    """Small CNN for binary classification that emits one probability per sample.

    Feature extractor: three blocks of Conv2d(3x3, padding=1) -> BatchNorm2d ->
    ReLU -> MaxPool2d(kernel 2, stride 8 along height / 4 along width), with
    1 -> 4 -> 8 -> 16 channels.
    Head: dropout -> Linear(16*26*9, 10) -> ReLU -> dropout -> Linear(10, 1)
    -> sigmoid.

    The first linear layer is hard-wired to 16 * 26 * 9 input features. That is
    what the three pooling stages produce for a 1 x 13000 x 560 input
    (13000 -> 1625 -> 203 -> 26 and 560 -> 140 -> 35 -> 9); the input size is
    taken from the original comments -- TODO confirm against the data.

    Args:
        dropout_rate: Probability used by the dropout layer. When None (the
            default) the module-level ``dropout`` setting is used, so ``Net()``
            behaves as before.
    """

    def __init__(self, dropout_rate=None):
        super().__init__()
        if dropout_rate is None:
            # Fall back to the notebook-level hyperparameter.
            dropout_rate = dropout
        # convolutional block 1 (sees the 1-channel input image)
        self.conv1 = nn.Conv2d(1, 4, 3, padding=1)
        self.conv1_bn = nn.BatchNorm2d(4)
        # convolutional block 2 (sees 4 channels)
        self.conv2 = nn.Conv2d(4, 8, 3, padding=1)
        self.conv2_bn = nn.BatchNorm2d(8)
        # convolutional block 3 (sees 8 channels)
        self.conv3 = nn.Conv2d(8, 16, 3, padding=1)
        self.conv3_bn = nn.BatchNorm2d(16)
        # max pooling shared by all three blocks: 2x2 window, stride (8, 4)
        self.pool_1 = nn.MaxPool2d(2, [8, 4])
        # linear layer (16*26*9 -> 10)
        self.fc1 = nn.Linear(16 * 26 * 9, 10)
        # linear layer (10 -> 1)
        self.fc2 = nn.Linear(10, 1)
        # dropout layer, applied before each linear layer in forward()
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        """Return a 1-D tensor with one sigmoid output per input sample.

        Args:
            x: Dense batch of shape (batch, 1, height, width).

        Raises:
            RuntimeError: If the pooled feature map does not flatten to the
                16 * 26 * 9 features that ``fc1`` expects.
        """
        # conv -> batch norm -> ReLU -> max pool, three times
        x = self.pool_1(F.relu(self.conv1_bn(self.conv1(x))))
        x = self.pool_1(F.relu(self.conv2_bn(self.conv2(x))))
        x = self.pool_1(F.relu(self.conv3_bn(self.conv3(x))))
        # Flatten per sample, keeping the batch dimension fixed. Flattening
        # with view(-1, n_features) would instead regroup the batch whenever
        # the input size is wrong, returning fewer predictions than samples
        # without any error.
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        # hidden layer with ReLU activation
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        # single sigmoid unit, reshaped from (batch, 1) to (batch,)
        x = torch.sigmoid(self.fc2(x))
        return x.view(x.size(0))

In [None]:
# Build the network. The seed is fixed first so the randomly initialized
# weights are the same on every run.
torch.manual_seed(10) # set seed before creating model
model = Net()
        
# Show the initial weights of the first linear layer and the layer summary.
print(model.state_dict()['fc1.weight'])

print(model)
    
# Loss function: mean squared error between the model's sigmoid output and the target.
# NOTE(review): an earlier comment here said "Binary cross entropy", but the
# code uses MSELoss. Confirm which is intended -- nn.BCELoss is the usual
# pairing with a sigmoid output for binary classification.
criterion = nn.MSELoss()

# Optimizer: Adam over all model parameters with the configured learning rate.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Train on the GPU when CUDA is available, otherwise on the CPU.
# (Set train_on_gpu = False after this line to force CPU training.)
train_on_gpu = torch.cuda.is_available()

if not train_on_gpu:
    print('CUDA is not available.  Training on CPU ...')
else:
    model.cuda()
    criterion.cuda()
    print('CUDA is available!  Training on GPU ...')

# Lowest validation loss seen so far. np.inf is used because the np.Inf alias
# was removed in NumPy 2.0.
valid_loss_min = np.inf

# Per-epoch average losses (also read by the plotting cell).
train_loss = [0.0] * n_epochs
valid_loss = [0.0] * n_epochs

for epoch in range(n_epochs):

    ###################
    # train the model #
    ###################
    model.train()
    for data, target in train_loader:
        data = data.to_dense()  # model needs dense matrices as input
        # move tensors to GPU if CUDA is available
        if train_on_gpu:
            data, target = data.cuda(), target.cuda()
        # clear the gradients of all optimized variables
        optimizer.zero_grad()
        # forward pass: compute predicted outputs by passing inputs to the model
        output = model(data)
        # the loss needs both operands in the same floating-point dtype
        output = output.to(torch.float64)
        target = target.to(torch.float64)
        loss = criterion(output, target)
        # backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        # accumulate the summed loss; the criterion returns a per-batch mean
        train_loss[epoch] += loss.item() * data.size(0)

    ######################
    # validate the model #
    ######################
    model.eval()
    # No gradients are needed for validation; disabling autograd avoids
    # building and holding a computation graph for every batch.
    with torch.no_grad():
        for data, target in valid_loader:
            data = data.to_dense()  # model needs dense matrices as input
            # move tensors to GPU if CUDA is available
            if train_on_gpu:
                data, target = data.cuda(), target.cuda()
            # forward pass: compute predicted outputs by passing inputs to the model
            output = model(data)
            output = output.to(torch.float64)
            target = target.to(torch.float64)
            loss = criterion(output, target)
            # accumulate the summed loss; the criterion returns a per-batch mean
            valid_loss[epoch] += loss.item() * data.size(0)

    # convert the summed losses into per-sample averages
    train_loss[epoch] = train_loss[epoch] / len(train_loader.sampler)
    valid_loss[epoch] = valid_loss[epoch] / len(valid_loader.sampler)

    # print training/validation statistics
    print_train_loss = train_loss[epoch]
    print_valid_loss = valid_loss[epoch]
    print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
        epoch+1, print_train_loss, print_valid_loss))

    # checkpoint the weights whenever the validation loss is no worse than the best so far
    if valid_loss[epoch] <= valid_loss_min:
        print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
        valid_loss_min,
        print_valid_loss))
        torch.save(model.state_dict(), 'my_model.pt')
        valid_loss_min = valid_loss[epoch]

In [None]:
# Plot the per-epoch training and validation losses on a single set of axes.
fig, ax = plt.subplots()
fig.set_dpi(200)

# Loss axis is pinned to [0, 1]; epochs are numbered from 1 on the x axis.
ax.set_ylim(0, 1)
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")

epoch_axis = np.linspace(1, n_epochs, num=n_epochs)
line1 = ax.plot(epoch_axis, train_loss)
line2 = ax.plot(epoch_axis, valid_loss)
ax.legend(["train loss", "valid loss"])

In [None]:
# Rebuild the network on the CPU and restore the checkpoint written by the
# training loop. map_location='cpu' lets a checkpoint saved from a GPU model
# load here as well, including on a machine without CUDA.
model = Net()
model.load_state_dict(torch.load('my_model.pt', map_location='cpu'))
model.eval()

total = 0
true_positive = 0
true_negative = 0

# Inference only: disable autograd so no computation graph is kept per batch.
with torch.no_grad():
    for data, target in test_loader:
        data = data.to_dense()  # model needs dense matrices as input
        # forward pass: compute predicted outputs by passing inputs to the model
        output = model(data)
        output = output.to(torch.float64)
        target = target.to(torch.float64)
        print(output)
        print(target)

        # Count correct predictions with tensor operations instead of a
        # per-sample Python loop. The decision threshold is 0.5; both sides are
        # flattened so they compare element by element.
        scores = output.reshape(-1)
        truth = target.reshape(-1)
        true_positive += int(((truth == 1) & (scores >= 0.5)).sum())
        true_negative += int(((truth == 0) & (scores < 0.5)).sum())
        total += scores.numel()

# Fail with a clear message instead of a bare ZeroDivisionError.
if total == 0:
    raise ValueError('test_loader yielded no samples; cannot compute accuracy')

# 95% confidence interval (normal approximation) for the accuracy.
p_95 = proportion_confint((true_positive + true_negative), total, alpha=0.05, method='normal')
accuracy = (true_positive + true_negative) / total
print('Accuracy:' + str(accuracy) + str(p_95))
                