Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions simvue/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def upload_cached_file(
obj_type: str,
file_path: pydantic.FilePath,
id_mapping: dict[str, str],
throw_exceptions: bool,
retry_failed_uploads: bool,
lock: threading.Lock,
) -> None:
Expand All @@ -71,6 +72,10 @@ def upload_cached_file(
The path to the cached file to upload
id_mapping : dict[str, str]
A mapping of offline to online object IDs
throw_exceptions : bool
Whether to throw exceptions, or just log them
retry_failed_uploads : bool
Whether to retry failed uploads or ignore them
lock : threading.Lock
A lock to prevent multiple threads accessing the id mapping directory at once
"""
Expand All @@ -83,7 +88,10 @@ def upload_cached_file(

try:
_instance_class = getattr(simvue.api.objects, _exact_type)
except AttributeError:
except AttributeError as error:
if throw_exceptions:
raise error

_logger.error(f"Attempt to initialise unknown type '{_exact_type}'")
_log_upload_failed(file_path)
return
Expand All @@ -109,11 +117,13 @@ def upload_cached_file(
_new_id = obj_for_upload.id

except Exception as error:
if "status 409" in error.args[0]:
if "status 409" in str(error):
return
if throw_exceptions:
raise error

_logger.error(
f"Error while committing '{_instance_class.__name__}': {error.args[0]}"
f"Error while committing '{_instance_class.__name__}': {str(error)}"
)
_log_upload_failed(file_path)
return
Expand Down Expand Up @@ -182,6 +192,7 @@ def sender(
max_workers: int = 5,
threading_threshold: int = 10,
objects_to_upload: list[str] = UPLOAD_ORDER,
throw_exceptions: bool = False,
retry_failed_uploads: bool = False,
) -> dict[str, str]:
"""Send data from a local cache directory to the Simvue server.
Expand All @@ -196,6 +207,8 @@ def sender(
The number of cached files above which threading will be used
objects_to_upload : list[str]
Types of objects to upload, by default uploads all types of objects present in cache
throw_exceptions : bool, optional
Whether to throw exceptions as they are encountered in the sender, default is False (exceptions will be logged)
retry_failed_uploads : bool, optional
Whether to retry sending objects which previously failed, by default False

Expand Down Expand Up @@ -238,6 +251,7 @@ def sender(
obj_type=_obj_type,
file_path=file_path,
id_mapping=_id_mapping,
throw_exceptions=throw_exceptions,
retry_failed_uploads=retry_failed_uploads,
lock=_lock,
)
Expand All @@ -251,11 +265,15 @@ def sender(
obj_type=_obj_type,
file_path=file_path,
id_mapping=_id_mapping,
throw_exceptions=throw_exceptions,
retry_failed_uploads=retry_failed_uploads,
lock=_lock,
),
_offline_files,
)
# This will raise any exceptions encountered during sending
for result in _results:
pass

# Send heartbeats
_headers: dict[str, str] = {
Expand Down
24 changes: 12 additions & 12 deletions tests/functional/test_run_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def test_run_with_emissions_offline(speedy_heartbeat, mock_co2_signal, create_pl
run_created.config(enable_emission_metrics=True)
time.sleep(5)
# Run should continue, but fail to log metrics until sender runs and creates file
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"])
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
_run = RunObject(identifier=id_mapping[run_created.id])
_metric_names = [item[0] for item in _run.metrics]
for _metric in ["emissions", "energy_consumed"]:
Expand All @@ -126,7 +126,7 @@ def test_run_with_emissions_offline(speedy_heartbeat, mock_co2_signal, create_pl
assert _delta_metric_name not in _metric_names
# Sender should now have made a local file, and the run should be able to use it to create emissions metrics
time.sleep(5)
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"])
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
_run.refresh()
_metric_names = [item[0] for item in _run.metrics]
client = sv_cl.Client()
Expand Down Expand Up @@ -318,7 +318,7 @@ def test_log_metrics_offline(
run.log_metrics(METRICS)

time.sleep(1)
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
time.sleep(1)

if metric_type == "tensor":
Expand Down Expand Up @@ -441,7 +441,7 @@ def test_visibility_offline(
retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"),
)
_id = run.id
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
run.close()
_retrieved_run = RunObject(identifier=_id_mapping.get(_id))

Expand Down Expand Up @@ -478,7 +478,7 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -
run, _ = create_plain_run_offline
run_name = run.name
run.log_event(EVENT_MSG)
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
client = sv_cl.Client()
attempts: int = 0

Expand All @@ -488,7 +488,7 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -
not (event_data := client.get_events(client.get_run_id_from_name(run_name), count_limit=1))
) and attempts < 5:
time.sleep(1)
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
attempts += 1
assert event_data[0].get("message", EVENT_MSG)

Expand All @@ -497,7 +497,7 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -
@pytest.mark.offline
def test_offline_tags(create_plain_run_offline: tuple[sv_run.Run, dict]) -> None:
run, run_data = create_plain_run_offline
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
client = sv_cl.Client()

tags = client.get_tags()
Expand Down Expand Up @@ -557,7 +557,7 @@ def test_update_metadata_offline(
# Try updating an already defined piece of metadata
run.update_metadata({"a": 1})

sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)

client = sv_cl.Client()
run_info = client.get_run(client.get_run_id_from_name(run_name))
Expand Down Expand Up @@ -945,7 +945,7 @@ def test_save_file_offline(
"w",
) as out_f:
out_f.write("updated file!")
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
os.remove(out_name)
client = sv_cl.Client()
base_name = name or out_name.name
Expand Down Expand Up @@ -1031,7 +1031,7 @@ def test_update_tags_offline(

simvue_run.update_tags(["additional"])

sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)

client = sv_cl.Client()
run_data = client.get_run(client.get_run_id_from_name(run_name))
Expand Down Expand Up @@ -1358,7 +1358,7 @@ def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None:
)
run_id = run.id
if mode == "offline":
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
run_id = _id_mapping.get(run_id)

client = simvue.Client()
Expand All @@ -1372,7 +1372,7 @@ def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None:
run.log_event("Testing!")

if mode == "offline":
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)

_reconnected_run = client.get_run(run_id)
assert dict(_reconnected_run.metrics)["test_metric"]["last"] == 1
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/test_run_execute_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def test_monitor_processes(create_plain_run_offline: tuple[Run, dict]):
_run.add_process(f"process_1_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="Write-Output 'Hello World!'", executable="powershell")
_run.add_process(f"process_2_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="Get-ChildItem", executable="powershell")
_run.add_process(f"process_3_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="exit 0", executable="powershell")
sender(_run._sv_obj._local_staging_file.parents[1], 1, 10, ["folders", "runs", "alerts"])
sender(_run._sv_obj._local_staging_file.parents[1], 1, 10, ["folders", "runs", "alerts"], throw_exceptions=True)


@pytest.mark.executor
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/test_event_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_event_alert_creation_offline(offline_cache_setup) -> None:
assert _local_data.get("name") == f"events_alert_{_uuid}"
assert _local_data.get("notification") == "none"

_id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
_id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
time.sleep(1)

# Get online ID and retrieve alert
Expand Down Expand Up @@ -106,7 +106,7 @@ def test_event_alert_modification_offline(offline_cache_setup) -> None:
description=None
)
_alert.commit()
_id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
_id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
time.sleep(1)

# Get online ID and retrieve alert
Expand All @@ -130,7 +130,7 @@ def test_event_alert_modification_offline(offline_cache_setup) -> None:
_local_data = json.load(in_f)
assert _local_data.get("description") == "updated!"

sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
time.sleep(1)

_online_alert.refresh()
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_events_creation_offline(offline_cache_setup) -> None:
assert _local_data.get("events")[0].get("message") == "This is a test!"
assert _local_data.get("events")[0].get("timestamp") == _timestamp

_id_mapping = sender(_events._local_staging_file.parents[1], 1, 10, ["folders", "runs", "events"])
_id_mapping = sender(_events._local_staging_file.parents[1], 1, 10, ["folders", "runs", "events"], throw_exceptions=True)
time.sleep(1)

# Get online version of events
Expand Down
9 changes: 4 additions & 5 deletions tests/unit/test_file_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def test_file_artifact_creation_offline(offline_cache_setup, snapshot) -> None:
# If snapshot, check artifact definition file and a copy of the actual file exist in staging area
assert len(list(_artifact._local_staging_file.parent.iterdir())) == 2 if snapshot else 1

_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
time.sleep(1)

# Check file(s) deleted after upload
Expand Down Expand Up @@ -158,12 +158,11 @@ def test_file_artifact_creation_offline_updated(offline_cache_setup, caplog, sna
out_f.write("File changed!")

if not snapshot:
with caplog.at_level(logging.ERROR):
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
assert "The SHA256 you specified did not match the calculated checksum." in caplog.text
with pytest.raises(RuntimeError, match="The SHA256 you specified did not match the calculated checksum."):
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
return
else:
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
time.sleep(1)

_online_artifact = Artifact(_id_mapping[_artifact.id])
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_file_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_create_file_storage_offline(offline_cache_setup) -> None:
assert _local_data.get("is_enabled") == False
assert _local_data.get("is_default") == False

_id_mapping = sender(_storage._local_staging_file.parents[1], 1, 10, ["storage"])
_id_mapping = sender(_storage._local_staging_file.parents[1], 1, 10, ["storage"], throw_exceptions=True)
time.sleep(1)
_online_storage = FileStorage(_id_mapping.get(_storage.id))
assert _online_storage.name == _uuid
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/test_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_folder_creation_offline(offline_cache_setup) -> None:
assert _folder._local_staging_file.name.split(".")[0] == _folder.id
assert _local_data.get("path", None) == _path

sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"])
sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"], throw_exceptions=True)
time.sleep(1)
client = Client()

Expand Down Expand Up @@ -96,7 +96,7 @@ def test_folder_modification_offline(offline_cache_setup) -> None:
_folder = Folder.new(path=_path, offline=True)
_folder.commit()

sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"])
sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"], throw_exceptions=True)
time.sleep(1)

client = Client()
Expand All @@ -115,7 +115,7 @@ def test_folder_modification_offline(offline_cache_setup) -> None:
assert _local_data.get("description", None) == _description
assert _local_data.get("tags", None) == _tags

sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"])
sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"], throw_exceptions=True)
time.sleep(1)

_folder_online.refresh()
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_grids.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def test_grid_creation_offline() -> None:

assert _local_data.get("runs", [None])[0] == [_run.id, "A"]
npt.assert_array_equal(numpy.array(_local_data.get("grid")), _grid_def)
_id_mapping = sender(_grid._local_staging_file.parents[1], 1, 10, ["folders", "runs", "grids"])
_id_mapping = sender(_grid._local_staging_file.parents[1], 1, 10, ["folders", "runs", "grids"], throw_exceptions=True)
time.sleep(1)
# Get online version of grid
_online_grid = Grid(_id_mapping.get(_grid.id))
Expand Down Expand Up @@ -184,7 +184,7 @@ def test_grid_metrics_creation_offline() -> None:
_metrics.commit()
_run.status = "completed"
_run.commit()
_id_mapping = sender(_grid._local_staging_file.parents[1], 1, 10, ["folders", "runs", "grids", "grid_metrics"])
_id_mapping = sender(_grid._local_staging_file.parents[1], 1, 10, ["folders", "runs", "grids", "grid_metrics"], throw_exceptions=True)
time.sleep(1)
# Online metrics
assert list(GridMetrics.get(runs=[_id_mapping[_run.id]], metrics=["A"], step=_step))
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/test_metric_range_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def test_metric_range_alert_creation_offline(offline_cache_setup) -> None:
assert _local_data.get("name") == f"metrics_range_alert_{_uuid}"
assert _local_data.get("notification") == "none"
assert _local_data.get("alert").get("range_low") == 10
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
time.sleep(1)

# Get online ID and retrieve alert
Expand Down Expand Up @@ -124,7 +124,7 @@ def test_metric_range_alert_modification_offline(offline_cache_setup) -> None:
offline=True
)
_alert.commit()
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
time.sleep(1)

# Get online ID and retrieve alert
Expand All @@ -149,7 +149,7 @@ def test_metric_range_alert_modification_offline(offline_cache_setup) -> None:
_local_data = json.load(in_f)
assert _local_data.get("description") == "updated!"

sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
time.sleep(1)

_online_alert.refresh()
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/test_metric_threshold_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_metric_threshold_alert_creation_offline(offline_cache_setup) -> None:
assert _local_data.get("notification") == "none"
assert _local_data.get("alert").get("threshold") == 10

sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
time.sleep(1)

# Get online ID and retrieve alert
Expand Down Expand Up @@ -123,7 +123,7 @@ def test_metric_threshold_alert_modification_offline(offline_cache_setup) -> Non
)
_alert.commit()

sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
time.sleep(1)

# Get online ID and retrieve alert
Expand All @@ -149,7 +149,7 @@ def test_metric_threshold_alert_modification_offline(offline_cache_setup) -> Non
_local_data = json.load(in_f)
assert _local_data.get("description") == "updated!"

sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
time.sleep(1)

_online_alert.refresh()
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_metrics_creation_offline(offline_cache_setup) -> None:
assert _local_data.get("metrics")[0].get("step") == _step
assert _local_data.get("metrics")[0].get("time") == _time

_id_mapping = sender(_metrics._local_staging_file.parents[1], 1, 10, ["folders", "runs", "metrics"])
_id_mapping = sender(_metrics._local_staging_file.parents[1], 1, 10, ["folders", "runs", "metrics"], throw_exceptions=True)
time.sleep(1)

# Get online version of metrics
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_object_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_object_artifact_creation_offline(offline_cache_setup) -> None:
assert _local_data.get("mime_type") == "application/vnd.simvue.numpy.v1"
assert _local_data.get("runs") == {_run.id: "input"}

_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
time.sleep(1)

_online_artifact = Artifact(_id_mapping.get(_artifact.id))
Expand Down
Loading