# Needleman-Wunsch Sequence Alignment Algorithm

The [Needleman-Wunsch algorithm](https://en.wikipedia.org/wiki/Needleman%E2%80%93Wunsch_algorithm) determines the set of optimal global alignment combinations given two source sequences. 



In [1]:
import numpy as np
import sys

In [2]:
#For arrow grid, 0=origin, 1=up, 2=left, 4=diag;
# numbers chosen for future bitflag refactor
TB_ORIG = 0
TB_UP = 1
TB_LEFT = 2
TB_DIAG = 4

#scoring parameters
#match = 1
#gap = -2
#mismatch = -1


In [3]:
#sequence test data

sequence1 = 'GATTACA'
sequence2 = 'GTCGACGCAAGATTACA'

#sequence1 = 'TTGCCAAGGTTGCCAAGGTTGCCAAGGTTGCCAAGGTTGCCAAGGTTGCCAAGG'
#sequence2 = 'CCAAGGTTAASNEUDHTTBKSBDJJGCCCAAGGTTGCDDD'


#sequence1 = 'HEAHEE'
#sequence2 = 'PAHE'

#sequence1 = 'AGT'
#sequence2 = 'AAGC'


In [4]:

def build_score_table(seq1: str, seq2: str, match_bonus, mismatch_penalty, 
                      gap_penalty) -> tuple([[], np.ndarray]):
    """Build a scoring table and traceback arrow table; return both.

    Keyword arguments:
    seq1 -- the first sequence for comparison
    seq2 -- the second sequence for comparison
    match_bonus -- score adjustment for matching sequence position
    mismatch_penalty -- score adjustment for mismatched sequence position
    gap_penalty -- score adjustment for introducing a gap in either sequence
    
    Returns table of arrows for all cells and scoring table.
    
    """
    
    #Set up scoring grid --we add 2 for the corner and two initial gap spots
    score_grid = np.zeros((len(seq1)+1, (len(seq2)+1)), dtype=np.int64)
    
    #Set constant scores in the corner
    score_grid[0][0] = 0
    score_grid[1][0] = gap_penalty
    score_grid[0][1] = gap_penalty
    
    #Initial list of arrows is empty. This is a list of lists of lists,
    # and eventually could be replaced with an ndarry (using bitmasks)
    # for memory and time performance improvement
    arrows_list = [[] for y in range(0, len(seq2)+1)]
        
    #Set up upper-left cell (end cell for traceback)
    arrows_list[0].append([TB_ORIG])

    #fill seq1 side
    for x in range(1, len(seq1) + 1):
        score_grid[x][0] = gap_penalty + score_grid[x-1][0]
        arrows_list[0].append([TB_LEFT])
    
    #fill seq2 side
    for y in range(1, len(seq2) + 1):
        score_grid[0][y] = gap_penalty + score_grid[0][y-1]
        arrows_list[y].append([TB_UP])
        
    #fill a row
    for y in range(0, len(seq2)):
        for x in range(1, len(seq1) + 1):
            
            # Get scores from relevant c ells
            above = score_grid[x][y]
            left = score_grid[x-1][y+1]
            diag = score_grid[x-1][y]
            
            result = get_cell_score(seq1[x-1], seq2[y], above, left, diag, 
                                           match_bonus, mismatch_penalty, gap_penalty)
            
            score = result[0]
            arrows = result[1]
            
            score_grid[x][y+1] = score  
            arrows_list[y+1].append(arrows)
                                                      
    return (arrows_list, score_grid)
      

In [5]:
def get_cell_score(char1: str, char2: str, above: int, left: int, diag: int, 
                   match_bonus: int, mismatch_penalty: int, gap_penalty: int) -> tuple([int, []]):
    
    """Get max score and arrow list for a given cell."""
    
    #Initialize to very low values
    diag_score = -sys.maxsize
    above_score = -sys.maxsize
    left_score = -sys.maxsize
    
    #Calculate score at the upper-left diagonal
    if char1 == char2:
        diag_score = diag + match_bonus;
    else: 
        diag_score = diag + mismatch_penalty;
        
    #calculate score directly above cell
    above_score = above + gap_penalty
    
    #calculate score to the left of cell
    left_score = left + gap_penalty
    
    #find highest score
    max_score = max(diag_score, above_score, left_score)
    
    #Choose one or more arrows to return
    arrows_to_record = []
    if max_score == left_score:
        arrows_to_record.append(TB_LEFT)
    if max_score == above_score:
        arrows_to_record.append(TB_UP)
    if max_score == diag_score:
        arrows_to_record.append(TB_DIAG)
    
    return (max_score, arrows_to_record)
            

In [6]:
def traceback(sequence1: str, sequence2: str, arrow_list: []) -> tuple([[],[]]):
    """Find all optimal sequence alignments using arrow grid generated by 
    scoring step.

    Keyword arguments:
    sequence1 -- the first sequence for comparison
    sequence2 -- the second sequence for comparison
    arrow_list -- a list of lists of lists containing each cell's arrows
    
    """
    found_branches = []
    
    curr_x, curr_y = len(arrow_list[0])-1, len(arrow_list)-1
    
    #list of branch points to trace
    branches = []
    for idx in range(0, len(arrow_list[curr_y][curr_x])):
        branches.append({'curr_x': curr_x, 'curr_y': curr_y, 'new_seq_1': '', 'new_seq_2': '', 
                         'arrow': arrow_list[curr_y][curr_x][idx]})
        
    #lists for completed sequences
    seq1_list = []
    seq2_list = []

    while (len(branches) > 0):
    
        curr_x = branches[0]['curr_x']
        curr_y = branches[0]['curr_y']

        new_seq_1 = branches[0]['new_seq_1']
        new_seq_2 = branches[0]['new_seq_2']
        
        curr_arrow = branches[0]['arrow']

        while curr_x + curr_y > 0:

            xchar = sequence1[curr_x-1]
            ychar = sequence2[curr_y-1]

            if curr_arrow == TB_DIAG:
                new_seq_1 = xchar + new_seq_1
                new_seq_2 = ychar + new_seq_2

                curr_x -= 1
                curr_y -= 1

            elif curr_arrow == TB_LEFT:
                new_seq_1 = xchar + new_seq_1
                new_seq_2 = '-' + new_seq_2

                curr_x -= 1

            else:
                new_seq_1 = '-' + new_seq_1
                new_seq_2 = ychar + new_seq_2

                curr_y -= 1

            curr_arrow_list = arrow_list[curr_y][curr_x]
            curr_arrow = curr_arrow_list[0]
        
            #Check for more branches
            if len(curr_arrow_list) > 1 and [curr_x, curr_y] not in found_branches:
                found_branches.append([curr_x, curr_y])
                for idx in range(1, len(arrow_list[curr_y][curr_x])):
                    branches.append({'curr_x': curr_x, 'curr_y': curr_y, 'new_seq_1': new_seq_1, 'new_seq_2': new_seq_2, 
                                     'arrow': arrow_list[curr_y][curr_x][idx]})
                            
        seq1_list.append(new_seq_1)
        seq2_list.append(new_seq_2)
               
        branches = branches[1:]
        
    return seq1_list, seq2_list


In [7]:
def needleman_wunsch(seq1: str, seq2: str, match_bonus: int = 1, 
                     mismatch_penalty: int = -1, gap_penalty: int = -2) -> tuple([[],[]]):
    
    result_tables = build_score_table(seq1, seq2, match_bonus, mismatch_penalty, gap_penalty)
    
    return traceback(seq1, seq2, result_tables[0])



In [8]:

result = needleman_wunsch(sequence1, sequence2) 

s1 = result[0]
s2 = result[1]
    
for n in range(0,len(s1)):
    print(s1[n])
    print(s2[n])
    print()


G---A-------TTACA
GTCGACGCAAGATTACA

G----------ATTACA
GTCGACGCAAGATTACA

G--------A--TTACA
GTCGACGCAAGATTACA

G-------A---TTACA
GTCGACGCAAGATTACA

---GA-------TTACA
GTCGACGCAAGATTACA

----------GATTACA
GTCGACGCAAGATTACA

------G----ATTACA
GTCGACGCAAGATTACA



In [9]:
#NOT IN USE -- NEEDS TO BE REFACTORED FOR NEW ARROW TABLE FORMAT
# AND MULTIPLE OPTIMAL RESULTS

STOP = '⚬'
ARROW_UP = '↑'
ARROW_LEFT = '←'
ARROW_DIAG = '⬉'

arrow_sym = {0: STOP, 1: ARROW_UP, 2: ARROW_LEFT, 4: ARROW_DIAG}


def arrow_chart(numbers):
    shp = numbers.shape
    
    print('shape = ' + str(numbers.shape))

    arrow_grid = []
    
    for x in range(0,shp[0]):
        arrows = [ ]
        for y in range(0,shp[1]):
            arrows.append(arrow_sym[numbers[x][y]])
        arrow_grid.append(arrows)
    
    return arrow_grid