In this notebook I will trial a working concept of the core functionality.

Main design:
- Indexed bam file is read by multiple workers in paralllel
- Each worker fetches a region and applies a user-specified function over the region.
- Regions can be specified manually by user, or defined automatically according to different schemes. First scheme to implement is `byfeature` which bins genome into N non-overlapping regions that each contain features of coordinates `(start, end)` such that each feature is contained in only one region.

In [1]:
import multiprocessing as mp
import pysam
import pandas as pd
from datetime import datetime

In [2]:
bamfilepath = "../data/SLX-18505_N701_A03_r2.umiAppend.Aligned.out.featureCounts.sorted.bam"

In [3]:
samfile = pysam.AlignmentFile(bamfilepath, "rb")

In [4]:
samfile.lengths[0:9]

(248956422,
 133797422,
 135086622,
 133275309,
 114364328,
 107043718,
 101991189,
 90338345,
 83257441)

In [5]:
samfile.references[0:9]

('1.human',
 '10.human',
 '11.human',
 '12.human',
 '13.human',
 '14.human',
 '15.human',
 '16.human',
 '17.human')

In [6]:
samfile.nreferences

352

In [7]:
def fetch(bamfilepath, contig=None, start=None, end=None):
    samfile = pysam.AlignmentFile(bamfilepath, "rb")
    reads = []
    for read in samfile.fetch(contig, start, end):
        reads.append((read.query_name, ))
    samfile.close()
    return reads

def apply(func, args_df, processes=None, timeout=None):
    output = []
    with mp.Pool(processes=processes) as pool:
        results = []
        for i in range(len(args_df.index)):
            results.append(pool.apply_async(func, args_df.iloc[i,]))
        output = [res.get(timeout=timeout) for res in results]
    return output

def read_bam(bamfilepath, contigs=None, starts=None, ends=None, processes=None, timeout=None, chunks=None):
    kwargs_df = make_fetch_args(bamfilepath, contigs, starts, ends, chunks)
    res = apply(fetch, kwargs_df, processes=processes, timeout=timeout)
    return res

def make_fetch_args(bamfilepath, contigs, starts, ends, chunks):
    if contigs is None:
        contigs = get_all_contigs(bamfilepath)
    if chunks is None or chunks < len(contigs):
        kwargs_df = pd.DataFrame({
            "bamfilepath": bamfilepath,
            "contig": contigs
        })
    else:
        # split contigs by len in order to get right number of chunks
        raise NotImplementedError
    return kwargs_df

def get_all_contigs(bamfilepath):
    with pysam.AlignmentFile(bamfilepath, "rb") as samfile:
        contigs = samfile.references
    return contigs

In [8]:
print(datetime.now())
bam = read_bam(bamfilepath, processes=1)
print(datetime.now())

2020-11-27 16:45:06.213315
2020-11-27 16:45:12.381814


In [9]:
print(datetime.now())
bam = read_bam(bamfilepath, processes=8)
print(datetime.now())

2020-11-27 16:45:12.389575
2020-11-27 16:45:14.085814


In [10]:
print(datetime.now())
bam = read_bam(bamfilepath, processes=1, contigs=["1.human"])
print(datetime.now())

2020-11-27 16:45:14.092215
2020-11-27 16:45:14.994387


In [11]:
n_contigs = 4
print(datetime.now())
bam = read_bam(
    bamfilepath, processes=n_contigs,
    contigs=[str(chrom)+".human" for chrom in range(1,n_contigs+1)])
print(datetime.now())

2020-11-27 16:45:15.001242
2020-11-27 16:45:15.704204


In [12]:
n_contigs = 8
print(datetime.now())
bam = read_bam(
    bamfilepath, processes=n_contigs,
    contigs=[str(chrom)+".human" for chrom in range(1,n_contigs+1)])
print(datetime.now())

2020-11-27 16:45:15.710732
2020-11-27 16:45:16.506788
