23 changes: 23 additions & 0 deletions simvue/api/objects/artifact/file.py
@@ -4,6 +4,9 @@
import pydantic
import os
import pathlib
import shutil
from simvue.config.user import SimvueConfiguration
from datetime import datetime
from simvue.models import NAME_REGEX
from simvue.utilities import get_mimetype_for_file, get_mimetypes, calculate_sha256

@@ -39,6 +42,7 @@ def new(
metadata: dict[str, typing.Any] | None,
upload_timeout: int | None = None,
offline: bool = False,
snapshot: bool = False,
**kwargs,
) -> Self:
"""Create a new artifact either locally or on the server
@@ -61,6 +65,8 @@
specify the timeout in seconds for upload
offline : bool, optional
whether to define this artifact locally, default is False
snapshot : bool, optional
whether to create a snapshot of this file before uploading it, default is False

"""
_mime_type = mime_type or get_mimetype_for_file(file_path)
@@ -73,6 +79,19 @@
_file_checksum = kwargs.pop("checksum")
else:
file_path = pathlib.Path(file_path)
if snapshot:
_user_config = SimvueConfiguration.fetch()

_local_staging_dir: pathlib.Path = _user_config.offline.cache.joinpath(
"artifacts"
)
_local_staging_dir.mkdir(parents=True, exist_ok=True)
_local_staging_file = _local_staging_dir.joinpath(
f"{file_path.stem}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S_%f')[:-3]}.file"
)
shutil.copy(file_path, _local_staging_file)
file_path = _local_staging_file

_file_size = file_path.stat().st_size
_file_orig_path = file_path.expanduser().absolute()
_file_checksum = calculate_sha256(f"{file_path}", is_file=True)
@@ -105,4 +124,8 @@ def new(
with open(_file_orig_path, "rb") as out_f:
_artifact._upload(file=out_f, timeout=upload_timeout, file_size=_file_size)

# If a snapshot was created, delete it after uploading
if pathlib.Path(_file_orig_path).parent == _artifact._local_staging_file.parent:
pathlib.Path(_file_orig_path).unlink()

return _artifact
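
The snapshot branch above copies the file into the offline artifacts cache before it is hashed and uploaded, so later edits to the source file cannot change what is sent. A minimal standalone sketch of that staging scheme, lifted from the diff (the `offline.cache` setting is the same one used above via `SimvueConfiguration.fetch()`; the helper name is hypothetical):

import pathlib
import shutil
from datetime import datetime

def stage_snapshot(file_path: pathlib.Path, cache_dir: pathlib.Path) -> pathlib.Path:
    """Copy a file into the artifacts cache under a timestamped name.

    Mirrors the logic in new() above: keep the original stem, append a
    millisecond-precision timestamp, and use a generic '.file' suffix so
    repeated snapshots of the same file never collide.
    """
    staging_dir = cache_dir.joinpath("artifacts")
    staging_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3]  # trim microseconds to ms
    staged = staging_dir.joinpath(f"{file_path.stem}_{stamp}.file")
    shutil.copy(file_path, staged)
    return staged
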
4 changes: 4 additions & 0 deletions simvue/run.py
@@ -1632,6 +1632,7 @@ def save_file(
category: typing.Literal["input", "output", "code"],
file_type: str | None = None,
preserve_path: bool = False,
snapshot: bool = False,
name: typing.Optional[
typing.Annotated[str, pydantic.Field(pattern=NAME_REGEX)]
] = None,
@@ -1652,6 +1653,8 @@
the MIME file type else this is deduced, by default None
preserve_path : bool, optional
whether to preserve the path during storage, by default False
snapshot : bool, optional
whether to take a snapshot of the file before uploading, by default False
name : str, optional
name to associate with this file, by default None
metadata : str | None, optional
@@ -1686,6 +1689,7 @@
offline=self._user_config.run.mode == "offline",
mime_type=file_type,
metadata=metadata,
snapshot=snapshot,
)
_artifact.attach_to_run(self.id, category)
except (ValueError, RuntimeError) as e:
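
With the plumbing above in place, snapshotting is opt-in per `save_file()` call. A hedged usage sketch (the run setup and file name are illustrative; the `simvue.Run` entry point and `init()` call are assumed from the package's public API, not shown in this diff):

import simvue

with simvue.Run() as run:
    run.init(name="snapshot-demo")
    run.save_file(
        "results.csv",
        category="output",
        snapshot=True,  # copy the file now; later edits do not affect the upload
    )
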
82 changes: 60 additions & 22 deletions simvue/sender.py
@@ -13,7 +13,6 @@
import requests
import psutil
from simvue.config.user import SimvueConfiguration

import simvue.api.objects
from simvue.api.objects.artifact.base import ArtifactBase
from simvue.eco.emissions_monitor import CO2Monitor
@@ -37,13 +36,29 @@
_logger = logging.getLogger(__name__)


def _log_upload_failed(file_path: pydantic.FilePath) -> None:
"""Record that an object failed to upload in the object offline cache file.

Parameters
----------
file_path : pydantic.FilePath
The path to the offline cache file for the object
"""
with file_path.open("r") as file:
_data = json.load(file)
_data["upload_failed"] = True
with file_path.open("w") as file:
json.dump(_data, file)


def upload_cached_file(
cache_dir: pydantic.DirectoryPath,
obj_type: str,
file_path: pydantic.FilePath,
id_mapping: dict[str, str],
retry_failed_uploads: bool,
lock: threading.Lock,
):
) -> None:
"""Upload data stored in a cached file to the Simvue server.

Parameters
@@ -62,41 +77,53 @@ def upload_cached_file(
_current_id = file_path.name.split(".")[0]
_data = json.load(file_path.open())
_exact_type: str = _data.pop("obj_type")

if _data.pop("upload_failed", False) and not retry_failed_uploads:
return

try:
_instance_class = getattr(simvue.api.objects, _exact_type)
except AttributeError as e:
raise RuntimeError(f"Attempt to initialise unknown type '{_exact_type}'") from e
except AttributeError:
_logger.error(f"Attempt to initialise unknown type '{_exact_type}'")
_log_upload_failed(file_path)
return

# If it is an ObjectArtifact, the serialized object must be loaded as bytes from a separate file
if issubclass(_instance_class, simvue.api.objects.ObjectArtifact):
with open(file_path.parent.joinpath(f"{_current_id}.object"), "rb") as file:
_data["serialized"] = file.read()
try:
# We want to reconnect if there is an online ID stored for this file
if _online_id := id_mapping.get(_current_id):
obj_for_upload = _instance_class(
identifier=_online_id, _read_only=False, **_data
)
else:
obj_for_upload = _instance_class.new(**_data)

# We want to reconnect if there is an online ID stored for this file
if _online_id := id_mapping.get(_current_id):
obj_for_upload = _instance_class(
identifier=_online_id, _read_only=False, **_data
)
else:
obj_for_upload = _instance_class.new(**_data)

with lock:
obj_for_upload.on_reconnect(id_mapping)
with lock:
obj_for_upload.on_reconnect(id_mapping)

try:
if not issubclass(_instance_class, ArtifactBase):
obj_for_upload.commit()
_new_id = obj_for_upload.id
except RuntimeError as error:

except Exception as error:
if "status 409" in error.args[0]:
return
raise error
if not _new_id:
raise RuntimeError(
f"Object of type '{obj_for_upload.__class__.__name__}' has no identifier"

_logger.error(
f"Error while committing '{_instance_class.__name__}': {error.args[0]}"
)
_log_upload_failed(file_path)
return
if not _new_id:
_logger.error(f"Object of type '{_instance_class.__name__}' has no identifier")
_log_upload_failed(file_path)
return

_logger.info(
f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {obj_for_upload.__class__.__name__} '{_new_id}'"
f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {_instance_class.__name__} '{_new_id}'"
)

file_path.unlink(missing_ok=True)
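
The `upload_failed` marker written by `_log_upload_failed` lands in the same JSON cache file the sender reads back, which is what makes the early return above work: a flagged object is skipped on subsequent passes unless retries are requested. A minimal sketch of that skip decision (the helper name is hypothetical):

import json
import pathlib

def should_skip(file_path: pathlib.Path, retry_failed_uploads: bool) -> bool:
    """Mirror the early return in upload_cached_file: skip objects whose
    cache entry was flagged by _log_upload_failed, unless retrying."""
    data = json.loads(file_path.read_text())
    return data.get("upload_failed", False) and not retry_failed_uploads
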
@@ -155,6 +182,7 @@ def sender(
max_workers: int = 5,
threading_threshold: int = 10,
objects_to_upload: list[str] = UPLOAD_ORDER,
retry_failed_uploads: bool = False,
) -> dict[str, str]:
"""Send data from a local cache directory to the Simvue server.

@@ -168,6 +196,8 @@
The number of cached files above which threading will be used
objects_to_upload : list[str]
Types of objects to upload; by default all object types present in the cache are sent
retry_failed_uploads : bool, optional
Whether to retry sending objects which previously failed, by default False

Returns
-------
@@ -203,7 +233,14 @@
_offline_files = _all_offline_files[_obj_type]
if len(_offline_files) < threading_threshold:
for file_path in _offline_files:
upload_cached_file(cache_dir, _obj_type, file_path, _id_mapping, _lock)
upload_cached_file(
cache_dir=cache_dir,
obj_type=_obj_type,
file_path=file_path,
id_mapping=_id_mapping,
retry_failed_uploads=retry_failed_uploads,
lock=_lock,
)
else:
with ThreadPoolExecutor(
max_workers=max_workers, thread_name_prefix="sender_session_upload"
@@ -214,6 +251,7 @@
obj_type=_obj_type,
file_path=file_path,
id_mapping=_id_mapping,
retry_failed_uploads=retry_failed_uploads,
lock=_lock,
),
_offline_files,
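
Callers opt in to retries through the new keyword. A usage sketch, following the import alias and positional cache-directory argument used in the tests below:

import os
import simvue.sender as sv_send

# Re-send the offline cache, this time retrying objects whose earlier
# uploads failed; by default flagged objects are skipped.
sv_send.sender(
    os.environ["SIMVUE_OFFLINE_DIRECTORY"],
    max_workers=2,
    threading_threshold=10,
    retry_failed_uploads=True,
)
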
24 changes: 22 additions & 2 deletions tests/functional/test_run_class.py
@@ -809,6 +809,9 @@ def test_set_folder_details(request: pytest.FixtureRequest) -> None:


@pytest.mark.run
@pytest.mark.parametrize(
"snapshot", (True, False)
)
@pytest.mark.parametrize(
"valid_mimetype,preserve_path,name,allow_pickle,empty_file,category",
[
@@ -827,6 +830,7 @@ def test_save_file_online(
allow_pickle: bool,
empty_file: bool,
category: typing.Literal["input", "output", "code"],
snapshot: bool,
capfd,
request,
) -> None:
@@ -860,6 +864,7 @@
file_type=file_type,
preserve_path=preserve_path,
name=name,
snapshot=snapshot
)
else:
with pytest.raises(RuntimeError):
@@ -891,6 +896,9 @@

@pytest.mark.run
@pytest.mark.offline
@pytest.mark.parametrize(
"snapshot", (True, False)
)
@pytest.mark.parametrize(
"preserve_path,name,allow_pickle,empty_file,category",
[
@@ -908,6 +916,7 @@ def test_save_file_offline(
name: str | None,
allow_pickle: bool,
empty_file: bool,
snapshot: bool,
category: typing.Literal["input", "output", "code"],
capfd,
) -> None:
@@ -927,7 +936,15 @@
file_type=file_type,
preserve_path=preserve_path,
name=name,
snapshot=snapshot
)
# if snapshotting, check the file can be updated after saving, but the snapshotted contents are still sent
if snapshot:
with open(
(out_name := pathlib.Path(tempd).joinpath("test_file.txt")),
"w",
) as out_f:
out_f.write("updated file!")
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
os.remove(out_name)
client = sv_cl.Client()
@@ -944,8 +961,11 @@
name=f"{name or stored_name}",
output_dir=tempd,
)
assert out_loc.joinpath(name or out_name.name).exists()

assert out_file.exists()
with open(out_file, "r") as out_f:
content = out_f.read()
assert content == "test data entry"

@pytest.mark.run
def test_update_tags_running(
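
The offline test above boils down to this end-to-end behaviour, sketched here for reference (the run setup, paths, and the `mode="offline"` argument are assumptions based on the test fixtures rather than code shown in this diff):

import os
import pathlib
import simvue
import simvue.sender as sv_send

source = pathlib.Path("test_file.txt")
source.write_text("test data entry")

with simvue.Run(mode="offline") as run:
    run.init(name="snapshot-offline-demo")
    run.save_file(source, category="input", snapshot=True)

# Edits made after save_file() never reach the server: the snapshot taken
# at save time is what the sender uploads.
source.write_text("updated file!")
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)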