From 41b64023ad29cda4b061f58a7a33dd43712fceba Mon Sep 17 00:00:00 2001 From: pulintz Date: Fri, 3 Jun 2016 15:13:19 -0400 Subject: [PATCH] CON-2 (cgates, pulintz): connor zygote --- .project | 17 ++++++ .pydevproject | 8 +++ connor/__init__.py | 0 connor/connor.py | 45 ++++++++++++++++ test/__init__.py | 0 test/connor_test.py | 125 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 195 insertions(+) create mode 100644 .project create mode 100644 .pydevproject create mode 100644 connor/__init__.py create mode 100644 connor/connor.py create mode 100644 test/__init__.py create mode 100644 test/connor_test.py diff --git a/.project b/.project new file mode 100644 index 0000000..926390a --- /dev/null +++ b/.project @@ -0,0 +1,17 @@ + + + Connor + + + + + + org.python.pydev.PyDevBuilder + + + + + + org.python.pydev.pythonNature + + diff --git a/.pydevproject b/.pydevproject new file mode 100644 index 0000000..037bd25 --- /dev/null +++ b/.pydevproject @@ -0,0 +1,8 @@ + + + +/${PROJECT_DIR_NAME} + +python 2.7 +Default + diff --git a/connor/__init__.py b/connor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/connor/connor.py b/connor/connor.py new file mode 100644 index 0000000..51d2cf5 --- /dev/null +++ b/connor/connor.py @@ -0,0 +1,45 @@ +''' +Created on Jun 3, 2016 + +@author: pulintz, cgates +''' +from __future__ import print_function, absolute_import, division +from _collections import defaultdict +from Bio.AlignIO.Interfaces import AlignmentWriter + +class LightweightAlignment(object): + """ Minimal info from PySam.AlignedSegment""" + def __init__(self, aligned_segment): + self.name = aligned_segment.query_name + chrom = aligned_segment.reference_name + pos1 = aligned_segment.reference_start + pos2 = aligned_segment.next_reference_start + if pos1 < pos2: + self.key = (chrom, pos1, pos2) + else: + self.key = (chrom, pos2, pos1) + +def _build_alignment_family_dict(lw_aligns): + af_dict = defaultdict(set) + for lwa in lw_aligns: + af_dict[lwa.key].add(lwa.name) + return af_dict + + +def _build_read_families(aligned_segments,coord_read_name_dict): + family_dict = defaultdict(set) + for aseg in aligned_segments: + key = LightweightAlignment(aseg).key + family_dict[key].add(aseg) + if (2*len(coord_read_name_dict[key])) == len(family_dict[key]): + yield family_dict.pop(key) + +def _build_consensus_pair(alignments): + start_alignment = alignments[0] + for alignment in alignments[1:]: + if alignment.query_name == start_alignment.query_name: + return (start_alignment, alignment) + + +if __name__ == '__main__': + pass diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/connor_test.py b/test/connor_test.py new file mode 100644 index 0000000..995b042 --- /dev/null +++ b/test/connor_test.py @@ -0,0 +1,125 @@ +#pylint: disable=invalid-name, too-few-public-methods, too-many-public-methods +#pylint: disable=protected-access, missing-docstring +from __future__ import print_function, absolute_import +from collections import namedtuple +import unittest +from connor import connor + + +MockPysamAlignedSegment = namedtuple('MockPysamAlignedSegment', + ('query_name,' + 'reference_name,' + 'reference_start,' + 'next_reference_start')) + +def align_seg(a, b, c, d): + return MockPysamAlignedSegment(query_name=a, + reference_name=b, + reference_start=c, + next_reference_start=d) + + +class TestConnor(unittest.TestCase): + + def test_build_alignment_family_dict(self): + Align = namedtuple('Align', 'name key') + align1 = Align(name='align1', key=3) + align2 = Align(name='align2', key=3) + align3 = Align(name='align3', key=4) + + actual_dict = connor._build_alignment_family_dict([align1, + align2, + align3]) + + expected_dict = {3: set(['align1', 'align2']), + 4: set(['align3'])} + self.assertEquals(expected_dict, actual_dict) + + def test_build_read_families_oneFamily(self): + align_A0 = align_seg("alignA", 'chr1', 10, 100) + align_A1 = align_seg("alignA", 'chr1', 100, 10) + align_B0 = align_seg("alignB", 'chr1', 10, 100) + align_B1 = align_seg("alignB", 'chr1', 100, 10) + alignments = [align_A0, align_B0, align_A1, align_B1] + coord_read_name_dict = {('chr1', 10, 100): set(['alignA', 'alignB'])} + + actual_families = [family for family in connor._build_read_families(alignments, coord_read_name_dict)] + + expected_families = [set([align_A0, align_A1, align_B0, align_B1])] + self.assertEquals(expected_families, actual_families) + + def test_build_read_families_threeFamilies(self): + align_A0 = align_seg("alignA", 'chr1', 10, 100) + align_A1 = align_seg("alignA", 'chr1', 100, 10) + align_B0 = align_seg("alignB", 'chr1', 10, 100) + align_B1 = align_seg("alignB", 'chr1', 100, 10) + align_C0 = align_seg("alignC", 'chr1', 20, 200) + align_C1 = align_seg("alignC", 'chr1', 200, 20) + align_D0 = align_seg("alignD", 'chr1', 30, 300) + align_D1 = align_seg("alignD", 'chr1', 300, 30) + alignments = [align_A0, align_B0, align_C0, align_A1, align_B1, + align_D0, align_D1, align_C1] + coord_read_name_dict = {('chr1', 10, 100): set(['alignA', 'alignB']), + ('chr1', 20, 200): set(['alignC']), + ('chr1', 30, 300): set(['alignD'])} + + actual_families = [family for family in connor._build_read_families(alignments, coord_read_name_dict)] + + expected_families = [set([align_A0, align_A1, align_B0, align_B1]), + set([align_D0, align_D1]), + set([align_C0, align_C1])] + self.assertEquals(expected_families, actual_families) + + def test_build_consensus_read(self): + align_A0 = align_seg("alignA", 'chr1', 10, 100) + align_A1 = align_seg("alignA", 'chr1', 100, 10) + align_B0 = align_seg("alignB", 'chr1', 10, 100) + align_B1 = align_seg("alignB", 'chr1', 100, 10) + alignments = [align_A0, align_B0, align_B1, align_A1] + + actual_pair = connor._build_consensus_pair(alignments) + + expected_pair = (align_A0, align_A1) + self.assertEquals(expected_pair, actual_pair) + + +class TestLightweightAlignment(unittest.TestCase): + def test_lightweight_alignment_forwardRead(self): + alignedSegment = MockPysamAlignedSegment(query_name="align1", + reference_name='chr1', + reference_start=10, + next_reference_start=100) + + actual_lwa = connor.LightweightAlignment(alignedSegment) + + self.assertEquals("align1", actual_lwa.name) + self.assertEquals(('chr1', 10, 100), actual_lwa.key) + + def test_lightweight_alignment_reverseRead(self): + alignedSegment = MockPysamAlignedSegment(query_name="align1", + reference_name='chr1', + reference_start=100, + next_reference_start=10) + + actual_lwa = connor.LightweightAlignment(alignedSegment) + + self.assertEquals("align1", actual_lwa.name) + self.assertEquals(('chr1', 10, 100), actual_lwa.key) + + def test_lightweight_alignment_weirdRead(self): + alignedSegment = MockPysamAlignedSegment(query_name="align1", + reference_name='chr1', + reference_start=100, + next_reference_start=100) + + actual_lwa = connor.LightweightAlignment(alignedSegment) + + self.assertEquals("align1", actual_lwa.name) + self.assertEquals(('chr1', 100, 100), actual_lwa.key) + + + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.testName'] + unittest.main()