In [237]:
import pandas as pd
import os
import torch
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [238]:
def merge_dna_files(positive_file, negative_file):
    with open(positive_file, 'r') as pos_file, open(negative_file, 'r') as neg_file:
        pos_lines = pos_file.readlines()
        neg_lines = neg_file.readlines()

    # Determine the file with the lowest number of lines
    min_lines = min(len(pos_lines), len(neg_lines))

    # Merge the files alternately
    merged_lines = []
    for i in range(min_lines):
        if(i!=0):
            merged_lines.append(f'1 {pos_lines[i].strip()}')  # Label positive lines with 1
            merged_lines.append(f'0 {neg_lines[i].strip()}')  # Label negative lines with 0

    # Write merged lines to a new file
    
    output_file = os.path.join('Merge',"merged_file.txt")
    with open(output_file, 'w') as merged_file:
        merged_file.write('\n'.join(merged_lines))
    return merged_lines

In [239]:
merged_lines = merge_dna_files('Output/Access/ENCFF139HDN_output.csv','Output/NonAccess/ENCFF139HDN_in_output.csv')
print(merged_lines[0])

1 gagccaccacagaaagcagaggtgcatccagcaccacagaaaacagagccaccacagaaaacagaggggtgactgtcatcccctccagtctctgcacactcccagctgcagcagagccggaggagagagcacagcctgcaatgctaatttgccaggagctcacatgcctgcgtcactg


In [240]:
#One-hot-encoding stuff
import numpy as np

bases = 'ATGC'
one_hot_encode = []
max_seq_length = max(len(line.split(' ', 1)[1]) for line in merged_lines)
for line in merged_lines:
        label, dna_sequence = line.split(' ', 1)
        label = int(label)
        dna_one_hot = np.zeros((4, max_seq_length),dtype=np.float64)
        for i, base in enumerate(dna_sequence):
            if base.upper() in bases:
                idx = bases.index(base.upper())
                dna_one_hot[idx, i] = 1.0
        one_hot_encode.append((label, dna_one_hot))


In [241]:
print(one_hot_encode[0])

(1, array([[0., 1., 0., 0., 0., 1., 0., 0., 1., 0., 1., 0., 1., 1., 1., 0.,
        0., 1., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 0., 1., 0., 0.,
        1., 0., 0., 1., 0., 1., 0., 1., 1., 1., 1., 0., 1., 0., 1., 0.,
        0., 0., 1., 0., 0., 1., 0., 1., 0., 1., 1., 1., 1., 0., 1., 0.,
        1., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0.,
        0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
        0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0.,
        1., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 1., 0., 1., 0., 1.,
        0., 0., 1., 0., 1., 0., 0., 0., 0., 0., 0., 1., 1., 0., 0., 0.,
        0., 1., 1., 0., 0., 0., 0., 0., 0., 1., 0., 0., 1., 0., 0., 0.,
        0., 1., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.,
        0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 1., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0.,

In [245]:
#Splitting the data into train and test
split_index = int(len(one_hot_encode)*0.8)
train_data_raw = one_hot_encode[:split_index]
test_data_raw = one_hot_encode[split_index:]
train_data[0][1]
train_data = [sublist[1] for sublist in train_data_raw]
train_data = np.array(train_data)
test_data = [sublist[1] for sublist in test_data_raw]
test_data = np.array(test_data)
train_labels = [sublist[0] for sublist in train_data_raw]
test_labels = [sublist[0] for sublist in test_data_raw]


In [246]:
#The most important class. Creates a custom dataset
class data_class(Dataset):
    def __init__(self,data,label):
        data= torch.tensor(data)
        self.data = data
        labels=torch.tensor(label,dtype=torch.double)
        self.labels = labels
        

    def __len__(self):
        return len(self.data)
    
    def __getitem__(self,id):
        data_set=self.data[id]
        labels=self.labels[id]

        return data_set,labels

In [250]:
train_data_main = data_class(train_data,train_labels)
test_data_main = data_class(test_data,test_labels)

first_data, first_label = train_data_main.__getitem__(0)


In [253]:
# Creating the data loader which is going to load the data to the AI model

train_dataloader=DataLoader(train_data_main,batch_size=2,shuffle=True)
test_dataloader=DataLoader(test_data_main,batch_size=2,shuffle=True)
for data,label in train_dataloader:
    print(label[0])
    break

tensor(1., dtype=torch.float64)


In [225]:
for data,label in train_data_main:
    print(label)
    break

tensor(1., dtype=torch.float64)


In [254]:
class MiniCNN(torch.nn.Module):
  def __init__(self):
    super(MiniCNN, self).__init__()
    self.conv1 = torch.nn.Conv1d(4, 6, kernel_size=5) 
    self.pool = torch.nn.MaxPool1d(kernel_size=2, stride=2)  
    self.conv2 = torch.nn.Conv1d(6, 16, kernel_size=3)  
    self.fc1 = torch.nn.Linear(16 * 5 * 5, 120)  
    self.fc2 = torch.nn.Linear(120, 2) 

  def forward(self, x):
    print("A")
    x = self.conv1(x)
    print("new")
    x = self.pool(torch.nn.functional.relu(x))
    print("B")
    x = self.pool(torch.nn.functional.relu(self.conv2(x))) 
    print("C")
    x = x.view(-1,16 * 5 * 5)  
    x = torch.nn.functional.relu(self.fc1(x))  
    x = self.fc2(x)  
    return x

In [255]:
# Instantiating the model and assigning an optimizer to the model and creating a loss function
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model=MiniCNN().to(device)
optimizer=optim.Adam(params=model.parameters(),lr=0.0001)
loss_fn = nn.CrossEntropyLoss()

In [256]:
def train(model,device,train_dataloader,optimizer,epochs):
    print("inside train")
    model.train()
    len = 0
    for batch_ids, (img, classes) in enumerate(train_dataloader):
        len = len + 1
        print(len)
        classes=classes.type(torch.LongTensor)
        img,classes=img.to(device),classes.to(device)
        torch.autograd.set_detect_anomaly(True)     
        optimizer.zero_grad()
        output=model(img)
        loss = loss_fn(output,classes)                
        
        loss.backward()
        optimizer.step()
    if(batch_ids +1) % 2 == 0:
        print("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
            epochs, batch_ids* len(img), len(train_dataloader.dataset),
            100.*batch_ids / len(train_dataloader),loss.item()))

In [257]:
def test(model, device, test_dataloader):
    model.eval()
    test_loss=0
    correct=0
    len = 0
    with torch.no_grad():
        for img,classes in test_dataloader:
            len = len + 1
            print(len)
            img,classes=img.to(device), classes.to(device)
            y_hat=model(img)
            test_loss+=F.nll_loss(y_hat,classes,reduction='sum').item()
            _,y_pred=torch.max(y_hat,1)
            correct+=(y_pred==classes).sum().item()
        test_loss/=len(test_dataloader)
        print("\n Test set: Avarage loss: {:.0f},Accuracy:{}/{} ({:.0f}%)\n".format(
            test_loss,correct,len(test_dataloader),100.*correct/len(test_dataloader)))
        print('='*30)

In [258]:
if __name__=='__main__':
    seed=42
    EPOCHS=2
    for epoch in range(1,EPOCHS+1):
        train(model,device,train_dataloader,optimizer,epoch)
        test(model,device,test_dataloader)

inside train
1
A


RuntimeError: expected scalar type Double but found Float