diff --git a/src/qp_klp/Assays.py b/src/qp_klp/Assays.py index 7ed35cf3..c89fee7c 100644 --- a/src/qp_klp/Assays.py +++ b/src/qp_klp/Assays.py @@ -1,14 +1,15 @@ +from collections import defaultdict +from json import dumps from os import listdir, makedirs, walk -from os.path import isfile, join, basename, dirname, abspath +from os.path import abspath, basename, dirname, isfile, join from shutil import copyfile -from sequence_processing_pipeline.NuQCJob import NuQCJob + +import pandas as pd + from sequence_processing_pipeline.FastQCJob import FastQCJob from sequence_processing_pipeline.GenPrepFileJob import GenPrepFileJob from sequence_processing_pipeline.MultiQCJob import MultiQCJob -import pandas as pd -from json import dumps -from collections import defaultdict - +from sequence_processing_pipeline.NuQCJob import NuQCJob ASSAY_NAME_NONE = "Assay" ASSAY_NAME_AMPLICON = "Amplicon" @@ -20,7 +21,7 @@ ARTIFACT_TYPE_METAOMICS = "per_sample_FASTQ" -class Assay(): +class Assay: """ Assay encapsulate Job()s and other functionality that varies on the assay-type of the run. All Assays are mixins for Workflow() classes @@ -34,6 +35,7 @@ class Assay(): methods used by other functions in Assay() or its children begin w/'_'. """ + assay_type = ASSAY_NAME_NONE assay_warnings = [] @@ -49,17 +51,17 @@ def _replace_tube_ids_w_sample_names(cls, prep_file_path, tube_id_map): reversed_map = {tube_id_map[k]: k for k in tube_id_map} # passing tube_id_map as a parameter allows for easier testing. - df = pd.read_csv(prep_file_path, sep='\t', dtype=str, index_col=False) + df = pd.read_csv(prep_file_path, sep="\t", dtype=str, index_col=False) # save copy of sample_name column as 'old_sample_name' - df['old_sample_name'] = df['sample_name'] + df["old_sample_name"] = df["sample_name"] for i in df.index: sample_name = df.at[i, "sample_name"] # blanks do not get their names swapped. - if sample_name.startswith('BLANK'): + if sample_name.startswith("BLANK"): continue # remove leading zeroes if they exist to match Qiita results. - sample_name = sample_name.lstrip('0') + sample_name = sample_name.lstrip("0") if sample_name in reversed_map: df.at[i, "sample_name"] = reversed_map[sample_name] @@ -79,8 +81,8 @@ def overwrite_prep_files(self, prep_file_paths): projects = self.pipeline.get_project_info(short_names=True) for project in projects: - project_name = project['project_name'] - qiita_id = str(project['qiita_id']) + project_name = project["project_name"] + qiita_id = str(project["qiita_id"]) if qiita_id not in self.tube_id_map: continue @@ -88,16 +90,17 @@ def overwrite_prep_files(self, prep_file_paths): # prep files are named in the following form: # 20220423_FS10001773_12_BRB11603-0615.Matrix_Tube_LBM_14332.1.tsv fqp_name = "%s_%s" % (project_name, qiita_id) - matching_files = [prep_file for prep_file in prep_file_paths if - fqp_name in prep_file] + matching_files = [ + prep_file for prep_file in prep_file_paths if fqp_name in prep_file + ] if len(matching_files) == 0: continue for matching_file in matching_files: - Assay._replace_tube_ids_w_sample_names(matching_file, - self.tube_id_map[ - qiita_id]) + Assay._replace_tube_ids_w_sample_names( + matching_file, self.tube_id_map[qiita_id] + ) @classmethod def _parse_prep_file(cls, prep_file_path, convert_to_dict=True): @@ -107,20 +110,22 @@ def _parse_prep_file(cls, prep_file_path, convert_to_dict=True): :param convert_to_dict: If True, a dict() is returned. :return: A DataFrame() is returned, unless convert_to_dict is True. 
""" - metadata = pd.read_csv(prep_file_path, - dtype=str, - delimiter='\t', - # forces Pandas to not make the first column - # the index even when the values appear numeric. - index_col=False) + metadata = pd.read_csv( + prep_file_path, + dtype=str, + delimiter="\t", + # forces Pandas to not make the first column + # the index even when the values appear numeric. + index_col=False, + ) if metadata is None: raise ValueError(f"{prep_file_path} does not exist.") - metadata.set_index('sample_name', inplace=True) + metadata.set_index("sample_name", inplace=True) if convert_to_dict: - return metadata.to_dict('index') + return metadata.to_dict("index") else: return metadata @@ -131,7 +136,7 @@ def _generate_artifact_name(self, prep_file_path): :return: If prep is a replicate, returns artifact-name, repl-number, and True. Otherwise, returns artifact-name and False. """ - a_name = f'{self.pipeline.run_id}_{self.lane_number}' + a_name = f"{self.pipeline.run_id}_{self.lane_number}" repl_num = basename(dirname(prep_file_path)) if self.has_replicates is True: @@ -139,16 +144,16 @@ def _generate_artifact_name(self, prep_file_path): # append a replication number to each name to # make it unique from other replicates. # return ('%s_r%s' % (a_name, result[1]), True) - return ('%s_r%s' % (a_name, repl_num), True) + return ("%s_r%s" % (a_name, repl_num), True) else: # this is a normal pre-prep or sample-sheet. return (a_name, False) def execute_pipeline(self): - ''' + """ Executes steps of pipeline in proper sequence. :return: None - ''' + """ # pre_check-ing the status of the workflow self.pre_check() @@ -188,7 +193,7 @@ def execute_pipeline(self): # moved final component of genprepfilejob outside of object. # obtain the paths to the prep-files generated by GenPrepFileJob # w/out having to recover full state. - tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles') + tmp = join(self.pipeline.output_path, "GenPrepFileJob", "PrepFiles") self.has_replicates = False @@ -199,14 +204,14 @@ def execute_pipeline(self): for _file in files: # we are looing for .tsv files and we are only interested # in the string after the last _, which is the study_id - if not _file.endswith('.tsv'): + if not _file.endswith(".tsv"): continue # continue if no underscore - chunks = _file.rsplit('_', 1) + chunks = _file.rsplit("_", 1) if len(chunks) <= 1: continue # continue if no int after . - qid = chunks[-1].split('.')[0] + qid = chunks[-1].split(".")[0] if not qid.isnumeric(): continue if qid not in self.prep_file_paths: @@ -217,7 +222,7 @@ def execute_pipeline(self): self.prep_file_paths[qid].append(_path) for _dir in dirs: - if _dir == '1': + if _dir == "1": # if PrepFiles contains the '1' directory, then it's a # given that this sample-sheet contains replicates. self.has_replicates = True @@ -256,7 +261,7 @@ def execute_pipeline(self): # before we pack the results, we need to generate the human-readable # report of samples lost in each step. The tracking is being done # within fsr (FailedSamplesRecord), in conjuction with Job.audit. 
- if hasattr(self, 'fsr'): + if hasattr(self, "fsr"): self.fsr.generate_report() self.update_status("Generating packaging commands", 8, 9) @@ -265,9 +270,9 @@ def execute_pipeline(self): # store the warnings, if they exist so they are packed with the # final results if self.assay_warnings: - wfp = f'{self.pipeline.output_path}/final_results/WARNINGS.txt' - with open(wfp, 'w') as f: - f.write('\n'.join(self.assay_warnings)) + wfp = f"{self.pipeline.output_path}/final_results/WARNINGS.txt" + with open(wfp, "w") as f: + f.write("\n".join(self.assay_warnings)) self.update_status("Packaging results", 9, 9) if self.update: @@ -275,8 +280,8 @@ def execute_pipeline(self): class Amplicon(Assay): - AMPLICON_TYPE = 'Amplicon' - AMPLICON_SUB_TYPES = {'16S', '18S', 'ITS'} + AMPLICON_TYPE = "Amplicon" + AMPLICON_SUB_TYPES = {"16S", "18S", "ITS"} assay_type = ASSAY_NAME_AMPLICON def qc_reads(self): @@ -291,7 +296,7 @@ def qc_reads(self): # Simulate NuQCJob's output directory for use as input into FastQCJob. projects = self.pipeline.get_project_info() - projects = [x['project_name'] for x in projects] + projects = [x["project_name"] for x in projects] for project_name in projects: # FastQC expects the ConvertJob output to also be organized by @@ -303,10 +308,17 @@ def qc_reads(self): output_folder = join(self.raw_fastq_files_path, project_name) makedirs(output_folder) - job_output = [join(self.raw_fastq_files_path, x) for x in - listdir(self.raw_fastq_files_path)] - job_output = [x for x in job_output if isfile(x) and x.endswith( - 'fastq.gz') and not basename(x).startswith('Undetermined')] + job_output = [ + join(self.raw_fastq_files_path, x) + for x in listdir(self.raw_fastq_files_path) + ] + job_output = [ + x + for x in job_output + if isfile(x) + and x.endswith("fastq.gz") + and not basename(x).startswith("Undetermined") + ] for raw_fastq_file in job_output: new_path = join(output_folder, basename(raw_fastq_file)) @@ -314,15 +326,17 @@ def qc_reads(self): # copy the files from ConvertJob output to faked NuQCJob output # folder: $WKDIR/$RUN_ID/NuQCJob/$PROJ_NAME/amplicon - output_folder = join(self.pipeline.output_path, - 'NuQCJob', - project_name, - # for legacy purposes, output folders are - # either 'trimmed_sequences', 'amplicon', or - # 'filtered_sequences'. Hence, this folder - # is not defined using AMPLICON_TYPE as that - # value may or may not equal the needed value. - 'amplicon') + output_folder = join( + self.pipeline.output_path, + "NuQCJob", + project_name, + # for legacy purposes, output folders are + # either 'trimmed_sequences', 'amplicon', or + # 'filtered_sequences'. Hence, this folder + # is not defined using AMPLICON_TYPE as that + # value may or may not equal the needed value. 
+ "amplicon", + ) makedirs(output_folder) # copy the file @@ -331,68 +345,74 @@ def qc_reads(self): copyfile(fastq_file, new_path) def generate_reports(self): - config = self.pipeline.get_software_configuration('fastqc') - fcjob = FastQCJob(self.pipeline.run_dir, - self.pipeline.output_path, - self.raw_fastq_files_path, - join(self.pipeline.output_path, 'NuQCJob'), - config['nprocs'], - config['nthreads'], - config['fastqc_executable_path'], - config['modules_to_load'], - self.master_qiita_job_id, - config['queue'], - config['nodes'], - config['wallclock_time_in_minutes'], - config['job_total_memory_limit'], - config['job_pool_size'], - config['job_max_array_length'], - True) - mqcjob = MultiQCJob(self.pipeline.run_dir, - self.pipeline.output_path, - self.raw_fastq_files_path, - join(self.pipeline.output_path, 'NuQCJob'), - config['nprocs'], - config['nthreads'], - config['multiqc_executable_path'], - config['modules_to_load'], - self.master_qiita_job_id, - config['queue'], - config['nodes'], - config['wallclock_time_in_minutes'], - config['job_total_memory_limit'], - config['job_pool_size'], - join(self.pipeline.output_path, 'FastQCJob'), - config['job_max_array_length'], - config['multiqc_config_file_path'], - True) - - if 'FastQCJob' not in self.skip_steps: + config = self.pipeline.get_software_configuration("fastqc") + fcjob = FastQCJob( + self.pipeline.run_dir, + self.pipeline.output_path, + self.raw_fastq_files_path, + join(self.pipeline.output_path, "NuQCJob"), + config["nprocs"], + config["nthreads"], + config["fastqc_executable_path"], + config["modules_to_load"], + self.master_qiita_job_id, + config["queue"], + config["nodes"], + config["wallclock_time_in_minutes"], + config["job_total_memory_limit"], + config["job_pool_size"], + config["job_max_array_length"], + True, + ) + mqcjob = MultiQCJob( + self.pipeline.run_dir, + self.pipeline.output_path, + self.raw_fastq_files_path, + join(self.pipeline.output_path, "NuQCJob"), + config["nprocs"], + config["nthreads"], + config["multiqc_executable_path"], + config["modules_to_load"], + self.master_qiita_job_id, + config["queue"], + config["nodes"], + config["wallclock_time_in_minutes"], + config["job_total_memory_limit"], + config["job_pool_size"], + join(self.pipeline.output_path, "FastQCJob"), + config["job_max_array_length"], + config["multiqc_config_file_path"], + True, + ) + + if "FastQCJob" not in self.skip_steps: fcjob.run(callback=self.job_callback) - if 'MultiQCJob' not in self.skip_steps: + if "MultiQCJob" not in self.skip_steps: mqcjob.run(callback=self.job_callback) def generate_prep_file(self): - config = self.pipeline.get_software_configuration('seqpro') + config = self.pipeline.get_software_configuration("seqpro") # NB: For amplicon runs, the executable used is currently a variant # of seqpro called 'seqpro_mf'. It is stored in the same location as # 'seqpro'. 
- seqpro_path = config['seqpro_path'].replace('seqpro', 'seqpro_mf') - - job = GenPrepFileJob(self.pipeline.run_dir, - self.raw_fastq_files_path, - join(self.pipeline.output_path, 'NuQCJob'), - self.pipeline.output_path, - self.pipeline.input_file_path, - seqpro_path, - config['modules_to_load'], - self.master_qiita_job_id, - self.reports_path, - is_amplicon=True) - - if 'GenPrepFileJob' not in self.skip_steps: + seqpro_path = config["seqpro_path"].replace("seqpro", "seqpro_mf") + + job = GenPrepFileJob( + self.pipeline.run_dir, + self.raw_fastq_files_path, + join(self.pipeline.output_path, "NuQCJob"), + self.pipeline.output_path, + self.pipeline.input_file_path, + seqpro_path, + config["modules_to_load"], + self.master_qiita_job_id, + self.reports_path, + is_amplicon=True, + ) + + if "GenPrepFileJob" not in self.skip_steps: job.run(callback=self.job_callback) self.dereplicated_input_file_paths = job.dereplicated_input_file_paths @@ -410,32 +430,33 @@ def update_prep_templates(self): for prep_fp in self.prep_file_paths[study_id]: metadata = Assay._parse_prep_file(prep_fp) afact_name, is_repl = self._generate_artifact_name(prep_fp) - data = {'prep_info': dumps(metadata), - 'study': study_id, - 'data_type': None, - 'job-id': self.master_qiita_job_id, - 'name': afact_name} - - if 'target_gene' in metadata[list(metadata.keys())[0]]: - tg = metadata[list(metadata.keys())[0]]['target_gene'] + data = { + "prep_info": dumps(metadata), + "study": study_id, + "data_type": None, + "job-id": self.master_qiita_job_id, + "name": afact_name, + } + + if "target_gene" in metadata[list(metadata.keys())[0]]: + tg = metadata[list(metadata.keys())[0]]["target_gene"] for key in Amplicon.AMPLICON_SUB_TYPES: if key in tg: - data['data_type'] = key + data["data_type"] = key - if data['data_type'] is None: - raise ValueError("data_type could not be " - "determined from target_gene " - "column") + if data["data_type"] is None: + raise ValueError( + "data_type could not be determined from target_gene column" + ) else: - raise ValueError("target_gene must be specified for " - "amplicon type") + raise ValueError("target_gene must be specified for amplicon type") - reply = self.qclient.post('/qiita_db/prep_template/', - data=data) - prep_id = reply['prep'] + reply = self.qclient.post("/qiita_db/prep_template/", data=data) + prep_id = reply["prep"] results[study_id].append((prep_id, afact_name, is_repl)) - self.run_prefixes[prep_id] = [metadata[sample]['run_prefix'] - for sample in metadata] + self.run_prefixes[prep_id] = [ + metadata[sample]["run_prefix"] for sample in metadata + ] self.touched_studies_prep_info = results return results @@ -444,7 +465,8 @@ def load_preps_into_qiita(self): data = [] for project, _, qiita_id in self.special_map: fastq_files = self._get_postqc_fastq_files( - self.pipeline.output_path, project) + self.pipeline.output_path, project + ) for vals in self.touched_studies_prep_info[qiita_id]: prep_id, artifact_name, is_repl = vals @@ -462,15 +484,30 @@ def load_preps_into_qiita(self): else: working_set = fastq_files - data.append(self._load_prep_into_qiita( - self.qclient, prep_id, artifact_name, qiita_id, project, - working_set, ARTIFACT_TYPE_AMPLICON)) + data.append( + self._load_prep_into_qiita( + self.qclient, + prep_id, + artifact_name, + qiita_id, + project, + working_set, + ARTIFACT_TYPE_AMPLICON, + ) + ) df = pd.DataFrame(data) - opath = join(self.pipeline.output_path, 'touched_studies.html') - with open(opath, 'w') as f: - f.write(df.to_html(border=2, index=False, justify="left", - 
render_links=True, escape=False)) + opath = join(self.pipeline.output_path, "touched_studies.html") + with open(opath, "w") as f: + f.write( + df.to_html( + border=2, + index=False, + justify="left", + render_links=True, + escape=False, + ) + ) return df @@ -480,119 +517,128 @@ class MetaOmic(Assay): MetaOmic() is a base class for Metagenomic() and Metatranscriptomic(), which are currently identical in functionality. """ + # MetaOmic does not have an assay_type of its own. It is defined by its # children. def qc_reads(self): # because this is a mixin, assume containing object will contain # a pipeline object. - config = self.pipeline.get_software_configuration('nu-qc') + config = self.pipeline.get_software_configuration("nu-qc") # files_regex is used with # sequencing_processing_pipeline.util.FILES_REGEX to decide # the file formats to look for. In this case, if self.files_regex # is not defined, just fallback to the SPP expected default regex - if not hasattr(self, 'files_regex'): - self.files_regex = 'SPP' + if not hasattr(self, "files_regex"): + self.files_regex = "SPP" # base quality control used by multiple Assay types. - job = NuQCJob(self.raw_fastq_files_path, - self.pipeline.output_path, - self.pipeline.sample_sheet.path, - config['minimap2_databases'], - config['queue'], - config['nodes'], - config['wallclock_time_in_minutes'], - config['job_total_memory_limit'], - config['fastp_executable_path'], - config['minimap2_executable_path'], - config['samtools_executable_path'], - config['modules_to_load'], - self.master_qiita_job_id, - config['job_max_array_length'], - config['known_adapters_path'], - config['movi_executable_path'], - config['gres_value'], - config['pmls_path'], - config['additional_fastq_tags'], - bucket_size=config['bucket_size'], - length_limit=config['length_limit'], - cores_per_task=config['cores_per_task'], - files_regex=self.files_regex, - read_length=self.read_length) - - if 'NuQCJob' not in self.skip_steps: + job = NuQCJob( + self.raw_fastq_files_path, + self.pipeline.output_path, + self.pipeline.sample_sheet.path, + config["minimap2_databases"], + config["queue"], + config["nodes"], + config["wallclock_time_in_minutes"], + config["job_total_memory_limit"], + config["fastp_executable_path"], + config["minimap2_executable_path"], + config["samtools_executable_path"], + config["modules_to_load"], + self.master_qiita_job_id, + config["job_max_array_length"], + config["known_adapters_path"], + config["movi_executable_path"], + config["gres_value"], + config["pmls_path"], + config["additional_fastq_tags"], + bucket_size=config["bucket_size"], + length_limit=config["length_limit"], + cores_per_task=config["cores_per_task"], + files_regex=self.files_regex, + read_length=self.read_length, + ) + + if "NuQCJob" not in self.skip_steps: job.run(callback=self.job_callback) # audit the results to determine which samples failed to convert # properly. Append these to the failed-samples report and also # return the list directly to the caller. 
failed_samples = job.audit(self.pipeline.get_sample_ids()) - if hasattr(self, 'fsr'): + if hasattr(self, "fsr"): self.fsr.write(failed_samples, job.__class__.__name__) return failed_samples def generate_reports(self): - config = self.pipeline.get_software_configuration('fastqc') - fqjob = FastQCJob(self.pipeline.run_dir, - self.pipeline.output_path, - self.raw_fastq_files_path, - join(self.pipeline.output_path, 'NuQCJob'), - config['nprocs'], - config['nthreads'], - config['fastqc_executable_path'], - config['modules_to_load'], - self.master_qiita_job_id, - config['queue'], - config['nodes'], - config['wallclock_time_in_minutes'], - config['job_total_memory_limit'], - config['job_pool_size'], - config['job_max_array_length'], - False) - mqcjob = MultiQCJob(self.pipeline.run_dir, - self.pipeline.output_path, - self.raw_fastq_files_path, - join(self.pipeline.output_path, 'NuQCJob'), - config['nprocs'], - config['nthreads'], - config['multiqc_executable_path'], - config['modules_to_load'], - self.master_qiita_job_id, - config['queue'], - config['nodes'], - config['wallclock_time_in_minutes'], - config['job_total_memory_limit'], - config['job_pool_size'], - join(self.pipeline.output_path, 'FastQCJob'), - config['job_max_array_length'], - config['multiqc_config_file_path'], - False) - - if 'FastQCJob' not in self.skip_steps: + config = self.pipeline.get_software_configuration("fastqc") + fqjob = FastQCJob( + self.pipeline.run_dir, + self.pipeline.output_path, + self.raw_fastq_files_path, + join(self.pipeline.output_path, "NuQCJob"), + config["nprocs"], + config["nthreads"], + config["fastqc_executable_path"], + config["modules_to_load"], + self.master_qiita_job_id, + config["queue"], + config["nodes"], + config["wallclock_time_in_minutes"], + config["job_total_memory_limit"], + config["job_pool_size"], + config["job_max_array_length"], + False, + ) + mqcjob = MultiQCJob( + self.pipeline.run_dir, + self.pipeline.output_path, + self.raw_fastq_files_path, + join(self.pipeline.output_path, "NuQCJob"), + config["nprocs"], + config["nthreads"], + config["multiqc_executable_path"], + config["modules_to_load"], + self.master_qiita_job_id, + config["queue"], + config["nodes"], + config["wallclock_time_in_minutes"], + config["job_total_memory_limit"], + config["job_pool_size"], + join(self.pipeline.output_path, "FastQCJob"), + config["job_max_array_length"], + config["multiqc_config_file_path"], + False, + ) + + if "FastQCJob" not in self.skip_steps: fqjob.run(callback=self.job_callback) - if 'MultiQCJob' not in self.skip_steps: + if "MultiQCJob" not in self.skip_steps: mqcjob.run(callback=self.job_callback) failed_samples = fqjob.audit(self.pipeline.get_sample_ids()) - if hasattr(self, 'fsr'): + if hasattr(self, "fsr"): self.fsr.write(failed_samples, fqjob.__class__.__name__) return failed_samples def generate_prep_file(self): - config = self.pipeline.get_software_configuration('seqpro') - - job = GenPrepFileJob(self.pipeline.run_dir, - self.raw_fastq_files_path, - join(self.pipeline.output_path, 'NuQCJob'), - self.pipeline.output_path, - self.pipeline.input_file_path, - config['seqpro_path'], - config['modules_to_load'], - self.master_qiita_job_id, - self.reports_path) - - if 'GenPrepFileJob' not in self.skip_steps: + config = self.pipeline.get_software_configuration("seqpro") + + job = GenPrepFileJob( + self.pipeline.run_dir, + self.raw_fastq_files_path, + join(self.pipeline.output_path, "NuQCJob"), + self.pipeline.output_path, + self.pipeline.input_file_path, + config["seqpro_path"], + 
config["modules_to_load"], + self.master_qiita_job_id, + self.reports_path, + ) + + if "GenPrepFileJob" not in self.skip_steps: job.run(callback=self.job_callback) self.dereplicated_input_file_paths = job.dereplicated_input_file_paths @@ -613,27 +659,31 @@ def update_prep_templates(self): # the preps are created by seqpro within the GenPrepFileJob; # thus, the "best" place to overwrite the values of the # metadata are here - if hasattr(self, 'overwrite_prep_with_original') and \ - self.overwrite_prep_with_original: - sid = basename(prep_fp).split('.')[1] + if ( + hasattr(self, "overwrite_prep_with_original") + and self.overwrite_prep_with_original + ): + sid = basename(prep_fp).split(".")[1] afact_name = sid - prep_fp = f'{self.pipeline.output_path}/original-prep.csv' + prep_fp = f"{self.pipeline.output_path}/original-prep.csv" metadata = Assay._parse_prep_file(prep_fp) - data = {'prep_info': dumps(metadata), - 'study': study_id, - 'job-id': self.master_qiita_job_id, - 'name': afact_name, - 'data_type': self.pipeline.pipeline_type} + data = { + "prep_info": dumps(metadata), + "study": study_id, + "job-id": self.master_qiita_job_id, + "name": afact_name, + "data_type": self.pipeline.pipeline_type, + } # since all Assays are mixins for Workflows, assume # self.qclient exists and available. - reply = self.qclient.post('/qiita_db/prep_template/', - data=data) - prep_id = reply['prep'] + reply = self.qclient.post("/qiita_db/prep_template/", data=data) + prep_id = reply["prep"] results[study_id].append((prep_id, afact_name, is_repl)) - self.run_prefixes[prep_id] = [metadata[sample]['run_prefix'] - for sample in metadata] + self.run_prefixes[prep_id] = [ + metadata[sample]["run_prefix"] for sample in metadata + ] self.touched_studies_prep_info = results return results @@ -643,7 +693,8 @@ def load_preps_into_qiita(self): empty_projects = [] for project, _, qiita_id in self.special_map: fastq_files = self._get_postqc_fastq_files( - self.pipeline.output_path, project) + self.pipeline.output_path, project + ) for vals in self.touched_studies_prep_info[qiita_id]: prep_id, artifact_name, is_repl = vals @@ -653,9 +704,9 @@ def load_preps_into_qiita(self): for key in fastq_files: working_set[key] = [] for run_prefix in self.run_prefixes[prep_id]: - working_set[key] += [fastq for fastq in - fastq_files[key] if - run_prefix in fastq] + working_set[key] += [ + fastq for fastq in fastq_files[key] if run_prefix in fastq + ] if is_repl: working_set = self._copy_files(working_set) @@ -666,28 +717,43 @@ def load_preps_into_qiita(self): if not v: empty_projects.append(project) - data.append(self._load_prep_into_qiita( - self.qclient, prep_id, artifact_name, qiita_id, project, - working_set, ARTIFACT_TYPE_METAOMICS)) + data.append( + self._load_prep_into_qiita( + self.qclient, + prep_id, + artifact_name, + qiita_id, + project, + working_set, + ARTIFACT_TYPE_METAOMICS, + ) + ) if empty_projects: ep = set(empty_projects) - raise ValueError(f'These projects have no files: {ep}') + raise ValueError(f"These projects have no files: {ep}") df = pd.DataFrame(data) - opath = join(self.pipeline.output_path, 'touched_studies.html') - with open(opath, 'w') as f: - f.write(df.to_html(border=2, index=False, justify="left", - render_links=True, escape=False)) + opath = join(self.pipeline.output_path, "touched_studies.html") + with open(opath, "w") as f: + f.write( + df.to_html( + border=2, + index=False, + justify="left", + render_links=True, + escape=False, + ) + ) return df class Metagenomic(MetaOmic): - METAGENOMIC_TYPE = 
'Metagenomic' + METAGENOMIC_TYPE = "Metagenomic" assay_type = ASSAY_NAME_METAGENOMIC class Metatranscriptomic(MetaOmic): - METATRANSCRIPTOMIC_TYPE = 'Metatranscriptomic' + METATRANSCRIPTOMIC_TYPE = "Metatranscriptomic" assay_type = ASSAY_NAME_METATRANSCRIPTOMIC diff --git a/src/qp_klp/FailedSamplesRecord.py b/src/qp_klp/FailedSamplesRecord.py index 73e0153b..330fe7d1 100644 --- a/src/qp_klp/FailedSamplesRecord.py +++ b/src/qp_klp/FailedSamplesRecord.py @@ -1,5 +1,6 @@ from json import dumps, load -from os.path import join, exists +from os.path import exists, join + import pandas as pd @@ -9,8 +10,8 @@ def __init__(self, output_dir, samples): # each Job is run, and we want to organize that output by project, we # need to keep a running state of failed samples, and reuse the method # to reorganize the running-results and write them out to disk. - self.output_path = join(output_dir, 'failed_samples.json') - self.report_path = join(output_dir, 'failed_samples.html') + self.output_path = join(output_dir, "failed_samples.json") + self.report_path = join(output_dir, "failed_samples.html") # create an initial dictionary with sample-ids as keys and their # associated project-name and status as values. Afterwards, we'll @@ -21,20 +22,19 @@ def __init__(self, output_dir, samples): self.project_map = {x.Sample_ID: x.Sample_Project for x in samples} def dump(self): - output = {'sample_state': self.sample_state, - 'project_map': self.project_map} + output = {"sample_state": self.sample_state, "project_map": self.project_map} - with open(self.output_path, 'w') as f: + with open(self.output_path, "w") as f: f.write(dumps(output, indent=2, sort_keys=True)) def load(self): # if recorded state exists, overwrite initial state. if exists(self.output_path): - with open(self.output_path, 'r') as f: + with open(self.output_path, "r") as f: state = load(f) - self.sample_state = state['sample_state'] - self.project_map = state['project_map'] + self.sample_state = state["sample_state"] + self.project_map = state["project_map"] def update(self, failed_ids, job_name): # as a rule, if a failed_id were to appear in more than one @@ -54,17 +54,30 @@ def write(self, failed_ids, job_name): def generate_report(self): # filter out the sample-ids w/out a failure status - filtered_fails = {x: self.sample_state[x] for x in self.sample_state if - self.sample_state[x] is not None} + filtered_fails = { + x: self.sample_state[x] + for x in self.sample_state + if self.sample_state[x] is not None + } data = [] for sample_id in filtered_fails: - data.append({'Project': filtered_fails[sample_id], - 'Sample ID': sample_id, - 'Failed at': self.project_map[sample_id] - }) + data.append( + { + "Project": filtered_fails[sample_id], + "Sample ID": sample_id, + "Failed at": self.project_map[sample_id], + } + ) df = pd.DataFrame(data) - with open(self.report_path, 'w') as f: - f.write(df.to_html(border=2, index=False, justify="left", - render_links=True, escape=False)) + with open(self.report_path, "w") as f: + f.write( + df.to_html( + border=2, + index=False, + justify="left", + render_links=True, + escape=False, + ) + ) diff --git a/src/qp_klp/PacBioMetagenomicWorkflow.py b/src/qp_klp/PacBioMetagenomicWorkflow.py index 7a83d8e5..78ff1000 100644 --- a/src/qp_klp/PacBioMetagenomicWorkflow.py +++ b/src/qp_klp/PacBioMetagenomicWorkflow.py @@ -1,71 +1,83 @@ -from .Protocol import PacBio +from os.path import join + +import pandas as pd + from sequence_processing_pipeline.Pipeline import Pipeline -from .Assays import Metagenomic -from .Assays 
import ASSAY_NAME_METAGENOMIC + +from .Assays import ASSAY_NAME_METAGENOMIC, Metagenomic from .FailedSamplesRecord import FailedSamplesRecord +from .Protocol import PacBio from .Workflows import Workflow -import pandas as pd -from os.path import join class PacBioMetagenomicWorkflow(Workflow, Metagenomic, PacBio): def __init__(self, **kwargs): super().__init__(**kwargs) - self.mandatory_attributes = ['qclient', 'uif_path', - 'lane_number', 'config_fp', - 'run_identifier', 'output_dir', 'job_id', - 'is_restart'] + self.mandatory_attributes = [ + "qclient", + "uif_path", + "lane_number", + "config_fp", + "run_identifier", + "output_dir", + "job_id", + "is_restart", + ] self.confirm_mandatory_attributes() # second stage initializer that could conceivably be pushed down into # specific children requiring specific parameters. - self.qclient = self.kwargs['qclient'] + self.qclient = self.kwargs["qclient"] self.overwrite_prep_with_original = False - if 'overwrite_prep_with_original' in self.kwargs: - self.overwrite_prep_with_original = \ - self.kwargs['overwrite_prep_with_original'] - self.pipeline = Pipeline(self.kwargs['config_fp'], - self.kwargs['run_identifier'], - self.kwargs['uif_path'], - self.kwargs['output_dir'], - self.kwargs['job_id'], - ASSAY_NAME_METAGENOMIC, - lane_number=self.kwargs['lane_number']) - - self.fsr = FailedSamplesRecord(self.kwargs['output_dir'], - self.pipeline.sample_sheet.samples) + if "overwrite_prep_with_original" in self.kwargs: + self.overwrite_prep_with_original = self.kwargs[ + "overwrite_prep_with_original" + ] + self.pipeline = Pipeline( + self.kwargs["config_fp"], + self.kwargs["run_identifier"], + self.kwargs["uif_path"], + self.kwargs["output_dir"], + self.kwargs["job_id"], + ASSAY_NAME_METAGENOMIC, + lane_number=self.kwargs["lane_number"], + ) + + self.fsr = FailedSamplesRecord( + self.kwargs["output_dir"], self.pipeline.sample_sheet.samples + ) samples = [ - {'barcode': sample['barcode_id'], - 'sample_name': sample['Sample_ID'], - 'project_name': sample['Sample_Project'], - 'lane': sample['Lane']} - for sample in self.pipeline.sample_sheet.samples] + { + "barcode": sample["barcode_id"], + "sample_name": sample["Sample_ID"], + "project_name": sample["Sample_Project"], + "lane": sample["Lane"], + } + for sample in self.pipeline.sample_sheet.samples + ] df = pd.DataFrame(samples) sample_list_fp = f"{self.kwargs['output_dir']}/sample_list.tsv" - df.to_csv(sample_list_fp, sep='\t', index=False) + df.to_csv(sample_list_fp, sep="\t", index=False) - self.master_qiita_job_id = self.kwargs['job_id'] + self.master_qiita_job_id = self.kwargs["job_id"] - self.lane_number = self.kwargs['lane_number'] - self.is_restart = bool(self.kwargs['is_restart']) + self.lane_number = self.kwargs["lane_number"] + self.is_restart = bool(self.kwargs["is_restart"]) if self.is_restart is True: - self.raw_fastq_files_path = join(self.pipeline.output_path, - 'ConvertJob') - self.reports_path = join(self.raw_fastq_files_path, - 'SeqCounts.csv') + self.raw_fastq_files_path = join(self.pipeline.output_path, "ConvertJob") + self.reports_path = join(self.raw_fastq_files_path, "SeqCounts.csv") self.determine_steps_to_skip() # this is a convenience member to allow testing w/out updating Qiita. 
self.update = True - if 'update_qiita' in kwargs: - if not isinstance(kwargs['update_qiita'], bool): - raise ValueError("value for 'update_qiita' must be of " - "type bool") + if "update_qiita" in kwargs: + if not isinstance(kwargs["update_qiita"], bool): + raise ValueError("value for 'update_qiita' must be of type bool") - self.update = kwargs['update_qiita'] + self.update = kwargs["update_qiita"] diff --git a/src/qp_klp/Protocol.py b/src/qp_klp/Protocol.py index 4191d757..a29b0a7d 100644 --- a/src/qp_klp/Protocol.py +++ b/src/qp_klp/Protocol.py @@ -1,54 +1,61 @@ -from sequence_processing_pipeline.ConvertJob import ( - ConvertJob, ConvertPacBioBam2FastqJob) -from sequence_processing_pipeline.TellReadJob import TellReadJob -from sequence_processing_pipeline.SeqCountsJob import SeqCountsJob -from sequence_processing_pipeline.TRIntegrateJob import TRIntegrateJob -from sequence_processing_pipeline.PipelineError import PipelineError -from sequence_processing_pipeline.util import determine_orientation -from re import match +from glob import glob from os import makedirs, rename, walk -from os.path import join, split, basename, dirname, exists +from os.path import basename, dirname, exists, join, split +from re import match + +import pandas as pd from metapool import load_sample_sheet from metapool.sample_sheet import ( - PROTOCOL_NAME_ILLUMINA, PROTOCOL_NAME_TELLSEQ, - PROTOCOL_NAME_PACBIO_SMRT) -import pandas as pd -from glob import glob + PROTOCOL_NAME_ILLUMINA, + PROTOCOL_NAME_PACBIO_SMRT, + PROTOCOL_NAME_TELLSEQ, +) from qiita_client.util import system_call +from sequence_processing_pipeline.ConvertJob import ( + ConvertJob, + ConvertPacBioBam2FastqJob, +) +from sequence_processing_pipeline.PipelineError import PipelineError +from sequence_processing_pipeline.SeqCountsJob import SeqCountsJob +from sequence_processing_pipeline.TellReadJob import TellReadJob +from sequence_processing_pipeline.TRIntegrateJob import TRIntegrateJob +from sequence_processing_pipeline.util import determine_orientation PROTOCOL_NAME_NONE = "None" -class Protocol(): +class Protocol: """ Protocols encapsulate Job()s and other functionality that vary on the nature of the Instrument used to create the raw data. All Instruments are mixins for Workflow() classes and shouldn't define their own initialization. """ + protocol_type = PROTOCOL_NAME_NONE # this value was selected by looking at all the successful NuQC/SPP jobs, # the max sequeces were: 712,497,596 MAX_READS = 720000000 def subsample_reads(self): - if self.assay_type == 'Amplicon': + if self.assay_type == "Amplicon": return df = pd.read_csv(self.reports_path) - if 'raw_reads_r1r2' in df.columns: + if "raw_reads_r1r2" in df.columns: # this is a TellSeq run: SeqCounts.csv - read_col = 'raw_reads_r1r2' - index_col = 'Sample_ID' - elif '# Reads' in df.columns: + read_col = "raw_reads_r1r2" + index_col = "Sample_ID" + elif "# Reads" in df.columns: # this is a Illumina: Demultiplex_Stats.csv - read_col = '# Reads' - index_col = 'SampleID' + read_col = "# Reads" + index_col = "SampleID" else: raise ValueError( - 'Not sure how to check for seq counts to subsample, ' - 'please let an admin know.') + "Not sure how to check for seq counts to subsample, " + "please let an admin know." 
+ ) # df will keep any rows/samples with more than the self.MAX_READS df = df[df[read_col] > self.MAX_READS] if df.shape[0]: @@ -56,7 +63,7 @@ def subsample_reads(self): sn = row[index_col] # look for any sample (fwd/rev pairs) that have the sample_name # as prefix of their filename - files = glob(f'{self.raw_fastq_files_path}/*/{sn}*.fastq.gz') + files = glob(f"{self.raw_fastq_files_path}/*/{sn}*.fastq.gz") # for each file let's get their folder (dn) and filename (bn), # then create a fullpath with with dn and bn where we are # changing the filename from fastq.gz to full.gz; then @@ -65,19 +72,19 @@ def subsample_reads(self): for f in files: dn = dirname(f) bn = basename(f) - nbn = join(dn, bn.replace('fastq.gz', 'full.gz')) - cmd = f'mv {f} {nbn}' + nbn = join(dn, bn.replace("fastq.gz", "full.gz")) + cmd = f"mv {f} {nbn}" _, se, rv = system_call(cmd) if rv != 0 or se: - raise ValueError(f'Error during mv: {cmd}. {se}') - cmd = (f'seqtk sample -s 42 {nbn} {self.MAX_READS} ' - f'| gzip > {f}') + raise ValueError(f"Error during mv: {cmd}. {se}") + cmd = f"seqtk sample -s 42 {nbn} {self.MAX_READS} | gzip > {f}" _, se, rv = system_call(cmd) if rv != 0 or se: - raise ValueError(f'Error during seqtk: {cmd}. {se}') + raise ValueError(f"Error during seqtk: {cmd}. {se}") self.assay_warnings.append( - f'{sn} ({bn}) had {row[read_col]} sequences, ' - f'subsampling to {self.MAX_READS}') + f"{sn} ({bn}) had {row[read_col]} sequences, " + f"subsampling to {self.MAX_READS}" + ) class Illumina(Protocol): @@ -85,23 +92,24 @@ class Illumina(Protocol): # required files for successful operation for Illumina (making the default # here) both RTAComplete.txt and RunInfo.xml should reside in the root of # the run directory. - required_files = ['RTAComplete.txt', 'RunInfo.xml'] - read_length = 'short' + required_files = ["RTAComplete.txt", "RunInfo.xml"] + read_length = "short" def __init__(self) -> None: super().__init__() for some_file in self.required_files: if not exists(join(self.run_dir, some_file)): - raise PipelineError(f"required file '{some_file}' is not " - f"present in {self.run_dir}.") + raise PipelineError( + f"required file '{some_file}' is not present in {self.run_dir}." + ) # verify that RunInfo.xml file is readable. try: - fp = open(join(self.run_dir, 'RunInfo.xml')) + fp = open(join(self.run_dir, "RunInfo.xml")) fp.close() except PermissionError: - raise PipelineError('RunInfo.xml is present, but not readable') + raise PipelineError("RunInfo.xml is present, but not readable") def convert_raw_to_fastq(self): def get_config(command): @@ -123,41 +131,43 @@ def get_config(command): # will get and thus we must check for the presence or absence of # both dictionaries in the configuration file at run-time and make # a determination based on that. - bcl2fastq_conf = get_config('bcl2fastq') - bclcnvrt_conf = get_config('bcl-convert') + bcl2fastq_conf = get_config("bcl2fastq") + bclcnvrt_conf = get_config("bcl-convert") if bclcnvrt_conf is None and bcl2fastq_conf is None: - raise PipelineError("bcl-convert and bcl2fastq sections not " - "defined in configuation profile.") + raise PipelineError( + "bcl-convert and bcl2fastq sections not " + "defined in configuation profile." + ) # if both are defined, we will use bcl-convert by default. config = bcl2fastq_conf if bclcnvrt_conf is None else bclcnvrt_conf - job = ConvertJob(self.pipeline.run_dir, - self.pipeline.output_path, - # for amplicon runs, get_sample_sheet_path() will - # return the path for a generated dummy sheet. 
- self.pipeline.get_sample_sheet_path(), - config['queue'], - config['nodes'], - config['nprocs'], - config['wallclock_time_in_minutes'], - config['per_process_memory_limit'], - config['executable_path'], - config['modules_to_load'], - self.master_qiita_job_id) - - self.raw_fastq_files_path = join(self.pipeline.output_path, - 'ConvertJob') - - if 'ConvertJob' not in self.skip_steps: + job = ConvertJob( + self.pipeline.run_dir, + self.pipeline.output_path, + # for amplicon runs, get_sample_sheet_path() will + # return the path for a generated dummy sheet. + self.pipeline.get_sample_sheet_path(), + config["queue"], + config["nodes"], + config["nprocs"], + config["wallclock_time_in_minutes"], + config["per_process_memory_limit"], + config["executable_path"], + config["modules_to_load"], + self.master_qiita_job_id, + ) + + self.raw_fastq_files_path = join(self.pipeline.output_path, "ConvertJob") + + if "ConvertJob" not in self.skip_steps: job.run(callback=self.job_callback) # if successful, set self.reports_path - self.reports_path = join(self.pipeline.output_path, - 'ConvertJob', - 'Reports', - 'Demultiplex_Stats.csv') + self.reports_path = join( + self.pipeline.output_path, "ConvertJob", "Reports", "Demultiplex_Stats.csv" + ) # TODO: Include alternative path when using bcl2fastq instead of # bcl-convert. @@ -165,7 +175,7 @@ def get_config(command): # properly. Append these to the failed-samples report and also # return the list directly to the caller. failed_samples = job.audit(self.pipeline.get_sample_ids()) - if hasattr(self, 'fsr'): + if hasattr(self, "fsr"): self.fsr.write(failed_samples, job.__class__.__name__) return failed_samples @@ -179,41 +189,46 @@ def generate_sequence_counts(self): class TellSeq(Protocol): protocol_type = PROTOCOL_NAME_TELLSEQ - read_length = 'short' + read_length = "short" def convert_raw_to_fastq(self): - config = self.pipeline.get_software_configuration('tell-seq') - - job = TellReadJob(self.pipeline.run_dir, - self.pipeline.output_path, - self.pipeline.input_file_path, - config['queue'], - config['nodes'], - config['wallclock_time_in_minutes'], - config['tellread_mem_limit'], - config['modules_to_load'], - self.master_qiita_job_id, - config['reference_base'], - config['reference_map'], - config['sing_script_path'], - config['tellread_cores']) - - self.raw_fastq_files_path = join(self.pipeline.output_path, - 'TellReadJob', 'Full') - self.my_sil_path = join(self.pipeline.output_path, - 'TellReadJob', - 'sample_index_list_TellReadJob.txt') + config = self.pipeline.get_software_configuration("tell-seq") + + job = TellReadJob( + self.pipeline.run_dir, + self.pipeline.output_path, + self.pipeline.input_file_path, + config["queue"], + config["nodes"], + config["wallclock_time_in_minutes"], + config["tellread_mem_limit"], + config["modules_to_load"], + self.master_qiita_job_id, + config["reference_base"], + config["reference_map"], + config["sing_script_path"], + config["tellread_cores"], + ) + + self.raw_fastq_files_path = join( + self.pipeline.output_path, "TellReadJob", "Full" + ) + self.my_sil_path = join( + self.pipeline.output_path, + "TellReadJob", + "sample_index_list_TellReadJob.txt", + ) # if TellReadJob already completed, then skip the over the time- # consuming portion but populate the needed member variables. - if 'TellReadJob' not in self.skip_steps: + if "TellReadJob" not in self.skip_steps: job.run(callback=self.job_callback) # audit the results to determine which samples failed to convert # properly. 
Append these to the failed-samples report and also # return the list directly to the caller. failed_samples = job.audit() - if hasattr(self, 'fsr'): + if hasattr(self, "fsr"): # NB 16S does not require a failed samples report and # it is not performed by SPP. self.fsr.write(failed_samples, job.__class__.__name__) @@ -221,37 +236,38 @@ def convert_raw_to_fastq(self): return failed_samples def generate_sequence_counts(self): - config = self.pipeline.get_software_configuration('tell-seq') + config = self.pipeline.get_software_configuration("tell-seq") - files_to_count_path = join(self.pipeline.output_path, - 'files_to_count.txt') + files_to_count_path = join(self.pipeline.output_path, "files_to_count.txt") - with open(files_to_count_path, 'w') as f: + with open(files_to_count_path, "w") as f: for root, _, files in walk(self.raw_fastq_files_path): for _file in files: - if determine_orientation(_file) in ['R1', 'R2']: + if determine_orientation(_file) in ["R1", "R2"]: print(join(root, _file), file=f) - job = SeqCountsJob(self.pipeline.run_dir, - self.pipeline.output_path, - config['queue'], - config['nodes'], - config['wallclock_time_in_minutes'], - config['normcount_mem_limit'], - config['modules_to_load'], - self.master_qiita_job_id, - config['job_max_array_length'], - files_to_count_path, - self.pipeline.get_sample_sheet_path(), - cores_per_task=config['tellread_cores']) - - if 'SeqCountsJob' not in self.skip_steps: + job = SeqCountsJob( + self.pipeline.run_dir, + self.pipeline.output_path, + config["queue"], + config["nodes"], + config["wallclock_time_in_minutes"], + config["normcount_mem_limit"], + config["modules_to_load"], + self.master_qiita_job_id, + config["job_max_array_length"], + files_to_count_path, + self.pipeline.get_sample_sheet_path(), + cores_per_task=config["tellread_cores"], + ) + + if "SeqCountsJob" not in self.skip_steps: job.run(callback=self.job_callback) # if successful, set self.reports_path - self.reports_path = join(self.pipeline.output_path, - 'SeqCountsJob', - 'SeqCounts.csv') + self.reports_path = join( + self.pipeline.output_path, "SeqCountsJob", "SeqCounts.csv" + ) # Do not add an entry to fsr because w/respect to counting, either # all jobs are going to fail or none are going to fail. It's not @@ -259,51 +275,52 @@ def generate_sequence_counts(self): # of the samples. def integrate_results(self): - config = self.pipeline.get_software_configuration('tell-seq') + config = self.pipeline.get_software_configuration("tell-seq") # after the primary job and the optional counts job is completed, # the job to integrate results and add metadata to the fastq files # is performed. - job = TRIntegrateJob(self.pipeline.run_dir, - self.pipeline.output_path, - self.pipeline.input_file_path, - config['queue'], - config['nodes'], - config['wallclock_time_in_minutes'], - config['integrate_mem_limit'], - config['modules_to_load'], - self.master_qiita_job_id, - config['integrate_script_path'], - # NB: sample_index_list used may vary - # from project to project in the future. - # If so replace config entry with a user - # supplied entry or an entry in the sample - # sheet. - - # NB: the version of the sil needed is the one - # generated by TellReadJob(), not the master - # sil. The former will account for a set of - # barcode_ids that don't begin at C501 and/or - # skip over some values like C509. 
- self.my_sil_path, - self.raw_fastq_files_path, - "", - "", - config['integrate_cores']) - - if 'TRIntegrateJob' not in self.skip_steps: + job = TRIntegrateJob( + self.pipeline.run_dir, + self.pipeline.output_path, + self.pipeline.input_file_path, + config["queue"], + config["nodes"], + config["wallclock_time_in_minutes"], + config["integrate_mem_limit"], + config["modules_to_load"], + self.master_qiita_job_id, + config["integrate_script_path"], + # NB: sample_index_list used may vary + # from project to project in the future. + # If so replace config entry with a user + # supplied entry or an entry in the sample + # sheet. + # NB: the version of the sil needed is the one + # generated by TellReadJob(), not the master + # sil. The former will account for a set of + # barcode_ids that don't begin at C501 and/or + # skip over some values like C509. + self.my_sil_path, + self.raw_fastq_files_path, + "", + "", + config["integrate_cores"], + ) + + if "TRIntegrateJob" not in self.skip_steps: job.run(callback=self.job_callback) # raw_fastq_files_path is used by downstream processes to know # where to locate 'raw' fastq files. Before we could assume that # it would always be in ConvertJob's working directory but now # this is no longer the case. Currently used by NuQCJob. - self.raw_fastq_files_path = join(self.pipeline.output_path, - 'TRIntegrateJob', - 'integrated') + self.raw_fastq_files_path = join( + self.pipeline.output_path, "TRIntegrateJob", "integrated" + ) - if 'TRIJ_Post_Processing' in self.skip_steps: + if "TRIJ_Post_Processing" in self.skip_steps: # if 'post_processing_completed' is found in the same location, # this means the steps below were already completed. return @@ -317,16 +334,14 @@ def integrate_results(self): for root, dirs, files in walk(self.raw_fastq_files_path): for _file in files: fastq_file = join(root, _file) - self._post_process_file(fastq_file, - mapping, - self.lane_number) + self._post_process_file(fastq_file, mapping, self.lane_number) # audit the results to determine which samples failed to convert # properly. Append these to the failed-samples report and also # return the list directly to the caller. failed_samples = job.audit() - if hasattr(self, 'fsr'): + if hasattr(self, "fsr"): # NB 16S does not require a failed samples report and # it is not performed by SPP. 
self.fsr.write(failed_samples, job.__class__.__name__) @@ -343,11 +358,13 @@ def _generate_mapping(self): count = 1 for sample in sheet.samples: - barcode_id = sample['barcode_id'] - results[barcode_id] = {'sample_name': sample['Sample_Name'], - 'sample_id': sample['Sample_ID'], - 'sample_index': count, - 'project_name': sample['Sample_Project']} + barcode_id = sample["barcode_id"] + results[barcode_id] = { + "sample_name": sample["Sample_Name"], + "sample_id": sample["Sample_ID"], + "sample_index": count, + "project_name": sample["Sample_Project"], + } count += 1 return results @@ -363,8 +380,7 @@ def _post_process_file(self, fastq_file, mapping, lane): m = match(r"(C5\d\d)\.([R,I]\d)\.fastq.gz", _file) if m is None: - raise ValueError(f"The filename '{_file}' is not of a " - "recognizable form") + raise ValueError(f"The filename '{_file}' is not of a recognizable form") barcode_id = m[1] read_type = m[2] @@ -372,16 +388,18 @@ def _post_process_file(self, fastq_file, mapping, lane): if barcode_id not in mapping: raise ValueError(f"{barcode_id} is not present in sample-sheet") - sample_id = mapping[barcode_id]['sample_id'] - project_name = mapping[barcode_id]['project_name'] - sample_index = mapping[barcode_id]['sample_index'] + sample_id = mapping[barcode_id]["sample_id"] + project_name = mapping[barcode_id]["project_name"] + sample_index = mapping[barcode_id]["sample_index"] # generate the new filename for the fastq file, and reorganize the # files by project. - new_name = "%s_S%d_%s_%s_001.fastq.gz" % (sample_id, - sample_index, - "L%s" % str(lane).zfill(3), - read_type) + new_name = "%s_S%d_%s_%s_001.fastq.gz" % ( + sample_id, + sample_index, + "L%s" % str(lane).zfill(3), + read_type, + ) # ensure that the project directory exists before we rename and move # the file to that location. @@ -397,37 +415,37 @@ def _post_process_file(self, fastq_file, mapping, lane): class PacBio(Protocol): protocol_type = PROTOCOL_NAME_PACBIO_SMRT - read_length = 'long' + read_length = "long" def convert_raw_to_fastq(self): - config = self.pipeline.get_software_configuration('pacbio_convert') + config = self.pipeline.get_software_configuration("pacbio_convert") job = ConvertPacBioBam2FastqJob( self.pipeline.run_dir, self.pipeline.output_path, self.pipeline.input_file_path, - config['queue'], - config['nodes'], - config['nprocs'], - config['wallclock_time_in_minutes'], - config['per_process_memory_limit'], - config['executable_path'], - config['modules_to_load'], - self.master_qiita_job_id) - - self.raw_fastq_files_path = join(self.pipeline.output_path, - 'ConvertJob') + config["queue"], + config["nodes"], + config["nprocs"], + config["wallclock_time_in_minutes"], + config["per_process_memory_limit"], + config["executable_path"], + config["modules_to_load"], + self.master_qiita_job_id, + ) + + self.raw_fastq_files_path = join(self.pipeline.output_path, "ConvertJob") # if ConvertJob already completed, then skip the over the time- # consuming portion but populate the needed member variables. - if 'ConvertJob' not in self.skip_steps: + if "ConvertJob" not in self.skip_steps: job.run(callback=self.job_callback) # audit the results to determine which samples failed to convert # properly. Append these to the failed-samples report and also # return the list directly to the caller. failed_samples = job.audit(self.pipeline.get_sample_ids()) - if hasattr(self, 'fsr'): + if hasattr(self, "fsr"): # NB 16S does not require a failed samples report and # it is not performed by SPP. 
self.fsr.write(failed_samples, job.__class__.__name__) @@ -438,29 +456,30 @@ def generate_sequence_counts(self): # for other instances of generate_sequence_counts in other objects # the sequence counting needs to be done; however, for PacBio we # already have done it and just need to merge the results. - gz_files = glob(f'{self.raw_fastq_files_path}/*/*.fastq.gz') + gz_files = glob(f"{self.raw_fastq_files_path}/*/*.fastq.gz") data, missing_files = [], [] for gzf in gz_files: - cf = gzf.replace('.fastq.gz', '.counts.txt') + cf = gzf.replace(".fastq.gz", ".counts.txt") sn = basename(cf).replace( - f'_S000_L00{self.lane_number}_R1_001.counts.txt', '') + f"_S000_L00{self.lane_number}_R1_001.counts.txt", "" + ) if not exists(cf): missing_files.append(sn) continue - with open(cf, 'r') as fh: + with open(cf, "r") as fh: counts = fh.read().strip() - data.append({'Sample_ID': sn, - 'raw_reads_r1r2': counts, - 'Lane': self.lane_number}) + data.append( + {"Sample_ID": sn, "raw_reads_r1r2": counts, "Lane": self.lane_number} + ) if missing_files: - raise ValueError(f'Missing count files: {missing_files}') + raise ValueError(f"Missing count files: {missing_files}") df = pd.DataFrame(data) - self.reports_path = join(self.pipeline.output_path, - 'ConvertJob', - 'SeqCounts.csv') + self.reports_path = join( + self.pipeline.output_path, "ConvertJob", "SeqCounts.csv" + ) df.to_csv(self.reports_path, index=False) def integrate_results(self): diff --git a/src/qp_klp/StandardAmpliconWorkflow.py b/src/qp_klp/StandardAmpliconWorkflow.py index 5cd2d914..aaa654b7 100644 --- a/src/qp_klp/StandardAmpliconWorkflow.py +++ b/src/qp_klp/StandardAmpliconWorkflow.py @@ -1,7 +1,7 @@ -from .Protocol import Illumina from sequence_processing_pipeline.Pipeline import Pipeline -from .Assays import Amplicon -from .Assays import ASSAY_NAME_AMPLICON + +from .Assays import ASSAY_NAME_AMPLICON, Amplicon +from .Protocol import Illumina from .Workflows import Workflow @@ -9,33 +9,42 @@ class StandardAmpliconWorkflow(Workflow, Amplicon, Illumina): def __init__(self, **kwargs): super().__init__(**kwargs) - self.mandatory_attributes = ['qclient', 'uif_path', - 'lane_number', 'config_fp', - 'run_identifier', 'output_dir', 'job_id', - 'lane_number', 'is_restart'] + self.mandatory_attributes = [ + "qclient", + "uif_path", + "lane_number", + "config_fp", + "run_identifier", + "output_dir", + "job_id", + "lane_number", + "is_restart", + ] self.confirm_mandatory_attributes() # second stage initializer that could conceivably be pushed down into # specific children requiring specific parameters. - self.qclient = self.kwargs['qclient'] - - self.pipeline = Pipeline(self.kwargs['config_fp'], - self.kwargs['run_identifier'], - self.kwargs['uif_path'], - self.kwargs['output_dir'], - self.kwargs['job_id'], - ASSAY_NAME_AMPLICON, - # amplicon runs always use lane 1. - lane_number=1) + self.qclient = self.kwargs["qclient"] + + self.pipeline = Pipeline( + self.kwargs["config_fp"], + self.kwargs["run_identifier"], + self.kwargs["uif_path"], + self.kwargs["output_dir"], + self.kwargs["job_id"], + ASSAY_NAME_AMPLICON, + # amplicon runs always use lane 1. + lane_number=1, + ) # NB: Amplicon workflows don't have failed samples records because # the fastq files are not demultiplexed. 
- self.master_qiita_job_id = self.kwargs['job_id'] + self.master_qiita_job_id = self.kwargs["job_id"] - self.lane_number = self.kwargs['lane_number'] - self.is_restart = bool(self.kwargs['is_restart']) + self.lane_number = self.kwargs["lane_number"] + self.is_restart = bool(self.kwargs["is_restart"]) if self.is_restart is True: self.determine_steps_to_skip() @@ -43,9 +52,8 @@ def __init__(self, **kwargs): # this is a convenience member to allow testing w/out updating Qiita. self.update = True - if 'update_qiita' in kwargs: - if not isinstance(kwargs['update_qiita'], bool): - raise ValueError("value for 'update_qiita' must be of " - "type bool") + if "update_qiita" in kwargs: + if not isinstance(kwargs["update_qiita"], bool): + raise ValueError("value for 'update_qiita' must be of type bool") - self.update = kwargs['update_qiita'] + self.update = kwargs["update_qiita"] diff --git a/src/qp_klp/StandardMetatranscriptomicWorkflow.py b/src/qp_klp/StandardMetatranscriptomicWorkflow.py index 4b3dea98..c7aa9d0b 100644 --- a/src/qp_klp/StandardMetatranscriptomicWorkflow.py +++ b/src/qp_klp/StandardMetatranscriptomicWorkflow.py @@ -1,41 +1,50 @@ -from .Protocol import Illumina from sequence_processing_pipeline.Pipeline import Pipeline -from .Assays import Metatranscriptomic -from .Assays import ASSAY_NAME_METATRANSCRIPTOMIC + +from .Assays import ASSAY_NAME_METATRANSCRIPTOMIC, Metatranscriptomic from .FailedSamplesRecord import FailedSamplesRecord +from .Protocol import Illumina from .Workflows import Workflow -class StandardMetatranscriptomicWorkflow(Workflow, Metatranscriptomic, - Illumina): +class StandardMetatranscriptomicWorkflow(Workflow, Metatranscriptomic, Illumina): def __init__(self, **kwargs): super().__init__(**kwargs) - self.mandatory_attributes = ['qclient', 'uif_path', - 'lane_number', 'config_fp', - 'run_identifier', 'output_dir', 'job_id', - 'lane_number', 'is_restart'] + self.mandatory_attributes = [ + "qclient", + "uif_path", + "lane_number", + "config_fp", + "run_identifier", + "output_dir", + "job_id", + "lane_number", + "is_restart", + ] self.confirm_mandatory_attributes() # second stage initializer that could conceivably be pushed down into # specific children requiring specific parameters. - self.qclient = self.kwargs['qclient'] - - self.pipeline = Pipeline(self.kwargs['config_fp'], - self.kwargs['run_identifier'], - self.kwargs['uif_path'], - self.kwargs['output_dir'], - self.kwargs['job_id'], - ASSAY_NAME_METATRANSCRIPTOMIC, - lane_number=self.kwargs['lane_number']) - self.fsr = FailedSamplesRecord(self.kwargs['output_dir'], - self.pipeline.sample_sheet.samples) - - self.master_qiita_job_id = self.kwargs['job_id'] - - self.lane_number = self.kwargs['lane_number'] - self.is_restart = bool(self.kwargs['is_restart']) + self.qclient = self.kwargs["qclient"] + + self.pipeline = Pipeline( + self.kwargs["config_fp"], + self.kwargs["run_identifier"], + self.kwargs["uif_path"], + self.kwargs["output_dir"], + self.kwargs["job_id"], + ASSAY_NAME_METATRANSCRIPTOMIC, + lane_number=self.kwargs["lane_number"], + ) + self.fsr = FailedSamplesRecord( + self.kwargs["output_dir"], self.pipeline.sample_sheet.samples + ) + + self.master_qiita_job_id = self.kwargs["job_id"] + + self.lane_number = self.kwargs["lane_number"] + self.is_restart = bool(self.kwargs["is_restart"]) if self.is_restart is True: self.determine_steps_to_skip() @@ -43,9 +52,8 @@ def __init__(self, **kwargs): # this is a convenience member to allow testing w/out updating Qiita. 
self.update = True - if 'update_qiita' in kwargs: - if not isinstance(kwargs['update_qiita'], bool): - raise ValueError("value for 'update_qiita' must be of " - "type bool") + if "update_qiita" in kwargs: + if not isinstance(kwargs["update_qiita"], bool): + raise ValueError("value for 'update_qiita' must be of type bool") - self.update = kwargs['update_qiita'] + self.update = kwargs["update_qiita"] diff --git a/src/qp_klp/TellseqMetagenomicWorkflow.py b/src/qp_klp/TellseqMetagenomicWorkflow.py index 1808711e..67a008b9 100644 --- a/src/qp_klp/TellseqMetagenomicWorkflow.py +++ b/src/qp_klp/TellseqMetagenomicWorkflow.py @@ -1,60 +1,73 @@ +from sequence_processing_pipeline.Pipeline import InstrumentUtils, Pipeline + +from .Assays import ASSAY_NAME_METAGENOMIC, Metagenomic +from .FailedSamplesRecord import FailedSamplesRecord from .Protocol import TellSeq -from sequence_processing_pipeline.Pipeline import Pipeline, InstrumentUtils -from .Assays import Metagenomic -from .Assays import ASSAY_NAME_METAGENOMIC from .Workflows import Workflow -from .FailedSamplesRecord import FailedSamplesRecord class TellSeqMetagenomicWorkflow(Workflow, Metagenomic, TellSeq): def __init__(self, **kwargs): super().__init__(**kwargs) - self.mandatory_attributes = ['qclient', 'uif_path', 'config_fp', - 'run_identifier', 'output_dir', 'job_id', - 'is_restart'] + self.mandatory_attributes = [ + "qclient", + "uif_path", + "config_fp", + "run_identifier", + "output_dir", + "job_id", + "is_restart", + ] self.confirm_mandatory_attributes() # second stage initializer that could conceivably be pushed down into # specific children requiring specific parameters. - self.qclient = self.kwargs['qclient'] + self.qclient = self.kwargs["qclient"] - run_id = self.kwargs['run_identifier'] + run_id = self.kwargs["run_identifier"] - self.pipeline = Pipeline(self.kwargs['config_fp'], - run_id, - self.kwargs['uif_path'], - self.kwargs['output_dir'], - self.kwargs['job_id'], - ASSAY_NAME_METAGENOMIC, - lane_number=self.kwargs['lane_number']) + self.pipeline = Pipeline( + self.kwargs["config_fp"], + run_id, + self.kwargs["uif_path"], + self.kwargs["output_dir"], + self.kwargs["job_id"], + ASSAY_NAME_METAGENOMIC, + lane_number=self.kwargs["lane_number"], + ) - self.fsr = FailedSamplesRecord(self.kwargs['output_dir'], - self.pipeline.sample_sheet.samples) + self.fsr = FailedSamplesRecord( + self.kwargs["output_dir"], self.pipeline.sample_sheet.samples + ) # given run_id, Pipeline should have found the appropriate run_dir. 
type = InstrumentUtils.get_instrument_type(self.pipeline.run_dir) - self.iseq_run = True if type == 'iSeq' else False + self.iseq_run = True if type == "iSeq" else False - self.master_qiita_job_id = self.kwargs['job_id'] + self.master_qiita_job_id = self.kwargs["job_id"] - self.lane_number = self.kwargs['lane_number'] - self.is_restart = bool(self.kwargs['is_restart']) + self.lane_number = self.kwargs["lane_number"] + self.is_restart = bool(self.kwargs["is_restart"]) self.directories_to_check = [ - 'TellReadJob', 'TRIntegrateJob', 'NuQCJob', 'FastQCJob', - 'SeqCountsJob', 'GenPrepFileJob'] + "TellReadJob", + "TRIntegrateJob", + "NuQCJob", + "FastQCJob", + "SeqCountsJob", + "GenPrepFileJob", + ] if self.is_restart is True: self.determine_steps_to_skip() self.update = True - if 'update_qiita' in kwargs: - if not isinstance(kwargs['update_qiita'], bool): - raise ValueError("value for 'update_qiita' must be of " - "type bool") + if "update_qiita" in kwargs: + if not isinstance(kwargs["update_qiita"], bool): + raise ValueError("value for 'update_qiita' must be of type bool") - self.update = kwargs['update_qiita'] + self.update = kwargs["update_qiita"] diff --git a/src/qp_klp/WorkflowFactory.py b/src/qp_klp/WorkflowFactory.py index 870449e5..37383038 100644 --- a/src/qp_klp/WorkflowFactory.py +++ b/src/qp_klp/WorkflowFactory.py @@ -1,27 +1,30 @@ +from metapool import load_sample_sheet +from metapool.sample_sheet import SAMPLE_SHEETS_BY_PROTOCOL as SSBP + +from sequence_processing_pipeline.Pipeline import Pipeline + +from .Assays import ASSAY_NAME_AMPLICON, METAOMIC_ASSAY_NAMES +from .PacBioMetagenomicWorkflow import PacBioMetagenomicWorkflow from .StandardAmpliconWorkflow import StandardAmpliconWorkflow from .StandardMetagenomicWorkflow import StandardMetagenomicWorkflow -from .StandardMetatranscriptomicWorkflow import \ - StandardMetatranscriptomicWorkflow +from .StandardMetatranscriptomicWorkflow import StandardMetatranscriptomicWorkflow from .TellseqMetagenomicWorkflow import TellSeqMetagenomicWorkflow -from .PacBioMetagenomicWorkflow import PacBioMetagenomicWorkflow -from sequence_processing_pipeline.Pipeline import Pipeline -from metapool import load_sample_sheet -from metapool.sample_sheet import SAMPLE_SHEETS_BY_PROTOCOL as SSBP -from .Assays import METAOMIC_ASSAY_NAMES, ASSAY_NAME_AMPLICON from .Workflows import WorkflowError -class WorkflowFactory(): - WORKFLOWS = [StandardMetagenomicWorkflow, - StandardMetatranscriptomicWorkflow, - StandardAmpliconWorkflow, - TellSeqMetagenomicWorkflow, - PacBioMetagenomicWorkflow] +class WorkflowFactory: + WORKFLOWS = [ + StandardMetagenomicWorkflow, + StandardMetatranscriptomicWorkflow, + StandardAmpliconWorkflow, + TellSeqMetagenomicWorkflow, + PacBioMetagenomicWorkflow, + ] @classmethod def _get_instrument_type(cls, sheet): for instrument_type in SSBP: - if sheet.Header['SheetType'] in SSBP[instrument_type]: + if sheet.Header["SheetType"] in SSBP[instrument_type]: return instrument_type @classmethod @@ -32,10 +35,10 @@ def generate_workflow(cls, **kwargs): # if kwargs is None or {}, raise an Error raise ValueError(msg) - if 'uif_path' not in kwargs: + if "uif_path" not in kwargs: raise ValueError(msg) - if Pipeline.is_sample_sheet(kwargs['uif_path']): + if Pipeline.is_sample_sheet(kwargs["uif_path"]): # NB: The Pipeline() determines an input-file is a sample-sheet # if the first line begins with "[Header]" followed by any number # of ','. 
A file that begins this way but fails to load @@ -43,7 +46,7 @@ def generate_workflow(cls, **kwargs): # SheetVersion will raise a ValueError() here, w/the message # "'{sheet}' doesn't appear to be a valid sample-sheet." - sheet = load_sample_sheet(kwargs['uif_path']) + sheet = load_sample_sheet(kwargs["uif_path"]) # if we do not validate the sample-sheet now, it will be validated # downstream when we attempt to instantiate a Workflow(), which in @@ -52,35 +55,40 @@ def generate_workflow(cls, **kwargs): # abort. Expect the user/caller to diagnose the sample-sheet in a # notebook or by other means. if sheet.validate_and_scrub_sample_sheet(): - assay_type = sheet.Header['Assay'] + assay_type = sheet.Header["Assay"] if assay_type not in METAOMIC_ASSAY_NAMES: # NB: This Error is not likely to be raised unless an # assay type is defined in metapool but not in Assays. - raise WorkflowError("Can't determine workflow from assay " - "type: %s" % assay_type) + raise WorkflowError( + "Can't determine workflow from assay type: %s" % assay_type + ) instrument_type = cls._get_instrument_type(sheet) else: - raise WorkflowError(f"'{kwargs['uif_path']} doesn't appear to " - "be a valid sample-sheet.") - elif Pipeline.is_mapping_file(kwargs['uif_path']): + raise WorkflowError( + f"'{kwargs['uif_path']} doesn't appear to be a valid sample-sheet." + ) + elif Pipeline.is_mapping_file(kwargs["uif_path"]): # if file is readable as a basic TSV and contains all the required # headers, then treat this as a mapping file, even if it's an # invalid one. assay_type = ASSAY_NAME_AMPLICON # for Amplicon runs, the lane_number is always one, even if the # user supplies another value in the UI. - kwargs['lane_number'] = 1 + kwargs["lane_number"] = 1 # NB: For now, let's assume all Amplicon runs are Illumina, since # the entire Amplicon pipeline assumes as much. - instrument_type = 'Illumina' + instrument_type = "Illumina" else: - raise ValueError("Your uploaded file doesn't appear to be a " - "sample-sheet or a mapping-file.") + raise ValueError( + "Your uploaded file doesn't appear to be a " + "sample-sheet or a mapping-file." + ) for workflow in WorkflowFactory.WORKFLOWS: - if workflow.read_length not in {'short', 'long'}: - raise ValueError('Invalid read_length: ' - f'{workflow.read_length} for {workflow}') + if workflow.read_length not in {"short", "long"}: + raise ValueError( + f"Invalid read_length: {workflow.read_length} for {workflow}" + ) if workflow.assay_type == assay_type: if workflow.protocol_type == instrument_type: # return instantiated workflow object @@ -89,6 +97,8 @@ def generate_workflow(cls, **kwargs): # This Error will only be raised if a sample-sheet passes metapool's # validation method but a Workflow() for its instrument-type and # assay-type doesn't exist. 
- raise ValueError(f"Assay type '{assay_type}' and Instrument type " - f"'{instrument_type}' did not match any known " - "workflow configuration") + raise ValueError( + f"Assay type '{assay_type}' and Instrument type " + f"'{instrument_type}' did not match any known " + "workflow configuration" + ) diff --git a/src/qp_klp/Workflows.py b/src/qp_klp/Workflows.py index 9155004c..a64f7aa2 100644 --- a/src/qp_klp/Workflows.py +++ b/src/qp_klp/Workflows.py @@ -1,16 +1,18 @@ -from os.path import join, exists, split -from os import walk, makedirs, listdir, environ -import pandas as pd -from json import dumps -from subprocess import Popen, PIPE -from glob import glob -from shutil import copyfile import logging -from shutil import rmtree -from .Assays import ASSAY_NAME_AMPLICON +from glob import glob +from json import dumps +from os import environ, listdir, makedirs, walk +from os.path import exists, join, split +from shutil import copyfile, rmtree +from subprocess import PIPE, Popen + +import pandas as pd from metapool.sample_sheet import PROTOCOL_NAME_PACBIO_SMRT + from sequence_processing_pipeline.util import determine_orientation +from .Assays import ASSAY_NAME_AMPLICON + class WorkflowError(Exception): def __init__(self, message=None): @@ -18,7 +20,7 @@ def __init__(self, message=None): super().__init__(self.message) -class Workflow(): +class Workflow: def __init__(self, **kwargs): """ base initializer allows WorkflowFactory to return the correct @@ -47,14 +49,18 @@ def __init__(self, **kwargs): self.sifs = None self.skip_steps = [] self.special_map = None - self.status_msg = '' + self.status_msg = "" self.touched_studies_prep_info = None self.tube_id_map = None self.directories_to_check = [ - 'ConvertJob', 'NuQCJob', 'FastQCJob', 'GenPrepFileJob'] - - if 'status_update_callback' in kwargs: - self.status_update_callback = kwargs['status_update_callback'] + "ConvertJob", + "NuQCJob", + "FastQCJob", + "GenPrepFileJob", + ] + + if "status_update_callback" in kwargs: + self.status_update_callback = kwargs["status_update_callback"] else: self.status_update_callback = None @@ -69,17 +75,19 @@ def confirm_mandatory_attributes(self): absent_list.append(attribute) if absent_list: - raise ValueError(f"The following values must also be defined in " - f"kwargs for {self.__class__.__name__} workflows" - + ": " + ', '.join(absent_list)) + raise ValueError( + f"The following values must also be defined in " + f"kwargs for {self.__class__.__name__} workflows" + + ": " + + ", ".join(absent_list) + ) def job_callback(self, jid, status): """ Update main status message w/current child job status. """ if self.status_update_callback: - self.status_update_callback(self.status_msg + - f" ({jid}: {status})") + self.status_update_callback(self.status_msg + f" ({jid}: {status})") def update_status(self, msg, step_number, total_steps): """ @@ -101,8 +109,7 @@ def what_am_i(self): """ Returns text description of Workflow's Instrument & Assay mixins. """ - return (f"Instrument: {self.protocol_type}" + "\t" + - f"Assay: {self.assay_type}") + return f"Instrument: {self.protocol_type}" + "\t" + f"Assay: {self.assay_type}" def pre_check(self): if self.is_restart: @@ -125,16 +132,16 @@ def pre_check(self): # for each project. The names of the projects are unimportant. We # want to abort early if any project in the sample-sheet/pre-prep # file contains samples that aren't registered in Qiita. 
- tmp = [len(project['samples_not_in_qiita']) for project in results] + tmp = [len(project["samples_not_in_qiita"]) for project in results] missing_counts = [count for count in tmp if count != 0] if missing_counts: msgs = [] for result in results: - msgs += result['messages'] + msgs += result["messages"] if msgs: - raise WorkflowError('\n'.join(msgs)) + raise WorkflowError("\n".join(msgs)) def generate_special_map(self): """ @@ -149,11 +156,11 @@ def generate_special_map(self): results = self.qclient.get("/qiita_db/artifacts/types/") projects = self.pipeline.get_project_info() for project in projects: - upload_path = join(results['uploads'], project['qiita_id']) + upload_path = join(results["uploads"], project["qiita_id"]) makedirs(upload_path, exist_ok=True) - special_map.append((project['project_name'], - upload_path, - project['qiita_id'])) + special_map.append( + (project["project_name"], upload_path, project["qiita_id"]) + ) self.special_map = special_map @@ -166,7 +173,8 @@ def generate_sifs(self): # generate SIF files with paths to the input file(s) (multiples when # there are replicates) self.sifs = self.pipeline.generate_sample_info_files( - self.dereplicated_input_file_paths) + self.dereplicated_input_file_paths + ) return self.sifs @@ -180,90 +188,119 @@ def update_blanks_in_qiita(self): # get study_id from sif_file_name ...something_14385_blanks.tsv study_id = self.pipeline.get_qiita_id_from_sif_fp(sif_path) - df = pd.read_csv(sif_path, delimiter='\t', dtype=str) + df = pd.read_csv(sif_path, delimiter="\t", dtype=str) # Prepend study_id to make them compatible w/list from Qiita. - df['sample_name'] = f'{study_id}.' + df['sample_name'].astype(str) + df["sample_name"] = f"{study_id}." + df["sample_name"].astype(str) # SIFs only contain blanks. Get the list of potentially new blanks. - blanks = df['sample_name'].tolist() + blanks = df["sample_name"].tolist() if len(blanks) == 0: # we have nothing to do so let's return early return # Get list of samples already registered in Qiita # (will include any already-registered blanks) - from_qiita = self.qclient.get(f'/api/v1/study/{study_id}/samples') + from_qiita = self.qclient.get(f"/api/v1/study/{study_id}/samples") # Generate list of blanks that need to be ADDED to Qiita. new_blanks = (set(blanks) | set(from_qiita)) - set(from_qiita) if len(new_blanks): # Generate dummy entries for each new blank, if any. - url = f'/api/v1/study/{study_id}/samples/info' + url = f"/api/v1/study/{study_id}/samples/info" logging.debug(url) - categories = self.qclient.get(url)['categories'] + categories = self.qclient.get(url)["categories"] # initialize payload w/required dummy categories - data = {i: {c: 'control sample' for c in categories} for i in - new_blanks} + data = { + i: {c: "control sample" for c in categories} for i in new_blanks + } # populate payload w/additional columns and/or overwrite # existing columns w/metadata from SIF file. - sif_data = df.set_index('sample_name').T.to_dict() + sif_data = df.set_index("sample_name").T.to_dict() for new_blank in new_blanks: for column in sif_data[new_blank]: data[new_blank][column] = sif_data[new_blank][column] # http_patch will raise Error if insert failed. 
- self.qclient.http_patch(f'/api/v1/study/{study_id}/samples', - data=dumps(data)) + self.qclient.http_patch( + f"/api/v1/study/{study_id}/samples", data=dumps(data) + ) def _helper_process_operations(self): """ Helper method for generate_commands() :return: """ - RESULTS_DIR = 'final_results' - TAR_CMD = 'tar zcvf' - LOG_PREFIX = 'logs' - REPORT_PREFIX = 'reports' - PREP_PREFIX = 'prep-files' - CONVERT_JOB = 'ConvertJob' - QC_JOB = 'NuQCJob' - FASTQC_JOB = 'FastQCJob' - PREPFILE_JOB = 'GenPrepFileJob' - TAR_EXT = 'tgz' - - op_meta = [(['ConvertJob/logs'], TAR_CMD, - f'{LOG_PREFIX}-{CONVERT_JOB}.{TAR_EXT}', 'OUTPUT_FIRST'), - - (['ConvertJob/Reports', 'ConvertJob/logs'], TAR_CMD, - f'{REPORT_PREFIX}-{CONVERT_JOB}.{TAR_EXT}', - 'OUTPUT_FIRST'), - - (['NuQCJob/logs'], TAR_CMD, - f'{LOG_PREFIX}-{QC_JOB}.{TAR_EXT}', 'OUTPUT_FIRST'), - - (['FastQCJob/logs'], TAR_CMD, - f'{LOG_PREFIX}-{FASTQC_JOB}.{TAR_EXT}', 'OUTPUT_FIRST'), - - (['FastQCJob/fastqc'], TAR_CMD, - f'{REPORT_PREFIX}-{FASTQC_JOB}.{TAR_EXT}', 'OUTPUT_FIRST'), - - (['GenPrepFileJob/logs'], TAR_CMD, - f'{LOG_PREFIX}-{PREPFILE_JOB}.{TAR_EXT}', 'OUTPUT_FIRST'), - - (['GenPrepFileJob/PrepFiles'], TAR_CMD, - f'{PREP_PREFIX}.{TAR_EXT}', 'OUTPUT_FIRST'), - - (['failed_samples.html', - 'touched_studies.html', - 'MultiQCJob/multiqc', - 'TellReadJob/QC_Analysis_TellReadJob.html'], - 'mv', RESULTS_DIR, 'INPUTS_FIRST'), - - (['FastQCJob/multiqc'], 'mv', RESULTS_DIR, 'INPUTS_FIRST')] + RESULTS_DIR = "final_results" + TAR_CMD = "tar zcvf" + LOG_PREFIX = "logs" + REPORT_PREFIX = "reports" + PREP_PREFIX = "prep-files" + CONVERT_JOB = "ConvertJob" + QC_JOB = "NuQCJob" + FASTQC_JOB = "FastQCJob" + PREPFILE_JOB = "GenPrepFileJob" + TAR_EXT = "tgz" + + op_meta = [ + ( + ["ConvertJob/logs"], + TAR_CMD, + f"{LOG_PREFIX}-{CONVERT_JOB}.{TAR_EXT}", + "OUTPUT_FIRST", + ), + ( + ["ConvertJob/Reports", "ConvertJob/logs"], + TAR_CMD, + f"{REPORT_PREFIX}-{CONVERT_JOB}.{TAR_EXT}", + "OUTPUT_FIRST", + ), + ( + ["NuQCJob/logs"], + TAR_CMD, + f"{LOG_PREFIX}-{QC_JOB}.{TAR_EXT}", + "OUTPUT_FIRST", + ), + ( + ["FastQCJob/logs"], + TAR_CMD, + f"{LOG_PREFIX}-{FASTQC_JOB}.{TAR_EXT}", + "OUTPUT_FIRST", + ), + ( + ["FastQCJob/fastqc"], + TAR_CMD, + f"{REPORT_PREFIX}-{FASTQC_JOB}.{TAR_EXT}", + "OUTPUT_FIRST", + ), + ( + ["GenPrepFileJob/logs"], + TAR_CMD, + f"{LOG_PREFIX}-{PREPFILE_JOB}.{TAR_EXT}", + "OUTPUT_FIRST", + ), + ( + ["GenPrepFileJob/PrepFiles"], + TAR_CMD, + f"{PREP_PREFIX}.{TAR_EXT}", + "OUTPUT_FIRST", + ), + ( + [ + "failed_samples.html", + "touched_studies.html", + "MultiQCJob/multiqc", + "TellReadJob/QC_Analysis_TellReadJob.html", + ], + "mv", + RESULTS_DIR, + "INPUTS_FIRST", + ), + (["FastQCJob/multiqc"], "mv", RESULTS_DIR, "INPUTS_FIRST"), + ] cmds = [] @@ -281,14 +318,13 @@ def _helper_process_operations(self): # the inputs exists. It's okay for a command to go unprocessed. if confirmed_inputs: # convert to string form before using. 
- confirmed_inputs = ' '.join(confirmed_inputs) - if order == 'OUTPUT_FIRST': - cmds.append(f'{action} {output} {confirmed_inputs}') - elif order == 'INPUTS_FIRST': - cmds.append(f'{action} {confirmed_inputs} {output}') + confirmed_inputs = " ".join(confirmed_inputs) + if order == "OUTPUT_FIRST": + cmds.append(f"{action} {output} {confirmed_inputs}") + elif order == "INPUTS_FIRST": + cmds.append(f"{action} {confirmed_inputs} {output}") else: - raise ValueError(f"'{order}' is not a defined order of " - "operations") + raise ValueError(f"'{order}' is not a defined order of operations") return cmds @@ -297,13 +333,14 @@ def _process_blanks(self): Helper method for generate_commands(). :return: """ - results = [x for x in listdir(self.pipeline.output_path) if - self.pipeline.is_sif_fp(x)] + results = [ + x for x in listdir(self.pipeline.output_path) if self.pipeline.is_sif_fp(x) + ] results.sort() if len(results) > 0: - return 'tar zcvf sample-files.tgz' + ' ' + ' '.join(results) + return "tar zcvf sample-files.tgz" + " " + " ".join(results) def _process_fastp_report_dirs(self): """ @@ -314,15 +351,15 @@ def _process_fastp_report_dirs(self): for root, dirs, files in walk(self.pipeline.output_path): for dir_name in dirs: - if dir_name == 'fastp_reports_dir': + if dir_name == "fastp_reports_dir": # generate the full path for this directory before # truncating everything up to the NuQCJob directory. - full_path = join(root, dir_name).split('NuQCJob/') - report_dirs.append(join('NuQCJob', full_path[1])) + full_path = join(root, dir_name).split("NuQCJob/") + report_dirs.append(join("NuQCJob", full_path[1])) if report_dirs: report_dirs.sort() - return 'tar zcvf reports-NuQCJob.tgz ' + ' '.join(report_dirs) + return "tar zcvf reports-NuQCJob.tgz " + " ".join(report_dirs) else: # It is okay to return an empty list of commands if reports_dirs # is empty. Some pipelines do not generate fastp reports. @@ -333,10 +370,10 @@ def _write_commands_to_output_path(self): Helper method for generate_commands(). :return: """ - self.cmds_log_path = join(self.pipeline.output_path, 'cmds.log') - with open(self.cmds_log_path, 'w') as f: + self.cmds_log_path = join(self.pipeline.output_path, "cmds.log") + with open(self.cmds_log_path, "w") as f: for cmd in self.cmds: - f.write(f'{cmd}\n') + f.write(f"{cmd}\n") def generate_commands(self): cmds = self._helper_process_operations() @@ -354,12 +391,13 @@ def generate_commands(self): # if one or more tar-gzip files are found (which we expect there to # be), move them into the 'final_results' directory. However, if none # are present, don't raise an error. - cmds.append('(find *.tgz -maxdepth 1 -type f | xargs mv -t ' - 'final_results) || true') + cmds.append( + "(find *.tgz -maxdepth 1 -type f | xargs mv -t final_results) || true" + ) # prepend each command with a change-directory to the correct # location. - cmds = [f'cd {self.pipeline.output_path}; {cmd}' for cmd in cmds] + cmds = [f"cd {self.pipeline.output_path}; {cmd}" for cmd in cmds] self.cmds = cmds @@ -368,11 +406,9 @@ def generate_commands(self): def execute_commands(self): # execute the list of commands in order for cmd in self.cmds: - p = Popen(cmd, - universal_newlines=True, - shell=True, - stdout=PIPE, - stderr=PIPE) + p = Popen( + cmd, universal_newlines=True, shell=True, stdout=PIPE, stderr=PIPE + ) _, _ = p.communicate() return_code = p.returncode @@ -391,7 +427,7 @@ def _project_metadata_check(self): # names in each project's sample metadata. 
We'll let Pipeline() # decide (using its metapool dependency) which column names are # reserved. - qiita_ids = [x['qiita_id'] for x in self.pipeline.get_project_info()] + qiita_ids = [x["qiita_id"] for x in self.pipeline.get_project_info()] results = [] @@ -405,8 +441,9 @@ def _project_metadata_check(self): # if any reserved words were identified, generate an appropriate # error message for it and add it to the list of error messages # to return to the user. - res = [f"'{x}' exists in Qiita study {qiita_id}'s sample metadata" - for x in res] + res = [ + f"'{x}' exists in Qiita study {qiita_id}'s sample metadata" for x in res + ] results += res @@ -422,8 +459,10 @@ def _process_tube_ids(self, qiita_id, samples): :return: """ if qiita_id in self.tube_id_map: - tids = [self.tube_id_map[qiita_id][sample] for sample in - self.tube_id_map[qiita_id]] + tids = [ + self.tube_id_map[qiita_id][sample] + for sample in self.tube_id_map[qiita_id] + ] not_in_qiita = samples - set(tids) @@ -431,8 +470,7 @@ def _process_tube_ids(self, qiita_id, samples): # strip any leading zeroes from the sample-ids. Note that # if a sample-id has more than one leading zero, all of # them will be removed. - not_in_qiita = set([x.lstrip('0') for x in samples]) - \ - set(tids) + not_in_qiita = set([x.lstrip("0") for x in samples]) - set(tids) # convert examples to strings before returning examples = [str(example) for example in tids[:5]] @@ -454,72 +492,85 @@ def _compare_samples_against_qiita(self): for project in projects: msgs = [] self._get_tube_ids_from_qiita() - p_name = project['project_name'] - qiita_id = str(project['qiita_id']) - contains_replicates = project['contains_replicates'] + p_name = project["project_name"] + qiita_id = str(project["qiita_id"]) + contains_replicates = project["contains_replicates"] # get list of samples as presented by the sample-sheet or mapping # file and confirm that they are all registered in Qiita. if contains_replicates: # don't match against sample-names with a trailing well-id # if project contains replicates. - msgs.append("This sample-sheet contains replicates. sample-" - "names will be sourced from orig_name column.") + msgs.append( + "This sample-sheet contains replicates. sample-" + "names will be sourced from orig_name column." + ) samples = set(self.pipeline.get_orig_names_from_sheet(p_name)) else: samples = set(self.pipeline.get_sample_names(p_name)) # do not include blanks. If they are unregistered, we will add # them downstream. - samples = {smpl for smpl in samples - if not smpl.startswith('BLANK')} + samples = {smpl for smpl in samples if not smpl.startswith("BLANK")} - msgs.append(f"The total number of samples found in {p_name} that " - f"aren't BLANK is: {len(samples)}") + msgs.append( + f"The total number of samples found in {p_name} that " + f"aren't BLANK is: {len(samples)}" + ) - results_sn = self._process_sample_names(p_name, qiita_id, - samples) + results_sn = self._process_sample_names(p_name, qiita_id, samples) rsn = results_sn[0] - msgs.append('Number of sample-names not in Qiita: ' - f'{len(rsn)}; {list(rsn)[:3]}') + msgs.append( + f"Number of sample-names not in Qiita: {len(rsn)}; {list(rsn)[:3]}" + ) use_tids = False if len(results_sn[0]) == 0: - msgs.append(f"All values in sheet matched sample-names " - f"registered with {p_name}") + msgs.append( + f"All values in sheet matched sample-names registered with {p_name}" + ) else: # not all values were matched to sample-names. # check for possible match w/tube-ids, if defined in project. 
results_tid = self._process_tube_ids(qiita_id, samples) if results_tid: rtid = results_tid[0] - msgs.append('Number of tube-ids not in Qiita: ' - f'{len(rtid)}; {list(rtid)[:3]}') + msgs.append( + "Number of tube-ids not in Qiita: " + f"{len(rtid)}; {list(rtid)[:3]}" + ) if len(results_tid[0]) == 0: # all values were matched to tube-ids. use_tids = True - msgs.append(f"All values in sheet matched tube-ids " - f"registered with {p_name}") + msgs.append( + f"All values in sheet matched tube-ids " + f"registered with {p_name}" + ) else: # we have sample-names and tube-ids and neither is # a perfect match. if len(results_tid[0]) < len(results_sn[0]): # more tube-ids matched than sample-names. use_tids = True - msgs.append(f"More values in sheet matched tube-" - f"ids than sample-names with {p_name}") + msgs.append( + f"More values in sheet matched tube-" + f"ids than sample-names with {p_name}" + ) elif len(results_tid[0]) == len(results_sn[0]): - msgs.append("Sample-names and tube-ids were " - "equally non-represented in the " - "sample-sheet") + msgs.append( + "Sample-names and tube-ids were " + "equally non-represented in the " + "sample-sheet" + ) else: - msgs.append(f"More values in sheet matched sample-" - f"names than tube-ids with {p_name}") + msgs.append( + f"More values in sheet matched sample-" + f"names than tube-ids with {p_name}" + ) else: - msgs.append("there are no tube-ids registered with " - f"{p_name}") + msgs.append(f"there are no tube-ids registered with {p_name}") if use_tids: not_in_qiita = results_tid[0] @@ -532,35 +583,39 @@ def _compare_samples_against_qiita(self): # return an entry for all projects, even when samples_not_in_qiita # is an empty list, as the information is still valuable. - results.append({'samples_not_in_qiita': not_in_qiita, - 'examples_in_qiita': examples, - 'project_name': p_name, - 'total_in_qiita': total_in_qiita, - 'used_tids': use_tids, - 'messages': msgs}) + results.append( + { + "samples_not_in_qiita": not_in_qiita, + "examples_in_qiita": examples, + "project_name": p_name, + "total_in_qiita": total_in_qiita, + "used_tids": use_tids, + "messages": msgs, + } + ) return results @classmethod def get_samples_in_qiita(cls, qclient, qiita_id): - ''' + """ Obtain lists for sample-names and tube-ids registered in Qiita. :param qclient: QiitaClient object :param qiita_id: Qiita ID for the project in question. :return: a tuple of lists, one for sample-names, another for tube-ids. - ''' - samples = qclient.get(f'/api/v1/study/{qiita_id}/samples') + """ + samples = qclient.get(f"/api/v1/study/{qiita_id}/samples") # remove Qiita ID as a prefix from the sample-names. - samples = {x.replace(f'{qiita_id}.', '') for x in samples} + samples = {x.replace(f"{qiita_id}.", "") for x in samples} # find out if tube-ids are registered in the study. 
- categories = qclient.get(f'/api/v1/study/{qiita_id}' - '/samples/info')['categories'] + categories = qclient.get(f"/api/v1/study/{qiita_id}/samples/info")["categories"] - if 'tube_id' in categories: - tids = qclient.get(f'/api/v1/study/{qiita_id}/samples/' - 'categories=tube_id')['samples'] + if "tube_id" in categories: + tids = qclient.get(f"/api/v1/study/{qiita_id}/samples/categories=tube_id")[ + "samples" + ] else: tids = None @@ -568,42 +623,41 @@ def get_samples_in_qiita(cls, qclient, qiita_id): def _get_postqc_fastq_files(self, out_dir, project): af = None - sub_folders = ['amplicon', 'filtered_sequences', 'trimmed_sequences'] + sub_folders = ["amplicon", "filtered_sequences", "trimmed_sequences"] for sub_folder in sub_folders: - sf = join(out_dir, 'NuQCJob', project, sub_folder) + sf = join(out_dir, "NuQCJob", project, sub_folder) if exists(sf): - af = [f for f in glob(join(sf, '*.fastq.gz'))] + af = [f for f in glob(join(sf, "*.fastq.gz"))] break if af is None or not af: raise WorkflowError("NuQCJob output not in expected location") - files = {'raw_barcodes': [], 'raw_forward_seqs': [], - 'raw_reverse_seqs': []} + files = {"raw_barcodes": [], "raw_forward_seqs": [], "raw_reverse_seqs": []} for fastq_file in af: _, file_name = split(fastq_file) orientation = determine_orientation(file_name) - if orientation in ['I1', 'I2']: - files['raw_barcodes'].append(fastq_file) - elif orientation == 'R1': - files['raw_forward_seqs'].append(fastq_file) - elif orientation == 'R2': - files['raw_reverse_seqs'].append(fastq_file) + if orientation in ["I1", "I2"]: + files["raw_barcodes"].append(fastq_file) + elif orientation == "R1": + files["raw_forward_seqs"].append(fastq_file) + elif orientation == "R2": + files["raw_reverse_seqs"].append(fastq_file) else: raise ValueError(f"Unrecognized file: {fastq_file}") - files['raw_barcodes'].sort() - files['raw_forward_seqs'].sort() - files['raw_reverse_seqs'].sort() + files["raw_barcodes"].sort() + files["raw_forward_seqs"].sort() + files["raw_reverse_seqs"].sort() # Amplicon runs should contain raw_barcodes/I1 files. # Meta*omics files doesn't use them. if self.pipeline.pipeline_type != ASSAY_NAME_AMPLICON: - del (files['raw_barcodes']) + del files["raw_barcodes"] # PacBio doesn't have reverse reads if self.protocol_type == PROTOCOL_NAME_PACBIO_SMRT: - del (files['raw_reverse_seqs']) + del files["raw_reverse_seqs"] # confirm expected lists of reads are not empty. 
for f_type in files: @@ -614,32 +668,41 @@ def _get_postqc_fastq_files(self, out_dir, project): return files - def _load_prep_into_qiita(self, qclient, prep_id, artifact_name, - qiita_id, project, fastq_files, atype): - surl = f'{qclient._server_url}/study/description/{qiita_id}' - prep_url = (f'{qclient._server_url}/study/description/' - f'{qiita_id}?prep_id={prep_id}') + def _load_prep_into_qiita( + self, qclient, prep_id, artifact_name, qiita_id, project, fastq_files, atype + ): + surl = f"{qclient._server_url}/study/description/{qiita_id}" + prep_url = ( + f"{qclient._server_url}/study/description/{qiita_id}?prep_id={prep_id}" + ) # ideally we would use the email of the user that started the SPP # run but at this point there is no easy way to retrieve it - pdata = {'user_email': 'qiita.help@gmail.com', - 'prep_id': prep_id, - 'artifact_type': atype, - 'command_artifact_name': artifact_name, - 'files': dumps(fastq_files)} + pdata = { + "user_email": "qiita.help@gmail.com", + "prep_id": prep_id, + "artifact_type": atype, + "command_artifact_name": artifact_name, + "files": dumps(fastq_files), + } # this will block adding the default workflow to the # long read / pacbio processing; which is desirable # until we have a processing pipeline - note that # this will only be added if _not_ 'long' - if self.read_length != 'long': - pdata['add_default_workflow'] = True + if self.read_length != "long": + pdata["add_default_workflow"] = True - job_id = qclient.post('/qiita_db/artifact/', data=pdata)['job_id'] + job_id = qclient.post("/qiita_db/artifact/", data=pdata)["job_id"] - return {'Project': project, 'Qiita Study ID': qiita_id, - 'Qiita Prep ID': prep_id, 'Qiita URL': surl, - 'Artifact Name': artifact_name, - 'Prep URL': prep_url, 'Linking JobID': job_id} + return { + "Project": project, + "Qiita Study ID": qiita_id, + "Qiita Prep ID": prep_id, + "Qiita URL": surl, + "Artifact Name": artifact_name, + "Prep URL": prep_url, + "Linking JobID": job_id, + } def _copy_files(self, files): # increment the prep_copy_index before generating a new set of copies. @@ -649,7 +712,7 @@ def _copy_files(self, files): new_files[key] = [] for some_path in files[key]: path_name, file_name = split(some_path) - path_name = join(path_name, f'copy{self.prep_copy_index}') + path_name = join(path_name, f"copy{self.prep_copy_index}") makedirs(path_name, exist_ok=True) new_files[key].append(join(path_name, file_name)) @@ -665,8 +728,10 @@ def _get_tube_ids_from_qiita(self): # Update get_project_info() so that it can return a list of # samples in projects['samples']. Include blanks in projects['blanks'] # just in case there are duplicate qiita_ids - qiita_ids = [proj['qiita_id'] for proj in - self.pipeline.get_project_info(short_names=True)] + qiita_ids = [ + proj["qiita_id"] + for proj in self.pipeline.get_project_info(short_names=True) + ] tids_by_qiita_id = {} sample_names_by_qiita_id = {} @@ -681,8 +746,7 @@ def _get_tube_ids_from_qiita(self): if tids is not None: # fix values in tids to be a string instead of a list of one. # also, remove the qiita_id prepending each sample-name. - tids = {k.replace(f'{qiita_id}.', ''): tids[k][0] for k in - tids} + tids = {k.replace(f"{qiita_id}.", ""): tids[k][0] for k in tids} # the values Qiita returns for tids seems like it can include # empty strings if there is no tube-id associated with a @@ -713,8 +777,7 @@ def determine_steps_to_skip(self): # check if the test flag is on! 
test = False - if 'PrepNuQCJob_TEST' in environ and \ - environ['PrepNuQCJob_TEST'] == 'true': + if "PrepNuQCJob_TEST" in environ and environ["PrepNuQCJob_TEST"] == "true": test = True # Although amplicon runs don't perform host-filtering, @@ -723,13 +786,14 @@ def determine_steps_to_skip(self): # absence of a 'NuQCJob' directory is still a thing (for now) for directory in self.directories_to_check: if exists(join(out_dir, directory)): - if exists(join(out_dir, directory, 'job_completed')): + if exists(join(out_dir, directory, "job_completed")): # this step completed successfully but # TRIJ_Post_Processing is a special case and we # need to look for post_processing_completed - if directory == 'TRIJ_Post_Processing': - if not exists(join(out_dir, directory, - 'post_processing_completed')): + if directory == "TRIJ_Post_Processing": + if not exists( + join(out_dir, directory, "post_processing_completed") + ): rmtree(join(out_dir, directory)) break self.skip_steps.append(directory) diff --git a/src/qp_klp/__init__.py b/src/qp_klp/__init__.py index 30e403fa..b5935161 100644 --- a/src/qp_klp/__init__.py +++ b/src/qp_klp/__init__.py @@ -5,44 +5,56 @@ # # The full license is in the file LICENSE, distributed with this software. # ----------------------------------------------------------------------------- -from qiita_client import QiitaPlugin, QiitaCommand -from .klp import sequence_processing_pipeline, PrepNuQCJob +from qiita_client import QiitaCommand, QiitaPlugin + +from .klp import PrepNuQCJob, sequence_processing_pipeline class QiitaPluginAdmin(QiitaPlugin): _plugin_type = "private" -__version__ = '2024.11' +__version__ = "2024.11" # Adding SPP job -plugin = QiitaPluginAdmin('qp-klp', __version__, 'Knight Lab Processing') +plugin = QiitaPluginAdmin("qp-klp", __version__, "Knight Lab Processing") req_params = { - 'run_identifier': ('string', ['']), - 'sample_sheet': ('prep_template', ['']), - 'lane_number': ('integer', [None]), - } + "run_identifier": ("string", [""]), + "sample_sheet": ("prep_template", [""]), + "lane_number": ("integer", [None]), +} opt_params = dict() -outputs = {'output': 'job-output-folder'} +outputs = {"output": "job-output-folder"} dflt_param_set = dict() sequence_processing_pipeline_cmd = QiitaCommand( - 'Sequence Processing Pipeline', 'Associate a sample sheet with a ' - 'sequencing run and generate the corresponding sequence files', - sequence_processing_pipeline, req_params, opt_params, outputs, - dflt_param_set) + "Sequence Processing Pipeline", + "Associate a sample sheet with a " + "sequencing run and generate the corresponding sequence files", + sequence_processing_pipeline, + req_params, + opt_params, + outputs, + dflt_param_set, +) plugin.register_command(sequence_processing_pipeline_cmd) # adding PrepNuQC job -req_params = {'prep_id': ('integer', [None])} -outputs = {'output': 'job-output-folder'} +req_params = {"prep_id": ("integer", [None])} +outputs = {"output": "job-output-folder"} prepNuQC_cmd = QiitaCommand( - 'Human Filter & QC existing Prep', 'Apply NuQCJob to an existing ' - 'Qiita prep raw data', PrepNuQCJob, req_params, dict(), outputs, dict()) + "Human Filter & QC existing Prep", + "Apply NuQCJob to an existing Qiita prep raw data", + PrepNuQCJob, + req_params, + dict(), + outputs, + dict(), +) plugin.register_command(prepNuQC_cmd) diff --git a/src/qp_klp/klp.py b/src/qp_klp/klp.py index 37c139b0..4b1eecac 100644 --- a/src/qp_klp/klp.py +++ b/src/qp_klp/klp.py @@ -5,27 +5,28 @@ # # The full license is in the file LICENSE, distributed with this 
software. # ----------------------------------------------------------------------------- +import traceback from functools import partial +from os import environ, makedirs +from os.path import exists, join + +from metapool import load_sample_sheet from qiita_client import ArtifactInfo -from os import makedirs, environ -from os.path import join, exists -import traceback from sequence_processing_pipeline.PipelineError import PipelineError -from metapool import load_sample_sheet -from .Workflows import WorkflowError -from .WorkflowFactory import WorkflowFactory -from .StandardMetagenomicWorkflow import PrepNuQCWorkflow +from .StandardMetagenomicWorkflow import PrepNuQCWorkflow +from .WorkflowFactory import WorkflowFactory +from .Workflows import WorkflowError CONFIG_FP = environ["QP_KLP_CONFIG_FP"] -class StatusUpdate(): +class StatusUpdate: def __init__(self, qclient, job_id): self.qclient = qclient self.job_id = job_id - self.msg = '' + self.msg = "" def update_job_status(self, msg): self.qclient.update_job_step(self.job_id, msg) @@ -59,18 +60,20 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir): # submit(). New SPP jobs will record all the information needed to perform # a restart should it fail. out_path = partial(join, out_dir) - is_restart = True if exists(out_path('restart_me')) else False + is_restart = True if exists(out_path("restart_me")) else False if is_restart: status_line.update_job_status("Restarting pipeline") - with open(out_path('restart_me'), 'r') as f: + with open(out_path("restart_me"), "r") as f: lines = f.readlines() lines = [x.strip() for x in lines] - lines = [x for x in lines if x != ''] + lines = [x for x in lines if x != ""] if len(lines) != 2: - raise ValueError(f"{out_path('restart_me')} can only contain " - "the run-identifier on line 1 and the name " - "of the user input file on line 2") + raise ValueError( + f"{out_path('restart_me')} can only contain " + "the run-identifier on line 1 and the name " + "of the user input file on line 2" + ) run_identifier = lines[0] uif_path = out_path(lines[1]) @@ -87,14 +90,18 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir): # Amplicon run, then Assay type will be 'TruSeq HT' and Chemistry # will be 'Amplicon'. For now, raise Error on restarting an # Amplicon run so we don't have to search for the pre-prep file. - if sheet.Header['Assay'] == 'TruSeq HT' and \ - sheet.Header['Chemistry'] == 'Amplicon': + if ( + sheet.Header["Assay"] == "TruSeq HT" + and sheet.Header["Chemistry"] == "Amplicon" + ): raise ValueError("Restarting Amplicon jobs currently unsupported") # add a note for the wetlab that this job was restarted. - with open(join(out_dir, 'notes.txt'), 'w') as f: - f.write("This job was restarted.\n" - "failed_samples.html may contain incorrect data.\n") + with open(join(out_dir, "notes.txt"), "w") as f: + f.write( + "This job was restarted.\n" + "failed_samples.html may contain incorrect data.\n" + ) else: # This is a new, fresh SPP job. Get the parameters from the user and # create a 'restart_me' file in the working directory. 
@@ -104,17 +111,23 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir): # available fields for parameters are: # run_identifier, sample_sheet, content_type, filename, lane_number - run_identifier = parameters.pop('run_identifier') - user_input_file = parameters.pop('sample_sheet') - lane_number = parameters.pop('lane_number') - - if {'body', 'content_type', 'filename'} != set(user_input_file): - return False, None, ("This doesn't appear to be a valid sample " - "sheet or mapping file; please review.") - uif_path = out_path(user_input_file['filename'].replace(' ', '_')) + run_identifier = parameters.pop("run_identifier") + user_input_file = parameters.pop("sample_sheet") + lane_number = parameters.pop("lane_number") + + if {"body", "content_type", "filename"} != set(user_input_file): + return ( + False, + None, + ( + "This doesn't appear to be a valid sample " + "sheet or mapping file; please review." + ), + ) + uif_path = out_path(user_input_file["filename"].replace(" ", "_")) # save raw data to file - with open(uif_path, 'w') as f: - f.write(user_input_file['body']) + with open(uif_path, "w") as f: + f.write(user_input_file["body"]) # the run_identifier must be saved because it is not always preserved # in a dependable location downstream. The user input file must be @@ -122,25 +135,27 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir): # to be the only .csv or .txt file at the root level of the working # directory. The Lane number is not needed because it is written into # the user_input file on the first run. - restart_file_path = out_path('restart_me') - with open(restart_file_path, 'w') as f: + restart_file_path = out_path("restart_me") + with open(restart_file_path, "w") as f: f.write(f"{run_identifier}\n{uif_path}") - final_results_path = out_path('final_results') + final_results_path = out_path("final_results") makedirs(final_results_path, exist_ok=True) - kwargs = {'qclient': qclient, - 'uif_path': uif_path, - 'lane_number': lane_number, - 'config_fp': CONFIG_FP, - 'run_identifier': run_identifier, - 'output_dir': out_dir, - 'job_id': job_id, - 'status_update_callback': status_line.update_job_status, - # set 'update_qiita' to False to avoid updating Qiita DB - # and copying files into uploads dir. Useful for testing. - 'update_qiita': True, - 'is_restart': is_restart} + kwargs = { + "qclient": qclient, + "uif_path": uif_path, + "lane_number": lane_number, + "config_fp": CONFIG_FP, + "run_identifier": run_identifier, + "output_dir": out_dir, + "job_id": job_id, + "status_update_callback": status_line.update_job_status, + # set 'update_qiita' to False to avoid updating Qiita DB + # and copying files into uploads dir. Useful for testing. + "update_qiita": True, + "is_restart": is_restart, + } status_line.update_job_status("Getting project information") try: workflow = WorkflowFactory().generate_workflow(**kwargs) @@ -148,16 +163,15 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir): except (PipelineError, WorkflowError) as e: # add full traceback to the working directory so devs/admins # can review and get as much info as possible - with open(f'{out_dir}/error-traceback.err', 'w') as f: + with open(f"{out_dir}/error-traceback.err", "w") as f: f.write(traceback.format_exc()) return False, None, str(e) status_line.update_job_status("SPP finished") # return success, ainfo, and the last status message. 
- paths = [(f'{final_results_path}/', 'directory')] - return (True, [ArtifactInfo('output', 'job-output-folder', paths)], - status_line.msg) + paths = [(f"{final_results_path}/", "directory")] + return (True, [ArtifactInfo("output", "job-output-folder", paths)], status_line.msg) def PrepNuQCJob(qclient, job_id, parameters, out_dir): @@ -181,18 +195,23 @@ def PrepNuQCJob(qclient, job_id, parameters, out_dir): status_line.update_job_status("Setting up pipeline") - kwargs = {'qclient': qclient, 'job_id': job_id, - 'parameters': parameters, 'out_dir': out_dir, - 'config_fp': CONFIG_FP, 'status_line': status_line} + kwargs = { + "qclient": qclient, + "job_id": job_id, + "parameters": parameters, + "out_dir": out_dir, + "config_fp": CONFIG_FP, + "status_line": status_line, + } try: workflow = PrepNuQCWorkflow(**kwargs) workflow.execute_pipeline() except (PipelineError, WorkflowError, RuntimeError) as e: - with open(f'{out_dir}/error-traceback.err', 'w') as f: + with open(f"{out_dir}/error-traceback.err", "w") as f: f.write(traceback.format_exc()) return False, None, str(e) status_line.update_job_status("prep_NuQCJob finished") # return success, ainfo, and the last status message. - paths = [(f'{workflow.final_results_path}/', 'directory')] - return (True, [ArtifactInfo('output', 'job-output-folder', paths)], '') + paths = [(f"{workflow.final_results_path}/", "directory")] + return (True, [ArtifactInfo("output", "job-output-folder", paths)], "") diff --git a/src/qp_klp/scripts/configure_klp.py b/src/qp_klp/scripts/configure_klp.py index 3059d996..af457b46 100755 --- a/src/qp_klp/scripts/configure_klp.py +++ b/src/qp_klp/scripts/configure_klp.py @@ -14,16 +14,25 @@ @click.command() -@click.option('--env-script', prompt='Environment script', required=True, - default='source activate qp-klp') -@click.option('--ca-cert', prompt='Server certificate', required=False, - default='None', show_default=True) +@click.option( + "--env-script", + prompt="Environment script", + required=True, + default="source activate qp-klp", +) +@click.option( + "--ca-cert", + prompt="Server certificate", + required=False, + default="None", + show_default=True, +) def config(env_script, ca_cert): """Generates the Qiita configuration files""" - if ca_cert == 'None': + if ca_cert == "None": ca_cert = None - plugin.generate_config(env_script, 'start_klp', ca_cert) + plugin.generate_config(env_script, "start_klp", ca_cert) -if __name__ == '__main__': +if __name__ == "__main__": config() diff --git a/src/qp_klp/scripts/pacbio_commands.py b/src/qp_klp/scripts/pacbio_commands.py index b0ab0717..8dd6ea6f 100755 --- a/src/qp_klp/scripts/pacbio_commands.py +++ b/src/qp_klp/scripts/pacbio_commands.py @@ -7,52 +7,53 @@ # # The full license is in the file LICENSE, distributed with this software. 
# ----------------------------------------------------------------------------- -import click -import pandas as pd from glob import glob from os import makedirs +import click +import pandas as pd + @click.command() -@click.argument('sample_list', required=True) -@click.argument('run_folder', required=True) -@click.argument('outdir', required=True) -@click.argument('threads', required=True, default=1) +@click.argument("sample_list", required=True) +@click.argument("run_folder", required=True) +@click.argument("outdir", required=True) +@click.argument("threads", required=True, default=1) def generate_bam2fastq_commands(sample_list, run_folder, outdir, threads): """Generates the bam2fastq commands""" - df = pd.read_csv(sample_list, sep='\t') + df = pd.read_csv(sample_list, sep="\t") # pacbio raw files are in a hifi_reads folder, wihtin multiple folders # (1_A01, 2_A02, ect), within the run-id folder; and are named # m[run-id]XXX.hifi_reads.[barcode].bam; thus to find the [barcode] we # can split on '.' and then the second to last element [-2]. - files = {f.split('.')[-2]: f - for f in glob(f'{run_folder}/*/hifi_reads/*.bam')} + files = {f.split(".")[-2]: f for f in glob(f"{run_folder}/*/hifi_reads/*.bam")} makedirs(outdir, exist_ok=True) commands, missing_files = [], [] for _, row in df.iterrows(): - bc = row['barcode'] - sn = row['sample_name'] - pn = row['project_name'] - lane = row['lane'] + bc = row["barcode"] + sn = row["sample_name"] + pn = row["project_name"] + lane = row["lane"] if bc not in files: missing_files.append(bc) continue - od = f'{outdir}/{pn}' + od = f"{outdir}/{pn}" makedirs(od, exist_ok=True) - fn = f'{od}/{sn}_S000_L00{lane}_R1_001' - cmd = (f'bam2fastq -j {threads} -o {fn} -c 9 ' - f'{files[bc]}; ' - f'fqtools count {fn}.fastq.gz > ' - f'{fn}.counts.txt') + fn = f"{od}/{sn}_S000_L00{lane}_R1_001" + cmd = ( + f"bam2fastq -j {threads} -o {fn} -c 9 " + f"{files[bc]}; " + f"fqtools count {fn}.fastq.gz > " + f"{fn}.counts.txt" + ) commands.append(cmd) if missing_files: - raise ValueError( - f'{run_folder} is missing barcodes: {missing_files}') + raise ValueError(f"{run_folder} is missing barcodes: {missing_files}") for cmd in commands: print(cmd) diff --git a/src/qp_klp/scripts/start_klp.py b/src/qp_klp/scripts/start_klp.py index b43aca70..33aa2c52 100755 --- a/src/qp_klp/scripts/start_klp.py +++ b/src/qp_klp/scripts/start_klp.py @@ -14,13 +14,13 @@ @click.command() -@click.argument('url', required=True) -@click.argument('job_id', required=True) -@click.argument('output_dir', required=True) +@click.argument("url", required=True) +@click.argument("job_id", required=True) +@click.argument("output_dir", required=True) def execute(url, job_id, output_dir): """Executes the task given by job_id and puts the output in output_dir""" plugin(url, job_id, output_dir) -if __name__ == '__main__': +if __name__ == "__main__": execute() diff --git a/src/sequence_processing_pipeline/contrib/integrate-indices-np.py b/src/sequence_processing_pipeline/contrib/integrate-indices-np.py index b1be83a6..304fc789 100644 --- a/src/sequence_processing_pipeline/contrib/integrate-indices-np.py +++ b/src/sequence_processing_pipeline/contrib/integrate-indices-np.py @@ -46,16 +46,16 @@ # barcodes are incorporating as expected in output. 
# # author: Daniel McDonald (d3mcdonald@eng.ucsd.edu) -import numpy as np -import click -import re -import io -import pgzip import gzip +import io +import re +import click +import numpy as np +import pgzip -RECORD = re.compile(rb'@\S+\n[ATGCN]+\n\+\n\S+\n') -BARCODE = re.compile(rb'@\S+\n([ATGCN]+)\n\+\n\S+\n') +RECORD = re.compile(rb"@\S+\n[ATGCN]+\n\+\n\S+\n") +BARCODE = re.compile(rb"@\S+\n([ATGCN]+)\n\+\n\S+\n") def gather_order(i1_in_fp): @@ -78,9 +78,9 @@ def gather_order(i1_in_fp): end = len(i1) # get the number of records. we completely assume non-multiline fastq here - newlines = i1.count(b'\n') + newlines = i1.count(b"\n") assert newlines % 4 == 0 - barcodes = np.empty(newlines // 4, dtype='|S%d' % rec_len) + barcodes = np.empty(newlines // 4, dtype="|S%d" % rec_len) # walk all index records # grab each barcode @@ -108,19 +108,43 @@ def gather_order(i1_in_fp): def test_gather_order(): - i1data = [b'@foo', b'ATGC', b'+', b'!!!!', - b'@bar', b'TTGG', b'+', b'!!!!', - b'@baz', b'ATGC', b'+', b'!!!!', - b'@oof', b'TTTT', b'+', b'!!!!', - b'@rab', b'TTGG', b'+', b'!!!!', - b'@zab', b'TTTT', b'+', b'!!!!', - b'@ofo', b'TTTT', b'+', b'!!!!', b''] - - i1 = io.BytesIO(b'\n'.join(i1data)) + i1data = [ + b"@foo", + b"ATGC", + b"+", + b"!!!!", + b"@bar", + b"TTGG", + b"+", + b"!!!!", + b"@baz", + b"ATGC", + b"+", + b"!!!!", + b"@oof", + b"TTTT", + b"+", + b"!!!!", + b"@rab", + b"TTGG", + b"+", + b"!!!!", + b"@zab", + b"TTTT", + b"+", + b"!!!!", + b"@ofo", + b"TTTT", + b"+", + b"!!!!", + b"", + ] + + i1 = io.BytesIO(b"\n".join(i1data)) order, unique, bounds = gather_order(i1) exp_order = np.array([0, 2, 1, 4, 3, 5, 6]) - exp_unique = np.array([b'ATGC', b'TTGG', b'TTTT']) + exp_unique = np.array([b"ATGC", b"TTGG", b"TTTT"]) exp_bounds = np.array([0, 2, 4]) assert (order == exp_order).all() @@ -173,69 +197,140 @@ def troll_and_write(order, unique, bounds, in_, out_): # n.b., b'foo'[0] is int, because yay, so we use a slice to maintain # a human readable character to test against as most mortals haven't # memorized the ascii table - assert record[:1] == b'@' - assert record[-1:] == b'\n' + assert record[:1] == b"@" + assert record[-1:] == b"\n" with_barcode = insert_barcode(record, current_barcode) out_.write(with_barcode) def test_troll_and_write(): - i1data = [b'@foo', b'ATGC', b'+', b'!!!!', - b'@bar', b'TTGG', b'+', b'!!!!', - b'@baz', b'ATGC', b'+', b'!!!!', - b'@oof', b'TTTT', b'+', b'!!!!', - b'@rab', b'TTGG', b'+', b'!!!!', - b'@zab', b'TTTT', b'+', b'!!!!', - b'@ofo', b'TTTT', b'+', b'!!!!', b''] - - i1 = io.BytesIO(b'\n'.join(i1data)) + i1data = [ + b"@foo", + b"ATGC", + b"+", + b"!!!!", + b"@bar", + b"TTGG", + b"+", + b"!!!!", + b"@baz", + b"ATGC", + b"+", + b"!!!!", + b"@oof", + b"TTTT", + b"+", + b"!!!!", + b"@rab", + b"TTGG", + b"+", + b"!!!!", + b"@zab", + b"TTTT", + b"+", + b"!!!!", + b"@ofo", + b"TTTT", + b"+", + b"!!!!", + b"", + ] + + i1 = io.BytesIO(b"\n".join(i1data)) order, unique, bounds = gather_order(i1) # we assume records are in the same order, as that has previously been # observed w/ tellread and is the normal expectation - r1data = [b'@foo', b'AATGC', b'+', b'!!!!!', - b'@bar', b'ATTGG', b'+', b'!!!!!', - b'@baz', b'AATGC', b'+', b'!!!!!', - b'@oof', b'ATTTT', b'+', b'!!!!!', - b'@rab', b'ATTGG', b'+', b'!!!!!', - b'@zab', b'ATTTT', b'+', b'!!!!!', - b'@ofo', b'ATTTT', b'+', b'!!!!!', b''] - r1 = io.BytesIO(b'\n'.join(r1data)) + r1data = [ + b"@foo", + b"AATGC", + b"+", + b"!!!!!", + b"@bar", + b"ATTGG", + b"+", + b"!!!!!", + b"@baz", + b"AATGC", + b"+", + b"!!!!!", 
+ b"@oof", + b"ATTTT", + b"+", + b"!!!!!", + b"@rab", + b"ATTGG", + b"+", + b"!!!!!", + b"@zab", + b"ATTTT", + b"+", + b"!!!!!", + b"@ofo", + b"ATTTT", + b"+", + b"!!!!!", + b"", + ] + r1 = io.BytesIO(b"\n".join(r1data)) r1out = io.BytesIO() troll_and_write(order, unique, bounds, r1, r1out) r1out.seek(0) - r1exp = [b'@foo BX:Z:ATGC-1', b'AATGC', b'+', b'!!!!!', - b'@baz BX:Z:ATGC-1', b'AATGC', b'+', b'!!!!!', - b'@bar BX:Z:TTGG-1', b'ATTGG', b'+', b'!!!!!', - b'@rab BX:Z:TTGG-1', b'ATTGG', b'+', b'!!!!!', - b'@oof BX:Z:TTTT-1', b'ATTTT', b'+', b'!!!!!', - b'@zab BX:Z:TTTT-1', b'ATTTT', b'+', b'!!!!!', - b'@ofo BX:Z:TTTT-1', b'ATTTT', b'+', b'!!!!!', - b''] - r1exp = b'\n'.join(r1exp) + r1exp = [ + b"@foo BX:Z:ATGC-1", + b"AATGC", + b"+", + b"!!!!!", + b"@baz BX:Z:ATGC-1", + b"AATGC", + b"+", + b"!!!!!", + b"@bar BX:Z:TTGG-1", + b"ATTGG", + b"+", + b"!!!!!", + b"@rab BX:Z:TTGG-1", + b"ATTGG", + b"+", + b"!!!!!", + b"@oof BX:Z:TTTT-1", + b"ATTTT", + b"+", + b"!!!!!", + b"@zab BX:Z:TTTT-1", + b"ATTTT", + b"+", + b"!!!!!", + b"@ofo BX:Z:TTTT-1", + b"ATTTT", + b"+", + b"!!!!!", + b"", + ] + r1exp = b"\n".join(r1exp) assert r1exp == r1out.read() def create_tag(t): - return b'BX:Z:%s-1' % t + return b"BX:Z:%s-1" % t def create_tag_no_suffix(t): - return b'BX:Z:%s' % t + return b"BX:Z:%s" % t def insert_barcode(record, barcode): """Get the current ID, smash the needed tag in""" # @foo\nATGC\n+\n!!!!\n - id_, remainder = record.split(b'\n', 1) + id_, remainder = record.split(b"\n", 1) tag = create_tag(barcode) - return b'%s %s\n%s' % (id_, tag, remainder) + return b"%s %s\n%s" % (id_, tag, remainder) def readfq(fp): - if fp.mode == 'rb': + if fp.mode == "rb": strip = bytes.strip else: strip = str.strip @@ -251,7 +346,7 @@ def readfq(fp): def writefq(rec, out): for item in rec: out.write(item) - out.write(b'\n') + out.write(b"\n") @click.group() @@ -266,21 +361,21 @@ def tests(): @cli.command() -@click.option('--r1-in', type=click.Path(exists=True), required=True) -@click.option('--r2-in', type=click.Path(exists=True), required=True) -@click.option('--i1-in', type=click.Path(exists=True), required=True) -@click.option('--r1-out', type=click.Path(exists=False), required=True) -@click.option('--r2-out', type=click.Path(exists=False), required=True) -@click.option('--threads', type=int, required=False, default=1) -@click.option('--no-sort', is_flag=True, default=False) +@click.option("--r1-in", type=click.Path(exists=True), required=True) +@click.option("--r2-in", type=click.Path(exists=True), required=True) +@click.option("--i1-in", type=click.Path(exists=True), required=True) +@click.option("--r1-out", type=click.Path(exists=False), required=True) +@click.option("--r2-out", type=click.Path(exists=False), required=True) +@click.option("--threads", type=int, required=False, default=1) +@click.option("--no-sort", is_flag=True, default=False) def integrate(r1_in, r2_in, i1_in, r1_out, r2_out, threads, no_sort): - r1_in_fp = open(r1_in, 'rb') - r2_in_fp = open(r2_in, 'rb') - i1_in_fp = open(i1_in, 'rb') + r1_in_fp = open(r1_in, "rb") + r2_in_fp = open(r2_in, "rb") + i1_in_fp = open(i1_in, "rb") if no_sort: - r1_out_fp = gzip.open(r1_out, mode='wb') - r2_out_fp = gzip.open(r2_out, mode='wb') + r1_out_fp = gzip.open(r1_out, mode="wb") + r2_out_fp = gzip.open(r2_out, mode="wb") r1_sniff = r1_in_fp.readline().strip() r2_sniff = r2_in_fp.readline().strip() @@ -289,20 +384,22 @@ def integrate(r1_in, r2_in, i1_in, r1_out, r2_out, threads, no_sort): # outputs from tellread don't seem to have orientation 
information # some downstream programs hate this, so let's add if needed. - if r1_sniff.endswith(b'/1'): - if not r2_sniff.endswith(b'/2'): - raise ValueError('unexpected endings: ' - f'{r1_sniff.decode("utf-8")} ' - f'{r2_sniff.decode("utf-8")}') - orient_r1 = '' - orient_r2 = '' + if r1_sniff.endswith(b"/1"): + if not r2_sniff.endswith(b"/2"): + raise ValueError( + "unexpected endings: " + f"{r1_sniff.decode('utf-8')} " + f"{r2_sniff.decode('utf-8')}" + ) + orient_r1 = "" + orient_r2 = "" else: - assert b'/1' not in r1_sniff + assert b"/1" not in r1_sniff - orient_r1 = b'/1' - orient_r2 = b'/2' + orient_r1 = b"/1" + orient_r2 = b"/2" - for (r1, r2, i1) in zip(*map(readfq, [r1_in_fp, r2_in_fp, i1_in_fp])): + for r1, r2, i1 in zip(*map(readfq, [r1_in_fp, r2_in_fp, i1_in_fp])): assert r1[0] == r2[0] assert r1[0] == i1[0] @@ -315,10 +412,8 @@ def integrate(r1_in, r2_in, i1_in, r1_out, r2_out, threads, no_sort): r2_out_fp.close() else: # 200MB is what they use in their readme... - r1_out_fp = pgzip.open(r1_out, mode='wb', thread=threads, - blocksize=2*10**8) - r2_out_fp = pgzip.open(r2_out, mode='wb', thread=threads, - blocksize=2*10**8) + r1_out_fp = pgzip.open(r1_out, mode="wb", thread=threads, blocksize=2 * 10**8) + r2_out_fp = pgzip.open(r2_out, mode="wb", thread=threads, blocksize=2 * 10**8) order, unique, bounds = gather_order(i1_in_fp) @@ -328,5 +423,5 @@ def integrate(r1_in, r2_in, i1_in, r1_out, r2_out, threads, no_sort): out_.close() -if __name__ == '__main__': +if __name__ == "__main__": cli() diff --git a/src/sequence_processing_pipeline/contrib/plot_counts.py b/src/sequence_processing_pipeline/contrib/plot_counts.py index ecab9e49..04923835 100644 --- a/src/sequence_processing_pipeline/contrib/plot_counts.py +++ b/src/sequence_processing_pipeline/contrib/plot_counts.py @@ -1,27 +1,28 @@ -import matplotlib.pyplot as plt +import os import re import sys -import os + +import matplotlib.pyplot as plt import pandas as pd -ex = re.compile(r'_I1_(C5\d\d).fastq.gz.corrected.err_barcode_removed.fastq') +ex = re.compile(r"_I1_(C5\d\d).fastq.gz.corrected.err_barcode_removed.fastq") # remove total line from wc -data = [x.strip().split(' ') for x in open(sys.argv[1])][:-1] +data = [x.strip().split(" ") for x in open(sys.argv[1])][:-1] plotdata = [(ex.search(i).groups()[0], int(v) / 4) for v, i in data] sheetdata = dict(plotdata) ordered = sorted(plotdata, key=lambda x: x[1]) f = plt.figure(figsize=(16, 8)) plt.bar([i for i, _ in ordered], [v for _, v in ordered]) -plt.ylabel('I1 reads') +plt.ylabel("I1 reads") plt.xticks(list(range(len(ordered))), [i for i, _ in ordered], rotation=90) -plt.savefig(sys.argv[3] + '/counts.pdf') +plt.savefig(sys.argv[3] + "/counts.pdf") sheet = pd.read_csv(sys.argv[2], dtype=str) -sheet = sheet[~sheet['Lane'].isnull()] -sheet['read_counts'] = [sheetdata[i] for i in sheet['Barcode_ID']] -name = os.path.basename(sys.argv[2]).rsplit('.', 1)[0] -newname = name + '.read_counts.tsv' +sheet = sheet[~sheet["Lane"].isnull()] +sheet["read_counts"] = [sheetdata[i] for i in sheet["Barcode_ID"]] +name = os.path.basename(sys.argv[2]).rsplit(".", 1)[0] +newname = name + ".read_counts.tsv" -sheet.to_csv(sys.argv[3] + '/' + newname, sep='\t', index=False, header=True) +sheet.to_csv(sys.argv[3] + "/" + newname, sep="\t", index=False, header=True)
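Editor's note (illustrative only, not part of the patch): the barcode-integration hunks above are easiest to read alongside a minimal restatement of what insert_barcode()/create_tag() in integrate-indices-np.py do, namely splicing the I1 barcode into the read ID as a "BX:Z:<barcode>-1" tag, exactly as test_troll_and_write() expects. The record and barcode values below are copied from that test.

# Illustrative sketch only -- mirrors insert_barcode()/create_tag() from
# integrate-indices-np.py; inputs are copied from test_troll_and_write().
record = b"@foo\nAATGC\n+\n!!!!!\n"  # one non-multiline FASTQ record from R1
barcode = b"ATGC"                    # barcode taken from the matching I1 record

# split off the ID line and splice the BX:Z tag in behind it
id_, remainder = record.split(b"\n", 1)
tagged = b"%s BX:Z:%s-1\n%s" % (id_, barcode, remainder)

assert tagged == b"@foo BX:Z:ATGC-1\nAATGC\n+\n!!!!!\n"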
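A second illustrative note, also outside the patch: pacbio_commands.py and generate_sequence_counts() share a file-naming contract. The bam2fastq/fqtools commands write <sample>_S000_L00<lane>_R1_001.fastq.gz plus a matching .counts.txt, and generate_sequence_counts() strips that same suffix to recover the sample name before merging everything into SeqCounts.csv. The sample name and lane number below are hypothetical, chosen only to show the round trip.

# Illustrative sketch only -- demonstrates the suffix round trip between
# generate_bam2fastq_commands() output and generate_sequence_counts() input.
from os.path import basename

lane_number = 1          # hypothetical lane
sample_name = "sample1"  # hypothetical sample

fastq = f"{sample_name}_S000_L00{lane_number}_R1_001.fastq.gz"
counts_file = fastq.replace(".fastq.gz", ".counts.txt")

# generate_sequence_counts() recovers the sample name by stripping the suffix
recovered = basename(counts_file).replace(
    f"_S000_L00{lane_number}_R1_001.counts.txt", ""
)
assert recovered == sample_name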