@@ -329,33 +329,48 @@ async def add_items(self, items: list[TResponseInputItem]) -> None:
329329 raise
330330
331331 # Update metadata, preserving created_at across subsequent writes.
332+ # Use first-write concurrency with the read ETag so a concurrent write
333+ # that already established `created_at` can't be clobbered by a stale
334+ # read that saw no metadata.
332335 now = str (int (time .time ()))
333- created_at = now
334- try :
336+ meta_attempt = 0
337+ while True :
338+ meta_attempt += 1
335339 existing_meta_response = await self ._dapr_client .get_state (
336340 store_name = self ._state_store_name ,
337341 key = self ._metadata_key ,
338342 state_metadata = self ._get_read_metadata (),
339343 )
344+ created_at = now
340345 if existing_meta_response .data :
341- existing_meta = json .loads (existing_meta_response .data .decode ("utf-8" ))
342- if isinstance (existing_meta , dict ) and existing_meta .get ("created_at" ):
343- created_at = str (existing_meta ["created_at" ])
344- except (json .JSONDecodeError , UnicodeDecodeError , AttributeError ):
345- # Corrupt or missing metadata — start fresh with current timestamp.
346- pass
347- metadata = {
348- "session_id" : self .session_id ,
349- "created_at" : created_at ,
350- "updated_at" : now ,
351- }
352- await self ._dapr_client .save_state (
353- store_name = self ._state_store_name ,
354- key = self ._metadata_key ,
355- value = json .dumps (metadata ),
356- state_metadata = self ._get_metadata (),
357- options = self ._get_state_options (),
358- )
346+ try :
347+ existing_meta = json .loads (existing_meta_response .data .decode ("utf-8" ))
348+ if isinstance (existing_meta , dict ) and existing_meta .get ("created_at" ):
349+ created_at = str (existing_meta ["created_at" ])
350+ except (json .JSONDecodeError , UnicodeDecodeError , AttributeError ):
351+ # Corrupt metadata — start fresh with current timestamp.
352+ pass
353+ metadata = {
354+ "session_id" : self .session_id ,
355+ "created_at" : created_at ,
356+ "updated_at" : now ,
357+ }
358+ meta_etag = getattr (existing_meta_response , "etag" , None ) or None
359+ try :
360+ await self ._dapr_client .save_state (
361+ store_name = self ._state_store_name ,
362+ key = self ._metadata_key ,
363+ value = json .dumps (metadata ),
364+ etag = meta_etag ,
365+ state_metadata = self ._get_metadata (),
366+ options = self ._get_state_options (concurrency = Concurrency .first_write ),
367+ )
368+ break
369+ except Exception as error :
370+ should_retry = await self ._handle_concurrency_conflict (error , meta_attempt )
371+ if should_retry :
372+ continue
373+ raise
359374
360375 async def pop_item (self ) -> TResponseInputItem | None :
361376 """Remove and return the most recent item from the session.
0 commit comments