In [1]:
import VariantAnnotation
from DAE import genomesDB

# Shared annotation inputs: both objects are passed to
# VariantAnnotation.annotate_variant by generate_test further down.
GA = genomesDB.get_genome()
gmDB = genomesDB.get_gene_models()

the twin 13772.s2 is not quite in the collection
set(['1995-09', '1998-11'])
birth month mismatch for twins from family 11665
the twin 12782.s2 is not quite in the collection


In [2]:
import pickle 

# "output_aws" holds a sequence of pickled objects written back to back;
# keep unpickling until the stream is exhausted.
# NOTE: pickle.load can execute arbitrary code - only read trusted files.
variants = []

with open("output_aws", "rb") as f:
    while True:
        try:
            record = pickle.load(f)
        except EOFError:
            # End of file reached: every stored object has been read.
            break
        variants.append(record)

In [3]:
# Rank the loaded entries by their 'count' value, largest first.
# A copy is sorted so that the `variants` list keeps its load order.
sorted_effects = list(variants)
sorted_effects.sort(key=lambda entry: entry['count'], reverse=True)

In [4]:
# Union of the 'lines' sets of every entry, i.e. everything that at least
# one entry covers. Accumulating in place avoids building a fresh set per
# entry and does not rely on `reduce` being a builtin (it is not on Python 3).
all_lines = set()
for entry in sorted_effects:
    all_lines |= entry['lines']

In [5]:
# Greedy selection: repeatedly take the entry with the highest 'count',
# remove the lines it covers from `all_lines` and from every remaining
# entry, then re-rank the remainder by the recomputed counts.
# NOTE: the entry dicts are modified in place ('lines' and 'count'), so the
# earlier cells must be re-run before executing this cell a second time.
coverage = []
while all_lines:
    best = sorted_effects[0]
    remaining = sorted_effects[1:]

    all_lines = all_lines - best['lines']
    coverage.append(best)

    for effect in remaining:
        # Only lines not yet covered by a selected entry still count.
        effect['lines'] = effect['lines'] - best['lines']
        effect['count'] = len(effect['lines'])

    sorted_effects = sorted(remaining, key=lambda k: k['count'], reverse=True)

In [6]:
# Assertion lines emitted for one effect object.
#   {0}     expression naming the effect in the generated code
#   {1}-{4} expected gene, transcript_id, strand and effect
#           (already rendered as literals by add_quotes)
#   {5},{6} expected prot_pos and prot_length (inserted as-is)
#   {7}     expected aa_change (rendered by add_quotes)
asserts_text = """
        self.assertEqual({0}.gene, {1})
        self.assertEqual({0}.transcript_id, {2})
        self.assertEqual({0}.strand, {3})
        self.assertEqual({0}.effect, {4})
        self.assertEqual({0}.prot_pos, {5})
        self.assertEqual({0}.prot_length, {6})
        self.assertEqual({0}.aa_change, {7})
"""

# Header of a generated test method that expects exactly one effect.
#   {0} method name, {1} location string, {2} variant string
singe_res_text = """
    def {0}(self):
        [effect] = VariantAnnotation.annotate_variant(self.gmDB, self.GA,
                                                      loc="{1}",
                                                      var="{2}")
"""

# Header of a generated test method that expects several effects; the
# generated code sorts them by transcript_id before the assertions run.
#   {0} method name, {1} location string, {2} variant string,
#   {3} expected number of effects
multiple_res_text = """
    def {0}(self):
        effects = VariantAnnotation.annotate_variant(self.gmDB, self.GA,
                                                     loc="{1}",
                                                     var="{2}")
        self.assertEqual(len(effects), {3})
        effects_sorted = sorted(effects, key=lambda k: k.transcript_id)
"""

def add_quotes(string):
    """Render a value as a double-quoted literal for the generated test code.

    ``None`` is returned unchanged, so it is formatted into the templates as
    the bare word ``None``. Any other value is formatted to text and wrapped
    in double quotes. Backslashes and double quotes inside the text are
    escaped so that the emitted literal remains valid Python source and
    evaluates back to the original text.
    """
    if string is None:
        return string

    text = "{}".format(string)
    # Escape backslashes first, otherwise the backslashes added for the
    # quotes would themselves be doubled.
    escaped = text.replace("\\", "\\\\").replace("\"", "\\\"")
    return "\"{}\"".format(escaped)

def generate_single_result_test(loc, var, effect, name):
    """Return the source of a test method that expects a single effect.

    The method header comes from ``singe_res_text`` (method name, location
    and variant); the assertions come from ``asserts_text`` and compare the
    generated ``effect`` variable against the attributes of ``effect``.
    """
    header = singe_res_text.format(name, loc, var)
    # String-valued attributes are quoted; prot_pos and prot_length are
    # inserted as they are.
    expected = (
        add_quotes(effect.gene),
        add_quotes(effect.transcript_id),
        add_quotes(effect.strand),
        add_quotes(effect.effect),
        effect.prot_pos,
        effect.prot_length,
        add_quotes(effect.aa_change),
    )
    return header + asserts_text.format("effect", *expected)

def generate_multi_line_result_test(loc, var, effects, name):
    """Return the source of a test method that expects several effects.

    The effects are ordered by ``transcript_id``, matching the ordering the
    generated method applies to its own results, so the i-th block of
    assertions checks ``effects_sorted[i]``.
    """
    ordered = list(effects)
    ordered.sort(key=lambda item: item.transcript_id)

    pieces = [multiple_res_text.format(name, loc, var, len(ordered))]
    for index, item in enumerate(ordered):
        target = "effects_sorted[{}]".format(index)
        pieces.append(asserts_text.format(target,
                                          add_quotes(item.gene),
                                          add_quotes(item.transcript_id),
                                          add_quotes(item.strand),
                                          add_quotes(item.effect),
                                          item.prot_pos, item.prot_length,
                                          add_quotes(item.aa_change)))

    return "".join(pieces)

def generate_test(loc, var):
    """Build the source of one generated test method for a variant.

    The variant is annotated with ``VariantAnnotation.annotate_variant``
    using the module-level ``gmDB`` and ``GA``; the effects it returns become
    the expected values of the generated assertions. Exactly one effect
    produces the single-result form, any other count the multi-result form.

    The method name is built from the two ``:``-separated parts of ``loc``
    and the first three characters of ``var``. Every character that is not
    alphanumeric or ``_`` is replaced with ``_`` so that the name is always a
    valid Python identifier.

    loc -- location string containing at least one ``:``
    var -- variant description string
    """
    loc_parts = loc.split(":")
    chrom = loc_parts[0]
    chrom_loc = loc_parts[1]

    raw_name = "test_chr{0}_{1}_{2}_var".format(chrom, chrom_loc, var[:3])
    # The parts come straight from the data and may hold characters such as
    # '.', '-' or '(' that would make the generated `def` line invalid.
    # NOTE(review): distinct variants can still map to the same name (same
    # location and same three-character prefix) - confirm whether that occurs.
    name = "".join(ch if (ch.isalnum() or ch == "_") else "_"
                   for ch in raw_name)

    result = VariantAnnotation.annotate_variant(gmDB, GA,
                                                loc=loc,
                                                var=var)
    if len(result) == 1:
        return generate_single_result_test(loc, var, result[0], name)
    return generate_multi_line_result_test(loc, var, result, name)

In [7]:
# Write one generated test method per selected entry, in selection order.
# The output consists of method definitions only.
with open('autotests.py', 'w') as testsfile:
    for cov in coverage:
        test_source = generate_test(cov['loc'], cov['var'])
        testsfile.write(test_source)