From 6e4e9d40e5e75078b88aca47d3898e103aadc8ad Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Thu, 18 Jan 2024 17:17:19 +0800 Subject: [PATCH 01/46] HTTP API settings --- looper/api/__init__.py | 0 looper/api/main.py | 10 ++++++++++ requirements/requirements-all.txt | 3 +++ 3 files changed, 13 insertions(+) create mode 100644 looper/api/__init__.py create mode 100644 looper/api/main.py diff --git a/looper/api/__init__.py b/looper/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/looper/api/main.py b/looper/api/main.py new file mode 100644 index 000000000..24715665f --- /dev/null +++ b/looper/api/main.py @@ -0,0 +1,10 @@ +from fastapi import FastAPI +from looper.command_models.commands import RunParserModel + +app = FastAPI(validate_model=True) + + +@app.post("/run") +async def run_endpoint(run_model: RunParserModel): + print(run_model) + return run_model diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index 71c0df877..0e793d423 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -12,3 +12,6 @@ rich>=9.10.0 ubiquerg>=0.5.2 yacman>=0.9.2 pydantic-argparse>=0.8.0 +pydantic-argparse==0.8.0 +fastapi +uvicorn From eab51271039a0cfcd8887b4eaef630483880fbe0 Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Fri, 19 Jan 2024 12:13:35 +0800 Subject: [PATCH 02/46] Create an argparse.Namespace --- looper/api/main.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index 24715665f..f46f97af8 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,10 +1,21 @@ +from argparse import Namespace + from fastapi import FastAPI from looper.command_models.commands import RunParserModel app = FastAPI(validate_model=True) +def create_argparse_namespace(run_model: RunParserModel) -> Namespace: + # Create an argparse namespace from the submitted run model + namespace = Namespace() + for arg in vars(run_model): + setattr(namespace, arg, getattr(run_model, arg)) + return namespace + + @app.post("/run") async def run_endpoint(run_model: RunParserModel): - print(run_model) + argparse_namespace = create_argparse_namespace(run_model) + print(argparse_namespace) return run_model From 72087ee35bc44c85e0ec09fb3072b5c2037281c7 Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Fri, 19 Jan 2024 15:35:51 +0800 Subject: [PATCH 03/46] Add run function from cli_pydantic --- looper/api/main.py | 145 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 136 insertions(+), 9 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index f46f97af8..69e99ef1d 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,21 +1,148 @@ +import os +import sys from argparse import Namespace +import yaml +from divvy import select_divvy_config from fastapi import FastAPI -from looper.command_models.commands import RunParserModel +from looper.cli_looper import _proc_resources_spec +from looper.command_models.commands import ( # RunParserModel, + SUPPORTED_COMMANDS, + TopLevelParser, +) +from looper.const import * +from looper.divvy import DEFAULT_COMPUTE_RESOURCES_NAME, select_divvy_config +from looper.exceptions import * +from looper.looper import * +from looper.parser_types import * +from looper.project import Project, ProjectContext +from looper.utils import ( + dotfile_path, + enrich_args_via_cfg, + is_registry_path, + read_looper_dotfile, +) +from pephubclient import PEPHubClient app = FastAPI(validate_model=True) -def 
create_argparse_namespace(run_model: RunParserModel) -> Namespace: - # Create an argparse namespace from the submitted run model +def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: + # Create an argparse namespace from the submitted top level model namespace = Namespace() - for arg in vars(run_model): - setattr(namespace, arg, getattr(run_model, arg)) + for arg in vars(top_level_model): + if arg not in [cmd.name for cmd in SUPPORTED_COMMANDS]: + setattr(namespace, arg, getattr(top_level_model, arg)) + else: + command_namespace = Namespace() + command_namespace_args = getattr(top_level_model, arg) + for argname in vars(command_namespace_args): + setattr( + command_namespace, + argname, + getattr(command_namespace_args, argname), + ) + setattr(namespace, arg, command_namespace) return namespace +def run_cmd(args: Namespace): + # here comes adapted `cli_looper.py` code + looper_cfg_path = os.path.relpath(dotfile_path(), start=os.curdir) + try: + looper_config_dict = read_looper_dotfile() + + for looper_config_key, looper_config_item in looper_config_dict.items(): + print(looper_config_key, looper_config_item) + setattr(args, looper_config_key, looper_config_item) + + except OSError: + # parser.print_help(sys.stderr) + raise ValueError( + f"Looper config file does not exist. Use looper init to create one at {looper_cfg_path}." + ) + + print("#####################################") + print(args) + + # args = enrich_args_via_cfg(args, parser, False) + divcfg = ( + select_divvy_config(filepath=args.run.divvy) + if hasattr(args.run, "divvy") + else None + ) + # Ignore flags if user is selecting or excluding on flags: + if args.sel_flag or args.exc_flag: + args.ignore_flags = True + + # Initialize project + if is_registry_path(args.config_file): + if vars(args)[SAMPLE_PL_ARG]: + p = Project( + amendments=args.amend, + divcfg_path=divcfg, + runp=args.command == "runp", + project_dict=PEPHubClient()._load_raw_pep( + registry_path=args.config_file + ), + **{ + attr: getattr(args, attr) for attr in CLI_PROJ_ATTRS if attr in args + }, + ) + else: + raise MisconfigurationException( + f"`sample_pipeline_interface` is missing. Provide it in the parameters." + ) + else: + try: + p = Project( + cfg=args.config_file, + amendments=args.amend, + divcfg_path=divcfg, + runp=False, + **{ + attr: getattr(args, attr) for attr in CLI_PROJ_ATTRS if attr in args + }, + ) + except yaml.parser.ParserError as e: + _LOGGER.error(f"Project config parse failed -- {e}") + sys.exit(1) + + selected_compute_pkg = p.selected_compute_package or DEFAULT_COMPUTE_RESOURCES_NAME + if p.dcc is not None and not p.dcc.activate_package(selected_compute_pkg): + _LOGGER.info( + "Failed to activate '{}' computing package. 
" + "Using the default one".format(selected_compute_pkg) + ) + + with ProjectContext( + prj=p, + selector_attribute="toggle", + selector_include=None, + selector_exclude=None, + selector_flag=None, + exclusion_flag=None, + ) as prj: + command = "run" + if command == "run": + run = Runner(prj) + try: + compute_kwargs = _proc_resources_spec(args) + return run(args, rerun=False, **compute_kwargs) + except SampleFailedException: + sys.exit(1) + except IOError: + _LOGGER.error( + "{} pipeline_interfaces: '{}'".format( + prj.__class__.__name__, prj.pipeline_interface_sources + ) + ) + raise + + @app.post("/run") -async def run_endpoint(run_model: RunParserModel): - argparse_namespace = create_argparse_namespace(run_model) - print(argparse_namespace) - return run_model +async def run_endpoint(top_level_model: TopLevelParser): + print(top_level_model) + argparse_namespace = create_argparse_namespace(top_level_model) + run_cmd(argparse_namespace) + return top_level_model From 1734c80e7af977c881de8cd09922b87640a61256 Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Fri, 19 Jan 2024 15:50:31 +0800 Subject: [PATCH 04/46] Adjust enrich_args_via_cfg to http api --- looper/utils.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/looper/utils.py b/looper/utils.py index 531ea01a6..e5f602005 100644 --- a/looper/utils.py +++ b/looper/utils.py @@ -18,8 +18,8 @@ from pephubclient.constants import RegistryPath from pydantic.error_wrappers import ValidationError -from .const import * from .command_models.commands import SUPPORTED_COMMANDS +from .const import * from .exceptions import MisconfigurationException, RegistryPathException _LOGGER = getLogger(__name__) @@ -251,7 +251,7 @@ def read_yaml_file(filepath): return data -def enrich_args_via_cfg(parser_args, aux_parser, test_args=None): +def enrich_args_via_cfg(parser_args, aux_parser, test_args=None, http_api=False): """ Read in a looper dotfile and set arguments. 
@@ -268,11 +268,17 @@ def enrich_args_via_cfg(parser_args, aux_parser, test_args=None): else dict() ) result = argparse.Namespace() - if test_args: - cli_args, _ = aux_parser.parse_known_args(args=test_args) + if not http_api: + if test_args: + cli_args, _ = aux_parser.parse_known_args(args=test_args) + else: + cli_args, _ = aux_parser.parse_known_args() else: - cli_args, _ = aux_parser.parse_known_args() + if aux_parser: + cli_args, _ = aux_parser.parse_known_args() + else: + cli_args = [] def set_single_arg(argname, default_source_namespace, result_namespace): if argname not in POSITIONAL or not hasattr(result, argname): From e0e3a6f06c80abd1946ca47a8d7ddc3a5b598016 Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Fri, 19 Jan 2024 15:52:33 +0800 Subject: [PATCH 05/46] Run adjusted enrich_args_via_cfg in http api --- looper/api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index 69e99ef1d..8540c2088 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -65,7 +65,7 @@ def run_cmd(args: Namespace): print("#####################################") print(args) - # args = enrich_args_via_cfg(args, parser, False) + args = enrich_args_via_cfg(args, None, False, True) divcfg = ( select_divvy_config(filepath=args.run.divvy) if hasattr(args.run, "divvy") From 67182ddcf7c8c73f305c9804ba8283dabe5d7876 Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Fri, 19 Jan 2024 16:20:42 +0800 Subject: [PATCH 06/46] Re-organize cli_pydantic.py to run looper run via CLI and http-api --- looper/api/main.py | 134 +++++------------------------------------ looper/cli_pydantic.py | 21 ++++--- 2 files changed, 29 insertions(+), 126 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index 8540c2088..65e16cbd4 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,34 +1,25 @@ -import os -import sys from argparse import Namespace -import yaml -from divvy import select_divvy_config from fastapi import FastAPI -from looper.cli_looper import _proc_resources_spec -from looper.command_models.commands import ( # RunParserModel, - SUPPORTED_COMMANDS, - TopLevelParser, -) -from looper.const import * -from looper.divvy import DEFAULT_COMPUTE_RESOURCES_NAME, select_divvy_config -from looper.exceptions import * -from looper.looper import * -from looper.parser_types import * -from looper.project import Project, ProjectContext -from looper.utils import ( - dotfile_path, - enrich_args_via_cfg, - is_registry_path, - read_looper_dotfile, -) -from pephubclient import PEPHubClient +from looper.cli_pydantic import run_looper +from looper.command_models.commands import SUPPORTED_COMMANDS, TopLevelParser app = FastAPI(validate_model=True) def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: - # Create an argparse namespace from the submitted top level model + """ + Converts a TopLevelParser instance into an argparse.Namespace object. + + This function takes a TopLevelParser instance, and converts it into an + argparse.Namespace object. It includes handling for supported commands + specified in SUPPORTED_COMMANDS. + + :param TopLevelParser top_level_model: An instance of the TopLevelParser + model + :return argparse.Namespace: An argparse.Namespace object representing + the parsed command-line arguments. 
+ """ namespace = Namespace() for arg in vars(top_level_model): if arg not in [cmd.name for cmd in SUPPORTED_COMMANDS]: @@ -46,103 +37,8 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: return namespace -def run_cmd(args: Namespace): - # here comes adapted `cli_looper.py` code - looper_cfg_path = os.path.relpath(dotfile_path(), start=os.curdir) - try: - looper_config_dict = read_looper_dotfile() - - for looper_config_key, looper_config_item in looper_config_dict.items(): - print(looper_config_key, looper_config_item) - setattr(args, looper_config_key, looper_config_item) - - except OSError: - # parser.print_help(sys.stderr) - raise ValueError( - f"Looper config file does not exist. Use looper init to create one at {looper_cfg_path}." - ) - - print("#####################################") - print(args) - - args = enrich_args_via_cfg(args, None, False, True) - divcfg = ( - select_divvy_config(filepath=args.run.divvy) - if hasattr(args.run, "divvy") - else None - ) - # Ignore flags if user is selecting or excluding on flags: - if args.sel_flag or args.exc_flag: - args.ignore_flags = True - - # Initialize project - if is_registry_path(args.config_file): - if vars(args)[SAMPLE_PL_ARG]: - p = Project( - amendments=args.amend, - divcfg_path=divcfg, - runp=args.command == "runp", - project_dict=PEPHubClient()._load_raw_pep( - registry_path=args.config_file - ), - **{ - attr: getattr(args, attr) for attr in CLI_PROJ_ATTRS if attr in args - }, - ) - else: - raise MisconfigurationException( - f"`sample_pipeline_interface` is missing. Provide it in the parameters." - ) - else: - try: - p = Project( - cfg=args.config_file, - amendments=args.amend, - divcfg_path=divcfg, - runp=False, - **{ - attr: getattr(args, attr) for attr in CLI_PROJ_ATTRS if attr in args - }, - ) - except yaml.parser.ParserError as e: - _LOGGER.error(f"Project config parse failed -- {e}") - sys.exit(1) - - selected_compute_pkg = p.selected_compute_package or DEFAULT_COMPUTE_RESOURCES_NAME - if p.dcc is not None and not p.dcc.activate_package(selected_compute_pkg): - _LOGGER.info( - "Failed to activate '{}' computing package. 
" - "Using the default one".format(selected_compute_pkg) - ) - - with ProjectContext( - prj=p, - selector_attribute="toggle", - selector_include=None, - selector_exclude=None, - selector_flag=None, - exclusion_flag=None, - ) as prj: - command = "run" - if command == "run": - run = Runner(prj) - try: - compute_kwargs = _proc_resources_spec(args) - return run(args, rerun=False, **compute_kwargs) - except SampleFailedException: - sys.exit(1) - except IOError: - _LOGGER.error( - "{} pipeline_interfaces: '{}'".format( - prj.__class__.__name__, prj.pipeline_interface_sources - ) - ) - raise - - @app.post("/run") async def run_endpoint(top_level_model: TopLevelParser): - print(top_level_model) argparse_namespace = create_argparse_namespace(top_level_model) - run_cmd(argparse_namespace) + run_looper(argparse_namespace, None, True) return top_level_model diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index 831788383..e14be9b45 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -19,6 +19,7 @@ import os import sys +from argparse import Namespace import logmuse import pydantic_argparse @@ -46,7 +47,7 @@ ) -def run_looper(args: TopLevelParser, parser: ArgumentParser): +def run_looper(args: Namespace | TopLevelParser, parser: ArgumentParser, http_api=False): # here comes adapted `cli_looper.py` code global _LOGGER @@ -77,11 +78,11 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser): setattr(args, looper_config_key, looper_config_item) except OSError: - parser.print_help(sys.stderr) - _LOGGER.warning( + if not http_api: + parser.print_help(sys.stderr) + raise ValueError( f"Looper config file does not exist. Use looper init to create one at {looper_cfg_path}." ) - sys.exit(1) else: _LOGGER.warning( "This PEP configures looper through the project config. This approach is deprecated and will " @@ -89,7 +90,7 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser): "looper.databio.org/en/latest/looper-config" ) - args = enrich_args_via_cfg(args, parser, False) + args = enrich_args_via_cfg(args, parser, False, http_api) # If project pipeline interface defined in the cli, change name to: "pipeline_interface" if vars(args)[PROJECT_PL_ARG]: @@ -97,9 +98,10 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser): divcfg = ( select_divvy_config(filepath=subcommand_args.divvy) - if hasattr(subcommand_args, "divvy") - else None + if hasattr(subcommand_args, "divvy") else None ) + args = enrich_args_via_cfg(args, parser, False, http_api) + # Ignore flags if user is selecting or excluding on flags: if args.sel_flag or args.exc_flag: args.ignore_flags = True @@ -176,6 +178,11 @@ def main() -> None: add_help=True, ) args = parser.parse_typed_args() +<<<<<<< HEAD +======= + print(args) + print("#########################################") +>>>>>>> 0141fb3 (Re-organize cli_pydantic.py to run looper run via CLI and http-api) run_looper(args, parser) From 634665416e9d400b00774b37f800ccefdb014c71 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Fri, 19 Jan 2024 14:27:29 +0100 Subject: [PATCH 07/46] Slight refactor of `create_argparse_namespace` --- looper/api/main.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index 65e16cbd4..9ec0418f8 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -21,19 +21,20 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: the parsed command-line arguments. 
""" namespace = Namespace() - for arg in vars(top_level_model): - if arg not in [cmd.name for cmd in SUPPORTED_COMMANDS]: - setattr(namespace, arg, getattr(top_level_model, arg)) + + for argname, value in vars(top_level_model).items(): + if argname not in [cmd.name for cmd in SUPPORTED_COMMANDS]: + setattr(namespace, argname, value) else: command_namespace = Namespace() - command_namespace_args = getattr(top_level_model, arg) - for argname in vars(command_namespace_args): + command_namespace_args = value + for command_argname, command_arg_value in vars(command_namespace_args).items(): setattr( command_namespace, - argname, - getattr(command_namespace_args, argname), + command_argname, + command_arg_value, ) - setattr(namespace, arg, command_namespace) + setattr(namespace, argname, command_namespace) return namespace From e1f730841ff8427b5d1c4732ba543ee2f5e06c07 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Fri, 19 Jan 2024 14:47:16 +0100 Subject: [PATCH 08/46] Remove `run` from route That's because this endpoint will support _all_ commands, and not only `run`. --- looper/api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index 9ec0418f8..a6addb2fb 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -38,7 +38,7 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: return namespace -@app.post("/run") +@app.post("/") async def run_endpoint(top_level_model: TopLevelParser): argparse_namespace = create_argparse_namespace(top_level_model) run_looper(argparse_namespace, None, True) From dd978c88d0b7257e58bf4d7806e82a0fd152d93f Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Fri, 19 Jan 2024 14:56:31 +0100 Subject: [PATCH 09/46] Capture stderr / stdout and return in HTTP response --- looper/api/main.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index a6addb2fb..7655c4b65 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,4 +1,6 @@ from argparse import Namespace +from contextlib import redirect_stderr, redirect_stdout +import io from fastapi import FastAPI from looper.cli_pydantic import run_looper @@ -41,5 +43,11 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: @app.post("/") async def run_endpoint(top_level_model: TopLevelParser): argparse_namespace = create_argparse_namespace(top_level_model) - run_looper(argparse_namespace, None, True) - return top_level_model + stdout_stream = io.StringIO() + stderr_stream = io.StringIO() + with redirect_stderr(stderr_stream), redirect_stdout(stdout_stream): + run_looper(argparse_namespace, None, True) + return { + "stdout": stdout_stream.getvalue(), + "stderr": stderr_stream.getvalue() + } From e010f75c6e29994fa92927c5cc2aab657c7ae04b Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Fri, 19 Jan 2024 15:05:23 +0100 Subject: [PATCH 10/46] Rename `run_endpoint` -> `main_endpoint` --- looper/api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index 7655c4b65..338d15ef3 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -41,7 +41,7 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: @app.post("/") -async def run_endpoint(top_level_model: TopLevelParser): +async def main_endpoint(top_level_model: TopLevelParser): argparse_namespace = create_argparse_namespace(top_level_model) stdout_stream = io.StringIO() stderr_stream = 
io.StringIO() From 8af2bb2b76a6850508a344b0e5433a49accfdff8 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Fri, 19 Jan 2024 15:13:34 +0100 Subject: [PATCH 11/46] Add response model --- looper/api/main.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index 338d15ef3..2143e9358 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -5,6 +5,7 @@ from fastapi import FastAPI from looper.cli_pydantic import run_looper from looper.command_models.commands import SUPPORTED_COMMANDS, TopLevelParser +import pydantic app = FastAPI(validate_model=True) @@ -39,15 +40,22 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: setattr(namespace, argname, command_namespace) return namespace +class MainResponse(pydantic.BaseModel): + """ + Response of the main endpoint. + """ + stdout: str = pydantic.Field(description="Standard output produced by `looper` while running a command") + stderr: str = pydantic.Field(description="Standard error output produced by `looper` while running a command") + @app.post("/") -async def main_endpoint(top_level_model: TopLevelParser): +async def main_endpoint(top_level_model: TopLevelParser) -> MainResponse: argparse_namespace = create_argparse_namespace(top_level_model) stdout_stream = io.StringIO() stderr_stream = io.StringIO() with redirect_stderr(stderr_stream), redirect_stdout(stdout_stream): run_looper(argparse_namespace, None, True) - return { - "stdout": stdout_stream.getvalue(), - "stderr": stderr_stream.getvalue() - } + return MainResponse( + stdout=stdout_stream.getvalue(), + stderr=stderr_stream.getvalue() + ) From a89e7bc3eb57f641fd435addf9cf69ba2a2d61ca Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Fri, 19 Jan 2024 22:51:06 +0100 Subject: [PATCH 12/46] Add a comment about the endpoint likely being blocking --- looper/api/main.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/looper/api/main.py b/looper/api/main.py index 2143e9358..9b0109c9b 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -54,6 +54,15 @@ async def main_endpoint(top_level_model: TopLevelParser) -> MainResponse: stdout_stream = io.StringIO() stderr_stream = io.StringIO() with redirect_stderr(stderr_stream), redirect_stdout(stdout_stream): + # TODO: as it stands, because of the `async def`, and the lacking `await` + # in the following line, this endpoint is (I (Simeon) thing) currently blocking. + # We would need to make `run_looper()` return a future, but it inherently does + # not support `async` calls. + # So one option would be to run `run_looper()` in its own thread whose + # termination we can `await`, using `fastapi.run_in_threadpool`. But that fails + # with an error stemming from the `yacman` library about `signal.signal` only + # working in the main thread of the main interpreter. We have to investigate + # how to solve this. 
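        # For reference, the thread-offloading variant mentioned above would look
        # roughly like the sketch below (an illustration only, not part of this
        # patch; `run_in_threadpool` is the helper FastAPI re-exports from
        # Starlette, and the call currently fails because yacman registers signal
        # handlers, which Python only allows from the main thread):
        #
        #     from fastapi.concurrency import run_in_threadpool
        #     ...
        #     await run_in_threadpool(run_looper, argparse_namespace, None, True)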
run_looper(argparse_namespace, None, True) return MainResponse( stdout=stdout_stream.getvalue(), From 1880372a7973de9afcdd35ac6c1bb3c911ae7b89 Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Mon, 22 Jan 2024 15:57:29 +0800 Subject: [PATCH 13/46] Apply formatter --- looper/api/main.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index 9b0109c9b..7091b7dfe 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,11 +1,11 @@ +import io from argparse import Namespace from contextlib import redirect_stderr, redirect_stdout -import io +import pydantic from fastapi import FastAPI from looper.cli_pydantic import run_looper from looper.command_models.commands import SUPPORTED_COMMANDS, TopLevelParser -import pydantic app = FastAPI(validate_model=True) @@ -31,7 +31,9 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: else: command_namespace = Namespace() command_namespace_args = value - for command_argname, command_arg_value in vars(command_namespace_args).items(): + for command_argname, command_arg_value in vars( + command_namespace_args + ).items(): setattr( command_namespace, command_argname, @@ -40,12 +42,18 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: setattr(namespace, argname, command_namespace) return namespace + class MainResponse(pydantic.BaseModel): """ Response of the main endpoint. """ - stdout: str = pydantic.Field(description="Standard output produced by `looper` while running a command") - stderr: str = pydantic.Field(description="Standard error output produced by `looper` while running a command") + + stdout: str = pydantic.Field( + description="Standard output produced by `looper` while running a command" + ) + stderr: str = pydantic.Field( + description="Standard error output produced by `looper` while running a command" + ) @app.post("/") @@ -65,6 +73,5 @@ async def main_endpoint(top_level_model: TopLevelParser) -> MainResponse: # how to solve this. 
run_looper(argparse_namespace, None, True) return MainResponse( - stdout=stdout_stream.getvalue(), - stderr=stderr_stream.getvalue() + stdout=stdout_stream.getvalue(), stderr=stderr_stream.getvalue() ) From 42119f00a7591d869bd502a74e6728350cfcbefa Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Mon, 22 Jan 2024 17:17:21 +0800 Subject: [PATCH 14/46] Add logger def to be captured by API and also CLI --- looper/cli_pydantic.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index e14be9b45..c768db8d0 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -178,11 +178,6 @@ def main() -> None: add_help=True, ) args = parser.parse_typed_args() -<<<<<<< HEAD -======= - print(args) - print("#########################################") ->>>>>>> 0141fb3 (Re-organize cli_pydantic.py to run looper run via CLI and http-api) run_looper(args, parser) From f0c749db6ae0da89dca2f06fdc82601f5f584695 Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Mon, 22 Jan 2024 17:30:16 +0800 Subject: [PATCH 15/46] Add README for the API --- looper/api/README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 looper/api/README.md diff --git a/looper/api/README.md b/looper/api/README.md new file mode 100644 index 000000000..ffcf951e7 --- /dev/null +++ b/looper/api/README.md @@ -0,0 +1,21 @@ +# Looper HTTP API + +## Overview + +This API provides an HTTP interface for running the `looper` commands, allowing users to interact with Looper via HTTP requests. + +## Usage +### Running the API +To run the API, execute the following command: +```bash +cd looper/api +uvicorn main:app --reload +``` +### Example API Usage +To run the `looper run` command through the HTTP API, you can use the following curl command: +```bash +curl -X POST -H "Content-Type: application/json" -d '{"run": {}, "looper_config": ".looper.yaml"}' "http://127.0.0.1:8000" +``` +with the project files in the same `looper/api` folder. + +This example sends a JSON payload with the `run` and `looper_config` parameters to the `/` endpoint. From 6d146b5da3bb736f6db6e17d8fa146466b5ad883 Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Tue, 23 Jan 2024 17:47:24 +0800 Subject: [PATCH 16/46] Add endpoint "\status" to capture UUID --- looper/api/main.py | 15 ++++++++++++++- looper/cli_pydantic.py | 4 ++++ requirements/requirements-all.txt | 1 + 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index 7091b7dfe..9a455feb4 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -9,6 +9,8 @@ app = FastAPI(validate_model=True) +_UUID = None + def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: """ @@ -58,6 +60,7 @@ class MainResponse(pydantic.BaseModel): @app.post("/") async def main_endpoint(top_level_model: TopLevelParser) -> MainResponse: + global _UUID argparse_namespace = create_argparse_namespace(top_level_model) stdout_stream = io.StringIO() stderr_stream = io.StringIO() @@ -71,7 +74,17 @@ async def main_endpoint(top_level_model: TopLevelParser) -> MainResponse: # with an error stemming from the `yacman` library about `signal.signal` only # working in the main thread of the main interpreter. We have to investigate # how to solve this. 
- run_looper(argparse_namespace, None, True) + _, _UUID = run_looper(argparse_namespace, None, True) return MainResponse( stdout=stdout_stream.getvalue(), stderr=stderr_stream.getvalue() ) + + +@app.get("/status") +async def get_status(): + global _UUID + if _UUID: + print(_UUID) + return {"UUID": _UUID} + else: + return {"UUID": "Not found"} diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index c768db8d0..8514cce85 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -19,6 +19,7 @@ import os import sys +import uuid from argparse import Namespace import logmuse @@ -51,6 +52,7 @@ def run_looper(args: Namespace | TopLevelParser, parser: ArgumentParser, http_ap # here comes adapted `cli_looper.py` code global _LOGGER + _UUID = str(uuid.uuid4()) _LOGGER = logmuse.logger_via_cli(args, make_root=True) # Find out which subcommand was used @@ -158,6 +160,8 @@ def run_looper(args: Namespace | TopLevelParser, parser: ArgumentParser, http_ap run = Runner(prj) try: compute_kwargs = _proc_resources_spec(args) + if http_api: + return (run(args, rerun=False, **compute_kwargs), _UUID) return run(args, rerun=False, **compute_kwargs) except SampleFailedException: sys.exit(1) diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index 0e793d423..dac9400aa 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -15,3 +15,4 @@ pydantic-argparse>=0.8.0 pydantic-argparse==0.8.0 fastapi uvicorn +uuid From 59869fa2d5d3a207daff62a38ada5e7d4a8335ae Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Wed, 24 Jan 2024 18:07:58 +0800 Subject: [PATCH 17/46] Create 2-step job submission and result workflow Co-authored-by: Simeon Carstens --- looper/api/main.py | 68 +++++++++++++++++++++++++----------------- looper/cli_pydantic.py | 4 --- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index 9a455feb4..14fb5bad4 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,15 +1,46 @@ import io from argparse import Namespace from contextlib import redirect_stderr, redirect_stdout +from typing import Dict +from uuid import UUID, uuid4 import pydantic from fastapi import FastAPI from looper.cli_pydantic import run_looper from looper.command_models.commands import SUPPORTED_COMMANDS, TopLevelParser + +class Job(pydantic.BaseModel): + id: UUID = pydantic.Field(default_factory=uuid4) + status: str = "in_progress" + progress: int = 0 + stdout: str = None + stderr: str = None + + app = FastAPI(validate_model=True) +jobs: Dict[UUID, Job] = {} + + +def background_async(top_level_model: TopLevelParser, job_id: UUID) -> None: + argparse_namespace = create_argparse_namespace(top_level_model) + stdout_stream = io.StringIO() + stderr_stream = io.StringIO() + with redirect_stderr(stderr_stream), redirect_stdout(stdout_stream): + # TODO: as it stands, because of the `async def`, and the lacking `await` + # in the following line, this endpoint is (I (Simeon) thing) currently blocking. + # We would need to make `run_looper()` return a future, but it inherently does + # not support `async` calls. + # So one option would be to run `run_looper()` in its own thread whose + # termination we can `await`, using `fastapi.run_in_threadpool`. But that fails + # with an error stemming from the `yacman` library about `signal.signal` only + # working in the main thread of the main interpreter. We have to investigate + # how to solve this. 
+ run_looper(argparse_namespace, None, True) -_UUID = None + jobs[job_id].status = "completed" + jobs[job_id].stdout = stdout_stream.getvalue() + jobs[job_id].stderr = stderr_stream.getvalue() def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: @@ -59,32 +90,13 @@ class MainResponse(pydantic.BaseModel): @app.post("/") -async def main_endpoint(top_level_model: TopLevelParser) -> MainResponse: - global _UUID - argparse_namespace = create_argparse_namespace(top_level_model) - stdout_stream = io.StringIO() - stderr_stream = io.StringIO() - with redirect_stderr(stderr_stream), redirect_stdout(stdout_stream): - # TODO: as it stands, because of the `async def`, and the lacking `await` - # in the following line, this endpoint is (I (Simeon) thing) currently blocking. - # We would need to make `run_looper()` return a future, but it inherently does - # not support `async` calls. - # So one option would be to run `run_looper()` in its own thread whose - # termination we can `await`, using `fastapi.run_in_threadpool`. But that fails - # with an error stemming from the `yacman` library about `signal.signal` only - # working in the main thread of the main interpreter. We have to investigate - # how to solve this. - _, _UUID = run_looper(argparse_namespace, None, True) - return MainResponse( - stdout=stdout_stream.getvalue(), stderr=stderr_stream.getvalue() - ) +async def main_endpoint(top_level_model: TopLevelParser) -> Dict: + job = Job() + jobs[job.id] = job + background_async(top_level_model, job.id) + return {"job_id": job.id} -@app.get("/status") -async def get_status(): - global _UUID - if _UUID: - print(_UUID) - return {"UUID": _UUID} - else: - return {"UUID": "Not found"} +@app.get("/status/{job_id}") +async def get_status(job_id: UUID): + return jobs[job_id] diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index 8514cce85..c768db8d0 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -19,7 +19,6 @@ import os import sys -import uuid from argparse import Namespace import logmuse @@ -52,7 +51,6 @@ def run_looper(args: Namespace | TopLevelParser, parser: ArgumentParser, http_ap # here comes adapted `cli_looper.py` code global _LOGGER - _UUID = str(uuid.uuid4()) _LOGGER = logmuse.logger_via_cli(args, make_root=True) # Find out which subcommand was used @@ -160,8 +158,6 @@ def run_looper(args: Namespace | TopLevelParser, parser: ArgumentParser, http_ap run = Runner(prj) try: compute_kwargs = _proc_resources_spec(args) - if http_api: - return (run(args, rerun=False, **compute_kwargs), _UUID) return run(args, rerun=False, **compute_kwargs) except SampleFailedException: sys.exit(1) From 384a898a4dfdf540a7d20d82fa782da8cc41af76 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Wed, 24 Jan 2024 15:44:58 +0100 Subject: [PATCH 18/46] Allow `None` stderr / stdout in job model --- looper/api/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index 14fb5bad4..e727f3b9f 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -14,8 +14,8 @@ class Job(pydantic.BaseModel): id: UUID = pydantic.Field(default_factory=uuid4) status: str = "in_progress" progress: int = 0 - stdout: str = None - stderr: str = None + stdout: str | None = None + stderr: str | None = None app = FastAPI(validate_model=True) From 0161cc68a0ebf1a752378293402004f91251e994 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Wed, 24 Jan 2024 15:45:33 +0100 Subject: [PATCH 19/46] Run `run_looper()` in FastAPI background 
task --- looper/api/main.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index e727f3b9f..c13ffc959 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -3,6 +3,7 @@ from contextlib import redirect_stderr, redirect_stdout from typing import Dict from uuid import UUID, uuid4 +import fastapi import pydantic from fastapi import FastAPI @@ -22,7 +23,7 @@ class Job(pydantic.BaseModel): jobs: Dict[UUID, Job] = {} -def background_async(top_level_model: TopLevelParser, job_id: UUID) -> None: +async def background_async(top_level_model: TopLevelParser, job_id: UUID) -> None: argparse_namespace = create_argparse_namespace(top_level_model) stdout_stream = io.StringIO() stderr_stream = io.StringIO() @@ -90,10 +91,10 @@ class MainResponse(pydantic.BaseModel): @app.post("/") -async def main_endpoint(top_level_model: TopLevelParser) -> Dict: +async def main_endpoint(top_level_model: TopLevelParser, background_tasks: fastapi.BackgroundTasks) -> Dict: job = Job() jobs[job.id] = job - background_async(top_level_model, job.id) + background_tasks.add_task(background_async, top_level_model, job.id) return {"job_id": job.id} From 67c5d340eb7ecefeb4ed7f5304b14525306b8154 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Wed, 24 Jan 2024 16:03:51 +0100 Subject: [PATCH 20/46] Document / make self-documenting `Job` fields --- looper/api/main.py | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index c13ffc959..37aaa1b92 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -12,11 +12,21 @@ class Job(pydantic.BaseModel): - id: UUID = pydantic.Field(default_factory=uuid4) - status: str = "in_progress" + id: UUID = pydantic.Field( + default_factory=uuid4, + description="The unique identifier of the job" + ) + status: str = pydantic.Field( + default="in_progress", + description="The current status of the job. Can be either `in_progress` or `completed`." + ) progress: int = 0 - stdout: str | None = None - stderr: str | None = None + stdout: str | None = pydantic.Field(default=None, + description="Standard output produced by `looper` while performing the requested action" + ) + stderr: str | None = pydantic.Field(default=None, + description="Standard error output produced by `looper` while performing the requested action" + ) app = FastAPI(validate_model=True) @@ -76,20 +86,6 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: setattr(namespace, argname, command_namespace) return namespace - -class MainResponse(pydantic.BaseModel): - """ - Response of the main endpoint. 
- """ - - stdout: str = pydantic.Field( - description="Standard output produced by `looper` while running a command" - ) - stderr: str = pydantic.Field( - description="Standard error output produced by `looper` while running a command" - ) - - @app.post("/") async def main_endpoint(top_level_model: TopLevelParser, background_tasks: fastapi.BackgroundTasks) -> Dict: job = Job() From 8869378a6e3e37d9acf3b87a7361e0071032ace8 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Wed, 24 Jan 2024 16:08:49 +0100 Subject: [PATCH 21/46] Make `/` route return a 202 (Accepted) HTTP status code --- looper/api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index 37aaa1b92..7826786c1 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -86,7 +86,7 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: setattr(namespace, argname, command_namespace) return namespace -@app.post("/") +@app.post("/", status_code=202) async def main_endpoint(top_level_model: TopLevelParser, background_tasks: fastapi.BackgroundTasks) -> Dict: job = Job() jobs[job.id] = job From e76c135fcc86ff85e7da26211e44b9a2f93a7d32 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Wed, 24 Jan 2024 16:20:13 +0100 Subject: [PATCH 22/46] Replace job UUID with a shorter random string The UUID is very, very long, and such a length / collision safety is not needed. So this uses a much shorter (length 6 characters) random job ID created using `secrets.token_urlsafe()`. Empiric testing says that this gives, for two random IDs, a collision probability of ~1e-4. --- looper/api/main.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index 7826786c1..46da4bbf1 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,8 +1,8 @@ import io from argparse import Namespace from contextlib import redirect_stderr, redirect_stdout -from typing import Dict -from uuid import UUID, uuid4 +import secrets +from typing import Dict, TypeAlias import fastapi import pydantic @@ -10,10 +10,11 @@ from looper.cli_pydantic import run_looper from looper.command_models.commands import SUPPORTED_COMMANDS, TopLevelParser +JobId: TypeAlias = str class Job(pydantic.BaseModel): - id: UUID = pydantic.Field( - default_factory=uuid4, + id: JobId = pydantic.Field( + default_factory=lambda: secrets.token_urlsafe(4), description="The unique identifier of the job" ) status: str = pydantic.Field( @@ -28,12 +29,11 @@ class Job(pydantic.BaseModel): description="Standard error output produced by `looper` while performing the requested action" ) - app = FastAPI(validate_model=True) -jobs: Dict[UUID, Job] = {} +jobs: Dict[str, Job] = {} -async def background_async(top_level_model: TopLevelParser, job_id: UUID) -> None: +async def background_async(top_level_model: TopLevelParser, job_id: JobId) -> None: argparse_namespace = create_argparse_namespace(top_level_model) stdout_stream = io.StringIO() stderr_stream = io.StringIO() @@ -95,5 +95,5 @@ async def main_endpoint(top_level_model: TopLevelParser, background_tasks: fasta @app.get("/status/{job_id}") -async def get_status(job_id: UUID): +async def get_status(job_id: JobId): return jobs[job_id] From adc451af6df483bcde9bc41f84cf2ee7cbb0a14f Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Wed, 24 Jan 2024 16:23:53 +0100 Subject: [PATCH 23/46] Reorder imports --- looper/api/main.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git 
a/looper/api/main.py b/looper/api/main.py index 46da4bbf1..f74a4b310 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -3,10 +3,11 @@ from contextlib import redirect_stderr, redirect_stdout import secrets from typing import Dict, TypeAlias -import fastapi -import pydantic +import fastapi from fastapi import FastAPI +import pydantic + from looper.cli_pydantic import run_looper from looper.command_models.commands import SUPPORTED_COMMANDS, TopLevelParser From a3b85de794e128dc16132a3daec6e366737a0b1a Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Wed, 24 Jan 2024 17:03:05 +0100 Subject: [PATCH 24/46] Fix a typo --- looper/api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index f74a4b310..6c8c8d61a 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -40,7 +40,7 @@ async def background_async(top_level_model: TopLevelParser, job_id: JobId) -> No stderr_stream = io.StringIO() with redirect_stderr(stderr_stream), redirect_stdout(stdout_stream): # TODO: as it stands, because of the `async def`, and the lacking `await` - # in the following line, this endpoint is (I (Simeon) thing) currently blocking. + # in the following line, this endpoint is (I (Simeon) think) currently blocking. # We would need to make `run_looper()` return a future, but it inherently does # not support `async` calls. # So one option would be to run `run_looper()` in its own thread whose From ea81cb11f22ac36c11cea44ac1ade73e8ac320f5 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Sat, 27 Jan 2024 12:16:28 +0100 Subject: [PATCH 25/46] Remove `uuid` dependency --- requirements/requirements-all.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index dac9400aa..0e793d423 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -15,4 +15,3 @@ pydantic-argparse>=0.8.0 pydantic-argparse==0.8.0 fastapi uvicorn -uuid From 279d24f324c0bc3af548c39f6a7e7a8df5a7a43d Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Sat, 27 Jan 2024 12:21:23 +0100 Subject: [PATCH 26/46] Change `yacman` dependency to a hacked, but threadable version This is a temporary hack: use a Yacman branch that makes Yacman's YAMLConfigmanager not capture SIGTERM / SIGKILL (and thus Ctrl+c events). In the near future, this will be replaced by a new version of Yacman that supports a read-only mode. --- requirements/requirements-all.txt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index 0e793d423..b935989be 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -10,7 +10,11 @@ pipestat>=0.6.0 pyyaml>=3.12 rich>=9.10.0 ubiquerg>=0.5.2 -yacman>=0.9.2 +# This is a temporary hack: use a Yacman branch that makes Yacman's +# YAMLConfigmanager not capture SIGTERM / SIGKILL (and thus Ctrl+c events). +# In the near future, this will be replaced by a new version of Yacman that +# supports a read-only mode. 
+git+https://github.com/databio/yacman.git@tweag/thread-unsafe pydantic-argparse>=0.8.0 pydantic-argparse==0.8.0 fastapi From 4c887884c006f21768eb195ae95f613261f1a49e Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Sat, 27 Jan 2024 12:23:59 +0100 Subject: [PATCH 27/46] Add lower bound for FastAPI version --- requirements/requirements-all.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index b935989be..e5fb45b40 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -16,6 +16,5 @@ ubiquerg>=0.5.2 # supports a read-only mode. git+https://github.com/databio/yacman.git@tweag/thread-unsafe pydantic-argparse>=0.8.0 -pydantic-argparse==0.8.0 -fastapi uvicorn +fastapi>=0.109.0 From a82a8f78946d59fa8965e580b688b8c477fc3516 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Sat, 27 Jan 2024 12:24:15 +0100 Subject: [PATCH 28/46] Add lower bound for uvicorn version --- requirements/requirements-all.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index e5fb45b40..06adde4f2 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -16,5 +16,5 @@ ubiquerg>=0.5.2 # supports a read-only mode. git+https://github.com/databio/yacman.git@tweag/thread-unsafe pydantic-argparse>=0.8.0 -uvicorn fastapi>=0.109.0 +uvicorn>=0.26.0 From f995b472a13b821f41ba791212f47468e04fead9 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Sat, 27 Jan 2024 12:27:46 +0100 Subject: [PATCH 29/46] Make background task function non-`async` This allows, together with the hacked, threadable (but _not_ thread-safe) `yacman` version, to run `looper` commands in a non-blocking way. --- looper/api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index 6c8c8d61a..e3ab9a82b 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -34,7 +34,7 @@ class Job(pydantic.BaseModel): jobs: Dict[str, Job] = {} -async def background_async(top_level_model: TopLevelParser, job_id: JobId) -> None: +def background_async(top_level_model: TopLevelParser, job_id: JobId) -> None: argparse_namespace = create_argparse_namespace(top_level_model) stdout_stream = io.StringIO() stderr_stream = io.StringIO() From 3c545467cdb0d65536c8578e513e5eff232e239f Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Mon, 29 Jan 2024 13:17:10 +0100 Subject: [PATCH 30/46] [DELETE ME] hack to use local `yacman` copy --- requirements/requirements-all.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index 06adde4f2..abc49ab9a 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -14,7 +14,8 @@ ubiquerg>=0.5.2 # YAMLConfigmanager not capture SIGTERM / SIGKILL (and thus Ctrl+c events). # In the near future, this will be replaced by a new version of Yacman that # supports a read-only mode. 
-git+https://github.com/databio/yacman.git@tweag/thread-unsafe +# git+https://github.com/databio/yacman.git@tweag/thread-unsafe +../yacman/ pydantic-argparse>=0.8.0 fastapi>=0.109.0 uvicorn>=0.26.0 From 9b3a1daae2bae2214b924c6214b6a379ca56f0fa Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Mon, 29 Jan 2024 18:03:10 +0100 Subject: [PATCH 31/46] Don't call `logmuse.init_logger()` in `looper.__init__.py` Everything will probably still work - after all, `logmuse.logger_via_cli()` is called in `cli_looper.py` / `cli_pydantic.py` which also sets up a logger. --- looper/__init__.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/looper/__init__.py b/looper/__init__.py index fe751d02d..b0931009e 100644 --- a/looper/__init__.py +++ b/looper/__init__.py @@ -7,10 +7,6 @@ """ -import logmuse - -logmuse.init_logger("looper") - from .divvy import ComputingConfiguration, select_divvy_config from .divvy import DEFAULT_COMPUTE_RESOURCES_NAME from .divvy import NEW_COMPUTE_KEY as COMPUTE_KEY From 16f0ab501a66afe8e98c9619abf0cf5b587589b4 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Mon, 29 Jan 2024 18:07:16 +0100 Subject: [PATCH 32/46] Explicitly initialize `logmuse` logger with `sys.stderr` as stream --- looper/cli_pydantic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index c768db8d0..a7da330f7 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -51,7 +51,7 @@ def run_looper(args: Namespace | TopLevelParser, parser: ArgumentParser, http_ap # here comes adapted `cli_looper.py` code global _LOGGER - _LOGGER = logmuse.logger_via_cli(args, make_root=True) + _LOGGER = logmuse.logger_via_cli(args, make_root=True, stream=sys.stderr) # Find out which subcommand was used supported_command_names = [cmd.name for cmd in SUPPORTED_COMMANDS] From 2370aa6708fe905d20989a68635a106d73e5c954 Mon Sep 17 00:00:00 2001 From: Zhihan Zhang Date: Tue, 30 Jan 2024 17:55:13 +0800 Subject: [PATCH 33/46] Selectively capture logs from separate jobs Before this commit, the logging stdout we captured will be mixed if we submit several jobs at the same time. This captures outputs for each thread (job) separately. See https://stackoverflow.com/questions/14890997/redirect-stdout-to-a-file-only-for-a-specific-thread. 
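The mechanism in miniature (a sketch of the idea only; the names below are
illustrative, and the actual implementation is the copied stdout_redirects.py
module added in this patch): sys.stdout is swapped for a proxy object that picks
a per-thread buffer at write time, so each job thread is captured independently
while other threads keep writing to the real stream.

    import sys
    import threading
    from io import StringIO

    _real_stdout = sys.stdout
    _buffers = {}  # thread ident -> StringIO buffer for captured threads

    class _PerThreadStdout:
        """Resolve the target stream per calling thread at write time."""

        def write(self, text):
            return _buffers.get(threading.get_ident(), _real_stdout).write(text)

        def flush(self):
            _buffers.get(threading.get_ident(), _real_stdout).flush()

    def capture_current_thread():
        """Start capturing stdout for the calling thread; return its buffer."""
        buf = StringIO()
        _buffers[threading.get_ident()] = buf
        return buf

    sys.stdout = _PerThreadStdout()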
Co-authored-by: Simeon Carstens --- looper/api/main.py | 22 +-- looper/api/stdout_redirects.py | 257 +++++++++++++++++++++++++++++++++ 2 files changed, 264 insertions(+), 15 deletions(-) create mode 100644 looper/api/stdout_redirects.py diff --git a/looper/api/main.py b/looper/api/main.py index e3ab9a82b..f34fb4c17 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,6 +1,5 @@ import io from argparse import Namespace -from contextlib import redirect_stderr, redirect_stdout import secrets from typing import Dict, TypeAlias @@ -11,6 +10,10 @@ from looper.cli_pydantic import run_looper from looper.command_models.commands import SUPPORTED_COMMANDS, TopLevelParser +import stdout_redirects + +stdout_redirects.enable_proxy() + JobId: TypeAlias = str class Job(pydantic.BaseModel): @@ -36,23 +39,12 @@ class Job(pydantic.BaseModel): def background_async(top_level_model: TopLevelParser, job_id: JobId) -> None: argparse_namespace = create_argparse_namespace(top_level_model) - stdout_stream = io.StringIO() - stderr_stream = io.StringIO() - with redirect_stderr(stderr_stream), redirect_stdout(stdout_stream): - # TODO: as it stands, because of the `async def`, and the lacking `await` - # in the following line, this endpoint is (I (Simeon) think) currently blocking. - # We would need to make `run_looper()` return a future, but it inherently does - # not support `async` calls. - # So one option would be to run `run_looper()` in its own thread whose - # termination we can `await`, using `fastapi.run_in_threadpool`. But that fails - # with an error stemming from the `yacman` library about `signal.signal` only - # working in the main thread of the main interpreter. We have to investigate - # how to solve this. - run_looper(argparse_namespace, None, True) + stdout_stream = stdout_redirects.redirect() + + run_looper(argparse_namespace, None, True) jobs[job_id].status = "completed" jobs[job_id].stdout = stdout_stream.getvalue() - jobs[job_id].stderr = stderr_stream.getvalue() def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: diff --git a/looper/api/stdout_redirects.py b/looper/api/stdout_redirects.py new file mode 100644 index 000000000..9daffe78b --- /dev/null +++ b/looper/api/stdout_redirects.py @@ -0,0 +1,257 @@ +# copied from https://stackoverflow.com/a/43667367/1193986 +# +# (c) umichscoots 2017 +# License unsepcified. Assumed to be CC-by-sa as is StackOverflow's policy +# +# The class LocalProxy is taken from the werkzeug project +# https://raw.githubusercontent.com/pallets/werkzeug/ef545f0d0bf28cbad02066b4cb7471bea50a93ee/src/werkzeug/local.py +# It is licensed under the BSD-3-Clause License +# +# I guess that means the result is CC-by-SA + + +import sys +import threading +from io import StringIO +from typing import Any, Optional, Union + +# Save all of the objects for use later. +orig___stdout__ = sys.__stdout__ +orig___stderr__ = sys.__stderr__ +orig_stdout = sys.stdout +orig_stderr = sys.stderr +thread_proxies = {} + + +class LocalProxy: + """Acts as a proxy for a werkzeug local. Forwards all operations to + a proxied object. The only operations not supported for forwarding + are right handed operands and any kind of assignment. 
+ Example usage:: + from werkzeug.local import Local + l = Local() + # these are proxies + request = l('request') + user = l('user') + from werkzeug.local import LocalStack + _response_local = LocalStack() + # this is a proxy + response = _response_local() + Whenever something is bound to l.user / l.request the proxy objects + will forward all operations. If no object is bound a :exc:`RuntimeError` + will be raised. + To create proxies to :class:`Local` or :class:`LocalStack` objects, + call the object as shown above. If you want to have a proxy to an + object looked up by a function, you can (as of Werkzeug 0.6.1) pass + a function to the :class:`LocalProxy` constructor:: + session = LocalProxy(lambda: get_current_request().session) + .. versionchanged:: 0.6.1 + The class can be instantiated with a callable as well now. + """ + + __slots__ = ("__local", "__dict__", "__name__", "__wrapped__") + + def __init__( + self, + local: Union[Any, "LocalProxy", "LocalStack"], + name: Optional[str] = None, + ) -> None: + object.__setattr__(self, "_LocalProxy__local", local) + object.__setattr__(self, "__name__", name) + if callable(local) and not hasattr(local, "__release_local__"): + # "local" is a callable that is not an instance of Local or + # LocalManager: mark it as a wrapped function. + object.__setattr__(self, "__wrapped__", local) + + def _get_current_object( + self, + ) -> object: + """Return the current object. This is useful if you want the real + object behind the proxy at a time for performance reasons or because + you want to pass the object into a different context. + """ + if not hasattr(self.__local, "__release_local__"): + return self.__local() + try: + return getattr(self.__local, self.__name__) + except AttributeError: + raise RuntimeError(f"no object bound to {self.__name__}") + + @property + def __dict__(self): + try: + return self._get_current_object().__dict__ + except RuntimeError: + raise AttributeError("__dict__") + + def __repr__(self) -> str: + try: + obj = self._get_current_object() + except RuntimeError: + return f"<{type(self).__name__} unbound>" + return repr(obj) + + def __bool__(self) -> bool: + try: + return bool(self._get_current_object()) + except RuntimeError: + return False + + def __dir__(self): + try: + return dir(self._get_current_object()) + except RuntimeError: + return [] + + def __getattr__(self, name: str) -> Any: + if name == "__members__": + return dir(self._get_current_object()) + return getattr(self._get_current_object(), name) + + def __setitem__(self, key: Any, value: Any) -> None: + self._get_current_object()[key] = value # type: ignore + + def __delitem__(self, key): + del self._get_current_object()[key] + + __setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v) # type: ignore + __delattr__ = lambda x, n: delattr(x._get_current_object(), n) # type: ignore + __str__ = lambda x: str(x._get_current_object()) # type: ignore + __lt__ = lambda x, o: x._get_current_object() < o + __le__ = lambda x, o: x._get_current_object() <= o + __eq__ = lambda x, o: x._get_current_object() == o # type: ignore + __ne__ = lambda x, o: x._get_current_object() != o # type: ignore + __gt__ = lambda x, o: x._get_current_object() > o + __ge__ = lambda x, o: x._get_current_object() >= o + __hash__ = lambda x: hash(x._get_current_object()) # type: ignore + __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw) + __len__ = lambda x: len(x._get_current_object()) + __getitem__ = lambda x, i: x._get_current_object()[i] + __iter__ = lambda x: 
iter(x._get_current_object()) + __contains__ = lambda x, i: i in x._get_current_object() + __add__ = lambda x, o: x._get_current_object() + o + __sub__ = lambda x, o: x._get_current_object() - o + __mul__ = lambda x, o: x._get_current_object() * o + __floordiv__ = lambda x, o: x._get_current_object() // o + __mod__ = lambda x, o: x._get_current_object() % o + __divmod__ = lambda x, o: x._get_current_object().__divmod__(o) + __pow__ = lambda x, o: x._get_current_object() ** o + __lshift__ = lambda x, o: x._get_current_object() << o + __rshift__ = lambda x, o: x._get_current_object() >> o + __and__ = lambda x, o: x._get_current_object() & o + __xor__ = lambda x, o: x._get_current_object() ^ o + __or__ = lambda x, o: x._get_current_object() | o + __div__ = lambda x, o: x._get_current_object().__div__(o) + __truediv__ = lambda x, o: x._get_current_object().__truediv__(o) + __neg__ = lambda x: -(x._get_current_object()) + __pos__ = lambda x: +(x._get_current_object()) + __abs__ = lambda x: abs(x._get_current_object()) + __invert__ = lambda x: ~(x._get_current_object()) + __complex__ = lambda x: complex(x._get_current_object()) + __int__ = lambda x: int(x._get_current_object()) + __long__ = lambda x: long(x._get_current_object()) # type: ignore # noqa + __float__ = lambda x: float(x._get_current_object()) + __oct__ = lambda x: oct(x._get_current_object()) + __hex__ = lambda x: hex(x._get_current_object()) + __index__ = lambda x: x._get_current_object().__index__() + __coerce__ = lambda x, o: x._get_current_object().__coerce__(x, o) + __enter__ = lambda x: x._get_current_object().__enter__() + __exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw) + __radd__ = lambda x, o: o + x._get_current_object() + __rsub__ = lambda x, o: o - x._get_current_object() + __rmul__ = lambda x, o: o * x._get_current_object() + __rdiv__ = lambda x, o: o / x._get_current_object() + __rtruediv__ = __rdiv__ + __rfloordiv__ = lambda x, o: o // x._get_current_object() + __rmod__ = lambda x, o: o % x._get_current_object() + __rdivmod__ = lambda x, o: x._get_current_object().__rdivmod__(o) + __copy__ = lambda x: copy.copy(x._get_current_object()) + __deepcopy__ = lambda x, memo: copy.deepcopy(x._get_current_object(), memo) + + +def redirect(): + """ + Enables the redirect for the current thread's output to a single cStringIO + object and returns the object. + + :return: The StringIO object. + :rtype: ``cStringIO.StringIO`` + """ + # Get the current thread's identity. + ident = threading.currentThread().ident + + # Enable the redirect and return the cStringIO object. + thread_proxies[ident] = StringIO() + return thread_proxies[ident] + + +def stop_redirect(): + """ + Enables the redirect for the current thread's output to a single cStringIO + object and returns the object. + + :return: The final string value. + :rtype: ``str`` + """ + # Get the current thread's identity. + ident = threading.currentThread().ident + + # Only act on proxied threads. + if ident not in thread_proxies: + return + + # Read the value, close/remove the buffer, and return the value. + retval = thread_proxies[ident].getvalue() + thread_proxies[ident].close() + del thread_proxies[ident] + return retval + + +def _get_stream(original): + """ + Returns the inner function for use in the LocalProxy object. + + :param original: The stream to be returned if thread is not proxied. + :type original: ``file`` + :return: The inner function for use in the LocalProxy object. 
+ :rtype: ``function`` + """ + + def proxy(): + """ + Returns the original stream if the current thread is not proxied, + otherwise we return the proxied item. + + :return: The stream object for the current thread. + :rtype: ``file`` + """ + # Get the current thread's identity. + ident = threading.currentThread().ident + + # Return the proxy, otherwise return the original. + return thread_proxies.get(ident, original) + + # Return the inner function. + return proxy + + +def enable_proxy(): + """ + Overwrites __stdout__, __stderr__, stdout, and stderr with the proxied + objects. + """ + sys.__stdout__ = LocalProxy(_get_stream(sys.__stdout__)) + sys.__stderr__ = LocalProxy(_get_stream(sys.__stderr__)) + sys.stdout = LocalProxy(_get_stream(sys.stdout)) + sys.stderr = LocalProxy(_get_stream(sys.stderr)) + + +def disable_proxy(): + """ + Overwrites __stdout__, __stderr__, stdout, and stderr with the original + objects. + """ + sys.__stdout__ = orig___stdout__ + sys.__stderr__ = orig___stderr__ + sys.stdout = orig_stdout + sys.stderr = orig_stderr From 8ffaef76add0c1c7583fc0bc842f603f46783bf2 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Tue, 30 Jan 2024 13:34:32 +0100 Subject: [PATCH 34/46] Add source for `stdout_redirects.py` --- looper/api/stdout_redirects.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/looper/api/stdout_redirects.py b/looper/api/stdout_redirects.py index 9daffe78b..f430b1b42 100644 --- a/looper/api/stdout_redirects.py +++ b/looper/api/stdout_redirects.py @@ -1,3 +1,5 @@ +# Copied from https://gitlab.com/yquemener/stdout-redirects +# # copied from https://stackoverflow.com/a/43667367/1193986 # # (c) umichscoots 2017 From d8ae6ec6a3293d834172b4c23d072e4ca36af4f3 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Tue, 30 Jan 2024 13:47:51 +0100 Subject: [PATCH 35/46] Add a comment about not calling `stdout_redirect.stop_redirect()` --- looper/api/main.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/looper/api/main.py b/looper/api/main.py index f34fb4c17..94462bd2c 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -43,6 +43,9 @@ def background_async(top_level_model: TopLevelParser, job_id: JobId) -> None: run_looper(argparse_namespace, None, True) + # Here, we should call `stdout_redirects.stop_redirect()`, but that fails for reasons discussed + # in the following issue: https://github.com/python/cpython/issues/80374 + # But this *seems* not to pose any problems. jobs[job_id].status = "completed" jobs[job_id].stdout = stdout_stream.getvalue() From db9f8f52ae1e19b9a5f25152e19b8b4d5f6e2bb9 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Tue, 30 Jan 2024 13:52:30 +0100 Subject: [PATCH 36/46] Remove superfluous import --- looper/api/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index 94462bd2c..d44bec049 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,4 +1,3 @@ -import io from argparse import Namespace import secrets from typing import Dict, TypeAlias From ad621c6cb4b844b9edf39248ab9933c9b286b695 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Tue, 30 Jan 2024 13:52:49 +0100 Subject: [PATCH 37/46] Remove `progress` field from `Job` model --- looper/api/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index d44bec049..4e9f1aecf 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -24,7 +24,6 @@ class Job(pydantic.BaseModel): default="in_progress", description="The current status of the job. 
Can be either `in_progress` or `completed`." ) - progress: int = 0 stdout: str | None = pydantic.Field(default=None, description="Standard output produced by `looper` while performing the requested action" ) From 95278b3749cbf62c1a4bc8e5e457da55fe2309b4 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Tue, 30 Jan 2024 17:03:00 +0100 Subject: [PATCH 38/46] Capture subprocess output to `sys.stdout`/`sys.stderr` This allows us to capture the output of the Bash scripts or commands `looper` executes. --- looper/conductor.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/looper/conductor.py b/looper/conductor.py index e83616332..ddb36fd3a 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -4,6 +4,7 @@ import logging import os import subprocess +import sys import time import yaml from copy import copy, deepcopy @@ -387,7 +388,12 @@ def submit(self, force=False): # Capture submission command return value so that we can # intercept and report basic submission failures; #167 try: - subprocess.check_call(submission_command, shell=True) + # Using `subprocess.run()` instead of `subprocess.check()` allows us to capture + # stdout and stderr of the child process, and pass it to the `stdout` / `stderr` + # of `looper`'s Python process. + result = subprocess.run(submission_command, check=True, shell=True, capture_output=True) + print(result.stdout.decode()) + print(result.stderr.decode(), file=sys.stderr) except subprocess.CalledProcessError: fails = ( "" if self.collate else [s.sample_name for s in self._samples] From 6870bcd80e61c6219dbae111d48bba9b702b18c8 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Thu, 1 Feb 2024 10:32:19 +0100 Subject: [PATCH 39/46] Make CLI for HTTP API server Co-authored-by: Zhihan Zhang --- looper/api/main.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index 4e9f1aecf..34fd8f03c 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,10 +1,11 @@ -from argparse import Namespace +from argparse import ArgumentParser, Namespace import secrets from typing import Dict, TypeAlias import fastapi from fastapi import FastAPI import pydantic +import uvicorn from looper.cli_pydantic import run_looper from looper.command_models.commands import SUPPORTED_COMMANDS, TopLevelParser @@ -91,3 +92,12 @@ async def main_endpoint(top_level_model: TopLevelParser, background_tasks: fasta @app.get("/status/{job_id}") async def get_status(job_id: JobId): return jobs[job_id] + + +if __name__ == "__main__": + parser = ArgumentParser("looper-serve", description="Run looper HTTP API server") + parser.add_argument("--host", type=str, default="0.0.0.0", help="Host IP address to use (127.0.0.1 for local access only)") + parser.add_argument("--port", type=int, default=8000, help="Port the server listens on") + args = parser.parse_args() + + uvicorn.run(app, host=args.host, port=args.port) From 8ead693669d6146dfb6d9c5fec2accdd2a35dfbe Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Thu, 1 Feb 2024 10:44:05 +0100 Subject: [PATCH 40/46] Add entry point console script for HTTP API server --- looper/api/main.py | 19 +++++++++++++++---- setup.py | 1 + 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index 34fd8f03c..3e43e70eb 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -10,7 +10,7 @@ from looper.cli_pydantic import run_looper from looper.command_models.commands import SUPPORTED_COMMANDS, TopLevelParser -import 
stdout_redirects +from looper.api import stdout_redirects stdout_redirects.enable_proxy() @@ -94,10 +94,21 @@ async def get_status(job_id: JobId): return jobs[job_id] -if __name__ == "__main__": +def main() -> None: parser = ArgumentParser("looper-serve", description="Run looper HTTP API server") - parser.add_argument("--host", type=str, default="0.0.0.0", help="Host IP address to use (127.0.0.1 for local access only)") - parser.add_argument("--port", type=int, default=8000, help="Port the server listens on") + parser.add_argument( + "--host", + type=str, + default="0.0.0.0", + help="Host IP address to use (127.0.0.1 for local access only)", + ) + parser.add_argument( + "--port", type=int, default=8000, help="Port the server listens on" + ) args = parser.parse_args() uvicorn.run(app, host=args.host, port=args.port) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index c05314fe6..57ead1bad 100644 --- a/setup.py +++ b/setup.py @@ -82,6 +82,7 @@ def get_static(name, condition=None): "looper = looper.__main__:main", "divvy = looper.__main__:divvy_main", "looper-pydantic-argparse = looper.cli_pydantic:main", + "looper-serve = looper.api.main:main" ], }, scripts=scripts, From 8b1b2cad5a2c87571b8488323d991780c894acfe Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Thu, 1 Feb 2024 17:01:58 +0100 Subject: [PATCH 41/46] Replace `stdout` / `stderr` job fields with `job_output` field The module we use to capture console output does not seem to distinguish easily between `stdout` and `stderr`. So we rather use a generic `console_output` field in the job model that subsumes both. --- looper/api/main.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index 3e43e70eb..5120ec14a 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -25,11 +25,8 @@ class Job(pydantic.BaseModel): default="in_progress", description="The current status of the job. Can be either `in_progress` or `completed`." ) - stdout: str | None = pydantic.Field(default=None, - description="Standard output produced by `looper` while performing the requested action" - ) - stderr: str | None = pydantic.Field(default=None, - description="Standard error output produced by `looper` while performing the requested action" + console_output: str | None = pydantic.Field(default=None, + description="Console output produced by `looper` while performing the requested action" ) app = FastAPI(validate_model=True) @@ -38,7 +35,7 @@ class Job(pydantic.BaseModel): def background_async(top_level_model: TopLevelParser, job_id: JobId) -> None: argparse_namespace = create_argparse_namespace(top_level_model) - stdout_stream = stdout_redirects.redirect() + output_stream = stdout_redirects.redirect() run_looper(argparse_namespace, None, True) @@ -46,7 +43,7 @@ def background_async(top_level_model: TopLevelParser, job_id: JobId) -> None: # in the following issue: https://github.com/python/cpython/issues/80374 # But this *seems* not to pose any problems. jobs[job_id].status = "completed" - jobs[job_id].stdout = stdout_stream.getvalue() + jobs[job_id].console_output = output_stream.getvalue() def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: From 0d6b0162f00b8c51354ba17d39e9469f24632447 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Thu, 1 Feb 2024 17:03:28 +0100 Subject: [PATCH 42/46] Add return type to `/status` endpoint This makes the Swagger documentation show the job schema for that endpoint. 
--- looper/api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/looper/api/main.py b/looper/api/main.py index 5120ec14a..fc7e1a1ce 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -87,7 +87,7 @@ async def main_endpoint(top_level_model: TopLevelParser, background_tasks: fasta @app.get("/status/{job_id}") -async def get_status(job_id: JobId): +async def get_status(job_id: JobId) -> Job: return jobs[job_id] From 2b4a962a54f6ae779b8eb289deade7c2d9ca9003 Mon Sep 17 00:00:00 2001 From: Zhihan Zhang <32028117+zz1874@users.noreply.github.com> Date: Fri, 2 Feb 2024 00:12:34 +0800 Subject: [PATCH 43/46] Add HTTP API Documentation (#3) * Apply formatter * Add documentation for `POST` and `GET` requests * Update looper/api/main.py Co-authored-by: Simeon Carstens * Update looper/api/main.py Co-authored-by: Simeon Carstens * Add where to access the API documentation --------- Co-authored-by: Simeon Carstens --- looper/api/README.md | 8 ++++++++ looper/api/main.py | 34 ++++++++++++++++++++++++++-------- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/looper/api/README.md b/looper/api/README.md index ffcf951e7..64980ee5a 100644 --- a/looper/api/README.md +++ b/looper/api/README.md @@ -19,3 +19,11 @@ curl -X POST -H "Content-Type: application/json" -d '{"run": {}, "looper_config" with the project files in the same `looper/api` folder. This example sends a JSON payload with the `run` and `looper_config` parameters to the `/` endpoint. + +## API Documentation +The API documentation is automatically generated and can be accessed in your web browser: + +Swagger UI: http://127.0.0.1:8000/docs +ReDoc: http://127.0.0.1:8000/redoc + +Explore the API documentation to understand available endpoints, request parameters, and response formats. diff --git a/looper/api/main.py b/looper/api/main.py index fc7e1a1ce..fb98746cc 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,12 +1,14 @@ from argparse import ArgumentParser, Namespace import secrets +import io from typing import Dict, TypeAlias import fastapi -from fastapi import FastAPI import pydantic import uvicorn +from fastapi import FastAPI + from looper.cli_pydantic import run_looper from looper.command_models.commands import SUPPORTED_COMMANDS, TopLevelParser @@ -16,19 +18,22 @@ JobId: TypeAlias = str + class Job(pydantic.BaseModel): id: JobId = pydantic.Field( default_factory=lambda: secrets.token_urlsafe(4), - description="The unique identifier of the job" + description="The unique identifier of the job", ) status: str = pydantic.Field( default="in_progress", - description="The current status of the job. Can be either `in_progress` or `completed`." + description="The current status of the job. 
Can be either `in_progress` or `completed`.", ) - console_output: str | None = pydantic.Field(default=None, - description="Console output produced by `looper` while performing the requested action" + console_output: str | None = pydantic.Field( + default=None, + description="Console output produced by `looper` while performing the requested action", ) + app = FastAPI(validate_model=True) jobs: Dict[str, Job] = {} @@ -78,15 +83,28 @@ def create_argparse_namespace(top_level_model: TopLevelParser) -> Namespace: setattr(namespace, argname, command_namespace) return namespace -@app.post("/", status_code=202) -async def main_endpoint(top_level_model: TopLevelParser, background_tasks: fastapi.BackgroundTasks) -> Dict: + +@app.post( + "/", + status_code=202, + summary="Run Looper", + description="Start a `looper` command with arguments specified in " + "`top_level_model` in the background and return a job identifier.", +) +async def main_endpoint( + top_level_model: TopLevelParser, background_tasks: fastapi.BackgroundTasks +) -> Dict: job = Job() jobs[job.id] = job background_tasks.add_task(background_async, top_level_model, job.id) return {"job_id": job.id} -@app.get("/status/{job_id}") +@app.get( + "/status/{job_id}", + summary="Get job status", + description="Retrieve the status of a job based on its unique identifier.", +) async def get_status(job_id: JobId) -> Job: return jobs[job_id] From 6619440572fbd5c36e08fa37a07797d478e3edb5 Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Thu, 1 Feb 2024 17:52:22 +0100 Subject: [PATCH 44/46] Run formatter --- looper/cli_pydantic.py | 7 +++++-- looper/conductor.py | 4 +++- setup.py | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index a7da330f7..c1846f3d4 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -47,7 +47,9 @@ ) -def run_looper(args: Namespace | TopLevelParser, parser: ArgumentParser, http_api=False): +def run_looper( + args: Namespace | TopLevelParser, parser: ArgumentParser, http_api=False +): # here comes adapted `cli_looper.py` code global _LOGGER @@ -98,7 +100,8 @@ def run_looper(args: Namespace | TopLevelParser, parser: ArgumentParser, http_ap divcfg = ( select_divvy_config(filepath=subcommand_args.divvy) - if hasattr(subcommand_args, "divvy") else None + if hasattr(subcommand_args, "divvy") + else None ) args = enrich_args_via_cfg(args, parser, False, http_api) diff --git a/looper/conductor.py b/looper/conductor.py index ddb36fd3a..01fe2e102 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -391,7 +391,9 @@ def submit(self, force=False): # Using `subprocess.run()` instead of `subprocess.check()` allows us to capture # stdout and stderr of the child process, and pass it to the `stdout` / `stderr` # of `looper`'s Python process. 
- result = subprocess.run(submission_command, check=True, shell=True, capture_output=True) + result = subprocess.run( + submission_command, check=True, shell=True, capture_output=True + ) print(result.stdout.decode()) print(result.stderr.decode(), file=sys.stderr) except subprocess.CalledProcessError: diff --git a/setup.py b/setup.py index 57ead1bad..1e3857972 100644 --- a/setup.py +++ b/setup.py @@ -82,7 +82,7 @@ def get_static(name, condition=None): "looper = looper.__main__:main", "divvy = looper.__main__:divvy_main", "looper-pydantic-argparse = looper.cli_pydantic:main", - "looper-serve = looper.api.main:main" + "looper-serve = looper.api.main:main", ], }, scripts=scripts, From b3aa4aa3c20edc8a24cb938c00544c0d7c1d0f4f Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Fri, 2 Feb 2024 10:44:40 +0100 Subject: [PATCH 45/46] Make HTTP API code Python 3.8 compatible --- looper/api/main.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/looper/api/main.py b/looper/api/main.py index fb98746cc..963aa1cda 100644 --- a/looper/api/main.py +++ b/looper/api/main.py @@ -1,6 +1,5 @@ from argparse import ArgumentParser, Namespace import secrets -import io from typing import Dict, TypeAlias import fastapi @@ -16,11 +15,9 @@ stdout_redirects.enable_proxy() -JobId: TypeAlias = str - class Job(pydantic.BaseModel): - id: JobId = pydantic.Field( + id: str = pydantic.Field( default_factory=lambda: secrets.token_urlsafe(4), description="The unique identifier of the job", ) @@ -38,7 +35,7 @@ class Job(pydantic.BaseModel): jobs: Dict[str, Job] = {} -def background_async(top_level_model: TopLevelParser, job_id: JobId) -> None: +def background_async(top_level_model: TopLevelParser, job_id: str) -> None: argparse_namespace = create_argparse_namespace(top_level_model) output_stream = stdout_redirects.redirect() @@ -105,7 +102,7 @@ async def main_endpoint( summary="Get job status", description="Retrieve the status of a job based on its unique identifier.", ) -async def get_status(job_id: JobId) -> Job: +async def get_status(job_id: str) -> Job: return jobs[job_id] From d75942cbcfa555c5bb37a4757b129355c9433d7c Mon Sep 17 00:00:00 2001 From: Simeon Carstens Date: Fri, 2 Feb 2024 11:48:56 +0100 Subject: [PATCH 46/46] Update README with more detailed usage instructions --- looper/api/README.md | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/looper/api/README.md b/looper/api/README.md index 64980ee5a..e0cc9359f 100644 --- a/looper/api/README.md +++ b/looper/api/README.md @@ -5,20 +5,25 @@ This API provides an HTTP interface for running the `looper` commands, allowing users to interact with Looper via HTTP requests. ## Usage -### Running the API -To run the API, execute the following command: +### Running the server +Run the app: ```bash -cd looper/api -uvicorn main:app --reload +looper-serve [--host ] [--port ] ``` -### Example API Usage -To run the `looper run` command through the HTTP API, you can use the following curl command: + +> [!NOTE] +This assumes that all files specified in the arguments are available on the file system of the machine that is running the HTTP API server. Best make sure you use absolute file paths in all `looper` YAML configuration files. 
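+
+For example, to make the API reachable only from the local machine and listen on the default port (the one used in the URLs below), you can run:
+
+```bash
+looper-serve --host 127.0.0.1 --port 8000
+```
+
+Both flags are optional; if they are omitted, the server binds to `0.0.0.0` and listens on port `8000`.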
+ +### Sending requests +To test this, you can clone the [`hello_looper`](https://github.com/pepkit/hello_looper) repository and then run (for example) the following in a second terminal: ```bash -curl -X POST -H "Content-Type: application/json" -d '{"run": {}, "looper_config": ".looper.yaml"}' "http://127.0.0.1:8000" +curl -X POST -H "Content-Type: application/json" -d '{"run": {"time_delay": 5}, "looper_config": "/path/to/hello_looper/.looper.yaml"}' "http://127.0.0.1:8000" ``` -with the project files in the same `looper/api` folder. - -This example sends a JSON payload with the `run` and `looper_config` parameters to the `/` endpoint. +This will return a six-letter job ID, say `abc123`. Then get the result / output of the run with +```bash +curl -X GET -v localhost:8000/status/abc123 +``` +For better visualization / readability, you can post-process the output by piping it to `jq` (` | jq -r .console_output`). ## API Documentation The API documentation is automatically generated and can be accessed in your web browser: