558 changes: 312 additions & 246 deletions src/qp_klp/Assays.py

Large diffs are not rendered by default.

49 changes: 31 additions & 18 deletions src/qp_klp/FailedSamplesRecord.py
@@ -1,5 +1,6 @@
from json import dumps, load
from os.path import join, exists
from os.path import exists, join

import pandas as pd


@@ -9,8 +10,8 @@ def __init__(self, output_dir, samples):
# each Job is run, and we want to organize that output by project, we
# need to keep a running state of failed samples, and reuse the method
# to reorganize the running-results and write them out to disk.
self.output_path = join(output_dir, 'failed_samples.json')
self.report_path = join(output_dir, 'failed_samples.html')
self.output_path = join(output_dir, "failed_samples.json")
self.report_path = join(output_dir, "failed_samples.html")

# create an initial dictionary with sample-ids as keys and their
# associated project-name and status as values. Afterwards, we'll
@@ -21,20 +22,19 @@ def __init__(self, output_dir, samples):
self.project_map = {x.Sample_ID: x.Sample_Project for x in samples}

def dump(self):
output = {'sample_state': self.sample_state,
'project_map': self.project_map}
output = {"sample_state": self.sample_state, "project_map": self.project_map}

with open(self.output_path, 'w') as f:
with open(self.output_path, "w") as f:
f.write(dumps(output, indent=2, sort_keys=True))

def load(self):
# if recorded state exists, overwrite initial state.
if exists(self.output_path):
with open(self.output_path, 'r') as f:
with open(self.output_path, "r") as f:
state = load(f)

self.sample_state = state['sample_state']
self.project_map = state['project_map']
self.sample_state = state["sample_state"]
self.project_map = state["project_map"]

def update(self, failed_ids, job_name):
# as a rule, if a failed_id were to appear in more than one
@@ -54,17 +54,30 @@ def write(self, failed_ids, job_name):

def generate_report(self):
# filter out the sample-ids w/out a failure status
filtered_fails = {x: self.sample_state[x] for x in self.sample_state if
self.sample_state[x] is not None}
filtered_fails = {
x: self.sample_state[x]
for x in self.sample_state
if self.sample_state[x] is not None
}

data = []
for sample_id in filtered_fails:
data.append({'Project': filtered_fails[sample_id],
'Sample ID': sample_id,
'Failed at': self.project_map[sample_id]
})
data.append(
{
"Project": filtered_fails[sample_id],
"Sample ID": sample_id,
"Failed at": self.project_map[sample_id],
}
)
df = pd.DataFrame(data)

with open(self.report_path, 'w') as f:
f.write(df.to_html(border=2, index=False, justify="left",
render_links=True, escape=False))
with open(self.report_path, "w") as f:
f.write(
df.to_html(
border=2,
index=False,
justify="left",
render_links=True,
escape=False,
)
)
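For context, a minimal usage sketch of the reformatted class (not part of this diff). The sample objects, output directory, and job name below are hypothetical stand-ins, and the flow assumes update() records the first failure per sample, as described in the comments above.

    from os import makedirs
    from types import SimpleNamespace

    from qp_klp.FailedSamplesRecord import FailedSamplesRecord

    # hypothetical sample records; real ones come from a parsed sample-sheet
    samples = [
        SimpleNamespace(Sample_ID="S1", Sample_Project="ProjA"),
        SimpleNamespace(Sample_ID="S2", Sample_Project="ProjB"),
    ]

    makedirs("/tmp/run_output", exist_ok=True)  # stand-in output dir

    fsr = FailedSamplesRecord("/tmp/run_output", samples)
    fsr.load()                        # restore prior state on a restart, if any
    fsr.update(["S1"], "ConvertJob")  # mark S1 as failed during a (hypothetical) job
    fsr.dump()                        # persist failed_samples.json
    fsr.generate_report()             # write failed_samples.html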
94 changes: 53 additions & 41 deletions src/qp_klp/PacBioMetagenomicWorkflow.py
@@ -1,71 +1,83 @@
from .Protocol import PacBio
from os.path import join

import pandas as pd

from sequence_processing_pipeline.Pipeline import Pipeline
from .Assays import Metagenomic
from .Assays import ASSAY_NAME_METAGENOMIC

from .Assays import ASSAY_NAME_METAGENOMIC, Metagenomic
from .FailedSamplesRecord import FailedSamplesRecord
from .Protocol import PacBio
from .Workflows import Workflow
import pandas as pd
from os.path import join


class PacBioMetagenomicWorkflow(Workflow, Metagenomic, PacBio):
def __init__(self, **kwargs):
super().__init__(**kwargs)

self.mandatory_attributes = ['qclient', 'uif_path',
'lane_number', 'config_fp',
'run_identifier', 'output_dir', 'job_id',
'is_restart']
self.mandatory_attributes = [
"qclient",
"uif_path",
"lane_number",
"config_fp",
"run_identifier",
"output_dir",
"job_id",
"is_restart",
]

self.confirm_mandatory_attributes()

# second stage initializer that could conceivably be pushed down into
# specific children requiring specific parameters.
self.qclient = self.kwargs['qclient']
self.qclient = self.kwargs["qclient"]

self.overwrite_prep_with_original = False
if 'overwrite_prep_with_original' in self.kwargs:
self.overwrite_prep_with_original = \
self.kwargs['overwrite_prep_with_original']
self.pipeline = Pipeline(self.kwargs['config_fp'],
self.kwargs['run_identifier'],
self.kwargs['uif_path'],
self.kwargs['output_dir'],
self.kwargs['job_id'],
ASSAY_NAME_METAGENOMIC,
lane_number=self.kwargs['lane_number'])

self.fsr = FailedSamplesRecord(self.kwargs['output_dir'],
self.pipeline.sample_sheet.samples)
if "overwrite_prep_with_original" in self.kwargs:
self.overwrite_prep_with_original = self.kwargs[
"overwrite_prep_with_original"
]
self.pipeline = Pipeline(
self.kwargs["config_fp"],
self.kwargs["run_identifier"],
self.kwargs["uif_path"],
self.kwargs["output_dir"],
self.kwargs["job_id"],
ASSAY_NAME_METAGENOMIC,
lane_number=self.kwargs["lane_number"],
)

self.fsr = FailedSamplesRecord(
self.kwargs["output_dir"], self.pipeline.sample_sheet.samples
)

samples = [
{'barcode': sample['barcode_id'],
'sample_name': sample['Sample_ID'],
'project_name': sample['Sample_Project'],
'lane': sample['Lane']}
for sample in self.pipeline.sample_sheet.samples]
{
"barcode": sample["barcode_id"],
"sample_name": sample["Sample_ID"],
"project_name": sample["Sample_Project"],
"lane": sample["Lane"],
}
for sample in self.pipeline.sample_sheet.samples
]
df = pd.DataFrame(samples)
sample_list_fp = f"{self.kwargs['output_dir']}/sample_list.tsv"
df.to_csv(sample_list_fp, sep='\t', index=False)
df.to_csv(sample_list_fp, sep="\t", index=False)

self.master_qiita_job_id = self.kwargs['job_id']
self.master_qiita_job_id = self.kwargs["job_id"]

self.lane_number = self.kwargs['lane_number']
self.is_restart = bool(self.kwargs['is_restart'])
self.lane_number = self.kwargs["lane_number"]
self.is_restart = bool(self.kwargs["is_restart"])

if self.is_restart is True:
self.raw_fastq_files_path = join(self.pipeline.output_path,
'ConvertJob')
self.reports_path = join(self.raw_fastq_files_path,
'SeqCounts.csv')
self.raw_fastq_files_path = join(self.pipeline.output_path, "ConvertJob")
self.reports_path = join(self.raw_fastq_files_path, "SeqCounts.csv")
self.determine_steps_to_skip()

# this is a convenience member to allow testing w/out updating Qiita.
self.update = True

if 'update_qiita' in kwargs:
if not isinstance(kwargs['update_qiita'], bool):
raise ValueError("value for 'update_qiita' must be of "
"type bool")
if "update_qiita" in kwargs:
if not isinstance(kwargs["update_qiita"], bool):
raise ValueError("value for 'update_qiita' must be of type bool")

self.update = kwargs['update_qiita']
self.update = kwargs["update_qiita"]