In [1]:
import numpy as np

Define alphabet and scoring function

In [2]:
keys = ['A', 'C', 'T', 'G', '-']
rho = -10   # open gap penalty
delta = {}  # match/mismatch costs
for i in range(len(keys)):
    delta[keys[i]] = {k : v for (k,v) in zip(keys, [1 if keys[i] == keys[j]  else -1 for j in range(len(keys))])}
costs = (rho, delta)

Define variations on the algorithm

In [3]:
styles = ["global", "fitting", "local"]

Define backpointers

In [4]:
OPEN_UP = (-1, -1, 0)
EXTEND_UP = (0, -1, 0)
CLOSE_UP = (1, 0, 0)
OPEN_LEFT = (1, 0, -1)
EXTEND_LEFT = (0, 0, -1)
CLOSE_LEFT = (-1, 0, 0)
DIAG = (0, -1, -1)
ORIGIN = (0, 0, 0)

In [5]:
def traceback(v, w, i, j, pointers):
  k = 1 # current table
  new_v = []
  new_w = []
  while True:
    dk, di, dj = pointers[k, i, j]
    if (dk, di, dj) == ORIGIN or (i <= 0 and j <= 0):
      # End of backtrace
      break

    if k == 1:
      if (dk, di, dj) == DIAG:
        # Match or mismatch
        new_v.append(v[i-1])
        new_w.append(w[j-1])
      # Nothing to be done to close gaps
    elif k == 0:
      # Insert gap
      new_v.append('-')
      new_w.append(w[j-1])
    elif k == 2:
      # Insert gap
      new_v.append(v[i-1])
      new_w.append('-')

    # Update current table and indices
    k, i, j = k + dk, i + di, j + dj
  return ''.join(new_v[::-1])+'\n'+''.join(new_w[::-1])

"Helper function which updates the horizontal gap DP table"
def calculate_horizontal_gap(M, pointers, i, j, rho, sigma):
  # Calculate score of horizontal gap
  le_score = M[0][i][j - 1] + sigma
  lo_score = M[1][i][j - 1] + sigma + rho

  # Determine whether this is a gap opening or extension and update table
  if le_score > lo_score:
    # Prefer extension over opening if equal
    M[0, i, j] = le_score
    pointers[0, i, j] = EXTEND_LEFT
  else:
    M[0, i, j] = lo_score
    pointers[0, i, j] = OPEN_LEFT

"Helper function which updates the vertical gap DP table"
def calculate_vertical_gap(M, pointers, i, j, rho, sigma):
  # Calculate scores of vertical gap
  ue_score = M[2][i - 1][j] + sigma
  uo_score = M[1][i - 1][j] + sigma + rho

  # Determine whether this is a gap opening or extension and update table
  if ue_score > uo_score:
    # Prefer extension over opening if equal
    M[2, i, j] = ue_score
    pointers[2, i, j] = EXTEND_UP
  else:
    M[2, i, j] = uo_score
    pointers[2, i, j] = OPEN_UP

"Helper function which updates the match/mismatch DP table"
def calculate_match_mismatch(M, pointers, i, j, v, w, delta):
  # Calculate match/mismatch score and update tables
  d_score = M[1, i - 1, j - 1] + delta[v[i - 1]][w[j - 1]]
  M[1, i, j] = d_score
  pointers[1, i, j] = DIAG

"Main function, returns the best score and alignment of the two strings"
def align(v, w, costs, style="global"):
  score, alignment = None, None
  rho, delta = costs

  # DP tables, with layer 0 = horizontal, 1 = match, 2 = vertical
  M = np.array([[[-np.inf for j in range(len(w)+1)] for i in range(len(v)+1)] for k in range(3)])
  pointers = np.array([[[ORIGIN for j in range(len(w)+1)] for i in range(len(v)+1)] for k in range(3)])

  for i in range(len(v) + 1):
    for j in range(len(w) + 1):
      if i == 0 and j == 0:
        match style:
          case "global":
            # Origin has zero score in match table, but not others
            M[1, 0, 0] = 0
            pointers[1, 0, 0] = ORIGIN
          case "fitting":
            # Allows initial gaps in first word but not second
            M[:2, 0, 0] = 0
            pointers[:2, 0, 0] = ORIGIN
          case "local":
            # Allows initial gaps in both words
            M[:, 0, 0] = 0
            pointers[:, 0, 0] = ORIGIN
      else: # Not origin
        # Update transitions in all tables
        if j > 0:
          calculate_horizontal_gap(M, pointers, i, j, rho, delta['-'][w[j - 1]])
        if i > 0:
          calculate_vertical_gap(M, pointers, i, j, rho, delta[v[i - 1]]['-'])
        if i > 0 and j > 0:
          calculate_match_mismatch(M, pointers, i, j, v, w, delta)

        # Determine whether or not to close any gaps
        if M[2, i, j] > M[1, i, j] and M[2, i, j] > M[0, i, j]:
          M[1, i, j] = M[2, i, j]
          pointers[1, i, j] = CLOSE_UP
        elif M[0, i, j] > M[1, i, j]:
          M[1, i, j] = M[0, i, j]
          pointers[1, i, j] = CLOSE_LEFT

        # Additional starting points for other alignments
        if i == 0 and style == "fitting" and M[1, i, j] < 0:
          M[:2, i, j] = 0
          pointers[:2, i, j] = ORIGIN
        if style == "local" and M[1, i, j] < 0:
          M[:, i, j] = 0
          pointers[:, i, j] = ORIGIN

  # Determine score and backtrace depending on type of alignment
  match style:
    case "global":
      score = M[1, -1, -1]
      alignment = traceback(v, w, len(v), len(w), pointers)
    case "fitting":
      score = np.max(M[1, -1, :])
      alignment = traceback(v, w, len(v), np.argmax(M[1, -1, :]), pointers)
    case "local":
      max_idx = np.argmax(M[1])
      x = max_idx // (len(w) + 1)
      y = max_idx % (len(w) + 1)
      score = M[1, x, y]
      alignment = traceback(v, w, x, y, pointers)
    case _:
      raise ValueError("Not a supported alignment type")

  return score, alignment

In [6]:
score, alignment = align("AATTAAGGAACC", "ATAGAC", costs)
print(alignment)
print(score)

AATTAAGGAACC
A------TAGAC
-16.0


In [16]:
f_score, f_align = align("CGCGCG", "AATTAATTCCGGCCGGCCGGAATTAATT", costs, "fitting")
print(f_align)
print(f_score)

CGCGCG
CGGCCG
2.0


In [15]:
l_score, l_align = align("AAAGGCCCGGCGGGAAA", "TTTTTTGGCCCGGCCGGGTTTTTTT", costs, "local")
print(l_align)
print(l_score)

GGCCCGGCGGG
GGCCCGGCCGG
9.0


#Run code on real alphabet

In [17]:
!wget -c 'https://drive.google.com/uc?export=download&id=1LZDSe3BIVN8nS68WsoS1k_kicTdeq1uF' -O data.zip
!unzip -o data.zip

--2024-12-20 23:42:17--  https://drive.google.com/uc?export=download&id=1LZDSe3BIVN8nS68WsoS1k_kicTdeq1uF
Resolving drive.google.com (drive.google.com)... 172.253.63.139, 172.253.63.102, 172.253.63.138, ...
Connecting to drive.google.com (drive.google.com)|172.253.63.139|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://drive.usercontent.google.com/download?id=1LZDSe3BIVN8nS68WsoS1k_kicTdeq1uF&export=download [following]
--2024-12-20 23:42:18--  https://drive.usercontent.google.com/download?id=1LZDSe3BIVN8nS68WsoS1k_kicTdeq1uF&export=download
Resolving drive.usercontent.google.com (drive.usercontent.google.com)... 142.251.179.132, 2607:f8b0:4004:c1f::84
Connecting to drive.usercontent.google.com (drive.usercontent.google.com)|142.251.179.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 146529 (143K) [application/octet-stream]
Saving to: ‘data.zip’


2024-12-20 23:42:20 (6.20 MB/s) - ‘data.zip’ saved [146529/146529]



In [18]:
def read_blosum62(path):
    """
    Reads in the ncbi's BLOSUM62.txt file and loads the scoring matrix
    into a dictionary.

    :param: path is the full path in the local filesystem at which the .txt file is located
    :return: a dictionary of dictionaries which will hold the cost of various amino acid
    substitutions as defined in BLOSUM62.
    """
    delta = {}
    with open(path, 'r') as f:
        lines = f.readlines()[6:]
        keys = lines[0].split()
        keys[-1] = '-'
        for i, line in enumerate(lines[1:]):
            delta[keys[i]] = {k : int(v) for (k,v) in zip(keys, line.split()[1:])}
    return delta

blosum = read_blosum62('./BLOSUM62.txt')
b_costs = (rho, blosum)

No gaps gives the same score

In [19]:
human_a = 'GIVEQCCTSICSLYQLENYCN'
bovine_a = 'GIVEQCCASVCSLYQLENYCN'

scoreA, alignmentA = align(human_a, bovine_a, b_costs)
print(alignmentA)
print(scoreA)
print("vs")
scoreAc, alignmentAc = align(human_a, bovine_a, (0, blosum))
print(alignmentAc)
print(scoreAc)

GIVEQCCTSICSLYQLENYCN
GIVEQCCASVCSLYQLENYCN
115.0
vs
GIVEQCCTSICSLYQLENYCN
GIVEQCCASVCSLYQLENYCN
115.0


In [20]:
human_b = 'FVNQHLCGSHLVEALYLVCGERGFFYTPKT'
bovine_b = 'FVNQHLCGSHLVEALYLVCGERGFFYTPKA'

scoreB, alignmentB = align(human_b, bovine_b, b_costs)
print(alignmentB)
print(scoreB)
print("vs")
scoreBc, alignmentBc = align(human_b, bovine_b, (0, blosum))
print(alignmentBc)
print(scoreBc)

FVNQHLCGSHLVEALYLVCGERGFFYTPKT
FVNQHLCGSHLVEALYLVCGERGFFYTPKA
163.0
vs
FVNQHLCGSHLVEALYLVCGERGFFYTPKT
FVNQHLCGSHLVEALYLVCGERGFFYTPKA
163.0


For gene with splicing [chr11:5246500-5248500 (reverse strand), from CS 466 Lecture Notes]

In [21]:
full = "ATGGTGCATCTGACTCCTGAGGAGAAGTCTGCCGTTACTGCCCTGT\
GGGGCAAGGTGAACGTGGATGAAGTTGGTGGTGAGGCCCTGGGCAGGTTGGTATCAAGGTTACAAGACAGGTTTAAGGAGACCAATAGAAACTGGGCA\
TGTGGAGACAGAGAAGACTCTTGGGTTTCTGATAGGCACTGACTCTCTCTGCCTATTGGTCTATTTTCCCACCCTTAGGCTGCTGGTGGTCTACCCTT\
GGACCCAGAGGTTCTTTGAGTCCTTTGGGGATCTGTCCACTCCTGATGCTGTTATGGGCAACCCTAAGGTGAAGGCTCATGGCAAGAAAGTGCTCGGT\
GCCTTTAGTGATGGCCTGGCTCACCTGGACAACCTCAAGGGCACCTTTGCCACACTGAGTGAGCTGCACTGTGACAAGCTGCACGTGGATCCTGAGAA\
CTTCAGGGTGAGTCTATGGGACGCTTGATGTTTTCTTTCCCCTTCTTTTCTATGGTTAAGTTCATGTCATAGGAAGGGGATAAGTAACAGGGTACAGT\
TTAGAATGGGAAACAGACGAATGATTGCATCAGTGTGGAAGTCTCAGGATCGTTTTAGTTTCTTTTATTTGCTGTTCATAACAATTGTTTTCTTTTGT\
TTAATTCTTGCTTTCTTTTTTTTTCTTCTCCGCAATTTTTACTATTATACTTAATGCCTTAACATTGTGTATAACAAAAGGAAATATCTCTGAGATAC\
ATTAAGTAACTTAAAAAAAAACTTTACACAGTCTGCCTAGTACATTACTATTTGGAATATATGTGTGCTTATTTGCATATTCATAATCTCCCTACTTT\
ATTTTCTTTTATTTTTAATTGATACATAATCATTATACATATTTATGGGTTAAAGTGTAATGTTTTAATATGTGTACACATATTGACCAAATCAGGGT\
AATTTTGCATTTGTAATTTTAAAAAATGCTTTCTTCTTTTAATATACTTTTTTGTTTATCTTATTTCTAATACTTTCCCTAATCTCTTTCTTTCAGGG\
CAATAATGATACAATGTATCATGCCTCTTTGCACCATTCTAAAGAATAACAGTGATAATTTCTGGGTTAAGGCAATAGCAATATCTCTGCATATAAAT\
ATTTCTGCATATAAATTGTAACTGATGTAAGAGGTTTCATATTGCTAATAGCAGCTACAATCCAGCTACCATTCTGCTTTTATTTTATGGTTGGGATA\
AGGCTGGATTATTCTGAGTCCAAGCTAGGCCCTTTTGCTAATCATGTTCATACCTCTTATCTTCCTCCCACAGCTCCTGGGCAACGTGCTGGTCTGTG\
TGCTGGCCCATCACTTTGGCAAAGAATTCACCCCACCAGTGCAGGCTGCCTATCAGAAAGTGGTGGCTGGTGTGGCTAATGCCCTGGCCCACAAGTAT\
CACTAA"
spliced = "ATGGTGCATCTGACTCCTGAGGAGAAGTCTGCCGTTACTGCCCTGT\
GGGGCAAGGTGAACGTGGATGAAGTTGGTGGTGAGGCCCTGGGCAG\
GCTGCTGGTGGTCTACCCTT\
GGACCCAGAGGTTCTTTGAGTCCTTTGGGGATCTGTCCACTCCTGATGCTGTTATGGGCAACCCTAAGGTGAAGGCTCATGGCAAGAAAGTGCTCGGT\
GCCTTTAGTGATGGCCTGGCTCACCTGGACAACCTCAAGGGCACCTTTGCCACACTGAGTGAGCTGCACTGTGACAAGCTGCACGTGGATCCTGAGAA\
CTTCAGG\
CTCCTGGGCAACGTGCTGGTCTGTG\
TGCTGGCCCATCACTTTGGCAAAGAATTCACCCCACCAGTGCAGGCTGCCTATCAGAAAGTGGTGGCTGGTGTGGCTAATGCCCTGGCCCACAAGTAT\
CACTAA"

In [23]:
real_score, real_align = align(spliced, full, costs)
print(real_align)
print(real_score)
print("vs")
real_score_c, real_align_c = align(spliced, full, (0, delta))
print(real_align_c)
print(real_score_c)

ATGGTGCATCTGACTCCTGAGGAGAAGTCTGCCGTTACTGCCCTGTGGGGCAAGGTGAACGTGGATGAAGTTGGTGGTGAGGCCCTGGGC----------------------------------------------------------------------------------------------------------------------------------AGGCTGCTGGTGGTCTACCCTTGGACCCAGAGGTTCTTTGAGTCCTTTGGGGATCTGTCCACTCCTGATGCTGTTATGGGCAACCCTAAGGTGAAGGCTCATGGCAAGAAAGTGCTCGGTGCCTTTAGTGATGGCCTGGCTCACCTGGACAACCTCAAGGGCACCTTTGCCACACTGAGTGAGCTGCACTGTGACAAGCTGCACGTGGATCCTGAGAACTTCAG----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Affine gap penalty correctly aligns spliced section, while standard Needleman-Wunsch adds many random small gaps in the alignment.