diff --git a/.coveragerc b/.coveragerc deleted file mode 100644 index 576e8dd..0000000 --- a/.coveragerc +++ /dev/null @@ -1,19 +0,0 @@ -# this file is based on the examples provided on scikit-learn's .coveragerc - -[run] -omit = - */test* - */__init__.py -source = qiita_files -branch = True -include = */qiita_*/* - -[report] -exclude_lines = - pragma: no cover - def __repr__ - raise NotImplementedError - if __name__ == .__main__.: -omit = - */test* - */__init__.py diff --git a/.travis.yml b/.travis.yml index b759ad0..f6e1cb4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,7 @@ install: - source activate env_name - travis_retry pip install . script: - - nosetests --with-doctest --with-coverage + - nosetests --with-doctest --with-coverage --cover-package=qiita_files - flake8 qiita_files setup.py after_success: - coveralls diff --git a/qiita_files/demux.py b/qiita_files/demux.py index f7ab4d0..a358b0b 100644 --- a/qiita_files/demux.py +++ b/qiita_files/demux.py @@ -54,6 +54,7 @@ from re import search import numpy as np +import joblib from future.utils import viewitems, viewvalues from future.builtins import zip @@ -389,6 +390,20 @@ def to_hdf5(fp, h5file, max_barcode_length=12): buffers[pjoin(dset_paths['qual'])].write(qual) +def _to_ascii(demux, samples, formatter): + """Aux function to change from hdf5 to ascii""" + id_fmt = (b"%(sample)s_%(idx)d orig_bc=%(bc_ori)s new_bc=%(bc_cor)s " + b"bc_diffs=%(bc_diff)d") + + for samp, idx, seq, qual, bc_ori, bc_cor, bc_err in fetch(demux, samples): + seq_id = id_fmt % {b'sample': samp, b'idx': idx, b'bc_ori': bc_ori, + b'bc_cor': bc_cor, b'bc_diff': bc_err} + if qual != []: + qual = qual.astype(np.uint8) + + yield formatter(seq_id, seq, qual) + + def to_ascii(demux, samples=None): """Consume a demuxed HDF5 file and yield sequence records @@ -412,19 +427,10 @@ def to_ascii(demux, samples=None): else: formatter = format_fasta_record - id_fmt = (b"%(sample)s_%(idx)d orig_bc=%(bc_ori)s new_bc=%(bc_cor)s " - b"bc_diffs=%(bc_diff)d") - if samples is None: samples = demux.keys() - for samp, idx, seq, qual, bc_ori, bc_cor, bc_err in fetch(demux, samples): - seq_id = id_fmt % {b'sample': samp, b'idx': idx, b'bc_ori': bc_ori, - b'bc_cor': bc_cor, b'bc_diff': bc_err} - if qual != []: - qual = qual.astype(np.uint8) - - yield formatter(seq_id, seq, qual) + return _to_ascii(demux, samples, formatter) def to_per_sample_ascii(demux, samples=None): @@ -455,6 +461,60 @@ def to_per_sample_ascii(demux, samples=None): yield samp, to_ascii(demux, samples=[samp]) +def _to_file(demux_fp, sample, fp, formatter): + with open_file(demux_fp, 'r+') as demux: + with open(fp, 'wb') as out: + for rec in _to_ascii(demux, [sample], formatter): + out.write(rec) + + +def to_per_sample_files(demux_fp, samples=None, out_dir='./', n_jobs=1, + out_format='fastq'): + """Writes per sample files + + Parameters + ---------- + demux_fp : str + The demux file path + samples : list of str, optional + Samples to pull out. If None, then all samples will be examined. + Defaults to None. + out_dir : str, optional + Path to output directory to store the per sample fasta. + Defaults to current directory + n_jobs : int, optional + Number of jobs to run in parallel. Defaults to 1 + out_format : {'fastq', 'fasta'} + The format in which the output files should be written. + """ + if out_format == 'fastq': + formatter = format_fastq_record + file_name_fmt = "%s.fastq" + elif out_format == 'fasta': + formatter = format_fasta_record + file_name_fmt = "%s.fna" + else: + raise ValueError("'out_format' should be either 'fastq' or 'fasta', " + "found: %s" % out_format) + if samples is None: + with open_file(demux_fp, 'r') as demux: + # We need to call list because demux.keys() is a KeysView object + # from the file, and the file will be closed once we exit the + # context manager + samples = list(demux.keys()) + + if out_dir is None: + out_dir = './' + + path_builder = partial(os.path.join, out_dir) + samples_and_paths = [(s.encode(), path_builder(file_name_fmt % s)) + for s in samples] + + with joblib.Parallel(n_jobs=n_jobs) as par: + par(joblib.delayed(_to_file)(demux_fp, sample, s_fp, formatter) + for sample, s_fp in samples_and_paths) + + def fetch(demux, samples=None, k=None): """Fetch sequences from a HDF5 demux file diff --git a/qiita_files/tests/test_demux.py b/qiita_files/tests/test_demux.py index 9f82c4b..f191ec9 100644 --- a/qiita_files/tests/test_demux.py +++ b/qiita_files/tests/test_demux.py @@ -11,6 +11,8 @@ import os import tempfile from unittest import TestCase, main +from functools import partial +from shutil import rmtree import h5py import numpy as np @@ -19,7 +21,8 @@ from qiita_files.demux import (buffer1d, buffer2d, _has_qual, _per_sample_lengths, _summarize_lengths, _set_attr_stats, _construct_datasets, to_hdf5, - to_ascii, stat, to_per_sample_ascii) + to_ascii, stat, to_per_sample_ascii, + to_per_sample_files) class BufferTests(TestCase): @@ -126,7 +129,11 @@ def setUp(self): def tearDown(self): self.hdf5_file.close() for f in self.to_remove: - os.remove(f) + if os.path.exists(f): + if os.path.isdir(f): + rmtree(f) + else: + os.remove(f) def test_has_qual(self): with tempfile.NamedTemporaryFile('r+', suffix='.fna') as f: @@ -321,6 +328,69 @@ def test_to_per_sample_ascii(self): obs = [(s[0], list(s[1])) for s in to_per_sample_ascii(self.hdf5_file)] self.assertEqual(obs, exp) + def test_to_files(self): + # implicitly tested with test_to_per_sample_fasta + pass + + def test_to_per_sample_files(self): + with tempfile.NamedTemporaryFile('r+', suffix='.fq', + delete=False) as f: + f.write(fqdata_variable_length) + + self.to_remove.append(f.name) + + with tempfile.NamedTemporaryFile('r+', suffix='.demux', + delete=False) as demux_f: + pass + + self.to_remove.append(demux_f.name) + + with h5py.File(demux_f.name, 'w') as demux: + to_hdf5(f.name, demux) + + tmp_dir = tempfile.mkdtemp() + self.to_remove.append(tmp_dir) + path_builder = partial(os.path.join, tmp_dir) + + # Test to fastq + to_per_sample_files(demux_f.name, out_dir=tmp_dir, n_jobs=1, + out_format='fastq') + sample_a_path = path_builder("a.fastq") + sample_b_path = path_builder("b.fastq") + self.assertTrue(os.path.exists(sample_a_path)) + self.assertTrue(os.path.exists(sample_b_path)) + + with open(sample_a_path, 'rb') as af: + obs = af.read() + self.assertEqual( + obs, b'@a_0 orig_bc=abc new_bc=abc bc_diffs=0\nxyz\n+\nABC\n') + + with open(sample_b_path, 'rb') as bf: + obs = bf.read() + self.assertEqual( + obs, b'@b_0 orig_bc=abw new_bc=wbc bc_diffs=4\nqwe\n+\nDFG\n' + b'@b_1 orig_bc=abw new_bc=wbc bc_diffs=4\nqwexx\n+\nDEF#G\n') + + # Test to fasta and parallel + to_per_sample_files(demux_f.name, out_dir=tmp_dir, n_jobs=2, + out_format='fasta') + + sample_a_path = path_builder("a.fna") + sample_b_path = path_builder("b.fna") + self.assertTrue(os.path.exists(sample_a_path)) + self.assertTrue(os.path.exists(sample_b_path)) + + with open(sample_a_path, 'rb') as af: + obs = af.read() + self.assertEqual( + obs, b'>a_0 orig_bc=abc new_bc=abc bc_diffs=0\nxyz\n') + + with open(sample_b_path, 'rb') as bf: + obs = bf.read() + self.assertEqual( + obs, b'>b_0 orig_bc=abw new_bc=wbc bc_diffs=4\nqwe\n' + b'>b_1 orig_bc=abw new_bc=wbc bc_diffs=4\nqwexx\n') + def test_fetch(self): # implicitly tested with test_to_ascii pass diff --git a/setup.py b/setup.py index 6692772..08f1736 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,6 @@ 'qiita_files/format', 'qiita_files/parse'], extras_require={'test': ["nose >= 0.10.1", "pep8"]}, - install_requires=['future', 'numpy', 'six', 'h5py'], + install_requires=['future', 'numpy', 'six', 'h5py', 'joblib'], classifiers=classifiers )