Running validate after a command finishes #1976

Merged
7 commits merged on Nov 21, 2016
18 changes: 15 additions & 3 deletions qiita_db/handlers/tests/test_processing_job.py
@@ -14,6 +14,8 @@
from os.path import exists

from tornado.web import HTTPError
import numpy.testing as npt
import pandas as pd

from qiita_db.handlers.tests.oauthbase import OauthTestingBase
import qiita_db as qdb
@@ -175,8 +177,18 @@ def test_post_job_failure(self):
self.assertEqual(job.log.msg, 'Job failure')

def test_post_job_success(self):
job = qdb.processing_job.ProcessingJob(
'd19f76ee-274e-4c1b-b3a2-a12d73507c55')
pt = npt.assert_warns(
qdb.exceptions.QiitaDBWarning,
qdb.metadata_template.prep_template.PrepTemplate.create,
pd.DataFrame({'new_col': {'1.SKD6.640190': 1}}),
qdb.study.Study(1), '16S')
job = qdb.processing_job.ProcessingJob.create(
qdb.user.User('test@foo.bar'),
qdb.software.Parameters.load(
qdb.software.Command.get_validator('BIOM'),
values_dict={'template': pt.id, 'files':
dumps({'BIOM': ['file']}),
'artifact_type': 'BIOM'}))
job._set_status('running')

fd, fp = mkstemp(suffix='_table.biom')
@@ -192,7 +204,7 @@ def test_post_job_success(self):
'artifacts': {'OTU table': {'filepaths': [(fp, 'biom')],
'artifact_type': 'BIOM'}}})
obs = self.post(
'/qiita_db/jobs/d19f76ee-274e-4c1b-b3a2-a12d73507c55/complete/',
'/qiita_db/jobs/%s/complete/' % job.id,
payload, headers=self.header)
self.assertEqual(obs.code, 200)
self.assertEqual(job.status, 'success')
169 changes: 133 additions & 36 deletions qiita_db/processing_job.py
@@ -13,7 +13,7 @@
from os.path import join
from itertools import chain
from collections import defaultdict
from json import dumps
from json import dumps, loads

from future.utils import viewitems, viewvalues
import networkx as nx
@@ -332,6 +332,126 @@ def submit(self):
p = Process(target=_job_submitter, args=(self, cmd))
p.start()

def _complete_artifact_definition(self, artifact_data):
""""Performs the needed steps to complete an artifact definition job

In order to complete an artifact definition job we need to create
the artifact, and then start all the jobs that were waiting for this
artifact to be created. Note that each artifact definition job creates
one and only one artifact.

Parameters
----------
artifact_data : {'filepaths': list of (str, str), 'artifact_type': str}
Dict with the artifact information. `filepaths` contains the list
of filepaths and filepath types for the artifact and
`artifact_type` the type of the artifact
"""
with qdb.sql_connection.TRN:
atype = artifact_data['artifact_type']
filepaths = artifact_data['filepaths']
# We need to differentiate whether this artifact is the
# result of a previous job or of a user upload
job_params = self.parameters.values
if job_params['provenance'] is not None:
# The artifact is a result from a previous job
provenance = loads(job_params['provenance'])
job = ProcessingJob(provenance['job'])
parents = job.input_artifacts
params = job.parameters
a = qdb.artifact.Artifact.create(
filepaths, atype, parents=parents,
processing_parameters=params)
cmd_out_id = provenance['cmd_out_id']
mapping = {cmd_out_id: a.id}
sql = """INSERT INTO
qiita.artifact_output_processing_job
(artifact_id, processing_job_id,
command_output_id)
VALUES (%s, %s, %s)"""
qdb.sql_connection.TRN.add(
sql, [a.id, job.id, cmd_out_id])
job._update_and_launch_children(mapping)
# Mark the parent job as success since the validation was
# successful
job._set_status('success')
else:
# The artifact is uploaded by the user
pt = qdb.metadata_template.prep_template.PrepTemplate(
job_params['template'])
a = qdb.artifact.Artifact.create(
filepaths, atype, prep_template=pt)
self._set_status('success')

def _complete_artifact_transformation(self, artifacts_data):
"""Performs the needed steps to complete an artifact transformation job

In order to complete an artifact transformation job, we need to create
a validate job for each artifact output and submit it.

Parameters
----------
artifacts_data : dict of dicts
The generated artifact information keyed by output name.
The format of each of the internal dictionaries must be
{'filepaths': list of (str, str), 'artifact_type': str}
where `filepaths` contains the list of filepaths and filepath types
for the artifact and `artifact_type` the type of the artifact

Raises
------
QiitaDBError
If the input artifacts are associated with more than one prep
information file
"""
for out_name, a_data in viewitems(artifacts_data):
# Correct the format of the filepaths parameter so we can create
# a validate job
filepaths = defaultdict(list)
for fp, fptype in a_data['filepaths']:
filepaths[fptype].append(fp)
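# For example (illustrative paths): [('/tmp/t1.biom', 'biom'),
# ('/tmp/t2.biom', 'biom')] is regrouped into
# {'biom': ['/tmp/t1.biom', '/tmp/t2.biom']}, ready to be
# JSON-encoded as the 'files' parameter of the Validate job below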
atype = a_data['artifact_type']

# The validate job needs a prep information file. In theory, a job
# can be generated from more than one prep information file, so we
# check here how many templates we have. Allowing more than one
# template would require a fair amount of changes in the plugins,
# so for now we restrict the number of templates to one. Note that
# there is currently no way of generating an artifact from 2 or
# more artifacts, so we can impose this limitation now and relax
# it later.
templates = set()
for artifact in self.input_artifacts:
templates.update(pt.id for pt in artifact.prep_templates)
if len(templates) > 1:
raise qdb.exceptions.QiitaDBError(
"Currently only a single prep template "
"is allowed, found %d" % len(templates))
template = templates.pop()

Member: Can you add this exception in the Raises section of the docstring?

# Once the validate job completes, it needs to know whether the
# artifact has been generated from a command (and how) or uploaded.
# In order to differentiate these cases, we populate the provenance
# parameter with some information about the current job and how
# this artifact has been generated. This does not affect the
# plugins since they can ignore this parameter
cmd_out_id = qdb.util.convert_to_id(
out_name, "command_output", "name")
provenance = {'job': self.id,
'cmd_out_id': cmd_out_id}

# Get the validator command for the current artifact type, create
# a new job and submit it.
cmd = qdb.software.Command.get_validator(atype)
validate_params = qdb.software.Parameters.load(
cmd, values_dict={'files': dumps(filepaths),
'artifact_type': atype,
'template': template,
'provenance': dumps(provenance)})
validate_job = ProcessingJob.create(self.user, validate_params)
validate_job.submit()

def complete(self, success, artifacts_data=None, error=None):
"""Completes the job, either with a success or error status

@@ -362,44 +482,21 @@ def complete(self, success, artifacts_data=None, error=None):
"Can't complete job: not in a running state")
if artifacts_data:
if self.command.software.type == 'artifact definition':
# In this case, the behavior of artifact_data is
# slightly different: we know that there is only 1
# new artifact and it doesn't have a parent
# There is only one artifact created
_, a_data = artifacts_data.popitem()
atype = a_data['artifact_type']
filepaths = a_data['filepaths']
pt_id = self.parameters.values['template']
pt = qdb.metadata_template.prep_template.PrepTemplate(
pt_id)
a = qdb.artifact.Artifact.create(
filepaths, atype, prep_template=pt)
self._complete_artifact_definition(a_data)
else:
artifact_ids = {}
for out_name, a_data in viewitems(artifacts_data):
filepaths = a_data['filepaths']
atype = a_data['artifact_type']
parents = self.input_artifacts
params = self.parameters
a = qdb.artifact.Artifact.create(
filepaths, atype, parents=parents,
processing_parameters=params)
cmd_out_id = qdb.util.convert_to_id(
out_name, "command_output", "name")
artifact_ids[cmd_out_id] = a.id
if artifact_ids:
sql = """INSERT INTO
qiita.artifact_output_processing_job
(artifact_id, processing_job_id,
command_output_id)
VALUES (%s, %s, %s)"""
sql_params = [[aid, self.id, out_id]
for out_id, aid in viewitems(
artifact_ids)]
qdb.sql_connection.TRN.add(sql, sql_params,
many=True)
self._update_and_launch_children(artifact_ids)
self._set_status('success')
self._complete_artifact_transformation(artifacts_data)
else:
if self.command.software.type == 'artifact definition':
job_params = self.parameters.values
if job_params['provenance'] is not None:
# This artifact definition job is the result of a command
# run; if it fails, also mark the "parent" job as failed
# and assign it the same error message
provenance = loads(job_params['provenance'])
job = ProcessingJob(provenance['job'])
job._set_error(error)
self._set_error(error)
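# Usage, as exercised by the tests in this PR: a validator reports
#     job.complete(True, artifacts_data={'OTU table': {
#         'filepaths': [(fp, 'biom')], 'artifact_type': 'BIOM'}})
# or, on failure, something like job.complete(False, error='Job failure')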

@property
15 changes: 15 additions & 0 deletions qiita_db/support_files/patches/42.sql
@@ -0,0 +1,15 @@
-- Nov 19, 2016
-- Adding provenance parameter to validate commands

DO $do$
DECLARE
cmd RECORD;
BEGIN
FOR cmd IN
SELECT command_id FROM qiita.software_command WHERE name = 'Validate'
LOOP
INSERT INTO qiita.command_parameter (command_id, parameter_name, parameter_type, required, default_value)
VALUES (cmd.command_id, 'provenance', 'string', 'False', NULL);

END LOOP;
END $do$

Member: Should this add the actual provenance value to the already created artifacts?

Contributor Author: Note that this is adding a parameter called "provenance" to the Validate command. The command itself doesn't use it, but it is used internally by Qiita to differentiate a Validate command generated as a result of the user uploading data to Qiita from one generated as a result of an "artifact transformation" job finishing. This doesn't really contain the provenance of the artifact, so there is no need to add it to the already created artifacts. Note that the provenance of the artifacts is stored in the DAG structure in the DB.

Member: Got it, thanks. Just to confirm: this value is not required or defined by the artifacts; it is defined as part of the Validate command that generated those artifacts, right?

Contributor Author: Correct.
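As context for the thread above, a minimal sketch of the two cases the new parameter distinguishes (values taken from the code and tests in this diff):

# Validate job spawned because an "artifact transformation" job finished:
#     provenance = '{"job": "bcc7ebcd-39c1-43e4-af2d-822e3589f14d", "cmd_out_id": 3}'
# Validate job spawned because a user uploaded files to Qiita:
#     provenance = None  (the plugin can simply ignore the parameter)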
16 changes: 15 additions & 1 deletion qiita_db/test/test_artifact.py
@@ -12,6 +12,7 @@
from os import close, remove
from os.path import exists, join, basename
from functools import partial
from json import dumps

import pandas as pd
import networkx as nx
@@ -864,9 +865,22 @@ def test_delete_as_output_job(self):
f.write('\n')
data = {'OTU table': {'filepaths': [(fp, 'biom')],
'artifact_type': 'BIOM'}}
job = qdb.processing_job.ProcessingJob.create(
qdb.user.User('test@foo.bar'),
qdb.software.Parameters.load(
qdb.software.Command.get_validator('BIOM'),
values_dict={'files': dumps({'biom': [fp]}),
'artifact_type': 'BIOM',
'template': 1,
'provenance': dumps(
{'job': "bcc7ebcd-39c1-43e4-af2d-822e3589f14d",
'cmd_out_id': 3})}
)
)
job._set_status('running')
job.complete(True, artifacts_data=data)
job = qdb.processing_job.ProcessingJob(
"bcc7ebcd-39c1-43e4-af2d-822e3589f14d")
job.complete(True, artifacts_data=data)
artifact = job.outputs['OTU table']
self._clean_up_files.extend([afp for _, afp, _ in artifact.filepaths])
