<a href="https://colab.research.google.com/github/anika107/Crispr-embedding/blob/main/Crispr_embedding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np

pos = pd.read_csv('pos1128.csv')  # specify the file path of positive data from dl-crispr
neg = pd.read_csv('neg.csv')      # specify the file path of negative data from dl-crispr

pos_df = pos
neg_df = neg

pos_array = np.array(pos)

for i in range(len(pos_array)):    #remove pam from positive data
    x = pos_array[i][0]
    pos_array[i][0] = x[:-3]
    x = pos_array[i][1]
    pos_array[i][1] = x[:-3]

neg_array = np.array(neg)
for i in range(len(neg_array)):    #remove pam from negative data
    x = neg_array[i][0]
    neg_array[i][0] = x[:-3]
    x = neg_array[i][1]
    neg_array[i][1] = x[:-3]

pos_df = pd.concat([pos_df, pos_df, pos_df, pos_df])
pos_df['label'] = 1
neg_df['label'] = 0

In [None]:
from dna2vec.multi_k_model import MultiKModel

filepath = '/dna2vec/pretrained/dna2vec-20161219-0153-k3to8-100d-10c-29320Mbp-sliding-Xat.w2v'  # specify the file path of pretrained dna2vec vectors
mk_model = MultiKModel(filepath)

In [None]:
all_kmer = list()
nucleotides = ['A', 'C', 'G', 'T']
kmer_embedding = {}

def generate_kmer(n, kmer, l):
  if len(kmer) == l and kmer not in all_kmer:
     all_kmer.append(kmer)
  if len(kmer) < l:
    kmer = kmer+n
    for i in range(4):
       generate_kmer(nucleotides[i], kmer, l)

k = 3 #k value range[3-6]
for i in range(4):
   generate_kmer(nucleotides[i], "", k)

for i in range(len(all_kmer)):
  kmer_embedding[all_kmer[i]] = mk_model.vector(all_kmer[i]) #store values for each kmer from dna2vec pretrained model

In [None]:
mis = { "AT": 0 , "AG": 1, "AC": 2, "TA": 3, "TG": 4, "TC": 5, "GA": 6, "GT": 7 , "GC": 8, "CA": 9, "CT": 10 , "CG": 11}

def mismatch(on, off):
    matrix = np.zeros((12, 100),dtype = int)
    for j in range(20):
      if on[j] != off[j]:
        mutation = on[j]+off[j]
        matrix[mis[mutation]][j] = 1
    return matrix

def dna_embedding(sequence):
    embedding = list()
    for i in range(len(sequence)-k+1):
        kmer= sequence[i:i+k]
        tmp = kmer_embedding[kmer]
        embedding.append(tmp)

    embedding = np.array(embedding)
    return embedding

def on_off_add(arr, c):
  on = arr[0]
  off = arr[1]
  on_embedding = dna_embedding(on)
  off_embedding = dna_embedding(off)
  if c == 1:
     mis = mismatch(on,off)
     rs = np.concatenate((on_embedding, off_embedding), axis=0)
     rs_with_mis = np.concatenate((on_embedding, off_embedding, mis), axis = 0)
     return rs, rs_with_mis
  else:
     rs = np.concatenate((on_embedding, off_embedding), axis=0)
     return rs

In [None]:
x,y = np.shape(pos_array)

pos = list()
neg = list()

pos_with_mis = list()
neg_with_mis = list()

neg_features = dict()
neg_mismatch = dict()
for i in range(x):
  rs, rs_with_mis = on_off_add(pos_array[i], 1)
  pos.append(rs)
  pos_with_mis.append(rs_with_mis)

x,y = np.shape(neg_array)
for i in range(x):
  rs = on_off_add(neg_array[i], 0)
  neg.append(rs)
  neg_features[i] = rs
  neg_mismatch[i] = neg_array[i]

pos = np.array(pos)
x,y,z = np.shape(pos)

pos_with_mis = np.array(pos_with_mis)

#Data augmentation

pos_90 = np.rot90(pos,1,(1,2))
pos_180 = np.rot90(pos_90,1,(1,2))
pos_270 = np.rot90(pos_180,1,(1,2))

pos = np.concatenate((pos, pos_90.reshape(x,y,z), pos_180.reshape(x,y,z), pos_270.reshape(x,y,z)), axis=0)
neg = np.array(neg)

x,y,z = np.shape(pos_with_mis)

pos_90 = np.rot90(pos_with_mis,1,(1,2))
pos_180 = np.rot90(pos_90,1,(1,2))
pos_270 = np.rot90(pos_180,1,(1,2))

pos_with_mis = np.concatenate((pos_with_mis, pos_90.reshape(x,y,z), pos_180.reshape(x,y,z), pos_270.reshape(x,y,z)), axis=0)

In [None]:
from sklearn.decomposition import PCA

# Multi dimension reduction
x, y, z = np.shape(pos)
y_pos = np.tile(1, x)
pos_dim_reduction = np.reshape(pos, (x, y*z))

x, y, z = np.shape(neg)
y_neg = np.tile(0, x)
neg_dim_reduction = np.reshape(neg, (x, y*z))

labels = np.concatenate((y_pos, y_neg))
all_array = np.concatenate((pos_dim_reduction, neg_dim_reduction), axis = 0)

pca = PCA(n_components = 2)
pca.fit(neg_dim_reduction)

neg_pca = pca.transform(neg_dim_reduction)


In [None]:
pca = PCA(n_components = 2)
pca.fit(pos_dim_reduction)
pos_pca = pca.transform(pos_dim_reduction)

In [None]:
from sklearn.cluster import KMeans

kmeans = KMeans(n_clusters=4, random_state=0).fit(pos_pca)

pca = PCA(n_components = 2)
pca.fit(all_array)
x_pca = pca.transform(all_array)

In [None]:
pos_pca = np.array(pos_pca).T
neg_pca = np.array(neg_pca).T

In [None]:
centers = kmeans.cluster_centers_

pos_df['pca_x'] = pos_pca[0]
pos_df['pca_y'] = pos_pca[1]
neg_df['pca_x'] = neg_pca[0]
neg_df['pca_y'] = neg_pca[1]

pos_df['cluster'] = kmeans.labels_
df = pos_df.loc[pos_df['cluster'] == 0]
neg_df["distance"] = ""

# Calculate maximum distance from data points to center for each cluster

for i in range(4):
  df = pos_df.loc[pos_df['cluster'] == i]
  max_dis = 0
  for ind in df.index:
     dis = np.sqrt(((df['pca_x'][ind]-centers[i][0])**2) + ((df['pca_y'][ind]-centers[i][1])**2))  # get distance from center of cluster
     max_dis = max(dis, max_dis)
  distance = list()
  for ind in neg_df.index:
     dis = np.sqrt(((neg_df['pca_x'][ind]-centers[i][0])**2) + ((neg_df['pca_y'][ind]-centers[i][1])**2))
     neg_df.at[ind, "distance"] = dis
  neg_df = neg_df[neg_df['distance'] > max_dis]

In [None]:
from imblearn.under_sampling import RandomUnderSampler

rus = RandomUnderSampler(return_indices=True,sampling_strategy=1)

# Random under sampling method for removing negative data

y_neg = neg_df['label']
y_pos = pos_df['label']

y = np.concatenate((y_pos, y_neg))

pos_pca = np.array(pos_pca).T

neg_pca_x = neg_df['pca_x']
neg_pca_y = neg_df['pca_y']
neg_pca = np.column_stack((neg_pca_x, neg_pca_y))

X = np.concatenate((pos_pca, neg_pca), axis = 0)
X_rus, y_rus, id_rus = rus.fit_sample(X, y)

In [None]:
on_target = list()
no_editing = list()

# Clean negative data which creates noise for positive data

for i in range(len(X_rus)):
    single_df = neg_df.loc[(neg_df['pca_x'] == X_rus[i][0]) & (neg_df['pca_y'] == X_rus[i][1])]
    if len(single_df) >= 1:
       on_target.append(single_df['ontarget'].iloc[0])
       no_editing.append(single_df['noeditting'].iloc[0])

clean_neg = np.column_stack((on_target, no_editing))
clean_neg_array = np.array(clean_neg)
print(np.shape(clean_neg_array))
for i in range(len(clean_neg_array)):
    x = clean_neg_array[i][0]
    clean_neg_array[i][0] = x[:-3]
    x = clean_neg_array[i][1]
    clean_neg_array[i][1] = x[:-3]

clean_neg_list = list()
x,y = np.shape(clean_neg_array)

for i in range(x):
  clean_neg_list.append(on_off_add(clean_neg_array[i], 0))

clean_neg_array = np.array(clean_neg_list)

In [None]:
clean_neg_with_mis = list()
c = 0

x,y,z = np.shape(clean_neg_array)
y = int(y/2)
for i in range(x):
   found_kmer_on = list()
   found_kmer_noedit = list()
   for j in range(y):
      tmp = clean_neg_array[i][j]
      for label, val in kmer_embedding.items():
          if (val == clean_neg_array[i][j]).all():
             found_kmer_on.append(label)
             break
      for label, val in kmer_embedding.items():
          if (val == clean_neg_array[i][j+y]).all():
             found_kmer_noedit.append(label)
             break
   on = ""
   for j in range(len(found_kmer_on)):
       if j == 0:
          on = on + found_kmer_on[j]
       else:
          on = on + found_kmer_on[j][k-1:]
   noedit = ""
   for j in range(len(found_kmer_noedit)):
       if j == 0:
          noedit = noedit + found_kmer_noedit[j]
       else:
          noedit = noedit + found_kmer_noedit[j][k-1:]
   rs, rs_with_mis = on_off_add([on, noedit], 1)
   clean_neg_with_mis.append(rs_with_mis)

clean_neg_with_mis = np.array(clean_neg_with_mis)
clean_neg_array = clean_neg_with_mis
pos = pos_with_mis

In [None]:
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

x = np.concatenate((pos, clean_neg_array))
y_neg = np.tile(0, len(clean_neg_array))
y_pos = pos_df['label']

y = np.concatenate((y_pos, y_neg))

x, y = shuffle(x, y, random_state = 42)
x = np.array(x)
y = np.array(y)

a,b,c = np.shape(x)
x = x.reshape(a, b, c, 1)

In [None]:
from tensorflow.keras import datasets, layers, models, initializers, optimizers, losses
from tensorflow.keras.models import load_model

def cnn_model(x, y, z):
  model = models.Sequential()
  he = initializers.HeNormal()
  model.add(layers.Conv2D(input_shape=(x,y,z),kernel_initializer=he,filters=16,kernel_size=(5,5),strides=(1,1),padding="same",activation="relu"))
  model.add(layers.Conv2D(filters=4,kernel_size=(3,3),strides=(1,1),padding="same", activation="relu"))
  model.add(layers.Conv2D(filters=4,kernel_size=(3,3),strides=(1,1),padding="same", activation="relu"))
  model.add(layers.Conv2D(filters=4,kernel_size=(3,3),strides=(1,1),padding="same", activation="relu"))
  model.add(layers.Conv2D(filters=4,kernel_size=(1,1),strides=(1,1),padding="same", activation="relu"))
  model.add(layers.Flatten(name='flatten'))
  model.add(layers.Dense(40, activation='relu', name='fc1'))
  model.add(layers.Dropout(0.5))
  model.add(layers.Dense(2, activation='sigmoid', name='output'))
  return model

In [None]:
model = cnn_model(b,c,1)
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)

model.compile(optimizer = optimizers.SGD(learning_rate = 0.0001),
              loss = losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
history = model.fit(x_train, y_train, epochs=7, batch_size=8, validation_data=(x_test, y_test))

In [None]:
test = pd.read_csv('test.csv')   # specifiy the file path of test data
test_array = np.array(test)
for i in range(len(test_array)):  # remove pam from test data
    tmp = test_array[i][0]
    test_array[i][0] = tmp[:-3]
    tmp = test_array[i][1]
    test_array[i][1] = tmp[:-3]

test_list = list()
l,z = np.shape(test_array)

for i in range(l):
  rs, rs_with_mis = on_off_add(test_array[i], 1)
  test_list.append(rs_with_mis)

test = np.array(test_list)
f,g,h = np.shape(test)
test = test.reshape(f, g, h, 1)

predicted = model.predict(test)

cnt = 0
for i in range(len(predicted)):
    if (predicted[i][0]>=0.5):
       cnt = cnt + 1
    else:
       y_pred.append(0)

print("The number of predicted samples: ",cnt)

The number of predicted samples:  37
