Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up strip_input_fastq process #327

Merged
merged 9 commits into from
Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
* [#266](https://github.com/nf-core/eager/issues/266) - Added sanity checks for input filetypes (i.e. only BAM files can be supplied if `--bam`)
* [#237](https://github.com/nf-core/eager/issues/237) - Fixed and Updated script scrape_software_versions
* [#322](https://github.com/nf-core/eager/pull/322) - Move extract map reads fastq compression to pigz
* [#327](https://github.com/nf-core/eager/pull/327) - Speed up strip_input_fastq process and make it more robust

### `Dependencies`

Expand Down
274 changes: 167 additions & 107 deletions bin/extract_map_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import argparse
import multiprocessing
import pysam
from xopen import xopen
from functools import partial
import gzip
import sys
from Bio.SeqIO.QualityIO import FastqGeneralIterator
from io import StringIO


def _get_args():
Expand Down Expand Up @@ -57,34 +59,36 @@ def _get_args():
return(bam, in_fwd, in_rev, out_fwd, out_rev, mode, proc)


def extract_mapped_chr(chr, bam):
def extract_mapped_chr(chr):
"""
Get mapped reads per chromosome
INPUT:
- chr(str): chromosome
- bam(str): bamfile path
OUTPUT:
- res(list): list of mapped reads (str) name per chromosome
Args:
chr(str): chromosome
bam(str): bamfile path
Returns:
res(list): list of mapped reads (str) name per chromosome
"""
res = []
bamfile = pysam.AlignmentFile(bam, "rb")
reads = bamfile.fetch(chr, multiple_iterators=True)
reads = BAMFILE.fetch(chr, multiple_iterators=True)
for read in reads:
res.append(read.query_name)
if read.is_unmapped == False:
if read.query_name.startswith("M_"):
read_name = read.query_name.replace(
"M_", "").split()[0].split("/")[0]
else:
read_name = read.query_name.split()[0].split("/")[0]
res.append(read_name)
return(res)


def extract_mapped(bam, processes):
"""
Get mapped reads in parallel
INPUT:
- bam(str): bamfile path
OUTPUT:
- result(list) list of mapped reads name (str)
def extract_mapped(proc):
"""Get mapped reads in parallel
Returns:
bamfile(str): path to bam alignment file
result(list): list of mapped reads name (str)
"""
try:
bamfile = pysam.AlignmentFile(bam, "rb")
chrs = bamfile.references
chrs = BAMFILE.references
except ValueError as e:
print(e)

Expand All @@ -93,50 +97,50 @@ def extract_mapped(bam, processes):
return([])

# Checking that nb_process is not > nb_chromosomes
elif len(chrs) < processes:
print(
f"""Requested {processes} processe(s),
but can only be parallelized on {len(chrs)}
processes with these data""")
processes = len(chrs)

extract_mapped_chr_partial = partial(extract_mapped_chr, bam=bam)
p = multiprocessing.Pool(processes)
res = p.map(extract_mapped_chr_partial, chrs)
p.close()
p.join()
result = [i for ares in res for i in ares]
proc = min(proc, len(chrs))
with multiprocessing.Pool(proc) as p:
res = p.map(extract_mapped_chr, chrs)
result = [i for ares in res for i in ares if len(i) > 0]
return(result)


def parse_fq(fq):
"""
Parse a FASTQ file
INPUT:
- fq(str): path to fastq file
OUTPUT:
- fqd(dict): dictionary with read names as keys, seq and quality as values
"""Parse a FASTQ file
Args:
fq(str): path to fastq file
Returns:
fqd(dict): dictionary with read names as keys, seq and quality as values
in a list
"""

def get_fq_reads(allreads):
fqd = {}
myflag = True
for line in allreads:
line = line.decode('utf-8').rstrip()
if myflag == True:
instrument = line.split()[0].split(":")[0]
myflag = False
if line.startswith(instrument):
seqname = line[1:].split()[0]
fqd[seqname] = []
continue
else:
fqd[seqname].append(line)
return(fqd)
read_dict = {}
for title, seq, qual in FastqGeneralIterator(allreads):
# NEED TO ONLY KEEP THE FIRST PART OF THE FASTQ READ NAME FOR CROSS
# REFERENCING WITH BAM FILE: ONLY THIS INFORMATION IS KEPT WHEN
# COLLAPSING READS WITH ADAPTERREMOVAL

# Until fastq format 1.8
# Split after slash
# @HWUSI-EAS100R:6:73:941:1973#0/1
suf_title = ""
title_space = title.split()
if len(title_space) > 1:
title = title_space[0]
suf_title = f" {title_space[1]}"

# From fastq format 1.8
# Split after space
# @EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG
title_slash = title.split("/")
if len(title_slash) > 1:
title = title_slash[0]
suf_title = f"/{title_slash[1]}"

read_dict[title] = [suf_title, seq, "+", qual]
return(read_dict)

if fq.endswith('.gz'):
with gzip.open(fq, 'rb') as allreads:
with xopen(fq, 'r') as allreads:
fqd = get_fq_reads(allreads)
else:
with open(fq, 'r') as allreads:
Expand All @@ -145,83 +149,139 @@ def get_fq_reads(allreads):
return(fqd)


def sort_mapped(fq_dict, mapped_reads):
"""
Sort mapped reads from dictionary of fastq reads
INPUT:
- fq_dict(dict) dictionary with read names as keys, seq and quality as values
in a list
- mapped_reads(list) list of mapped reads
OUTPUT:
- mfqd(dict) dictionary with mapped read names as keys, seq and quality as values
def get_mapped_reads(fq_dict, mapped_reads):
"""Sort mapped reads from dictionary of fastq reads
Args:
fq_dict(dict) dictionary with read names as keys, seq and quality as values
in a list
- fqd(dict) dictionary with unmapped read names as key, unmapped/mapped (u|m),
mapped_reads(list) list of mapped reads
Returns:
fqd(dict) dictionary with read names as key, unmapped/mapped (u|m),
seq and quality as values in a list
"""

def intersection(list1, list2):
return(set(list1).intersection(list2))

def difference(list1, list2):
return(set(list1).difference(list2))

fqd = {}
unmapped = [i for i in list(fq_dict.keys()) if i not in mapped_reads]
mapped = [i for i in list(fq_dict.keys()) if i in mapped_reads]
# print(unmap)
for r in unmapped:
fqd[r] = ['u']+fq_dict[r]
for r in mapped:
fqd[r] = ['m']+fq_dict[r]
all_reads = list(fq_dict.keys())
mapped = intersection(all_reads, mapped_reads)
unmapped = difference(all_reads, mapped_reads)

for rm in mapped:
fqd[rm] = ['m']+fq_dict[rm]
for ru in unmapped:
fqd[ru] = ['u']+fq_dict[ru]

return(fqd)


def write_fq(fq_dict, fname, mode):
"""
Write to fastq file
INPUT:
- fq_dict(dict) dictionary with unmapped read names as keys,
unmapped/mapped (u|m), seq, and quality as values in a list
- fname(string) Path to output fastq file
- mode(string) strip (remove read) or replace (replace read sequence) by Ns
def write_fq(fq_dict, fname, write_mode, strip_mode, proc):
"""Write to fastq file
Args:
fq_dict(dict): fq_dict with unmapped read names as keys,
unmapped/mapped (u|m), seq, and quality as values in a list
fname(string) Path to output fastq file
write_mode (str): 'rb' or 'r'
strip_mode (str): strip (remove read) or replace (replace read sequence) by Ns
proc(int) number of processes
"""
with open(fname, 'w') as f:
for k in list(fq_dict.keys()):
if mode == 'strip':
if fq_dict[k][0] == 'u':
f.write(f"@{k}\n")
for i in fq_dict[k][1:]:
f.write(f"{i}\n")
elif fq_dict[k][0] == 'm':
continue
elif mode == 'replace':
if fq_dict[k][0] == 'u':
f.write(f"@{k}\n")
for i in fq_dict[k][1:]:
f.write(f"{i}\n")
elif fq_dict[k][0] == 'm':
f.write(f"@{k}\n")
f.write(f"{'N'*len(fq_dict[k][1])}\n")
for i in fq_dict[k][2:]:
f.write(f"{i}\n")
fq_dict_keys = list(fq_dict.keys())
if write_mode == 'wb':
with xopen(fname, mode='wb', threads=proc) as fw:
for fq_dict_key in fq_dict_keys:
wstring = ""
if strip_mode == 'strip':
if fq_dict[fq_dict_key][0] == 'u':
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
for i in fq_dict[fq_dict_key][2:]:
wstring += f"{i}\n"
elif strip_mode == 'replace':
# if unmapped, write all the read lines
if fq_dict[fq_dict_key][0] == 'u':
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
for i in fq_dict[fq_dict_key][2:]:
wstring += f"{i}\n"
# if mapped, write all the read lines, but replace sequence
# by N*(len(sequence))
elif fq_dict[fq_dict_key][0] == 'm':
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
wstring += f"{'N'*len(fq_dict[fq_dict_key][2])}\n"
for i in fq_dict[fq_dict_key][3:]:
wstring += f"{i}\n"
fw.write(wstring.encode())
else:
with open(fname, 'w') as fw:
for fq_dict_key in fq_dict_keys:
wstring = ""
if strip_mode == 'strip':
if fq_dict[fq_dict_key][0] == 'u':
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
for i in fq_dict[fq_dict_key][2:]:
wstring += f"{i}\n"
elif strip_mode == 'replace':
# if unmapped, write all the read lines
if fq_dict[fq_dict_key][0] == 'u':
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
for i in fq_dict[fq_dict_key][2:]:
wstring += f"{i}\n"
# if mapped, write all the read lines, but replace sequence
# by N*(len(sequence))
elif fq_dict[fq_dict_key][0] == 'm':
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
wstring += f"{'N'*len(fq_dict[fq_dict_key][2])}\n"
for i in fq_dict[fq_dict_key][3:]:
wstring += f"{i}\n"
fw.write(wstring)


def check_strip_mode(mode):
if mode.lower() not in ['replace', 'strip']:
print(f"Mode must be {' or '.join(mode)}")
return(mode.lower())


if __name__ == "__main__":
BAM, IN_FWD, IN_REV, OUT_FWD, OUT_REV, MODE, PROC = _get_args()

if OUT_FWD == None:
out_fwd = f"{IN_FWD.split('/')[-1].split('.')[0]}.r1.fq"
out_fwd = f"{IN_FWD.split('/')[-1].split('.')[0]}.r1.fq.gz"
else:
out_fwd = OUT_FWD

mapped_reads = extract_mapped(BAM, PROC)
fwd_dict = parse_fq(IN_FWD)
fwd_reads = sort_mapped(fwd_dict, mapped_reads)
write_fq(fwd_reads, out_fwd, MODE)
if out_fwd.endswith(".gz"):
write_mode = "wb"
else:
write_mode = "w"

strip_mode = check_strip_mode(MODE)
BAMFILE = pysam.AlignmentFile(BAM, 'r')

# FORWARD OR SE FILE
print(f"- Extracting mapped reads from {BAM}")
mapped_reads = extract_mapped(proc=PROC)
print(f"- Parsing forward fq file {IN_FWD}")
fqd_fwd = parse_fq(IN_FWD)
print("- Cross referencing mapped reads in forward fq")
fq_dict_fwd = get_mapped_reads(fqd_fwd, mapped_reads)
# print(fq_dict_fwd)
print(f"- Writing forward fq to {out_fwd}")
write_fq(fq_dict=fq_dict_fwd, fname=out_fwd,
write_mode=write_mode, strip_mode=strip_mode, proc=PROC)

# REVERSE FILE
if IN_REV:
if OUT_REV == None:
out_rev = f"{IN_REV.split('/')[-1].split('.')[0]}.r2.fq"
out_rev = f"{IN_REV.split('/')[-1].split('.')[0]}.r2.fq.gz"
else:
out_rev = OUT_REV
rev_dict = parse_fq(IN_REV)
rev_reads = sort_mapped(rev_dict, mapped_reads)
write_fq(rev_reads, out_rev, MODE)
print(f"- Parsing reverse fq file {IN_REV}")
fqd_rev = parse_fq(IN_REV)
print("- Cross referencing mapped reads in reverse fq")
fq_dict_rev = get_mapped_reads(fqd_rev, mapped_reads)
print(f"- Writing reverse fq to {out_rev}")
write_fq(fq_dict=fq_dict_rev, fname=out_rev,
write_mode=write_mode, strip_mode=strip_mode, proc=PROC)
5 changes: 1 addition & 4 deletions conf/base.config
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ process {
memory = { check_max( 2.GB, 'memory' ) }
cache = false
}

withName:strip_input_fastq {
time = { check_max( 4.h * task.attempt, 'time' ) }
}


withName:qualimap{
errorStrategy = 'ignore'
Expand Down
9 changes: 3 additions & 6 deletions main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -1313,20 +1313,17 @@ process strip_input_fastq {

script:
if (params.singleEnd) {
out_fwd = bam.baseName+'.stripped.fq'
out_fwd = bam.baseName+'.stripped.fq.gz'
"""
samtools index $bam
extract_map_reads.py $bam ${fq[0]} -m ${params.strip_mode} -of $out_fwd -p ${task.cpus}
pigz -p ${task.cpus} $out_fwd
"""
} else {
out_fwd = bam.baseName+'.stripped.fwd.fq'
out_rev = bam.baseName+'.stripped.rev.fq'
out_fwd = bam.baseName+'.stripped.fwd.fq.gz'
out_rev = bam.baseName+'.stripped.rev.fq.gz'
"""
samtools index $bam
extract_map_reads.py $bam ${fq[0]} -rev ${fq[1]} -m ${params.strip_mode} -of $out_fwd -or $out_rev -p ${task.cpus}
pigz -p ${task.cpus} $out_fwd
pigz -p ${task.cpus} $out_rev
"""
}

Expand Down