Skip to content

Commit

Permalink
BUG: remove memory hog from storing barcode stats in memory
Browse files Browse the repository at this point in the history
  • Loading branch information
wasade committed Nov 6, 2020
1 parent a968396 commit 28d7739
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 63 deletions.
70 changes: 35 additions & 35 deletions q2_demux/_demux.py
Expand Up @@ -13,7 +13,6 @@
import collections.abc
import random
import resource
import pandas as pd

import skbio
import psutil
Expand All @@ -24,11 +23,32 @@
SingleLanePerSamplePairedEndFastqDirFmt,
FastqManifestFormat, YamlFormat)
from ._ecc import GolayDecoder
from ._format import ErrorCorrectionDetailsFmt


FastqHeader = collections.namedtuple('FastqHeader', ['id', 'description'])


class ECDetails:
COLUMNS = ['id',
'sample',
'barcode-sequence-id',
'barcode-uncorrected',
'barcode-corrected',
'barcode-errors']

def __init__(self, fmt):
self._fp = open(str(fmt), 'w')
self._write_header()

def write(self, parts):
self._fp.write('\t'.join([str(part) for part in parts]))
self._fp.write('\n')

def _write_header(self):
self.write(self.COLUMNS)


def _read_fastq_seqs(filepath):
# This function is adapted from @jairideout's SO post:
# http://stackoverflow.com/a/39302117/3424666
Expand Down Expand Up @@ -248,7 +268,7 @@ def emp_single(seqs: BarcodeSequenceFastqIterator,
rev_comp_barcodes: bool = False,
rev_comp_mapping_barcodes: bool = False
) -> (SingleLanePerSampleSingleEndFastqDirFmt,
pd.DataFrame):
ErrorCorrectionDetailsFmt):

result = SingleLanePerSampleSingleEndFastqDirFmt()
barcode_map, barcode_len = _make_barcode_map(
Expand All @@ -265,7 +285,9 @@ def emp_single(seqs: BarcodeSequenceFastqIterator,
manifest_fh.write('# joined reads\n')

per_sample_fastqs = {}
ec_details = []

ec_details_fmt = ErrorCorrectionDetailsFmt()
ec_details = ECDetails(ec_details_fmt)

for i, (barcode_record, sequence_record) in enumerate(seqs, start=1):
barcode_read = barcode_record[1]
Expand All @@ -290,12 +312,12 @@ def emp_single(seqs: BarcodeSequenceFastqIterator,
sample_id = barcode_map.get(barcode_read)

record = [
i,
f'record-{i}',
sample_id,
barcode_record[0],
raw_barcode_read,
]
ec_details.append(record + golay_stats)
ec_details.write(record + golay_stats)

if sample_id is None:
continue
Expand Down Expand Up @@ -323,7 +345,6 @@ def emp_single(seqs: BarcodeSequenceFastqIterator,
fastq_lines = '\n'.join(sequence_record) + '\n'
fastq_lines = fastq_lines.encode('utf-8')
per_sample_fastqs[sample_id].write(fastq_lines)
barcode_count = str(i) # last value here should be our largest record no.

if len(per_sample_fastqs) == 0:
raise ValueError('No sequences were mapped to samples. Check that '
Expand All @@ -341,18 +362,7 @@ def emp_single(seqs: BarcodeSequenceFastqIterator,

_write_metadata_yaml(result)

columns = ['id',
'sample',
'barcode-sequence-id',
'barcode-uncorrected',
'barcode-corrected',
'barcode-errors']
details = pd.DataFrame(ec_details, columns=columns)
details['id'] = details['id'].apply(lambda x: 'record-%s' %
str(x).zfill(len(barcode_count)))
details = details.set_index('id')

return result, details
return result, ec_details_fmt


def emp_paired(seqs: BarcodePairedSequenceFastqIterator,
Expand All @@ -361,7 +371,7 @@ def emp_paired(seqs: BarcodePairedSequenceFastqIterator,
rev_comp_barcodes: bool = False,
rev_comp_mapping_barcodes: bool = False
) -> (SingleLanePerSamplePairedEndFastqDirFmt,
pd.DataFrame):
ErrorCorrectionDetailsFmt):

result = SingleLanePerSamplePairedEndFastqDirFmt()
barcode_map, barcode_len = _make_barcode_map(
Expand All @@ -375,7 +385,9 @@ def emp_paired(seqs: BarcodePairedSequenceFastqIterator,
manifest_fh.write('sample-id,filename,direction\n')

per_sample_fastqs = {}
ec_details = []

ec_details_fmt = ErrorCorrectionDetailsFmt()
ec_details = ECDetails(ec_details_fmt)

for i, record in enumerate(seqs, start=1):
barcode_record, forward_record, reverse_record = record
Expand All @@ -401,12 +413,12 @@ def emp_paired(seqs: BarcodePairedSequenceFastqIterator,
sample_id = barcode_map.get(barcode_read)

record = [
i,
f'record-{i}',
sample_id,
barcode_record[0],
raw_barcode_read,
]
ec_details.append(record + golay_stats)
ec_details.write(record + golay_stats)

if sample_id is None:
continue
Expand Down Expand Up @@ -443,7 +455,6 @@ def emp_paired(seqs: BarcodePairedSequenceFastqIterator,
fwd, rev = per_sample_fastqs[sample_id]
fwd.write(('\n'.join(forward_record) + '\n').encode('utf-8'))
rev.write(('\n'.join(reverse_record) + '\n').encode('utf-8'))
barcode_count = str(i) # last value here should be our largest record no.

if len(per_sample_fastqs) == 0:
raise ValueError('No sequences were mapped to samples. Check that '
Expand All @@ -462,15 +473,4 @@ def emp_paired(seqs: BarcodePairedSequenceFastqIterator,

_write_metadata_yaml(result)

columns = ['id',
'sample',
'barcode-sequence-id',
'barcode-uncorrected',
'barcode-corrected',
'barcode-errors']
details = pd.DataFrame(ec_details, columns=columns)
details['id'] = details['id'].apply(lambda x: 'record-%s' %
str(x).zfill(len(barcode_count)))
details = details.set_index('id')

return result, details
return result, ec_details_fmt
64 changes: 36 additions & 28 deletions q2_demux/tests/test_demux.py
Expand Up @@ -295,23 +295,24 @@ def test_valid_ecc_no_golay(self):
_, ecc = emp_single(self.bsi, self.barcode_map,
golay_error_correction=False)
exp_errors = pd.DataFrame([
['sample1', '@s1/2 abc/2', 'AAAA', None, None],
['sample3', '@s2/2 abc/2', 'TTAA', None, None],
['sample2', '@s3/2 abc/2', 'AACC', None, None],
['sample3', '@s4/2 abc/2', 'TTAA', None, None],
['sample2', '@s5/2 abc/2', 'AACC', None, None],
['sample1', '@s6/2 abc/2', 'AAAA', None, None],
['sample5', '@s7/2 abc/2', 'CGGC', None, None],
['sample4', '@s8/2 abc/2', 'GGAA', None, None],
['sample5', '@s9/2 abc/2', 'CGGC', None, None],
['sample5', '@s10/2 abc/2', 'CGGC', None, None],
['sample4', '@s11/2 abc/2', 'GGAA', None, None]
['sample1', '@s1/2 abc/2', 'AAAA', 'None', 'None'],
['sample3', '@s2/2 abc/2', 'TTAA', 'None', 'None'],
['sample2', '@s3/2 abc/2', 'AACC', 'None', 'None'],
['sample3', '@s4/2 abc/2', 'TTAA', 'None', 'None'],
['sample2', '@s5/2 abc/2', 'AACC', 'None', 'None'],
['sample1', '@s6/2 abc/2', 'AAAA', 'None', 'None'],
['sample5', '@s7/2 abc/2', 'CGGC', 'None', 'None'],
['sample4', '@s8/2 abc/2', 'GGAA', 'None', 'None'],
['sample5', '@s9/2 abc/2', 'CGGC', 'None', 'None'],
['sample5', '@s10/2 abc/2', 'CGGC', 'None', 'None'],
['sample4', '@s11/2 abc/2', 'GGAA', 'None', 'None']
],
columns=['sample', 'barcode-sequence-id',
'barcode-uncorrected', 'barcode-corrected',
'barcode-errors'],
index=pd.Index(['record-%02d' % i for i in range(1, 12)],
index=pd.Index(['record-%d' % i for i in range(1, 12)],
name='id'))
ecc = qiime2.Metadata.load(str(ecc)).to_dataframe()
pdt.assert_frame_equal(ecc, exp_errors)

def test_valid_with_barcode_errors(self):
Expand Down Expand Up @@ -359,16 +360,19 @@ def test_valid_with_barcode_errors(self):
['sample2', '@s5/2 abc/2', 'ACACACTATGGC', 'ACACACTATGGC', 0],
['sample1', '@s6/2 abc/2', 'ACGATGCGACCA', 'ACGATGCGACCA', 0],
['sample5', '@s7/2 abc/2', 'CATTGTATCAAC', 'CATCGTATCAAC', 1],
[None, '@s8/2 abc/2', 'CTAACGCAGGGG', None, 4],
['None', '@s8/2 abc/2', 'CTAACGCAGGGG', 'None', 4],
['sample5', '@s9/2 abc/2', 'CATCGTATCAAC', 'CATCGTATCAAC', 0],
['sample5', '@s10/2 abc/2', 'CATCGTATCAAC', 'CATCGTATCAAC', 0],
['sample4', '@s11/2 abc/2', 'CTAACGCAGTCA', 'CTAACGCAGTCA', 0]
],
columns=['sample', 'barcode-sequence-id',
'barcode-uncorrected', 'barcode-corrected',
'barcode-errors'],
index=pd.Index(['record-%02d' % i for i in range(1, 12)],
index=pd.Index(['record-%d' % i for i in range(1, 12)],
name='id'))
exp_errors['barcode-errors'] = \
exp_errors['barcode-errors'].astype(float)
error_detail = qiime2.Metadata.load(str(error_detail)).to_dataframe()
pdt.assert_frame_equal(error_detail, exp_errors)

@mock.patch('q2_demux._demux.OPEN_FH_LIMIT', 3)
Expand Down Expand Up @@ -707,16 +711,19 @@ def check_valid(self, *args, **kwargs):
['sample2', '@s5/2 abc/2', 'ACACACTATGGC', 'ACACACTATGGC', 0],
['sample1', '@s6/2 abc/2', 'ACGATGCGACCA', 'ACGATGCGACCA', 0],
['sample5', '@s7/2 abc/2', 'CATTGTATCAAC', 'CATCGTATCAAC', 1],
[None, '@s8/2 abc/2', 'CTAACGCAGGGG', None, 4],
['None', '@s8/2 abc/2', 'CTAACGCAGGGG', 'None', 4],
['sample5', '@s9/2 abc/2', 'CATCGTATCAAC', 'CATCGTATCAAC', 0],
['sample5', '@s10/2 abc/2', 'CATCGTATCAAC', 'CATCGTATCAAC', 0],
['sample4', '@s11/2 abc/2', 'CTAACGCAGTCA', 'CTAACGCAGTCA', 0]
],
columns=['sample', 'barcode-sequence-id',
'barcode-uncorrected', 'barcode-corrected',
'barcode-errors'],
index=pd.Index(['record-%02d' % i for i in range(1, 12)],
index=pd.Index(['record-%d' % i for i in range(1, 12)],
name='id'))
exp_errors['barcode-errors'] = \
exp_errors['barcode-errors'].astype(float)
ecc = qiime2.Metadata.load(str(ecc)).to_dataframe()
pdt.assert_frame_equal(ecc, exp_errors)

def test_valid(self):
Expand All @@ -727,23 +734,24 @@ def test_valid_ecc_no_golay(self):
_, ecc = emp_paired(self.bpsi, self.barcode_map,
golay_error_correction=False)
exp_errors = pd.DataFrame([
['sample1', '@s1/2 abc/2', 'AAAA', None, None],
['sample3', '@s2/2 abc/2', 'TTAA', None, None],
['sample2', '@s3/2 abc/2', 'AACC', None, None],
['sample3', '@s4/2 abc/2', 'TTAA', None, None],
['sample2', '@s5/2 abc/2', 'AACC', None, None],
['sample1', '@s6/2 abc/2', 'AAAA', None, None],
['sample5', '@s7/2 abc/2', 'CGGC', None, None],
['sample4', '@s8/2 abc/2', 'GGAA', None, None],
['sample5', '@s9/2 abc/2', 'CGGC', None, None],
['sample5', '@s10/2 abc/2', 'CGGC', None, None],
['sample4', '@s11/2 abc/2', 'GGAA', None, None]
['sample1', '@s1/2 abc/2', 'AAAA', 'None', 'None'],
['sample3', '@s2/2 abc/2', 'TTAA', 'None', 'None'],
['sample2', '@s3/2 abc/2', 'AACC', 'None', 'None'],
['sample3', '@s4/2 abc/2', 'TTAA', 'None', 'None'],
['sample2', '@s5/2 abc/2', 'AACC', 'None', 'None'],
['sample1', '@s6/2 abc/2', 'AAAA', 'None', 'None'],
['sample5', '@s7/2 abc/2', 'CGGC', 'None', 'None'],
['sample4', '@s8/2 abc/2', 'GGAA', 'None', 'None'],
['sample5', '@s9/2 abc/2', 'CGGC', 'None', 'None'],
['sample5', '@s10/2 abc/2', 'CGGC', 'None', 'None'],
['sample4', '@s11/2 abc/2', 'GGAA', 'None', 'None']
],
columns=['sample', 'barcode-sequence-id',
'barcode-uncorrected', 'barcode-corrected',
'barcode-errors'],
index=pd.Index(['record-%02d' % i for i in range(1, 12)],
index=pd.Index(['record-%d' % i for i in range(1, 12)],
name='id'))
ecc = qiime2.Metadata.load(str(ecc)).to_dataframe()
pdt.assert_frame_equal(ecc, exp_errors)

def test_valid_with_barcode_errors(self):
Expand Down

0 comments on commit 28d7739

Please sign in to comment.