In [1]:
def analyse_hits(hits, config):
    """ Segment reads based on alignment hits using dynamic programming.
        The algorithm is based on the rule that each primer alignment hit can be used only once.
        Hence if a segment is included, the next one has to be excluded.
    """
    segments = _build_segments(hits, config)
    nr = len(segments)

    # No hits, return no segments:
    if nr == 0:
        return (), (), 0

    # Initialize DP matrix and traceback dictionary:
    M = np.zeros((2, nr), dtype=int)
    B = dict()

    # Initialize entry for the first segment:
    M[1, 0] = segments[0].Len

    # Fill in DP matrix:
    for j in range(1, nr):
        for i in range(2):
            if i == 0:
                # First row holds excluded segments.
                # The can transition from eiter exluded or included segments:
                M[i, j] = M[0, j - 1]
                B[i, j] = (0, j - 1)
                if M[1, j - 1] > M[0, j - 1]:
                    M[i, j] = M[1, j - 1]
                    B[i, j] = (1, j - 1)
            elif i == 1:
                # Included segments can only transition from previosuly excluded segments:
                M[i, j] = M[0, j - 1] + segments[j].Len
                B[i, j] = (0, j - 1)

    tlen = np.argmax(M[:, nr - 1])
    valid_segments = []

    # Traceback and build solution:
    p = (tlen, nr - 1)
    while True:
        if p[0] == 1:
            s = segments[p[1]]
            # Filter out invalid segments which can be part of the
            # solution:
            if s.Len > 0:
                valid_segments.append(s)
        if p[1] == 0:
            break
        p = B[p]

    return tuple(valid_segments), hits, tlen


In [2]:
class hit:
    def __init__(self, Query, RefStart,RefEnd):
        self.Query = Query
        self.RefStart = RefStart
        self.RefEnd=RefEnd

In [3]:
N = 100000
hits = [hit(q_,rs_,rs_+50) for q_,rs_,re_ in zip(range(N),range(N),range(N))]
config = (3,4)

In [4]:
from pychopper.common_structures import Segment

In [5]:
def _build_segments(hits, config):
    "Build tuple of segments"
    segments = []
    if len(hits) == 0:
        return tuple(segments)
    for s in zip(hits, hits[1:]):
        strand = None
        seg_len = 0
        c = (s[0].Query, s[1].Query)
        if c in config:
            strand = config[c]
            seg_len = s[1].RefStart - s[0].RefEnd
        segments.append(Segment(s[0].RefStart, s[0].RefEnd, s[1].RefStart, s[1].RefEnd, strand, seg_len))
    return tuple(segments)

In [6]:
def _build_segments2(hits, config):
    "Build tuple of segments"
    segments = []
    if len(hits) == 0:
        return tuple(segments)
    for s0,s1 in zip(hits, hits[1:]):
        strand = None
        seg_len = 0
        c = (s0.Query, s1.Query)
        if c in config:
            strand = config[c]
            seg_len = s1.RefStart - s0.RefEnd
        segments.append(Segment(s0.RefStart, s0.RefEnd, s1.RefStart, s1.RefEnd, strand, seg_len))
    return tuple(segments)

In [7]:
%%timeit -n 10
_build_segments(hits,config)

101 ms ± 6.35 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [8]:
%%timeit -n 10
_build_segments2(hits,config)

77.8 ms ± 1.09 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [9]:
_build_segments(hits,config) == _build_segments2(hits,config)

True