In [1]:
__author__ = "Milad Miladi"
__license__ = "GPL"
__email__ = "miladim@cs.uni-freiburg.de"

import tempfile
from Bio import AlignIO, SeqIO
import os, re
import numpy as np
RNAFOLD = 'RNAfold -p --noPS '
RNAPLFOLD = 'RNAplfold '


In [7]:

def dotbracket_to_dict(struct):
    '''Returns a dictionary where basepairs are keys with !ONE! based indices joined by ":" ,
    e.g. dict {'0:10': 1, '2:8': 1} '''
    assert len(struct.replace('.', '').replace('(', '').replace(')', '')) == 0
    stack = list()
    pairs = dict()
    for pos, ch in enumerate(list(struct)):
        #         print pos+1, ch
        if ch == '(':
            stack.append(pos)
        elif ch == ')':
            left = stack.pop()
            key = "{}:{}".format(left+1, pos+1)
            pairs[key] = 1

    assert len(stack) == 0
    return pairs


def compute_part_func(infile_fa, seq_names, outdir_path="./", use_plfold=False, which_params='turner', dangles=2,
                      use_cache=False):
    '''Runs Vienna RNAfold/RNAplfold with partition function for all sequences inside input fasta file
    If use_cache, it does nothing if If the ps file with same paramaters exists '''
    from subprocess import Popen, PIPE
    #     print "compute_part_func(", infile_fa, seq_names
    if use_plfold:
        out_dir = outdir_path + RNAPLFOLD.replace(' ', '')
    else:
        out_dir = outdir_path + RNAFOLD.replace(' ', '')
    if not os.path.isdir(out_dir):
        os.mkdir(out_dir)

    if not os.path.isfile(infile_fa):
        raise IOError("Fastafile not found: {}".format(infile_fa))

    all_in_cache = all([os.path.isfile(os.path.join(out_dir, sname+'_dp.ps')) for sname in seq_names])
    if all_in_cache and use_cache:
        raise NotImplementedError("Sequence names for caching are not correctly set")
        return out_dir

    with open(infile_fa) as in_rna:

        arg_param = ""
        if which_params == 'quake':
            arg_param += " -P %s " % QUAKE_PARAM_FILE
#             dangles = 0
        elif which_params.startswith('andero'):
            assert dangles == 2
            arg_param += " -P %s " % ANDERO_PARAM_FILE
        elif which_params != 'turner':
            assert dangles == 2
            raise RuntimeError("Unknown parameter option {}".format(which_params))

        assert dangles >= 0 and dangles <= 2
        arg_param += " --dangles {} ".format(dangles)

        if use_plfold:
            p = Popen(('cd %s;' %out_dir) + RNAPLFOLD + arg_param, stdin=in_rna, shell=True, stdout=PIPE, stderr=PIPE)
        else:
            p = Popen(('cd %s;' %out_dir) + RNAFOLD + arg_param, stdin=in_rna, shell=True, stdout=PIPE, stderr=PIPE)

        out, err = p.communicate()
        if err:
            print ("Error in calling RNAfold for ", infile_fa)
            print (out)
            print (err)

            # With long sequences RNAfold prints scalign factor to stderr
            if (not use_plfold and not ("scaling factor" in err or "free energy" in err)):
                raise RuntimeError

    return out_dir


def parse_dp_ps(ps_file):
    '''Extracts base pair probabliies from vienna ps file
    returns: Dictinary of form dict[i:j]=p(i,j) '''

    # Extract sequence from ps file
    myseq = ""
    read_seq = False
    with open(ps_file) as in_ps:
        for line in in_ps:
            if "/sequence" in line:
                read_seq = True
            elif read_seq and ") } def" in line:
                read_seq = False
            elif read_seq:
                myseq += line.rstrip().rstrip("\\")
    #     print ps_file.rstrip("_dp.ps") , myseq

    ureg = re.compile(r'^(\d+)\s+(\d+)\s+(\d+\.\d+)\s+ubox\s*')
    bp_prob_dict = dict()
    bp_prob_mat = np.zeros((len(myseq), len(myseq)))

    with open(ps_file) as in_ps:
        for line in in_ps:
            if "ubox" in line:
                um = ureg.match(line)
                if um:
                    i, j, sqrp = um.groups()

                    #                     print i, j, sqrp

                    # keys are pair of indexes as smaller:larger
                    key = ":".join(sorted([i, j], reverse=True))
                    assert (key not in bp_prob_dict)
                    bpprob = float(sqrp)*float(sqrp)
                    bp_prob_dict[key] = bpprob

                    i, j = int(i), int(j)
                    bp_prob_mat[i-1, j-1] = bpprob
    return bp_prob_mat


def get_expected_accuracy(reference_struct, dp_matrix):
    '''dp_matrix is a numpy matrix where base indeices are ZERO based'''
    assert dp_matrix.shape[0] == dp_matrix.shape[1]
    assert dp_matrix.shape[0] == len(reference_struct)
    reference_struct_dict = dotbracket_to_dict(reference_struct)
    sum_TP_prob = 0.0
    for bp_key in reference_struct_dict:
        i, j = bp_key.split(":")
        i, j = int(i), int(j)
        sum_TP_prob += dp_matrix[i-1, j-1]
#         print i,j, dp_matrix[i-1,j-1]

#     print "    TP_score: %.2f" % (sum_TP_prob/len(reference_struct_dict))
    if len(reference_struct_dict) == 0:
        return 1
    return (sum_TP_prob/len(reference_struct_dict))

def get_structure_accuracy(dp_ps_file,ref_structure):
    print('Compute accuracy for: ', dp_ps_file, ref_structure)
    assert(os.path.isfile(dp_ps_file))
    dp_matrix = parse_dp_ps(dp_ps_file)
    seq_score = get_expected_accuracy(ref_structure, dp_matrix)
    return seq_score

get_structure_accuracy('./AF007904.1_659-784_dp.ps',
                       '.........(((((.....((((((...((((((....)))))))))))).....((((((....))))))......((((((....))))))...))))).........................'
                       
                      )

Compute accuracy for:  ./AF007904.1_659-784_dp.ps .........(((((.....((((((...((((((....)))))))))))).....((((((....))))))......((((((....))))))...))))).........................


0.90535097488014527

In [8]:
def fold_sequence_get_structure_accuracy(fasta_file, reference_structures, dp_outdir="./"):
    print('Compute expected sturcture accuracy for fasta file:', fasta_file)
    if not os.path.isdir(dp_outdir):
        os.mkdir(dp_out_base)
    seq_ids = [s.id for s in SeqIO.parse(fasta_file,'fasta')]
    print (seq_ids)
    if (len(reference_structures) != len(seq_ids)):
        raise RuntimeError("Difference between number of provided structures ({}) and sequences ({})".format(len(reference_structures), len(seq_ids)))
    dp_out_path = tempfile.mkdtemp(suffix='accuracy_', dir=dp_outdir)
    dp_outdir = compute_part_func(fasta_file, seq_ids, 
                                  outdir_path=dp_out_path, use_plfold=False, )

    ret_dict = dict()
    for seq_id, ref_struct in zip(seq_ids,reference_structures):
        faltten_id = seq_id.replace('/','_')
        dp_ps = '{}/{}_dp.ps'.format(dp_outdir, faltten_id)
        assert(os.path.isfile(dp_ps))
        dp_matrix = parse_dp_ps(dp_ps)
        seq_score = get_expected_accuracy(ref_struct, dp_matrix)
        ret_dict[seq_id] = seq_score

    return ret_dict


fold_sequence_get_structure_accuracy('AF007904.1_709-734_known_nt.fasta',
                                    ['.........(((((.....((((((...((((((....)))))))))))).....((((((....))))))......((((((....))))))...))))).........................'])    

Compute expected sturcture accuracy for fasta file: AF007904.1_709-734_known_nt.fasta
['AF007904.1/659-784']


{'AF007904.1/659-784': 0.90535097488014527}