Skip to content

Commit 5ad9989

Browse files
authored
Merge pull request #873 from simvue-io/wk9874/artifact_snapshot
Add snapshot functionality to Artifacts
2 parents f1c9820 + cd21ba6 commit 5ad9989

File tree

6 files changed

+378
-28
lines changed

6 files changed

+378
-28
lines changed

simvue/api/objects/artifact/file.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import pydantic
55
import os
66
import pathlib
7+
import shutil
8+
from simvue.config.user import SimvueConfiguration
9+
from datetime import datetime
710
from simvue.models import NAME_REGEX
811
from simvue.utilities import get_mimetype_for_file, get_mimetypes, calculate_sha256
912

@@ -39,6 +42,7 @@ def new(
3942
metadata: dict[str, typing.Any] | None,
4043
upload_timeout: int | None = None,
4144
offline: bool = False,
45+
snapshot: bool = False,
4246
**kwargs,
4347
) -> Self:
4448
"""Create a new artifact either locally or on the server
@@ -61,6 +65,8 @@ def new(
6165
specify the timeout in seconds for upload
6266
offline : bool, optional
6367
whether to define this artifact locally, default is False
68+
snapshot : bool, optional
69+
whether to create a snapshot of this file before uploading it, default is False
6470
6571
"""
6672
_mime_type = mime_type or get_mimetype_for_file(file_path)
@@ -73,6 +79,19 @@ def new(
7379
_file_checksum = kwargs.pop("checksum")
7480
else:
7581
file_path = pathlib.Path(file_path)
82+
if snapshot:
83+
_user_config = SimvueConfiguration.fetch()
84+
85+
_local_staging_dir: pathlib.Path = _user_config.offline.cache.joinpath(
86+
"artifacts"
87+
)
88+
_local_staging_dir.mkdir(parents=True, exist_ok=True)
89+
_local_staging_file = _local_staging_dir.joinpath(
90+
f"{file_path.stem}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S_%f')[:-3]}.file"
91+
)
92+
shutil.copy(file_path, _local_staging_file)
93+
file_path = _local_staging_file
94+
7695
_file_size = file_path.stat().st_size
7796
_file_orig_path = file_path.expanduser().absolute()
7897
_file_checksum = calculate_sha256(f"{file_path}", is_file=True)
@@ -105,4 +124,8 @@ def new(
105124
with open(_file_orig_path, "rb") as out_f:
106125
_artifact._upload(file=out_f, timeout=upload_timeout, file_size=_file_size)
107126

127+
# If snapshot created, delete it after uploading
128+
if pathlib.Path(_file_orig_path).parent == _artifact._local_staging_file.parent:
129+
pathlib.Path(_file_orig_path).unlink()
130+
108131
return _artifact

simvue/run.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1632,6 +1632,7 @@ def save_file(
16321632
category: typing.Literal["input", "output", "code"],
16331633
file_type: str | None = None,
16341634
preserve_path: bool = False,
1635+
snapshot: bool = False,
16351636
name: typing.Optional[
16361637
typing.Annotated[str, pydantic.Field(pattern=NAME_REGEX)]
16371638
] = None,
@@ -1652,6 +1653,8 @@ def save_file(
16521653
the MIME file type else this is deduced, by default None
16531654
preserve_path : bool, optional
16541655
whether to preserve the path during storage, by default False
1656+
snapshot : bool, optional
1657+
whether to take a snapshot of the file before uploading, by default False
16551658
name : str, optional
16561659
name to associate with this file, by default None
16571660
metadata : str | None, optional
@@ -1686,6 +1689,7 @@ def save_file(
16861689
offline=self._user_config.run.mode == "offline",
16871690
mime_type=file_type,
16881691
metadata=metadata,
1692+
snapshot=snapshot,
16891693
)
16901694
_artifact.attach_to_run(self.id, category)
16911695
except (ValueError, RuntimeError) as e:

simvue/sender.py

Lines changed: 60 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import requests
1414
import psutil
1515
from simvue.config.user import SimvueConfiguration
16-
1716
import simvue.api.objects
1817
from simvue.api.objects.artifact.base import ArtifactBase
1918
from simvue.eco.emissions_monitor import CO2Monitor
@@ -37,13 +36,29 @@
3736
_logger = logging.getLogger(__name__)
3837

3938

39+
def _log_upload_failed(file_path: pydantic.FilePath) -> None:
    """Flag an offline cache file as having failed to upload.

    The JSON content of the cache file is loaded, its ``upload_failed``
    entry is set to ``True`` and the result is written back to the same file.

    Parameters
    ----------
    file_path : pydantic.FilePath
        The path to the offline cache file for the object
    """
    # Load the whole cached record before touching the file on disk
    _record = json.loads(file_path.read_text())
    _record["upload_failed"] = True
    # Overwrite the cache file in place with the flagged record
    file_path.write_text(json.dumps(_record))
52+
53+
4054
def upload_cached_file(
4155
cache_dir: pydantic.DirectoryPath,
4256
obj_type: str,
4357
file_path: pydantic.FilePath,
4458
id_mapping: dict[str, str],
59+
retry_failed_uploads: bool,
4560
lock: threading.Lock,
46-
):
61+
) -> None:
4762
"""Upload data stored in a cached file to the Simvue server.
4863
4964
Parameters
@@ -62,41 +77,53 @@ def upload_cached_file(
6277
_current_id = file_path.name.split(".")[0]
6378
_data = json.load(file_path.open())
6479
_exact_type: str = _data.pop("obj_type")
80+
81+
if _data.pop("upload_failed", False) and not retry_failed_uploads:
82+
return
83+
6584
try:
6685
_instance_class = getattr(simvue.api.objects, _exact_type)
67-
except AttributeError as e:
68-
raise RuntimeError(f"Attempt to initialise unknown type '{_exact_type}'") from e
86+
except AttributeError:
87+
_logger.error(f"Attempt to initialise unknown type '{_exact_type}'")
88+
_log_upload_failed(file_path)
89+
return
6990

7091
# If it is an ObjectArtifact, need to load the object as bytes from a different file
7192
if issubclass(_instance_class, simvue.api.objects.ObjectArtifact):
7293
with open(file_path.parent.joinpath(f"{_current_id}.object"), "rb") as file:
7394
_data["serialized"] = file.read()
95+
try:
96+
# We want to reconnect if there is an online ID stored for this file
97+
if _online_id := id_mapping.get(_current_id):
98+
obj_for_upload = _instance_class(
99+
identifier=_online_id, _read_only=False, **_data
100+
)
101+
else:
102+
obj_for_upload = _instance_class.new(**_data)
74103

75-
# We want to reconnect if there is an online ID stored for this file
76-
if _online_id := id_mapping.get(_current_id):
77-
obj_for_upload = _instance_class(
78-
identifier=_online_id, _read_only=False, **_data
79-
)
80-
else:
81-
obj_for_upload = _instance_class.new(**_data)
82-
83-
with lock:
84-
obj_for_upload.on_reconnect(id_mapping)
104+
with lock:
105+
obj_for_upload.on_reconnect(id_mapping)
85106

86-
try:
87107
if not issubclass(_instance_class, ArtifactBase):
88108
obj_for_upload.commit()
89109
_new_id = obj_for_upload.id
90-
except RuntimeError as error:
110+
111+
except Exception as error:
91112
if "status 409" in error.args[0]:
92113
return
93-
raise error
94-
if not _new_id:
95-
raise RuntimeError(
96-
f"Object of type '{obj_for_upload.__class__.__name__}' has no identifier"
114+
115+
_logger.error(
116+
f"Error while committing '{_instance_class.__name__}': {error.args[0]}"
97117
)
118+
_log_upload_failed(file_path)
119+
return
120+
if not _new_id:
121+
_logger.error(f"Object of type '{_instance_class.__name__}' has no identifier")
122+
_log_upload_failed(file_path)
123+
return
124+
98125
_logger.info(
99-
f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {obj_for_upload.__class__.__name__} '{_new_id}'"
126+
f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {_instance_class.__name__} '{_new_id}'"
100127
)
101128

102129
file_path.unlink(missing_ok=True)
@@ -155,6 +182,7 @@ def sender(
155182
max_workers: int = 5,
156183
threading_threshold: int = 10,
157184
objects_to_upload: list[str] = UPLOAD_ORDER,
185+
retry_failed_uploads: bool = False,
158186
) -> dict[str, str]:
159187
"""Send data from a local cache directory to the Simvue server.
160188
@@ -168,6 +196,8 @@ def sender(
168196
The number of cached files above which threading will be used
169197
objects_to_upload : list[str]
170198
Types of objects to upload, by default uploads all types of objects present in cache
199+
retry_failed_uploads : bool, optional
200+
Whether to retry sending objects which previously failed, by default False
171201
172202
Returns
173203
-------
@@ -203,7 +233,14 @@ def sender(
203233
_offline_files = _all_offline_files[_obj_type]
204234
if len(_offline_files) < threading_threshold:
205235
for file_path in _offline_files:
206-
upload_cached_file(cache_dir, _obj_type, file_path, _id_mapping, _lock)
236+
upload_cached_file(
237+
cache_dir=cache_dir,
238+
obj_type=_obj_type,
239+
file_path=file_path,
240+
id_mapping=_id_mapping,
241+
retry_failed_uploads=retry_failed_uploads,
242+
lock=_lock,
243+
)
207244
else:
208245
with ThreadPoolExecutor(
209246
max_workers=max_workers, thread_name_prefix="sender_session_upload"
@@ -214,6 +251,7 @@ def sender(
214251
obj_type=_obj_type,
215252
file_path=file_path,
216253
id_mapping=_id_mapping,
254+
retry_failed_uploads=retry_failed_uploads,
217255
lock=_lock,
218256
),
219257
_offline_files,

tests/functional/test_run_class.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,9 @@ def test_set_folder_details(request: pytest.FixtureRequest) -> None:
809809

810810

811811
@pytest.mark.run
812+
@pytest.mark.parametrize(
813+
"snapshot", (True, False)
814+
)
812815
@pytest.mark.parametrize(
813816
"valid_mimetype,preserve_path,name,allow_pickle,empty_file,category",
814817
[
@@ -827,6 +830,7 @@ def test_save_file_online(
827830
allow_pickle: bool,
828831
empty_file: bool,
829832
category: typing.Literal["input", "output", "code"],
833+
snapshot: bool,
830834
capfd,
831835
request,
832836
) -> None:
@@ -860,6 +864,7 @@ def test_save_file_online(
860864
file_type=file_type,
861865
preserve_path=preserve_path,
862866
name=name,
867+
snapshot=snapshot
863868
)
864869
else:
865870
with pytest.raises(RuntimeError):
@@ -891,6 +896,9 @@ def test_save_file_online(
891896

892897
@pytest.mark.run
893898
@pytest.mark.offline
899+
@pytest.mark.parametrize(
900+
"snapshot", (True, False)
901+
)
894902
@pytest.mark.parametrize(
895903
"preserve_path,name,allow_pickle,empty_file,category",
896904
[
@@ -908,6 +916,7 @@ def test_save_file_offline(
908916
name: str | None,
909917
allow_pickle: bool,
910918
empty_file: bool,
919+
snapshot: bool,
911920
category: typing.Literal["input", "output", "code"],
912921
capfd,
913922
) -> None:
@@ -927,7 +936,15 @@ def test_save_file_offline(
927936
file_type=file_type,
928937
preserve_path=preserve_path,
929938
name=name,
939+
snapshot=snapshot
930940
)
941+
# If snapshotting, overwrite the source file before sending: the uploaded artifact should still hold the original (snapshotted) contents
942+
if snapshot:
943+
with open(
944+
(out_name := pathlib.Path(tempd).joinpath("test_file.txt")),
945+
"w",
946+
) as out_f:
947+
out_f.write("updated file!")
931948
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
932949
os.remove(out_name)
933950
client = sv_cl.Client()
@@ -944,8 +961,11 @@ def test_save_file_offline(
944961
name=f"{name or stored_name}",
945962
output_dir=tempd,
946963
)
947-
assert out_loc.joinpath(name or out_name.name).exists()
948-
964+
assert out_file.exists()
965+
with open(
966+
out_file, "r") as out_f:
967+
content = out_f.read()
968+
assert content == "test data entry"
949969

950970
@pytest.mark.run
951971
def test_update_tags_running(

0 commit comments

Comments
 (0)