diff --git a/.github/workflows/qiita-ci.yml b/.github/workflows/qiita-ci.yml index d3c6558..6d58a2c 100644 --- a/.github/workflows/qiita-ci.yml +++ b/.github/workflows/qiita-ci.yml @@ -52,8 +52,11 @@ jobs: # we need to download qiita directly so we have "easy" access to # all config files - wget https://github.com/biocore/qiita/archive/dev.zip - unzip dev.zip + # wget https://github.com/biocore/qiita/archive/dev.zip + # unzip dev.zip + wget https://github.com/jlab/qiita/archive/refs/heads/tornado_FetchFileFromCentralHandler.zip + unzip tornado_FetchFileFromCentralHandler.zip + mv qiita-tornado_FetchFileFromCentralHandler qiita-dev # pull out the port so we can modify the configuration file easily pgport=${{ job.services.postgres.ports[5432] }} diff --git a/qiita_client/plugin.py b/qiita_client/plugin.py index 4c30bdc..679ef4a 100644 --- a/qiita_client/plugin.py +++ b/qiita_client/plugin.py @@ -15,7 +15,7 @@ from future import standard_library from json import dumps import urllib -from qiita_client import QiitaClient +from qiita_client import QiitaClient, ArtifactInfo import logging @@ -100,9 +100,48 @@ def __init__(self, name, description, function, required_parameters, self.outputs = outputs self.analysis_only = analysis_only + @staticmethod + def _push_artifacts_files_to_central(qclient, artifacts): + """Pushes all files of a list of artifacts to BASE_DATA_DIR. + + Parameters + ---------- + qclient : qiita_client.QiitaClient + The Qiita server client + artifacts : [ArtifactInfo] + A list of qiita Artifacts + + Returns + ------- + The input list of artifacts + """ + if artifacts is None: + return artifacts + + for artifact in artifacts: + if isinstance(artifact, ArtifactInfo): + logger.debug('QiitaCommand::__call__: Push artifact files ' + 'via %s to central:' % qclient._plugincoupling) + for i in range(len(artifact.files)): + (fp, ftype) = artifact.files[i] + # send file to Qiita central and potentially update + # filepath, which is not done at the moment (2025-11-14) + logger.debug(' artifact files %s pushed to central' % fp) + fp = qclient.push_file_to_central(fp) + artifact.files[i] = (fp, ftype) + def __call__(self, qclient, server_url, job_id, output_dir): logger.debug('Entered QiitaCommand.__call__()') - return self.function(qclient, server_url, job_id, output_dir) + results = self.function( + qclient, server_url, job_id, output_dir) + # typical, but not all, functions of QiitaCommands return 3-tuple + # status=bool, list of artifacts, error_message=str + if isinstance(results, tuple) and (len(results) == 3) and \ + isinstance(results[0], bool) and \ + isinstance(results[1], list) and \ + isinstance(results[2], str): + QiitaCommand._push_artifacts_files_to_central(qclient, results[1]) + return results class QiitaArtifactType(object): diff --git a/qiita_client/qiita_client.py b/qiita_client/qiita_client.py index 4b34613..0ea317a 100644 --- a/qiita_client/qiita_client.py +++ b/qiita_client/qiita_client.py @@ -12,8 +12,19 @@ import requests import threading import pandas as pd -from json import dumps +from json import dumps, loads +try: + from json import JSONDecodeError +except ImportError: + # dirty hack to cope with the fact that python 2.7 does not have + # JSONDecodeError, but is needed for qp-target-gene plugin + JSONDecodeError = ValueError from random import randint +import fnmatch +from io import BytesIO +from zipfile import ZipFile +import re + try: from itertools import zip_longest @@ -268,7 +279,7 @@ def _request_oauth2(self, req, rettype, *args, **kwargs): The request to execute rettype : string The return type of the function, either "json" or - if e.g. files are transferred "content" + "object" for the response object itself args : tuple The request args kwargs : dict @@ -327,7 +338,7 @@ def _request_retry(self, req, url, rettype='json', **kwargs): The request to execute rettype : string The return type of the function, either "json" (default) or - if e.g. files are transferred "content" + "object" for the response object itself url : str The url to access in the server kwargs : dict @@ -335,7 +346,7 @@ def _request_retry(self, req, url, rettype='json', **kwargs): Returns ------- - dict or None or plain content IF rettype='content' + dict or None or response object IF rettype='object' The JSON information in the request response, if any Raises @@ -384,19 +395,20 @@ def _request_retry(self, req, url, rettype='json', **kwargs): elif r.status_code in (500, 405): raise RuntimeError( "Request '%s %s' did not succeed. Status code: %d. " - "Message: %s" % (req.__name__, url, r.status_code, r.text)) + "Message: %s%s" % (req.__name__, url, r.status_code, + r.text, r.reason)) elif 0 <= (r.status_code - 200) < 100: try: if rettype is None or rettype == 'json': return r.json() else: - if rettype == 'content': - return r.content + if rettype == 'object': + return r else: raise ValueError( ("return type rettype='%s' cannot be " "understand. Choose from 'json' (default) " - "or 'content!") % rettype) + "or 'object!") % rettype) except ValueError: return None stime = randint(MIN_TIME_SLEEP, MAX_TIME_SLEEP) @@ -408,7 +420,41 @@ def _request_retry(self, req, url, rettype='json', **kwargs): "Request '%s %s' did not succeed. Status code: %d. Message: %s" % (req.__name__, url, r.status_code, r.text)) - def get(self, url, rettype='json', **kwargs): + def _fetch_artifact_files(self, ainfo): + """helper method to fetch all files of an artifact from Qiita main. + + Parameters + ---------- + ainfo : json dict + Information about Qiita artifact + + Returns + ------- + Same as input BUT filepaths are adapated after downloading files from + Qiita main to local IF protocol coupling != filesystem. Otherwise, no + change occurs. + """ + if self._plugincoupling != 'filesystem': + logger.debug('QiitaClient::get: fetching artfiact file from ' + 'central: %s' % ainfo['files']) + if 'files' in ainfo.keys(): + ainfo['files'] = { + filetype: [ + { + k: self.fetch_file_from_central(v) + if k == 'filepath' else v + for k, v + in file.items()} + for file + in ainfo['files'][filetype]] + for filetype + in ainfo['files'].keys() + } + return ainfo + else: + return ainfo + + def get(self, url, rettype='json', no_file_fetching=False, **kwargs): """Execute a get request against the Qiita server Parameters @@ -417,7 +463,13 @@ def get(self, url, rettype='json', **kwargs): The url to access in the server rettype : string The return type of the function, either "json" (default) or - if e.g. files are transferred "content" + "object" for the response object itself + no_file_fetching : bool + If plugin is coupled through none "filesystem" protocols, artifact + files will automatically fetched from Qiita central when requesting + via "/qiita_db/prep_template/" or "/qiita_db/artifacts/". For + testing, you can turn off this behaviour. Handy if e.g. files not + yet exists in Qiita central. kwargs : dict The request kwargs @@ -427,9 +479,32 @@ def get(self, url, rettype='json', **kwargs): The JSON response from the server """ logger.debug('Entered QiitaClient.get()') - return self._request_retry( + result = self._request_retry( self._session.get, url, rettype=rettype, **kwargs) + if (self._plugincoupling != 'filesystem') and \ + (no_file_fetching is False): + # intercept get requests from plugins that request metadata or + # artifact files and ensure they get transferred from Qiita + # central, when not using "filesystem" + if re.search(r"/qiita_db/prep_template/\d+/?$", url): + # client is requesting filepath to a prep/metadata file, see + # qiita/qiita_db/handlers/prep_template.py:: + # PrepTemplateDBHandler::get + # for the "result" data-structure + logger.debug('QiitaClient::get: fetching artifact metadata' + '-file from central:') + for fp in ['prep-file', 'sample-file']: + logger.debug('QiitaClient::get: file %s' % result[fp]) + result[fp] = self.fetch_file_from_central(result[fp]) + elif re.search(r"/qiita_db/artifacts/\d+/?$", url): + # client is requesting an artifact, see + # qiita/qiita_db/handlers/artifact.py::ArtifactHandler::get + # for the "result" data-structure + result = self._fetch_artifact_files(result) + + return result + def post(self, url, **kwargs): """Execute a post request against the Qiita server @@ -506,6 +581,26 @@ def patch(self, url, op, path, value=None, from_p=None, **kwargs): # we made sure that data is correctly formatted here kwargs['data'] = data + # similar to above get() injection mechanism, we are here pushing files + # to Qiita central, when patching artifact summaries + if (self._plugincoupling != 'filesystem') and \ + (path == '/html_summary/') and (op == 'add'): + if re.search(r"/qiita_db/artifacts/\d+/?$", url): + if value is not None: + logger.debug('QiitaClient::patch: push summary files to ' + 'central: %s' % value) + try: + # values might be an json encoded dictioary with + # multiple filepaths... + dictValues = loads(value) + for ftype in ['html', 'dir']: + if (ftype in dictValues.keys()) and \ + (dictValues[ftype] is not None): + self.push_file_to_central(dictValues[ftype]) + except (TypeError, JSONDecodeError): + # or just a single string, i.e. filepath + self.push_file_to_central(value) + return self._request_retry( self._session.patch, url, rettype='json', **kwargs) @@ -745,8 +840,8 @@ def _process_files_per_sample_fastq(self, files, prep_info, return sample_names, prep_info def fetch_file_from_central(self, filepath, prefix=None): - """Moves content of a file from Qiita's central BASE_DATA_DIR to a - local plugin file-system. + """Moves content of a file or directory from Qiita's central + BASE_DATA_DIR to a local plugin file-system. By default, this is exactly the same location, i.e. the return filepath is identical to the requested one and nothing is moved / @@ -759,20 +854,26 @@ def fetch_file_from_central(self, filepath, prefix=None): ---------- filepath : str The filepath in Qiita's central BASE_DATA_DIR to the requested - file content + file or directory content prefix : str Primarily for testing: prefix the target filepath with this filepath prefix to - a) in 'filesystem' mode: create an actual file copy (for testing) + a) in 'filesystem' mode: create an actual file/directiry copy + (for testing) If prefix=None, nothing will be copied/moved - b) in 'https' mode: flexibility to locate files differently in - plugin local file system. + b) in 'https' mode: flexibility to locate files/directories + differently in plugin local file system. Returns ------- - str : the filepath of the requested file within the local file system + str : the filepath of the requested file or directory within the local + file system """ target_filepath = filepath + logger.debug( + 'Fetching file/directory "%s" via protocol=%s from Qiita main.' % ( + filepath, self._plugincoupling)) + if (prefix is not None) and (prefix != ""): # strip off root if filepath.startswith(os.path.abspath(os.sep)): @@ -784,9 +885,13 @@ def fetch_file_from_central(self, filepath, prefix=None): if self._plugincoupling == 'filesystem': if (prefix is not None) and (prefix != ""): # create necessary directory locally - os.makedirs(os.path.dirname(target_filepath), exist_ok=True) + if not os.path.exists(os.path.dirname(target_filepath)): + os.makedirs(os.path.dirname(target_filepath)) - shutil.copyfile(filepath, target_filepath) + if os.path.isdir(filepath): + shutil.copytree(filepath, target_filepath) + else: + shutil.copyfile(filepath, target_filepath) return target_filepath @@ -795,19 +900,25 @@ def fetch_file_from_central(self, filepath, prefix=None): if filepath.startswith(os.path.abspath(os.sep)): filepath = filepath[len(os.path.abspath(os.sep)):] - logger.debug('Requesting file %s from qiita server.' % filepath) - # actual call to Qiita central to obtain file content - content = self.get( + response = self.get( '/cloud/fetch_file_from_central/' + filepath, - rettype='content') + rettype='object') - # create necessary directory locally - os.makedirs(os.path.dirname(target_filepath), exist_ok=True) + # check if requested filepath is a single file OR a whole directory + if 'Is-Qiita-Directory' in response.headers.keys(): + with ZipFile(BytesIO(response.content)) as zf: + zf.extractall(path=target_filepath) + else: + content = response.content - # write retrieved file content - with open(target_filepath, 'wb') as f: - f.write(content) + # create necessary directory locally + if not os.path.exists(os.path.dirname(target_filepath)): + os.makedirs(os.path.dirname(target_filepath)) + + # write retrieved file content + with open(target_filepath, 'wb') as f: + f.write(content) return target_filepath @@ -817,20 +928,23 @@ def fetch_file_from_central(self, filepath, prefix=None): "configuration is NOT defined.") % self._plugincoupling) def push_file_to_central(self, filepath): - """Pushs filecontent to Qiita's central BASE_DATA_DIR directory. + """Pushs file- or directory content to Qiita's central BASE_DATA_DIR + directory. By default, plugin and Qiita's central BASE_DATA_DIR filesystems are identical. In this case, no files are touched and the filepath is directly returned. If however, plugincoupling is set to 'https', the content of the file - is sent via https POST to Qiita's master/worker, which has to receive - and store in an appropriate location. + (or content of recursively all files in the given directory) is sent + via https POST to Qiita's master/worker, which has to receive and store + in an appropriate location. Parameters ---------- filepath : str - The filepath of the files whos content shall be send to Qiita's - central BASE_DATA_DIR + The filepath of the file(s) whos content shall be send to Qiita's + central BASE_DATA_DIR. + Can be a path to a directory as well. Returns ------- @@ -840,19 +954,72 @@ def push_file_to_central(self, filepath): return filepath elif self._plugincoupling == 'https': - logger.debug('Submitting file %s to qiita server.' % filepath) + logger.debug('Submitting %s %s to qiita server via %s' % ( + 'directory' if os.path.isdir(filepath) else 'file', filepath, + self._plugincoupling)) # target path, i.e. without filename dirpath = os.path.dirname(filepath) if dirpath == "": dirpath = "/" - self.post( - '/cloud/push_file_to_central/', - files={dirpath: open(filepath, 'rb')}) + if os.path.isdir(filepath): + # Pushing all files of a directory, not only a single file. + # Cannot use "glob" as it lacks the "recursive" parameter in + # py27. This is used e.g. in qp-target-gene + for root, dirnames, filenames in os.walk(filepath): + for filename in fnmatch.filter(filenames, "*"): + fp = os.path.join(root, filename) + self.post('/cloud/push_file_to_central/', + files={os.path.join( + dirpath, + os.path.dirname(fp)): open(fp, 'rb')}) + else: + self.post( + '/cloud/push_file_to_central/', + files={dirpath: open(filepath, 'rb')}) return filepath raise ValueError( ("File communication protocol '%s' as defined in plugins " "configuration is NOT defined.") % self._plugincoupling) + + def delete_file_from_central(self, filepath): + """Deletes a file in Qiita's central BASE_DATA_DIR directory. + + I currently (2025-11-12) assess this operation to be too dangerous for + protocols other than "filesystem", i.e. on "https" the files are NOT + deleted. + However, in plugin tests, this function is needed and I therefore + implemented an API endpoint which is only activated when Qiita main + instance is operated in test mode. + This might change in the future and since I don't want to + touch every plugin's code again, I am adding this function here already + and use it in according plugin code locations, e.g. function _gzip_file + in qtp-sequencing. + + Parameters + ---------- + filepath : str + The filepath of the file that shall be deletes in Qiita's + central BASE_DATA_DIR + + Returns + ------- + The given filepath - to be transparent in plugin code. + """ + if self._plugincoupling == 'filesystem': + if os.path.exists(filepath): + if os.path.isdir(filepath): + shutil.rmtree(filepath) + else: + os.remove(filepath) + elif self._plugincoupling == 'https': + # will return in internal server error, when qiita is in productive + # mode + self.get( + '/cloud/delete_file_from_central/' + filepath, + rettype='object') + + return filepath diff --git a/qiita_client/testing.py b/qiita_client/testing.py index 41b183c..e4ff381 100644 --- a/qiita_client/testing.py +++ b/qiita_client/testing.py @@ -11,6 +11,7 @@ from time import sleep from qiita_client import QiitaClient +from .plugin import BaseQiitaPlugin import logging @@ -36,6 +37,24 @@ def setUpClass(cls): logger.debug( 'PluginTestCase.setUpClass() token %s' % cls.qclient._token) cls.qclient.post('/apitest/reload_plugins/') + + # When testing, we access plugin functions often directly. Plugin + # configuration files are not parsed in these cases. To be able to + # change the plugin coupling protocol, we resort to the environment + # variable here. + cls.qclient._plugincoupling = environ.get( + 'QIITA_PLUGINCOUPLING', BaseQiitaPlugin._DEFAULT_PLUGIN_COUPLINGS) + + # Determine BASE_DATA_DIR of qiita central, without having direct + # access to qiita's settings file. This is done by requesting + # information about prep 1, which should be in the test database. + # This might break IF file + # qiita-spots/qiita/qiita_db/support_files/populate_test_db.sql + # changes. + prep_info = cls.qclient.get('/qiita_db/prep_template/1/', + no_file_fetching=True) + cls.base_data_dir = prep_info['prep-file'].split('templates/')[0] + # Give enough time for the plugins to register sleep(5) diff --git a/qiita_client/tests/test_plugin.py b/qiita_client/tests/test_plugin.py index 43a8eca..f3010c8 100644 --- a/qiita_client/tests/test_plugin.py +++ b/qiita_client/tests/test_plugin.py @@ -73,6 +73,26 @@ def func(a, b, c, d): self.exp_opt, self.exp_out, self.exp_dflt) self.assertEqual(obs('a', 'b', 'c', 'd'), 42) + def test__push_artifacts_files_to_central(self): + class fakeClient(): + def __init__(self): + self._plugincoupling = 'null protocol' + + def push_file_to_central(self, filepath): + return 'pushed:%s' % filepath + + artifacts = [ + ArtifactInfo("stefArtiName", "Atype", [ + ("fp1", "preprocessed_fasta"), + ("fp2", "preprocessed_fastq")]), + None, + ArtifactInfo("artiName", "artiType", [])] + QiitaCommand._push_artifacts_files_to_central(fakeClient(), artifacts) + + self.assertIn('pushed:', artifacts[0].files[0][0]) + self.assertIn('pushed:', artifacts[0].files[1][0]) + self.assertEqual([], artifacts[2].files) + class QiitaArtifactTypeTest(TestCase): def test_init(self): diff --git a/qiita_client/tests/test_qiita_client.py b/qiita_client/tests/test_qiita_client.py index 15e4f77..6abe399 100644 --- a/qiita_client/tests/test_qiita_client.py +++ b/qiita_client/tests/test_qiita_client.py @@ -8,11 +8,12 @@ from unittest import TestCase, main import filecmp -from os import remove, close -from os.path import basename, exists, expanduser, join +from os import remove, close, makedirs +from os.path import basename, exists, expanduser, join, isdir, dirname from tempfile import mkstemp from json import dumps import pandas as pd +from shutil import rmtree from qiita_client.qiita_client import (QiitaClient, _format_payload, ArtifactInfo) @@ -110,7 +111,10 @@ def setUp(self): def tearDown(self): for fp in self.clean_up_files: if exists(fp): - remove(fp) + if isdir(fp): + rmtree(fp) + else: + remove(fp) def test_init(self): obs = QiitaClient(URL, @@ -184,7 +188,7 @@ def test_patch(self): with open(fp, 'w') as f: f.write('\n') self.clean_up_files.append(fp) - obs = self.tester.patch(f'/qiita_db/artifacts/{artifact_id}/', 'add', + obs = self.tester.patch('/qiita_db/artifacts/%s/' % artifact_id, 'add', '/html_summary/', value=fp) self.assertIsNone(obs) @@ -440,6 +444,104 @@ def test_push_file_to_central(self): fp_obs = self.tester.push_file_to_central(fp_source) self.assertEqual(fp_source, fp_obs) + def _create_test_dir(self, prefix=None): + """Creates a test directory with files and subdirs.""" + # prefix + # |- testdir/ + # |---- fileA.txt + # |---- subdirA_l1/ + # |-------- fileB.fna + # |-------- subdirC_l2/ + # |------------ fileC.log + # |------------ fileD.seq + # |---- subdirB_l1/ + # |-------- fileE.sff + if (prefix is not None) and (prefix != ""): + prefix = join(prefix, 'testdir') + else: + prefix = 'testdir' + + for dir in [join(prefix, 'subdirA_l1', 'subdirC_l2'), + join(prefix, 'subdirB_l1')]: + if not exists(dir): + makedirs(dir) + for file, cont in [(join(prefix, 'fileA.txt'), 'contentA'), + (join(prefix, 'subdirA_l1', + 'fileB.fna'), 'this is B'), + (join(prefix, 'subdirA_l1', 'subdirC_l2', + 'fileC.log'), 'call me c'), + (join(prefix, 'subdirA_l1', 'subdirC_l2', + 'fileD.seq'), 'I d'), + (join(prefix, 'subdirB_l1', 'fileE.sff'), 'oh e')]: + with open(file, "w") as f: + f.write(cont + "\n") + self.clean_up_files.append(prefix) + + return prefix + + def test_push_file_to_central_dir(self): + self.tester._plugincoupling = 'https' + + fp_source = self._create_test_dir('/tmp/test_push_dir/') + fp_obs = self.tester.push_file_to_central(fp_source) + self.assertEqual(fp_source, fp_obs) + # As we don't necessarily know the QIITA_BASE_DIR, we cannot fetch one + # of the files to double check for it's content + + def test_delete_file_from_central(self): + # obtain current filepaths to infer QIITA_BASE_DIR + ainfo = self.tester.get("/qiita_db/artifacts/%s/" % 1) + cwd = dirname(ainfo['files']['raw_forward_seqs'][0]['filepath']) + + for protocol in ['filesystem', 'https']: + self.qclient._plugincoupling = protocol + + # deposit a test file + fp_test = join(cwd, 'deleteme_%s.txt' % protocol) + makedirs(cwd, exist_ok=True) + with open(fp_test, 'w') as f: + f.write('This is a testfile content\n') + self.clean_up_files.append(fp_test) + + # sanity check that test file has been deposited correctly + # no push required, as in this test local and remote QIITA_BASE_DIR + # is identical + fp_obs = self.qclient.fetch_file_from_central(fp_test) + self.assertTrue(exists(fp_obs)) + + # delete file and test if it is gone + fp_deleted = self.qclient.delete_file_from_central(fp_test) + # all three fp should point to the same filepath + self.assertFalse(exists(fp_obs)) + self.assertFalse(exists(fp_test)) + self.assertFalse(exists(fp_deleted)) + + def test_fetch_directory(self): + # a bit hacky, but should work as long as test database does not change + ainfo = self.qclient.get('/qiita_db/artifacts/1/') + base_data_dir = ainfo['files']['raw_forward_seqs'][0]['filepath'][ + :(-1 * len('raw_data/1_s_G1_L001_sequences.fastq.gz'))] + + # creating a LOCAL test directory within base_data_dir as the DB entry + # but no files exist. "job" is the according mountpoint + fp_test = join(base_data_dir, 'job', '2_test_folder') + self._create_test_dir(prefix=fp_test) + + # transmitting test directory to qiita main (remote) + self.tester._plugincoupling = 'https' + self.tester.push_file_to_central(fp_test) + # fp_main = join(base_data_dir, join(*Path(fp_test).parts)) + + # fetch test directory from qiita main to a different location + # (=prefix) than it was generated + prefix = join(expanduser("~"), 'localFetch') + fp_obs = self.tester.fetch_file_from_central(fp_test, prefix=prefix) + + # test a file of the freshly transferred directory from main has + # expected file content + with open(join(fp_obs, 'testdir', 'fileA.txt'), 'r') as f: + self.assertIn('contentA', '\n'.join(f.readlines())) + if __name__ == '__main__': main()