diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs
index 3156ccd..b7caf11 100644
--- a/executors/src/eoa/worker/mod.rs
+++ b/executors/src/eoa/worker/mod.rs
@@ -3,7 +3,7 @@ use alloy::primitives::{Address, U256};
 use alloy::providers::Provider;
 use engine_core::{
     chain::{Chain, ChainService},
-    credentials::{SigningCredential, KmsClientCache},
+    credentials::{KmsClientCache, SigningCredential},
     error::AlloyRpcErrorToEngineError,
     signer::EoaSigner,
 };
@@ -11,7 +11,6 @@ use engine_eip7702_core::delegated_account::DelegatedAccount;
 use serde::{Deserialize, Serialize};
 use std::{sync::Arc, time::Duration};
 use twmq::Queue;
-use twmq::redis::AsyncCommands;
 use twmq::redis::aio::ConnectionManager;
 use twmq::{
     DurableExecution, FailHookData, NackHookData, SuccessHookData,
@@ -20,10 +19,7 @@ use twmq::{
 };
 
 use crate::eoa::authorization_cache::EoaAuthorizationCache;
-use crate::eoa::store::{
-    AtomicEoaExecutorStore, EoaExecutorStore, EoaExecutorStoreKeys, EoaHealth, SubmissionResult,
-    TransactionStoreError,
-};
+use crate::eoa::store::{AtomicEoaExecutorStore, EoaExecutorStore, EoaHealth, SubmissionResult};
 use crate::metrics::{
     EoaMetrics, calculate_duration_seconds, current_timestamp_ms, record_eoa_job_processing_time,
 };
@@ -127,7 +123,7 @@ where
 
     // EOA metrics abstraction with encapsulated configuration
     pub eoa_metrics: EoaMetrics,
-    
+
     // KMS client cache for AWS KMS credentials
     pub kms_client_cache: KmsClientCache,
 }
@@ -186,7 +182,10 @@ where
         let chain_id = chain.chain_id();
 
         // Inject KMS cache into the noop signing credential (after deserialization from Redis)
-        let noop_signing_credential = data.noop_signing_credential.clone().with_aws_kms_cache(&self.kms_client_cache);
+        let noop_signing_credential = data
+            .noop_signing_credential
+            .clone()
+            .with_aws_kms_cache(&self.kms_client_cache);
 
         let worker = EoaExecutorWorker {
             store: scoped,
@@ -209,16 +208,31 @@ where
         };
 
         let job_start_time = current_timestamp_ms();
-        let result = worker.execute_main_workflow().await?;
+        let workflow_result = worker.execute_main_workflow().await;
+
+        // Always release lock, regardless of workflow success/failure
         if let Err(e) = worker.release_eoa_lock().await {
             tracing::error!(error = ?e, worker_id = worker_id, "Error releasing EOA lock");
         }
 
+        // Propagate workflow error after releasing lock
+        let result = workflow_result?;
+
         // Record EOA job processing metrics
         let job_end_time = current_timestamp_ms();
         let job_duration = calculate_duration_seconds(job_start_time, job_end_time);
         record_eoa_job_processing_time(data.chain_id, job_duration);
 
+        tracing::info!(
+            eoa = ?data.eoa_address,
+            chain_id = data.chain_id,
+            worker_id = worker_id,
+            job_duration_seconds = job_duration,
+            work_remaining = result.is_work_remaining(),
+            result = ?result,
+            "EOA executor job completed"
+        );
+
         let delay = if is_minimal_account {
             Some(Duration::from_secs(2))
         } else {
@@ -243,65 +257,29 @@ where
 
     async fn on_success(
         &self,
-        job: &BorrowedJob,
+        _job: &BorrowedJob,
         _success_data: SuccessHookData<'_, Self::Output>,
         _tx: &mut TransactionContext<'_>,
     ) {
-        self.soft_release_eoa_lock(&job.job.data).await;
+        // Lock is already released in process() with ownership checking
     }
 
     async fn on_nack(
         &self,
-        job: &BorrowedJob,
+        _job: &BorrowedJob,
         _nack_data: NackHookData<'_, Self::ErrorData>,
         _tx: &mut TransactionContext<'_>,
     ) {
-        self.soft_release_eoa_lock(&job.job.data).await;
+        // Lock is already released in process() with ownership checking
     }
 
-    #[tracing::instrument(name = "eoa_executor_worker_on_fail", skip_all, fields(eoa = ?job.job.data.eoa_address, chain_id = job.job.data.chain_id, job_id = ?job.job.id))]
     async fn on_fail(
         &self,
-        job: &BorrowedJob,
-        fail_data: FailHookData<'_, Self::ErrorData>,
+        _job: &BorrowedJob,
+        _fail_data: FailHookData<'_, Self::ErrorData>,
         _tx: &mut TransactionContext<'_>,
     ) {
-        if let EoaExecutorWorkerError::StoreError { inner_error, .. } = &fail_data.error {
-            if let TransactionStoreError::LockLost { .. } = &inner_error {
-                tracing::error!(
-                    eoa = ?job.job.data.eoa_address,
-                    chain_id = job.job.data.chain_id,
-                    "Encountered lock lost store error, skipping soft release of EOA lock"
-                );
-                return;
-            }
-        } else {
-            self.soft_release_eoa_lock(&job.job.data).await;
-        }
-    }
-}
-
-impl<CS> EoaExecutorJobHandler<CS>
-where
-    CS: ChainService + Send + Sync + 'static,
-{
-    async fn soft_release_eoa_lock(&self, job_data: &EoaExecutorWorkerJobData) {
-        let keys = EoaExecutorStoreKeys::new(
-            job_data.eoa_address,
-            job_data.chain_id,
-            self.namespace.clone(),
-        );
-
-        let lock_key = keys.eoa_lock_key_name();
-        let mut conn = self.redis.clone();
-        if let Err(e) = conn.del::<&str, ()>(&lock_key).await {
-            tracing::error!(
-                eoa = ?job_data.eoa_address,
-                chain_id = job_data.chain_id,
-                error = ?e,
-                "Failed to release EOA lock"
-            );
-        }
+        // Lock is already released in process() with ownership checking
     }
 }
 
diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs
index e845636..296ebd4 100644
--- a/executors/src/eoa/worker/send.rs
+++ b/executors/src/eoa/worker/send.rs
@@ -251,6 +251,10 @@ impl EoaExecutorWorker {
 
                 (_, Err(e)) => {
                     // Track balance threshold issues
+                    if should_break_on_failure {
+                        failure_occurred = true;
+                    }
+
                     if let EoaExecutorWorkerError::TransactionSimulationFailed {
                         inner_error, ..
                     } = &e
@@ -287,8 +291,6 @@ impl EoaExecutorWorker {
                             );
                             // Don't propagate the error, continue processing
                         }
-                    } else if should_break_on_failure {
-                        failure_occurred = true;
                     }
                 }
                 (true, Ok(_)) => continue,
diff --git a/server/src/main.rs b/server/src/main.rs
index 94bb060..4376aec 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -1,7 +1,15 @@
 use std::{sync::Arc, time::Duration};
 
-use engine_core::{signer::{EoaSigner, SolanaSigner}, userop::UserOpSigner, credentials::KmsClientCache};
-use engine_executors::{eoa::authorization_cache::EoaAuthorizationCache, metrics::{ExecutorMetrics, initialize_metrics}, solana_executor::rpc_cache::{SolanaRpcCache, SolanaRpcUrls}};
+use engine_core::{
+    credentials::KmsClientCache,
+    signer::{EoaSigner, SolanaSigner},
+    userop::UserOpSigner,
+};
+use engine_executors::{
+    eoa::authorization_cache::EoaAuthorizationCache,
+    metrics::{ExecutorMetrics, initialize_metrics},
+    solana_executor::rpc_cache::{SolanaRpcCache, SolanaRpcUrls},
+};
 use thirdweb_core::{abi::ThirdwebAbiServiceBuilder, auth::ThirdwebAuth, iaw::IAWClient};
 use thirdweb_engine::{
     chains::ThirdwebChainService,
@@ -93,7 +101,10 @@ async fn main() -> anyhow::Result<()> {
     )
     .await?;
 
-    tracing::info!("Queue manager initialized");
+    tracing::info!(
+        "Queue manager initialized with queue config: {:?}",
+        config.queue
+    );
 
     // Start queue workers
     tracing::info!("Starting queue workers...");
@@ -123,12 +134,12 @@ async fn main() -> anyhow::Result<()> {
 
     // Initialize metrics registry and executor metrics
     let metrics_registry = Arc::new(prometheus::Registry::new());
-    let executor_metrics = ExecutorMetrics::new(&metrics_registry)
-        .expect("Failed to create executor metrics");
-    
+    let executor_metrics =
+        ExecutorMetrics::new(&metrics_registry).expect("Failed to create executor metrics");
+
     // Initialize the executor metrics globally
     initialize_metrics(executor_metrics);
-    
+
     tracing::info!("Executor metrics initialized");
 
     let mut server = EngineServer::new(EngineServerState {
diff --git a/server/src/queue/manager.rs b/server/src/queue/manager.rs
index 09447c8..23c40f7 100644
--- a/server/src/queue/manager.rs
+++ b/server/src/queue/manager.rs
@@ -257,7 +257,7 @@ impl QueueManager {
             namespace: queue_config.execution_namespace.clone(),
             redis: redis_client.get_connection_manager().await?,
             authorization_cache,
-            max_inflight: 100,
+            max_inflight: 50,
             max_recycled_nonces: 50,
             eoa_metrics,
             kms_client_cache,