In [19]:
# This cell sits above the main import cell, so import torch here as well;
# otherwise a fresh-kernel "Run All" stops on a NameError.
import torch

print(torch.__version__)

2.0.1+cu118


# Directory and Device Setup

In [1]:
import os
import random
import torch
import numpy as np
import copy
import pandas as pd
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

In [2]:
# Colab-only step: mount Google Drive at /gdrive so that the /gdrive/... paths
# used by this notebook resolve.
from google.colab import drive
drive.mount('/gdrive')

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).


In [3]:
# Environment-specific settings -- edit these to match your own setup.
# Number of columns per encoded position that is passed to load_data().
CHANNEL_SIZE = 4

# File names, looked up inside the input / output folders defined below.
input_filename = "sample_input.csv"
output_filename = "sample_output.csv"

# Project root. The trailing slash matters: every folder and file path in this
# notebook is built by plain string concatenation.
root_path = '/gdrive/My Drive/Colab Data/CRISPR Off Target/CRISPR-DIPOFF/'
model_dir = f"{root_path}Trained Models/"
input_dir = f"{root_path}Sample Input/"
output_dir = f"{root_path}Sample Output/"

In [4]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


# Helper Functions

## Data Encoding

In [5]:
def encoder(RNAseq, order=['A','T','C','G']):
    """One-hot encode a nucleotide sequence.

    Args:
        RNAseq: Sequence to encode; each element is looked up in ``order``.
        order: Alphabet. The position of a symbol in ``order`` is the index of
            its hot column. Any alphabet length is accepted (the lookup table
            is derived from ``order`` instead of being hard-coded to four
            entries).

    Returns:
        A float array of shape ``(len(RNAseq), len(order))``. A symbol that is
        not in ``order`` yields an all-zero row and a message is printed.
    """
    # If a symbol is listed twice, its last position wins (same as a dict literal).
    column_of = {symbol: column for column, symbol in enumerate(order)}
    encoded = np.zeros((len(RNAseq), len(order)))

    for position, nu in enumerate(RNAseq):
        column = column_of.get(nu)
        if column is None:
            # Best-effort: report the unknown symbol and leave its row at zero.
            print("Exception: Unindentified Nucleotide")
        else:
            encoded[position, column] = 1

    return encoded

def decoder(encoded, order=['A','T','C','G']):
    """Turn a row-per-position encoding back into a sequence string.

    For every row the first column equal to 1 selects the symbol from
    ``order``; any further columns equal to 1 in that row are ignored.
    A row without a column equal to 1 raises ``IndexError``.
    """
    n_rows = encoded.shape[0]
    symbols = [
        order[np.nonzero(encoded[row] == 1)[0][0]]  # first occurrence only
        for row in range(n_rows)
    ]
    return ''.join(symbols)

def superpose(encoded1, encoded2):
    """Overlay two encodings of the same shape element by element.

    Where the two inputs agree the shared value is kept; where they differ the
    two values are added. For 0/1 one-hot inputs a matching position keeps its
    single hot column and a mismatching position ends up with two hot columns.

    Args:
        encoded1: First encoding (array-like).
        encoded2: Second encoding; must have the same shape as ``encoded1``.

    Returns:
        A new float array with the shape of the inputs. If the shapes differ,
        "Size Mismatch" is printed and ``encoded1`` is returned unchanged.
    """
    # Compare full shapes, not just the number of rows, so a column mismatch is
    # reported the same way instead of failing (or broadcasting) further down.
    if np.shape(encoded1) != np.shape(encoded2):
        print("Size Mismatch")
        return encoded1

    first = np.asarray(encoded1)
    second = np.asarray(encoded2)
    # One vectorised pass replaces the nested per-element Python loops.
    return np.where(first == second, first, first + second).astype(np.float64)

def superposeWithDirection(encoded1, encoded2):
    """Overlay two encodings and append one extra "direction" column.

    The first ``C`` output columns are the element-wise overlay: the shared
    value where the inputs agree, the sum where they differ. The extra last
    column is 0 for rows where the inputs agree everywhere; otherwise it holds
    the value of ``encoded1`` at the last column in which the row differs.

    Args:
        encoded1: First encoding, 2-D array-like of shape ``(N, C)``.
        encoded2: Second encoding; must have the same shape as ``encoded1``.

    Returns:
        A new float array of shape ``(N, C + 1)``. If the shapes differ,
        "Size Mismatch" is printed and ``encoded1`` is returned unchanged.
    """
    # Compare full shapes, not just the number of rows, so a column mismatch is
    # reported the same way instead of failing further down.
    if np.shape(encoded1) != np.shape(encoded2):
        print("Size Mismatch")
        return encoded1

    first = np.asarray(encoded1)
    second = np.asarray(encoded2)
    n_rows, n_channels = first.shape

    differs = first != second
    superposed = np.zeros((n_rows, n_channels + 1))
    superposed[:, :n_channels] = np.where(differs, first + second, first)

    if n_channels:
        # argmax on the column-reversed mask finds the last differing column.
        last_diff = n_channels - 1 - np.argmax(differs[:, ::-1], axis=1)
        # Rows without any difference keep direction 0.
        changed_rows = np.flatnonzero(differs.any(axis=1))
        superposed[changed_rows, -1] = first[changed_rows, last_diff[changed_rows]]

    return superposed

def testEncDec():
    """Print a fixed example sequence, its encoding and the decoded round trip."""
    guide = 'ACTGGG'
    print("Original: ", guide)
    print("Encoded:")
    one_hot = encoder(guide)
    print(one_hot)
    print("Decoded: ", decoder(one_hot))


def testSuperpose():
    """Print the superposition of a fixed sgRNA / DNA example pair."""
    sgRNA, DNA = "ACTGGG", "GCTGGC"
    print('sgRNA: ', sgRNA)
    print('DNA  : ', DNA)

    print(superpose(encoder(sgRNA), encoder(DNA)))

def testSuperposeWithDirection():
    """Print the superposition with direction column for a fixed example pair."""
    sgRNA, DNA = "GACTGGGC", "AGCTGGCG"
    print('sgRNA: ', sgRNA)
    print('DNA  : ', DNA)

    print(superposeWithDirection(encoder(sgRNA), encoder(DNA)))

# Run the three demos in order; the bare print() calls put a blank line
# between the outputs of consecutive demos.
testEncDec()
print()
testSuperpose()
print()
testSuperposeWithDirection()

Original:  ACTGGG
Encoded:
[[1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]]
Decoded:  ACTGGG

sgRNA:  ACTGGG
DNA  :  GCTGGC
[[1. 0. 0. 1.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 1.]]

sgRNA:  GACTGGGC
DNA  :  AGCTGGCG
[[1. 0. 0. 1. 1.]
 [1. 0. 0. 1. 0.]
 [0. 0. 1. 0. 0.]
 [0. 1. 0. 0. 0.]
 [0. 0. 0. 1. 0.]
 [0. 0. 0. 1. 0.]
 [0. 0. 1. 1. 1.]
 [0. 0. 1. 1. 0.]]


In [6]:
def get_encoded_data(df, channel_size = 4):
    """Encode every sgRNA / target-DNA pair in ``df``.

    Args:
        df: DataFrame with the columns ``sgRNA``, ``targetDNA`` and ``label``.
        channel_size: ``4`` returns the plain superposed encodings; any other
            value returns the encodings with the extra direction column.

    Returns:
        ``(encodings, labels)``: two lists with one entry per row of ``df``.
    """
    enc_targets = []
    enc_off_targets = []
    enc_superposed = []
    enc_superposed_with_dir = []
    labels = []

    total = df.shape[0]
    # Iterate the three columns directly; positional row lookups through
    # df.iloc build a new Series for every row.
    rows = zip(df['sgRNA'], df['targetDNA'], df['label'])
    for i, (sgrna, target_dna, label) in enumerate(rows):
        target = encoder(sgrna)
        off_target = encoder(target_dna)

        enc_targets.append(target)
        enc_off_targets.append(off_target)
        enc_superposed.append(superpose(target, off_target))
        enc_superposed_with_dir.append(superposeWithDirection(target, off_target))
        labels.append(label)

        if i%1000 == 0:
            print(i+1,"/",total,"done")

    # Sanity check: all five lists should have one entry per input row.
    print(len(enc_targets))
    print(len(enc_off_targets))
    print(len(enc_superposed))
    print(len(enc_superposed_with_dir))
    print(len(labels))

    if channel_size == 4:
        return enc_superposed, labels
    else:
        return enc_superposed_with_dir, labels

In [7]:
def load_data(filename, channel_size = 4):
    """Read an input CSV from ``input_dir`` and encode its sequence pairs.

    A ``label`` column filled with 0 is written into the frame (replacing any
    existing column of that name) before the frame is handed to
    ``get_encoded_data``.

    Returns:
        ``(data, data_x, data_y)``: the loaded frame including the dummy label
        column, the encodings, and the labels.
    """
    csv_path = input_dir + filename
    data = pd.read_csv(csv_path)
    print("Data Loaded", data.shape, data.head(5), sep="\n")

    print("Adding a dummy label column with all 0s")
    data["label"] = 0
    label_counts = data["label"].value_counts()
    print(label_counts)

    print("Encoding Sequences...")
    encoded_pair = get_encoded_data(data, channel_size)
    data_x, data_y = encoded_pair
    print("Encoding Complete.")
    return data, data_x, data_y

## Model Definition

In [8]:
class RNN_Model_Generic(nn.Module):
    """Recurrent classifier: optional embedding, recurrent layer, MLP head, softmax.

    Args:
        config: Dict providing the keys ``vocab_size``, ``emb_size``,
            ``hidden_size``, ``lstm_layers``, ``bi_lstm``, ``reshape``,
            ``number_hidder_layers`` (sic) and ``dropout_prob``.
        model_type: ``"LSTM"`` or ``"GRU"``; any other value selects a plain
            ``nn.RNN``.
    """

    def __init__(self, config, model_type):
        super().__init__()

        self.model_type = model_type
        self.vocab_size = config["vocab_size"]
        self.emb_size = config["emb_size"]
        self.hidden_size = config["hidden_size"]
        self.lstm_layers = config["lstm_layers"]
        self.bi_lstm = config["bi_lstm"]
        self.reshape = config["reshape"]

        # The key is misspelled ("hidder"); it is kept because existing
        # configurations use this exact spelling.
        self.number_hidden_layers = config["number_hidder_layers"]
        self.dropout_prob = config["dropout_prob"]

        # A bidirectional layer concatenates both directions on the feature axis.
        self.hidden_shape = self.hidden_size*2 if self.bi_lstm else self.hidden_size

        # An embedding is only used when a vocabulary size is configured.
        self.embedding = None
        if self.vocab_size > 0:
            self.embedding = nn.Embedding(self.vocab_size, self.emb_size, padding_idx=0)

        # The attribute is called "lstm" for every cell type; the name is part
        # of the state_dict keys and must not change.
        recurrent_cls = {"LSTM": nn.LSTM, "GRU": nn.GRU}.get(model_type, nn.RNN)
        self.lstm = recurrent_cls(self.emb_size, self.hidden_size, num_layers=self.lstm_layers,
                                  batch_first=True, bidirectional=self.bi_lstm)

        # NOTE(review): this stores the ReLU class, not an instance, and is not
        # used by forward(); kept only so the attribute still exists.
        self.relu = nn.ReLU

        # MLP head: every hidden layer halves the feature width.
        start_size = self.hidden_shape
        head = []
        for _ in range(self.number_hidden_layers):
            head.append(nn.Sequential(
                nn.Linear(start_size, start_size // 2),
                nn.ReLU(),
                nn.Dropout(self.dropout_prob)))
            start_size = start_size // 2

        self.hidden_layers = nn.ModuleList(head)
        self.output = nn.Linear(start_size,2)


    def forward(self,x):
        """Return softmax class probabilities of shape ``(batch, 2)``."""
        if self.embedding is not None:
            # Token ids must be integer tensors on the embedding's device.
            x = x.long().to(self.embedding.weight.device)
            x = self.embedding(x)
        elif self.reshape:
            # (batch, seq) -> (batch, seq, 1): one feature per time step.
            x = x.view(x.shape[0],x.shape[1],1)

        # No explicit initial state is passed: the recurrent layers start from
        # zeros by default, so forward() no longer needs a global `device`.
        x, _ = self.lstm(x)

        # Keep only the output of the last time step.
        x = x[:, -1, :]

        for layer in self.hidden_layers:
            x = layer(x)
        x = self.output(x)

        # Softmax is applied here for evaluation / attribution only and should
        # be removed when the model is trained.
        return torch.softmax(x, dim=1)

In [9]:
def load_best_rnn_model():
    """Build the LSTM model, load its trained weights and return it in eval mode.

    The weights are read from ``model_dir + "best_lstm_model.pth"`` and the
    model is placed on the notebook-level ``device``.
    """
    model_weights = model_dir + "best_lstm_model.pth"
    # batch_size, epochs and learning_rate are not read by
    # RNN_Model_Generic.__init__; they are carried along in the config only.
    model_config = {
        'vocab_size': 0,
        'emb_size': 4,
        'hidden_size': 512,
        'lstm_layers': 1,
        'bi_lstm': True,
        'number_hidder_layers': 2,
        'dropout_prob': 0.4,
        'reshape': False,
        'batch_size': 64,
        'epochs': 50,
        'learning_rate': 0.00010
    }
    model = RNN_Model_Generic(model_config, "LSTM").to(device)

    # map_location remaps the stored tensors onto the device in use, whichever
    # device the checkpoint was saved from, so one call covers CPU and GPU.
    # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
    state_dict = torch.load(model_weights, map_location=device)
    model.load_state_dict(state_dict)

    model.eval()
    return model

## Tester Functions

In [10]:
class TrainerDataset(Dataset):
    """Dataset that pairs each input sample with its target."""

    def __init__(self, inputs, targets):
        # Targets are converted to a tensor once; inputs are converted per item.
        self.targets = torch.from_numpy(targets)
        self.inputs = inputs

    def __len__(self):
        """Number of samples, taken from the targets."""
        return len(self.targets)

    def __getitem__(self, idx):
        """Return ``(sample, target)`` for position ``idx``."""
        sample = torch.Tensor(self.inputs[idx])
        target = self.targets[idx]
        return sample, target

In [11]:
def tester(model, test_x, test_y, batch_size=128):
    """Run ``model`` over ``test_x`` and return hard and soft predictions.

    Args:
        model: Module whose output has one row per sample; column 1 is used as
            the score.
        test_x: Inputs, indexable per sample.
        test_y: NumPy array of targets. The values are not used here, but the
            array defines the dataset length.
        batch_size: Number of samples per forward pass (default 128).

    Returns:
        ``(pred_y_list, pred_y)``: a list of 0/1 ints (1 where the score is
        above 0.5) and a float64 NumPy array with the scores.
    """
    test_dataset = TrainerDataset(test_x, test_y)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    model.eval()

    batch_outputs = []
    with torch.no_grad():
        for test_features, _ in test_dataloader:
            batch_outputs.append(model(test_features.to(device)).detach().to("cpu"))

    # An empty dataset yields no batches, and torch.cat rejects an empty list.
    if not batch_outputs:
        return [], np.array([])

    # Take column 1 of all batches at once instead of calling .item() per row.
    pred_y = torch.cat(batch_outputs)[:, 1].numpy().astype(np.float64)
    pred_y_list = (pred_y > 0.5).astype(int).tolist()

    return pred_y_list, pred_y

# Make Predictions with the LSTM Model

In [12]:
# Load the input CSV and encode it, then stack the per-row results into arrays.
df, encoded_rows, dummy_labels = load_data(input_filename, CHANNEL_SIZE)
data_x = np.array(encoded_rows)
data_y = np.array(dummy_labels)

for stacked in (data_x, data_y):
    print(stacked.shape)


Data Loaded
(30647, 2)
                     sgRNA                targetDNA
0  GGCCCAGACTGAGCACGTGATGG  CGACCAGACTGAGGACATCAGGG
1  GGTGAGTGAGTGTGTGCGTGTGG  TGTGAGTGTGTGTGTGTGTGTTG
2  GGTGAGTGAGTGTGTGCGTGTGG  TGTGTGGCAGTGTGTGCGTGTGT
3  GGTGAGTGAGTGTGTGCGTGTGG  ACAGGGTGAGTGTGTGTGTGTGT
4  GGTGAGTGAGTGTGTGCGTGTGG  TGAGTGTGTGTGTGTGTGTGTGT
Adding a dummy label column with all 0s
0    30647
Name: label, dtype: int64
Encoding Sequences...
1 / 30647 done
1001 / 30647 done
2001 / 30647 done
3001 / 30647 done
4001 / 30647 done
5001 / 30647 done
6001 / 30647 done
7001 / 30647 done
8001 / 30647 done
9001 / 30647 done
10001 / 30647 done
11001 / 30647 done
12001 / 30647 done
13001 / 30647 done
14001 / 30647 done
15001 / 30647 done
16001 / 30647 done
17001 / 30647 done
18001 / 30647 done
19001 / 30647 done
20001 / 30647 done
21001 / 30647 done
22001 / 30647 done
23001 / 30647 done
24001 / 30647 done
25001 / 30647 done
26001 / 30647 done
27001 / 30647 done
28001 / 30647 done
29001 / 30647 done
30001 / 3

In [13]:
# Build the LSTM model with its trained weights and print the module tree as a
# quick check of the layer sizes.
model = load_best_rnn_model()
print(model)

RNN_Model_Generic(
  (lstm): LSTM(4, 512, batch_first=True, bidirectional=True)
  (hidden_layers): ModuleList(
    (0): Sequential(
      (0): Linear(in_features=1024, out_features=512, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.4, inplace=False)
    )
    (1): Sequential(
      (0): Linear(in_features=512, out_features=256, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.4, inplace=False)
    )
  )
  (output): Linear(in_features=256, out_features=2, bias=True)
)


In [14]:
# Run the model over all encoded pairs.
# data_y is the dummy all-zero label array; tester() does not use its values.
# predictions: 0/1 per row (1 where the class-1 score is above 0.5).
# probabilities: the class-1 softmax score per row.
predictions, probabilities = tester(model, data_x, data_y)

In [15]:
# Replace the placeholder label column with the model outputs.
# errors="ignore" keeps this cell re-runnable: on a second run the "label"
# column is already gone and a plain drop would raise KeyError.
df = df.drop(columns=["label"], errors="ignore")
df["predictions"] = predictions
df["probabilities"] = probabilities
print(df.shape)
print(df.head(5))
print(df["predictions"].value_counts())

(30647, 4)
                     sgRNA                targetDNA  predictions  \
0  GGCCCAGACTGAGCACGTGATGG  CGACCAGACTGAGGACATCAGGG            0   
1  GGTGAGTGAGTGTGTGCGTGTGG  TGTGAGTGTGTGTGTGTGTGTTG            0   
2  GGTGAGTGAGTGTGTGCGTGTGG  TGTGTGGCAGTGTGTGCGTGTGT            0   
3  GGTGAGTGAGTGTGTGCGTGTGG  ACAGGGTGAGTGTGTGTGTGTGT            0   
4  GGTGAGTGAGTGTGTGCGTGTGG  TGAGTGTGTGTGTGTGTGTGTGT            0   

   probabilities  
0   6.654766e-13  
1   1.498418e-25  
2   4.365360e-39  
3   1.398617e-29  
4   9.211646e-40  
0    30538
1      109
Name: predictions, dtype: int64


In [16]:
# Save output
# to_csv does not create missing folders, so make sure the target folder exists.
os.makedirs(output_dir, exist_ok=True)
df.to_csv(output_dir + output_filename, index = False)