# 401 CNN

View more, visit my tutorial page: https://morvanzhou.github.io/tutorials/
My Youtube Channel: https://www.youtube.com/user/MorvanZhou

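Dependencies:
* torch: 0.1.11 (note: the code below calls `torch.no_grad()`, which requires torch 0.4 or newer)
* torchvision
* matplotlib
* pandas, numpy, scikit-image, scikit-learn, tqdm (imported in the first code cell)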

In [None]:
# importing the libraries
import pandas as pd
import numpy as np

# for reading and displaying images
from skimage.io import imread
import matplotlib.pyplot as plt
%matplotlib inline

# for creating validation set
from sklearn.model_selection import train_test_split

# for evaluating the model
from sklearn.metrics import accuracy_score
from tqdm import tqdm

# PyTorch libraries and modules
import torch
from torch.autograd import Variable
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.utils.data import Dataset, DataLoader

In [None]:
# Paths to the gzip-compressed CSV files with the training/testing samples and labels.

xtrain = r"data/training_sample2.csv.gz"
ytrain = r"data/training_label2_int2.csv.gz"
xtest = r"data/testing_sample2.csv.gz"
ytest = r"data/testing_label2_int2.csv.gz"

# Disabled code kept as a bare string literal: nothing inside the quotes is executed.
# It read the first 5000 rows of each file into NumPy arrays in one go.
'''samplesdf = pd.read_csv(xtrain,compression ="gzip",delimiter=',', nrows = 5000, header=0)
x_train = samplesdf.to_numpy()

samplesdf = pd.read_csv(ytrain,compression ="gzip",delimiter=',', nrows = 5000, header=0)
y_train = samplesdf.to_numpy()

samplesdf = pd.read_csv(xtest,compression ="gzip",delimiter=',',nrows = 5000, header=0)
x_test = samplesdf.to_numpy()

samplesdf = pd.read_csv(ytest,compression ="gzip",delimiter=',', nrows = 5000,  header=0)
y_test = samplesdf.to_numpy()
print("done")

x_train = x_train.reshape(-1,1,8949)
x_test = x_test.reshape(-1,1,8949)
y_train = y_train.reshape(-1)
y_test = y_test.reshape(-1)
print(x_train.shape)
print(x_test.shape)
print(y_train.shape)
print(y_test.shape)
'''

In [None]:
# Disabled code kept as a bare string literal: nothing inside the quotes is executed.
'''#convert into torch format
x_train = torch.from_numpy(x_train).double()
y_train = torch.from_numpy(y_train).long()
x_train.shape,y_train.shape
'''

In [None]:
# Disabled code kept as a bare string literal: nothing inside the quotes is executed.
'''
#convert into torch format
x_test = torch.from_numpy(x_test).double()
y_test = torch.from_numpy(y_test).long()
x_test.shape,y_test.shape        
'''

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Map-style dataset backed by two gzip-compressed CSV files.

    One file holds the feature rows and the other the labels; row ``i`` of
    the samples file is paired with row ``i`` of the labels file.

    The base class is spelled ``torch.utils.data.Dataset`` on purpose: this
    class reuses the name ``Dataset``, so inheriting from the bare name would
    make it derive from its own previous definition whenever the cell is
    executed a second time.
    """

    def __init__(self,samples,labels,numrows):
        """
        Args:
            samples (string): Path to the gzip-compressed CSV file of features.
            labels (string): Path to the gzip-compressed CSV file of labels.
            numrows (int): Maximum number of data rows read from each file
                (the first line of each file is treated as the header).

        Raises:
            ValueError: If the two files yield a different number of rows,
                which would silently pair samples with the wrong labels.
        """
        self.data = pd.read_csv(samples, compression="gzip", delimiter=',', nrows=numrows, header=0)
        self.label = pd.read_csv(labels, compression="gzip", delimiter=',', nrows=numrows, header=0)
        if len(self.data) != len(self.label):
            raise ValueError(
                "samples and labels have different lengths: {} vs {}".format(
                    len(self.data), len(self.label)))

    def __len__(self):
        """Return the number of sample rows that were loaded."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the sample(s) at ``idx`` as a dict of NumPy arrays.

        Args:
            idx: Positional row index (int, list of ints, or a tensor of them).

        Returns:
            dict: ``'rna'`` is a float64 array of shape (rows, n_features) and
            ``'label'`` is a flat integer array with the matching label(s).
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # Use the width of the loaded table instead of a hard-coded column
        # count so that files with any number of feature columns work.
        n_features = self.data.shape[1]
        rna = np.asarray(self.data.iloc[idx]).astype('double').reshape(-1, n_features)
        seplb = np.asarray(self.label.iloc[idx]).astype('long').reshape(-1)
        return {'rna': rna, 'label': seplb}

In [None]:
# Load at most 10000 training rows and at most 5000 test rows from the CSV files.
train_dataset = Dataset(samples=xtrain,labels=ytrain,numrows=10000)
test_dataset = Dataset(samples = xtest,labels = ytest, numrows = 5000)

In [None]:
# Hyper Parameters
EPOCH = 30             # number of full passes over the training data (passed to train())
LR = 0.008              # learning rate handed to the Adam optimizer
batch_size = 32         # number of samples per mini-batch in both data loaders

In [None]:
# Mini-batch iterators: training batches are shuffled, test batches keep the dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class CNN(nn.Module):
    """1-D convolutional classifier for single-channel sequences.

    Three ``Conv1d`` stages (4 filters, kernel 3, stride 1, padding 1) keep
    the sequence length unchanged. The ceil-mode max-pools that close stages
    two and three each halve the length, rounding up. The flattened feature
    map is fed to a single linear layer that produces the class logits.

    Args:
        input_length (int): Length of every input sequence. Defaults to 8949.
        n_classes (int): Number of output logits. Defaults to 2.

    Raises:
        ValueError: If ``input_length`` is smaller than 1.
    """

    def __init__(self, input_length=8949, n_classes=2):
        super().__init__()
        if input_length < 1:
            raise ValueError("input_length must be at least 1")
        n_filters = 4

        self.conv1 = nn.Sequential(         # input shape (batch, 1, L)
            nn.Conv1d(
                in_channels=1,              # one input channel
                out_channels=n_filters,     # n_filters
                kernel_size=3,              # filter size
                stride=1,                   # filter movement/step
                padding=1,                  # (kernel_size-1)/2 keeps the length when stride=1
            ),                              # output shape (batch, 4, L)
            nn.BatchNorm1d(n_filters),
            nn.ReLU(),                      # activation
        )
        self.conv2 = nn.Sequential(         # input shape (batch, 4, L)
            nn.Conv1d(n_filters, n_filters, 3, 1, 1),
            nn.ReLU(),
            nn.Dropout(p=0.4),
            nn.MaxPool1d(kernel_size=2, stride=2, ceil_mode=True),   # output shape (batch, 4, ceil(L/2))
        )
        self.conv3 = nn.Sequential(         # input shape (batch, 4, ceil(L/2))
            nn.Conv1d(n_filters, n_filters, 3, 1, 1),
            nn.ReLU(),                      # activation
            nn.MaxPool1d(kernel_size=2, stride=2, ceil_mode=True),   # length halved again, rounding up
        )

        # Each ceil-mode pool with kernel 2 / stride 2 maps a length L to
        # ceil(L / 2); for the default 8949 that is 4475 and then 2238, so the
        # flattened size is 4 * 2238 = 8952.
        pooled_length = input_length
        for _ in range(2):
            pooled_length = (pooled_length + 1) // 2
        self.out = nn.Linear(n_filters * pooled_length, n_classes)   # fully connected layer

    def forward(self, x):
        """Return the class logits for ``x`` of shape (batch, 1, input_length)."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = x.view(x.size(0), -1)           # flatten to (batch, 4 * pooled_length)
        output = self.out(x)
        return output

In [None]:
cnn = CNN()
loss_func = nn.CrossEntropyLoss()                       # the target label is not one-hotted
if torch.cuda.is_available():
    cnn = cnn.cuda()
    loss_func = loss_func.cuda()
cnn = cnn.double()    # the Dataset yields float64 features, so the weights must be float64 as well
# Create the optimizer only after the model has its final device and dtype,
# so it is guaranteed to hold the parameters that are actually trained.
optimizer = torch.optim.Adam(cnn.parameters(), lr=LR)   # optimize all cnn parameters
print(cnn)

In [None]:
def test():
    """Evaluate the global ``cnn`` on ``test_loader`` and print the accuracy.

    The model is switched to evaluation mode for the duration of the pass and
    then put back into whatever mode it was in before, so calling this from
    inside the training loop does not silently disable Dropout/BatchNorm
    training behaviour afterwards.

    Returns:
        float: Accuracy in percent; 0.0 when the loader yields no samples.
    """
    was_training = cnn.training
    device = next(cnn.parameters()).device   # feed the batches to wherever the model lives
    correct = 0
    total = 0

    cnn.eval()
    try:
        with torch.no_grad():
            for batch in test_loader:
                rna = batch["rna"].to(device)
                labels = batch["label"].to(device).reshape(-1)
                predicted = cnn(rna).argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        cnn.train(was_training)

    # Guard against an empty loader instead of dividing by zero.
    accuracy = (correct / total) * 100 if total else 0.0
    print('Test Accuracy of the model on the test images: {} %'.format(accuracy))
    return accuracy



In [None]:
def train(num_epoch):
    """Train the global ``cnn`` on ``train_loader`` for ``num_epoch`` epochs.

    Uses the module-level ``loss_func`` and ``optimizer``. The loss and the
    accuracy (as a fraction) of every mini-batch are appended to the
    module-level lists ``train_losses`` and ``train_acc``. Progress is printed
    every 100 steps, and ``test()`` is run once at the end of each epoch.

    Args:
        num_epoch (int): Number of full passes over the training data.
    """
    total_step = len(train_loader)
    device = next(cnn.parameters()).device   # feed the batches to wherever the model lives

    for epoch in range(num_epoch):
        # Re-enable training mode every epoch: test() evaluates in eval mode,
        # and Dropout/BatchNorm must run in training mode while learning.
        cnn.train()

        for i, batch in enumerate(train_loader):
            # Run the forward pass
            rna = batch["rna"].to(device)
            labels = batch["label"].to(device).long().reshape(-1)
            outputs = cnn(rna)
            loss = loss_func(outputs, labels)
            train_losses.append(loss.item())

            # Backprop and perform Adam optimisation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Track the accuracy
            total = labels.size(0)
            predicted = outputs.argmax(dim=1)
            correct = (predicted == labels).sum().item()
            train_acc.append(correct / total)
            if i % 100 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {:.2f}%'
                        .format(epoch + 1, num_epoch , i + 1, total_step, loss.item(),
                                (correct / total) * 100))

        # Evaluate once per epoch; a full pass over the test set after every
        # mini-batch would dominate the running time.
        test()

In [None]:
# Per-batch history filled in by train(): loss values and accuracy fractions.
train_losses = []
train_acc = []
train(EPOCH)

In [None]:
# Plot the per-batch training loss and training accuracy on the same axes.
plt.plot(train_losses, label='Training loss')
plt.plot(train_acc, label='Training accuracy')
plt.legend()
plt.show()

In [None]:
# Save the trained model weights.
# The network trained above is bound to `cnn`; no variable named `model` exists in this notebook.
MODEL_STORE_PATH = './'   # output directory prefix (must end with a path separator); adjust as needed
torch.save(cnn.state_dict(), MODEL_STORE_PATH + 'conv_net_model.ckpt')