@@ -3,15 +3,14 @@ use alloy::primitives::{Address, U256};
33use alloy:: providers:: Provider ;
44use engine_core:: {
55 chain:: { Chain , ChainService } ,
6- credentials:: { SigningCredential , KmsClientCache } ,
6+ credentials:: { KmsClientCache , SigningCredential } ,
77 error:: AlloyRpcErrorToEngineError ,
88 signer:: EoaSigner ,
99} ;
1010use engine_eip7702_core:: delegated_account:: DelegatedAccount ;
1111use serde:: { Deserialize , Serialize } ;
1212use std:: { sync:: Arc , time:: Duration } ;
1313use twmq:: Queue ;
14- use twmq:: redis:: AsyncCommands ;
1514use twmq:: redis:: aio:: ConnectionManager ;
1615use twmq:: {
1716 DurableExecution , FailHookData , NackHookData , SuccessHookData ,
@@ -20,10 +19,7 @@ use twmq::{
2019} ;
2120
2221use crate :: eoa:: authorization_cache:: EoaAuthorizationCache ;
23- use crate :: eoa:: store:: {
24- AtomicEoaExecutorStore , EoaExecutorStore , EoaExecutorStoreKeys , EoaHealth , SubmissionResult ,
25- TransactionStoreError ,
26- } ;
22+ use crate :: eoa:: store:: { AtomicEoaExecutorStore , EoaExecutorStore , EoaHealth , SubmissionResult } ;
2723use crate :: metrics:: {
2824 EoaMetrics , calculate_duration_seconds, current_timestamp_ms, record_eoa_job_processing_time,
2925} ;
@@ -127,7 +123,7 @@ where
127123
128124 // EOA metrics abstraction with encapsulated configuration
129125 pub eoa_metrics : EoaMetrics ,
130-
126+
131127 // KMS client cache for AWS KMS credentials
132128 pub kms_client_cache : KmsClientCache ,
133129}
@@ -186,7 +182,10 @@ where
186182 let chain_id = chain. chain_id ( ) ;
187183
188184 // Inject KMS cache into the noop signing credential (after deserialization from Redis)
189- let noop_signing_credential = data. noop_signing_credential . clone ( ) . with_aws_kms_cache ( & self . kms_client_cache ) ;
185+ let noop_signing_credential = data
186+ . noop_signing_credential
187+ . clone ( )
188+ . with_aws_kms_cache ( & self . kms_client_cache ) ;
190189
191190 let worker = EoaExecutorWorker {
192191 store : scoped,
@@ -209,16 +208,31 @@ where
209208 } ;
210209
211210 let job_start_time = current_timestamp_ms ( ) ;
212- let result = worker. execute_main_workflow ( ) . await ?;
211+ let workflow_result = worker. execute_main_workflow ( ) . await ;
212+
213+ // Always release lock, regardless of workflow success/failure
213214 if let Err ( e) = worker. release_eoa_lock ( ) . await {
214215 tracing:: error!( error = ?e, worker_id = worker_id, "Error releasing EOA lock" ) ;
215216 }
216217
218+ // Propagate workflow error after releasing lock
219+ let result = workflow_result?;
220+
217221 // Record EOA job processing metrics
218222 let job_end_time = current_timestamp_ms ( ) ;
219223 let job_duration = calculate_duration_seconds ( job_start_time, job_end_time) ;
220224 record_eoa_job_processing_time ( data. chain_id , job_duration) ;
221225
226+ tracing:: info!(
227+ eoa = ?data. eoa_address,
228+ chain_id = data. chain_id,
229+ worker_id = worker_id,
230+ job_duration_seconds = job_duration,
231+ work_remaining = result. is_work_remaining( ) ,
232+ result = ?result,
233+ "EOA executor job completed"
234+ ) ;
235+
222236 let delay = if is_minimal_account {
223237 Some ( Duration :: from_secs ( 2 ) )
224238 } else {
@@ -243,65 +257,29 @@ where
243257
244258 async fn on_success (
245259 & self ,
246- job : & BorrowedJob < Self :: JobData > ,
260+ _job : & BorrowedJob < Self :: JobData > ,
247261 _success_data : SuccessHookData < ' _ , Self :: Output > ,
248262 _tx : & mut TransactionContext < ' _ > ,
249263 ) {
250- self . soft_release_eoa_lock ( & job . job . data ) . await ;
264+ // Lock is already released in process() with ownership checking
251265 }
252266
253267 async fn on_nack (
254268 & self ,
255- job : & BorrowedJob < Self :: JobData > ,
269+ _job : & BorrowedJob < Self :: JobData > ,
256270 _nack_data : NackHookData < ' _ , Self :: ErrorData > ,
257271 _tx : & mut TransactionContext < ' _ > ,
258272 ) {
259- self . soft_release_eoa_lock ( & job . job . data ) . await ;
273+ // Lock is already released in process() with ownership checking
260274 }
261275
262- #[ tracing:: instrument( name = "eoa_executor_worker_on_fail" , skip_all, fields( eoa = ?job. job. data. eoa_address, chain_id = job. job. data. chain_id, job_id = ?job. job. id) ) ]
263276 async fn on_fail (
264277 & self ,
265- job : & BorrowedJob < Self :: JobData > ,
266- fail_data : FailHookData < ' _ , Self :: ErrorData > ,
278+ _job : & BorrowedJob < Self :: JobData > ,
279+ _fail_data : FailHookData < ' _ , Self :: ErrorData > ,
267280 _tx : & mut TransactionContext < ' _ > ,
268281 ) {
269- if let EoaExecutorWorkerError :: StoreError { inner_error, .. } = & fail_data. error {
270- if let TransactionStoreError :: LockLost { .. } = & inner_error {
271- tracing:: error!(
272- eoa = ?job. job. data. eoa_address,
273- chain_id = job. job. data. chain_id,
274- "Encountered lock lost store error, skipping soft release of EOA lock"
275- ) ;
276- return ;
277- }
278- } else {
279- self . soft_release_eoa_lock ( & job. job . data ) . await ;
280- }
281- }
282- }
283-
284- impl < CS > EoaExecutorJobHandler < CS >
285- where
286- CS : ChainService + Send + Sync + ' static ,
287- {
288- async fn soft_release_eoa_lock ( & self , job_data : & EoaExecutorWorkerJobData ) {
289- let keys = EoaExecutorStoreKeys :: new (
290- job_data. eoa_address ,
291- job_data. chain_id ,
292- self . namespace . clone ( ) ,
293- ) ;
294-
295- let lock_key = keys. eoa_lock_key_name ( ) ;
296- let mut conn = self . redis . clone ( ) ;
297- if let Err ( e) = conn. del :: < & str , ( ) > ( & lock_key) . await {
298- tracing:: error!(
299- eoa = ?job_data. eoa_address,
300- chain_id = job_data. chain_id,
301- error = ?e,
302- "Failed to release EOA lock"
303- ) ;
304- }
282+ // Lock is already released in process() with ownership checking
305283 }
306284}
307285
0 commit comments