Skip to content

Commit 806ba68

Browse files
authored
Merge pull request #878 from simvue-io/wk9874/sender_exception_handling_fixz
Wk9874/sender exception handling fixz
2 parents d1aa7ec + 2fee8fd commit 806ba68

20 files changed

+77
-54
lines changed

simvue/sender.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def upload_cached_file(
5656
obj_type: str,
5757
file_path: pydantic.FilePath,
5858
id_mapping: dict[str, str],
59+
throw_exceptions: bool,
5960
retry_failed_uploads: bool,
6061
lock: threading.Lock,
6162
) -> None:
@@ -71,6 +72,10 @@ def upload_cached_file(
7172
The path to the cached file to upload
7273
id_mapping : dict[str, str]
7374
A mapping of offline to online object IDs
75+
throw_exceptions : bool
76+
Whether to throw exceptions, or just log them
77+
retry_failed_uploads : bool
78+
Whether to retry failed uploads or ignore them
7479
lock : threading.Lock
7580
A lock to prevent multiple threads accessing the id mapping directory at once
7681
"""
@@ -83,7 +88,10 @@ def upload_cached_file(
8388

8489
try:
8590
_instance_class = getattr(simvue.api.objects, _exact_type)
86-
except AttributeError:
91+
except AttributeError as error:
92+
if throw_exceptions:
93+
raise error
94+
8795
_logger.error(f"Attempt to initialise unknown type '{_exact_type}'")
8896
_log_upload_failed(file_path)
8997
return
@@ -109,11 +117,13 @@ def upload_cached_file(
109117
_new_id = obj_for_upload.id
110118

111119
except Exception as error:
112-
if "status 409" in error.args[0]:
120+
if "status 409" in str(error):
113121
return
122+
if throw_exceptions:
123+
raise error
114124

115125
_logger.error(
116-
f"Error while committing '{_instance_class.__name__}': {error.args[0]}"
126+
f"Error while committing '{_instance_class.__name__}': {str(error)}"
117127
)
118128
_log_upload_failed(file_path)
119129
return
@@ -182,6 +192,7 @@ def sender(
182192
max_workers: int = 5,
183193
threading_threshold: int = 10,
184194
objects_to_upload: list[str] = UPLOAD_ORDER,
195+
throw_exceptions: bool = False,
185196
retry_failed_uploads: bool = False,
186197
) -> dict[str, str]:
187198
"""Send data from a local cache directory to the Simvue server.
@@ -196,6 +207,8 @@ def sender(
196207
The number of cached files above which threading will be used
197208
objects_to_upload : list[str]
198209
Types of objects to upload, by default uploads all types of objects present in cache
210+
throw_exceptions : bool, optional
211+
Whether to throw exceptions as they are encountered in the sender, default is False (exceptions will be logged)
199212
retry_failed_uploads : bool, optional
200213
Whether to retry sending objects which previously failed, by default False
201214
@@ -238,6 +251,7 @@ def sender(
238251
obj_type=_obj_type,
239252
file_path=file_path,
240253
id_mapping=_id_mapping,
254+
throw_exceptions=throw_exceptions,
241255
retry_failed_uploads=retry_failed_uploads,
242256
lock=_lock,
243257
)
@@ -251,11 +265,15 @@ def sender(
251265
obj_type=_obj_type,
252266
file_path=file_path,
253267
id_mapping=_id_mapping,
268+
throw_exceptions=throw_exceptions,
254269
retry_failed_uploads=retry_failed_uploads,
255270
lock=_lock,
256271
),
257272
_offline_files,
258273
)
274+
# This will raise any exceptions encountered during sending
275+
for result in _results:
276+
pass
259277

260278
# Send heartbeats
261279
_headers: dict[str, str] = {

tests/functional/test_run_class.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def test_run_with_emissions_offline(speedy_heartbeat, mock_co2_signal, create_pl
116116
run_created.config(enable_emission_metrics=True)
117117
time.sleep(5)
118118
# Run should continue, but fail to log metrics until sender runs and creates file
119-
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"])
119+
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
120120
_run = RunObject(identifier=id_mapping[run_created.id])
121121
_metric_names = [item[0] for item in _run.metrics]
122122
for _metric in ["emissions", "energy_consumed"]:
@@ -126,7 +126,7 @@ def test_run_with_emissions_offline(speedy_heartbeat, mock_co2_signal, create_pl
126126
assert _delta_metric_name not in _metric_names
127127
# Sender should now have made a local file, and the run should be able to use it to create emissions metrics
128128
time.sleep(5)
129-
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"])
129+
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], throw_exceptions=True)
130130
_run.refresh()
131131
_metric_names = [item[0] for item in _run.metrics]
132132
client = sv_cl.Client()
@@ -318,7 +318,7 @@ def test_log_metrics_offline(
318318
run.log_metrics(METRICS)
319319

320320
time.sleep(1)
321-
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
321+
id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
322322
time.sleep(1)
323323

324324
if metric_type == "tensor":
@@ -441,7 +441,7 @@ def test_visibility_offline(
441441
retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"),
442442
)
443443
_id = run.id
444-
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
444+
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
445445
run.close()
446446
_retrieved_run = RunObject(identifier=_id_mapping.get(_id))
447447

@@ -478,7 +478,7 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -
478478
run, _ = create_plain_run_offline
479479
run_name = run.name
480480
run.log_event(EVENT_MSG)
481-
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
481+
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
482482
client = sv_cl.Client()
483483
attempts: int = 0
484484

@@ -488,7 +488,7 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -
488488
not (event_data := client.get_events(client.get_run_id_from_name(run_name), count_limit=1))
489489
) and attempts < 5:
490490
time.sleep(1)
491-
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
491+
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
492492
attempts += 1
493493
assert event_data[0].get("message", EVENT_MSG)
494494

@@ -497,7 +497,7 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -
497497
@pytest.mark.offline
498498
def test_offline_tags(create_plain_run_offline: tuple[sv_run.Run, dict]) -> None:
499499
run, run_data = create_plain_run_offline
500-
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
500+
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
501501
client = sv_cl.Client()
502502

503503
tags = client.get_tags()
@@ -557,7 +557,7 @@ def test_update_metadata_offline(
557557
# Try updating an already defined piece of metadata
558558
run.update_metadata({"a": 1})
559559

560-
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
560+
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
561561

562562
client = sv_cl.Client()
563563
run_info = client.get_run(client.get_run_id_from_name(run_name))
@@ -945,7 +945,7 @@ def test_save_file_offline(
945945
"w",
946946
) as out_f:
947947
out_f.write("updated file!")
948-
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
948+
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
949949
os.remove(out_name)
950950
client = sv_cl.Client()
951951
base_name = name or out_name.name
@@ -1031,7 +1031,7 @@ def test_update_tags_offline(
10311031

10321032
simvue_run.update_tags(["additional"])
10331033

1034-
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
1034+
sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
10351035

10361036
client = sv_cl.Client()
10371037
run_data = client.get_run(client.get_run_id_from_name(run_name))
@@ -1358,7 +1358,7 @@ def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None:
13581358
)
13591359
run_id = run.id
13601360
if mode == "offline":
1361-
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
1361+
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
13621362
run_id = _id_mapping.get(run_id)
13631363

13641364
client = simvue.Client()
@@ -1372,7 +1372,7 @@ def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None:
13721372
run.log_event("Testing!")
13731373

13741374
if mode == "offline":
1375-
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
1375+
_id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10, throw_exceptions=True)
13761376

13771377
_reconnected_run = client.get_run(run_id)
13781378
assert dict(_reconnected_run.metrics)["test_metric"]["last"] == 1

tests/functional/test_run_execute_process.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def test_monitor_processes(create_plain_run_offline: tuple[Run, dict]):
2424
_run.add_process(f"process_1_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="Write-Output 'Hello World!'", executable="powershell")
2525
_run.add_process(f"process_2_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="Get-ChildItem", executable="powershell")
2626
_run.add_process(f"process_3_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="exit 0", executable="powershell")
27-
sender(_run._sv_obj._local_staging_file.parents[1], 1, 10, ["folders", "runs", "alerts"])
27+
sender(_run._sv_obj._local_staging_file.parents[1], 1, 10, ["folders", "runs", "alerts"], throw_exceptions=True)
2828

2929

3030
@pytest.mark.executor

tests/unit/test_event_alert.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def test_event_alert_creation_offline(offline_cache_setup) -> None:
5656
assert _local_data.get("name") == f"events_alert_{_uuid}"
5757
assert _local_data.get("notification") == "none"
5858

59-
_id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
59+
_id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
6060
time.sleep(1)
6161

6262
# Get online ID and retrieve alert
@@ -106,7 +106,7 @@ def test_event_alert_modification_offline(offline_cache_setup) -> None:
106106
description=None
107107
)
108108
_alert.commit()
109-
_id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
109+
_id_mapping = sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
110110
time.sleep(1)
111111

112112
# Get online ID and retrieve alert
@@ -130,7 +130,7 @@ def test_event_alert_modification_offline(offline_cache_setup) -> None:
130130
_local_data = json.load(in_f)
131131
assert _local_data.get("description") == "updated!"
132132

133-
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
133+
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
134134
time.sleep(1)
135135

136136
_online_alert.refresh()

tests/unit/test_events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def test_events_creation_offline(offline_cache_setup) -> None:
5656
assert _local_data.get("events")[0].get("message") == "This is a test!"
5757
assert _local_data.get("events")[0].get("timestamp") == _timestamp
5858

59-
_id_mapping = sender(_events._local_staging_file.parents[1], 1, 10, ["folders", "runs", "events"])
59+
_id_mapping = sender(_events._local_staging_file.parents[1], 1, 10, ["folders", "runs", "events"], throw_exceptions=True)
6060
time.sleep(1)
6161

6262
# Get online version of events

tests/unit/test_file_artifact.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def test_file_artifact_creation_offline(offline_cache_setup, snapshot) -> None:
103103
# If snapshot, check artifact definition file and a copy of the actual file exist in staging area
104104
assert len(list(_artifact._local_staging_file.parent.iterdir())) == 2 if snapshot else 1
105105

106-
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
106+
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
107107
time.sleep(1)
108108

109109
# Check file(s) deleted after upload
@@ -158,12 +158,11 @@ def test_file_artifact_creation_offline_updated(offline_cache_setup, caplog, sna
158158
out_f.write("File changed!")
159159

160160
if not snapshot:
161-
with caplog.at_level(logging.ERROR):
162-
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
163-
assert "The SHA256 you specified did not match the calculated checksum." in caplog.text
161+
with pytest.raises(RuntimeError, match="The SHA256 you specified did not match the calculated checksum."):
162+
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
164163
return
165164
else:
166-
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
165+
_id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10, throw_exceptions=True)
167166
time.sleep(1)
168167

169168
_online_artifact = Artifact(_id_mapping[_artifact.id])

tests/unit/test_file_storage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def test_create_file_storage_offline(offline_cache_setup) -> None:
3838
assert _local_data.get("is_enabled") == False
3939
assert _local_data.get("is_default") == False
4040

41-
_id_mapping = sender(_storage._local_staging_file.parents[1], 1, 10, ["storage"])
41+
_id_mapping = sender(_storage._local_staging_file.parents[1], 1, 10, ["storage"], throw_exceptions=True)
4242
time.sleep(1)
4343
_online_storage = FileStorage(_id_mapping.get(_storage.id))
4444
assert _online_storage.name == _uuid

tests/unit/test_folder.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def test_folder_creation_offline(offline_cache_setup) -> None:
4242
assert _folder._local_staging_file.name.split(".")[0] == _folder.id
4343
assert _local_data.get("path", None) == _path
4444

45-
sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"])
45+
sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"], throw_exceptions=True)
4646
time.sleep(1)
4747
client = Client()
4848

@@ -96,7 +96,7 @@ def test_folder_modification_offline(offline_cache_setup) -> None:
9696
_folder = Folder.new(path=_path, offline=True)
9797
_folder.commit()
9898

99-
sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"])
99+
sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"], throw_exceptions=True)
100100
time.sleep(1)
101101

102102
client = Client()
@@ -115,7 +115,7 @@ def test_folder_modification_offline(offline_cache_setup) -> None:
115115
assert _local_data.get("description", None) == _description
116116
assert _local_data.get("tags", None) == _tags
117117

118-
sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"])
118+
sender(_folder._local_staging_file.parents[1], 2, 10, ["folders"], throw_exceptions=True)
119119
time.sleep(1)
120120

121121
_folder_online.refresh()

tests/unit/test_grids.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def test_grid_creation_offline() -> None:
7272

7373
assert _local_data.get("runs", [None])[0] == [_run.id, "A"]
7474
npt.assert_array_equal(numpy.array(_local_data.get("grid")), _grid_def)
75-
_id_mapping = sender(_grid._local_staging_file.parents[1], 1, 10, ["folders", "runs", "grids"])
75+
_id_mapping = sender(_grid._local_staging_file.parents[1], 1, 10, ["folders", "runs", "grids"], throw_exceptions=True)
7676
time.sleep(1)
7777
# Get online version of grid
7878
_online_grid = Grid(_id_mapping.get(_grid.id))
@@ -184,7 +184,7 @@ def test_grid_metrics_creation_offline() -> None:
184184
_metrics.commit()
185185
_run.status = "completed"
186186
_run.commit()
187-
_id_mapping = sender(_grid._local_staging_file.parents[1], 1, 10, ["folders", "runs", "grids", "grid_metrics"])
187+
_id_mapping = sender(_grid._local_staging_file.parents[1], 1, 10, ["folders", "runs", "grids", "grid_metrics"], throw_exceptions=True)
188188
time.sleep(1)
189189
# Online metrics
190190
assert list(GridMetrics.get(runs=[_id_mapping[_run.id]], metrics=["A"], step=_step))

tests/unit/test_metric_range_alert.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def test_metric_range_alert_creation_offline(offline_cache_setup) -> None:
6262
assert _local_data.get("name") == f"metrics_range_alert_{_uuid}"
6363
assert _local_data.get("notification") == "none"
6464
assert _local_data.get("alert").get("range_low") == 10
65-
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
65+
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
6666
time.sleep(1)
6767

6868
# Get online ID and retrieve alert
@@ -124,7 +124,7 @@ def test_metric_range_alert_modification_offline(offline_cache_setup) -> None:
124124
offline=True
125125
)
126126
_alert.commit()
127-
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
127+
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
128128
time.sleep(1)
129129

130130
# Get online ID and retrieve alert
@@ -149,7 +149,7 @@ def test_metric_range_alert_modification_offline(offline_cache_setup) -> None:
149149
_local_data = json.load(in_f)
150150
assert _local_data.get("description") == "updated!"
151151

152-
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"])
152+
sender(_alert._local_staging_file.parents[1], 1, 10, ["alerts"], throw_exceptions=True)
153153
time.sleep(1)
154154

155155
_online_alert.refresh()

0 commit comments

Comments
 (0)