diff --git a/qiita_core/testing.py b/qiita_core/testing.py new file mode 100644 index 000000000..0a4ea6e26 --- /dev/null +++ b/qiita_core/testing.py @@ -0,0 +1,50 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) 2014--, The Qiita Development Team. +# +# Distributed under the terms of the BSD 3-clause License. +# +# The full license is in the file LICENSE, distributed with this software. +# ----------------------------------------------------------------------------- + +from json import loads +from time import sleep + +from moi import r_client + +from qiita_db.processing_job import ProcessingJob + + +def wait_for_prep_information_job(prep_id, raise_if_none=True): + """Waits until a prep information job is completed + + Parameters + ---------- + prep_id : int + Prep template id + raise_if_none : bool, optional + If True, raise an AssertionError if the correspondent redis key + is empty. Default: True + + Raises + ------ + AssertionError + If `raise_if_none` is True and the correspondent redis key is not set + """ + res = r_client.get('prep_template_%d' % prep_id) + + if raise_if_none and res is None: + raise AssertionError("unexpectedly None") + + if res is not None: + payload = loads(res) + job_id = payload['job_id'] + if payload['is_qiita_job']: + job = ProcessingJob(job_id) + while job.status not in ('success', 'error'): + sleep(0.05) + else: + redis_info = loads(r_client.get(job_id)) + while redis_info['status_msg'] == 'Running': + sleep(0.05) + redis_info = loads(r_client.get(job_id)) + sleep(0.05) diff --git a/qiita_pet/handlers/api_proxy/artifact.py b/qiita_pet/handlers/api_proxy/artifact.py index 8622e3f1f..703860221 100644 --- a/qiita_pet/handlers/api_proxy/artifact.py +++ b/qiita_pet/handlers/api_proxy/artifact.py @@ -16,8 +16,7 @@ from qiita_core.qiita_settings import qiita_config from qiita_pet.handlers.api_proxy.util import check_access, check_fp from qiita_ware.context import safe_submit -from qiita_ware.dispatchable import (create_raw_data, copy_raw_data, - delete_artifact) +from qiita_ware.dispatchable import (copy_raw_data, delete_artifact) from qiita_db.artifact import Artifact from qiita_db.user import User from qiita_db.metadata_template.prep_template import PrepTemplate @@ -288,7 +287,8 @@ def artifact_post_req(user_id, filepaths, artifact_type, name, 'message': message, 'artifact': id} """ - prep = PrepTemplate(int(prep_template_id)) + prep_template_id = int(prep_template_id) + prep = PrepTemplate(prep_template_id) study_id = prep.study_id # First check if the user has access to the study @@ -299,10 +299,11 @@ def artifact_post_req(user_id, filepaths, artifact_type, name, if artifact_id: # if the artifact id has been provided, import the artifact job_id = safe_submit(user_id, copy_raw_data, prep, artifact_id) + is_qiita_job = False else: uploads_path = get_mountpoint('uploads')[0][1] path_builder = partial(join, uploads_path, str(study_id)) - cleaned_filepaths = [] + cleaned_filepaths = {} for ftype, file_list in viewitems(filepaths): # JavaScript sends us this list as a comma-separated list for fp in file_list.split(','): @@ -318,7 +319,9 @@ def artifact_post_req(user_id, filepaths, artifact_type, name, if exists['status'] != 'success': return {'status': 'error', 'message': 'File does not exist: %s' % fp} - cleaned_filepaths.append((full_fp, ftype)) + if ftype not in cleaned_filepaths: + cleaned_filepaths[ftype] = [] + cleaned_filepaths[ftype].append(full_fp) # This should never happen, but it doesn't hurt to actually have # a explicit check, in case there is something odd with the JS @@ -326,13 +329,22 @@ def artifact_post_req(user_id, filepaths, artifact_type, name, return {'status': 'error', 'message': "Can't create artifact, no files provided."} - job_id = safe_submit(user_id, create_raw_data, artifact_type, prep, - cleaned_filepaths, name=name) + command = Command.get_validator(artifact_type) + job = ProcessingJob.create( + User(user_id), + Parameters.load(command, values_dict={ + 'template': prep_template_id, + 'files': dumps(cleaned_filepaths), + 'artifact_type': artifact_type + })) + job.submit() + job_id = job.id + is_qiita_job = True - r_client.set(PREP_TEMPLATE_KEY_FORMAT % prep.id, dumps({'job_id': job_id})) + r_client.set(PREP_TEMPLATE_KEY_FORMAT % prep.id, + dumps({'job_id': job_id, 'is_qiita_job': is_qiita_job})) - return {'status': 'success', - 'message': ''} + return {'status': 'success', 'message': ''} def artifact_patch_request(user_id, req_op, req_path, req_value=None, @@ -481,7 +493,7 @@ def artifact_delete_req(artifact_id, user_id): job_id = safe_submit(user_id, delete_artifact, artifact_id) r_client.set(PREP_TEMPLATE_KEY_FORMAT % pt_id, - dumps({'job_id': job_id})) + dumps({'job_id': job_id, 'is_qiita_job': False})) return {'status': 'success', 'message': ''} diff --git a/qiita_pet/handlers/api_proxy/prep_template.py b/qiita_pet/handlers/api_proxy/prep_template.py index 67ee7157d..47508a1f1 100644 --- a/qiita_pet/handlers/api_proxy/prep_template.py +++ b/qiita_pet/handlers/api_proxy/prep_template.py @@ -25,6 +25,7 @@ from qiita_db.user import User from qiita_db.ontology import Ontology from qiita_db.metadata_template.prep_template import PrepTemplate +from qiita_db.processing_job import ProcessingJob PREP_TEMPLATE_KEY_FORMAT = 'prep_template_%s' @@ -113,24 +114,31 @@ def prep_template_ajax_get_req(user_id, prep_id): job_info = loads(job_info) job_id = job_info['job_id'] if job_id: - redis_info = loads(r_client.get(job_id)) - processing = redis_info['status_msg'] == 'Running' + if job_info['is_qiita_job']: + job = ProcessingJob(job_id) + processing = job.status in ('queued', 'running') + success = job.status == 'success' + alert_type = 'info' if processing or success else 'danger' + alert_msg = (job.log.msg.replace('\n', '
') + if job.log is not None else "") + else: + redis_info = loads(r_client.get(job_id)) + processing = redis_info['status_msg'] == 'Running' + success = redis_info['status_msg'] == 'Success' + alert_type = redis_info['return']['status'] + alert_msg = redis_info['return']['message'].replace( + '\n', '
') + if processing: alert_type = 'info' alert_msg = 'This prep template is currently being updated' - elif redis_info['status_msg'] == 'Success': - alert_type = redis_info['return']['status'] - alert_msg = redis_info['return']['message'].replace('\n', - '
') + elif success: payload = {'job_id': None, 'status': alert_type, - 'message': alert_msg} + 'message': alert_msg, + 'is_qiita_job': job_info['is_qiita_job']} r_client.set(PREP_TEMPLATE_KEY_FORMAT % prep_id, dumps(payload)) - else: - alert_type = redis_info['return']['status'] - alert_msg = redis_info['return']['message'].replace('\n', - '
') else: processing = False alert_type = job_info['status'] @@ -437,7 +445,7 @@ def prep_template_patch_req(user_id, req_op, req_path, req_value=None, fp = fp['file'] job_id = safe_submit(user_id, update_prep_template, prep_id, fp) r_client.set(PREP_TEMPLATE_KEY_FORMAT % prep_id, - dumps({'job_id': job_id})) + dumps({'job_id': job_id, 'is_qiita_job': False})) else: # We don't understand the attribute so return an error return {'status': 'error', @@ -465,7 +473,7 @@ def prep_template_patch_req(user_id, req_op, req_path, req_value=None, prep_id, attribute, attr_id) # Store the job id attaching it to the sample template id r_client.set(PREP_TEMPLATE_KEY_FORMAT % prep_id, - dumps({'job_id': job_id})) + dumps({'job_id': job_id, 'is_qiita_job': False})) return {'status': 'success', 'message': ''} else: return {'status': 'error', diff --git a/qiita_pet/handlers/api_proxy/tests/test_artifact.py b/qiita_pet/handlers/api_proxy/tests/test_artifact.py index 715ef4031..27d92c112 100644 --- a/qiita_pet/handlers/api_proxy/tests/test_artifact.py +++ b/qiita_pet/handlers/api_proxy/tests/test_artifact.py @@ -10,14 +10,13 @@ from os import remove, close from datetime import datetime from tempfile import mkstemp -from json import loads -from time import sleep import pandas as pd import numpy.testing as npt from moi import r_client from qiita_core.util import qiita_test_checker +from qiita_core.testing import wait_for_prep_information_job from qiita_db.artifact import Artifact from qiita_db.metadata_template.prep_template import PrepTemplate from qiita_db.study import Study @@ -389,12 +388,7 @@ def test_artifact_delete_req(self): # This is needed so the clean up works - this is a distributed system # so we need to make sure that all processes are done before we reset # the test database - obs = r_client.get('prep_template_1') - self.assertIsNotNone(obs) - redis_info = loads(r_client.get(loads(obs)['job_id'])) - while redis_info['status_msg'] == 'Running': - sleep(0.05) - redis_info = loads(r_client.get(loads(obs)['job_id'])) + wait_for_prep_information_job(1) def test_artifact_delete_req_no_access(self): obs = artifact_delete_req(self.artifact.id, 'demo@microbio.me') @@ -409,7 +403,6 @@ def test_artifact_post_req(self): pd.DataFrame({'new_col': {'1.SKD6.640190': 1}}), Study(1), '16S') self._files_to_remove.extend([fp for _, fp in pt.get_filepaths()]) - new_artifact_id = get_count('qiita.artifact') + 1 filepaths = {'raw_forward_seqs': 'uploaded_file.txt', 'raw_barcodes': 'update.txt'} obs = artifact_post_req( @@ -417,18 +410,7 @@ def test_artifact_post_req(self): exp = {'status': 'success', 'message': ''} self.assertEqual(obs, exp) - - obs = r_client.get('prep_template_%d' % pt.id) - self.assertIsNotNone(obs) - redis_info = loads(r_client.get(loads(obs)['job_id'])) - while redis_info['status_msg'] == 'Running': - sleep(0.05) - redis_info = loads(r_client.get(loads(obs)['job_id'])) - - # Instantiate the artifact to make sure it was made and - # to clean the environment - a = Artifact(new_artifact_id) - self._files_to_remove.extend([fp for _, fp, _ in a.filepaths]) + wait_for_prep_information_job(pt.id) # Test importing an artifact # Create new prep template to attach artifact to @@ -437,23 +419,18 @@ def test_artifact_post_req(self): pd.DataFrame({'new_col': {'1.SKD6.640190': 1}}), Study(1), '16S') self._files_to_remove.extend([fp for _, fp in pt.get_filepaths()]) - new_artifact_id_2 = get_count('qiita.artifact') + 1 + new_artifact_id = get_count('qiita.artifact') + 1 obs = artifact_post_req( - 'test@foo.bar', {}, 'FASTQ', 'New Test Artifact 2', pt.id, - new_artifact_id) + 'test@foo.bar', {}, 'Demultiplexed', 'New Test Artifact 2', + pt.id, 3) exp = {'status': 'success', 'message': ''} self.assertEqual(obs, exp) - obs = r_client.get('prep_template_%d' % pt.id) - self.assertIsNotNone(obs) - redis_info = loads(r_client.get(loads(obs)['job_id'])) - while redis_info['status_msg'] == 'Running': - sleep(0.05) - redis_info = loads(r_client.get(loads(obs)['job_id'])) + wait_for_prep_information_job(pt.id) # Instantiate the artifact to make sure it was made and # to clean the environment - a = Artifact(new_artifact_id_2) + a = Artifact(new_artifact_id) self._files_to_remove.extend([fp for _, fp, _ in a.filepaths]) def test_artifact_post_req_error(self): diff --git a/qiita_pet/handlers/study_handlers/tests/test_artifact.py b/qiita_pet/handlers/study_handlers/tests/test_artifact.py index 8ff529c1e..608ec4900 100644 --- a/qiita_pet/handlers/study_handlers/tests/test_artifact.py +++ b/qiita_pet/handlers/study_handlers/tests/test_artifact.py @@ -11,16 +11,15 @@ from os import remove, close from tempfile import mkstemp from json import loads -from time import sleep import pandas as pd import numpy.testing as npt -from moi import r_client +from qiita_core.testing import wait_for_prep_information_job from qiita_pet.test.tornado_test_base import TestHandlerBase from qiita_db.artifact import Artifact from qiita_db.study import Study -from qiita_db.util import get_mountpoint, get_count +from qiita_db.util import get_mountpoint from qiita_db.metadata_template.prep_template import PrepTemplate from qiita_db.exceptions import QiitaDBWarning @@ -126,16 +125,7 @@ def test_post_artifact(self): self.assertEqual(response.code, 200) # make sure new artifact created - obs = r_client.get('prep_template_%s' % self.prep.id) - self.assertIsNotNone(obs) - redis_info = loads(r_client.get(loads(obs)['job_id'])) - while redis_info['status_msg'] == 'Running': - sleep(0.05) - redis_info = loads(r_client.get(loads(obs)['job_id'])) - new_artifact_id = get_count('qiita.artifact') - artifact = Artifact(new_artifact_id) - self.assertEqual(artifact.name, 'New Artifact Handler test') - self._files_to_remove.extend([fp for _, fp, _ in artifact.filepaths]) + wait_for_prep_information_job(self.prep.id) class ArtifactAJAXTests(TestHandlerBase): @@ -147,12 +137,7 @@ def test_delete_artifact(self): # This is needed so the clean up works - this is a distributed system # so we need to make sure that all processes are done before we reset # the test database - obs = r_client.get('prep_template_1') - self.assertIsNotNone(obs) - redis_info = loads(r_client.get(loads(obs)['job_id'])) - while redis_info['status_msg'] == 'Running': - sleep(0.05) - redis_info = loads(r_client.get(loads(obs)['job_id'])) + wait_for_prep_information_job(1) class ArtifactAdminAJAXTestsReadOnly(TestHandlerBase):