6 changes: 3 additions & 3 deletions .github/workflows/qiita-plugin-ci.yml
@@ -156,7 +156,7 @@ jobs:
   lint:
     runs-on: ubuntu-latest
     steps:
-      - name: flake8
+      - name: ruff
        uses: actions/setup-python@v2
        with:
          python-version: 3.9
@@ -166,5 +166,5 @@
         uses: actions/checkout@v2
       - name: lint
         run: |
-          pip install -q flake8
-          flake8 .
+          pip install -q ruff
+          ruff check .
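The lint job now boils down to `ruff check .`. For anyone who prefers to reproduce the CI step from a Python script rather than the shell, a trivial wrapper might look like the sketch below; it is purely illustrative and assumes ruff is already installed (`pip install ruff`).

```python
# Run the same lint check the CI performs; check=True raises
# CalledProcessError if ruff reports violations.
import subprocess

subprocess.run(["ruff", "check", "."], check=True)
```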
2 changes: 1 addition & 1 deletion README.md
@@ -14,7 +14,7 @@ git clone https://github.com/biocore/mg-scripts.git
 Create a Python3 Conda environment in which to run the notebook:
 
 ```bash
-conda create --yes -n spp python='python=3.9' scikit-learn pandas numpy nose pep8 flake8 matplotlib jupyter notebook 'seaborn>=0.7.1' pip openpyxl 'seqtk>=1.4' click scipy fastq-pair
+conda create --yes -n spp python='python=3.9' scikit-learn pandas numpy nose ruff matplotlib jupyter notebook 'seaborn>=0.7.1' pip openpyxl 'seqtk>=1.4' click scipy fastq-pair
 ```
 
 Activate the Conda environment:
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -43,7 +43,7 @@ dependencies = [
     "pandas",
     "lxml",
     'requests',
-    'flake8',
+    'ruff',
     'nose',
     'coverage',
     'pgzip',
86 changes: 44 additions & 42 deletions src/sequence_processing_pipeline/Commands.py
@@ -1,34 +1,35 @@
 import glob
 import gzip
 import os
 
 import click
-from sequence_processing_pipeline.util import (iter_paired_files,
-                                               determine_orientation)
+
+from sequence_processing_pipeline.util import determine_orientation, iter_paired_files
 
 
-def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
-                            batch_prefix, allow_fwd_only=False):
-    '''Partitions input fastqs to coarse bins
+def split_similar_size_bins(
+    data_location_path, max_file_list_size_in_gb, batch_prefix, allow_fwd_only=False
+):
+    """Partitions input fastqs to coarse bins
 
     :param data_location_path: Path to the ConvertJob directory.
     :param max_file_list_size_in_gb: Upper threshold for file-size.
     :param batch_prefix: Path + file-name prefix for output-files.
     :param allow_fwd_only: ignore rev match, helpful for long reads.
     :return: The number of output-files created, size of largest bin.
-    '''
+    """
     # to prevent issues w/filenames like the ones below from being mistaken
     # for R1 or R2 files, use determine_orientation().
     # LS_8_22_2014_R2_SRE_S2_L007_I1_001.fastq.gz
     # LS_8_22_2014_R1_SRE_S3_L007_I1_001.fastq.gz
 
     # since the names of all fastq files are being scanned for orientation,
     # collect all of them instead of mistakenly pre-filtering some files.
-    fastq_paths = glob.glob(data_location_path + '/*/*.fastq.gz')
-    fastq_paths = [x for x in fastq_paths
-                   if determine_orientation(x) in ['R1', 'R2']]
+    fastq_paths = glob.glob(data_location_path + "/*/*.fastq.gz")
+    fastq_paths = [x for x in fastq_paths if determine_orientation(x) in ["R1", "R2"]]
 
     # convert from GB and halve as we sum R1
-    max_size = (int(max_file_list_size_in_gb) * (2 ** 30) / 2)
+    max_size = int(max_file_list_size_in_gb) * (2**30) / 2
 
     split_offset = 0
@@ -42,7 +43,7 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
     if allow_fwd_only:
         for a in fastq_paths:
             r1_size = os.stat(a).st_size
-            output_base = os.path.dirname(a).split('/')[-1]
+            output_base = os.path.dirname(a).split("/")[-1]
             if current_size + r1_size > max_size:
                 # bucket is full.
                 if bucket_size > max_bucket_size:
@@ -56,7 +57,7 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
 
                 split_offset += 1
                 current_size = r1_size
-                fp = open(batch_prefix + '-%d' % split_offset, 'w')
+                fp = open(batch_prefix + "-%d" % split_offset, "w")
             else:
                 # add to bucket_size
                 bucket_size += r1_size
@@ -68,7 +69,7 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
             r1_size = os.stat(a).st_size
             r2_size = os.stat(b).st_size
 
-            output_base = os.path.dirname(a).split('/')[-1]
+            output_base = os.path.dirname(a).split("/")[-1]
             if current_size + r1_size > max_size:
                 # bucket is full.
                 if bucket_size > max_bucket_size:
@@ -82,7 +83,7 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
 
                 split_offset += 1
                 current_size = r1_size
-                fp = open(batch_prefix + '-%d' % split_offset, 'w')
+                fp = open(batch_prefix + "-%d" % split_offset, "w")
             else:
                 # add to bucket_size
                 bucket_size += r1_size + r2_size
@@ -93,9 +94,10 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
     if fp is not None:
         fp.close()
 
-    code_dir = 'qp-knight-lab-processing/tests/test_output/'
+    code_dir = "qp-knight-lab-processing/tests/test_output/"
     is_test = data_location_path.endswith(
-        (f'{code_dir}ConvertJob', f'{code_dir}TRIntegrateJob/integrated'))
+        (f"{code_dir}ConvertJob", f"{code_dir}TRIntegrateJob/integrated")
+    )
 
     if split_offset == 0 and not is_test:
         raise ValueError("No splits made")
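For orientation, here is a minimal usage sketch of the function reformatted above. The directory layout and batch prefix are hypothetical; the return values follow the docstring (number of output files created, size of the largest bin), and the batch files are presumably named by appending -1, -2, ... to batch_prefix, per the `'-%d' % split_offset` pattern.

```python
# Hypothetical usage; all paths are illustrative only.
from sequence_processing_pipeline.Commands import split_similar_size_bins

# Scans run_dir/ConvertJob/*/*.fastq.gz, keeps only R1/R2 files, and groups
# them into batches holding at most ~4 GB of R1 data (the threshold is
# halved because R2 is assumed to contribute a similar amount).
n_batches, largest_bin = split_similar_size_bins(
    "run_dir/ConvertJob",     # hypothetical ConvertJob output directory
    4,                        # max_file_list_size_in_gb
    "run_dir/batches/batch",  # batch files become batch-1, batch-2, ...
)
print(f"wrote {n_batches} batch files; largest bin was {largest_bin} bytes")
```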
@@ -104,23 +106,23 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
 
 
 def demux_cmd(id_map_fp, fp_fp, out_d, task, maxtask):
-    with open(id_map_fp, 'r') as f:
+    with open(id_map_fp, "r") as f:
         id_map = f.readlines()
-        id_map = [line.strip().split('\t') for line in id_map]
+        id_map = [line.strip().split("\t") for line in id_map]
 
     # fp needs to be an open file handle.
     # ensure task and maxtask are proper ints when coming from cmd-line.
-    with open(fp_fp, 'r') as fp:
+    with open(fp_fp, "r") as fp:
         demux(id_map, fp, out_d, int(task), int(maxtask))
 
 
 def demux(id_map, fp, out_d, task, maxtask):
     """Split infile data based in provided map"""
-    delimiter = '::MUX::'
-    mode = 'wt'
-    ext = '.fastq.gz'
-    sep = '/'
-    rec = '@'
+    delimiter = "::MUX::"
+    mode = "wt"
+    ext = ".fastq.gz"
+    sep = "/"
+    rec = "@"
 
     openfps = {}
 
@@ -142,7 +144,7 @@ def demux(id_map, fp, out_d, task, maxtask):
             pass
         current_fp_r1 = gzip.open(fullname_r1, mode)
         current_fp_r2 = gzip.open(fullname_r2, mode)
-        current_fp = {'1': current_fp_r1, '2': current_fp_r2}
+        current_fp = {"1": current_fp_r1, "2": current_fp_r2}
         openfps[idx] = current_fp
 
     # setup a parser
@@ -181,7 +183,7 @@ def demux(id_map, fp, out_d, task, maxtask):
             # no '\n'
             orientation = sid[-1]
             # hexdump confirms separator is ' ', not '\t'
-            sid = rec + sid + ' ' + metadata + '\n'
+            sid = rec + sid + " " + metadata + "\n"
         else:
             raise ValueError(f"'{sid}' is not a recognized form")
 
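The `::MUX::` delimiter handled in this hunk is how a multiplexed read carries its destination: the part before the delimiter encodes the output file, the part after is the original sequence id. A rough illustration with a made-up record follows; the exact parsing lines are collapsed out of this diff, so treat this as a sketch of the idea, not the implementation.

```python
# Made-up multiplexed fastq header; illustrative only.
delimiter = "::MUX::"
header = "@sample_A::MUX::LH00444:84:227CNHLT4:7:1101:41955:2443/1"

# Drop the leading '@', then split destination from the original id.
fname_encoded, sid = header[1:].split(delimiter, 1)
orientation = sid[-1]  # '1' or '2', as in `orientation = sid[-1]` above

assert fname_encoded == "sample_A"
assert orientation == "1"
```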
@@ -201,29 +203,29 @@ def cli():
 
 
 @cli.command()
-@click.option('--id-map', type=click.Path(exists=True), required=True)
-@click.option('--infile', type=click.Path(exists=True), required=True)
-@click.option('--output', type=click.Path(exists=True), required=True)
-@click.option('--task', type=int, required=True)
-@click.option('--maxtask', type=int, required=True)
+@click.option("--id-map", type=click.Path(exists=True), required=True)
+@click.option("--infile", type=click.Path(exists=True), required=True)
+@click.option("--output", type=click.Path(exists=True), required=True)
+@click.option("--task", type=int, required=True)
+@click.option("--maxtask", type=int, required=True)
 def demux_just_fwd(id_map, infile, output, task, maxtask):
-    with open(id_map, 'r') as f:
+    with open(id_map, "r") as f:
         id_map = f.readlines()
-        id_map = [line.strip().split('\t') for line in id_map]
+        id_map = [line.strip().split("\t") for line in id_map]
 
     # fp needs to be an open file handle.
     # ensure task and maxtask are proper ints when coming from cmd-line.
-    with open(infile, 'r') as fp:
+    with open(infile, "r") as fp:
         demux_just_fwd_processing(id_map, fp, output, int(task), int(maxtask))
 
 
 def demux_just_fwd_processing(id_map, fp, out_d, task, maxtask):
     """Split infile data based in provided map"""
-    delimiter = '::MUX::'
-    mode = 'wt'
-    ext = '.fastq.gz'
-    sep = '/'
-    rec = '@'
+    delimiter = "::MUX::"
+    mode = "wt"
+    ext = ".fastq.gz"
+    sep = "/"
+    rec = "@"
 
     openfps = {}
 
@@ -243,7 +245,7 @@ def demux_just_fwd_processing(id_map, fp, out_d, task, maxtask):
         except FileExistsError:
             pass
         current_fp_r1 = gzip.open(fullname_r1, mode)
-        current_fp = {'1': current_fp_r1}
+        current_fp = {"1": current_fp_r1}
         openfps[idx] = current_fp
 
     # setup a parser
@@ -253,7 +255,7 @@ def demux_just_fwd_processing(id_map, fp, out_d, task, maxtask):
     qual = iter(fp)
 
     # there is only fwd so the orientation is always '1'
-    orientation = '1'
+    orientation = "1"
 
     for i, s, d, q in zip(seq_id, seq, dumb, qual):
         # '@1', 'LH00444:84:227CNHLT4:7:1101:41955:2443/1'
@@ -270,7 +272,7 @@ def demux_just_fwd_processing(id_map, fp, out_d, task, maxtask):
 
         current_fp = openfps[fname_encoded]
 
-        current_fp[orientation].write(f'{rec}{sid}')
+        current_fp[orientation].write(f"{rec}{sid}")
         current_fp[orientation].write(s)
         current_fp[orientation].write(d)
         current_fp[orientation].write(q)
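To close the loop, a hedged sketch of exercising the new demux-just-fwd command through click's test runner. The console-script wiring is not part of this diff, and every path below is a placeholder; click (7.0+) normally derives the command name demux-just-fwd from the function name demux_just_fwd.

```python
# Hypothetical invocation; all paths are placeholders and must exist,
# since every click.Path option above is declared with exists=True.
from click.testing import CliRunner

from sequence_processing_pipeline.Commands import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    [
        "demux-just-fwd",
        "--id-map", "run/id_map.tsv",   # tab-separated id map, one record per line
        "--infile", "run/muxed.fastq",  # multiplexed fastq to split
        "--output", "run/demuxed",      # existing output directory
        "--task", "0",
        "--maxtask", "4",
    ],
)
assert result.exit_code == 0, result.output
```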