In [None]:
from torch.utils import data as D

In [None]:
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch
import numpy as np
import pickle


In [None]:
class NeuralNet(nn.Module):
    def __init__(self):
        super(NeuralNet, self).__init__()
        self.conv1 = nn.Conv1d(4, 32, 5, padding = 2)
        self.conv2 = nn.Conv1d(32, 64, 5, padding = 2)
        self.conv3 = nn.Conv1d(64, 128, 5, padding = 2)
        self.channel_red = nn.Conv1d(128, 1, 5, padding = 2)                                                                                                                                                                            
        self.sig = nn.Sigmoid()
        self.pool = nn.MaxPool1d(2)
        #self.fc1 = nn.Linear(300, 300) ###same as self.channel_red layer 
        self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.conv3(x)
        x = F.relu(x)
        x = self.channel_red(x)
        #x = self.pool(x)
        #x = self.conv2(x)
        #x = F.relu(x)
        #x = F.pool(x)
        #x = self.softmax(x)
        x = self.sig(x).squeeze()
        ##output = F.log_softmax(x, dim=1)
        return x

net = NeuralNet()

criterion = nn.BCELoss()
optimizer = optim.Adam(net.parameters(), lr=0.0001)

##use Adam optimizer, for loss can use ECE?? CrossEntropy or use BCU with ??? without sigmoid layer
###

In [None]:
#data loader:
from torch.utils import data as D
def seq2onehot(seq):
    window_size = 300
    matrix = np.zeros(shape = (window_size, 4, 1))
    for i, nt in enumerate(seq, 0):
        if i == 300:
            break
        if nt == "A":
            matrix[i][0][0] = 1
        elif nt == "G":
            matrix[i][1][0] = 1
        elif nt == "C":
            matrix[i][2][0] = 1
        elif nt == "U":
            matrix[i][3][0] = 1
        else:
            continue

    return matrix

class sbp_seq_data(D.Dataset):
    def __init__(self, seq_path, training_mode = False, label_path = None):
        self.seq_path = seq_path
        self.seq_list = []
        self.training_mode = training_mode
        if self.training_mode:
            self.label_dict = torch.load(label_path)

        for line in open(self.seq_path, 'r'):
            line = line.split('\t')
            self.seq_list.append((line[0], line[1]))
        self.len = len(self.seq_list)
        return

    def __getitem__(self, index):
        sample = self.seq_list[index]
        seq = sample[1]
        seq_id = sample[0]
        oh_seq = seq2onehot(seq)
        oh_seq = torch.from_numpy(oh_seq.T).float()
        if self.training_mode:
            label = self.label_dict[seq]
            label = torch.from_numpy((np.array(list(label), dtype=float))) #.reshape(-1, 1)

            return oh_seq, label
        else:
            return seq_id, oh_seq

    def __len__(self):
        return(self.len)
    

#test_data = 'seq_vectors/Hela_pos_sample_seq_vectors_torchsave.pkl'
all_data = sbp_seq_data(seq_path = 'sbp_polyA_RNA_seq_info/sbp_polyA_RNA_HEK293_mrna_seqs.tsv', training_mode = True, label_path = 'seq_vectors/HEK293_pos_sample_seq_vectors.pickle')

train_data, test_data = torch.utils.data.random_split(all_data, [9785, 2446]) #generator = torch.Generator().manual_seed(42)


In [None]:
trainloader = D.DataLoader(train_data, batch_size=128, shuffle=True, num_workers=0)
testloader = D.DataLoader(train_data, batch_size=128, shuffle=True, num_workers=0)


Input dimension: 
inputs have dimenson 128 x 1 x 4 x 300 --> squeeze so inputs have dimension 128 x 4 x 300
outputs have dimension 128 x 300
labels have dimension 128 x 300 
Same dimensions for BCE loss 

ISSUE: loss not changing, also number correct is = 0??? Is it an issue with how I'm counting the ones that are correct?
Also, I rounded to see what was correct... is this causing issues because I have decimals at the end????

In [None]:
##training data
for epoch in range(0, 8):  # loop over the dataset multiple times
    correct =0
    total =0
    running_loss = 0.0
    for i, data in enumerate(trainloader):
        # get the inputs; data is a list of [inputs, labels]

        inputs, labels = data
        inputs = inputs.squeeze(1)
        labels = labels.float()

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs.float())
        loss = criterion(outputs.float(), labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 4 == 3:    # print every 4 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 4:.3f}')
            running_loss = 0.0
            
        total += len(outputs)
        outputs = torch.round(outputs)
        ##check outputs vs. label row by row in batch
        for j in range(0, len(outputs)):
            if torch.equal(outputs[j], labels[j]):
                correct +=1

    print(100*correct/total)
print('Finished Training')



In [None]:
##testing
correct = 0
total = 0
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    for i, data in enumerate(testloader):
        test_seqs, labels = data
        labels = labels.float()
        labels = labels.squeeze(1)
        test_seqs = test_seqs.squeeze()
        # calculate outputs by running images through the network
        outputs = net(test_seqs)
        
        # the class with the highest val is what we choose as prediction
        predicted = torch.max(outputs.data)

        total += len(outputs)
        
        outputs = torch.round(outputs)
        ##check each row in batch add to correct if correct
        for j in range(0, len(outputs)):
            if torch.equal(outputs[j], labels[j]):
                correct +=1
print(f'Accuracy of the network on the test vectors: {100 * correct/ total} %')



In [None]:
#####positive A percentage calculator: before running model
'''
def DRACH_motif():
    D = ['G', 'A', 'U']
    R = ['G', 'A']
    H = ['A', 'U', 'C']
    motifs = []
    for i in D:
        for j in R:
            for k in H:
                motifs.append(i + j + 'A' + 'C' + k)
    return motifs

def count_motifs(seq, motifs):
    nums = 0
    for m in motifs:
        nums += seq.count(m)
    return nums

percentages = [
drach_motifs = DRACH_motif()
with open('sbp_polyA_RNA_seq_info/sbp_polyA_RNA_Hela_mrna_seqs.tsv', 'r') as f:
    for line in f:
        line = line.split('\t')
        cur_seq = line[1]
        num_As = count_motifs(cur_seq, drach_motifs)
        percentages.append((num_As / 300) * 100)
    print(percentages)
        
'''

In [None]:
'''MASKING: FOR LATER
test_in = torch.randint(2, (1, 4, 300), dtype=torch.float)
print(test_in)
mask = test_in.eq(0) ##MASK OUT EVERYTHING THAT IS NOT As, NOT JUST NOT M6A. (like all As should be left)
#print(mask)
test_in = torch.masked_fill(test_in, mask, value = 0)
print(test_in.size())
model = NeuralNet()
out = model(test_in)

##print(test_in.size())
print("out: ", out.size())
#print(test_in == out)

x = torch.rand(3, 4)
print(x)
mask = x.eq(1)
print(mask)
'''