 _logger = logging.getLogger(__name__)


+def _log_upload_failed(file_path):
+    with open(file_path, "r") as file:
+        _data = json.load(file)
+    _data["upload_failed"] = True
+    with open(file_path, "w") as file:
+        json.dump(_data, file)
+
+
 def upload_cached_file(
     cache_dir: pydantic.DirectoryPath,
     obj_type: str,
     file_path: pydantic.FilePath,
     id_mapping: dict[str, str],
+    retry_failed_uploads: bool,
     lock: threading.Lock,
 ):
     """Upload data stored in a cached file to the Simvue server.
@@ -62,10 +71,16 @@ def upload_cached_file(
     _current_id = file_path.name.split(".")[0]
     _data = json.load(file_path.open())
     _exact_type: str = _data.pop("obj_type")
+
+    if _data.pop("upload_failed", False) and not retry_failed_uploads:
+        return
+
     try:
         _instance_class = getattr(simvue.api.objects, _exact_type)
-    except AttributeError as e:
-        raise RuntimeError(f"Attempt to initialise unknown type '{_exact_type}'") from e
+    except AttributeError:
+        _logger.error(f"Attempt to initialise unknown type '{_exact_type}'")
+        _log_upload_failed(file_path)
+        return

     # If it is an ObjectArtifact, need to load the object as bytes from a different file
     if issubclass(_instance_class, simvue.api.objects.ObjectArtifact):
@@ -87,14 +102,21 @@ def upload_cached_file(
         if not issubclass(_instance_class, ArtifactBase):
             obj_for_upload.commit()
         _new_id = obj_for_upload.id
-    except RuntimeError as error:
+    except Exception as error:
         if "status 409" in error.args[0]:
             return
-        raise error
+        _logger.error(
+            f"Error while committing '{obj_for_upload.__class__.__name__}': {error.args[0]}"
+        )
+        _log_upload_failed(file_path)
+        return
     if not _new_id:
-        raise RuntimeError(
+        _logger.error(
             f"Object of type '{obj_for_upload.__class__.__name__}' has no identifier"
         )
+        _log_upload_failed(file_path)
+        return
+
     _logger.info(
         f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {obj_for_upload.__class__.__name__} '{_new_id}'"
     )
@@ -155,6 +177,7 @@ def sender(
     max_workers: int = 5,
     threading_threshold: int = 10,
     objects_to_upload: list[str] = UPLOAD_ORDER,
+    retry_failed_uploads: bool = False,
 ) -> dict[str, str]:
     """Send data from a local cache directory to the Simvue server.

@@ -168,6 +191,8 @@ def sender(
         The number of cached files above which threading will be used
     objects_to_upload : list[str]
         Types of objects to upload, by default uploads all types of objects present in cache
+    retry_failed_uploads : bool, optional
+        Whether to retry sending objects which previously failed, by default False

     Returns
     -------
@@ -203,7 +228,14 @@ def sender(
         _offline_files = _all_offline_files[_obj_type]
         if len(_offline_files) < threading_threshold:
             for file_path in _offline_files:
-                upload_cached_file(cache_dir, _obj_type, file_path, _id_mapping, _lock)
+                upload_cached_file(
+                    cache_dir,
+                    _obj_type,
+                    file_path,
+                    _id_mapping,
+                    retry_failed_uploads,
+                    _lock,
+                )
         else:
             with ThreadPoolExecutor(
                 max_workers=max_workers, thread_name_prefix="sender_session_upload"
@@ -214,6 +246,7 @@ def sender(
                         obj_type=_obj_type,
                         file_path=file_path,
                         id_mapping=_id_mapping,
+                        retry_failed_uploads=retry_failed_uploads,
                         lock=_lock,
                     ),
                     _offline_files,
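
Usage note: with this change, a cached object whose upload fails is marked with an "upload_failed" flag in its JSON file and is skipped on later sender runs unless retry is requested. A minimal sketch, assuming the module path simvue.sender and that the remaining sender() parameters keep their defaults (neither is confirmed by this diff):

    from simvue.sender import sender

    # First pass: objects that fail to upload are flagged in the cache
    # and will be skipped by subsequent calls by default.
    sender()

    # Later pass: explicitly retry the objects that previously failed.
    id_mapping = sender(retry_failed_uploads=True)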