diff --git a/simvue/api/objects/artifact/file.py b/simvue/api/objects/artifact/file.py
index 664dadde..5cc41465 100644
--- a/simvue/api/objects/artifact/file.py
+++ b/simvue/api/objects/artifact/file.py
@@ -4,6 +4,9 @@
 import pydantic
 import os
 import pathlib
+import shutil
+from simvue.config.user import SimvueConfiguration
+from datetime import datetime

 from simvue.models import NAME_REGEX
 from simvue.utilities import get_mimetype_for_file, get_mimetypes, calculate_sha256
@@ -39,6 +42,7 @@ def new(
         metadata: dict[str, typing.Any] | None,
         upload_timeout: int | None = None,
         offline: bool = False,
+        snapshot: bool = False,
         **kwargs,
     ) -> Self:
         """Create a new artifact either locally or on the server
@@ -61,6 +65,8 @@ def new(
             specify the timeout in seconds for upload
         offline : bool, optional
             whether to define this artifact locally, default is False
+        snapshot : bool, optional
+            whether to create a snapshot of this file before uploading it, default is False
         """
         _mime_type = mime_type or get_mimetype_for_file(file_path)
@@ -73,6 +79,19 @@ def new(
             _file_checksum = kwargs.pop("checksum")
         else:
             file_path = pathlib.Path(file_path)
+            if snapshot:
+                _user_config = SimvueConfiguration.fetch()
+
+                _local_staging_dir: pathlib.Path = _user_config.offline.cache.joinpath(
+                    "artifacts"
+                )
+                _local_staging_dir.mkdir(parents=True, exist_ok=True)
+                _local_staging_file = _local_staging_dir.joinpath(
+                    f"{file_path.stem}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S_%f')[:-3]}.file"
+                )
+                shutil.copy(file_path, _local_staging_file)
+                file_path = _local_staging_file
+
             _file_size = file_path.stat().st_size
             _file_orig_path = file_path.expanduser().absolute()
             _file_checksum = calculate_sha256(f"{file_path}", is_file=True)
@@ -105,4 +124,8 @@ def new(
         with open(_file_orig_path, "rb") as out_f:
             _artifact._upload(file=out_f, timeout=upload_timeout, file_size=_file_size)

+        # If a snapshot was created, delete it after uploading
+        if pathlib.Path(_file_orig_path).parent == _artifact._local_staging_file.parent:
+            pathlib.Path(_file_orig_path).unlink()
+
         return _artifact
diff --git a/simvue/run.py b/simvue/run.py
index c0904d78..6e37aff9 100644
--- a/simvue/run.py
+++ b/simvue/run.py
@@ -1632,6 +1632,7 @@ def save_file(
         category: typing.Literal["input", "output", "code"],
         file_type: str | None = None,
         preserve_path: bool = False,
+        snapshot: bool = False,
         name: typing.Optional[
             typing.Annotated[str, pydantic.Field(pattern=NAME_REGEX)]
         ] = None,
@@ -1652,6 +1653,8 @@ def save_file(
             the MIME file type else this is deduced, by default None
         preserve_path : bool, optional
             whether to preserve the path during storage, by default False
+        snapshot : bool, optional
+            whether to take a snapshot of the file before uploading, by default False
         name : str, optional
             name to associate with this file, by default None
         metadata : str | None, optional
@@ -1686,6 +1689,7 @@ def save_file(
                 offline=self._user_config.run.mode == "offline",
                 mime_type=file_type,
                 metadata=metadata,
+                snapshot=snapshot,
             )
             _artifact.attach_to_run(self.id, category)
         except (ValueError, RuntimeError) as e:
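For reviewers, a minimal usage sketch of the new flag through the public `Run` API (the run name, folder and file path are illustrative; offline mode matches the tests below, but the flag is accepted in online mode too):

    import simvue

    with simvue.Run(mode="offline") as run:
        run.init(name="snapshot-example", folder="/snapshot-example")
        # snapshot=True copies the file into the offline cache's "artifacts"
        # staging area at save time, so edits made to results.csv afterwards
        # cannot invalidate the checksum recorded for the artifact
        run.save_file("results.csv", category="output", snapshot=True)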
diff --git a/simvue/sender.py b/simvue/sender.py
index 0989cdbc..ee8f2913 100644
--- a/simvue/sender.py
+++ b/simvue/sender.py
@@ -13,7 +13,6 @@
 import requests
 import psutil
 from simvue.config.user import SimvueConfiguration
-
 import simvue.api.objects
 from simvue.api.objects.artifact.base import ArtifactBase
 from simvue.eco.emissions_monitor import CO2Monitor
@@ -37,13 +36,29 @@
 _logger = logging.getLogger(__name__)

+
+def _log_upload_failed(file_path: pydantic.FilePath) -> None:
+    """Record in an object's offline cache file that its upload failed.
+
+    Parameters
+    ----------
+    file_path : pydantic.FilePath
+        The path to the offline cache file for the object
+    """
+    with file_path.open("r") as file:
+        _data = json.load(file)
+    _data["upload_failed"] = True
+    with file_path.open("w") as file:
+        json.dump(_data, file)
+
+
 def upload_cached_file(
     cache_dir: pydantic.DirectoryPath,
     obj_type: str,
     file_path: pydantic.FilePath,
     id_mapping: dict[str, str],
+    retry_failed_uploads: bool,
     lock: threading.Lock,
-):
+) -> None:
     """Upload data stored in a cached file to the Simvue server.

     Parameters
     ----------
@@ -62,41 +77,53 @@ def upload_cached_file(
     _current_id = file_path.name.split(".")[0]
     _data = json.load(file_path.open())
     _exact_type: str = _data.pop("obj_type")
+
+    if _data.pop("upload_failed", False) and not retry_failed_uploads:
+        return
+
     try:
         _instance_class = getattr(simvue.api.objects, _exact_type)
-    except AttributeError as e:
-        raise RuntimeError(f"Attempt to initialise unknown type '{_exact_type}'") from e
+    except AttributeError:
+        _logger.error(f"Attempt to initialise unknown type '{_exact_type}'")
+        _log_upload_failed(file_path)
+        return

     # If it is an ObjectArtifact, need to load the object as bytes from a different file
     if issubclass(_instance_class, simvue.api.objects.ObjectArtifact):
         with open(file_path.parent.joinpath(f"{_current_id}.object"), "rb") as file:
             _data["serialized"] = file.read()

+    try:
+        # We want to reconnect if there is an online ID stored for this file
+        if _online_id := id_mapping.get(_current_id):
+            obj_for_upload = _instance_class(
+                identifier=_online_id, _read_only=False, **_data
+            )
+        else:
+            obj_for_upload = _instance_class.new(**_data)
-
-    # We want to reconnect if there is an online ID stored for this file
-    if _online_id := id_mapping.get(_current_id):
-        obj_for_upload = _instance_class(
-            identifier=_online_id, _read_only=False, **_data
-        )
-    else:
-        obj_for_upload = _instance_class.new(**_data)
-
-    with lock:
-        obj_for_upload.on_reconnect(id_mapping)
+
+        with lock:
+            obj_for_upload.on_reconnect(id_mapping)

-    try:
         if not issubclass(_instance_class, ArtifactBase):
             obj_for_upload.commit()
         _new_id = obj_for_upload.id
-    except RuntimeError as error:
+
+    except Exception as error:
         if "status 409" in error.args[0]:
             return
-        raise error
-    if not _new_id:
-        raise RuntimeError(
-            f"Object of type '{obj_for_upload.__class__.__name__}' has no identifier"
+
+        _logger.error(
+            f"Error while committing '{_instance_class.__name__}': {error.args[0]}"
         )
+        _log_upload_failed(file_path)
+        return
+    if not _new_id:
+        _logger.error(f"Object of type '{_instance_class.__name__}' has no identifier")
+        _log_upload_failed(file_path)
+        return
+
     _logger.info(
-        f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {obj_for_upload.__class__.__name__} '{_new_id}'"
+        f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {_instance_class.__name__} '{_new_id}'"
     )
     file_path.unlink(missing_ok=True)
@@ -155,6 +182,7 @@ def sender(
     max_workers: int = 5,
     threading_threshold: int = 10,
     objects_to_upload: list[str] = UPLOAD_ORDER,
+    retry_failed_uploads: bool = False,
 ) -> dict[str, str]:
     """Send data from a local cache directory to the Simvue server.
@@ -168,6 +196,8 @@ def sender(
         The number of cached files above which threading will be used
     objects_to_upload : list[str]
         Types of objects to upload, by default uploads all types of objects present in cache
+    retry_failed_uploads : bool, optional
+        Whether to retry sending objects which previously failed, by default False

     Returns
     -------
@@ -203,7 +233,14 @@ def sender(
         _offline_files = _all_offline_files[_obj_type]
         if len(_offline_files) < threading_threshold:
             for file_path in _offline_files:
-                upload_cached_file(cache_dir, _obj_type, file_path, _id_mapping, _lock)
+                upload_cached_file(
+                    cache_dir=cache_dir,
+                    obj_type=_obj_type,
+                    file_path=file_path,
+                    id_mapping=_id_mapping,
+                    retry_failed_uploads=retry_failed_uploads,
+                    lock=_lock,
+                )
         else:
             with ThreadPoolExecutor(
                 max_workers=max_workers, thread_name_prefix="sender_session_upload"
@@ -214,6 +251,7 @@ def sender(
                         obj_type=_obj_type,
                         file_path=file_path,
                         id_mapping=_id_mapping,
+                        retry_failed_uploads=retry_failed_uploads,
                         lock=_lock,
                     ),
                     _offline_files,
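A short sketch of the resulting sender semantics (the cache path is illustrative; the positional arguments mirror the test suite's `sender(cache_dir, max_workers, threading_threshold)` calls):

    import pathlib
    from simvue.sender import sender

    _cache = pathlib.Path("/tmp/simvue_offline")  # illustrative cache location

    # First pass: a failing object is logged, marked with 'upload_failed': true
    # in its cache file, and left on disk instead of aborting the whole send
    sender(_cache, 2, 10)

    # Later passes skip marked objects by default; opt in to retry them
    sender(_cache, 2, 10, retry_failed_uploads=True)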
diff --git a/tests/functional/test_run_class.py b/tests/functional/test_run_class.py
index dd82846a..4a0d54dc 100644
--- a/tests/functional/test_run_class.py
+++ b/tests/functional/test_run_class.py
@@ -809,6 +809,9 @@ def test_set_folder_details(request: pytest.FixtureRequest) -> None:


 @pytest.mark.run
+@pytest.mark.parametrize(
+    "snapshot", (True, False)
+)
 @pytest.mark.parametrize(
     "valid_mimetype,preserve_path,name,allow_pickle,empty_file,category",
     [
@@ -827,6 +830,7 @@ def test_save_file_online(
     allow_pickle: bool,
     empty_file: bool,
     category: typing.Literal["input", "output", "code"],
+    snapshot: bool,
     capfd,
     request,
 ) -> None:
@@ -860,6 +864,7 @@ def test_save_file_online(
             file_type=file_type,
             preserve_path=preserve_path,
             name=name,
+            snapshot=snapshot,
         )
     else:
         with pytest.raises(RuntimeError):
@@ -891,6 +896,9 @@ def test_save_file_online(

 @pytest.mark.run
 @pytest.mark.offline
+@pytest.mark.parametrize(
+    "snapshot", (True, False)
+)
 @pytest.mark.parametrize(
     "preserve_path,name,allow_pickle,empty_file,category",
     [
@@ -908,6 +916,7 @@ def test_save_file_offline(
     name: str | None,
     allow_pickle: bool,
     empty_file: bool,
+    snapshot: bool,
     category: typing.Literal["input", "output", "code"],
     capfd,
 ) -> None:
@@ -927,7 +936,15 @@ def test_save_file_offline(
             file_type=file_type,
             preserve_path=preserve_path,
             name=name,
+            snapshot=snapshot,
         )
+        # If snapshotting, check the file can be updated without affecting the staged contents
+        if snapshot:
+            with open(
+                (out_name := pathlib.Path(tempd).joinpath("test_file.txt")),
+                "w",
+            ) as out_f:
+                out_f.write("updated file!")
         sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
         os.remove(out_name)
         client = sv_cl.Client()
@@ -944,8 +961,11 @@ def test_save_file_offline(
             name=f"{name or stored_name}",
             output_dir=tempd,
         )
-        assert out_loc.joinpath(name or out_name.name).exists()
-
+        assert (out_file := out_loc.joinpath(name or out_name.name)).exists()
+        with open(out_file, "r") as out_f:
+            content = out_f.read()
+        assert content == "test data entry"

 @pytest.mark.run
 def test_update_tags_running(
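The unit tests below exercise the same flag one level down, on the artifact API itself; condensed, the call looks like this (the `FileArtifact` import path and the `run_id` placeholder are assumptions for illustration):

    from simvue.api.objects import FileArtifact

    _artifact = FileArtifact.new(
        name="example_file_artifact",
        file_path="hello_world.txt",
        storage=None,
        mime_type=None,
        metadata=None,
        offline=True,
        snapshot=True,  # a copy is staged under <offline cache>/artifacts until sent
    )
    _artifact.attach_to_run(run_id, category="input")  # run_id: an existing offline run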
diff --git a/tests/unit/test_file_artifact.py b/tests/unit/test_file_artifact.py
index c442086c..e59867b9 100644
--- a/tests/unit/test_file_artifact.py
+++ b/tests/unit/test_file_artifact.py
@@ -10,10 +10,15 @@
 from simvue.api.objects.folder import Folder
 from simvue.sender import sender
 from simvue.client import Client
+import logging

 @pytest.mark.api
 @pytest.mark.online
-def test_file_artifact_creation_online() -> None:
+@pytest.mark.parametrize(
+    "snapshot",
+    (True, False)
+)
+def test_file_artifact_creation_online(offline_cache_setup, snapshot) -> None:
     _uuid: str = f"{uuid.uuid4()}".split("-")[0]
     _folder_name = f"/simvue_unit_testing/{_uuid}"
     _folder = Folder.new(path=_folder_name)
@@ -32,7 +37,8 @@ def test_file_artifact_creation_online() -> None:
         file_path=_path,
         storage=None,
         mime_type=None,
-        metadata=None
+        metadata=None,
+        snapshot=snapshot
     )
     _artifact.attach_to_run(_run.id, "input")
     time.sleep(1)
@@ -45,6 +51,11 @@ def test_file_artifact_creation_online() -> None:
     _content = b"".join(_artifact.download_content()).decode("UTF-8")
     assert _content == f"Hello World! {_uuid}"
     assert _artifact.to_dict()
+
+    # If snapshotting, check no local copy remains
+    if snapshot:
+        assert len(list(_artifact._local_staging_file.parent.iterdir())) == 0
+
     _run.delete()
     _folder.delete(recursive=True, delete_runs=True, runs_only=False)
     with contextlib.suppress(FileNotFoundError):
@@ -55,7 +66,11 @@ def test_file_artifact_creation_online() -> None:

 @pytest.mark.api
 @pytest.mark.offline
-def test_file_artifact_creation_offline(offline_cache_setup) -> None:
+@pytest.mark.parametrize(
+    "snapshot",
+    (True, False)
+)
+def test_file_artifact_creation_offline(offline_cache_setup, snapshot) -> None:
     _uuid: str = f"{uuid.uuid4()}".split("-")[0]
     _folder_name = f"/simvue_unit_testing/{_uuid}"
     _folder = Folder.new(path=_folder_name, offline=True)
@@ -74,7 +89,8 @@ def test_file_artifact_creation_offline(offline_cache_setup) -> None:
         storage=None,
         mime_type=None,
         offline=True,
-        metadata=None
+        metadata=None,
+        snapshot=snapshot
     )
     _artifact.attach_to_run(_run._identifier, category="input")
@@ -84,12 +100,76 @@ def test_file_artifact_creation_offline(offline_cache_setup) -> None:
     assert _local_data.get("name") == f"test_file_artifact_{_uuid}"
     assert _local_data.get("runs") == {_run._identifier: "input"}

+    # If snapshotting, the staging area holds the definition file plus a copy of the actual file
+    assert len(list(_artifact._local_staging_file.parent.iterdir())) == (2 if snapshot else 1)
+
     _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
     time.sleep(1)

+    # Check file(s) deleted after upload
+    assert len(list(_artifact._local_staging_file.parent.iterdir())) == 0
+
     _online_artifact = Artifact(_id_mapping[_artifact.id])
     assert _online_artifact.name == _artifact.name
     _content = b"".join(_online_artifact.download_content()).decode("UTF-8")
     assert _content == f"Hello World! {_uuid}"
     _run.delete()
     _folder.delete()
+
+
+@pytest.mark.api
+@pytest.mark.offline
+@pytest.mark.parametrize(
+    "snapshot",
+    (True, False)
+)
+def test_file_artifact_creation_offline_updated(offline_cache_setup, caplog, snapshot) -> None:
+    _uuid: str = f"{uuid.uuid4()}".split("-")[0]
+    _folder_name = f"/simvue_unit_testing/{_uuid}"
+    _folder = Folder.new(path=_folder_name, offline=True)
+    _run = Run.new(name=f"test_file_artifact_creation_offline_updated_{_uuid}", folder=_folder_name, offline=True)
+
+    _path = pathlib.Path(offline_cache_setup.name).joinpath("hello_world.txt")
+    with _path.open("w") as out_f:
+        out_f.write(f"Hello World! {_uuid}")
+
+    _folder.commit()
+    _run.commit()
+    _artifact = FileArtifact.new(
+        name=f"test_file_artifact_{_uuid}",
+        file_path=_path,
+        storage=None,
+        mime_type=None,
+        offline=True,
+        metadata=None,
+        snapshot=snapshot
+    )
+    _artifact.attach_to_run(_run._identifier, category="input")
+
+    with _artifact._local_staging_file.open() as in_f:
+        _local_data = json.load(in_f)
+
+    assert _local_data.get("name") == f"test_file_artifact_{_uuid}"
+    assert _local_data.get("runs") == {_run._identifier: "input"}
+
+    # Change the file after the artifact is created, but before it is sent
+    with _path.open("w") as out_f:
+        out_f.write("File changed!")
+
+    if not snapshot:
+        # Without a snapshot the checksum recorded at creation no longer matches
+        with caplog.at_level(logging.ERROR):
+            _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
+        assert "The SHA256 you specified did not match the calculated checksum." in caplog.text
+        return
+
+    _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
+    time.sleep(1)
+
+    _online_artifact = Artifact(_id_mapping[_artifact.id])
+    assert _online_artifact.name == _artifact.name
+    _content = b"".join(_online_artifact.download_content()).decode("UTF-8")
+    # Since it was snapshotted, this should be the state of the file before it was changed
+    assert _content == f"Hello World! {_uuid}"
+    _run.delete()
+    _folder.delete()
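For the sender tests that follow, a rough sketch of what a cached entry looks like after a failed first pass (field names other than 'obj_type' and 'upload_failed' are abbreviated from the Metrics example in the test below):

    # Illustrative shape of <cache>/metrics/<id>.json after a failed send;
    # _log_upload_failed adds only the 'upload_failed' key
    failed_entry = {
        "obj_type": "Metrics",
        "run": "invalid_run_id",
        "upload_failed": True,
        # ...remaining fields exactly as written by Metrics.new()
    }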
{_uuid}") + + _folder.commit() + _run.commit() + _artifact = FileArtifact.new( + name=f"test_file_artifact_{_uuid}", + file_path=_path, + storage=None, + mime_type=None, + offline=True, + metadata=None, + snapshot=snapshot + ) + _artifact.attach_to_run(_run._identifier, category="input") + + with _artifact._local_staging_file.open() as in_f: + _local_data = json.load(in_f) + + assert _local_data.get("name") == f"test_file_artifact_{_uuid}" + assert _local_data.get("runs") == {_run._identifier: "input"} + + # Change the file after the artifact is created, but before it is sent + with _path.open("w") as out_f: + out_f.write("File changed!") + + if not snapshot: + with caplog.at_level(logging.ERROR): + _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10) + assert "The SHA256 you specified did not match the calculated checksum." in caplog.text + return + else: + _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10) + time.sleep(1) + _online_artifact = Artifact(_id_mapping[_artifact.id]) assert _online_artifact.name == _artifact.name _content = b"".join(_online_artifact.download_content()).decode("UTF-8") + # Since it was snapshotted, should be the state of the file before it was changed assert _content == f"Hello World! {_uuid}" _run.delete() _folder.delete() diff --git a/tests/unit/test_sender.py b/tests/unit/test_sender.py new file mode 100644 index 00000000..0c8555a0 --- /dev/null +++ b/tests/unit/test_sender.py @@ -0,0 +1,185 @@ +import contextlib +import json +import pytest +import time +import datetime +import uuid +from simvue.api.objects.run import RunBatchArgs +from simvue.sender import sender +from simvue.api.objects import Run, Metrics, Folder +from simvue.client import Client +from simvue.models import DATETIME_FORMAT +import logging +import pathlib +import requests + +@pytest.mark.parametrize("retry_failed_uploads", (True, False)) +@pytest.mark.parametrize("parallel", (True, False)) + +@pytest.mark.offline +def test_sender_exception_handling(offline_cache_setup, caplog, retry_failed_uploads, parallel): + # Create something which will produce an error when sent, eg a metric with invalid run ID + for i in range(5): + _metrics = Metrics.new( + run="invalid_run_id", + metrics=[ + { + "timestamp": datetime.datetime.now().strftime(DATETIME_FORMAT), + "time": 1, + "step": 1, + "values": {"x": 1, "y": 2}, + } + ], + offline=True + ) + _metrics.commit() + + with caplog.at_level(logging.ERROR): + sender(threading_threshold=1 if parallel else 10) + + assert "Error while committing 'Metrics'" in caplog.text + + # Wait, then try sending again + time.sleep(1) + caplog.clear() + + with caplog.at_level(logging.ERROR): + sender(retry_failed_uploads=retry_failed_uploads, threading_threshold=1 if parallel else 10) + + if retry_failed_uploads: + assert "Error while committing 'Metrics'" in caplog.text + else: + assert not caplog.text + + # Check files not deleted + _offline_metric_paths = list(pathlib.Path(offline_cache_setup.name).joinpath("metrics").iterdir()) + assert len(_offline_metric_paths) == 5 + # Check files have 'upload_failed: True' + for _metric_path in _offline_metric_paths: + with open(_metric_path, "r") as _file: + _metric = json.load(_file) + assert _metric.get("upload_failed") == True + +@pytest.mark.parametrize("parallel", (True, False)) +def test_sender_server_ids(offline_cache_setup, caplog, parallel): + # Create an offline run + _uuid: str = f"{uuid.uuid4()}".split("-")[0] + _path = f"/simvue_unit_testing/objects/folder/{_uuid}" + _folder = 
Folder.new(path=_path, offline=True) + _folder.commit() + + _offline_run_ids = [] + + for i in range(5): + _name = f"test_sender_server_ids-{_uuid}-{i}" + _run = Run.new(name=_name, folder=_path, offline=True) + _run.commit() + + _offline_run_ids.append(_run.id) + + # Create metric associated with offline run ID + _metrics = Metrics.new( + run=_run.id, + metrics=[ + { + "timestamp": datetime.datetime.now().strftime(DATETIME_FORMAT), + "time": 1, + "step": 1, + "values": {"x": i}, + } + ], + offline=True + ) + _metrics.commit() + + # Send both items + with caplog.at_level(logging.ERROR): + sender(threading_threshold=1 if parallel else 10) + + assert not caplog.text + + # Check server ID mapping correctly created + _online_runs = [] + for i, _offline_run_id in enumerate(_offline_run_ids): + _id_file = pathlib.Path(offline_cache_setup.name).joinpath("server_ids", f"{_offline_run_id}.txt") + assert _id_file.exists() + _online_id = _id_file.read_text() + + # Check correct ID is contained within file + _online_run = Run(identifier=_online_id) + _online_runs.append(_online_run) + assert _online_run.name == f"test_sender_server_ids-{_uuid}-{i}" + + # Check metric has been associated with correct online run + _run_metric = next(_online_run.metrics) + assert _run_metric[0] == 'x' + assert _run_metric[1]["count"] == 1 + assert _run_metric[1]["min"] == i + + # Create a new offline metric with offline run ID + _metrics = Metrics.new( + run=_offline_run_id, + metrics=[ + { + "timestamp": datetime.datetime.now().strftime(DATETIME_FORMAT), + "time": 2, + "step": 2, + "values": {"x": 2}, + } + ], + offline=True + ) + _metrics.commit() + + # Run sender again, check online ID is correctly loaded from file and substituted for offline ID + with caplog.at_level(logging.ERROR): + sender(threading_threshold=1 if parallel else 10) + + assert not caplog.text + + # Check metric uploaded correctly + for _online_run in _online_runs: + _online_run.refresh() + _run_metric = next(_online_run.metrics) + assert _run_metric[0] == 'x' + assert _run_metric[1]["count"] == 2 + + # Check all files for runs and metrics deleted once they were processed + assert len(list(pathlib.Path(offline_cache_setup.name).joinpath("runs").iterdir())) == 0 + assert len(list(pathlib.Path(offline_cache_setup.name).joinpath("metrics").iterdir())) == 0 + +@pytest.mark.parametrize("parallel", (True, False)) +def test_send_heartbeat(offline_cache_setup, parallel, mocker): + # Create an offline run + _uuid: str = f"{uuid.uuid4()}".split("-")[0] + _path = f"/simvue_unit_testing/objects/folder/{_uuid}" + _folder = Folder.new(path=_path, offline=True) + _folder.commit() + + _offline_runs = [] + + for i in range(5): + _name = f"test_sender_server_ids-{_uuid}-{i}" + _run = Run.new(name=_name, folder=_path, offline=True, heartbeat_timeout=1, status="running") + _run.commit() + + _offline_runs.append(_run) + + _id_mapping = sender(threading_threshold=1 if parallel else 10) + _online_runs = [Run(identifier=_id_mapping.get(_offline_run.id)) for _offline_run in _offline_runs] + assert all([_online_run.status == "running" for _online_run in _online_runs]) + + spy_put = mocker.spy(requests, "put") + + # Create heartbeat and send every 0.5s for 5s + for i in range(10): + time.sleep(0.5) + [_offline_run.send_heartbeat() for _offline_run in _offline_runs] + sender(threading_threshold=1 if parallel else 10) + + # Check requests.put() endpoint called 50 times - once for each of the 5 runs, on all 10 iterations + assert spy_put.call_count == 50 + + # Get online runs and 
check all running + [_online_run.refresh() for _online_run in _online_runs] + assert all([_online_run.status == "running" for _online_run in _online_runs]) \ No newline at end of file