1+ import contextlib
2+ import json
3+ import pytest
4+ import time
5+ import datetime
6+ import uuid
7+ from simvue .api .objects .run import RunBatchArgs
8+ from simvue .sender import sender
9+ from simvue .api .objects import Run , Metrics , Folder
10+ from simvue .client import Client
11+ from simvue .models import DATETIME_FORMAT
12+ import logging
13+ import pathlib
14+ import requests
15+
@pytest.mark.parametrize("retry_failed_uploads", (True, False))
@pytest.mark.parametrize("parallel", (True, False))
@pytest.mark.offline
def test_sender_exception_handling(offline_cache_setup, caplog, retry_failed_uploads, parallel):
    """Check the sender logs upload failures and preserves the failed cache files.

    Creates metrics pointing at an invalid run ID so every upload fails, then
    verifies that:
      * the first sender pass logs a commit error,
      * a second pass re-logs the error only when ``retry_failed_uploads`` is set,
      * the cached files are kept on disk and flagged with ``upload_failed: True``.
    """
    # Create something which will produce an error when sent, e.g. a metric
    # with an invalid run ID
    for _ in range(5):
        _metrics = Metrics.new(
            run="invalid_run_id",
            metrics=[
                {
                    "timestamp": datetime.datetime.now().strftime(DATETIME_FORMAT),
                    "time": 1,
                    "step": 1,
                    "values": {"x": 1, "y": 2},
                }
            ],
            offline=True,
        )
        _metrics.commit()

    with caplog.at_level(logging.ERROR):
        # threading_threshold=1 forces the parallel code path; 10 keeps it serial
        sender(threading_threshold=1 if parallel else 10)

    assert "Error while committing 'Metrics'" in caplog.text

    # Wait, then try sending again
    time.sleep(1)
    caplog.clear()

    with caplog.at_level(logging.ERROR):
        sender(retry_failed_uploads=retry_failed_uploads, threading_threshold=1 if parallel else 10)

    if retry_failed_uploads:
        # Retrying hits the same invalid run ID, so the error is logged again
        assert "Error while committing 'Metrics'" in caplog.text
    else:
        # Files already marked as failed are skipped, so nothing is logged
        assert not caplog.text

    # Check files not deleted
    _offline_metric_paths = list(pathlib.Path(offline_cache_setup.name).joinpath("metrics").iterdir())
    assert len(_offline_metric_paths) == 5
    # Check files have 'upload_failed: True'
    for _metric_path in _offline_metric_paths:
        with open(_metric_path, "r") as _file:
            _metric = json.load(_file)
        assert _metric.get("upload_failed") is True
62+
@pytest.mark.parametrize("parallel", (True, False))
def test_sender_server_ids(offline_cache_setup, caplog, parallel):
    """Check offline-to-server ID mapping is created and reused by the sender.

    Commits offline runs plus metrics referencing their offline IDs, sends them,
    then verifies the ``server_ids`` mapping files exist, that metrics created
    later against the *offline* IDs are routed to the correct online runs on a
    second sender pass, and that processed cache files are removed.
    """
    # Create an offline run in a uniquely-named test folder
    _uuid: str = f"{uuid.uuid4()}".split("-")[0]
    _path = f"/simvue_unit_testing/objects/folder/{_uuid}"
    _folder = Folder.new(path=_path, offline=True)
    _folder.commit()

    _offline_run_ids = []

    for i in range(5):
        _name = f"test_sender_server_ids-{_uuid}-{i}"
        _run = Run.new(name=_name, folder=_path, offline=True)
        _run.commit()

        _offline_run_ids.append(_run.id)

        # Create metric associated with offline run ID
        _metrics = Metrics.new(
            run=_run.id,
            metrics=[
                {
                    "timestamp": datetime.datetime.now().strftime(DATETIME_FORMAT),
                    "time": 1,
                    "step": 1,
                    "values": {"x": i},
                }
            ],
            offline=True,
        )
        _metrics.commit()

    # Send both items
    with caplog.at_level(logging.ERROR):
        sender(threading_threshold=1 if parallel else 10)

    assert not caplog.text

    # Check server ID mapping correctly created
    _online_runs = []
    for i, _offline_run_id in enumerate(_offline_run_ids):
        _id_file = pathlib.Path(offline_cache_setup.name).joinpath("server_ids", f"{_offline_run_id}.txt")
        assert _id_file.exists()
        _online_id = _id_file.read_text()

        # Check correct ID is contained within file
        _online_run = Run(identifier=_online_id)
        _online_runs.append(_online_run)
        assert _online_run.name == f"test_sender_server_ids-{_uuid}-{i}"

        # Check metric has been associated with correct online run
        _run_metric = next(_online_run.metrics)
        assert _run_metric[0] == "x"
        assert _run_metric[1]["count"] == 1
        assert _run_metric[1]["min"] == i

        # Create a new offline metric with offline run ID
        _metrics = Metrics.new(
            run=_offline_run_id,
            metrics=[
                {
                    "timestamp": datetime.datetime.now().strftime(DATETIME_FORMAT),
                    "time": 2,
                    "step": 2,
                    "values": {"x": 2},
                }
            ],
            offline=True,
        )
        _metrics.commit()

    # Run sender again, check online ID is correctly loaded from file and
    # substituted for the offline ID
    with caplog.at_level(logging.ERROR):
        sender(threading_threshold=1 if parallel else 10)

    assert not caplog.text

    # Check metric uploaded correctly
    for _online_run in _online_runs:
        _online_run.refresh()
        _run_metric = next(_online_run.metrics)
        assert _run_metric[0] == "x"
        assert _run_metric[1]["count"] == 2

    # Check all files for runs and metrics deleted once they were processed
    assert len(list(pathlib.Path(offline_cache_setup.name).joinpath("runs").iterdir())) == 0
    assert len(list(pathlib.Path(offline_cache_setup.name).joinpath("metrics").iterdir())) == 0
150+
@pytest.mark.parametrize("parallel", (True, False))
def test_send_heartbeat(offline_cache_setup, parallel, mocker):
    """Check offline heartbeats are forwarded to the server by the sender.

    Creates running offline runs with a short heartbeat timeout, sends them,
    then repeatedly emits heartbeats and verifies each sender pass performs one
    ``requests.put`` per run and that all runs remain in the ``running`` state.
    """
    # Create an offline run in a uniquely-named test folder
    _uuid: str = f"{uuid.uuid4()}".split("-")[0]
    _path = f"/simvue_unit_testing/objects/folder/{_uuid}"
    _folder = Folder.new(path=_path, offline=True)
    _folder.commit()

    _offline_runs = []

    for i in range(5):
        _name = f"test_sender_server_ids-{_uuid}-{i}"
        _run = Run.new(name=_name, folder=_path, offline=True, heartbeat_timeout=1, status="running")
        _run.commit()

        _offline_runs.append(_run)

    _id_mapping = sender(threading_threshold=1 if parallel else 10)
    _online_runs = [Run(identifier=_id_mapping.get(_offline_run.id)) for _offline_run in _offline_runs]
    assert all(_online_run.status == "running" for _online_run in _online_runs)

    spy_put = mocker.spy(requests, "put")

    # Create heartbeat and send every 0.5s for 5s
    for _ in range(10):
        time.sleep(0.5)
        for _offline_run in _offline_runs:
            _offline_run.send_heartbeat()
        sender(threading_threshold=1 if parallel else 10)

    # Check requests.put() endpoint called 50 times - once for each of the
    # 5 runs, on all 10 iterations
    assert spy_put.call_count == 50

    # Get online runs and check all running
    for _online_run in _online_runs:
        _online_run.refresh()
    assert all(_online_run.status == "running" for _online_run in _online_runs)