In [5]:
import subprocess
from pathos.helpers import mp as multiprocess
from pathos import multiprocessing as mp
import argparse
from Bio.Seq import Seq
from Bio.Alphabet import generic_dna
from array import array
from Bio import SeqIO
import gzip
from collections import Counter
from detag_v4 import *
from detag_v4 import get_tagset

In [6]:

def make_job_dict_manually(heel,
                           gsp,
                           tags,
                           h3score,
                           h5score,
                           outputprefix,
                           pairedfastq,
                           forward,
                           reverse,
                           filtering,
                           pairedparams=True,
                           unpairedfastq=None,
                          ):

    
    job_summary = ["starting Job with the following parameters:"]
    job = dict()
    job["heel_file"] = heel
    job["gsp"] = gsp
    job_summary.append("heel file is: "+str(job['heel_file']))
    job["tag_file"] = tags
    job_summary.append("tag file is: "+str(job['tag_file']))
    if h3score and h5score is None:
        job_summary.append("using default values for primerscores")
        job["h3score"] = float(0.9)
        job["h5score"] = float(0.9)
    else:
        job["h3score"] = float(h3score)
        job["h5score"] = float(h5score)

    job["output_prefix"] = outputprefix

    job_summary.append("primerscore (match/length ratio)  3'>{}  5'>{}".format(job["h3score"], job["h5score"]))
    if pairedfastq is True:
        job["filetype"] = "pairedfastq"
        job["forward_file"] = forward
        job["reverse_file"] = reverse
    if pairedfastq is not True:
        job["filetype"] = "unpairedfastq"
    job["unpaired_file"] = unpairedfastq
    if (unpairedfastq and forward) or (unpairedfastq and reverse ) is not None:
        print " if the reads are not paired, you should not specify --forward and --reverse"
        exit()

    job_summary.append("output_prefix is {}".format(job["output_prefix"]))
    job_summary.append("file type is : {} ".format(job["filetype"]))
    job_summary.append("input files are : {} , {}, {}".format(job['forward_file'],
                                                              job['reverse_file'],
                                                              job['unpaired_file']))

    if pairedparams is not None and pairedfastq is True:
        param_list = pairedparams.split(":")
        job["kmer_overlap"] = int(param_list[0])
        job["hsp_overlap"] = int(param_list[1])
        job["min_overlap"] = int(param_list[2])
    elif pairedparams is None and pairedfastq is True:
        job_summary.append("no pairedparams found, using default parameters.")
        job["kmer_overlap"] = 7
        job["hsp_overlap"] = 5
        job["min_overlap"] = 10

    elif pairedparams is not None and pairedfastq is not True:
        job_summary.append("paired - parameters only relevant for paired fastq files, ignoring parameters.")
    if pairedfastq is True:
        job_summary.append('using paired-parameters kmer_overlap = {} , hsp_overlap = {} , min_overlap = {}'.format(job["kmer_overlap"],
                                                                                                                    job["hsp_overlap"],
                                                                                                                    job["min_overlap"]))
    filter_list = filtering.split(":")
    job["filter_strategy"] = int(filter_list[0])
    job["filter_minlen"] = int(filter_list[1])
    job["filter_maxlen"] = int(filter_list[2])
    job["filter_meanqual"] = int(filter_list[3])
    job["filter_minqual"] = int(filter_list[4])

    job_summary.append("filtering strategy {} , with minlen {} , maxlen {}, meanqual {} , minqual {}".format(job["filter_strategy"],
                                                                                                             job["filter_minlen"],
                                                                                                             job["filter_maxlen"],
                                                                                                             job["filter_meanqual"],
                                                                                                             job["filter_minqual"]))
    return job, job_summary

In [None]:
job,jobsum = make_job_dict_manually(filtering="1:50:150:20:15",forward="test_data/first_10k_R1.fastq", reverse="test_data/first_10k_R2.fastq", gsp="test_data/tabulated_relevant_primers.txt", h3score="0.9", h5score="0.9", heel="test_data/heel.txt", outputprefix="test_data/debug_15_8_", pairedfastq=True, pairedparams="7:5:10", tags="test_data/tags_no_CAA.txt",) #FIXME add input here

In [None]:
seq_data = Pair(fastq_1=job["forward_file"],
                fastq_2=job["reverse_file"],
                hsp=job["hsp_overlap"],
                kmer=job["kmer_overlap"],
                min=job["min_overlap"])

In [None]:
a =(qseq for qseq in seq_data)

In [None]:
type(a)

In [None]:
u = list(a)

In [None]:
#print u

In [None]:
r = a.next()

In [None]:
l = r.get()

In [None]:
print(l)

In [None]:
def new_main():
    job,jobsum = make_job_dict_manually(filtering="1:50:150:20:15",forward="test_data/first_10k_R1.fastq", reverse="test_data/first_10k_R2.fastq", gsp="test_data/tabulated_relevant_primers.txt", h3score="0.9", h5score="0.9", heel="test_data/heel.txt", outputprefix="test_data/debug_15_8_", pairedfastq=True, pairedparams="7:5:10", tags="test_data/tags_no_CAA.txt") #FIXME add input here
    tags5,tagsum = get_tagset(tag_input_file=job["tag_file"],end='3')
    tags3 = get_tagset(tag_input_file=job["tag_file"],end='5')[0]
    if job["gsp"]:
        gsp3,gspsum = get_tagset(tag_input_file=job["gsp"],end='3')
        gsp5 = get_tagset(tag_input_file=job["gsp"],end='5')[0]

    heel5, heel3, heelsum = get_heels(job["heel_file"])
    seq_data = Pair(fastq_1=job["forward_file"],
                    fastq_2=job["reverse_file"],
                    hsp=job["hsp_overlap"],
                    kmer=job["kmer_overlap"],
                    min=job["min_overlap"])
    # init DeTagSeq
    if not job["gsp"]:
        dtseq = DeTagSeq(p3=heel3, p3s=job["h3score"], p5=heel5, p5s=job["h5score"], t3=tags3, t5=tags5)
    else:
        dtseq = DeTagSeq(p3=heel3, p3s=job["h3score"], p5=heel5, p5s=job["h5score"], t3=tags3, t5=tags5, gsp3=gsp3, gsp5=gsp5)

    # init ResultProcessor
    if not job["gsp"]:
        reproc = ResultProcessor(tags=tags3, output_prefix=job["output_prefix"])
    else:
        reproc = ResultProcessor(tags=tags3, output_prefix=job["output_prefix"], gsp=gsp3)
    # init threading:
    qseq_gen = (qseq for qseq in seq_data)
    td = ThreadingController(work=list(qseq_gen))
    # execute detag & reproc
    reproc.open_tag_files()
    # create manager, queue
    man, q = td.create_q()
    # execute controller
    td.controller(q)
    
    reproc.close_tag_files()
    reproc.harvest_stats()
    reproc.write_stats()
    write_logfile(job["output_prefix"],jobsum, heelsum, tagsum,)

In [None]:
class ThreadingController(object):
    ''' base class for multithreading jobs'''
    def __init__(self, max_proc=1, work=None):
        self.max_proc = max_proc
        self.work = work

    def create_q(self):
        '''creates the manager and the queue as external objects.
            multiprocess/pickling dont play well with Classes '''
        manager = multiprocess.Manager()
        queue = manager.Queue()
        return manager, queue

    def worker(self, work):
        '''runs analysis on seq_data'''
        r=work.get()
        res = dict(status = dict())
        seq = r[0]
        qual = r[1]
        detagged_seq = None
        if seq:
            res["id"] = seq.id
            if job["filter_strategy"] == 0: # Filter full ###
                if not qual:
                    raise Exception("Full sequence filtering requires quality data")

                seq, status = filter_full(seq_record=seq,
                                          qual=qual,
                                          min_length=job["filter_minlen"],
                                          mean_min=job["filter_meanqual"],
                                          min_qual=job["filter_minqual"])
            elif job["filter_strategy"] == 1: # Full sequence, no filtering###
                status = None
            elif job["filter_strategy"] == 2: # HQR###
                if not qual:
                    raise Exception("HQR filtering requires quality data")
                seq, status = filter_hqr(seq_record=seq,
                                         qual=qual,
                                         min_length=job["filter_minlen"],
                                         mean_min=job["filter_meanqual"],
                                         min_qual=job["filter_minqual"])
            elif job["filter_strategy"] == 3: # Primers first
                if not qual:
                    raise Exception("Amplicon quality filtering requires quality data")
                detagged_seq, qual = dtseq.detag_seq(seq,qual)
                if "status" in detagged_seq:
                    res["status"][detagged_seq["status"]] = 1
                    res["detagged_seq"] = detagged_seq
                    #result_list.append(res)
                    return res ###
                    

                seq, status = filter_full(seq_record=detagged_seq["seq_record"],
                                          qual=qual,
                                          min_length=job["filter_minlen"],
                                          mean_min=job["filter_meanqual"],
                                          min_qual=job["filter_minqual"])
            else:
                raise Exception("Unknown filter type")
        else:
            status = "failed_pair"
        if status:
            res["status"][status] = 1
            #result_list.append(res)
            return res ###
            
        if len(seq.seq) < job["filter_minlen"]:
            res["status"]["too_short"] = 1
            #result_list.append(res)
            return res ###
           

        if not detagged_seq:
            detagged_seq, qual = dtseq.detag_seq(seq, qual)
            res["detagged_seq"] = detagged_seq
        if "status" in detagged_seq:
            res["status"][detagged_seq["status"]] = 1
                #result_list.append(res)
            res["detagged_seq"] = detagged_seq
            return res ###
           
        if len(detagged_seq["seq"]) < job["filter_minlen"]:
            res["status"]["too_short"] = 1
            res["detagged_seq"] = detagged_seq
                #result_list.append(res)
            return res


    def watcher(self,q):
        ''' watches the queue until the kill-word is on the queue
            processes items on the queue via self.handling()'''
        while True:
            a = q.get()
            if a == "kill":
                break
            else:
                self.handling(a)
            

    def handling(self, result):
        reproc.process_result(result)
        return None

    def controller(self, q):
        ''' controls the worker and watcher processes
            passes the kill word to the queue once all jobs are done'''
        # create ProcessPool
        pool = mp.ProcessPool(processes=self.max_proc)
        # start the watcher
        eye = pool.uimap(self.watcher, (q,))
        #print len(self.work)
        #print type(self.work)
        # map workers to the input, put the results into the writing queue
        for i in pool.uimap(self.worker, self.work):
            q.put(i)
        # terminate watcher
        q.put("kill")

In [27]:

def worker(work):
    '''runs analysis on seq_data'''
    r=work.get()
    res = dict(status = dict())
    seq = r[0]
    qual = r[1]
    detagged_seq = None
    if seq:
        res["id"] = seq.id
        if job["filter_strategy"] == 0: # Filter full ###
            if not qual:
                raise Exception("Full sequence filtering requires quality data")

            seq, status = filter_full(seq_record=seq,
                                        qual=qual,
                                        min_length=job["filter_minlen"],
                                        mean_min=job["filter_meanqual"],
                                        min_qual=job["filter_minqual"])
        elif job["filter_strategy"] == 1: # Full sequence, no filtering###
            status = None
        elif job["filter_strategy"] == 2: # HQR###
            if not qual:
                raise Exception("HQR filtering requires quality data")
            seq, status = filter_hqr(seq_record=seq,
                                     qual=qual,
                                     min_length=job["filter_minlen"],
                                     mean_min=job["filter_meanqual"],
                                     min_qual=job["filter_minqual"])
        elif job["filter_strategy"] == 3: # Primers first
            if not qual:
                raise Exception("Amplicon quality filtering requires quality data")
            detagged_seq, qual = dtseq.detag_seq(seq,qual)
            if "status" in detagged_seq:
                res["status"][detagged_seq["status"]] = 1
                res["detagged_seq"] = detagged_seq
                #result_list.append(res)
                return res ###
                    

            seq, status = filter_full(seq_record=detagged_seq["seq_record"],
                                      qual=qual,
                                      min_length=job["filter_minlen"],
                                      mean_min=job["filter_meanqual"],
                                      min_qual=job["filter_minqual"])
        else:
            raise Exception("Unknown filter type")
    else:
        status = "failed_pair"
    if status:
        res["status"][status] = 1
        #result_list.append(res)
        return res ###
            
    if len(seq.seq) < job["filter_minlen"]:
        res["status"]["too_short"] = 1
        #result_list.append(res)
        return res ###
           

    if not detagged_seq:
        detagged_seq, qual = dtseq.detag_seq(seq, qual)
        res["detagged_seq"] = detagged_seq
        
    if "status" in detagged_seq:
        res["status"][detagged_seq["status"]] = 1
            #result_list.append(res)
        res["detagged_seq"] = detagged_seq
        return res ###
           
    if len(detagged_seq["seq"]) < job["filter_minlen"]:
        res["status"]["too_short"] = 1
        res["detagged_seq"] = detagged_seq
            #result_list.append(res)
        return res

    if job["filter_maxlen"] > 0 and len(detagged_seq["seq"]) > job["filter_maxlen"]:
        res["status"]["truncated"] = 1
        detagged_seq["seq"] = detagged_seq["seq"][:job["filter_maxlen"]]
        res["detagged_seq"] = detagged_seq
        # return res

    if job["filter_maxlen"] < 0:
        res["status"]["truncated"] = 1
        detagged_seq["seq"] = detagged_seq["seq"][:job["filter_maxlen"]]
        res["detagged_seq"] = detagged_seq
#       # return res

#        print res
    res["keep"] = True
    res["id"] = seq.id
    res["detag_qual"] = qual
        #reproc.process_result(res)
    return res

In [28]:
def watcher(q, job):
    ''' watches the queue until the kill-word is on the queue
        processes items on the queue via self.handling()'''
    reproc.open_tag_files()   
    while True:
        a = q.get()
        if a == "kill":
            reproc.close_tag_files()
            reproc.harvest_stats()
            reproc.write_stats()
            break
        else:
            #dummy.write("dummy \n")
            if a is not None:
                reproc.process_result(a)
            
            

In [33]:

job,jobsum = make_job_dict_manually(tags="test_data/tags_no_CAA.txt",
                                    heel="test_data/heel.txt",
                                    pairedfastq=True,
                                    forward="test_data/first_100k_R1.fastq",
                                    reverse="test_data/first_100k_R2.fastq",
                                    pairedparams="7:5:10",
                                    outputprefix="test_data/debuggy_15_8_",
                                    h3score="0.9",
                                    h5score="0.9",
                                    filtering="1:50:150:20:15",
                                    gsp="test_data/tabulated_relevant_primers.txt")
                                     #FIXME add input here
tags5,tagsum = get_tagset(tag_input_file=job["tag_file"],end='3')
tags3 = get_tagset(tag_input_file=job["tag_file"],end='5')[0]
if job["gsp"]:
    gsp3,gspsum = get_tagset(tag_input_file=job["gsp"],end='3')
    gsp5 = get_tagset(tag_input_file=job["gsp"],end='5')[0]

heel5, heel3, heelsum = get_heels(job["heel_file"])
seq_data = Pair(fastq_1=job["forward_file"],
                fastq_2=job["reverse_file"],
                hsp=job["hsp_overlap"],
                kmer=job["kmer_overlap"],
                min=job["min_overlap"])
# init DeTagSeq
if not job["gsp"]:
    dtseq = DeTagSeq(p3=heel3, p3s=job["h3score"], p5=heel5, p5s=job["h5score"], t3=tags3, t5=tags5)
else:
    dtseq = DeTagSeq(p3=heel3, p3s=job["h3score"], p5=heel5, p5s=job["h5score"], t3=tags3, t5=tags5, gsp3=gsp3, gsp5=gsp5)
# init ResultProcessor
if not job["gsp"]:
    reproc = ResultProcessor(tags=tags3, output_prefix=job["output_prefix"])
else:
    reproc = ResultProcessor(tags=tags3, output_prefix=job["output_prefix"], gsp=gsp3)
# init threading:
pool = mp.ProcessPool()
qseq_gen = (qseq for qseq in seq_data)


# create manager, queue
man = multiprocess.Manager()
q = man.Queue()
# execute controller
#dummy = open("dummy.txt", "a")
eye = pool.uimap(watcher, (q,),job )    
for i in pool.uimap(worker, list(qseq_gen)):
    q.put(i)
#pool.uimap(put_worker, list(qseq_gen))
#dummy.close()
    
    
q.put("kill")
#write_logfile(job["output_prefix"],jobsum, heelsum, tagsum,)


In [34]:
q.qsize()

0

In [31]:
watcher(q, job)

In [32]:
q.qsize()

0

In [25]:
a=q.get()
print a

{'status': {'no_primer3': 1}, 'id': 'M00485:302:000000000-AM3NR:1:1101:15825:1343', 'detagged_seq': {'status': 'no_primer3', 'tag_name': 'Hiplex_tag_6', 'gsprim_name': 'Pa_31_1', 'rev': True}}


In [146]:
def main():
    job,jobsum = make_job_dict_manually(tags="test_data/tags_no_CAA.txt",
                                    heel="test_data/heel.txt",
                                    pairedfastq=True,
                                    forward="test_data/first_100k_R1.fastq",
                                    reverse="test_data/first_100k_R2.fastq",
                                    pairedparams="7:5:10",
                                    outputprefix="test_data/debby_bug_15_8_",
                                    h3score="0.9",
                                    h5score="0.9",
                                    filtering="1:50:150:20:15",
                                    gsp="test_data/tabulated_relevant_primers.txt")
                                     #FIXME add input here
    tags5,tagsum = get_tagset(tag_input_file=job["tag_file"],end='3')
    tags3 = get_tagset(tag_input_file=job["tag_file"],end='5')[0]
    if job["gsp"]:
        gsp3,gspsum = get_tagset(tag_input_file=job["gsp"],end='3')
        gsp5 = get_tagset(tag_input_file=job["gsp"],end='5')[0]

    heel5, heel3, heelsum = get_heels(job["heel_file"])
    seq_data = Pair(fastq_1=job["forward_file"],
                    fastq_2=job["reverse_file"],
                    hsp=job["hsp_overlap"],
                    kmer=job["kmer_overlap"],
                    min=job["min_overlap"])
    # init DeTagSeq
    if not job["gsp"]:
        dtseq = DeTagSeq(p3=heel3, p3s=job["h3score"], p5=heel5, p5s=job["h5score"], t3=tags3, t5=tags5)
    else:
        dtseq = DeTagSeq(p3=heel3, p3s=job["h3score"], p5=heel5, p5s=job["h5score"], t3=tags3, t5=tags5, gsp3=gsp3, gsp5=gsp5)

    # init ResultProcessor
    if not job["gsp"]:
        reproc = ResultProcessor(tags=tags3, output_prefix=job["output_prefix"])
    else:
        reproc = ResultProcessor(tags=tags3, output_prefix=job["output_prefix"], gsp=gsp3)

    # execute detag & reproc
    reproc.open_tag_files()

    for qseq in seq_data:
        r=qseq.get()
        res = dict(status = dict())
        seq = r[0]
        qual = r[1]
        detagged_seq = None
        if seq:
            res["id"] = seq.id
            if job["filter_strategy"] == 0: # Filter full ###
                if not qual:
                    raise Exception("Full sequence filtering requires quality data")

                seq, status = filter_full(seq_record=seq,
                                          qual=qual,
                                          min_length=job["filter_minlen"],
                                          mean_min=job["filter_meanqual"],
                                          min_qual=job["filter_minqual"])
            elif job["filter_strategy"] == 1: # Full sequence, no filtering###
                status = None
            elif job["filter_strategy"] == 2: # HQR###
                if not qual:
                    raise Exception("HQR filtering requires quality data")
                seq, status = filter_hqr(seq_record=seq,
                                         qual=qual,
                                         min_length=job["filter_minlen"],
                                         mean_min=job["filter_meanqual"],
                                         min_qual=job["filter_minqual"])
            elif job["filter_strategy"] == 3: # Primers first
                if not qual:
                    raise Exception("Amplicon quality filtering requires quality data")
                detagged_seq, qual = dtseq.detag_seq(seq,qual)
                if "status" in detagged_seq:
                    res["status"][detagged_seq["status"]] = 1
                    res["detagged_seq"] = detagged_seq
                    #result_list.append(res)
                    reproc.process_result(res) ###
                    continue

                seq, status = filter_full(seq_record=detagged_seq["seq_record"],
                                          qual=qual,
                                          min_length=job["filter_minlen"],
                                          mean_min=job["filter_meanqual"],
                                          min_qual=job["filter_minqual"])

            else:
                raise Exception("Unknown filter type")
        else:
            status = "failed_pair"
        if status:
            res["status"][status] = 1
            #result_list.append(res)
            reproc.process_result(res) ###
            continue
        if len(seq.seq) < job["filter_minlen"]:
            res["status"]["too_short"] = 1
            #result_list.append(res)
            reproc.process_result(res) ###
            continue

        if not detagged_seq:
            detagged_seq, qual = dtseq.detag_seq(seq, qual)
    #print detagged_seq

            res["detagged_seq"] = detagged_seq
        if "status" in detagged_seq:
#            print detagged_seq["status"]
            res["status"][detagged_seq["status"]] = 1
            #result_list.append(res)
            res["detagged_seq"] = detagged_seq
            reproc.process_result(res) ###
            continue
        if len(detagged_seq["seq"]) < job["filter_minlen"]:
            res["status"]["too_short"] = 1
            res["detagged_seq"] = detagged_seq
            #result_list.append(res)
            reproc.process_result(res) ###
            continue

        if job["filter_maxlen"] > 0 and len(detagged_seq["seq"]) > job["filter_maxlen"]:
            res["status"]["truncated"] = 1
            detagged_seq["seq"] = detagged_seq["seq"][:job["filter_maxlen"]]
            res["detagged_seq"] = detagged_seq
            continue

        if job["filter_maxlen"] < 0:
            res["status"]["truncated"] = 1
            detagged_seq["seq"] = detagged_seq["seq"][:job["filter_maxlen"]]
            res["detagged_seq"] = detagged_seq
#            continue

#        print res
        res["keep"] = True
        res["id"] = seq.id
        res["detag_qual"] = qual
        reproc.process_result(res)

    reproc.close_tag_files()
    reproc.harvest_stats()
    reproc.write_stats()
    write_logfile(job["output_prefix"],jobsum, heelsum, tagsum,)

In [176]:
main()

In [25]:
help(pool.__enter__())

Help on ProcessPool in module pathos.multiprocessing object:

class ProcessPool(pathos.abstract_launcher.AbstractWorkerPool)
 |  Mapper that leverages python's multiprocessing.
 |  
 |  Method resolution order:
 |      ProcessPool
 |      pathos.abstract_launcher.AbstractWorkerPool
 |      __builtin__.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, *args, **kwds)
 |      Important class members:
 |          nodes       - number (and potentially description) of workers
 |          ncpus       - number of worker processors
 |          servers     - list of worker servers
 |          scheduler   - the associated scheduler
 |          workdir     - associated $WORKDIR for scratch calculations/files
 |      
 |      Other class members:
 |          scatter     - True, if uses 'scatter-gather' (instead of 'worker-pool')
 |          source      - False, if minimal use of TemporaryFiles is desired
 |          timeout     - number of seconds to wait for return value from scheduler