# Needleman-Wunsch implementation

In this asssingment you will be asked to implement the Needleman-Wunsch algorithm.

You can implement it with a corresponding function or a class. The choice is up to you.

### The inputs are:
A protein sequnce A in a fasta file.

A protein sequence B in a fasta file.

Substitution matrix in a text file.

Gap introduction cost as an integer.

### The Outputs are:
Alingment score
Alingment representation of sequence1
Alingment representation of sequence2

## Implementation and testing
1. First you need to implement your algorithm and test your implementation.
Although there could be more than one alingment with an the optimal score, in this execrice we only requre you to provide one.

2. Design some tests which will show that your code works correctly.
For that purpose you can use some short DNA sequences. Create some examples which could be easily checked by eye.

3. Run some tests with s1.fasta and s2.fasta
Make sure that you could obtain the score of 31 with the PAM250 substitution matrix and the score of 4 with BLOSUM62 substitution matrix. Use gap cost of -8 for PAM250 and -6 for BLOSUM62. Be careful with the gap score and make sure that you score gaps negatively in your matrix.

4. Provide alingments and scores for each pair of sequences in s1.fasta, s2.fasta, s3,fasta, s4.fasta
 for both PAM250 and BLOSUM62 and use the gap cost from the previous step

In [2]:
# imports
import unittest
import numpy as np
import io
import pandas as pd

In [3]:
def needleman_wunsch(fasta_file_1, fasta_file_2, file_substitution_matrix, cost_gap_open):
    seq1= read_fasta_file(fasta_file_1)
    seq2= read_fasta_file(fasta_file_2)
    subm = read_substitution_matrix(file_substitution_matrix)
    m = calculate_matrix(seq1,seq2,cost_gap_open, subm)
    aningment_score, aligned_seq_1, aligned_seq_2 = traceback(m, seq1,seq2,cost_gap_open, subm)
    return aningment_score, aligned_seq_1, aligned_seq_2

class NeedlemanWunsch:
    def run(fasta_file_1, fasta_file_2, file_substitution_matrix, cost_gap_open):
        seq1= read_fasta_file(fasta_file_1)
        seq2= read_fasta_file(fasta_file_2)
        subm = read_substitution_matrix(file_substitution_matrix)
        m = calculate_matrix(seq1,seq2,cost_gap_open, subm)
        aningment_score, aligned_seq_1, aligned_seq_2 = traceback(m, seq1,seq2,cost_gap_open, subm)
        return aningment_score, aligned_seq_1, aligned_seq_2

In [273]:
class TestNeedleman(unittest.TestCase):
    def test_needleman0(self):
        seq1, seq2 = "data/s1.fasta","data/s2.fasta" 
        iss,_,_ = needleman_wunsch(seq1, seq2, "data/pam250.txt", -8)
        should = 31
        self.assertEqual(iss, should)
    def test_needleman1(self):
        seq1, seq2 = "data/test1_0.fasta","data/test1_1.fasta" 
        iss = needleman_wunsch(seq1, seq2, "data/pam250.txt", -8)
        should = (-4, ['E', 'I', 'L', 'G', 'N', 'S', 'T', 'R', 'V'], ['S', 'T', 'R', 'V', 'T', 'V', 'T', 'S', 'D'])
        self.assertEqual(iss, should)
    def test_needleman2(self):
        seq1, seq2 = "data/test2_0.fasta","data/test2_1.fasta" 
        iss = needleman_wunsch(seq1, seq2, "data/pam250.txt", -8)
        should = (-19, ['S', 'T', 'T', 'R', 'V', 'T', 'S', 'D'], ['E', 'T', '-', 'R', 'V', '-', '-', '-'])
        self.assertEqual(iss, should)
    def test_needleman2_blosum(self):
        seq1, seq2 = "data/test2_0.fasta","data/test2_1.fasta" 
        iss = needleman_wunsch(seq1, seq2, "data/blosum62.txt", -6)
        should = (-10, ['S', 'T', 'T', 'R', 'V', 'T', 'S', 'D'], ['E', 'T', '-', 'R', 'V', '-', '-', '-'])
        self.assertEqual(iss, should)
t = TestNeedleman()
t.test_needleman0()
t.test_needleman1()
t.test_needleman2()
t.test_needleman2_blosum()

## Guildines

You do not have to use the suggested guidlines but it might help you not go get stuck

In [4]:
def read_fasta_file(fasta_file):
    line = open(fasta_file).readlines()[1].replace("\n","")
    sequence = np.array(list(line))
    # add padding and convert to arr
    sequence= np.insert(sequence,0," ")
    return sequence

In [5]:
class TestReadFastaFile(unittest.TestCase):
    def test_read_fasta_file(self):
        path = "data/s1.fasta"
        sequence = read_fasta_file(path)
        self.assertEqual(''.join(sequence), ' ILDMDVVEGSAARFDCKVEGYPDPEVMWFKDDNPVKESRHFQIDYDEEGN')
t = TestReadFastaFile()
t.test_read_fasta_file()

In [6]:
def read_substitution_matrix(file_substitution_matrix):
    """
    Implement reading the scores file.
    It can be stored as a dictionary of example:
    scores[("A", "R")] = -1
    """
    lines = open(file_substitution_matrix).readlines()
    load_matrix = ""
    # Correct matrix so that is is convertable np matrix via loadtxt
    for l in lines:
        if l[0]=='#':
            continue
        load_matrix+= l.replace("   ","-  ")
    # Save and load to/from buffer
    f = io.StringIO(load_matrix)
    matrix = np.loadtxt(f, dtype='str')

    # Convert np matrix to dict
    scores = dict()
    for i in range(matrix.shape[0]-1):
        for j in range(matrix.shape[1]-1):
            scores[(matrix[0][i+1],matrix[j+1][0])] = int(matrix[i+1][j+1])
    return scores

In [7]:
class TestReadSubstitionMatrix(unittest.TestCase):
    def test_read_substition_matrix_pam(self):
        path = "data/pam250.txt"
        scores = read_substitution_matrix(path)
        self.assertEquals(scores.get(('*','*')), 1)
    def test_read_substition_matrix_blosum(self):
        path = "data/blosum62.txt"
        scores = read_substitution_matrix(path)
        self.assertEquals(scores.get(('A','A')), 4)
t = TestReadSubstitionMatrix()
t.test_read_substition_matrix_pam()
t.test_read_substition_matrix_blosum()

In [21]:
def init_matrix(sequence1, sequence2, gap_cost):
    """
    Implement initialization of the matrix.
    Make sure you picked the right dimention and correctly initilized the first row and the first column.
    init_matrix returns a pandas frame with the seqs as labels and the top row/column init to the gap_cost x gap_multiplier
    """
    m = np.full((len(sequence1),len(sequence2)),0)
    matrix = pd.DataFrame(m, columns=sequence2, index=sequence1)
    for gap_mulitplier in range(len(sequence1)):
        matrix.iloc[gap_mulitplier][0] = gap_cost*gap_mulitplier   
    for gap_mulitplier in range(len(sequence2)):
        matrix.iloc[0][gap_mulitplier] = gap_cost*gap_mulitplier   
    return matrix
print(init_matrix(np.array(['','A','B','C', 'D']),np.array(['','A','B','C']),-1))

      A  B  C
   0 -1 -2 -3
A -1  0  0  0
B -2  0  0  0
C -3  0  0  0
D -4  0  0  0


In [27]:
def new_value_computation(char_seq1, char_seq2, gap_cost, substitution_matrix, diag_val, top_val, left_val):
    """
    new_value_computation computes a new cells value based on the left, top-left and top value. 
    Each direction corresponds to one value, where the highest value is chosen for the new cell (colission doesn't matter).
    Implement the computation of the value in the new cell.
    In this function we assume that we want to compute the value in the new cell in the matrix.
    Assume that the values "to the left", "to the top" and "top left" are already computed and provided
    as the input to the function. Also we know what characters in both sequences correspond to the given cell.
    """
    diag_val += substitution_matrix.get((char_seq1,char_seq2))
    top_val+= gap_cost
    left_val += gap_cost
    return sorted([diag_val, top_val , left_val], reverse=True)[0]

if new_value_computation('A','R',-1,read_substitution_matrix("data/pam250.txt"), 0, 2 ,-1) != 1:
    print("error")

In [42]:
def calculate_matrix(sequence1, sequence2, gap_opening_cost, substitution_cost):
    """
    calculate_matrix completes the entire matrix for a given sequence.
    TODO this method is ugly. With more time it should be made more readiable.
    Implement the step of complete computation of the matrix
    First initialize the matrix then fill it in from top to bottom.
    """
    matrix = init_matrix(sequence1, sequence2, gap_opening_cost)
    i,j = 1,1
    while(True):
        if i <= len(sequence1)-1 and j <= len(sequence2)-1 :
            matrix.iloc[i][j]=new_value_computation(sequence1[i],sequence2[j], gap_opening_cost, substitution_cost,matrix.iloc[i-1][j-1],matrix.iloc[i-1][j],matrix.iloc[i][j-1])
        for row in range(i+1, len(sequence1)):
            matrix.iloc[row][j]=new_value_computation(sequence1[row],sequence2[j], gap_opening_cost, substitution_cost,matrix.iloc[row-1][j-1],matrix.iloc[row][j-1],matrix.iloc[row-1][j])
        for col in range(j, len(sequence2)):
            matrix.iloc[i][col]=new_value_computation(sequence1[i],sequence2[col], gap_opening_cost, substitution_cost,matrix.iloc[i-1][col-1],matrix.iloc[i-1][col],matrix.iloc[i][col-1])
        if i == len(sequence1)-1 and i == len(sequence1)-1 : break 
        if i < len(sequence1)-1 : i+=1
        if j < len(sequence2)-1 : j+=1
    return matrix

seq1 = np.array(['','P','A','W','H', 'E','A', 'E'])
seq2 = np.array(['','H','E','A', 'G','A','W', 'G','H','E','E'])
sub = read_substitution_matrix("data/pam250.txt")
m=calculate_matrix(seq1, seq2,-1,sub)
if m.iloc[7][10] != 29 :
    print('error')
    print(m)

# https://gtuckerkellogg.github.io/pairwise/demo/

In [44]:
def traceback(matrix, sequence1, sequence2, gap_opening_cost,  substitution_cost):
    """
    traceback returns ONE traceback for ONE sequnce alignment. 
    it returns an alignment score and the aligned seq1 and seq2

    Implement the traceback part of the algorithm
    With the given matrix traceback which cells were taken to complete the path from 
    the top left corner to the bottom right corner.
    """
    i, j= len(sequence1)-1, len(sequence2)-1
    traceback = look(matrix, sequence1, sequence2, gap_opening_cost,  substitution_cost, i,j,[])
    traceback.reverse()
    a1 , a2 = alingment_build(traceback,sequence1,sequence2)
    return matrix.iloc[i][j], a1, a2

def look(matrix, sequence1, sequence2, gap_opening_cost,  substitution_matrix,i,j, traceback):
    """
    look is a recursive and greedy method to find a traceback path.
    It calculates a path from the bottom right cell to the top left.
    """
    score = matrix.iloc[i][j]
    if i==0 and j==0 :
        return traceback
    traceback.append([i,j])        
    if i>0 and score == matrix.iloc[i-1][j] + gap_opening_cost:
        i-=1
        look(matrix, sequence1, sequence2, gap_opening_cost,  substitution_matrix, i,j,traceback)
    elif j>0 and score == matrix.iloc[i][j-1] + gap_opening_cost:
        j-=1
        look(matrix, sequence1, sequence2, gap_opening_cost,  substitution_matrix, i,j,traceback)
    elif j>0 and i>0 and score == matrix.iloc[i-1][j-1] + substitution_matrix.get((sequence1[i],sequence2[j])):
        i-=1
        j-=1
        look(matrix, sequence1, sequence2, gap_opening_cost,  substitution_matrix, i,j,traceback)
    return traceback

seq1 = np.array(['','P','A','W','H', 'E','A', 'E'])
seq2 = np.array(['','H','E','A', 'G','A','W', 'G','H','E','E'])
sub = read_substitution_matrix("data/pam250.txt")
m=calculate_matrix(seq1, seq2,-1,sub)
if m.iloc[7][10] != 29 :
    print('error')
    print(m)

In [49]:
def alingment_build(traceback, seq1, seq2):
    """
    Implement the alingment creation.
    Given the traceback figure out which editing operations were used to create the alingment.
    """
    seq1_align, seq2_align =[], []
    seq1, seq2= seq1[1:], seq2[1:]
    for p in traceback:
        if p[0] == 0:
            seq1_align.append('-')
            seq2_align.append(seq2[p[1]-1])
        elif p[1] == 0:
            seq1_align.append(seq1[p[0]-1])
            seq2_align.append('-')
        else:
            seq1_align.append(seq1[p[0]-1])
            seq2_align.append(seq2[p[1]-1])

    # Substitute '-' if in the traceback we remain at the same position i for a sequence
    # Here holds len(seq1)=len(seq2)
    for i in range(len(seq1_align)):
        if i > 0 and traceback[i][0] == traceback[i-1][0]:
            seq1_align[i] = '-'
        if i > 0 and traceback[i][1] == traceback[i-1][1]:
            seq2_align[i] = '-'
    return seq1_align, seq2_align

seq1 = np.array(['','P','A','W','H', 'E','A', 'E'])
seq2 = np.array(['','H','E','A', 'G','A','W', 'G','H','E','E'])
sub = read_substitution_matrix("data/pam250.txt")
m=calculate_matrix(seq1, seq2,-1,sub)
t = traceback(m,seq1,seq2,-1,sub)
print(t)
# print(alingment_build(t,seq1,seq2))

(29, ['-', '-', 'P', '-', 'A', 'W', '-', 'H', 'E', 'A', 'E'], ['H', 'E', 'A', 'G', 'A', 'W', 'G', 'H', 'E', '-', 'E'])


![](img/test.png)