In [213]:
def fasta_to_list(filename='BJ01.fasta'):
    with open(filename) as f:
        a = f.readlines()
    return "".join([i[:-1] for i in a[1:]])

In [303]:
penalty_table.shape

(29725, 29736)

In [181]:
def weighted_edit_distance(s1, s2, penalty = dict({'M':0,'R':1,'G':2})):
    '''
    calculate weighted edit distance of two list/str presented here,
    penalty function G=2 gap; R=1 mismatch; M=0 match
    return: 
        p penalty matrix: nrows == len(s1)
                          ncloumns == len(s2)
        D weighted edit distance: nrows == len(s1)+1
                                  ncloumns == len(s2)+1
        tranceback: the same shape of penalty matrix, each cell has its values which 
                    represent the location of distance where it derive
        penalty_table: the same shape of penalty matrix, each cell has its values which
                       represent the penalty of last distance after going forward
        D[-1,-1]: distance sum               
    '''
    import numpy as np
    p = np.ones((len(s1),len(s2)))*penalty['M']
    for i in range(len(s1)):
        for j in range(len(s2)):
            if s1[i] != s2[j]:
                p[i,j] = penalty['R']
    D = np.zeros((len(s1)+1,len(s2)+1))
    tranceback = np.zeros((len(s1),len(s2),2))
    penalty_table = np.zeros((len(s1),len(s2)))
    for i in range(len(s1)+1):
        D[i,0] = i*penalty['G']
    for j in range(len(s2)+1):
        D[0,j] = j*penalty['G']
    for i in range(len(s1)):
        for j in range(len(s2)):
            source = D[i+1,j]+penalty['G'],D[i,j]+p[i,j],D[i,j+1]+penalty['G']
            
            D[i+1,j+1] = min(source)
            if source.index(D[i+1,j+1]) == 0:
                tranceback[i,j] = [i+1,j]
                penalty_table[i,j] = penalty['G']
            elif source.index(D[i+1,j+1]) == 1:
                tranceback[i,j] = [i,j]
                penalty_table[i,j] = p[i,j]
            else:
                tranceback[i,j] = [i,j+1]
                penalty_table[i,j] = penalty['G']
    return p,D,tranceback,penalty_table,D[-1,-1]

In [197]:
def visualization(s1=s1,s2=s2,tranceback=tranceback,penalty_table=penalty_table,penalty = dict({'M':0,'R':1,'G':2})):
    '''
    find the traceback, denoted as route from the right-down corner of the distance matrix 
    to left-up corner. 
    return:
        s1_tran print(s1) beautifully '|' represent match
                                      '*' represent mismatch
                                      ' ' represent gap
        s2_tran 
        s2_tran
        
    '''
    import numpy as np
    #### the symbol between two fasta file
    route = [np.array([len(s1),len(s2)])]
    route.append(tranceback[len(s1)-1,len(s2)-1])
    lasti,lastj = tranceback[-1,-1]
    for i in range(len(s1))[::-1]:
        for j in range(len(s2))[::-1]:
            if lasti != i+1 or lastj != j+1:
                continue;
            lasti,lastj = tranceback[i,j]
            route.append(tranceback[i,j])
    transcript = [penalty_table[int(i[0])-1,int(i[1])-1] for i in route[:-1]][::-1]
    
    #### s1,s2 print and annotation
    s1_tran, s2_tran = [], []
    s1_edit, s2_edit = [], []
    for i in range(len(route)-1):
        s1_char, s2_char = route[i]
        if transcript[::-1][i] == penalty['G']: # when gap penalty
            if route[i][0] == route[i+1][0]:
                s1_tran.append('-')
                s1_edit.append('D')  # when s1 deletion
            else:
                s1_tran.append(s1[int(s1_char-1)])
                s1_edit.append('I')  # when s1 insertion
            if route[i][1] == route[i+1][1]:
                s2_tran.append('-')
                s2_edit.append('D')  # when s2 deletion
            else:
                s2_tran.append(s2[int(s2_char-1)])
                s2_edit.append('I')  # when s2 insertion
        else:
            s1_tran.append(s1[int(s1_char-1)])
            s2_tran.append(s2[int(s2_char-1)])
            if transcript[::-1][i] == penalty['R']: # when replacement
                s1_edit.append('R')
                s2_edit.append('R')
            elif transcript[::-1][i] == penalty['M']:
                s1_edit.append('M')
                s2_edit.append('M')        
    s1_tran,s2_tran,s1_edit,s2_edit = s1_tran[::-1],s2_tran[::-1],s1_edit[::-1],s2_edit[::-1]
    transcript_sym = transcript.copy()
    dict_num2sym = {0:'|',1:'*',2:' '}
    for i in range(len(transcript)):
        transcript_sym[i] = dict_num2sym[transcript[i]]
    return s1_tran,s2_tran,s1_edit,s2_edit,transcript,transcript_sym

In [214]:
p,D,tranceback,penalty_table,d = \
    weighted_edit_distance(fasta_to_list(filename='BJ01.fasta'), 
                            fasta_to_list(filename='TOR2.fasta'))
s1_tran,s2_tran,s1_edit,s2_edit,transcript,transcript_sym = visualization()

In [228]:
s1_tran,s2_tran,s1_edit,s2_edit,transcript,transcript_sym = \
visualization(s1=fasta_to_list(filename='BJ01.fasta'),
              s2=fasta_to_list(filename='TOR2.fasta'),
              tranceback=tranceback,
              penalty_table=penalty_table,
              penalty = dict({'M':0,'R':1,'G':2}))

In [280]:
with open('alignment.txt','w+') as f:
    length = len(s1_tran)
    lineword = 56
    n = int(length)//int(lineword)
    m = length-n*lineword
    str = "#################################\n" \
              "sequence_id\tbase_amount\n" \
              "BJ01.fasta\t%s\n" \
              "TOR2.fasta\t%s\n" \
      "#################################\n" \
      "weighted edit distance\t%s\n" \
      "base matched\t%s\n" \
      "base replacement\t%s\n" \
      "base insertion/deletion\t%s\n" \
      "#################################\n"%(len(fasta_to_list(filename='BJ01.fasta')),
                                      len(fasta_to_list(filename='TOR2.fasta')),
                                      d,
                                      sum([i==dict({'M':0,'R':1,'G':2})['M'] for i in transcript]),
                                      sum([i==dict({'M':0,'R':1,'G':2})['R'] for i in transcript]),
                                      sum([i==dict({'M':0,'R':1,'G':2})['G'] for i in transcript]))
    f.write(str)
    for i in range(n):
        if i==0:
            f.write('BJ01.fasta:'+"".join(s1_tran[i*lineword:(i+1)*lineword])+'\n')
            f.write('    symbol:'+"".join(transcript_sym[i*lineword:(i+1)*lineword])+'\n')
            f.write('TOR2.fasta:'+"".join(s2_tran[i*lineword:(i+1)*lineword])+'\n')
        else:
            f.write("".join(s1_tran[i*lineword:(i+1)*lineword])+'\n')
            f.write("".join(transcript_sym[i*lineword:(i+1)*lineword])+'\n')
            f.write("".join(s2_tran[i*lineword:(i+1)*lineword])+'\n')
    f.write("".join(s1_tran[(i+1)*lineword:length])+'\n')
    f.write("".join(transcript_sym[(i+1)*lineword:length])+'\n')
    f.write("".join(s2_tran[(i+1)*lineword:length])+'\n')

In [285]:
with open('transcript.txt','w+') as f:
    length = len(s1_tran)
    lineword = 56
    n = int(length)//int(lineword)
    m = length-n*lineword
    str = "###########################################\n" \
          "transformation of BJ01.fasta to TOR2.fasta\n" \
          "###########################################\n" 
    f.write(str)
    for i in range(n):
        f.write("".join(s2_edit[i*lineword:(i+1)*lineword])+'\n')
    f.write("".join(s2_edit[(i+1)*lineword:length])+'\n')

In [289]:
print(D[100,100])
print(D[1000,1000])
print(D[10000,10000])

16.0
16.0
20.0


In [298]:
np.savez('store.npz',p=p,D=D,tranceback=tranceback,penalty_table=penalty_table,d=d)

In [299]:
np.savez('visualization.npz',s1_tran=s1_tran,s2_tran=s2_tran,
         s1_edit=s1_edit,s2_edit=s2_edit,transcript=transcript,transcript_sym=transcript_sym)