https://tiefenauer.github.io/blog/smith-waterman/

In [1]:
import numpy as np
import regex as re
from tqdm import tqdm_notebook as tqdm
from time import time

In [2]:
def build_matrix(a, b, match_score=3, gap_cost=2, debug=False):  
    
    len_a = len(a)
    len_b = len(b)
    print( "* Complexity: {:,} ({:,} × {:,})".format( len_a * len_b, len_a, len_b ) )
    H, P = {}, {}
    
    if debug:
        H_ = np.zeros( ( len_a + 1, len_b + 1), np.int)
        P_ = np.zeros( ( len_a + 1, len_b + 1), np.int)
    
    for i in range( 1, len_a+1 ):
        for j in range( 1, len_b+1 ):
            match = H.get( (i - 1, j - 1 ), 0 ) + ( match_score if a[i - 1] == b[j - 1] else - match_score )
            delete = H.get( (i - 1, j ), 0 )  - gap_cost
            insert = H.get( (i , j - 1 ), 0 )  - gap_cost
            values = [ 0, match, delete, insert ]
            mx = max( values )
            if mx == 0 : continue
            argmax = values.index( mx )
            H[ (i, j) ] = mx
            P[ (i, j) ] = argmax

            if debug:
                H_[i,j] = mx
                P_[i,j] = argmax
    
    if debug:
        print(H_)
        print(P_)
        
    return H, P

In [3]:
def traceback( P, xy ):
    end_i, end_j = xy
    
    value = P.get( (end_i, end_j), 0 )
    if value == 1 : new_i, new_j = end_i - 1, end_j - 1 
    elif value == 2 : new_i, new_j = end_i - 1, end_j 
    elif value == 3 : new_i, new_j = end_i, end_j - 1 
    else: 
        return end_i, end_j
    return traceback( P, (new_i, new_j) )

In [4]:
def smith_waterman(a, b, match_score=3, gap_cost=2, min_len=8, debug=False ):
    """
    a : source
    b : target
    """
    cutoff = min_len * match_score
    print( "* Build Matrix ... ")
    _q = time()
    H, P = build_matrix(a, b, match_score, gap_cost, debug=debug )
    H_lst = H.items()
    H_sorted = sorted( H_lst, key=lambda x: ( x[1], x[0][1] ), reverse=True )
    print( "  ... {:0.3f}\n".format( time()-_q ) )
    
    # get all sets
    print( "* Traceback ... ")
    _q = time()
    quotes_all = []
    for (i, j), value in tqdm(H_sorted):
        if value < cutoff : continue
        end_i, end_j = i, j
        begin_i, begin_j = traceback( P, (end_i, end_j) )
        quotes_all.append( ( (begin_i, end_i), (begin_j, end_j) ) )     # string[begin:end]
    print( "  ... {:0.3f}\n".format( time()-_q ) )
    
    # remove overlap ranges
    print( "* Refine outputs ... ")
    _q = time()
    occupation = [0] * (len(b) + 1)
    quotes = []
    for elem in tqdm(quotes_all):
        (_, _), (b_j, e_j) = elem
        if sum( occupation[b_j:e_j] ) > 0: continue
        quotes.append( elem )
        occupation[b_j:e_j] = [1] * ( e_j - b_j )
    print( "  ... {:0.3f}\n".format( time()-_q ) )
    print( "* Complete!\n ")
    return sorted( quotes, key=lambda x: x[1][0] )

In [5]:
def print_output(a, b, indices ):
    
    for (i1,i2), (j1,j2) in indices:
        print( "TRG：{:08d}-{:08d}\t{}".format( j1, j2, b[j1:j2] ) )
        print( "SRC：{:08d}-{:08d}\t{}".format( i1, i2, a[i1:i2] ) ) 
        print()

In [6]:
def print_html(a, b, indices ):

    style = """
    <style>
    .match { 
        color: blue;
    }
    </style>

    """

    handler = open("finder.SmithWaterman.v2.html", 'w', encoding="utf-8")
    i = 1
    border_size = 20
    for ( i_b, i_e ), ( j_b, j_e)  in indices:    
        handler.write( "<div>" )
        handler.write( "\t<h2>{:d}</h2>\n".format(i) )
        handler.write( "\t<p class='rtg'>{}<span class='match'>{}</span>{}</p>".format( b[j_b-border_size:j_b], b[j_b:j_e], b[j_e:j_e+border_size] ) )
        handler.write( "\t<p class='ref'>{}<span class='match'>{}</span>{}</p>".format( a[i_b-border_size:i_b], a[i_b:i_e], a[i_e:i_e+border_size] ) )
        handler.write( "\n</div>" )
        handler.write( "<br><br>")
        i += 1

    handler.write( style )
    handler.close()

In [7]:
def get_texts( ref_raw, trg_raw ):

    # hanzi 범위
    cjk_range = "[\p{Han}]"
    cjk_range_re = re.compile( cjk_range, re.UNICODE)

    # hanzi 추출
    ref = "".join( re.findall( cjk_range_re, ref_raw  ) ) 
    trg = "".join( re.findall( cjk_range_re, trg_raw  ) ) 

    return ref, trg

## TEST

In [8]:
a, b = 'GGTTGACTA', 'TGTTACGGTGTTACGGTGTTACGGTGTTACGG'
rst = smith_waterman(a, b, min_len=4, debug=True)
for (i1,i2), (j1,j2) in rst:
    print( a[i1:i2])
    print( b[j1:j2])
    print()


* Build Matrix ... 
* Complexity: 288 (9 × 32)
[[ 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
   0  0  0  0  0  0  0  0  0]
 [ 0  0  3  1  0  0  0  3  3  1  3  1  0  0  0  3  3  1  3  1  0  0  0  3
   3  1  3  1  0  0  0  3  3]
 [ 0  0  3  1  0  0  0  3  6  4  4  2  0  0  0  3  6  4  4  2  0  0  0  3
   6  4  4  2  0  0  0  3  6]
 [ 0  3  1  6  4  2  0  1  4  9  7  7  5  3  1  1  4  9  7  7  5  3  1  1
   4  9  7  7  5  3  1  1  4]
 [ 0  3  1  4  9  7  5  3  2  7  6 10 10  8  6  4  2  7  6 10 10  8  6  4
   2  7  6 10 10  8  6  4  2]
 [ 0  1  6  4  7  6  4  8  6  5 10  8  8  7  5  9  7  5 10  8  8  7  5  9
   7  5 10  8  8  7  5  9  7]
 [ 0  0  4  3  5 10  8  6  5  3  8  7  6 11  9  7  6  4  8  7  6 11  9  7
   6  4  8  7  6 11  9  7  6]
 [ 0  0  2  1  3  8 13 11  9  7  6  5  4  9 14 12 10  8  6  5  4  9 14 12
  10  8  6  5  4  9 14 12 10]
 [ 0  3  1  5  4  6 11 10  8 12 10  9  8  7 12 11  9 13 11  9  8  7 12 11
   9 13 11  9  8  7 12 11  9]
 [ 0  1  0  3  2 

HBox(children=(IntProgress(value=0, max=258), HTML(value='')))


  ... 0.020

* Refine outputs ... 


HBox(children=(IntProgress(value=0, max=13), HTML(value='')))


  ... 0.012

* Complete!
 
GTTGAC
GTTAC

GGTTGAC
GTGTTAC

GGTTGAC
GTGTTAC

GGTTGAC
GTGTTAC



## Experiment

In [9]:
txt_ref_path = "DATA/SOMUN.TN.txt"
txt_trg_path = "DATA/DYBG.TN.txt"

with open(txt_ref_path, 'r', encoding="utf-8") as fl:
    ref_raw = fl.readlines()

with open(txt_trg_path, 'r', encoding="utf-8") as fl:
    trg_raw = fl.read()

In [10]:
chapter_n = 0
ref, trg = get_texts( ref_raw[ chapter_n ], trg_raw )
# 황제내경 81편 중 n번째

In [11]:
rst = smith_waterman( ref, trg, min_len=8)

* Build Matrix ... 
* Complexity: 793061940 (911 × 870540)
  ... 890.414

* Traceback ... 


HBox(children=(IntProgress(value=0, max=7064517), HTML(value='')))


  ... 68.607

* Refine outputs ... 


HBox(children=(IntProgress(value=0, max=387321), HTML(value='')))


  ... 1.339

* Complete!
 


In [13]:
print_html( ref, trg, rst)

In [None]:
# print_output(txt_ref, txt_trg, rst)