In [143]:
import numpy as np
import sys

# Checks if pair is complementary and assigns the appropriate score as defined in the paper
def isPairScore(seq, i, j, probL, probR, probP):
  possibleBasePairs = set(['AU', 'UA', 'GC', 'CG', 'GU', 'UG']) # includes wobble pairs
  Ri = seq[i]
  Rj = seq[j]

  if Ri + Rj in possibleBasePairs:
    return probL[i] + probR[j]
  else:
    return probP[i] + probP[j]


# Initializes an n x n matrix
def initMatrix(Text : str) -> list[list[int]]:
  n = len(Text)

  M = np.empty([n, n])
  M[:] = np.NAN

  M[range(n), range(n)] = 0
  M[range(1, n), range(n - 1)] = 0

  return M

# Forward direction for Updated Nussinov algorithm
def Forward(probL, probR, probP, M, Text, min_loop_length = 0):
  n = len(Text)

  for k in range(1, n):
    for i in range(n - k):
      j = i + k

      if (j - i >= min_loop_length):
        down = M[i + 1][j]+probP[i]
        left = M[i][j - 1]+probP[j]
        diag = M[i + 1][j - 1] + isPairScore(Text, i, j, probL, probR, probP)
        if(i+1 == j):
            other = 0
        else:
            other = max([M[i][t] + M[t + 1][j] for t in range(i+1, j)])
        M[i][j] = max(down, left, diag, other)
      else:
        M[i][j] = 0

  return M

# Back direction of Updated Nussinov algorithm, backtrack
def Back(M, Text, pairs, i, length, probL, probR, probP):
  j = length

  if i < j:
    if M[i][j] == M[i + 1][j]+probP[i]:
      Back(M, Text, pairs, i + 1, j, probL, probR, probP)
    elif M[i][j] == M[i][j - 1]+probP[j]:
      Back(M, Text, pairs, i, j - 1, probL, probR, probP)
    elif M[i][j] == M[i + 1][j - 1] + isPairScore(Text, i, j, probL, probR, probP):
      pairs.append((i, j))
      Back(M, Text, pairs, i + 1, j - 1, probL, probR, probP)
    else:
      for k in range(i + 1, j):
        if M[i][j] == M[i, k] + M[k + 1][j]:
          Back(M, Text, pairs, i, k, probL, probR, probP)
          Back(M, Text, pairs, k + 1, j, probL, probR, probP)
          break


# Convert to dotBracket
def dotBracket(Text : str, pairs : list[tuple[int, int]]) -> str:
  dot = ["." for i in range(len(Text))]

  for s in pairs:
    dot[max(s)] = ")"
    dot[min(s)] = "("

  return "".join(dot)

# Run Updated Nussinov algorithm
def UpdatedNussinov(Text, probL, probR, probP):
  n = len(Text)

  initial_matrix = initMatrix(Text)
  forward_matrix = Forward(probL, probR, probP, initial_matrix, Text)

  pairs = []
  Back(forward_matrix, Text, pairs, 0, n - 1, probL, probR, probP)

  return dotBracket(Text, pairs)

In [None]:
from keras.layers import Conv2D, Dense, MaxPooling2D, Flatten
import keras

In [125]:
model = keras.models.load_model("82.h5")

In [126]:
import numpy as np
import math

def encode(seq):
    n = len(seq)
    l = [[0 for i in range(n)] for j in range(n)]
    for i in range(n):
        for j in range(n):
            if((seq[i] == 'A' and seq[j] == 'U') or (seq[j] == 'A' and seq[i] == 'U')):
                l[i][j] = 2
            elif((seq[i] == 'G' and seq[j] == 'C') or (seq[j] == 'G' and seq[i] == 'C')):
                l[i][j] = 3
            elif((seq[i] == 'G' and seq[j] == 'U') or (seq[j] == 'U' and seq[i] == 'G')):
                l[i][j] = 0.8
            else:
                l[i][j] = 0
    return l

def Gaussian(x):
    return math.exp(-0.5*(x*x))

def matrix_calc(data, sliding_param):
    mat = np.zeros([len(data)+sliding_param,len(data)])
    for i in range(len(data[0])):
        for j in range(len(data[0])):
            coefficient = 0
            for add in range(30):
                if i - add >= 0 and j + add <len(data):
                    score = data[i-add][j+add]
                    if score == 0:
                        break
                    else:
                        coefficient = coefficient + score * Gaussian(add)
                else:
                    break
            if coefficient > 0:
                for add in range(1,30):
                    if i + add < len(data) and j - add >= 0:
                        score = data[i+add][j-add]
                        if score == 0:
                            break
                        else:
                            coefficient = coefficient + score * Gaussian(add)
                    else:
                        break
            mat[[i+int(sliding_param/2)],[j]] = coefficient
    return mat

def slidingWindow(numpy_M, sliding_param, n):
    zeros = np.zeros((sliding_param, n), dtype=int)
    numpy_M = np.append(numpy_M, zeros, axis=0)
    sliding_mats = [numpy_M[i: i + sliding_param, :].tolist() for i in range(n)]
    return sliding_mats

def scale(im, nR, nC):
    nR0 = len(im) 
    nC0 = len(im[0]) 
    arr =  [[ im[int(nR0 * r / nR)][int(nC0 * c / nC)]  
             for c in range(nC)] for r in range(nR)]
    return np.array(arr)

In [127]:
seq = "GGGUCUGUAGCUCAGGUGGUUAGAGCGCACCCCUGAUAAGGGUGAGGUCGGUGGUUCGAGUCCUCCCAGACCCACCA"
windows = slidingWindow(matrix_calc(encode(seq), 19), 19, len(seq))
scaled = np.zeros(shape=(len(seq), 19, 120))
for i in range(len(windows)):
    scaled[i] = scale(windows[i], 19, 120)

In [128]:
print(scaled)

[[[0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 0.         0.         0.        ]
  ...
  [0.         0.         0.         ... 0.         0.         2.        ]
  [0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 3.         3.         0.        ]]

 [[0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 0.         0.         0.        ]
  ...
  [0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 3.         3.         0.        ]
  [3.         3.         3.         ... 0.         0.         0.        ]]

 [[0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.      

In [129]:
x = model.predict(scaled)



In [130]:
#print(x)
probL = []
probR = []
probP = []
for i in range(len(x)):
    probL.append(x[i][0])
    probR.append(x[i][1])
    probP.append(x[i][2])

print(UpdatedNussinov(seq, probL, probR, probP))

ValueError: max() arg is an empty sequence

In [7]:
# Test performance of CNN + DP 

import os
import sys

def read_dbn(filename):
    with open(filename, 'r') as f:
        lines = f.readlines()
        seq = lines[3].strip()
        dbn = lines[4].strip()
    return seq, dbn


def compare_dbn(dbn1, dbn2):
    if len(dbn1) != len(dbn2):
        return -1 

    n = len(dbn1)
    correct = 0

    for i in range(n):
        if dbn1[i] == dbn2[i]:
            correct += 1
        if (dbn1[i] == '[' or dbn1[i] == ']') and dbn2[i] == '.':
            correct+=1

    return correct

def test_cnndp(directory, threshold):
    test_dir = directory
    files = sorted(os.listdir(test_dir))

    num_tests_ran = 0
    percent_corr = 0


    for filename in files:
        if num_tests_ran == 1000:
            break

        if filename.endswith('.dbn'):
            seq, dbn = read_dbn(os.path.join(test_dir, filename))
            n = len(seq)
            
            # Skip over any sequence that has 'N' (which indicates sequencing wasn't accurate)
            valid = True if (set(seq) == {'A', 'C', 'G', 'U'}) else False 

            # Only consider sequences with length <= 400 and skip invalid sequences
            if(n > 120 or not valid): 
                continue
            
            num_tests_ran += 1

            # Run CNN + DP
            windows = slidingWindow(matrix_calc(encode(seq), 19), 19, len(seq))
            scaled = np.zeros(shape=(len(seq), 19, 120))
            
            for i in range(len(windows)):
                scaled[i] = scale(windows[i], 19, 120)
            
            print(dbn)
            probs = model.predict(scaled)

            probL = []
            probR = []
            probP = []
            for i in range(len(probs)):
                probL.append(probs[i][0])
                probR.append(probs[i][1])
                probP.append(probs[i][2])
            print("backtracking...")
            pred_dbn = UpdatedNussinov(seq, probL, probR, probP)
            just = ""
            for i in range(len(seq)):
                x = max(probL[i], probR[i], probP[i])
                if(x == probL[i]):
                    just+='('
                if(x == probR[i]):
                    just+=')'
                if(x == probP[i]):
                    just+='.'
            print(seq)
            print(just)
            print(pred_dbn)
            print(dbn)
            # Compare predicted dbn with actual dbn
            percent_correct = (compare_dbn(dbn, pred_dbn)/n)
            
            if percent_correct >= threshold:
                percent_corr += percent_correct
            print('Percent correct: ', percent_correct)
            
    return (percent_corr/num_tests_ran)



#test_cnndp('/Users/krishsuraparaju/Downloads/dbnFiles', 0.5)
            


In [1]:
import numpy as np
import sys

# Constants
min_loop_length = 0
pairs = {"A": "U", "U": "A", "G": "C", "C": "G"}

# Checks if pair is complementary
def isPair(pair : str) -> bool:
  return pairs[pair[0]] == pair[1]

# Initializes an n x n matrix
def initMatrix(Text : str) -> list[list[int]]:
  n = len(Text)

  M = np.empty([n, n])
  M[:] = np.NAN

  M[range(n), range(n)] = 0
  M[range(1, n), range(n - 1)] = 0

  return M

# Forward direction for Nussinov algorithm
def Forward(M : list[list[int]], Text : str) -> list[list[int]]:
  n = len(Text)

  for k in range(1, n):
    for i in range(n - k):
      j = i + k

      if (j - i >= min_loop_length):
        down = M[i + 1][j]
        left = M[i][j - 1]
        diag = M[i + 1][j - 1] + isPair((Text[i], Text[j]))
        other = max([M[i][t] + M[t + 1][j] for t in range(i, j)])

        M[i][j] = max(down, left, diag, other)
      else:
        M[i][j] = 0

  return M

# Ming direction of Nussinov algorithm
def Back(M : list[list[int]], Text : str, pairs : list[tuple[int, int]], i : int, length : int) -> list[tuple[int, int]]: 
  j = length

  if i < j:
    if M[i][j] == M[i + 1][j]:
      Back(M, Text, pairs, i + 1, j)
    elif M[i][j] == M[i][j - 1]:
      Back(M, Text, pairs, i, j - 1)
    elif M[i][j] == M[i + 1][j - 1] + isPair((Text[i], Text[j])):
      pairs.append((i, j))
      Back(M, Text, pairs, i + 1, j - 1)
    else:
      for k in range(i + 1, j - 1):
        if M[i][j] == M[i, k] + M[k + 1][j]:
          Back(M, Text, pairs, i, k)
          Back(M, Text, pairs, k + 1, j)
          break


# Convert to dotBracket
def dotBracket(Text : str, pairs : list[tuple[int, int]]) -> str:
  dot = ["." for i in range(len(Text))]

  for s in pairs:
    dot[max(s)] = ")"
    dot[min(s)] = "("

  return "".join(dot)

# Run Nussinov algorithm
def Nussinov(Text : str) -> str:
  n = len(Text)

  initial_matrix = initMatrix(Text)
  forward_matrix = Forward(initial_matrix, Text)

  pairs = []
  Back(forward_matrix, Text, pairs, 0, n - 1)

  return dotBracket(Text, pairs)




In [2]:
import numpy as np
import pandas as pd
import os
import math

pairs = {'G': 'C', 'C': 'G', 'A': 'U', 'U': 'A'}
wobble = {'G': 'U', 'U': 'G'}

def initialize_matrix(seq, dist, n):
  M = np.empty((n, n))
  M[:] = np.NaN

  # for the condition that j + 5 > i -> inf
  for x in range(M.shape[0]):
      for y in range(M.shape[1]):
        if y + dist > x:
          M[x, y] = np.inf
  return M

# h(x, y) = 2(i - j + 5)
def calc_hairpin_energy(x, y):
  return (x - y + 5) * 2

# stem energy: initialized to -4 (pairs), 0 (wobble), 4 (otherwise)
def calc_stem_energy(seq, x, y):
  if (pairs[seq[x]] == seq[y]):
    return -4
  elif ((seq[x] == "G" or seq[x] == "U") and wobble[seq[x]] == seq[y]):
    return 0
  else:
    return 4

# calculate energy for structures
# hairpins: V(x, y) = s(x, y) + h(x - 1, y + 1)
# match: V(x, y) = s(x, y) + W(x - 1, y + 1)
def calc_V(x, y, V, W, seq):
  hairpin_val = calc_hairpin_energy(x, y) + calc_stem_energy(seq, x, y)
  match_val = calc_stem_energy(seq, x, y) + W[x - 1, y + 1]
  if (hairpin_val < match_val):
    V[x, y] = hairpin_val
    return ('H', V)
  else:
    V[x, y] = match_val
    return ('M', V)

# calculate energy of optimal structure
def calc_W(x, y, V, W, path, seq):
  structure, V = calc_V(x, y, V, W, seq)

  k_dict = {}
  for k in range(y + 2, x):
    if k not in k_dict:
      k_dict[k] = W[x, k] + W[k - 1, y]
  min_k = min(k_dict.keys(), key=(lambda k: k_dict[k]))

  # update matrix W with the minimum choice
  W[x, y] = min(W[x - 1, y], W[x, y + 1], V[x, y], k_dict[min_k])
  if (W[x, y] == W[x - 1, y]):
    path[x, y] = 'L'
  if (W[x, y] == W[x, y + 1]):
    path[x, y] = 'D'
  if (W[x, y] == V[x, y]):
    path[x, y] = structure
  if (W[x, y] == k_dict[min_k]):
    path[x, y] = str(min_k)

  return (V, W, path)

def Zuker(V, W, n, dist, path, seq):
  final = W[n - 1, 0]

  while math.isnan(final):
    for x in range(n):
      for y in range(n):
        if y + dist <= x:
          left = W[x - 1, y]
          down = W[x, y + 1]
          if math.isnan(down) or math.isnan(left):
            continue
          V, W, path = calc_W(x, y, V, W, path, seq)
      final = W[n - 1, 0]

  return (V, W, path)

def Backtrack(path, backtrack, n):
  match = []

  curr_x = 0
  curr_y = n - 1
  curr_cell = path[curr_x, curr_y]
  path = path.transpose()
  backtrack[curr_x, curr_y] = "#"

  while curr_cell != 'H':
    if curr_cell == 'M':
      match.append((curr_x, curr_y))
      curr_x += 1
      curr_y -= 1
    elif curr_cell == "L":
      curr_y -= 1
    elif curr_cell == "D":
      curr_x += 1
    curr_cell = path[curr_x, curr_y]
    backtrack[curr_x, curr_y] = "#"

  hairpin = (curr_x, curr_y)
  return (match, hairpin, path, backtrack)

def dot_bracket(path, backtrack, n):
    matches, hairpin, path, backtrack = Backtrack(path, backtrack, n)
  
    dot_bracket = ["." for _ in range(n)]
    for match in matches:
      first_paren = match[0]
      second_paren = match[1]
      dot_bracket[first_paren] = '('
      dot_bracket[second_paren] = ')'
    dot_bracket = "".join(dot_bracket)

    return dot_bracket



In [5]:
def test_nussinov(directory):

    test_dir = directory
    files = sorted(os.listdir(test_dir))

    num_tests_ran = 0
    percent_corr = 0

    for filename in files:
        if num_tests_ran == 1000:
            break

        if filename.endswith('.dbn'):
            seq, dbn = read_dbn(os.path.join(test_dir, filename))
            n = len(seq)
            
            # Skip over any sequence that has 'N' (which indicates sequencing wasn't accurate)
            valid = True if (set(seq) == {'A', 'C', 'G', 'U'}) else False 

            # Only consider sequences with length <= 400 and skip invalid sequences
            if(n > 120 or not valid): 
                continue
            
            num_tests_ran += 1

            # Run Nussinov
            pred_dbn = Nussinov(seq)

            # Compare predicted dbn with actual dbn
            percent_correct = (compare_dbn(dbn, pred_dbn)/n)
            
            percent_corr += percent_correct
            print('Percent correct: ', percent_correct)

    return (percent_corr/num_tests_ran)


In [4]:
def test_zucker(directory):
    test_dir = directory
    files = sorted(os.listdir(test_dir))

    num_tests_ran = 0
    percent_corr = 0

    for filename in files:
        if num_tests_ran == 1000:
            break

        if filename.endswith('.dbn'):
            seq, dbn = read_dbn(os.path.join(test_dir, filename))
            n = len(seq)
            
            # Skip over any sequence that has 'N' (which indicates sequencing wasn't accurate)
            valid = True if (set(seq) == {'A', 'C', 'G', 'U'}) else False 

            # Only consider sequences with length <= 400 and skip invalid sequences
            if(n > 120 or not valid): 
                continue
            
            num_tests_ran += 1

            # Run Nussinov
            distance_constant = 5
            n = len(seq)
            W = initialize_matrix(seq, distance_constant, n)
            V = initialize_matrix(seq, distance_constant, n)
            path = np.zeros((n, n), 'U1')
            backtrack = np.zeros((n, n), 'U1')

            V, W, path = Zuker(V, W, n, distance_constant, path, seq)
            pred_dbn = dot_bracket(path, backtrack, n)

            # Compare predicted dbn with actual dbn
            percent_correct = (compare_dbn(dbn, pred_dbn)/n)
            
            percent_corr += percent_correct
            print('Percent correct: ', percent_correct)

    return (percent_corr/num_tests_ran)



In [9]:
nussinovavg = test_nussinov('/Users/krishsuraparaju/Downloads/dbnFiles')
zuckeravg = test_zucker('/Users/krishsuraparaju/Downloads/dbnFiles')
print('Nussinov average: ', nussinovavg)
print('Zucker average: ', zuckeravg)

Percent correct:  0.36082474226804123
Percent correct:  0.2037037037037037
Percent correct:  0.21818181818181817
Percent correct:  0.2037037037037037
Percent correct:  0.35833333333333334
Percent correct:  0.35833333333333334
Percent correct:  0.35833333333333334
Percent correct:  0.35833333333333334
Percent correct:  0.38333333333333336
Percent correct:  0.35833333333333334
Percent correct:  0.35833333333333334
Percent correct:  0.35833333333333334
Percent correct:  0.25287356321839083
Percent correct:  0.25287356321839083
Percent correct:  0.23255813953488372
Percent correct:  0.23255813953488372
Percent correct:  0.2413793103448276
Percent correct:  0.2413793103448276
Percent correct:  0.23255813953488372
Percent correct:  0.23255813953488372
Percent correct:  0.23255813953488372
Percent correct:  0.44166666666666665
Percent correct:  0.44166666666666665
Percent correct:  0.4188034188034188
Percent correct:  0.49166666666666664
Percent correct:  0.48333333333333334
Percent correct: 