diff --git a/.github/workflows/qiita-ci.yml b/.github/workflows/qiita-ci.yml index d09ac5e..d3c6558 100644 --- a/.github/workflows/qiita-ci.yml +++ b/.github/workflows/qiita-ci.yml @@ -93,7 +93,7 @@ jobs: conda activate qiita export QIITA_ROOTCA_CERT=`pwd`/qiita-dev/qiita_core/support_files/ci_rootca.crt export QIITA_CONFIG_FP=`pwd`/qiita-dev/qiita_core/support_files/config_test_local.cfg - sed "s#/home/runner/work/qiita/qiita#${PWD}/qiita-dev/#g" `pwd`/qiita-dev/qiita_core/support_files/config_test.cfg > ${QIITA_CONFIG_FP} + sed "s#/home/runner/work/qiita/qiita#${PWD}/qiita-dev#g" `pwd`/qiita-dev/qiita_core/support_files/config_test.cfg > ${QIITA_CONFIG_FP} export REDBIOM_HOST="http://localhost:7379" @@ -104,7 +104,8 @@ jobs: mkdir -p ${CONDA_PREFIX}/var/run/nginx/ export NGINX_FILE=`pwd`/qiita-dev/qiita_pet/nginx_example.conf export NGINX_FILE_NEW=`pwd`/qiita-dev/qiita_pet/nginx_example_local.conf - sed "s#/home/runner/work/qiita/qiita#${PWD}/qiita-dev/#g" ${NGINX_FILE} > ${NGINX_FILE_NEW} + sed "s#/home/runner/work/qiita/qiita#${PWD}/qiita-dev#g" ${NGINX_FILE} > ${NGINX_FILE_NEW} + sed -i "s#/Users/username/qiita#${PWD}/qiita-dev#g" ${NGINX_FILE_NEW} nginx -c ${NGINX_FILE_NEW} echo "3. Setting up qiita" diff --git a/README.md b/README.md index 2f7eb4d..a921528 100644 --- a/README.md +++ b/README.md @@ -16,3 +16,32 @@ Also, if Qiita is running with the default server SSL certificate, you need to e export QIITA_ROOT_CA=/qiita_core/support_files/ci_rootca.crt ``` + +Configure for cloud computing +----------------------------- +In the default scenario, Qiita main and Qiita plugins are executed on the same +machines, maybe spread across a Slurm or other grid compute cluster, but main +and plugins have direct access to all files in `BASE_DATA_DIR`. + +This can be different, if you set up Qiita within a cloud compute environment, +where main and plugins do **not** share one file system. In this case, input- +files must first be transferred from main to plugin, then plugin can do its +processing and resulting files must be transferred back to main, once +processing is finished. To achieve this, the qiita_client, as it is part of +each plugin, provides the two functions for this file transfer +`fetch_file_from_central` and `push_file_to_central`. According to +`self._plugincoupling`, these functions operate on different "protocols"; +as of 2025-08-29, either "filesystem" or "https". Switch to **"https"** for +cloud environments, default is **filesystem**. + +The plugin coupling protocoll can be set in three ways + +1. default is always `filesystem`, i.e. `_DEFAULT_PLUGIN_COUPLINGS` + This is to be downward compatible. +2. the plugin configuration can hold a section `network` with an + option `PLUGINCOUPLING`. For old config files, this might not + (yet) be the case. Therefore, we are double checking existance + of this section and parameter here. +3. you can set the environment variable `QIITA_PLUGINCOUPLING` + Precedence is 3, 2, 1, i.e. the environment variable overrides + the other two ways. \ No newline at end of file diff --git a/qiita_client/plugin.py b/qiita_client/plugin.py index 7ee87b4..d455e48 100644 --- a/qiita_client/plugin.py +++ b/qiita_client/plugin.py @@ -136,13 +136,46 @@ def __init__(self, name, description, can_be_submitted_to_ebi, class BaseQiitaPlugin(object): - def __init__(self, name, version, description, publications=None): + _DEFAULT_PLUGIN_COUPLINGS = 'filesystem' + _ALLOWED_PLUGIN_COUPLINGS = [_DEFAULT_PLUGIN_COUPLINGS, 'https'] + + def __init__(self, name, version, description, publications=None, + plugincoupling=_DEFAULT_PLUGIN_COUPLINGS): logger.debug('Entered BaseQiitaPlugin.__init__()') self.name = name self.version = version self.description = description self.publications = dumps(publications) if publications else "" + # Depending on your compute architecture, there are multiple options + # available how "thight" plugins are coupled to the central + # Qiita master/workers + # --- filesystem --- + # The default scenario is "filesystem", i.e. plugins as well as + # master/worker have unrestricted direct access to a shared filesystem, + # e.g. a larger volume / directory, defined in the server configuration + # as base_data_dir + # --- https --- + # A second scenario is that your plugins execute as independent jobs on + # another machine, e.g. as docker containers or other cloud techniques. + # Intentionally, you don't want to use a shared filesystem, but you + # have to make sure necessary input files are provided to the + # containerized plugin before execution and resulting files are + # transfered back to the central Qiita master/worker. In this case, + # files are pulled / pushed through functions + # qiita_client.fetch_file_from_central and + # qiita_client.push_file_to_central, respectivey. + # Actually, all files need to be decorated with this function. + # The decision how data are transferred is then made within these two + # functions according to the "plugincoupling" setting. + if plugincoupling not in self._ALLOWED_PLUGIN_COUPLINGS: + raise ValueError( + ("valid plugincoupling values are ['%s'], but you " + "provided %s") % ( + "', '".join(self._ALLOWED_PLUGIN_COUPLINGS), + plugincoupling)) + self.plugincoupling = plugincoupling + # Will hold the different commands self.task_dict = {} @@ -151,7 +184,8 @@ def __init__(self, name, version, description, publications=None): 'QIITA_PLUGINS_DIR', join(expanduser('~'), '.qiita_plugins')) self.conf_fp = join(conf_dir, "%s_%s.conf" % (self.name, self.version)) - def generate_config(self, env_script, start_script, server_cert=None): + def generate_config(self, env_script, start_script, server_cert=None, + plugin_coupling=_DEFAULT_PLUGIN_COUPLINGS): """Generates the plugin configuration file Parameters @@ -165,6 +199,9 @@ def generate_config(self, env_script, start_script, server_cert=None): If the Qiita server used does not have a valid certificate, the path to the Qiita certificate so the plugin can connect over HTTPS to it + plugin_coupling : str + Type of coupling of plugin to central for file exchange. + Valid values: see _ALLOWED_PLUGIN_COUPLINGS. """ logger.debug('Entered BaseQiitaPlugin.generate_config()') sr = SystemRandom() @@ -178,7 +215,8 @@ def generate_config(self, env_script, start_script, server_cert=None): f.write(CONF_TEMPLATE % (self.name, self.version, self.description, env_script, start_script, self._plugin_type, self.publications, - server_cert, client_id, client_secret)) + server_cert, client_id, client_secret, + plugin_coupling)) def _register_command(self, command): """Registers a command in the plugin @@ -188,8 +226,8 @@ def _register_command(self, command): command: QiitaCommand The command to be added to the plugin """ - logger.debug( - f'Entered BaseQiitaPlugin._register_command({command.name})') + logger.debug('Entered BaseQiitaPlugin._register_command(%s)' % + command.name) self.task_dict[command.name] = command def _register(self, qclient): @@ -244,6 +282,14 @@ def __call__(self, server_url, job_id, output_dir): with open(self.conf_fp, 'U') as conf_file: config.readfp(conf_file) + plugincoupling = self._DEFAULT_PLUGIN_COUPLINGS + if config.has_section('network') and \ + config.has_option('network', 'PLUGINCOUPLING'): + plugincoupling = config.get('network', 'PLUGINCOUPLING') + if 'QIITA_PLUGINCOUPLING' in environ.keys() and \ + environ['QIITA_PLUGINCOUPLING'] is not None: + plugincoupling = environ['QIITA_PLUGINCOUPLING'] + qclient = QiitaClient(server_url, config.get('oauth2', 'CLIENT_ID'), config.get('oauth2', 'CLIENT_SECRET'), # for this group of tests, confirm optional @@ -251,7 +297,8 @@ def __call__(self, server_url, job_id, output_dir): # this value will prevent underlying libraries # from validating the server's cert using # certifi's pem cache. - ca_cert=config.get('oauth2', 'SERVER_CERT')) + ca_cert=config.get('oauth2', 'SERVER_CERT'), + plugincoupling=plugincoupling) if job_id == 'register': self._register(qclient) @@ -314,9 +361,11 @@ class QiitaTypePlugin(BaseQiitaPlugin): _plugin_type = "artifact definition" def __init__(self, name, version, description, validate_func, - html_generator_func, artifact_types, publications=None): + html_generator_func, artifact_types, publications=None, + plugincoupling=BaseQiitaPlugin._DEFAULT_PLUGIN_COUPLINGS): super(QiitaTypePlugin, self).__init__(name, version, description, - publications=publications) + publications=publications, + plugincoupling=plugincoupling) logger.debug('Entered QiitaTypePlugin.__init__()') self.artifact_types = artifact_types @@ -382,4 +431,8 @@ def register_command(self, command): [oauth2] SERVER_CERT = %s CLIENT_ID = %s -CLIENT_SECRET = %s""" +CLIENT_SECRET = %s + +[network] +PLUGINCOUPLING = %s +""" diff --git a/qiita_client/qiita_client.py b/qiita_client/qiita_client.py index 553fae8..4b34613 100644 --- a/qiita_client/qiita_client.py +++ b/qiita_client/qiita_client.py @@ -6,6 +6,8 @@ # The full license is in the file LICENSE, distributed with this software. # ----------------------------------------------------------------------------- +import os +import shutil import time import requests import threading @@ -182,7 +184,8 @@ class QiitaClient(object): get post """ - def __init__(self, server_url, client_id, client_secret, ca_cert=None): + def __init__(self, server_url, client_id, client_secret, ca_cert=None, + plugincoupling='filesystem'): self._server_url = server_url self._session = requests.Session() @@ -218,6 +221,9 @@ def __init__(self, server_url, client_id, client_secret, ca_cert=None): self._token = None self._fetch_token() + # store protocol for plugin coupling + self._plugincoupling = plugincoupling + def _fetch_token(self): """Retrieves an access token from the Qiita server @@ -253,13 +259,16 @@ def _fetch_token(self): # Timeout, etc. and logs them logger.debug(str(e)) - def _request_oauth2(self, req, *args, **kwargs): + def _request_oauth2(self, req, rettype, *args, **kwargs): """Executes a request using OAuth2 authorization Parameters ---------- req : function The request to execute + rettype : string + The return type of the function, either "json" or + if e.g. files are transferred "content" args : tuple The request args kwargs : dict @@ -309,13 +318,16 @@ def _request_oauth2(self, req, *args, **kwargs): r = req(*args, **kwargs) return r - def _request_retry(self, req, url, **kwargs): + def _request_retry(self, req, url, rettype='json', **kwargs): """Executes a request retrying it 2 times in case of failure Parameters ---------- req : function The request to execute + rettype : string + The return type of the function, either "json" (default) or + if e.g. files are transferred "content" url : str The url to access in the server kwargs : dict @@ -323,7 +335,7 @@ def _request_retry(self, req, url, **kwargs): Returns ------- - dict or None + dict or None or plain content IF rettype='content' The JSON information in the request response, if any Raises @@ -358,7 +370,8 @@ def _request_retry(self, req, url, **kwargs): retries = MAX_RETRIES while retries > 0: retries -= 1 - r = self._request_oauth2(req, url, verify=self._verify, **kwargs) + r = self._request_oauth2( + req, rettype, url, verify=self._verify, **kwargs) r.close() # There are some error codes that the specification says that they # shouldn't be retried @@ -374,7 +387,16 @@ def _request_retry(self, req, url, **kwargs): "Message: %s" % (req.__name__, url, r.status_code, r.text)) elif 0 <= (r.status_code - 200) < 100: try: - return r.json() + if rettype is None or rettype == 'json': + return r.json() + else: + if rettype == 'content': + return r.content + else: + raise ValueError( + ("return type rettype='%s' cannot be " + "understand. Choose from 'json' (default) " + "or 'content!") % rettype) except ValueError: return None stime = randint(MIN_TIME_SLEEP, MAX_TIME_SLEEP) @@ -386,13 +408,16 @@ def _request_retry(self, req, url, **kwargs): "Request '%s %s' did not succeed. Status code: %d. Message: %s" % (req.__name__, url, r.status_code, r.text)) - def get(self, url, **kwargs): + def get(self, url, rettype='json', **kwargs): """Execute a get request against the Qiita server Parameters ---------- url : str The url to access in the server + rettype : string + The return type of the function, either "json" (default) or + if e.g. files are transferred "content" kwargs : dict The request kwargs @@ -402,7 +427,8 @@ def get(self, url, **kwargs): The JSON response from the server """ logger.debug('Entered QiitaClient.get()') - return self._request_retry(self._session.get, url, **kwargs) + return self._request_retry( + self._session.get, url, rettype=rettype, **kwargs) def post(self, url, **kwargs): """Execute a post request against the Qiita server @@ -420,7 +446,8 @@ def post(self, url, **kwargs): The JSON response from the server """ logger.debug('Entered QiitaClient.post(%s)' % url) - return self._request_retry(self._session.post, url, **kwargs) + return self._request_retry( + self._session.post, url, rettype='json', **kwargs) def patch(self, url, op, path, value=None, from_p=None, **kwargs): """Executes a JSON patch request against the Qiita server @@ -479,7 +506,8 @@ def patch(self, url, op, path, value=None, from_p=None, **kwargs): # we made sure that data is correctly formatted here kwargs['data'] = data - return self._request_retry(self._session.patch, url, **kwargs) + return self._request_retry( + self._session.patch, url, rettype='json', **kwargs) # The functions are shortcuts for common functionality that all plugins # need to implement. @@ -502,7 +530,8 @@ def http_patch(self, url, **kwargs): The JSON response from the server """ logger.debug('Entered QiitaClient.http_patch()') - return self._request_retry(self._session.patch, url, **kwargs) + return self._request_retry( + self._session.patch, url, rettype='json', **kwargs) def start_heartbeat(self, job_id): """Create and start a thread that would send heartbeats to the server @@ -714,3 +743,116 @@ def _process_files_per_sample_fastq(self, files, prep_info, prep_info = prep_info.filter(items=sample_names.keys(), axis=0) return sample_names, prep_info + + def fetch_file_from_central(self, filepath, prefix=None): + """Moves content of a file from Qiita's central BASE_DATA_DIR to a + local plugin file-system. + + By default, this is exactly the same location, i.e. the return + filepath is identical to the requested one and nothing is moved / + copied. + However, for less tight plugin couplings, file content can be + transferred via https for situations where the plugin does not have + native access to Qiita's overall BASE_DATA_DIR. + + Parameters + ---------- + filepath : str + The filepath in Qiita's central BASE_DATA_DIR to the requested + file content + prefix : str + Primarily for testing: prefix the target filepath with this + filepath prefix to + a) in 'filesystem' mode: create an actual file copy (for testing) + If prefix=None, nothing will be copied/moved + b) in 'https' mode: flexibility to locate files differently in + plugin local file system. + + Returns + ------- + str : the filepath of the requested file within the local file system + """ + target_filepath = filepath + if (prefix is not None) and (prefix != ""): + # strip off root + if filepath.startswith(os.path.abspath(os.sep)): + target_filepath = target_filepath[ + len(os.path.abspath(os.sep)):] + # prefix filepath with given prefix + target_filepath = os.path.join(prefix, target_filepath) + + if self._plugincoupling == 'filesystem': + if (prefix is not None) and (prefix != ""): + # create necessary directory locally + os.makedirs(os.path.dirname(target_filepath), exist_ok=True) + + shutil.copyfile(filepath, target_filepath) + + return target_filepath + + elif self._plugincoupling == 'https': + # strip off root + if filepath.startswith(os.path.abspath(os.sep)): + filepath = filepath[len(os.path.abspath(os.sep)):] + + logger.debug('Requesting file %s from qiita server.' % filepath) + + # actual call to Qiita central to obtain file content + content = self.get( + '/cloud/fetch_file_from_central/' + filepath, + rettype='content') + + # create necessary directory locally + os.makedirs(os.path.dirname(target_filepath), exist_ok=True) + + # write retrieved file content + with open(target_filepath, 'wb') as f: + f.write(content) + + return target_filepath + + else: + raise ValueError( + ("File communication protocol '%s' as defined in plugins " + "configuration is NOT defined.") % self._plugincoupling) + + def push_file_to_central(self, filepath): + """Pushs filecontent to Qiita's central BASE_DATA_DIR directory. + + By default, plugin and Qiita's central BASE_DATA_DIR filesystems are + identical. In this case, no files are touched and the filepath is + directly returned. + If however, plugincoupling is set to 'https', the content of the file + is sent via https POST to Qiita's master/worker, which has to receive + and store in an appropriate location. + + Parameters + ---------- + filepath : str + The filepath of the files whos content shall be send to Qiita's + central BASE_DATA_DIR + + Returns + ------- + The given filepath - to be transparent in plugin code. + """ + if self._plugincoupling == 'filesystem': + return filepath + + elif self._plugincoupling == 'https': + logger.debug('Submitting file %s to qiita server.' % filepath) + + # target path, i.e. without filename + dirpath = os.path.dirname(filepath) + if dirpath == "": + dirpath = "/" + + self.post( + '/cloud/push_file_to_central/', + files={dirpath: open(filepath, 'rb')}) + + return filepath + + raise ValueError( + ("File communication protocol '%s' as defined in plugins " + "configuration is NOT defined.") % self._plugincoupling) diff --git a/qiita_client/tests/test_plugin.py b/qiita_client/tests/test_plugin.py index ab87f6b..43a8eca 100644 --- a/qiita_client/tests/test_plugin.py +++ b/qiita_client/tests/test_plugin.py @@ -150,9 +150,11 @@ def html_generator_func(a, b, c, d): 'SERVER_CERT = \n'] # We will test the last 2 lines independently since they're variable # in each test run - self.assertEqual(conf[:-2], exp_lines) - self.assertTrue(conf[-2].startswith('CLIENT_ID = ')) - self.assertTrue(conf[-1].startswith('CLIENT_SECRET = ')) + self.assertEqual(conf[:-5], exp_lines) + self.assertTrue(conf[-5].startswith('CLIENT_ID = ')) + self.assertTrue(conf[-4].startswith('CLIENT_SECRET = ')) + self.assertTrue(conf[-2].startswith('[network]')) + self.assertTrue(conf[-1].startswith('PLUGINCOUPLING = ')) def test_call(self): def validate_func(a, b, c, d): diff --git a/qiita_client/tests/test_qiita_client.py b/qiita_client/tests/test_qiita_client.py index 12689e8..15e4f77 100644 --- a/qiita_client/tests/test_qiita_client.py +++ b/qiita_client/tests/test_qiita_client.py @@ -7,8 +7,9 @@ # ----------------------------------------------------------------------------- from unittest import TestCase, main +import filecmp from os import remove, close -from os.path import basename, exists +from os.path import basename, exists, expanduser, join from tempfile import mkstemp from json import dumps import pandas as pd @@ -385,6 +386,60 @@ def test_artifact_and_preparation_files(self): self.assertEqual(fobs, fexp) self.assertEqual(piobs.shape, (2, 1)) + def test_fetch_file_from_central(self): + self.tester._plugincoupling = 'filesystem' + + ainfo = self.tester.get("/qiita_db/artifacts/%s/" % 1) + fp = ainfo['files']['raw_forward_seqs'][0]['filepath'] + + # mode: filesystem, prefix='': no copy, directly return given fp + fp_obs = self.tester.fetch_file_from_central(fp) + self.assertEqual(fp, fp_obs) + + # mode: filesystem, prefix='/karl': make file copy + prefix = join(expanduser("~"), 'karl') + self.clean_up_files.append(prefix + fp) + fp_obs = self.tester.fetch_file_from_central(fp, prefix=prefix) + self.assertEqual(prefix + fp, fp_obs) + self.assertTrue(filecmp.cmp(fp, fp_obs, shallow=False)) + + # non existing mode + with self.assertRaises(ValueError): + self.tester._plugincoupling = 'foo' + self.tester.fetch_file_from_central(fp) + + # change transfer mode to https + self.tester._plugincoupling = 'https' + prefix = join(expanduser("~"), 'kurt') + self.clean_up_files.append(prefix + fp) + fp_obs = self.tester.fetch_file_from_central(fp, prefix=prefix) + self.assertEqual(prefix + fp, fp_obs) + self.assertTrue(filecmp.cmp(fp, fp_obs, shallow=False)) + + def test_push_file_to_central(self): + self.tester._plugincoupling = 'filesystem' + + ainfo = self.tester.get("/qiita_db/artifacts/%s/" % 1) + fp = ainfo['files']['raw_forward_seqs'][0]['filepath'] + + # mode: filesystem + fp_obs = self.tester.push_file_to_central(fp) + self.assertEqual(fp, fp_obs) + + # non existing mode + with self.assertRaises(ValueError): + self.tester._plugincoupling = 'foo' + self.tester.push_file_to_central(fp) + + # change transfer mode to https + self.tester._plugincoupling = 'https' + fp_source = 'foo.bar' + with open(fp_source, 'w') as f: + f.write("this is a test\n") + self.clean_up_files.append(fp_source) + fp_obs = self.tester.push_file_to_central(fp_source) + self.assertEqual(fp_source, fp_obs) + if __name__ == '__main__': main()