In [4]:
import torch
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
#import matplotlib.pyplot as plt
import numpy as np

In [5]:
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

In [21]:
class MiniCNN(torch.nn.Module):
  def __init__(self):
    super(MiniCNN, self).__init__()
    self.conv1 = torch.nn.Conv1d(4, 6, kernel_size=5) 
    self.pool = torch.nn.MaxPool1d(kernel_size=2, stride=2)  
    self.conv2 = torch.nn.Conv1d(6, 16, kernel_size=3)  
    self.fc1 = torch.nn.Linear(16 * 5 * 5, 120)  
    self.fc2 = torch.nn.Linear(120, 2) 

  def forward(self, x):
    x = self.pool(torch.nn.functional.relu(self.conv1(x)))  
    x = self.pool(torch.nn.functional.relu(self.conv2(x))) 
    x = x.view(-1,16 * 5 * 5)  
    x = torch.nn.functional.relu(self.fc1(x))  
    x = self.fc2(x)  
    return x

In [8]:
# Reading in and encoding data
def one_hot(input_file):
    one_hot_data = np.zeros((20, 4, 120), dtype=int)
    seq_index = 0

    with open(input_file, 'r') as inf:
        sequences = inf.readlines()

    for sequence in sequences:
            
        sequence = sequence.strip()
        char_index = 0
        
        for char in sequence:
            if char == 'a' or char == 'A':
                one_hot_data[seq_index, 0, char_index] = 1
            elif char == 'c' or char == 'C':
                one_hot_data[seq_index, 1, char_index] = 1
            elif char == 'g' or char == 'G':
                one_hot_data[seq_index, 2, char_index] = 1
            elif char == 't' or char == 'T':
                one_hot_data[seq_index, 3, char_index] = 1
            else:
                print("encountered non-acgt")
            
            char_index += 1
        seq_index += 1

    return one_hot_data

In [10]:
enc_data = one_hot("C:\\Users\\sdelozi\\projects\\CS_590_HW\\p2\\enc\\ex_enc.txt")
# 60/40 split
enc_train_data, enc_test_data = np.split(enc_data, [int(0.6 * len(enc_data))])
print(enc_data, enc_train_data, enc_test_data)

[[[1 0 0 0 1 0 0 0 1 0]
  [0 1 0 0 0 1 0 0 0 0]
  [0 0 1 0 0 0 1 0 0 0]
  [0 0 0 1 0 0 0 1 0 1]]

 [[0 0 0 0 1 0 0 1 0 1]
  [0 0 1 0 0 1 0 0 0 0]
  [0 1 0 0 0 0 1 0 0 0]
  [1 0 0 1 0 0 0 0 1 0]]

 [[1 0 1 0 1 0 0 0 0 1]
  [0 0 0 1 0 0 0 0 1 0]
  [0 0 0 0 0 0 1 1 0 0]
  [0 1 0 0 0 1 0 0 0 0]]

 [[0 0 1 0 0 0 1 0 1 1]
  [1 1 0 0 0 0 0 1 0 0]
  [0 0 0 0 0 1 0 0 0 0]
  [0 0 0 1 1 0 0 0 0 0]]

 [[0 0 1 1 0 0 1 0 1 0]
  [0 0 0 0 0 0 0 0 0 0]
  [0 1 0 0 1 0 0 1 0 1]
  [1 0 0 0 0 1 0 0 0 0]]

 [[0 0 0 1 0 0 0 0 1 0]
  [1 0 0 0 0 0 1 0 0 0]
  [0 1 0 0 1 0 0 1 0 0]
  [0 0 1 0 0 1 0 0 0 1]]

 [[1 0 0 0 0 0 1 0 1 0]
  [0 0 1 0 0 1 0 0 0 1]
  [0 0 0 1 0 0 0 1 0 0]
  [0 1 0 0 1 0 0 0 0 0]]

 [[1 0 0 0 1 0 0 0 1 0]
  [0 0 1 0 0 0 0 0 0 0]
  [0 0 0 1 0 0 1 0 0 1]
  [0 1 0 0 0 1 0 1 0 0]]

 [[1 0 0 0 0 1 0 0 1 0]
  [0 1 0 0 0 0 0 0 0 1]
  [0 0 0 0 1 0 0 1 0 0]
  [0 0 1 1 0 0 1 0 0 0]]

 [[1 0 0 1 0 0 0 1 0 0]
  [0 1 0 0 0 1 0 0 0 0]
  [0 0 0 0 0 0 1 0 0 1]
  [0 0 1 0 1 0 0 0 1 0]]

 [[1 0 0 0 0 0 1 1 0

In [11]:
# Converting the data to tensor type and floating point type

tensor_train_data = torch.from_numpy(enc_train_data).float()
tensor_test_data = torch.from_numpy(enc_test_data).float()

In [12]:
print(tensor_train_data.shape)
print(tensor_test_data.dtype)

torch.Size([12, 4, 10])
torch.float32


In [13]:
# Creating random binary labels. and converting it to tensor

label_test = np.random.choice([0, 1], size=len(tensor_test_data))
label_train = np.random.choice([0, 1], size=len(tensor_train_data))

print(label_train.dtype)
label_test = torch.from_numpy(label_test)
label_train = torch.from_numpy(label_train)
print(label_test.dtype)

int32
torch.int32


In [14]:

# The most important class, a custom data loader, understand how it is working.

class data_class(Dataset):
    def __init__(self,data,label):
        self.data=data
        self.labels=torch.tensor(label)

    def __len__(self):
        return len(self.data) 
    
    def __getitem__(self,id):
        data_set=self.data[id]
        labels=self.labels[id]

        return data_set,labels

In [15]:
# calling the data_class for the raw enc data

train_data=data_class(tensor_train_data,label_train)
test_data=data_class(tensor_test_data,label_test)

  self.labels=torch.tensor(label)


In [16]:
# Creating the data loader which is going to load the data to the AI model

train_dataloader=DataLoader(train_data,batch_size=2,shuffle=True)
test_dataloader=DataLoader(test_data,batch_size=2,shuffle=True)

In [17]:
# Instantiating the model and assigning an optimizer to the model and creating a loss function

model=MiniCNN().to(device)
optimizer=optim.Adam(params=model.parameters(),lr=0.0001)
loss_fn = nn.CrossEntropyLoss()

In [18]:
def train(model,device,train_dataloader,optimizer,epochs):
    print("inside train")
    model.train()
    for batch_ids, (seq, classes) in enumerate(train_dataloader):
        classes=classes.type(torch.LongTensor)
        seq,classes=seq.to(device),classes.to(device)
        torch.autograd.set_detect_anomaly(True)     
        optimizer.zero_grad()
        output=model(seq)
        loss = loss_fn(output,classes)                
        
        loss.backward()
        optimizer.step()
    if(batch_ids +1) % 2 == 0:
        print("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
            epochs, batch_ids* len(seq), len(train_dataloader.dataset),
            100.*batch_ids / len(train_dataloader),loss.item()))

In [19]:
def test(model, device, test_dataloader):
    model.eval()
    test_loss=0
    correct=0
    with torch.no_grad():
        for seq,classes in test_dataloader:
            seq,classes=seq.to(device), classes.to(device)
            y_hat=model(seq)
            test_loss+=F.nll_loss(y_hat,classes,reduction='sum').item()
            _,y_pred=torch.max(y_hat,1)
            correct+=(y_pred==classes).sum().item()
        test_loss/=len(test_dataloader)
        print("\n Test set: Avarage loss: {:.0f},Accuracy:{}/{} ({:.0f}%)\n".format(
            test_loss,correct,len(test_dataloader),100.*correct/len(test_dataloader)))
        print('='*30)

In [22]:
# WE ARE USING RANDOM DATA SO THE TRAINING AND TESTING DOES NOT MATTER, THE AIM IS TO SHOWCASE THE USE OF A CUSTOM DATASET
# SINCE IN PRACTICAL SENSE YOU HAVE TO CLEAN THE DATA AND LOAD THE DATA INTO THE MODEL.


if __name__=='__main__':
    seed=42
    EPOCHS=2
    
    for epoch in range(1,EPOCHS+1):
        train(model,device,train_dataloader,optimizer,epoch)
        test(model,device,test_dataloader)

inside train


RuntimeError: max_pool1d() Invalid computed output size: 0