diff --git a/.github/workflows/test_client_macos_nightlies.yml b/.github/workflows/test_client_macos_nightlies.yml index 7a4e723b..b046ef0c 100644 --- a/.github/workflows/test_client_macos_nightlies.yml +++ b/.github/workflows/test_client_macos_nightlies.yml @@ -37,7 +37,7 @@ jobs: run: >- python -m pytest -x -m object_retrieval -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache object_removal: runs-on: ubuntu-latest timeout-minutes: 30 @@ -62,7 +62,7 @@ jobs: run: >- python -m pytest -x -m object_removal -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache dispatch_tests: runs-on: ubuntu-latest timeout-minutes: 30 @@ -86,7 +86,7 @@ jobs: run: >- python -m pytest -x -m dispatch -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache run_tests_online: runs-on: ubuntu-latest timeout-minutes: 30 @@ -110,8 +110,8 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m run -m online -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -m "online and not api" -c /dev/null -p no:warnings + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache run_tests_offline: runs-on: ubuntu-latest timeout-minutes: 30 @@ -135,7 +135,7 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m run -m offline -c /dev/null -p no:warnings + -m "offline and not api" -c /dev/null -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache config_tests: runs-on: ubuntu-latest @@ -160,7 +160,7 @@ jobs: run: >- python -m pytest -x -m config -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache executor_tests: runs-on: ubuntu-latest timeout-minutes: 30 @@ -232,4 +232,4 @@ jobs: run: >- python -m pytest -x -m local -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache diff --git a/.github/workflows/test_client_ubuntu.yml b/.github/workflows/test_client_ubuntu.yml index 23251bc7..bc8911e2 100644 --- a/.github/workflows/test_client_ubuntu.yml +++ b/.github/workflows/test_client_ubuntu.yml @@ -42,7 +42,7 @@ jobs: run: >- python -m pytest -x -m object_retrieval -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache object_removal: runs-on: ubuntu-latest timeout-minutes: 30 @@ -67,7 +67,7 @@ jobs: run: >- python -m pytest -x -m object_removal -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache dispatch_tests: runs-on: ubuntu-latest timeout-minutes: 30 @@ -91,7 +91,7 @@ jobs: run: >- python -m pytest -x -m dispatch -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache run_tests_online: runs-on: ubuntu-latest timeout-minutes: 30 @@ -115,8 +115,8 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m run -m online -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -m "online and not api" -c /dev/null -p no:warnings + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache 
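(Editor's note: the jobs above switch pytest from serial collection (`-n 0`) to eight pytest-xdist workers, and collapse the doubled `-m run -m online` selection into a single marker expression that also excludes the new `api` marker. For reference, the same selection can be reproduced locally; a minimal sketch, assuming pytest-xdist is installed and the `online`/`api` markers are registered for this suite:)

```python
import sys

import pytest

# Mirror the updated CI job: online tests, excluding the `api` marker,
# spread over 8 xdist workers instead of running serially (-n 0).
sys.exit(
    pytest.main(
        ["-x", "-m", "online and not api", "-p", "no:warnings", "-n", "8", "-v"]
    )
)
```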
run_tests_offline: runs-on: ubuntu-latest timeout-minutes: 30 @@ -140,7 +140,7 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m run -m offline -c /dev/null -p no:warnings + -m "offline and not api" -c /dev/null -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache config_tests: runs-on: ubuntu-latest @@ -165,7 +165,7 @@ jobs: run: >- python -m pytest -x -m config -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache executor_tests: runs-on: ubuntu-latest timeout-minutes: 30 @@ -237,4 +237,4 @@ jobs: run: >- python -m pytest -x -m local -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache diff --git a/.github/workflows/test_client_ubuntu_nightlies.yml b/.github/workflows/test_client_ubuntu_nightlies.yml index 724c2ef7..e868da63 100644 --- a/.github/workflows/test_client_ubuntu_nightlies.yml +++ b/.github/workflows/test_client_ubuntu_nightlies.yml @@ -40,7 +40,7 @@ jobs: run: >- python -m pytest -x -m object_retrieval -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache object_removal: runs-on: ubuntu-latest timeout-minutes: 30 @@ -65,7 +65,7 @@ jobs: run: >- python -m pytest -x -m object_removal -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache dispatch_tests: runs-on: ubuntu-latest timeout-minutes: 30 @@ -89,8 +89,8 @@ jobs: run: >- python -m pytest -x -m dispatch -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache - run_tests_online: + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + online_tests: runs-on: ubuntu-latest timeout-minutes: 30 steps: @@ -113,9 +113,9 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m run -m online -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache - run_tests_offline: + -m "online and not api" -c /dev/null -p no:warnings + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + offline_tests: runs-on: ubuntu-latest timeout-minutes: 30 steps: @@ -138,7 +138,7 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m run -m offline -c /dev/null -p no:warnings + -m "offline and not api" -c /dev/null -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache config_tests: runs-on: ubuntu-latest @@ -163,7 +163,7 @@ jobs: run: >- python -m pytest -x -m config -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache executor_tests: runs-on: ubuntu-latest timeout-minutes: 30 @@ -235,4 +235,4 @@ jobs: run: >- python -m pytest -x -m local -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache diff --git a/.github/workflows/test_client_windows_nightlies.yml b/.github/workflows/test_client_windows_nightlies.yml index d852dd39..4b863b56 100644 --- a/.github/workflows/test_client_windows_nightlies.yml +++ b/.github/workflows/test_client_windows_nightlies.yml @@ -36,8 +36,8 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m object_retrieval -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + 
-m 'object_retrieval and not unix' -c /dev/null -p no:warnings + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache shell: pwsh object_removal: runs-on: windows-latest @@ -63,7 +63,7 @@ jobs: run: >- python -m pytest -x -m 'object_removal and not unix' -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache shell: pwsh dispatch_tests: runs-on: windows-latest @@ -88,7 +88,7 @@ jobs: run: >- python -m pytest -x -m 'dispatch and not unix' -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache shell: pwsh run_tests_online: runs-on: windows-latest @@ -113,8 +113,8 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m run -m 'online and not unix' -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -m run -m 'online and not unix and not api' -c /dev/null -p no:warnings + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache shell: pwsh run_tests_offline: runs-on: windows-latest @@ -139,7 +139,7 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m run -m 'offline and not unix' -c /dev/null -p no:warnings + -m run -m 'offline and not unix and not api' -c /dev/null -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache shell: pwsh config_tests: @@ -165,7 +165,7 @@ jobs: run: >- python -m pytest -x -m 'config and not unix' -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache shell: pwsh executor_tests: runs-on: windows-latest @@ -240,5 +240,5 @@ jobs: run: >- python -m pytest -x -m 'local and not unix' -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache shell: pwsh diff --git a/.github/workflows/test_multiple_python.yml b/.github/workflows/test_multiple_python.yml index 7929acb7..7a44f92a 100644 --- a/.github/workflows/test_multiple_python.yml +++ b/.github/workflows/test_multiple_python.yml @@ -45,7 +45,7 @@ jobs: run: >- python -m pytest -x -m object_retrieval -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache object_removal: runs-on: ubuntu-latest name: Object Removal Tests @@ -75,7 +75,7 @@ jobs: run: >- python -m pytest -x -m object_removal -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache dispatch_tests: runs-on: ubuntu-latest timeout-minutes: 30 @@ -104,7 +104,7 @@ jobs: run: >- python -m pytest -x -m dispatch -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache run_tests_online: name: Run Tests Online runs-on: ubuntu-latest @@ -133,8 +133,8 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m run -m online -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -m "online and not api" -c /dev/null -p no:warnings + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache run_tests_offline: runs-on: ubuntu-latest name: Run Tests Offline @@ -163,7 +163,7 @@ jobs: SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- python -m pytest -x - -m run -m offline -c /dev/null -p no:warnings + -m "offline and not api" -c 
/dev/null -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache config_tests: runs-on: ubuntu-latest @@ -193,7 +193,7 @@ jobs: run: >- python -m pytest -x -m config -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache executor_tests: runs-on: ubuntu-latest name: Executor Tests @@ -251,7 +251,7 @@ jobs: run: >- python -m pytest -x -m api -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache local_tests: runs-on: ubuntu-latest name: Local Tests @@ -280,4 +280,4 @@ jobs: run: >- python -m pytest -x -m local -c /dev/null -p no:warnings - -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + -n 8 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache diff --git a/.gitignore b/.gitignore index ffa94acc..533bab88 100644 --- a/.gitignore +++ b/.gitignore @@ -149,3 +149,6 @@ offline/ Vagrantfile .sourcery* + +# Modules +!simvue/offline/ diff --git a/CHANGELOG.md b/CHANGELOG.md index c3fb2c6e..3f0ba6ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Change Log +## Unreleased + +- Refactored the sender functionality, introducing a new `Sender` class. + ## [v2.2.2](https://github.com/simvue-io/client/releases/tag/v2.2.2) - 2025-10-14 - Enforced use of UTC for all datetime recording. diff --git a/pyproject.toml b/pyproject.toml index bb76d034..92a2df07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "simvue" -version = "2.2.2" +version = "2.3.0" description = "Simulation tracking and monitoring" authors = [ {name = "Simvue Development Team", email = "info@simvue.io"} ] diff --git a/simvue/api/objects/administrator/tenant.py b/simvue/api/objects/administrator/tenant.py index 88dd3a14..ffb71918 100644 --- a/simvue/api/objects/administrator/tenant.py +++ b/simvue/api/objects/administrator/tenant.py @@ -30,6 +30,7 @@ def new( max_runs: int = 0, max_data_volume: int = 0, offline: bool = False, + **_, ) -> Self: """Create a new tenant on the Simvue server. 
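(Editor's note: the `**_` catch-all added to `Tenant.new` above allows the sender to replay cached payloads that may carry keys the constructor does not accept. A minimal sketch of the pattern; the signature below is illustrative, not the actual `Tenant` API:)

```python
def new(*, name: str, enabled: bool = True, offline: bool = False, **_) -> dict:
    # Unknown keys in a cached JSON payload (e.g. sender bookkeeping fields)
    # are absorbed by **_ instead of raising a TypeError on replay.
    return {"name": name, "enabled": enabled, "offline": offline}


# A cached payload may hold more keys than the constructor knows about:
cached = {"name": "demo-tenant", "enabled": True, "upload_failed": False}
assert new(**cached)["name"] == "demo-tenant"
```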
diff --git a/simvue/api/objects/base.py b/simvue/api/objects/base.py index 679b473f..5905e46c 100644 --- a/simvue/api/objects/base.py +++ b/simvue/api/objects/base.py @@ -22,7 +22,6 @@ from simvue.utilities import staging_merger from simvue.config.user import SimvueConfiguration from simvue.exception import ObjectNotFoundError -from simvue.version import __version__ from simvue.api.request import ( get as sv_get, get_paginated, @@ -232,13 +231,7 @@ def __init__( ) self._headers: dict[str, str] = ( - { - "Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}", - "User-Agent": _user_agent or f"Simvue Python client {__version__}", - "Accept-Encoding": "gzip", - } - if not self._offline - else {} + self._user_config.headers if not self._offline else {} ) self._params: dict[str, str] = {} diff --git a/simvue/bin/__init__.py b/simvue/bin/__init__.py index e69de29b..dcca5b13 100644 --- a/simvue/bin/__init__.py +++ b/simvue/bin/__init__.py @@ -0,0 +1,3 @@ +from .sender import run as run_sender + +__all__ = ["run_sender"] diff --git a/simvue/bin/sender.py b/simvue/bin/sender.py index 649b9d6f..330e2b20 100644 --- a/simvue/bin/sender.py +++ b/simvue/bin/sender.py @@ -1,44 +1,71 @@ -"""Send runs to server""" +"""Send locally cached data to server.""" import logging +import pathlib +import click + +from simvue.sender import Sender, UPLOAD_ORDER, UploadItem -from simvue.sender import sender, UPLOAD_ORDER -import argparse _logger = logging.getLogger(__name__) _logger.setLevel(logging.INFO) -def run() -> None: - parser = argparse.ArgumentParser(description="My script description") - parser.add_argument( - "-w", - "--max-workers", - type=int, - required=False, - default=5, - help="The maximum number of worker threads to use in parallel, by default 5", - ) - parser.add_argument( - "-n", - "--threading-threshold", - type=int, - required=False, - default=10, - help="The number of objects of a given type above which items will be sent to the server in parallel, by default 10", - ) - parser.add_argument( - "-o", - "--objects-to-upload", - type=str, - nargs="+", - required=False, - default=UPLOAD_ORDER, - help="The object types to upload, by default All", - ) - args = parser.parse_args() +@click.command("simvue-sender") +@click.option( + "--max-workers", + "-w", + type=int, + default=5, + required=False, + help="The maximum number of worker threads to use in parallel, by default 5", +) +@click.option( + "-n", + "--threading-threshold", + type=int, + required=False, + default=10, + help="The number of objects of a given type above which items will be sent to the server in parallel, by default 10", +) +@click.option( + "-o", + "--objects-to-upload", + type=str, + multiple=True, + required=False, + default=UPLOAD_ORDER, + help="The object types to upload, by default All", +) +@click.option( + "-i", + "--cache-directory", + type=click.Path( + file_okay=False, + dir_okay=True, + exists=True, + writable=True, + path_type=pathlib.Path, + ), + help="Location of cache directory to use", + default=None, + required=False, +) +def run( + cache_directory: pathlib.Path | None, + objects_to_upload: list[UploadItem] | None, + threading_threshold: int, + max_workers: int, +) -> None: try: _logger.info("Starting Simvue Sender") - sender(**vars(args)) + _sender = Sender( + cache_directory=cache_directory, + max_workers=max_workers, + threading_threshold=threading_threshold, + throw_exceptions=True, + ) + _sender.upload(objects_to_upload) except Exception as err: _logger.critical("Exception running sender: 
%s", str(err)) + raise click.Abort diff --git a/simvue/client.py b/simvue/client.py index 2130db38..67928525 100644 --- a/simvue/client.py +++ b/simvue/client.py @@ -92,8 +92,7 @@ def __init__( if not value: logger.warning(f"No {label} specified") - self._headers: dict[str, str] = { - "Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}", + self._headers: dict[str, str] = self._user_config.headers | { "Accept-Encoding": "gzip", } diff --git a/simvue/config/parameters.py b/simvue/config/parameters.py index be54de9e..9647ce93 100644 --- a/simvue/config/parameters.py +++ b/simvue/config/parameters.py @@ -22,6 +22,9 @@ class ServerSpecifications(pydantic.BaseModel): + model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( + extra="forbid", strict=True + ) url: pydantic.AnyHttpUrl | None token: pydantic.SecretStr | None diff --git a/simvue/config/user.py b/simvue/config/user.py index 1d093752..46858f9e 100644 --- a/simvue/config/user.py +++ b/simvue/config/user.py @@ -253,3 +253,13 @@ def config_file(cls) -> pathlib.Path: raise FileNotFoundError("Failed to find Simvue configuration file") return _config_file + + @property + def headers(self) -> dict[str, str]: + if not self.server.token: + raise ValueError("Cannot generate headers, no token provided.") + return { + "Authorization": f"Bearer {self.server.token.get_secret_value()}", + "User-Agent": f"Simvue Python client {__version__}", + "Accept-Encoding": "gzip", + } diff --git a/simvue/models.py b/simvue/models.py index 49c4ac19..7dec244d 100644 --- a/simvue/models.py +++ b/simvue/models.py @@ -9,6 +9,7 @@ NAME_REGEX: str = r"^[a-zA-Z0-9\-\_\s\/\.:]+$" METRIC_KEY_REGEX: str = r"^[a-zA-Z0-9\-\_\s\/\.:=><+\(\)]+$" DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%S.%f" +OBJECT_ID: str = r"^[A-Za-z0-9]{22}$" MetadataKeyString = typing.Annotated[ str, pydantic.StringConstraints(pattern=r"^[\w\-\s\.]+$") @@ -17,6 +18,7 @@ MetricKeyString = typing.Annotated[ str, pydantic.StringConstraints(pattern=METRIC_KEY_REGEX) ] +ObjectID = typing.Annotated[str, pydantic.StringConstraints(pattern=OBJECT_ID)] def validate_timestamp(timestamp: str, raise_except: bool = True) -> bool: diff --git a/simvue/run.py b/simvue/run.py index 1cdf994c..7e3e96fb 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -202,12 +202,7 @@ def __init__( else self._user_config.metrics.system_metrics_interval ) self._headers: dict[str, str] = ( - { - "Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}", - "Accept-Encoding": "gzip", - } - if mode != "offline" - else {} + self._user_config.headers if mode != "offline" else {} ) self._sv_obj: RunObject | None = None self._pid: int | None = 0 diff --git a/simvue/sender.py b/simvue/sender.py deleted file mode 100644 index 64b8801d..00000000 --- a/simvue/sender.py +++ /dev/null @@ -1,324 +0,0 @@ -""" -Simvue Sender -============== - -Function to send data cached by Simvue in Offline mode to the server. 
-""" - -import json -import pydantic -import logging -from concurrent.futures import ThreadPoolExecutor -import threading -import requests -import psutil -from simvue.config.user import SimvueConfiguration -import simvue.api.objects -from simvue.api.objects.artifact.base import ArtifactBase -from simvue.eco.emissions_monitor import CO2Monitor -from simvue.version import __version__ - -UPLOAD_ORDER: list[str] = [ - "tenants", - "users", - "storage", - "folders", - "tags", - "alerts", - "runs", - "grids", - "artifacts", - "metrics", - "grid_metrics", - "events", -] - -_logger = logging.getLogger(__name__) - - -def _log_upload_failed(file_path: pydantic.FilePath) -> None: - """Record that an object failed to upload in the object offline cache file. - - Parameters - ---------- - file_path : pydantic.FilePath - The path to the offline cache file for the object - """ - with file_path.open("r") as file: - _data = json.load(file) - _data["upload_failed"] = True - with file_path.open("w") as file: - json.dump(_data, file) - - -def upload_cached_file( - cache_dir: pydantic.DirectoryPath, - obj_type: str, - file_path: pydantic.FilePath, - id_mapping: dict[str, str], - throw_exceptions: bool, - retry_failed_uploads: bool, - lock: threading.Lock, -) -> None: - """Upload data stored in a cached file to the Simvue server. - - Parameters - ---------- - cache_dir : pydantic.DirectoryPath - The directory where cached files are stored - obj_type : str - The type of object which should be created for this cached file - file_path : pydantic.FilePath - The path to the cached file to upload - id_mapping : dict[str, str] - A mapping of offline to online object IDs - throw_exceptions : bool - Whether to throw exceptions, or just log them - retry_failed_uploads : bool - Whether to retry failed uploads or ignore them - lock : threading.Lock - A lock to prevent multiple threads accessing the id mapping directory at once - """ - _current_id = file_path.name.split(".")[0] - _data = json.load(file_path.open()) - _exact_type: str = _data.pop("obj_type") - - if _data.pop("upload_failed", False) and not retry_failed_uploads: - return - - try: - _instance_class = getattr(simvue.api.objects, _exact_type) - except AttributeError as error: - if throw_exceptions: - raise error - - _logger.error(f"Attempt to initialise unknown type '{_exact_type}'") - _log_upload_failed(file_path) - return - - # If it is an ObjectArtifact, need to load the object as bytes from a different file - if issubclass(_instance_class, simvue.api.objects.ObjectArtifact): - with open(file_path.parent.joinpath(f"{_current_id}.object"), "rb") as file: - _data["serialized"] = file.read() - try: - # We want to reconnect if there is an online ID stored for this file - if _online_id := id_mapping.get(_current_id): - obj_for_upload = _instance_class( - identifier=_online_id, _read_only=False, **_data - ) - else: - obj_for_upload = _instance_class.new(**_data) - - with lock: - obj_for_upload.on_reconnect(id_mapping) - - if not issubclass(_instance_class, ArtifactBase): - obj_for_upload.commit() - _new_id = obj_for_upload.id - - except Exception as error: - if "status 409" in str(error): - return - if throw_exceptions: - raise error - - _logger.error( - f"Error while committing '{_instance_class.__name__}': {str(error)}" - ) - _log_upload_failed(file_path) - return - if not _new_id: - _logger.error(f"Object of type '{_instance_class.__name__}' has no identifier") - _log_upload_failed(file_path) - return - - _logger.info( - f"{'Updated' if 
id_mapping.get(_current_id) else 'Created'} {_instance_class.__name__} '{_new_id}'" - ) - - file_path.unlink(missing_ok=True) - if issubclass(_instance_class, simvue.api.objects.ObjectArtifact): - file_path.parent.joinpath(f"{_current_id}.object").unlink() - - with lock: - id_mapping[_current_id] = _new_id - - if obj_type in {"alerts", "runs", "folders", "tags"}: - cache_dir.joinpath("server_ids", f"{_current_id}.txt").write_text(_new_id) - - if ( - obj_type == "runs" - and cache_dir.joinpath(f"{obj_type}", f"{_current_id}.closed").exists() - ): - # Get alerts and folder created by this run - their IDs can be deleted - for id in _data.get("alerts", []): - cache_dir.joinpath("server_ids", f"{id}.txt").unlink() - if _folder_id := _data.get("folder_id"): - cache_dir.joinpath("server_ids", f"{_folder_id}.txt").unlink() - - cache_dir.joinpath("server_ids", f"{_current_id}.txt").unlink() - cache_dir.joinpath(f"{obj_type}", f"{_current_id}.closed").unlink() - _logger.info(f"Run {_current_id} closed - deleting cached copies...") - - -def send_heartbeat( - file_path: pydantic.FilePath, - id_mapping: dict[str, str], - server_url: str, - headers: dict[str, str], -): - _offline_id = file_path.name.split(".")[0] - _online_id = id_mapping.get(_offline_id) - if not _online_id: - # Run has been closed - can just remove heartbeat and continue - file_path.unlink() - return - _logger.info(f"Sending heartbeat to run {_online_id}") - _response = requests.put( - f"{server_url}/runs/{_online_id}/heartbeat", - headers=headers, - ) - if _response.status_code == 200: - file_path.unlink() - else: - _logger.warning( - f"Attempting to send heartbeat to run {_online_id} returned status code {_response.status_code}." - ) - - -@pydantic.validate_call -def sender( - cache_dir: pydantic.DirectoryPath | None = None, - max_workers: int = 5, - threading_threshold: int = 10, - objects_to_upload: list[str] = UPLOAD_ORDER, - throw_exceptions: bool = False, - retry_failed_uploads: bool = False, -) -> dict[str, str]: - """Send data from a local cache directory to the Simvue server. - - Parameters - ---------- - cache_dir : pydantic.DirectoryPath - The directory where cached files are stored - max_workers : int - The maximum number of threads to use - threading_threshold : int - The number of cached files above which threading will be used - objects_to_upload : list[str] - Types of objects to upload, by default uploads all types of objects present in cache - throw_exceptions : bool, optional - Whether to throw exceptions as they are encountered in the sender, default is False (exceptions will be logged) - retry_failed_uploads : bool, optional - Whether to retry sending objects which previously failed, by default False - - Returns - ------- - id_mapping - mapping of local ID to server ID - """ - _user_config: SimvueConfiguration = SimvueConfiguration.fetch(mode="online") - cache_dir = cache_dir or _user_config.offline.cache - - cache_dir.joinpath("server_ids").mkdir(parents=True, exist_ok=True) - _lock_path = cache_dir.joinpath("sender.lock") - - # Check that no other sender is already currently running... 
- if _lock_path.exists() and psutil.pid_exists(int(_lock_path.read_text())): - raise RuntimeError("A sender is already running for this cache!") - - # Create lock file to prevent other senders running while this one isn't finished - _lock_path.write_text(str(psutil.Process().pid)) - - _id_mapping: dict[str, str] = { - file_path.name.split(".")[0]: file_path.read_text() - for file_path in cache_dir.glob("server_ids/*.txt") - } - _lock = threading.Lock() - _upload_order = [item for item in UPLOAD_ORDER if item in objects_to_upload] - # Glob all files to look in at the start, to prevent extra files being written while other types are being uploaded - _all_offline_files = { - obj_type: list(cache_dir.glob(f"{obj_type}/*.json")) - for obj_type in _upload_order - } - - for _obj_type in _upload_order: - _offline_files = _all_offline_files[_obj_type] - if len(_offline_files) < threading_threshold: - for file_path in _offline_files: - upload_cached_file( - cache_dir=cache_dir, - obj_type=_obj_type, - file_path=file_path, - id_mapping=_id_mapping, - throw_exceptions=throw_exceptions, - retry_failed_uploads=retry_failed_uploads, - lock=_lock, - ) - else: - with ThreadPoolExecutor( - max_workers=max_workers, thread_name_prefix="sender_session_upload" - ) as executor: - _results = executor.map( - lambda file_path: upload_cached_file( - cache_dir=cache_dir, - obj_type=_obj_type, - file_path=file_path, - id_mapping=_id_mapping, - throw_exceptions=throw_exceptions, - retry_failed_uploads=retry_failed_uploads, - lock=_lock, - ), - _offline_files, - ) - # This will raise any exceptions encountered during sending - for result in _results: - pass - - # Send heartbeats - _headers: dict[str, str] = { - "Authorization": f"Bearer {_user_config.server.token.get_secret_value()}", - "User-Agent": f"Simvue Python client {__version__}", - } - _heartbeat_files = list(cache_dir.glob("runs/*.heartbeat")) - if len(_heartbeat_files) < threading_threshold: - for _heartbeat_file in _heartbeat_files: - ( - send_heartbeat( - file_path=_heartbeat_file, - id_mapping=_id_mapping, - server_url=_user_config.server.url, - headers=_headers, - ), - ) - else: - with ThreadPoolExecutor( - max_workers=max_workers, thread_name_prefix="sender_heartbeat" - ) as executor: - _results = executor.map( - lambda _heartbeat_file: send_heartbeat( - file_path=_heartbeat_file, - id_mapping=_id_mapping, - server_url=_user_config.server.url, - headers=_headers, - ), - _heartbeat_files, - ) - - # If CO2 emissions are requested create a dummy monitor which just - # refreshes the CO2 intensity value if required. No emission metrics - # will be taken by the sender itself, values are assumed to be recorded - # by any offline runs being sent. 
- if _user_config.metrics.enable_emission_metrics: - CO2Monitor( - thermal_design_power_per_gpu=None, - thermal_design_power_per_cpu=None, - local_data_directory=cache_dir, - intensity_refresh_interval=_user_config.eco.intensity_refresh_interval, - co2_intensity=_user_config.eco.co2_intensity, - co2_signal_api_token=_user_config.eco.co2_signal_api_token, - ).check_refresh() - - # Remove lock file to allow another sender to start in the future - _lock_path.unlink() - return _id_mapping diff --git a/simvue/sender/__init__.py b/simvue/sender/__init__.py new file mode 100644 index 00000000..eb89cd52 --- /dev/null +++ b/simvue/sender/__init__.py @@ -0,0 +1,5 @@ +"""Simvue sender for sending locally cached data to the server.""" + +from .base import Sender, UPLOAD_ORDER, UploadItem + +__all__ = ["Sender", "UPLOAD_ORDER", "UploadItem"] diff --git a/simvue/sender/actions.py b/simvue/sender/actions.py new file mode 100644 index 00000000..ea679710 --- /dev/null +++ b/simvue/sender/actions.py @@ -0,0 +1,1106 @@ +"""Upload actions for cached files.""" + +import abc +from collections.abc import Generator +from concurrent.futures import ThreadPoolExecutor +import http +import json +import logging +import pathlib +import threading +import typing + +import requests + +from simvue.api.objects import ( + Alert, + Artifact, + Events, + EventsAlert, + FileArtifact, + FileStorage, + Folder, + Grid, + GridMetrics, + Metrics, + MetricsRangeAlert, + MetricsThresholdAlert, + ObjectArtifact, + Run, + S3Storage, + Storage, + Tag, + Tenant, + User, + UserAlert, +) +from simvue.api.objects.alert.fetch import AlertType +from simvue.api.objects.artifact.base import ArtifactBase +from simvue.api.objects.base import SimvueObject +from simvue.api.request import put as sv_put, get_json_from_response +from simvue.models import ObjectID +from simvue.config.user import SimvueConfiguration +from simvue.eco import CO2Monitor +from simvue.run import Run as SimvueRun + +try: + from typing import override +except ImportError: + from typing_extensions import override # noqa: UP035 + + +class UploadAction: + """Defines the tasks to execute during upload.""" + + object_type: str = "" + logger: logging.Logger = logging.getLogger(__name__) + singular_object: bool = True + + @classmethod + def json_file(cls, cache_directory: pathlib.Path, offline_id: str) -> pathlib.Path: + """Returns the local cache JSON file for an upload. + + Parameters + ---------- + cache_directory : pathlib.Path + the cache directory to search + offline_id : str + the offline identifier for the upload + + Returns + ------- + pathlib.Path + path of local JSON file + """ + return cache_directory.joinpath(f"{cls.object_type}", f"{offline_id}.json") + + @classmethod + def _log_upload_failed( + cls, cache_directory: pathlib.Path, offline_id: str, data: dict[str, typing.Any] + ) -> None: + """Log a failing upload to the local cache.""" + data["upload_failed"] = True + with cls.json_file(cache_directory, offline_id).open("w") as out_f: + json.dump(data, out_f, indent=2) + + @classmethod + def count(cls, cache_directory: pathlib.Path) -> int: + """Return number of objects to upload of this type. + + Parameters + ---------- + cache_directory : pathlib.Path + the local cache directory to read from. + + Returns + ------- + int + the number of objects of this type pending upload. 
+ """ + return len(list(cls.uploadable_objects(cache_directory))) + + @classmethod + def pre_tasks( + cls, offline_id: str, data: dict[str, typing.Any], cache_directory: pathlib.Path + ) -> None: + """Pre-upload actions. + + For this object type no pre-actions are performed. + + Parameters + ----------- + offline_id : str + the offline identifier for the upload. + online_id : str + the recorded online identifier after upload. + data : dict[str, Any] + the data sent during upload. + cache_directory : pathlib.Path + the local cache directory to read from. + """ + _ = offline_id + _ = data + _ = cache_directory + pass + + @classmethod + def post_tasks( + cls, + offline_id: str, + online_id: ObjectID | None, + data: dict[str, typing.Any], + cache_directory: pathlib.Path, + ) -> None: + """Post-upload actions. + + Removes local JSON data on successful upload. + + Parameters + ----------- + offline_id : str + the offline identifier for the upload. + online_id : str + the recorded online identifier after upload. + data : dict[str, Any] + the data sent during upload. + cache_directory : pathlib.Path + the local cache directory to read from. + """ + _ = data + _ = online_id + cls.json_file(cache_directory, offline_id).unlink(missing_ok=True) + + @classmethod + @abc.abstractmethod + def initialise_object( + cls, online_id: ObjectID | None, **data + ) -> SimvueObject | None: + """Initialise an instance of an object.""" + _ = online_id + _ = data + + @classmethod + def uploadable_objects(cls, cache_directory: pathlib.Path) -> Generator[str]: + """Iterate through uploadables. + + Returns the offline identifiers.f objects awaiting upload for this type. + + + Parameters + ---------- + cache_directory : pathlib.Path + the local cache directory to read from. + + Yields + ------ + str + offline identifier + """ + for file in cache_directory.glob(f"{cls.object_type}/*.json"): + yield file.stem + + @classmethod + def _single_item_upload( + cls, + identifier: str, + id_mapping: dict[str, str], + cache_directory: pathlib.Path, + thread_lock: threading.Lock, + simvue_monitor_run: SimvueRun, + *, + throw_exceptions: bool = False, + retry_failed: bool = False, + upload_status: dict[str, str | float] | None, + ) -> None: + """Upload a single item of this object type.""" + _label: str = cls.object_type[:-1] if cls.singular_object else cls.object_type + simvue_monitor_run.log_event(f"Uploading {_label} '{identifier}'") + _json_file = cache_directory.joinpath(f"{cls.object_type}/{identifier}.json") + + with _json_file.open() as in_f: + _data = json.load(in_f) + + if _data.pop("upload_failed", False) and not retry_failed: + return + + try: + cls.pre_tasks( + offline_id=identifier, data=_data, cache_directory=cache_directory + ) + + _object = cls.initialise_object( + online_id=id_mapping.get(identifier), **_data + ) + + if not _object: + _out_msg: str = f"No initialiser defined for type '{cls.__name__}'" + raise RuntimeError(_out_msg) + + with thread_lock: + _object.on_reconnect(id_mapping) + + if not isinstance(_object, ArtifactBase): + _object.commit() + + _object.read_only(True) + + except Exception as err: + if throw_exceptions: + raise err + _exception_msg: str = ( + f"Error while committing {_label} '{identifier}': {err}" + ) + simvue_monitor_run.log_event(_exception_msg) + simvue_monitor_run.log_alert( + name="sender_object_upload_failure", state="critical" + ) + cls.logger.error(_exception_msg) + cls._log_upload_failed(cache_directory, identifier, _data) + return + + if cls.singular_object: + if not _object.id: 
+ cls.logger.error( + "No identifier retrieved for %s '%s'", + _label, + identifier, + ) + cls._log_upload_failed(cache_directory, identifier, _data) + return + + cls.logger.info( + "%s %s '%s'", + "Updated" if id_mapping.get(identifier) else "Created", + _label, + _object.id, + ) + + with thread_lock: + id_mapping[identifier] = _object.id + else: + cls.logger.info( + "%s %s", "Updated" if id_mapping.get(identifier) else "Created", _label + ) + + if upload_status is not None: + with thread_lock: + upload_status.setdefault(cls.object_type, 0) + upload_status[cls.object_type] += 1 + simvue_monitor_run.log_metrics( + {f"uploads.{cls.object_type}": upload_status[cls.object_type]} + ) + + cls.post_tasks( + offline_id=identifier, + online_id=_object.id if cls.singular_object else None, + data=_data, + cache_directory=cache_directory, + ) + + @classmethod + def upload( + cls, + id_mapping: dict[str, str], + cache_directory: pathlib.Path, + thread_lock: threading.Lock, + threading_threshold: int, + max_thread_workers: int, + simvue_monitor_run: SimvueRun, + *, + throw_exceptions: bool = False, + retry_failed: bool = False, + upload_status: dict[str, int | float] | None = None, + ) -> None: + """Run upload of all objects of this type. + + Parameters + ---------- + id_mapping : dict[str, str] + the offline-online mapping to update after upload. + cache_directory : pathlib.Path + the local cache directory to read from. + thread_lock : threading.Lock + the thread lock to use when uploading via multithreading + to ensure mappings are modified correctly. + threading_threshold : int + the number of cached files above which threading will be used. + max_thread_workers : int + the maximum number of threads to use. + throw_exceptions : bool, optional + whether to throw exceptions and terminate, default False. + retry_failed : bool, optional + whether to retry failed uploads, default False. + upload_status : dict[str, int | float] | None, optional + a mapping which will be updated with upload status, default None. + """ + _iterable = cls.uploadable_objects(cache_directory) + if cls.count(cache_directory) < threading_threshold: + for identifier in _iterable: + cls._single_item_upload( + identifier=identifier, + cache_directory=cache_directory, + thread_lock=thread_lock, + throw_exceptions=throw_exceptions, + retry_failed=retry_failed, + id_mapping=id_mapping, + simvue_monitor_run=simvue_monitor_run, + upload_status=upload_status, + ) + else: + with ThreadPoolExecutor( + max_workers=max_thread_workers, + thread_name_prefix="sender_session_upload", + ) as executor: + _results = executor.map( + lambda identifier: cls._single_item_upload( + identifier=identifier, + cache_directory=cache_directory, + thread_lock=thread_lock, + throw_exceptions=throw_exceptions, + retry_failed=retry_failed, + id_mapping=id_mapping, + simvue_monitor_run=simvue_monitor_run, + upload_status=upload_status, + ), + _iterable, + ) + # This will raise any exceptions encountered during sending + for _ in _results: + pass + + +class ArtifactUploadAction(UploadAction): + object_type: str = "artifacts" + + @override + @classmethod + def pre_tasks( + cls, + offline_id: str, + data: dict[str, typing.Any], + cache_directory: pathlib.Path, + ) -> None: + """Pre-upload actions. + + For object-based artifacts the local data is serialized in + preparation for the upload. + + Parameters + ---------- + offline_id : str + the offline identifier for the upload. 
+ data : dict[str, Any] + the data sent during upload. + cache_directory : pathlib.Path + the local cache directory to read from. + """ + if data["obj_type"] != "ObjectArtifact": + return + with cache_directory.joinpath(cls.object_type, f"{offline_id}.object").open( + "rb" + ) as in_f: + data["serialized"] = in_f.read() + + @override + @classmethod + def post_tasks( + cls, + offline_id: str, + online_id: ObjectID | None, + data: dict[str, typing.Any], + cache_directory: pathlib.Path, + ) -> None: + """Post-upload actions. + + Removes local JSON data on successful upload; if the artifact + is object-based the locally serialized data is also removed. + + Parameters + ---------- + offline_id : str + the offline identifier for the upload. + online_id : str + the recorded online identifier after upload. + data : dict[str, Any] + the data sent during upload. + cache_directory : pathlib.Path + the local cache directory to read from. + """ + super().post_tasks( + offline_id=offline_id, + online_id=online_id, + data=data, + cache_directory=cache_directory, + ) + if data["obj_type"] != "ObjectArtifact": + return + cache_directory.joinpath(cls.object_type, f"{offline_id}.object").unlink() + + @override + @classmethod + def initialise_object( + cls, online_id: ObjectID | None, **data + ) -> FileArtifact | ObjectArtifact: + """Initialise/update an Artifact object. + + Parameters + ---------- + online_id : str | None, optional + the online identifier for an object to update, default None. + **data + data to create/modify an artifact. + + Returns + ------- + simvue.api.objects.FileArtifact | simvue.api.objects.ObjectArtifact + a local representation of the server object. + """ + if not online_id: + if data.get("file_path"): + return FileArtifact.new(**data) + + return ObjectArtifact.new(**data) + + _sv_obj = Artifact(identifier=online_id, _read_only=False, **data) + return _sv_obj + + +class RunUploadAction(UploadAction): + object_type: str = "runs" + + @override + @classmethod + def initialise_object(cls, online_id: ObjectID | None, **data) -> Run: + """Initialise/update a Run object. + + Parameters + ---------- + online_id : str | None, optional + the online identifier for an object to update, default None. + **data + data to create/modify a run. + + Returns + ------- + simvue.api.objects.Run + a local representation of the server object. + """ + if not online_id: + return Run.new(**data) + + return Run(identifier=online_id, _read_only=False, **data) + + @override + @classmethod + def post_tasks( + cls, + offline_id: str, + online_id: str, + data: dict[str, typing.Any], + cache_directory: pathlib.Path, + ) -> None: + """Post-upload actions. + + Removes local JSON data on successful upload, and also handles removal + of additional files defining related identifiers. + + Parameters + ---------- + offline_id : str + the offline identifier for the upload. + online_id : str + the recorded online identifier after upload. + data : dict[str, Any] + the data sent during upload. + cache_directory : pathlib.Path + the local cache directory to read from. 
+ """ + super().post_tasks( + offline_id=offline_id, + online_id=online_id, + data=data, + cache_directory=cache_directory, + ) + + _ = cache_directory.joinpath("server_ids", f"{offline_id}.txt").write_text( + online_id + ) + + if not cache_directory.joinpath( + cls.object_type, f"{offline_id}.closed" + ).exists(): + return + + _alerts_list: list[str] = typing.cast("list[str]", data.get("alerts", [])) + + for _id in _alerts_list: + cache_directory.joinpath("server_ids", f"{_id}.txt").unlink() + + if _folder_id := data.get("folder_id"): + cache_directory.joinpath("server_ids", f"{_folder_id}.txt").unlink() + + cache_directory.joinpath("server_ids", f"{offline_id}.txt").unlink() + cache_directory.joinpath(cls.object_type, f"{offline_id}.closed").unlink() + cls.logger.info("Run '%s' closed - deleting cached copies...", offline_id) + + +class FolderUploadAction(UploadAction): + object_type: str = "folders" + + @classmethod + @override + def initialise_object(cls, online_id: ObjectID | None, **data) -> Folder: + """Initialise/update a Folder object. + + Parameters + ---------- + online_id : str | None, optional + the online identifier for an object to update, default None. + **data + data to create/modify a run. + + Returns + ------- + simvue.api.objects.Folder + a local representation of the server object. + """ + if not online_id: + return Folder.new(**data) + + return Folder(identifier=online_id, _read_only=False, **data) + + @classmethod + @override + def post_tasks( + cls, + offline_id: str, + online_id: str, + data: dict[str, typing.Any], + cache_directory: pathlib.Path, + ) -> None: + """Post-upload actions. + + Removes local JSON data on successful upload, also handles removal + of additional files defining related identifiers. + + Parameters + ----------- + offline_id : str + the offline identifier for the upload. + online_id : str + the recorded online identifier after upload. + data : dict[str, Any] + the data sent during upload. + cache_directory : pathlib.Path + the local cache directory to read from. + """ + super().post_tasks( + offline_id=offline_id, + online_id=online_id, + data=data, + cache_directory=cache_directory, + ) + + _ = cache_directory.joinpath("server_ids", f"{offline_id}.txt").write_text( + online_id + ) + + +class TenantUploadAction(UploadAction): + object_type: str = "tenants" + + @classmethod + @override + def initialise_object(cls, online_id: ObjectID | None, **data) -> Tenant: + """Initialise/update a Tenant object. + + Parameters + ---------- + online_id : str | None, optional + the online identifier for an object to update, default None. + **data + data to create/modify a run. + + Returns + ------- + simvue.api.objects.administrator.Tenant + a local representation of the server object. + """ + if not online_id: + return Tenant.new(**data) + + return Tenant(identifier=online_id, _read_only=False, **data) + + +class UserUploadAction(UploadAction): + object_type: str = "users" + + @classmethod + @override + def initialise_object(cls, online_id: ObjectID | None, **data) -> User: + """Initialise/update a User object. + + Parameters + ---------- + online_id : str | None, optional + the online identifier for an object to update, default None. + **data + data to create/modify a run. + + Returns + ------- + simvue.api.objects.administrator.User + a local representation of the server object. 
+ """ + if not online_id: + return User.new(**data) + + return User(identifier=online_id, _read_only=False, **data) + + +class TagUploadAction(UploadAction): + object_type: str = "tags" + + @classmethod + @override + def initialise_object(cls, online_id: ObjectID | None, **data) -> Tag: + """Initialise/update a Tag object. + + Parameters + ---------- + online_id : str | None, optional + the online identifier for an object to update, default None. + **data + data to create/modify a run. + + Returns + ------- + simvue.api.objects.Tag + a local representation of the server object. + """ + if not online_id: + return Tag.new(**data) + + return Tag(identifier=online_id, _read_only=False, **data) + + @classmethod + @override + def post_tasks( + cls, + offline_id: str, + online_id: str, + data: dict[str, typing.Any], + cache_directory: pathlib.Path, + ) -> None: + """Post-upload actions. + + Removes local JSON data on successful upload, also handles removal + of additional files defining related identifiers. + + Parameters + ----------- + offline_id : str + the offline identifier for the upload. + online_id : str + the recorded online identifier after upload. + data : dict[str, Any] + the data sent during upload. + cache_directory : pathlib.Path + the local cache directory to read from. + """ + super().post_tasks(offline_id, online_id, data, cache_directory) + _ = cache_directory.joinpath("server_ids", f"{offline_id}.txt").write_text( + online_id + ) + + +class AlertUploadAction(UploadAction): + object_type: str = "alerts" + + @classmethod + @override + def initialise_object(cls, online_id: ObjectID | None, **data) -> AlertType: + """Initialise/update an Alert object. + + Parameters + ---------- + online_id : str | None, optional + the online identifier for an object to update, default None. + **data + data to create/modify a run. + + Returns + ------- + simvue.api.objects.AlertType + a local representation of the server object. + """ + if not online_id: + _source: str = data["source"] + + if _source == "events": + return EventsAlert.new(**data) + elif _source == "metrics" and data.get("threshold"): + return MetricsThresholdAlert.new(**data) + elif _source == "metrics": + return MetricsRangeAlert.new(**data) + else: + return UserAlert.new(**data) + + return Alert(identifier=online_id, _read_only=False, **data) + + @classmethod + @override + def post_tasks( + cls, + offline_id: str, + online_id: str, + data: dict[str, typing.Any], + cache_directory: pathlib.Path, + ) -> None: + """Post-upload actions. + + Removes local JSON data on successful upload, also handles removal + of additional files defining related identifiers. + + Parameters + ----------- + offline_id : str + the offline identifier for the upload. + online_id : str + the recorded online identifier after upload. + data : dict[str, Any] + the data sent during upload. + cache_directory : pathlib.Path + the local cache directory to read from. + """ + super().post_tasks(offline_id, online_id, data, cache_directory) + _ = cache_directory.joinpath("server_ids", f"{offline_id}.txt").write_text( + online_id + ) + + +class StorageUploadAction(UploadAction): + object_type: str = "storage" + + @classmethod + @override + def initialise_object( + cls, online_id: ObjectID | None, **data + ) -> S3Storage | FileStorage: + """Initialise/update an Storage object. + + Parameters + ---------- + online_id : str | None, optional + the online identifier for an object to update, default None. + **data + data to create/modify a run. 
+ + Returns + ------- + simvue.api.objects.S3Storage | simvue.api.objects.FileStorage + a local representation of the server object. + """ + if not online_id: + if data.get("config", {}).get("endpoint_url"): + return S3Storage.new(**data) + + return FileStorage.new(**data) + + return Storage(identifier=online_id, _read_only=False, **data) + + +class GridUploadAction(UploadAction): + object_type: str = "grids" + + @classmethod + @override + def initialise_object(cls, online_id: ObjectID | None, **data) -> Grid: + """Initialise/update a Grid object. + + Parameters + ---------- + online_id : str | None, optional + the online identifier for an object to update, default None. + **data + data to create/modify a grid. + + Returns + ------- + simvue.api.objects.Grid + a local representation of the server object. + """ + if not online_id: + return Grid.new(**data) + + return Grid(identifier=online_id, _read_only=False, **data) + + +class MetricsUploadAction(UploadAction): + object_type: str = "metrics" + singular_object: bool = False + + @classmethod + @override + def initialise_object(cls, online_id: ObjectID | None, **data) -> Metrics: + """Initialise Metrics. + + Parameters + ---------- + online_id : str | None, optional + this parameter is ignored, as an update is ambiguous in this context + **data + data to create metrics. + + Returns + ------- + simvue.api.objects.Metrics + a local representation of the server object. + """ + _ = online_id + return Metrics.new(**data) + + +class GridMetricsUploadAction(UploadAction): + object_type: str = "grid_metrics" + singular_object: bool = False + + @classmethod + @override + def initialise_object(cls, online_id: ObjectID | None, **data) -> GridMetrics: + """Initialise GridMetrics. + + Parameters + ---------- + online_id : str | None, optional + this parameter is ignored, as an update is ambiguous in this context + **data + data to create grid metrics. + + Returns + ------- + simvue.api.objects.GridMetrics + a local representation of the server object. + """ + _ = online_id + return GridMetrics.new(**data) + + +class EventsUploadAction(UploadAction): + object_type: str = "events" + singular_object: bool = False + + @classmethod + @override + def initialise_object(cls, online_id: ObjectID | None, **data) -> Events: + """Initialise Events. + + Parameters + ---------- + online_id : str | None, optional + this parameter is ignored, as an update is ambiguous in this context + **data + data to create events. + + Returns + ------- + simvue.api.objects.Events + a local representation of the server object. 
+ """ + _ = online_id + return Events.new(**data) + + +class HeartbeatUploadAction(UploadAction): + object_type: str = "heartbeat" + singular_object: bool = True + + @override + @classmethod + def initialise_object(cls, online_id: ObjectID | None, **data) -> None: + """No initialiser for this action.""" + _ = online_id + _ = data + + @override + @classmethod + def pre_tasks( + cls, offline_id: str, data: dict[str, typing.Any], cache_directory: pathlib.Path + ) -> None: + """No pre-tasks for this action.""" + _ = offline_id + _ = data + _ = cache_directory + pass + + @override + @classmethod + def uploadable_objects(cls, cache_directory: pathlib.Path) -> Generator[str]: + """Iterate through uploadable heartbeat run identifiers.""" + for file in cache_directory.glob("runs/*.heartbeat"): + yield file.stem + + @override + @classmethod + def _single_item_upload( + cls, + identifier: str, + id_mapping: dict[str, str], + cache_directory: pathlib.Path, + thread_lock: threading.Lock, + simvue_monitor_run: SimvueRun, + *, + throw_exceptions: bool = False, + retry_failed: bool = False, + upload_status: dict[str, str | float] | None, + ) -> None: + """Upload a single heartbeat item.""" + _ = simvue_monitor_run + if not (_online_id := id_mapping.get(identifier)): + # Run has been closed - can just remove heartbeat and continue + cache_directory.joinpath(f"runs/{identifier}.heartbeat").unlink() + return + + _local_config: SimvueConfiguration = SimvueConfiguration.fetch(mode="online") + + cls.logger.info("Sending heartbeat to run '%s'", identifier) + + _response: requests.Response = sv_put( + url=f"{_local_config.server.url}/runs/{_online_id}/heartbeat", + headers=_local_config.headers, + ) + + try: + _json_response = get_json_from_response( + expected_status=[http.HTTPStatus.OK], + scenario=f"Attempt to send heartbeat to run {_online_id}", + response=_response, + ) + except RuntimeError as e: + if throw_exceptions: + raise e + cls.logger.exception(e) + + @override + @classmethod + def post_tasks( + cls, + offline_id: str, + online_id: ObjectID | None, + data: dict[str, typing.Any], + cache_directory: pathlib.Path, + ) -> None: + """No post-tasks for this action.""" + _ = offline_id + _ = data + _ = cache_directory + _ = online_id + pass + + +class CO2IntensityUploadAction(UploadAction): + object_type: str = "co2_intensity" + + @override + @classmethod + def initialise_object(cls, online_id: ObjectID | None, **data) -> None: + """No initialiser for this action.""" + _ = online_id + _ = data + + @override + @classmethod + def pre_tasks( + cls, offline_id: str, data: dict[str, typing.Any], cache_directory: pathlib.Path + ) -> None: + """No pre-tasks for this action.""" + _ = offline_id + _ = data + _ = cache_directory + + @override + @classmethod + def post_tasks( + cls, + offline_id: str, + online_id: ObjectID | None, + data: dict[str, typing.Any], + cache_directory: pathlib.Path, + ) -> None: + """No post-tasks for this action.""" + _ = offline_id + _ = data + _ = cache_directory + + @override + @classmethod + def uploadable_objects(cls, cache_directory: pathlib.Path) -> Generator[str]: + """No uploadable object file data for this action.""" + yield from () + + @override + @classmethod + def _single_item_upload( + cls, + identifier: str, + id_mapping: dict[str, str], + cache_directory: pathlib.Path, + thread_lock: threading.Lock, + simvue_monitor_run: SimvueRun, + *, + throw_exceptions: bool = False, + retry_failed: bool = False, + upload_status: dict[str, str | float] | None, + ) -> None: + _ = 
simvue_monitor_run + _ = identifier + _ = id_mapping + _ = cache_directory + _ = thread_lock + + @override + @classmethod + def upload( + cls, + id_mapping: dict[str, str], + cache_directory: pathlib.Path, + thread_lock: threading.Lock, + threading_threshold: int, + max_thread_workers: int, + *, + throw_exceptions: bool = False, + retry_failed: bool = False, + upload_status: dict[str, int | float] | None = None, + simvue_monitor_run: SimvueRun | None = None, + ) -> None: + """Upload CO2 intensity data.""" + _ = id_mapping + _ = thread_lock + _ = threading_threshold + _ = max_thread_workers + _ = simvue_monitor_run + _ = upload_status + + _local_config: SimvueConfiguration = SimvueConfiguration.fetch(mode="online") + + if not _local_config.metrics.enable_emission_metrics: + return + + try: + CO2Monitor( + thermal_design_power_per_gpu=None, + thermal_design_power_per_cpu=None, + local_data_directory=cache_directory, + intensity_refresh_interval=_local_config.eco.intensity_refresh_interval, + co2_intensity=_local_config.eco.co2_intensity, + co2_signal_api_token=_local_config.eco.co2_signal_api_token, + ).check_refresh() + except (ValueError, RuntimeError) as e: + if throw_exceptions: + raise e + cls.logger.exception(e) + + +# Define the upload action ordering +UPLOAD_ACTION_ORDER: tuple[type[UploadAction], ...] = ( + TenantUploadAction, + UserUploadAction, + StorageUploadAction, + FolderUploadAction, + TagUploadAction, + AlertUploadAction, + RunUploadAction, + GridUploadAction, + ArtifactUploadAction, + MetricsUploadAction, + GridMetricsUploadAction, + EventsUploadAction, + HeartbeatUploadAction, + CO2IntensityUploadAction, +) diff --git a/simvue/sender/base.py b/simvue/sender/base.py new file mode 100644 index 00000000..aacadc58 --- /dev/null +++ b/simvue/sender/base.py @@ -0,0 +1,177 @@ +"""Classes and methods for sending local objects to server. + +These are designed to be run with a cron task in cases where server connection +is either not possible on the simulation machine, or connection is limited. +""" + +import datetime +import logging +import threading +import typing +import pydantic
+import psutil + +from simvue.sender.actions import UPLOAD_ACTION_ORDER +from simvue.config.user import SimvueConfiguration +from simvue.run import Run + +logger = logging.getLogger(__name__) + +UploadItem = typing.Literal[ + "tenants", + "users", + "storage", + "folders", + "tags", + "alerts", + "runs", + "grids", + "artifacts", + "metrics", + "grid_metrics", + "events", + "heartbeat", + "co2_intensity", +] + +UPLOAD_ORDER: list[str] = [action.object_type for action in UPLOAD_ACTION_ORDER] + + +class Sender: + @pydantic.validate_call + def __init__( + self, + cache_directory: pydantic.DirectoryPath | None = None, + max_workers: pydantic.PositiveInt = 5, + threading_threshold: pydantic.PositiveInt = 10, + throw_exceptions: bool = False, + retry_failed_uploads: bool = False, + run_notification: typing.Literal["none", "all", "email"] = "none", + run_retention_period: str | None = None, + ) -> None: + """Initialise a local data sender. + + Parameters + ---------- + cache_directory : pydantic.DirectoryPath | None, optional + The directory where cached files are stored, else use default. + max_workers : int, optional + The maximum number of threads to use, default 5. + threading_threshold : int, optional + The number of cached files above which threading will be used, default 10. 
+        if not self._lock_path:
+            raise RuntimeError("Expected lock file path, but none initialised.")
+        return self._lock_path.exists() and psutil.pid_exists(
+            int(self._lock_path.read_text())
+        )
+
+    @property
+    def id_mapping(self) -> dict[str, str]:
+        """Get the mapping from offline ID to online ID."""
+        return self._id_mapping
+
+    def _lock(self) -> None:
+        """Acquire the cache lock for this sender."""
+        if self.locked:
+            raise RuntimeError("A sender is already running for this cache!")
+        _ = self._lock_path.write_text(f"{psutil.Process().pid}")
+
+    def _release(self) -> None:
+        """Release the cache lock held by this sender."""
+        self._lock_path.unlink()
+
+    def _initialise_monitor_run(self) -> Run:
+        """Create a Simvue run which monitors this upload session."""
+        _time_stamp: str = datetime.datetime.now(tz=datetime.UTC).strftime(
+            "%Y_%m_%d_%H_%M_%S"
+        )
+        _run = Run(mode="online")
+        _ = _run.init(
+            name=f"sender_upload_{_time_stamp}",
+            folder="/sender",
+            notification=self._run_notification,
+            description="Simvue sender upload session.",
+            retention_period=self._run_retention_period,
+            timeout=None,
+            metadata={
+                f"sender.item_count.{upload_object.object_type}": _obj_count
+                for upload_object in UPLOAD_ACTION_ORDER
+                if (_obj_count := upload_object.count(self._cache_directory)) > 0
+            },
+            no_color=True,
+        )
+        _ = _run.config(suppress_errors=True, enable_emission_metrics=False)
+        _run.create_user_alert(
+            name="sender_object_upload_failure",
+            description="Triggers when an object fails to send to the server.",
+            notification=self._run_notification,
+            trigger_abort=False,
+        )
+
+        _run.upload_count = 0
+
+        return _run
+
+    @pydantic.validate_call
+    def upload(self, objects_to_upload: list[UploadItem] | None = None) -> None:
+        """Upload objects to server.
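+
+        Objects are sent in dependency order (see ``UPLOAD_ACTION_ORDER``) so
+        that, for example, folders and runs exist on the server before their
+        metrics and events are uploaded.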
+
+        Parameters
+        ----------
+        objects_to_upload : list[UploadItem] | None, optional
+            The types of object to upload, by default all object types present in the cache
+        """
+        self._lock()
+
+        try:
+            _monitor_run = self._initialise_monitor_run()
+            self._upload_status = {}
+
+            for action in UPLOAD_ACTION_ORDER:
+                if objects_to_upload and action.object_type not in objects_to_upload:
+                    continue
+
+                _n_objects: int = action.count(self._cache_directory)
+
+                logger.info("Uploading %d '%s' object(s)", _n_objects, action.object_type)
+
+                action.upload(
+                    cache_directory=self._cache_directory,
+                    id_mapping=self._id_mapping,
+                    thread_lock=self._thread_lock,
+                    throw_exceptions=self._throw_exceptions,
+                    retry_failed=self._retry_failed_uploads,
+                    threading_threshold=self._threading_threshold,
+                    max_thread_workers=self._max_workers,
+                    simvue_monitor_run=_monitor_run,
+                    upload_status=self._upload_status,
+                )
+            _monitor_run.close()
+        finally:
+            # Release the cache lock even if an upload step raised
+            self._release()
diff --git a/tests/conftest.py b/tests/conftest.py
index ab574f2c..0e8570b8 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -156,11 +156,12 @@ def testing_exit(status: int) -> None:
     with tempfile.TemporaryDirectory() as temp_d:
         monkeypatch.setenv("SIMVUE_OFFLINE_DIRECTORY", temp_d)
         with sv_run.Run("offline") as run:
-            yield run, setup_test_run(run, True, request)
+            _test_run_data = setup_test_run(run, True, request)
+            yield run, _test_run_data
         with contextlib.suppress(ObjectNotFoundError):
             sv_api_obj.Folder(identifier=run._folder.id).delete(recursive=True, delete_runs=True, runs_only=False)
         for alert_id in _test_run_data.get("alert_ids", []):
-            with contextlib.suppress(ObjectNotFoundError):
+            with contextlib.suppress(ObjectNotFoundError, RuntimeError):
                 sv_api_obj.Alert(identifier=alert_id).delete()
         clear_out_files()
@@ -172,6 +173,7 @@ def testing_exit(status: int) -> None:
     with sv_run.Run() as run:
         run.metric_spy = mocker.spy(run, "_get_internal_metrics")
         yield run, setup_test_run(run, False, request)
+    clear_out_files()

 @pytest.fixture
@@ -306,6 +308,7 @@ def setup_test_run(run: sv_run.Run, create_objects: bool, request: pytest.Fixtur
         out_f.write(
             "print('Hello World!')"
         )
+        assert pathlib.Path(test_script).exists()
         run.save_file(test_script, category="code", name="test_code_upload")
         TEST_DATA["file_3"] = "test_code_upload"
diff --git a/tests/functional/test_run_class.py b/tests/functional/test_run_class.py
index 376ec309..0a990629 100644
--- a/tests/functional/test_run_class.py
+++ b/tests/functional/test_run_class.py
@@ -21,12 +21,10 @@
 import simvue
 from simvue.api.objects import Alert, Metrics
 from simvue.api.objects.grids import GridMetrics
-from simvue.eco.api_client import CO2SignalData, CO2SignalResponse
 from simvue.exception import ObjectNotFoundError, SimvueRunError
-from simvue.eco.emissions_monitor import TIME_FORMAT, CO2Monitor
+from simvue.sender import Sender
 import simvue.run as sv_run
 import simvue.client as sv_cl
-import simvue.sender as sv_send
 import simvue.config.user as sv_cfg
 from simvue.api.objects import Run as RunObject
@@ -116,7 +114,9 @@ def test_run_with_emissions_offline(speedy_heartbeat, mock_co2_signal, create_pl
     run_created.config(enable_emission_metrics=True)
     time.sleep(5)
     # Run should continue, but fail to log metrics until sender runs and creates file
-    id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
+    _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
+    _sender.upload()
+    id_mapping = _sender.id_mapping
     _run = RunObject(identifier=id_mapping[run_created.id])
     _metric_names = [item[0] for item in _run.metrics]
     for _metric in ["emissions", "energy_consumed"]:
@@ -126,7 +126,9 @@ def test_run_with_emissions_offline(speedy_heartbeat, mock_co2_signal, create_pl
         assert _delta_metric_name not in _metric_names
     # Sender should now have made a local file, and the run should be able to use it to create emissions metrics
     time.sleep(5)
-    id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
+    _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
+    _sender.upload()
+    id_mapping = _sender.id_mapping
     _run.refresh()
     _metric_names = [item[0] for item in _run.metrics]
     client = sv_cl.Client()
@@ -318,7 +320,9 @@ def test_log_metrics_offline(
     run.log_metrics(METRICS)
     time.sleep(1)
-    id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
+    _sender.upload()
+    id_mapping = _sender.id_mapping
     time.sleep(1)
     if metric_type == "tensor":
@@ -441,9 +445,11 @@ def test_visibility_offline(
         retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"),
     )
     _id = run.id
-    _id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
+    _sender.upload()
+    id_mapping = _sender.id_mapping
     run.close()
-    _retrieved_run = RunObject(identifier=_id_mapping.get(_id))
+    _retrieved_run = RunObject(identifier=id_mapping.get(_id))
     if visibility == "tenant":
         assert _retrieved_run.visibility.tenant
@@ -478,7 +484,8 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -
     run, _ = create_plain_run_offline
     run_name = run.name
     run.log_event(EVENT_MSG)
-    sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
+    _sender.upload()
     client = sv_cl.Client()
     attempts: int = 0
@@ -488,7 +495,8 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -
         not (event_data := client.get_events(client.get_run_id_from_name(run_name), count_limit=1))
     ) and attempts < 5:
         time.sleep(1)
-        sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+        _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], max_workers=2, threading_threshold=10, throw_exceptions=True)
+        _sender.upload()
         attempts += 1
     assert event_data[0].get("message", EVENT_MSG)
@@ -496,8 +504,9 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -
 @pytest.mark.run
 @pytest.mark.offline
 def test_offline_tags(create_plain_run_offline: tuple[sv_run.Run, dict]) -> None:
-    run, run_data = create_plain_run_offline
-    sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _, run_data = create_plain_run_offline
+    _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], max_workers=2, threading_threshold=10, throw_exceptions=True)
+    _sender.upload()
     client = sv_cl.Client()
     tags = client.get_tags()
@@ -557,7 +566,8 @@ def test_update_metadata_offline(
     # Try updating an already defined piece of metadata
     run.update_metadata({"a": 1})
-    sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], max_workers=2, threading_threshold=10, throw_exceptions=True)
+    _sender.upload()
     client = sv_cl.Client()
     run_info = client.get_run(client.get_run_id_from_name(run_name))
@@ -945,7 +955,8 @@ def test_save_file_offline(
         "w",
     ) as out_f:
         out_f.write("updated file!")
-    sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], max_workers=2, threading_threshold=10, throw_exceptions=True)
+    _sender.upload()
     os.remove(out_name)
     client = sv_cl.Client()
     base_name = name or out_name.name
@@ -1031,7 +1042,8 @@ def test_update_tags_offline(
     simvue_run.update_tags(["additional"])
-    sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], max_workers=2, threading_threshold=10, throw_exceptions=True)
+    _sender.upload()
     client = sv_cl.Client()
     run_data = client.get_run(client.get_run_id_from_name(run_name))
@@ -1208,27 +1220,30 @@ def test_add_alerts_offline(monkeypatch) -> None:
         rule="is inside range",
     )
-    _id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
-    _online_run = RunObject(identifier=_id_mapping.get(run.id))
+    _sender = Sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender.upload()
+    _online_run = RunObject(identifier=_sender.id_mapping.get(run.id))
     # Check that there is no duplication
-    assert sorted(_online_run.alerts) == sorted([_id_mapping.get(_id) for _id in _expected_alerts])
+    assert sorted(_online_run.alerts) == sorted([_sender.id_mapping.get(_id) for _id in _expected_alerts])
     # Create another run without adding to run
     _id = run.create_user_alert(name=f"user_alert_{_uuid}", attach_to_run=False)
-    _id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender = Sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender.upload()
     # Check alert is not added
     _online_run.refresh()
-    assert sorted(_online_run.alerts) == sorted([_id_mapping.get(_id) for _id in _expected_alerts])
+    assert sorted(_online_run.alerts) == sorted([_sender.id_mapping.get(_id) for _id in _expected_alerts])
     # Try adding alerts with IDs, check there is no duplication
     _expected_alerts.append(_id)
     run.add_alerts(ids=_expected_alerts)
-    _id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender = Sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+    _sender.upload()
     _online_run.refresh()
-    assert sorted(_online_run.alerts) == sorted([_id_mapping.get(_id) for _id in _expected_alerts])
+    assert sorted(_online_run.alerts) == sorted([_sender.id_mapping.get(_id) for _id in _expected_alerts])
     run.close()
@@ -1239,7 +1254,7 @@ def test_add_alerts_offline(monkeypatch) -> None:
         remove_runs=True, recursive=True
     )
-    for _id in [_id_mapping.get(_id) for _id in _expected_alerts]:
+    for _id in [_sender.id_mapping.get(_id) for _id in _expected_alerts]:
         client.delete_alert(_id)
@@ -1446,7 +1461,9 @@ def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None:
     )
     run_id = run.id
     if mode == "offline":
-        _id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+        _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], max_workers=2, threading_threshold=10, throw_exceptions=True)
+        _sender.upload()
+        _id_mapping = _sender.id_mapping
         run_id = _id_mapping.get(run_id)
     client = simvue.Client()
@@ -1460,7 +1477,8 @@ def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None:
     run.log_event("Testing!")
     if mode == "offline":
-        _id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
+        _sender = Sender(cache_directory=os.environ["SIMVUE_OFFLINE_DIRECTORY"], max_workers=2, threading_threshold=10, throw_exceptions=True)
+        _sender.upload()
     _reconnected_run = client.get_run(run_id)
     assert dict(_reconnected_run.metrics)["test_metric"]["last"] == 1
diff --git a/tests/functional/test_run_execute_process.py b/tests/functional/test_run_execute_process.py
index 1337e301..66b8a4c9 100644
--- a/tests/functional/test_run_execute_process.py
+++ b/tests/functional/test_run_execute_process.py
@@ -9,7 +9,7 @@
 from simvue import Run, Client
 from simvue.executor import get_current_shell
-from simvue.sender import sender
+from simvue.sender import Sender

 @pytest.mark.executor
 def test_monitor_processes(create_plain_run_offline: tuple[Run, dict]):
@@ -24,7 +24,8 @@ def test_monitor_processes(create_plain_run_offline: tuple[Run, dict]):
     _run.add_process(f"process_1_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="Write-Output 'Hello World!'", executable="powershell")
     _run.add_process(f"process_2_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="Get-ChildItem", executable="powershell")
     _run.add_process(f"process_3_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="exit 0", executable="powershell")
-    sender(_run._sv_obj._local_staging_file.parents[1], 1, 10, ["folders", "runs", "alerts"], throw_exceptions=True)
+    _sender = Sender(_run._sv_obj._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["folders", "runs", "alerts"])

 @pytest.mark.executor
diff --git a/tests/unit/test_event_alert.py b/tests/unit/test_event_alert.py
index 2e4d7722..764f7b54 100644
--- a/tests/unit/test_event_alert.py
+++ b/tests/unit/test_event_alert.py
@@ -5,7 +5,7 @@
 import uuid
 from simvue.api.objects import Alert, EventsAlert
-from simvue.sender import sender
+from simvue.sender import Sender

 @pytest.mark.api
 @pytest.mark.online
@@ -55,12 +55,13 @@ def test_event_alert_creation_offline(offline_cache_setup) -> None:
     assert _local_data.get("alert").get("pattern") == "completed"
     assert _local_data.get("name") == f"events_alert_{_uuid}"
     assert _local_data.get("notification") == "none"
-
-    _id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+
+    _sender = Sender(cache_directory=_alert._local_staging_file.parents[1], max_workers=1, threading_threshold=10, throw_exceptions=True)
+    _sender.upload(objects_to_upload=["alerts"])
     time.sleep(1)
     # Get online ID and retrieve alert
-    _online_alert = Alert(_id_mapping.get(_alert.id))
+    _online_alert = Alert(_sender.id_mapping.get(_alert.id))
     assert _online_alert.source == "events"
     assert _online_alert.alert.frequency == 1
     assert _online_alert.alert.pattern == "completed"
@@ -106,11 +107,12 @@ def test_event_alert_modification_offline(offline_cache_setup) -> None:
         description=None
     )
     _alert.commit()
-    _id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(cache_directory=_alert._local_staging_file.parents[1], max_workers=1, threading_threshold=10, throw_exceptions=True)
+    _sender.upload(objects_to_upload=["alerts"])
     time.sleep(1)
     # Get online ID and retrieve alert
-    _online_alert = Alert(_id_mapping.get(_alert.id))
+    _online_alert = Alert(_sender.id_mapping.get(_alert.id))
     assert _online_alert.source == "events"
     assert _online_alert.alert.frequency == 1
     assert _online_alert.alert.pattern == "completed"
@@ -118,6 +120,7 @@ def test_event_alert_modification_offline(offline_cache_setup) -> None:
     assert _online_alert.notification == "none"
     _new_alert = EventsAlert(_alert.id)
+    assert _new_alert._offline
     _new_alert.read_only(False)
     _new_alert.description = "updated!"
     _new_alert.commit()
@@ -130,7 +133,9 @@ def test_event_alert_modification_offline(offline_cache_setup) -> None:
         _local_data = json.load(in_f)
     assert _local_data.get("description") == "updated!"
-    sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(cache_directory=_alert._local_staging_file.parents[1], max_workers=1, threading_threshold=10, throw_exceptions=True)
+    _sender.upload(objects_to_upload=["alerts"])
+
     time.sleep(1)
     _online_alert.refresh()
diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py
index 1839f067..f407e305 100644
--- a/tests/unit/test_events.py
+++ b/tests/unit/test_events.py
@@ -7,7 +7,7 @@
 from simvue.api.objects import Events, Folder, Run
 from simvue.models import DATETIME_FORMAT
-from simvue.sender import sender
+from simvue.sender import Sender

 @pytest.mark.api
 @pytest.mark.online
@@ -55,15 +55,16 @@ def test_events_creation_offline(offline_cache_setup) -> None:
     assert _local_data.get("run") == _run.id
     assert _local_data.get("events")[0].get("message") == "This is a test!"
     assert _local_data.get("events")[0].get("timestamp") == _timestamp
-
-    _id_mapping = sender(_events._local_staging_file.parents[1], 1, 10, ["folders", "runs", "events"], throw_exceptions=True)
+
+    _sender = Sender(cache_directory=_events._local_staging_file.parents[1], max_workers=1, threading_threshold=10, throw_exceptions=True)
+    _sender.upload(["folders", "runs", "events"])
     time.sleep(1)
     # Get online version of events
-    _online_events = Events(_id_mapping.get(_events.id))
-    _event_content = next(_online_events.get(run_id=_id_mapping.get(_run.id)))
+    _online_events = Events(_sender.id_mapping.get(_events.id))
+    _event_content = next(_online_events.get(run_id=_sender.id_mapping.get(_run.id)))
     assert _event_content.message == "This is a test!"
     assert _event_content.timestamp == _timestamp
     _run.delete()
-    _folder.delete(recursive=True, delete_runs=True, runs_only=False)
\ No newline at end of file
+    _folder.delete(recursive=True, delete_runs=True, runs_only=False)
diff --git a/tests/unit/test_file_artifact.py b/tests/unit/test_file_artifact.py
index ad736c3a..0c778d17 100644
--- a/tests/unit/test_file_artifact.py
+++ b/tests/unit/test_file_artifact.py
@@ -8,9 +8,8 @@
 import json
 from simvue.api.objects import FileArtifact, Run, Artifact
 from simvue.api.objects.folder import Folder
-from simvue.sender import sender
-from simvue.client import Client
-import logging
+from simvue.sender import Sender
+
 @pytest.mark.api
 @pytest.mark.online
@@ -103,13 +102,14 @@ def test_file_artifact_creation_offline(offline_cache_setup, snapshot) -> None:
     # If snapshot, check artifact definition file and a copy of the actual file exist in staging area
     assert len(list(_artifact._local_staging_file.parent.iterdir())) == 2 if snapshot else 1
-    _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
+    _sender = Sender(cache_directory=pathlib.Path(offline_cache_setup.name), max_workers=1, threading_threshold=10, throw_exceptions=True)
+    _sender.upload()
     time.sleep(1)
     # Check file(s) deleted after upload
     assert len(list(_artifact._local_staging_file.parent.iterdir())) == 0
-    _online_artifact = Artifact(_id_mapping[_artifact.id])
+    _online_artifact = Artifact(_sender.id_mapping[_artifact.id])
     assert _online_artifact.name == _artifact.name
     _content = b"".join(_online_artifact.download_content()).decode("UTF-8")
     assert _content == f"Hello World! {_uuid}"
@@ -159,13 +159,15 @@ def test_file_artifact_creation_offline_updated(offline_cache_setup, caplog, sna
     if not snapshot:
         with pytest.raises(RuntimeError, match="The SHA256 you specified did not match the calculated checksum."):
-            _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
+            _sender = Sender(cache_directory=pathlib.Path(offline_cache_setup.name), max_workers=1, threading_threshold=10, throw_exceptions=True)
+            _sender.upload()
         return
     else:
-        _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
+        _sender = Sender(cache_directory=pathlib.Path(offline_cache_setup.name), max_workers=1, threading_threshold=10, throw_exceptions=True)
+        _sender.upload()
     time.sleep(1)
-    _online_artifact = Artifact(_id_mapping[_artifact.id])
+    _online_artifact = Artifact(_sender.id_mapping[_artifact.id])
     assert _online_artifact.name == _artifact.name
     _content = b"".join(_online_artifact.download_content()).decode("UTF-8")
     # Since it was snapshotted, should be the state of the file before it was changed
diff --git a/tests/unit/test_file_storage.py b/tests/unit/test_file_storage.py
index 1c5c9ce6..cbd74a34 100644
--- a/tests/unit/test_file_storage.py
+++ b/tests/unit/test_file_storage.py
@@ -4,7 +4,7 @@
 import uuid
 from simvue.api.objects import FileStorage
-from simvue.sender import sender
+from simvue.sender import Sender

 @pytest.mark.api
 @pytest.mark.online
@@ -37,13 +37,14 @@ def test_create_file_storage_offline(offline_cache_setup) -> None:
     assert _local_data.get("name") == _uuid
     assert _local_data.get("is_enabled") == False
     assert _local_data.get("is_default") == False
-
-    _id_mapping = sender(_storage._local_staging_file.parents[1], 1, 10, ["storage"], throw_exceptions=True)
+
+    _sender = Sender(cache_directory=_storage._local_staging_file.parents[1], max_workers=1, threading_threshold=10, throw_exceptions=True)
+    _sender.upload(["storage"])
_sender.upload(["storage"]) time.sleep(1) - _online_storage = FileStorage(_id_mapping.get(_storage.id)) + _online_storage = FileStorage(_sender.id_mapping.get(_storage.id)) assert _online_storage.name == _uuid assert _online_storage.is_enabled == False assert _online_storage.is_default == False _online_storage.read_only(False) - _online_storage.delete() \ No newline at end of file + _online_storage.delete() diff --git a/tests/unit/test_folder.py b/tests/unit/test_folder.py index 7004551f..8f0dd83a 100644 --- a/tests/unit/test_folder.py +++ b/tests/unit/test_folder.py @@ -1,14 +1,13 @@ -import typing import pytest import uuid import contextlib import json import time -import os from simvue.api.objects.folder import Folder -from simvue.sender import sender +from simvue.sender import Sender from simvue.client import Client + @pytest.mark.api @pytest.mark.online def test_folder_creation_online() -> None: @@ -42,14 +41,15 @@ def test_folder_creation_offline(offline_cache_setup) -> None: assert _folder._local_staging_file.name.split(".")[0] == _folder.id assert _local_data.get("path", None) == _path - sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"], throw_exceptions=True) + _sender = Sender(cache_directory=_folder._local_staging_file.parents[1], max_workers=2, threading_threshold=10, throw_exceptions=True) + _sender.upload(["folders"]) + time.sleep(1) - client = Client() - _folder_new = client.get_folder(_path) + _folder_new = Folder(identifier=_sender.id_mapping[_folder.id]) assert _folder_new.path == _path - _folder_new.delete() + _folder_new.delete(recursive=True, delete_runs=True) assert not _folder._local_staging_file.exists() @@ -96,11 +96,11 @@ def test_folder_modification_offline(offline_cache_setup) -> None: _folder = Folder.new(path=_path, offline=True) _folder.commit() - sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"], throw_exceptions=True) + _sender = Sender(cache_directory=_folder._local_staging_file.parents[1], max_workers=2, threading_threshold=10, throw_exceptions=True) + _sender.upload(["folders"]) time.sleep(1) - client = Client() - _folder_online = client.get_folder(_path) + _folder_online = Folder(identifier=_sender.id_mapping[_folder.id]) assert _folder_online.path == _path _folder_new = Folder(identifier=_folder.id) @@ -115,7 +115,8 @@ def test_folder_modification_offline(offline_cache_setup) -> None: assert _local_data.get("description", None) == _description assert _local_data.get("tags", None) == _tags - sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"], throw_exceptions=True) + _sender = Sender(cache_directory=_folder._local_staging_file.parents[1], max_workers=2, threading_threshold=10, throw_exceptions=True) + _sender.upload(["folders"]) time.sleep(1) _folder_online.refresh() diff --git a/tests/unit/test_grids.py b/tests/unit/test_grids.py index ed0f744f..87a745d9 100644 --- a/tests/unit/test_grids.py +++ b/tests/unit/test_grids.py @@ -7,13 +7,9 @@ import contextlib import json import time -import os from simvue.api.objects import Grid, GridMetrics, Folder, Run -from simvue.models import GridMetricSet -from simvue.run import Run as sv_Run -from simvue.sender import sender -from simvue.client import Client +from simvue.sender import Sender @pytest.mark.api @pytest.mark.online @@ -72,10 +68,11 @@ def test_grid_creation_offline(offline_cache_setup) -> None: assert _local_data.get("runs", [None])[0] == [_run.id, "A"] npt.assert_array_equal(numpy.array(_local_data.get("grid")), _grid_def) - _id_mapping = 
+    _sender = Sender(cache_directory=_grid._local_staging_file.parents[1], max_workers=1, threading_threshold=10, throw_exceptions=True)
+    _sender.upload(["folders", "runs", "grids"])
     time.sleep(1)
     # Get online version of grid
-    _online_grid = Grid(_id_mapping.get(_grid.id))
+    _online_grid = Grid(_sender.id_mapping.get(_grid.id))
     npt.assert_array_equal(numpy.array(_online_grid.grid), _grid_def)
     _grid.delete()
     with contextlib.suppress(RuntimeError):
@@ -184,9 +181,10 @@ def test_grid_metrics_creation_offline(offline_cache_setup) -> None:
     _metrics.commit()
     _run.status = "completed"
     _run.commit()
-    _id_mapping = sender(_grid._local_staging_file.parents[1], 1, 10, ["folders", "runs", "grids", "grid_metrics"], throw_exceptions=True)
+    _sender = Sender(cache_directory=_grid._local_staging_file.parents[1], max_workers=1, threading_threshold=10, throw_exceptions=True)
+    _sender.upload(["folders", "runs", "grids", "grid_metrics"])
     time.sleep(1)
     # Online metrics
-    assert list(GridMetrics.get(runs=[_id_mapping[_run.id]], metrics=["A"], step=_step))
+    assert list(GridMetrics.get(runs=[_sender.id_mapping[_run.id]], metrics=["A"], step=_step))
     _run.delete()
     _folder.delete(recursive=True, delete_runs=True, runs_only=False)
diff --git a/tests/unit/test_metric_range_alert.py b/tests/unit/test_metric_range_alert.py
index 56347eff..a7efbaff 100644
--- a/tests/unit/test_metric_range_alert.py
+++ b/tests/unit/test_metric_range_alert.py
@@ -5,8 +5,7 @@
 import uuid
 from simvue.api.objects import MetricsRangeAlert, Alert
-from simvue.client import Client
-from simvue.sender import sender
+from simvue.sender import Sender

 @pytest.mark.api
 @pytest.mark.online
@@ -62,12 +61,12 @@ def test_metric_range_alert_creation_offline(offline_cache_setup) -> None:
     assert _local_data.get("name") == f"metrics_range_alert_{_uuid}"
     assert _local_data.get("notification") == "none"
     assert _local_data.get("alert").get("range_low") == 10
-    sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["alerts"])
     time.sleep(1)
     # Get online ID and retrieve alert
-    _online_id = _alert._local_staging_file.parents[1].joinpath("server_ids", f"{_alert._local_staging_file.name.split('.')[0]}.txt").read_text()
-    _online_alert = Alert(_online_id)
+    _online_alert = Alert(_sender.id_mapping[_alert.id])
     assert _online_alert.source == "metrics"
     assert _online_alert.alert.frequency == 1
@@ -124,12 +123,12 @@ def test_metric_range_alert_modification_offline(offline_cache_setup) -> None:
         offline=True
     )
     _alert.commit()
-    sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["alerts"])
     time.sleep(1)
     # Get online ID and retrieve alert
-    _online_id = _alert._local_staging_file.parents[1].joinpath("server_ids", f"{_alert._local_staging_file.name.split('.')[0]}.txt").read_text()
-    _online_alert = Alert(_online_id)
+    _online_alert = Alert(_sender.id_mapping[_alert.id])
     assert _online_alert.source == "metrics"
     assert _online_alert.alert.frequency == 1
@@ -149,7 +148,8 @@ def test_metric_range_alert_modification_offline(offline_cache_setup) -> None:
         _local_data = json.load(in_f)
     assert _local_data.get("description") == "updated!"
-    sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["alerts"])
     time.sleep(1)
     _online_alert.refresh()
diff --git a/tests/unit/test_metric_threshold_alert.py b/tests/unit/test_metric_threshold_alert.py
index 8f04c698..dfd1209e 100644
--- a/tests/unit/test_metric_threshold_alert.py
+++ b/tests/unit/test_metric_threshold_alert.py
@@ -5,7 +5,7 @@
 import uuid
 from simvue.api.objects import MetricsThresholdAlert, Alert
-from simvue.sender import sender
+from simvue.sender import Sender

 @pytest.mark.api
 @pytest.mark.online
@@ -61,12 +61,12 @@ def test_metric_threshold_alert_creation_offline(offline_cache_setup) -> None:
     assert _local_data.get("notification") == "none"
     assert _local_data.get("alert").get("threshold") == 10
-    sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["alerts"])
     time.sleep(1)
     # Get online ID and retrieve alert
-    _online_id = _alert._local_staging_file.parents[1].joinpath("server_ids", f"{_alert._local_staging_file.name.split('.')[0]}.txt").read_text()
-    _online_alert = Alert(_online_id)
+    _online_alert = Alert(_sender.id_mapping[_alert.id])
     assert _online_alert.source == "metrics"
     assert _online_alert.alert.frequency == 1
@@ -123,12 +123,12 @@ def test_metric_threshold_alert_modification_offline(offline_cache_setup) -> Non
     )
     _alert.commit()
-    sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["alerts"])
     time.sleep(1)
     # Get online ID and retrieve alert
-    _online_id = _alert._local_staging_file.parents[1].joinpath("server_ids", f"{_alert._local_staging_file.name.split('.')[0]}.txt").read_text()
-    _online_alert = MetricsThresholdAlert(_online_id)
+    _online_alert = MetricsThresholdAlert(_sender.id_mapping[_alert.id])
     assert _online_alert.source == "metrics"
     assert _online_alert.alert.frequency == 1
@@ -149,7 +149,7 @@ def test_metric_threshold_alert_modification_offline(offline_cache_setup) -> Non
         _local_data = json.load(in_f)
     assert _local_data.get("description") == "updated!"
-    sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True).upload(["alerts"])
     time.sleep(1)
     _online_alert.refresh()
diff --git a/tests/unit/test_metrics.py b/tests/unit/test_metrics.py
index fea65482..74a5c0aa 100644
--- a/tests/unit/test_metrics.py
+++ b/tests/unit/test_metrics.py
@@ -7,7 +7,7 @@
 from simvue.api.objects import Metrics, Folder, Run
 from simvue.models import DATETIME_FORMAT
-from simvue.sender import sender
+from simvue.sender import Sender

 @pytest.mark.api
 @pytest.mark.online
@@ -88,14 +88,15 @@ def test_metrics_creation_offline(offline_cache_setup) -> None:
     assert _local_data.get("metrics")[0].get("step") == _step
     assert _local_data.get("metrics")[0].get("time") == _time
-    _id_mapping = sender(_metrics._local_staging_file.parents[1], 1, 10, ["folders", "runs", "metrics"], throw_exceptions=True)
+    _sender = Sender(_metrics._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["folders", "runs", "metrics"])
     time.sleep(1)
     # Get online version of metrics
-    _online_metrics = Metrics(_id_mapping.get(_metrics.id))
-    _data = next(_online_metrics.get(metrics=["x", "y", "aB0-_/.:=><+()"], runs=[_id_mapping.get(_run.id)], xaxis="step"))
-    assert sorted(_online_metrics.names(run_ids=[_id_mapping.get(_run.id)])) == sorted(_values.keys())
-    assert _data.get(_id_mapping.get(_run.id)).get('y')[0].get('value') == 2.0
-    assert _data.get(_id_mapping.get(_run.id)).get('y')[0].get('step') == 1
+    _online_metrics = Metrics(_sender.id_mapping.get(_metrics.id))
+    _data = next(_online_metrics.get(metrics=["x", "y", "aB0-_/.:=><+()"], runs=[_sender.id_mapping.get(_run.id)], xaxis="step"))
+    assert sorted(_online_metrics.names(run_ids=[_sender.id_mapping.get(_run.id)])) == sorted(_values.keys())
+    assert _data.get(_sender.id_mapping.get(_run.id)).get('y')[0].get('value') == 2.0
+    assert _data.get(_sender.id_mapping.get(_run.id)).get('y')[0].get('step') == 1
     _run.delete()
     _folder.delete(recursive=True, delete_runs=True, runs_only=False)
diff --git a/tests/unit/test_object_artifact.py b/tests/unit/test_object_artifact.py
index b45eef1a..1a60dda4 100644
--- a/tests/unit/test_object_artifact.py
+++ b/tests/unit/test_object_artifact.py
@@ -6,7 +6,7 @@
 import json
 from simvue.api.objects import ObjectArtifact, Run, Artifact
 from simvue.api.objects.folder import Folder
-from simvue.sender import sender
+from simvue.sender import Sender
 from simvue.serialization import _deserialize_numpy_array

 @pytest.mark.api
@@ -63,10 +63,11 @@ def test_object_artifact_creation_offline(offline_cache_setup) -> None:
     assert _local_data.get("mime_type") == "application/vnd.simvue.numpy.v1"
     assert _local_data.get("runs") == {_run.id: "input"}
-    _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
+    _sender = Sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
+    _sender.upload()
     time.sleep(1)
-    _online_artifact = Artifact(_id_mapping.get(_artifact.id))
+    _online_artifact = Artifact(_sender.id_mapping.get(_artifact.id))
     assert _online_artifact.name == f"test_object_artifact_offline_{_uuid}"
     assert _online_artifact.mime_type == "application/vnd.simvue.numpy.v1"
diff --git a/tests/unit/test_run.py b/tests/unit/test_run.py
index 395b658a..1ee2e6f9 100644
--- a/tests/unit/test_run.py
+++ b/tests/unit/test_run.py
@@ -5,7 +5,7 @@
 import datetime
 import uuid
 from simvue.api.objects.run import RunBatchArgs
-from simvue.sender import sender
+from simvue.sender import Sender
 from simvue.api.objects import Run, Folder
 from simvue.client import Client
@@ -41,18 +41,17 @@ def test_run_creation_offline(offline_cache_setup) -> None:
     assert _local_data.get("name") == f"simvue_offline_run_{_uuid}"
     assert _local_data.get("folder") == _folder_name
-    sender(_run._local_staging_file.parents[1], 1, 10, ["folders", "runs"], throw_exceptions=True)
+    _sender = Sender(_run._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["folders", "runs"])
     time.sleep(1)
     # Get online ID and retrieve run
-    _online_id = _run._local_staging_file.parents[1].joinpath("server_ids", f"{_run._local_staging_file.name.split('.')[0]}.txt").read_text()
-    _online_run = Run(_online_id)
+    _online_run = Run(_sender.id_mapping[_run.id])
     assert _online_run.name == _run_name
     assert _online_run.folder == _folder_name
     _run.delete()
-    _run._local_staging_file.parents[1].joinpath("server_ids", f"{_run._local_staging_file.name.split('.')[0]}.txt").unlink()
     client = Client()
     client.delete_folder(_folder_name, recursive=True, remove_runs=True)
@@ -119,12 +118,12 @@ def test_run_modification_offline(offline_cache_setup) -> None:
     assert _new_run.description == "Simvue test run"
     assert _new_run.name == "simvue_test_run"
-    sender(_run._local_staging_file.parents[1], 1, 10, ["folders", "runs"], throw_exceptions=True)
+    _sender = Sender(_run._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["folders", "runs"])
     time.sleep(1)
     # Get online ID and retrieve run
-    _online_id = _run._local_staging_file.parents[1].joinpath("server_ids", f"{_run._local_staging_file.name.split('.')[0]}.txt").read_text()
-    _online_run = Run(_online_id)
+    _online_run = Run(_sender.id_mapping[_run.id])
     assert _online_run.ttl == 120
     assert _online_run.description == "Simvue test run"
@@ -139,7 +138,8 @@ def test_run_modification_offline(offline_cache_setup) -> None:
     _online_run.refresh()
     assert _online_run.tags == []
-    sender(_run._local_staging_file.parents[1], 1, 10, ["folders", "runs"], throw_exceptions=True)
+    _sender = Sender(_run._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["folders", "runs"])
     time.sleep(1)
     _online_run.refresh()
diff --git a/tests/unit/test_s3_storage.py b/tests/unit/test_s3_storage.py
index 2246e98a..e251a034 100644
--- a/tests/unit/test_s3_storage.py
+++ b/tests/unit/test_s3_storage.py
@@ -5,7 +5,7 @@
 from simvue.api.objects import S3Storage
 from simvue.api.objects.storage.fetch import Storage
-from simvue.sender import sender
+from simvue.sender import Sender

 @pytest.mark.api
 @pytest.mark.online
@@ -71,8 +71,9 @@ def test_create_s3_offline(offline_cache_setup) -> None:
     assert not _local_data.get("user", None)
     assert not _local_data.get("usage", None)
-    _id_mapping = sender(_storage._local_staging_file.parents[1], 1, 10, ["storage"], throw_exceptions=True)
-    _online_id = _id_mapping[_storage.id]
+    _sender = Sender(_storage._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["storage"])
+    _online_id = _sender.id_mapping[_storage.id]
     time.sleep(1)
     _online_storage = S3Storage(_online_id)
diff --git a/tests/unit/test_sender.py b/tests/unit/test_sender.py
index 5d0933e6..9177903d 100644
--- a/tests/unit/test_sender.py
+++ b/tests/unit/test_sender.py
@@ -1,13 +1,10 @@
-import contextlib
 import json
 import pytest
 import time
 import datetime
 import uuid
-from simvue.api.objects.run import RunBatchArgs
-from simvue.sender import sender
+from simvue.sender import Sender
 from simvue.api.objects import Run, Metrics, Folder
-from simvue.client import Client
 from simvue.models import DATETIME_FORMAT
 import logging
 import pathlib
@@ -37,23 +34,26 @@ def test_sender_exception_handling(offline_cache_setup, caplog, throw_exceptions
     if throw_exceptions:
         with pytest.raises(ValueError):
-            sender(throw_exceptions=True, threading_threshold=1 if parallel else 10)
+            _sender = Sender(throw_exceptions=True, threading_threshold=1 if parallel else 10)
+            _sender.upload()
         return
     with caplog.at_level(logging.ERROR):
-        sender(threading_threshold=1 if parallel else 10)
+        _sender = Sender(threading_threshold=1 if parallel else 10)
+        _sender.upload()

-    assert "Error while committing 'Metrics'" in caplog.text
+    assert "Error while committing metrics" in caplog.text
     # Wait, then try sending again
     time.sleep(1)
     caplog.clear()
     with caplog.at_level(logging.ERROR):
-        sender(retry_failed_uploads=retry_failed_uploads, threading_threshold=1 if parallel else 10)
+        _sender = Sender(retry_failed_uploads=retry_failed_uploads, threading_threshold=1 if parallel else 10)
+        _sender.upload()
     if retry_failed_uploads:
-        assert "Error while committing 'Metrics'" in caplog.text
+        assert "Error while committing metrics" in caplog.text
     else:
         assert not caplog.text
@@ -100,7 +100,8 @@ def test_sender_server_ids(offline_cache_setup, caplog, parallel):
     # Send both items
     with caplog.at_level(logging.ERROR):
-        sender(threading_threshold=1 if parallel else 10)
+        _sender = Sender(threading_threshold=1 if parallel else 10)
+        _sender.upload()
     assert not caplog.text
@@ -139,7 +140,8 @@ def test_sender_server_ids(offline_cache_setup, caplog, parallel):
     # Run sender again, check online ID is correctly loaded from file and substituted for offline ID
     with caplog.at_level(logging.ERROR):
-        sender(threading_threshold=1 if parallel else 10)
+        _sender = Sender(threading_threshold=1 if parallel else 10)
+        _sender.upload()
     assert not caplog.text
@@ -171,8 +173,9 @@ def test_send_heartbeat(offline_cache_setup, parallel, mocker):
         _offline_runs.append(_run)
-    _id_mapping = sender(threading_threshold=1 if parallel else 10)
-    _online_runs = [Run(identifier=_id_mapping.get(_offline_run.id)) for _offline_run in _offline_runs]
+    _sender = Sender(threading_threshold=1 if parallel else 10)
+    _sender.upload()
+    _online_runs = [Run(identifier=_sender.id_mapping.get(_offline_run.id)) for _offline_run in _offline_runs]
     assert all([_online_run.status == "running" for _online_run in _online_runs])
     spy_put = mocker.spy(requests, "put")
@@ -181,11 +184,11 @@
     for i in range(10):
         time.sleep(0.5)
         [_offline_run.send_heartbeat() for _offline_run in _offline_runs]
-        sender(threading_threshold=1 if parallel else 10)
+        Sender(threading_threshold=1 if parallel else 10).upload()
     # Check requests.put() endpoint called 50 times - once for each of the 5 runs, on all 10 iterations
     assert spy_put.call_count == 50
     # Get online runs and check all running
     [_online_run.refresh() for _online_run in _online_runs]
-    assert all([_online_run.status == "running" for _online_run in _online_runs])
\ No newline at end of file
+    assert all([_online_run.status == "running" for _online_run in _online_runs])
diff --git a/tests/unit/test_tag.py b/tests/unit/test_tag.py
index 381234d2..c91af8c1 100644
--- a/tests/unit/test_tag.py
+++ b/tests/unit/test_tag.py
@@ -5,7 +5,7 @@
 import json
 import pydantic.color
 from simvue.api.objects.tag import Tag
-from simvue.sender import sender
+from simvue.sender import Sender
 @pytest.mark.api
 @pytest.mark.online
@@ -35,10 +35,11 @@ def test_tag_creation_offline(offline_cache_setup) -> None:
     assert _local_data.get("name") == f"test_tag_{_uuid}"
-    _id_mapping = sender(_tag._local_staging_file.parents[1], 1, 10, ["tags"], throw_exceptions=True)
+    _sender = Sender(_tag._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["tags"])
     time.sleep(1)
-    _online_id = _id_mapping.get(_tag.id)
+    _online_id = _sender.id_mapping.get(_tag.id)
     _online_tag = Tag(_online_id)
     assert _online_tag.name == f"test_tag_{_uuid}"
@@ -78,8 +79,9 @@ def test_tag_modification_offline(offline_cache_setup) -> None:
     assert _local_data.get("name") == f"test_tag_{_uuid}"
-    _id_mapping = sender(_tag._local_staging_file.parents[1], 1, 10, ["tags"], throw_exceptions=True)
-    _online_id = _id_mapping.get(_tag.id)
+    _sender = Sender(_tag._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["tags"])
+    _online_id = _sender.id_mapping.get(_tag.id)
     _online_tag = Tag(_online_id)
     assert _online_tag.name == f"test_tag_{_uuid}"
@@ -101,7 +103,8 @@ def test_tag_modification_offline(offline_cache_setup) -> None:
     assert pydantic.color.parse_str(_local_data.get("colour")).r == 250 / 255
     assert _local_data.get("description") == "modified test tag"
-    sender(_tag._local_staging_file.parents[1], 1, 10, ["tags"], throw_exceptions=True)
+    _sender = Sender(_tag._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["tags"])
     time.sleep(1)
     # Check online version is updated
diff --git a/tests/unit/test_tenant.py b/tests/unit/test_tenant.py
index 04684467..73117ddd 100644
--- a/tests/unit/test_tenant.py
+++ b/tests/unit/test_tenant.py
@@ -5,7 +5,7 @@
 import uuid
 from simvue.api.objects.administrator import Tenant
-from simvue.sender import sender
+from simvue.sender import Sender

 @pytest.mark.api
 @pytest.mark.online
@@ -40,9 +40,10 @@ def test_create_tenant_offline(offline_cache_setup) -> None:
     assert _local_data.get("name") == _uuid
     assert _local_data.get("is_enabled") == True
-    _id_mapping = sender(_new_tenant._local_staging_file.parents[1], 1, 10, ["tenants"], throw_exceptions=True)
+    _sender = Sender(_new_tenant._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["tenants"])
     time.sleep(1)
-    _online_user = Tenant(_id_mapping.get(_new_tenant.id))
+    _online_user = Tenant(_sender.id_mapping.get(_new_tenant.id))
     assert _online_user.name == _uuid
     assert _online_user.is_enabled == True
diff --git a/tests/unit/test_user.py b/tests/unit/test_user.py
index a53f3cfd..5aac3c11 100644
--- a/tests/unit/test_user.py
+++ b/tests/unit/test_user.py
@@ -5,7 +5,7 @@
 import uuid
 from simvue.api.objects.administrator import User, Tenant
-from simvue.sender import sender
+from simvue.sender import Sender

 @pytest.mark.api
 @pytest.mark.online
@@ -62,9 +62,10 @@ def test_create_user_offline(offline_cache_setup) -> None:
     assert _local_data.get("fullname") == "Joe Bloggs"
     assert _local_data.get("email") == "jbloggs@simvue.io"
-    _id_mapping = sender(_user._local_staging_file.parents[1], 1, 10, ["users"], throw_exceptions=True)
+    _sender = Sender(_user._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["users"])
     time.sleep(1)
-    _online_user = User(_id_mapping.get(_user.id))
+    _online_user = User(_sender.id_mapping.get(_user.id))
     assert _online_user.username == "jbloggs"
     assert _online_user.fullname == "Joe Bloggs"
     assert _online_user.email == "jbloggs@simvue.io"
diff --git a/tests/unit/test_user_alert.py b/tests/unit/test_user_alert.py
index f1f1acea..f13248c3 100644
--- a/tests/unit/test_user_alert.py
+++ b/tests/unit/test_user_alert.py
@@ -3,7 +3,7 @@
 import contextlib
 import pytest
 import uuid
-from simvue.sender import sender
+from simvue.sender import Sender
 from simvue.api.objects import Alert, UserAlert, Run
 from simvue.api.objects.folder import Folder
@@ -46,11 +46,11 @@ def test_user_alert_creation_offline(offline_cache_setup) -> None:
     assert _local_data.get("name") == f"users_alert_{_uuid}"
     assert _local_data.get("notification") == "none"
-    _id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["alerts"])
     time.sleep(1)
-    _online_id = _alert._local_staging_file.parents[1].joinpath("server_ids", f"{_alert._local_staging_file.name.split('.')[0]}.txt").read_text()
-    _online_alert = Alert(_online_id)
+    _online_alert = Alert(_sender.id_mapping[_alert.id])
     assert _online_alert.source == "user"
     assert _online_alert.name == f"users_alert_{_uuid}"
@@ -94,12 +94,13 @@ def test_user_alert_modification_offline(offline_cache_setup) -> None:
     )
     _alert.commit()
-    sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["alerts"])
+
     time.sleep(1)
     # Get online ID and retrieve alert
-    _online_id = _alert._local_staging_file.parents[1].joinpath("server_ids", f"{_alert._local_staging_file.name.split('.')[0]}.txt").read_text()
-    _online_alert = UserAlert(_online_id)
+    _online_alert = UserAlert(_sender.id_mapping[_alert.id])
     assert _online_alert.source == "user"
     assert _online_alert.name == f"users_alert_{_uuid}"
@@ -117,7 +118,8 @@ def test_user_alert_modification_offline(offline_cache_setup) -> None:
     with _alert._local_staging_file.open() as in_f:
         _local_data = json.load(in_f)
     assert _local_data.get("description") == "updated!"
-    sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["alerts"])
     time.sleep(1)
     _online_alert.refresh()
@@ -191,12 +193,13 @@ def test_user_alert_status_offline(offline_cache_setup) -> None:
     _run.alerts = [_alert.id]
     _run.commit()
-    _id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["folders", "runs", "alerts"], throw_exceptions=True)
+    _sender = Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["folders", "runs", "alerts"])
     time.sleep(1)
     # Get online alert, check status is not set
-    _online_alert = UserAlert(_id_mapping.get(_alert.id))
-    assert not _online_alert.get_status(run_id=_id_mapping.get(_run.id))
+    _online_alert = UserAlert(_sender.id_mapping.get(_alert.id))
+    assert not _online_alert.get_status(run_id=_sender.id_mapping.get(_run.id))
     _alert.set_status(_run.id, "critical")
     _alert.commit()
@@ -204,14 +207,15 @@ def test_user_alert_status_offline(offline_cache_setup) -> None:
     # Check online status is still not set as change has not been sent
     _online_alert.refresh()
-    assert not _online_alert.get_status(run_id=_id_mapping.get(_run.id))
+    assert not _online_alert.get_status(run_id=_sender.id_mapping.get(_run.id))
-    sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
+    _sender = Sender(_alert._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
+    _sender.upload(["alerts"])
     time.sleep(1)
     # Check online status has been updated
     _online_alert.refresh()
-    assert _online_alert.get_status(run_id=_id_mapping.get(_run.id)) == "critical"
+    assert _online_alert.get_status(run_id=_sender.id_mapping.get(_run.id)) == "critical"
     _run.delete()
     _folder.delete(recursive=True, runs_only=False, delete_runs=True)