In [10]:
from __future__ import division
import os
import sys
import time

from glob import glob
from multiprocessing import Manager,Process
from Queue import Empty

from Bio import SeqIO
from dendropy import Tree

from IPython.display import display
from ipywidgets import *

In [20]:
class Worker(Process):
    """Process that drains a shared queue of alignment FASTA paths.

    For every alignment taken from the queue a per-gene directory is created
    under ``outDir`` with ``trees``, ``seqs`` and ``AS`` sub-directories.
    Each loaded tree is written into ``trees``; for each tree, the alignment
    records whose ids equal the tree's taxon labels are written into ``seqs``
    as ``<tree name>.fas``. ``AS`` is only created here, nothing is written
    to it by this class.

    Parameters
    ----------
    iq : queue-like
        Object with a non-blocking ``get(False)`` that raises ``Queue.Empty``
        when exhausted. Items are paths to FASTA alignments.
    outDir : str, optional (keyword only)
        Root directory for the per-gene output. Defaults to ``OUT_DIR``.
    treeGlob : str, optional (keyword only)
        Glob pattern selecting the newick trees to load. Defaults to
        ``TREE_GLOB``.
    *args, **kwargs
        Forwarded unchanged to ``multiprocessing.Process``.
    """

    OUT_DIR = '/DATA/raw/dbpshp/EP'
    TREE_GLOB = '/DATA/raw/seqsim/noncoding/trees/Mammals/lower/*.nwk'
    SUBDIRS = ('trees', 'seqs', 'AS')

    def __init__(self, iq, *args, **kwargs):
        # Pop our own options first so Process.__init__ never sees them.
        out_dir = kwargs.pop('outDir', self.OUT_DIR)
        tree_glob = kwargs.pop('treeGlob', self.TREE_GLOB)
        super(Worker, self).__init__(*args, **kwargs)

        self.iq = iq
        self.outDir = out_dir
        # Tree name (file name without '.nwk') -> parsed tree.
        self.ts = {}
        for tree_path in glob(tree_glob):
            tree_name = os.path.basename(tree_path).replace('.nwk', '')
            self.ts[tree_name] = Tree.get(path=tree_path, schema='newick')

    @staticmethod
    def _ensure_dir(path):
        """Create ``path`` (with parents) unless it already is a directory.

        Raises
        ------
        OSError
            If the directory could not be created and does not exist.
        """
        try:
            os.makedirs(path)
        except OSError:
            # An existing directory is fine (another worker may have created
            # it first); anything else is a real error and must surface.
            if not os.path.isdir(path):
                raise

    def start(self):
        """Announce the worker on stdout, then start the process."""
        print('Start. {}'.format(self.name))
        sys.stdout.flush()
        super(Worker, self).start()

    def terminate(self):
        """Announce termination on stdout, then terminate the process."""
        print('Terminate. {}'.format(self.name))
        sys.stdout.flush()
        super(Worker, self).terminate()

    def run(self):
        """Process tasks until a non-blocking ``get`` finds the queue empty."""
        while True:
            try:
                task = self.iq.get(False)
            except Empty:
                print('Done. {}'.format(self.name))
                sys.stdout.flush()
                break
            self.process_task(task)

    def process_task(self, fasta):
        """Write the trees and the per-tree sequence subsets for one alignment.

        Parameters
        ----------
        fasta : str
            Path to a FASTA alignment. The gene name is the part of the file
            name before the first '.'.

        Raises
        ------
        KeyError
            If a tree has a taxon label with no record of that id in ``fasta``.
        OSError
            If an output directory cannot be created.
        """
        gene = os.path.basename(fasta).split('.')[0]
        geneDir = os.path.join(self.outDir, gene)
        self._ensure_dir(geneDir)
        for sub in self.SUBDIRS:
            self._ensure_dir(os.path.join(geneDir, sub))

        for tree_name, tree in self.ts.items():
            tree.write(path=os.path.join(geneDir, 'trees', '{}.nwk'.format(tree_name)),
                       schema='newick')

        records = SeqIO.to_dict(SeqIO.parse(fasta, 'fasta'))
        for tree_name, tree in self.ts.items():
            labels = tree.taxon_namespace.labels()
            missing = [label for label in labels if label not in records]
            if missing:
                # Same exception type as the plain dict lookup, but says
                # which tree/alignment pair is inconsistent.
                raise KeyError('tree {!r}: taxa not found in {}: {}'.format(
                    tree_name, fasta, ', '.join(missing)))
            seq_path = os.path.join(geneDir, 'seqs', '{}.fas'.format(tree_name))
            SeqIO.write([records[label] for label in labels], seq_path, 'fasta')

In [21]:
# Paths of all alignments to process. glob() returns them in arbitrary
# order, so sort to make the task order the same on every run.
alns = sorted(glob('/DATA/raw/EP/max/dbpshp_alignments/*.fasta'))
print(len(alns))

24060


In [22]:
%%time

def _show_progress(bar, label, queue):
    """Refresh the bar and the 'taken / total' label from the queue size.

    The value is the number of tasks taken off the queue, which can be
    ahead of the number of tasks actually finished.
    """
    bar.value = bar.max - queue.qsize()
    label.value = '{:>5d} / {:<5d}'.format(bar.value, bar.max)


# Shared task queue, one entry per alignment path.
m = Manager()
q = m.Queue()

for aln in alns:
    q.put(aln)

## Widgets
pb = IntProgress(width='800px')
pb.max = q.qsize()
ph = HTML()
ps = HTML()
eta = HTML()
display(pb, ph, ps, eta)

## Start workers
numWorkers = 16
workers = [Worker(q) for _ in range(numWorkers)]
for p in workers:
    p.start()
sys.stdout.flush()

## Poll until the queue is drained or no worker is left alive
SLEEP = 2
startTime = time.time()
while True:
    time.sleep(SLEEP)
    if q.empty():
        break
    if not any(p.is_alive() for p in workers):
        break
    _show_progress(pb, ph, q)
    # Average rate since start; elapsed is at least SLEEP seconds here.
    tps = pb.value / (time.time() - startTime)
    ps.value = 'Avg. tasks/minute: {:.2f}'.format(tps * 60)
    if tps > 0:
        remaining = pb.max - pb.value
        # Separate names so the Manager bound to ``m`` is not overwritten.
        mins, secs = divmod(int(remaining / tps), 60)
        hrs, mins = divmod(mins, 60)
        eta.value = 'Est. Time:         %d:%02d:%02d' % (hrs, mins, secs)

_show_progress(pb, ph, q)
## Join
for p in workers:
    p.join()

# The loop above also exits when every worker has died, so only report
# success if nothing is left queued and every worker exited cleanly.
leftover = q.qsize()
crashed = [p.name for p in workers if p.exitcode != 0]
if leftover or crashed:
    print('Incomplete. {} task(s) left in queue; failed workers: {}'.format(
        leftover, ', '.join(crashed) or 'none'))
    pb.color = '#ff0000'
else:
    print('Done.')
    pb.color = '#00ff00'
sys.stdout.flush()

Start. Worker-2
Start. Worker-3
Start. Worker-4
Start. Worker-5
Start. Worker-6
Start. Worker-7
Start. Worker-8
Start. Worker-9
Start. Worker-10
Start. Worker-11
Start. Worker-12
Start. Worker-13
Start. Worker-14
Start. Worker-15
Start. Worker-16
Start. Worker-17
Done. Worker-10
Done. Worker-8
Done. Worker-2
Done. Worker-6
Done. Worker-4
Done. Worker-15
Done. Worker-13
Done. Worker-17
Done. Worker-3
Done. Worker-9
Done. Worker-12
Done. Worker-7
Done. Worker-5
Done. Worker-16
Done. Worker-11
Done. Worker-14
Done.
CPU times: user 836 ms, sys: 176 ms, total: 1.01 s
Wall time: 3min 1s
