# Work on updating queue

After running my updated workflows, I need to update the queue: check whether each sample's outputs exist and, if they do, remove that sample from the queue.

In [1]:
# %load ../start.py
# Load useful extensions

# Activate the autoreload extension for easy reloading of external packages
%reload_ext autoreload
%autoreload 2

# Turn on the watermark
%reload_ext watermark
%watermark -u -d -g

# Load ipycache extension
%reload_ext ipycache
from ipycache import CacheMagics
CacheMagics.cachedir = '../cachedir'

# Add project library to path
import sys
sys.path.insert(0, '../../lib/python')

# The usual suspects
import os
import numpy as np
import pandas as pd

# plotting
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib inline
sns.set_context('poster')

# Turn off scientific notation
np.set_printoptions(precision=5, suppress=True)


last updated: 2017-10-20 
Git hash: 0cccc7b3c485cf7ec0f83945ebdc172b9d875143


In [2]:
# more imports
from lcdblib.snakemake import helpers
from lcdblib.utils import utils

from ncbi_remap.io import remove_chunk, add_table

In [72]:
store = pd.HDFStore('../../sra.h5')

In [73]:
store

<class 'pandas.io.pytables.HDFStore'>
File path: ../../sra.h5
/aln/alignment_bad                    frame_table  (typ->appendable,nrows->84,ncols->2,indexers->[index],dc->[srr,srx])   
/aln/complete                         frame_table  (typ->appendable,nrows->15157,ncols->2,indexers->[index],dc->[srr,srx])
/aln/queue                            frame_table  (typ->appendable,nrows->3,ncols->2,indexers->[index],dc->[srx,srr])    
/ids                                  frame_table  (typ->appendable,nrows->35363,ncols->2,indexers->[index],dc->[srx,srr])
/prealn/abi_solid                     frame_table  (typ->appendable,nrows->224,ncols->2,indexers->[index],dc->[srr,srx])  
/prealn/alignment_bad                 frame_table  (typ->appendable,nrows->2242,ncols->2,indexers->[index],dc->[srr,srx]) 
/prealn/complete                      frame_table  (typ->appendable,nrows->22194,ncols->2,indexers->[index],dc->[srr,srx])
/prealn/download_bad                  frame_table  (typ->appendable,nrows->72

# Set up a test queue

In [24]:
# Copy the real pre-alignment queue into a scratch 'test/queue' table so the
# update logic below can be exercised on a copy.
queue = store['prealn/queue']
# force=True: per the add_table docstring, delete the previous store at this key if it exists.
add_table(store, 'test/queue', queue, force=True, columns='all')
# Display the underlying PyTables table description for the new test queue.
store.root.test.queue.table

/test/queue/table (Table(10632,)) ''
  description := {
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "srx": StringCol(itemsize=10, shape=(), dflt=b'', pos=1),
  "srr": StringCol(itemsize=10, shape=(), dflt=b'', pos=2)}
  byteorder := 'little'
  chunkshape := (2340,)
  autoindex := True
  colindexes := {
    "srx": Index(9, full, shuffle, zlib(1)).is_csi=True,
    "srr": Index(9, full, shuffle, zlib(1)).is_csi=True}

In [5]:
# Record the starting row count of the test queue; it is compared against the
# post-removal count later to verify how many rows were dropped.
orig = store['test/queue'].shape[0]

# Update queue

In [6]:
# Take only the first 10 queued rows (srx, srr) so the file checks below run quickly.
ids = store['test/queue'].head(10)
ids

Unnamed: 0,srx,srr
7,SRX2551010,SRR5244308
8,SRX2551009,SRR5244306
39,SRX2541752,SRR5234226
40,SRX2541751,SRR5234225
41,SRX2541750,SRR5234224
42,SRX2541749,SRR5234223
43,SRX2541748,SRR5234222
44,SRX2541747,SRR5234221
87,SRX062905,SRR206928
88,SRX039370,SRR206929


In [7]:
# Functions and constants used by the queue update.
# Output-path templates taken from the workflow, grouped by workflow step.
# Most entries carry {srx}/{srr} placeholders; they are passed, together with a
# queue row, to helpers.fill_patterns in the loop further down.
patterns = {
    'fastq': {
        'r1': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}_1.fastq.gz',
        'r2': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}_2.fastq.gz',
        'summary': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.fastq.tsv',
    },
    'layout': '../../prealn-wf/output/samples/{srx}/{srr}/LAYOUT',
    'fastq_screen': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}_1.fastq_screen.txt',
    'fastqc': {
        'html': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}_1.fastqc.html',
        'zip': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}_1.fastqc.zip',
    },
    'atropos': {
        'r1': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}_1.trim.clean.fastq.gz',
        'r2': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}_2.trim.clean.fastq.gz',
    },
    'hisat2': {
        'splice_sites': '../../prealn-wf/output/known_splice_sites_r6-11.txt',
        'bam': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam',
        'summary': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.tsv',
    },
    'bai': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.bai',
    'feature_counts': {
        'counts': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.feature_counts.counts',
        'jcounts': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.feature_counts.counts.jcounts',
        'summary': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.feature_counts.counts.summary',
    },
    'picard': {
        'collectrnaseqmetrics': {
            'metrics': {
                'unstranded': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.NONE.picard.collectrnaseqmetrics',
                'first': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.FIRST_READ_TRANSCRIPTION_STRAND.picard.collectrnaseqmetrics',
                'second': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.SECOND_READ_TRANSCRIPTION_STRAND.picard.collectrnaseqmetrics',
            },
        },
        'markduplicates': {
            'bam': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.picard.markduplicates.bam',
            'metrics': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.picard.markduplicates.metrics',
        },
    },
    'strand': '../../prealn-wf/output/samples/{srx}/{srr}/STRAND',
    'samtools_stats': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.samtools.stats',
    'samtools_idxstats': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.samtools.idxstats',
    'bamtools_stats': '../../prealn-wf/output/samples/{srx}/{srr}/{srr}.hisat2.bam.bamtools.stats',
}

def keepers(targets):
    """Select the workflow outputs that should be retained.

    Parameters
    ----------
    targets : dict
        Nested mapping with the same layout as ``patterns``.

    Returns
    -------
    list
        The seven retained entries, in a fixed order: fastq_screen, hisat2
        summary, feature_counts summary, samtools stats, samtools idxstats,
        bamtools stats and the picard markduplicates metrics.

    Raises
    ------
    KeyError
        If any of the expected keys is missing from ``targets``.
    """
    # Each tuple is the chain of keys leading to one retained entry.
    key_paths = (
        ('fastq_screen',),
        ('hisat2', 'summary'),
        ('feature_counts', 'summary'),
        ('samtools_stats',),
        ('samtools_idxstats',),
        ('bamtools_stats',),
        ('picard', 'markduplicates', 'metrics'),
    )

    kept = []
    for key_path in key_paths:
        node = targets
        for key in key_path:
            node = node[key]
        kept.append(node)
    return kept

def check(srx, srr, targets):
    """Report whether every retained output file of a sample exists.

    Parameters
    ----------
    srx, srr :
        Identifiers handed back unchanged when the sample is complete.
    targets : dict
        Mapping passed to ``keepers`` to obtain the file names to test.

    Returns
    -------
    tuple or None
        ``(srx, srr)`` when all files returned by ``keepers`` exist on disk,
        otherwise ``None``.
    """
    required = utils.flatten(keepers(targets))
    # all() stops at the first missing file, just like an early return would.
    if all(os.path.exists(path) for path in required):
        return srx, srr
    return None

In [8]:
# Collect the (srx, srr) pairs whose retained output files all exist on disk.
done = []
for i, row in ids.iterrows():
    srx, srr = row.srx, row.srr
    # Expand the path templates for this sample.
    # NOTE(review): presumably fills {srx}/{srr} from the row's fields — confirm against lcdblib.
    targets = helpers.fill_patterns(patterns, row)
    # check() returns None as soon as one required file is missing.
    value = check(srx, srr, targets)
    if value is not None:
        done.append(value)

# Finished samples as a two-column frame (no rows if nothing is complete).
df = pd.DataFrame(done, columns=['srx', 'srr'])

In [9]:
# Remove the finished samples from the test queue
remove_chunk(store, 'test/queue', df.srr.tolist())
updated = store['test/queue'].shape[0]

# The test queue must have shrunk by exactly the number of finished samples
assert df.shape[0] == (orig - updated)

# Make sure none of the dropped srrs are still in the queue
# (use `not ...` rather than comparing a boolean to False with ==)
assert not store['test/queue'].srr.isin(df.srr).any()

In [10]:
# Hand the finished samples over to a test copy of the alignment queue
add_table(store, 'test/aln/queue', data=df)

# Adding the same rows a second time must leave the row count unchanged,
# i.e. the table must not accumulate duplicates.
new = store['test/aln/queue'].shape[0]
add_table(store, 'test/aln/queue', data=df)
assert new == store['test/aln/queue'].shape[0]

# Clean up

In [11]:
# Remove the scratch 'test' key from the store now that the checks are done.
# NOTE(review): presumably this drops both test/queue and test/aln/queue — confirm.
store.remove('test')

In [12]:
add_table?

[0;31mSignature:[0m [0madd_table[0m[0;34m([0m[0mstore[0m[0;34m,[0m [0mkey[0m[0;34m,[0m [0mdata[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0mforce[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Create a new HDF5 table.

Adds a dataframe to an HDF5 store and creates an index.

Parameters
----------
store : pd.io.pytables.HDFStore
    The data store to save to.
key : str
    The path in the HDF5 store to save data to.
data : pd.DataFrame
    The data to store.
force : bool
    If True then delete the previous store if it exists.
[0;31mFile:[0m      /spin1/users/fearjm/ncbi_remap/lib/python/ncbi_remap/io.py
[0;31mType:[0m      function


In [13]:
remove_chunk?

[0;31mSignature:[0m [0mremove_chunk[0m[0;34m([0m[0mstore[0m[0;34m,[0m [0mkey[0m[0;34m,[0m [0msrrs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Removes an ID to the ids data store.

If the SRR is not in the current collection, then append the srx and srr.

Parameters
----------
store : pd.io.pytables.HDFStore
    The data store to save to.
key : str
    The path in the HDF5 store to save data to.
srrs : list
    A list of SRRs to remove.
[0;31mFile:[0m      /spin1/users/fearjm/ncbi_remap/lib/python/ncbi_remap/io.py
[0;31mType:[0m      function


In [14]:
if None:
    print('bob')

In [17]:
sr = df.iloc[0]

In [18]:
sr.to_dict()

{'srr': 'SRR5244308', 'srx': 'SRX2551010'}

In [26]:
first = store['test/queue'].iloc[0]

In [28]:
first

srx    SRX2551010
srr    SRR5244308
Name: 7, dtype: object

In [27]:
from ncbi_remap.io import remove_id

In [34]:
remove_id(store, 'test/queue', **first.to_dict())

In [30]:
kwargs = first.to_dict()

In [31]:
kwargs

{'srr': 'SRR5244308', 'srx': 'SRX2551010'}

In [33]:
store['test/queue']

Unnamed: 0,srx,srr
8,SRX2551009,SRR5244306
39,SRX2541752,SRR5234226
40,SRX2541751,SRR5234225
41,SRX2541750,SRR5234224
42,SRX2541749,SRR5234223
43,SRX2541748,SRR5234222
44,SRX2541747,SRR5234221
87,SRX062905,SRR206928
88,SRX039370,SRR206929
89,SRX062906,SRR206930


In [41]:
store.root.strand.unstranded.table

/strand/unstranded/table (Table(2,)) ''
  description := {
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "srr": StringCol(itemsize=10, shape=(), dflt=b'', pos=1),
  "srx": StringCol(itemsize=10, shape=(), dflt=b'', pos=2)}
  byteorder := 'little'
  chunkshape := (2340,)
  autoindex := True
  colindexes := {
    "srx": Index(9, full, shuffle, zlib(1)).is_csi=True,
    "srr": Index(9, full, shuffle, zlib(1)).is_csi=True}

In [47]:
bob = store['strand/unstranded'].iloc[0].to_frame().T

In [49]:
bob.isin(store['strand/unstranded'])

ValueError: cannot compute isin with a duplicate axis.

In [55]:
bob.index = [10]

In [64]:
store['strand/unstranded'].isin(bob.to_dict('list'))

Unnamed: 0,srr,srx
0,True,True
0,False,False


In [63]:
bob.to_dict('list')

{'srr': ['SRR5244308'], 'srx': ['SRX2551010']}

In [65]:
bob.isin(store['strand/unstranded'].to_dict('list'))

Unnamed: 0,srr,srx
10,True,True


In [70]:
# NOTE(review): this removes 'aln/queue' itself, not a table under 'test/'.
# Confirm that dropping this queue is intended before re-running this cell.
store.remove('aln/queue')

In [71]:
store.close()