In [None]:
# default_exp core

# RNAseq Pipelines using Dask and Prefect

> A framework for processing RNAseq data using Dask/Prefect.

In [1]:
#hide
from nbdev.showdoc import *

In [2]:
#hide 
from nbdev.export import notebook2script

In [49]:
notebook2script()

Converted 00_core.ipynb.
Converted 01_hdict.ipynb.
Converted 02_seqs.ipynb.
Converted 03_utils.ipynb.
Converted 04_builtins.ipynb.
Converted index.ipynb.


In [3]:
%load_ext autoreload

In [4]:
%autoreload 2

# Pipeline elements (Tasks)

- Unit of execution (node). 

## Base class

In [29]:
#export
# use format from previously made pipeline
# inputs
import os
import subprocess
import shutil
import socket
import glob
from itertools import groupby

import numpy as N
import pandas as PD

import paramiko
import prefect
from prefect import task, Flow, Parameter, unmapped, case
from prefect.engine import signals
from prefect.tasks import Task

from dask_rnaseq.utils import *
from dask_rnaseq.hdict import *

In [52]:
# export

# A pipeline is a Prefect Flow made up of Prefect Tasks (analysis tasks).
# A pipeline accepts a site config and a list of samples.
# The site config holds site-specific, relatively constant information, such as paths and executable info.
# The list of samples changes from run to run; each sample contains
#  1) a sample id (sid), 2) input files, 3) the destination location for saving analysis results.


class FileInFileOut(Task):
    """
    Prefect Task whose inputs, outputs and temporary files are all files.

    The three class-level template dicts (``i_tmpls``, ``o_tmpls``, ``t_tmpls``)
    map a short key to a ``str.format`` template; ``_define_files`` expands them
    with the merged parameters ``self.p``.

    Subclasses are expected to override:
        1. _define_files
        2. _run
        3. _modify_param_specific
        4. _read_stats
    """
    nickname = 'analysis' # program name
    # input file templates (expanded with the merged parameters)
    i_tmpls = dict(
        r1 = '{r1}',
        r2 = '{r2}',
    )
    # output file templates
    o_tmpls = {}
    # temporary file templates
    t_tmpls = {}

    def __init__(self, cfg, tag=None, **kw):
        """
        cfg: site config; must provide ``merge`` (see ``set_param``).
        tag: optional suffix; the program name becomes ``{nickname}.{tag}``.
        **kw: forwarded unchanged to ``Task.__init__``.
        """
        self.tag = tag
        self.pname = f'{self.nickname}.{tag}' if tag else f'{self.nickname}'
        self.cfg = cfg
        self.set_param({})
        super().__init__(**kw)

    def set_param(self, param, cfg=None):
        """
        Install per-sample parameters and derive the convenience views.

        Sets ``self.param`` (copy of ``param`` wrapped in HBox), ``self.p``
        (``cfg.merge(param)``) and ``self.pcfg`` (``p.overridden(pname)``), then
        prepares the destination directory and expands the file templates.
        If ``cfg`` is given it replaces the stored site config first.

        Returns ``self.p``.
        """
        # copy and setup convenience access
        if cfg is not None:
            self.cfg = cfg
        self.param = HBox(param.copy())
        self.p = self.cfg.merge(param)
        self.pcfg = self.p.overridden(self.pname)
        self._prep_dstdir()
        self._define_files()
        return self.p

    def _prep_dstdir(self):
        "Prepare program output directory."
        p = self.p
        # destination directory: Output dir/DatasetID/progname.tag
        ddir = os.path.join(p.odir, p.did, self.pname)
        mkdirs(ddir)
        p.dstdir = self.dstdir = ddir
        # prefix = {destdir}/SampleID
        p.prefix = self.prefix = os.path.join(ddir, p.sid)

    def _define_files(self):
        "Expand the input/output/temp templates with the merged parameters."
        p = self.p
        self.inps = {k: v.format(**p) for k, v in self.i_tmpls.items()}
        self.outs = {k: v.format(**p) for k, v in self.o_tmpls.items()}
        self.tmps = {k: v.format(**p) for k, v in self.t_tmpls.items()}

    def check_files(self):
        """
        Verify that inputs exist and decide whether the work can be skipped.

        Raises ``signals.FAIL`` if any input file is missing.
        When ``force`` is set or some output is missing, existing temps and
        outputs are passed to ``deletefiles`` and False is returned.
        When every output exists (and ``force`` is not set), temps are passed to
        ``deletefiles`` and either ``signals.SKIP`` is raised (``signalskip``,
        default True) or True is returned.
        """
        skip = False
        force = self.p.get('force', False)
        signalskip = self.p.get('signalskip',True)
        # check input exists
        for k,f in self.inps.items():
            if not os.path.exists(f):
                raise signals.FAIL(message=f'input file {k}:{f} does not exist')
        done = all([os.path.exists(x) for k,x in self.outs.items()])
        if force or (not done):
            # start from scratch: delete all existing temps and outputs
            deletefiles(list(self.tmps.values())+list(self.outs.values()))
        else:
            # all outputs exist and not forced: clean temps, then skip
            deletefiles(list(self.tmps.values()))
            if signalskip:
                raise signals.SKIP(message=f'all outputs {self.outs} already exist')
            skip = True
        return skip

    def modify_param(self, skip=False):
        "Record this task's results into ``self.param`` and return it."
        self._modify_param_common(skip)
        self._modify_param_specific(skip)
        return self.param

    def _modify_param_common(self, skip=False):
        """
        Write the bookkeeping entries shared by all tasks into ``self.param``:
        dstdir, prefix, input/output/temp basenames, stats from ``_read_stats``
        and the ``history`` string ('|'-separated program names).

        When not skipped, the input/temp names come from ``self._inps`` /
        ``self._tmps``, which are set by ``nfs_to_ramdisk``.
        """
        p = self.param
        pname = self.pname
        outs = self.outs
        if skip:
            inps = self.inps
            tmps = self.tmps
        else:
            inps = self._inps # ramdisk
            tmps = self._tmps # ramdisk
        p[f'{pname}.dstdir'] = self.dstdir
        p[f'{pname}.prefix'] = self.prefix
        p[f'{pname}.inputs'] = [os.path.basename(x) for k,x in inps.items()]
        p[f'{pname}.outputs'] = [os.path.basename(x) for k,x in outs.items()]
        p[f'{pname}.temps'] = [os.path.basename(x) for k,x in tmps.items()]
        p[f'info.{pname}'] = self._read_stats()
        p.history = pname if p.get('history','')=='' else f'{p.history}|{pname}'
        if skip:
            p[f'{pname}.skipped'] = True

    def _modify_param_specific(self,skip=False):
        "Hook for subclasses to add task-specific entries to ``self.param``."
        pass

    def _read_stats(self):
        "Hook for subclasses; the value is stored under ``info.{pname}``."
        return {}

    def nfs_to_ramdisk(self):
        """
        Copy input files from the NFS root to the local RAMDISK root and compute
        the RAMDISK locations of temps, outputs and the prefix.

        Sets ``self._inps``, ``self._tmps``, ``self._outs`` and ``self._prefix``.
        If no ``ramdisk`` is configured, or it equals ``nfs``, these are the
        original paths. Files copied here are appended as ``(host, path)`` to
        ``self.param.delete`` for later cleanup.
        """
        p = self.p
        ramdisk = p.get('ramdisk','')
        nfs = p.get('nfs','')
        inps = self.inps
        tmps = self.tmps
        outs = self.outs
        ppre = self.prefix
        if (not ramdisk) or (nfs==ramdisk):
            # don't move
            self._inps = inps
            self._tmps = tmps
            self._outs = outs
            self._prefix = ppre
            return
        # inps
        delete = []
        host = get_hostname()
        nfslocal = (host==p.nfshost)
        def _nfs_to_ramdisk_copy(dic):
            rslt = []
            for k,src in dic.items():
                if src.startswith(nfs):
                    if nfslocal: # don't copy if local
                        dst = src
                    else:
                        # swap only the leading root; a later occurrence of the
                        # same string inside the path must stay untouched
                        dst = src.replace(nfs, ramdisk, 1)
                    if not os.path.exists(dst):
                        mkdirs(os.path.dirname(dst))
                        if src != dst:
                            shutil.copyfile(src,dst)
                            print('********** NFS => RAMDISK *********************************')
                            print(f'{src}=>{host}:{dst}')
                            print('***********************************************************')
                            delete.append((host,dst))
                    rslt.append((k,dst))
                else:
                    rslt.append((k,src))
            return dict(rslt)
        def _nfs_to_ramdisk_path(dic):
            # first occurrence only, so a repeated root string is not rewritten twice
            rslt = dict([(k,x.replace(nfs,ramdisk,1)) for k,x in dic.items()])
            deletefiles(list(rslt.values())) # clean destination
            return rslt
        self._inps  = _nfs_to_ramdisk_copy(inps)
        self._tmps   = _nfs_to_ramdisk_path(tmps)
        self._outs = _nfs_to_ramdisk_path(outs)
        self._prefix = ppre.replace(nfs,ramdisk,1)
        mkdirs(os.path.dirname(self._prefix))
        self.param.delete = self.param.get('delete',[]) + delete # copied to ramdisk

    def ramdisk_to_nfs(self, delete_outs1=False):
        """
        Move outputs from local RAMDISK (srcdir) to NFS (dstdir),
        delete other files used in RAMDISK.

        delete_outs1: if True the RAMDISK copy of each output is unlinked right
            after copying; otherwise it is appended as ``(host, path)`` to
            ``self.param.delete`` for later cleanup.

        Raises ``signals.FAIL`` if copying an output fails.
        """
        p = self.p
        nfs = p.get('nfs','')
        ramdisk = p.get('ramdisk','')
        inps = self.inps
        inps1 = self._inps
        tmps1 = self._tmps
        outs1 = self._outs
        if (not ramdisk) or (nfs==ramdisk): # no op
            return
        delete = []
        # same hostname helper as nfs_to_ramdisk, so the (host, path) entries in
        # the delete list are comparable across both methods
        host = get_hostname()
        for k,src in outs1.items():
            if src.startswith(ramdisk):
                # swap only the leading root (see nfs_to_ramdisk)
                dst = src.replace(ramdisk, nfs, 1)
                try:
                    mkdirs(os.path.dirname(dst))
                    if src != dst:
                        shutil.copyfile(src,dst)
                        os.chmod(dst, 0o660)
                        if delete_outs1:
                            os.unlink(src)
                        else:
                            delete.append((host,src))
                        print('********** RAMDISK => NFS *********************************')
                        print(f'{host}:{src}=>{dst}')
                        print('***********************************************************')
                except Exception as exc:
                    # Exception (not a bare except) so KeyboardInterrupt and
                    # similar BaseException-level signals propagate untouched
                    raise signals.FAIL(message=f'shutil.copy {src} to {dst} failed.') from exc
        # delete tmps1 regardless
        deletefiles(list(tmps1.values()))
        # delete inps1 if not original
        tgts = [inps1[k] for k in inps1 if inps[k]!=inps1[k]]
        deletefiles(tgts)
        # delete later
        self.param.delete = self.param.get('delete',[]) + delete

    def _run(self):
        "Hook for subclasses: the actual analysis step (no-op here)."
        pass

    def get_nthreads(self, progname=None)->int:
        """
        Return the thread count for ``progname`` (this task when None or equal
        to ``self.pname``): the program's ``nthreads`` setting capped by
        ``maxthreads`` (default 8).
        """
        ntmax = self.p.get('maxthreads', 8)
        if (progname is None) or (progname == self.pname):
            ntprg = self.pcfg.get('nthreads',ntmax)
        else:
            ntprg = self.p.overridden(progname).get('nthreads',ntmax)
        return min(ntmax, ntprg)

    def compress(self, files):
        """
        Compress each file with pigz (``--fast``).

        files: iterable of paths, or a dict whose values are paths.
        """
        if isinstance(files, dict):
            files = files.values()
        # compress with pigz
        pigz = self.p.pigz.execpath
        nt = self.p.pigz.get('nthreads',self.p.get('maxthreads',8))
        for f in files:
            # NOTE(review): the path is interpolated unquoted; whether that is
            # safe depends on how `subprocesscall` runs the string — confirm.
            subprocesscall(f'{pigz} -p {nt} --fast {f}')

    def run(self, param):
        """
        Common pattern in analysis:
            1. setup destination directory
            2. check files (already done?)
            3. transfer files (between machines)

        Override
            1. _define_files
            2. _run
            3. _modify_param_specific
            4. _read_stats

        Returns the updated ``self.param``.
        """
        self.set_param(param)
        if self.check_files(): # skip?
            return self.modify_param(skip=True)
        self.nfs_to_ramdisk()
        self._run()
        self.ramdisk_to_nfs()
        return self.modify_param(skip=False)

        
        
        

## Base class tests

In [7]:
# test config
# Site config shared by the test cells below: `nfs` and `ramdisk` point at two
# local directories, and `[analysis]` / `[analysis.tag]` are per-program sections.
# NOTE(review): the f-string has no placeholders and `socket` is not used in this cell.
import socket

cfg = HBox.from_toml(f"""
nfs = './tests'
ramdisk = './tests2'
odir = './tests/outputs'
idir = './tests'
nfshost = 'host1'
sshuser = 'user1'
workers = ['host1']
force = true
did = 'datasetid'
maxthreads = 8

[analysis]
execpath = 'nonexistent'

[analysis.tag]
extra = 'param1'
""")

In [8]:
# Construct a node with tag 'tag' and check the derived names and config views.
# NOTE(review): `PipelineNode` is not defined in the visible notebook; the class
# exported above is `FileInFileOut(cfg, tag=None)` — confirm which one this
# cell is meant to exercise.
pn = PipelineNode('tag', cfg)
assert(pn.tag=='tag')
assert(pn.pname=='analysis.tag')
assert(pn.pcfg.execpath=='nonexistent')
assert(pn.pcfg.extra=='param1')
assert(pn.cfg.did=='datasetid')
assert(pn.pcfg.did=='datasetid')

In [293]:
ptoml(pn.p)

nfs = "./tests"
ramdisk = "./tests2"
odir = "./tests/outputs"
idir = "./tests"
nfshost = "host1"
sshuser = "user1"
workers = [ "host1",]
force = true
did = "datasetid"
maxthreads = 8

[analysis]
execpath = "nonexistent"

[analysis.tag]
extra = "param1"



In [294]:
ptoml(pn.param)




In [295]:
ptoml(pn.pcfg)

nfs = "./tests"
ramdisk = "./tests2"
odir = "./tests/outputs"
idir = "./tests"
nfshost = "host1"
sshuser = "user1"
workers = [ "host1",]
force = true
did = "datasetid"
maxthreads = 8
execpath = "nonexistent"
extra = "param1"



In [296]:
# Set the per-sample parameters and check the destination directory and prefix
# (odir/did/pname and odir/did/pname/sid respectively).
pn.set_param(HBox({'sid':'sample1'}))
pn._prep_dstdir()
assert(os.path.exists('./tests/outputs/datasetid/analysis.tag'))
assert(pn.prefix == './tests/outputs/datasetid/analysis.tag/sample1')

In [297]:
ptoml(pn.param)

sid = "sample1"



In [298]:
ptoml(pn.p)

nfs = "./tests"
ramdisk = "./tests2"
odir = "./tests/outputs"
idir = "./tests"
nfshost = "host1"
sshuser = "user1"
workers = [ "host1",]
force = true
did = "datasetid"
maxthreads = 8
sid = "sample1"

[analysis]
execpath = "nonexistent"

[analysis.tag]
extra = "param1"



In [299]:
# Expand the file templates and check that no files are defined.
# NOTE(review): `FileInFileOut.i_tmpls` above declares `r1`/`r2` templates, so
# `inps == {}` presumably reflects a different class definition — confirm.
pn._define_files()
assert(pn.inps=={})
assert(len(pn.outs)==0)
assert(len(pn.tmps)==0)

In [300]:
ptoml(pn.param)

sid = "sample1"



In [301]:
# Point the node's inputs directly at the two test FASTQ files under ./tests.
pn.inps = {'r1':'./tests/5d_GH146mtdT_PN_Lee_L1_S69_R1_001.fastq.gz',
           'r2':'./tests/5d_GH146mtdT_PN_Lee_L1_S69_R2_001.fastq.gz'}

In [302]:
# With `force = true` in the test config the node must not be skipped.
# NOTE(review): the class shown above names this method `check_files`
# (no leading underscore) — confirm the name used here.
assert(pn._check_files()==False)

In [303]:
# Record the common bookkeeping entries as a skipped run and compare the
# resulting parameters (serialized to TOML) with the expected text.
pn._modify_param_common(skip=True)
assert(pn.param.to_toml()==\
"""sid = "sample1"
history = "analysis.tag"

[analysis.tag]
dstdir = "./tests/outputs/datasetid/analysis.tag"
prefix = "./tests/outputs/datasetid/analysis.tag/sample1"
inputs = [ "5d_GH146mtdT_PN_Lee_L1_S69_R1_001.fastq.gz", "5d_GH146mtdT_PN_Lee_L1_S69_R2_001.fastq.gz",]
outputs = []
temps = []
skipped = true

[info.analysis.tag]
""")

In [305]:
pn._modify_param_common(skip=True)
ptoml(pn.param)

sid = "sample1"
history = "analysis.tag|analysis.tag"

[analysis.tag]
dstdir = "./tests/outputs/datasetid/analysis.tag"
prefix = "./tests/outputs/datasetid/analysis.tag/sample1"
inputs = [ "5d_GH146mtdT_PN_Lee_L1_S69_R1_001.fastq.gz", "5d_GH146mtdT_PN_Lee_L1_S69_R2_001.fastq.gz",]
outputs = []
temps = []
skipped = true

[info.analysis.tag]



In [306]:
ptoml(pn.p)

nfs = "./tests"
ramdisk = "./tests2"
odir = "./tests/outputs"
idir = "./tests"
nfshost = "host1"
sshuser = "user1"
workers = [ "host1",]
force = true
did = "datasetid"
maxthreads = 8
sid = "sample1"

[analysis]
execpath = "nonexistent"

[analysis.tag]
extra = "param1"



In [307]:
ptoml(pn.pcfg)

nfs = "./tests"
ramdisk = "./tests2"
odir = "./tests/outputs"
idir = "./tests"
nfshost = "host1"
sshuser = "user1"
workers = [ "host1",]
force = true
did = "datasetid"
maxthreads = 8
sid = "sample1"
execpath = "nonexistent"
extra = "param1"



In [312]:
rm -rf ./tests2/*

In [313]:
# Copy the inputs from ./tests to ./tests2 and check that both copies are
# registered in `param.delete` as (host, path) entries.
# NOTE(review): the class shown above names this method `nfs_to_ramdisk`
# (no leading underscore) — confirm the name used here.
pn._nfs_to_ramdisk()
assert(len(pn._inps)==2)
assert(pn._outs=={})
assert(pn._tmps=={})
host = get_hostname()
assert(pn.param.delete==[(host, './tests2/5d_GH146mtdT_PN_Lee_L1_S69_R1_001.fastq.gz'), 
                         (host, './tests2/5d_GH146mtdT_PN_Lee_L1_S69_R2_001.fastq.gz')])

********** NFS => RAMDISK *********************************
./tests/5d_GH146mtdT_PN_Lee_L1_S69_R1_001.fastq.gz=>suginok-p1:./tests2/5d_GH146mtdT_PN_Lee_L1_S69_R1_001.fastq.gz
***********************************************************
********** NFS => RAMDISK *********************************
./tests/5d_GH146mtdT_PN_Lee_L1_S69_R2_001.fastq.gz=>suginok-p1:./tests2/5d_GH146mtdT_PN_Lee_L1_S69_R2_001.fastq.gz
***********************************************************
Directory ./tests2/outputs/datasetid/analysis.tag created successfully


In [315]:
# Move outputs back from the ramdisk root to the NFS root.
# NOTE(review): the class shown above names this method `ramdisk_to_nfs`
# (no leading underscore) — confirm the name used here.
pn._ramdisk_to_nfs()

In [323]:
# Reset the history and record the bookkeeping entries for a non-skipped run.
# NOTE(review): the expected text hard-codes the hostname "suginok-p1", so this
# assertion presumably only passes on that machine — consider building it from
# get_hostname(). `skipped = true` is expected although skip defaults to False;
# presumably it is left over from the earlier skip=True calls — confirm.
pn.param.history=''
pn._modify_param_common()
assert(pn.param.to_toml()=="""sid = "sample1"
history = "analysis.tag"
delete = [ [ "suginok-p1", "./tests2/5d_GH146mtdT_PN_Lee_L1_S69_R1_001.fastq.gz",], [ "suginok-p1", "./tests2/5d_GH146mtdT_PN_Lee_L1_S69_R2_001.fastq.gz",],]

[analysis.tag]
dstdir = "./tests/outputs/datasetid/analysis.tag"
prefix = "./tests/outputs/datasetid/analysis.tag/sample1"
inputs = [ "5d_GH146mtdT_PN_Lee_L1_S69_R1_001.fastq.gz", "5d_GH146mtdT_PN_Lee_L1_S69_R2_001.fastq.gz",]
outputs = []
temps = []
skipped = true

[info.analysis.tag]
""")

# Function to run a single task on multiple samples

In [23]:
def run_task(task, params, executor, **kw):
    """
    Run a single task mapped over ``params`` in a throwaway Flow.

    task: Prefect task whose ``run`` accepts a ``p`` keyword argument.
    params: list of per-sample parameters; one mapped run per element.
    executor: executor passed to ``flow.run``.
    **kw: extra keyword arguments forwarded to ``task.map``.

    Returns the ``result`` of the mapped task's final state.
    """
    name = task.run.__name__
    with Flow(name) as flow:
        mapped = task.map(p=params,**kw)
    rslt = flow.run(executor=executor)
    # Index by the mapped task handle rather than looking the task up by the
    # run function's name: the two differ for class-based tasks and for tasks
    # created with an explicit name, where the by-name lookup finds nothing.
    return rslt.result[mapped].result

# Utility Tasks

In [30]:
# export
@task
def clean_ramdisk_local(p):
    """
    Remove the files listed in ``p.delete`` that belong to the current host.

    ``p.delete`` holds ``(host, path)`` entries. Entries for this host are
    unlinked if they exist; entries for other hosts are kept in ``p.delete``.
    A ``dummy`` entry, if present, is blanked. Returns ``p``.
    """
    this_host = get_hostname()
    remaining = []
    for h, f in p.get('delete', []):
        if h != this_host:
            # not ours: keep for a later cleanup step
            remaining.append((h, f))
            print(f'OTHER HOST:{h}:{f}')
            continue
        if os.path.exists(f):
            os.unlink(f)
            print(f'CLEANED:{h}:{f}')
    p.delete = remaining
    if 'dummy' in p:
        p.dummy = '' # clear dummy data
    return p

@task
def clean_ramdisk_global(ps, cfg):
    """
    Delete every file listed in the ``delete`` entries of all parameters.

    ps: list of per-sample parameters, each optionally holding a ``delete``
        list of ``(host, path)`` entries.
    cfg: site config; ``cfg.sshuser`` is passed to ``remote_remove_files``.

    Files on the current host are passed to ``deletefiles``; files on other
    hosts are passed to ``remote_remove_files``, and whatever it returns is
    kept as still-to-delete. Afterwards the remaining entries are stored on
    ``ps[0].delete`` and all other ``delete`` lists are emptied.

    Returns ``ps``.
    """
    tgts = [x for p in ps for x in p.get('delete',[])]
    if len(tgts)==0:
        return ps
    # groupby host
    key = lambda x:x[0]
    grp = {k:list(g) for k,g in groupby(sorted(tgts, key=key), key=key)}
    host = get_hostname()
    # for each remote host use paramiko to delete
    remained = []
    for h, entries in grp.items():
        files = list(set([x[1] for x in entries]))
        if h!=host:
            remain = remote_remove_files(h, files, cfg.sshuser)
            remained += [(h,f) for f in remain]
        else:# delete local
            deletefiles(files)
            print(f'LOCAL CLEAN: {h}\n','\n  '.join(files))
    # `ps`, not `p`: the comprehension variable above does not exist here
    ps[0].delete = remained
    for p in ps[1:]:
        p.delete = []
    return ps

In [31]:
# export
@task
def merge_params(p1,p2,fields):
    """
    Merge parameters from two upstreams.

    p1: base parameters; a copy is updated and returned.
    p2: parameters to pull values from.
    fields: names to merge; each item is either a key (same name in both) or
        a ``(src, dst)`` tuple reading ``p2[src]`` into ``p1[dst]``.
        'delete' is always merged, whether or not it is listed.

    A value missing in ``p2`` is skipped; a value missing in ``p1`` is set
    directly; otherwise both sides are combined into one list.
    """
    p1 = p1.copy()
    # merge delete by default; work on a copy so the caller's `fields`
    # (possibly shared between calls, or a tuple) is never mutated
    fields = list(fields)
    if 'delete' not in fields:
        fields.append('delete')
    for srcdst in fields:
        if isinstance(srcdst,tuple):
            src,dst = srcdst
        else:
            src,dst = srcdst,srcdst
        sval = p2.get(src,None)
        if sval is None:
            continue
        dval = p1.get(dst,None)
        if dval is None: #  just set
            p1[dst] = sval
        else: # merge
            if isinstance(dval,list):
                if isinstance(sval,list):
                    val = dval+sval
                else:
                    val = dval+[sval]
            else:
                if isinstance(sval,list):
                    val = [dval]+sval
                else:
                    val = [dval, sval]
            p1[dst] = val
    return p1

@task
def write_aggregated_info(ps,cfg,fields,fname='info.csv'):
    """
    Collect ``fields`` from every parameter set into one table and write it,
    tab-separated, to ``{cfg.odir}/{fname}`` (replacing an existing file).

    Returns a dict with the DataFrame under 'info' and ``ps`` under 'params'.
    """
    out = f'{cfg.odir}/{fname}'
    # remove any previous file before writing
    if os.path.exists(out):
        os.unlink(out)
    df = PD.DataFrame([sample.subset(fields) for sample in ps])
    df.to_csv(out, sep='\t')
    return {'info':df, 'params':ps}

