## Imports

In [0]:
import sys

In [0]:
sys.path.append("../include_utils/")

In [0]:
from IPython.parallel import Client
import os, time
#import include_utils as u
import pandas as pd
import numpy as np
import scipy as sp
import numbers
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import matplotlib.cm as cm
import matplotlib.colors as mcolors
import vcf
from sklearn import preprocessing
from subprocess import Popen, PIPE
import seaborn as sns
from IPython.display import FileLink
import urllib2
import dill
import traceback
from pandas import Series, DataFrame
import gzip
import warnings
warnings.filterwarnings('ignore',category=pd.io.pytables.PerformanceWarning)
%config InlineBackend.figure_format = 'retina'

## IPython parallel setup

In [0]:
# Connect to the ipcluster started with the "xmn" profile.
rc = Client(profile="xmn")
dview = rc[:]                      # DirectView over all currently registered engines
lview = rc.load_balanced_view()    # load-balanced view over all engines
# Display how many engines are available.
len(lview)

In [0]:
with dview.sync_imports():
    import os
    import sys
    import socket
    import stopwatch
    from subprocess import Popen, PIPE
    import tempfile
    import shutil

In [0]:
@dview.remote(block=True)
def get_cpu_count():
    import multiprocessing as mp
    import socket
    return socket.gethostname(), mp.cpu_count()
cpu_counts = get_cpu_count()
from collections import defaultdict
cpu_dict = defaultdict(list)
for i, c in enumerate(cpu_counts):
    if c[1] >= 16:
        cpu_dict[c[0]].append(i)  
print cpu_dict

In [0]:
# Load-balance only across the first >=16-CPU engine on each such host.
cview = rc.load_balanced_view(targets=[engines[0] for engines in cpu_dict.values()])

### Find BAM files

In [0]:
bam_dir = "/gpfs_fs/home/eckertlab/wbp/bowtie2.1/bowtierun/"
# Recursively collect every "*sorted_rg.bam" under bam_dir, sorted by full path.
bam_files = sorted(
    os.path.join(root, name)
    for root, _subdirs, names in os.walk(bam_dir)
    for name in names
    if name.endswith("sorted_rg.bam")
)
len(bam_files)

In [0]:
# Peek at the first (sorted) BAM path.
bam_files[0]

In [0]:
# Tool locations used by the remote jobs below (samtools/bcftools 1.2, Picard 1.130).
samtools = "/home/lindb/g/src/samtools-1.2/samtools"
bcftools = "/home/lindb/g/src/bcftools-1.2/bcftools"
picard = "/home/lindb/g/src/picard-tools-1.130/picard.jar"
java = "/usr/global/jre/bin/java"
perl = "/home/lindb/g/src/ActivePerl-5.18.4.1804-x86_64-linux-glibc-2.5-298913/perl/bin/perl"
# NOTE(review): dup_dir is recomputed from bam_dir in a later cell (same directory,
# without the trailing slash).
dup_dir = "/gpfs_fs/home/eckertlab/wbp/bowtie2.1/bowtierun/dup_dir/"

In [0]:
def mark_duplicates(args):
    java, picard, bam_file, dup_dir = args
    out_bam = os.path.join("%s_dedup.bam" % os.path.basename(bam_file[:-4]))
    out_bam = os.path.join(dup_dir, out_bam)
    t = tempfile.NamedTemporaryFile(delete=False)
    cmd = "%s -jar %s MarkDuplicates \
    INPUT=%s OUTPUT=%s METRICS_FILE=%s.metrics" %     (java,
                              picard,
                              bam_file,
                              t.name,
                              out_bam)
    print cmd
    !$cmd
    shutil.move(t.name, out_bam)
    return cmd, out_bam
dview['mark_duplicates'] = mark_duplicates

In [0]:
# Output directory for the MarkDuplicates results: <bam_dir>/dup_dir,
# created if it does not exist yet.
dup_dir = os.path.join(bam_dir, "dup_dir")
if not os.path.exists(dup_dir):
    os.makedirs(dup_dir)
assert os.path.exists(dup_dir)

In [0]:
# Check the stem mark_duplicates builds its output name from: the path with
# its last 4 characters (".bam") stripped.
bam_files[0][:-4]

In [0]:
# Submit one MarkDuplicates job per BAM to the cview engines
# (the first >=16-CPU engine on each qualifying host).
dup_jobs = [cview.apply_async(mark_duplicates, (java, picard, bam, dup_dir))
            for bam in bam_files]

In [0]:
# Tally finished vs. still-pending MarkDuplicates jobs; ready() is polled once per job.
counts = sum(1 for job in dup_jobs if job.ready())
uncounts = len(dup_jobs) - counts
counts, uncounts, counts + uncounts

In [0]:
# Collect every file ending in "bam" under dup_dir (recursively), sorted by path.
dedup_files = sorted(
    os.path.join(root, name)
    for root, _subdirs, names in os.walk(dup_dir)
    for name in names
    if name.endswith("bam")
)
len(dedup_files), dedup_files

In [0]:
# One-off rename: collapse "rg.bam" -> "rg" in dedup BAM file names
# (presumably names like "<sample>_sorted_rg.bam_dedup.bam" from an earlier
# naming scheme; confirm). Only the file name is rewritten, never the directory.
# Files whose name would not change are left alone rather than renamed onto
# themselves. dedup_files is rebuilt with the new paths so later cells don't
# reference files that no longer exist.
renamed_files = []
for d in dedup_files:
    d2 = os.path.join(os.path.dirname(d),
                      os.path.basename(d).replace("rg.bam", "rg"))
    if d2 != d:
        os.rename(d, d2)
    renamed_files.append(d2)
dedup_files = sorted(renamed_files)

In [0]:
for b in dedup_files:
    rg = !$samtools view -H {b} | grep '^@RG'
    print "\t".join(rg)
    break

In [0]:
def create_dedup_file(args):
    """Write the samples file passed to ``bcftools call -S`` by call_snps.

    args is a (dedup_files, analysis_dir) tuple. The file
    ``<analysis_dir>/dedup_file.txt`` gets one line per BAM:
    ``<name>\\t2``, where <name> is the first two underscore-separated fields
    of the BAM's basename (009compiled_sorted_rg_dedup.bam -> 009compiled_sorted).

    NOTE(review): the second column is the constant 2. bcftools call reads an
    optional second column as ploidy (i.e. diploid); confirm that <name>
    matches the SM tag of each BAM's read group.

    Returns the path of the written file.
    """
    bam_paths, out_dir = args
    samples_path = os.path.join(out_dir, "dedup_file.txt")
    with open(samples_path, "w") as handle:
        for path in bam_paths:
            sample = "_".join(os.path.basename(path).split("_")[:2])
            handle.write("%s\t%d\n" % (sample, 2))
    return samples_path
# Push create_dedup_file into every engine's namespace; call_snps calls it remotely.
dview['create_dedup_file'] = create_dedup_file

In [0]:
def call_snps(args):
    print socket.gethostname()
    timer = stopwatch.Timer()
    samtools, velvet_ass, dedup_files, bcftools, raw_vcf, out_dir = args 
    if not out_dir:
        out_dir = os.environ['TMPDIR']
    raw_vcf = os.path.join(out_dir, raw_vcf)
    dedup_file = create_dedup_file((dedup_files, out_dir))
    pileup = "%s mpileup -ugf %s %s | %s call -S %s -vmO z -o %s" % (samtools, 
                                                                     velvet_ass, 
                                                                     ' '.join(dedup_files), 
                                                                     bcftools, 
                                                                     dedup_file,
                                                                     raw_vcf) 
    
    #print pileup
    !$pileup
    timer.stop()
    return pileup, timer.elapsed
dview['call_snps'] = call_snps

In [0]:
# Earlier reference / output directory, kept for reference:
#velvet_ass = "/home/lindb/g/vo/auto_data_41/contigs.fa"
#outdir = "/home/lindb/g/wbp/samtools/snps/"

# Reference contigs (presumably a Velvet assembly, auto_data_45) used as `mpileup -f` by call_snps.
velvet_ass = "/gpfs_fs/home/eckertlab/wbp/assembly/auto_data_45/contigs.fa"

In [0]:
# Positional arguments for call_snps:
# (samtools, reference, dedup BAMs, bcftools, output VCF name, output dir).
# The raw VCF is written into dup_dir.
args = [samtools, 
        velvet_ass, 
        dedup_files, 
        bcftools, 
        "samtools_1.2.vcf.gz", 
        dup_dir]

In [0]:
#samtools_job = cview.apply_async(call_snps, args)

In [0]:
for line in samtools_job.stdout.split("\n"):
    print line

In [0]:
#samtools,velvet_ass,dedup_files[0],bcftools,"samtools_1.2.vcf.gz",outdir

In [0]:
#call_snps(args)