Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 30 additions & 52 deletions executors/src/eoa/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ use alloy::primitives::{Address, U256};
use alloy::providers::Provider;
use engine_core::{
chain::{Chain, ChainService},
credentials::{SigningCredential, KmsClientCache},
credentials::{KmsClientCache, SigningCredential},
error::AlloyRpcErrorToEngineError,
signer::EoaSigner,
};
use engine_eip7702_core::delegated_account::DelegatedAccount;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use twmq::Queue;
use twmq::redis::AsyncCommands;
use twmq::redis::aio::ConnectionManager;
use twmq::{
DurableExecution, FailHookData, NackHookData, SuccessHookData,
Expand All @@ -20,10 +19,7 @@ use twmq::{
};

use crate::eoa::authorization_cache::EoaAuthorizationCache;
use crate::eoa::store::{
AtomicEoaExecutorStore, EoaExecutorStore, EoaExecutorStoreKeys, EoaHealth, SubmissionResult,
TransactionStoreError,
};
use crate::eoa::store::{AtomicEoaExecutorStore, EoaExecutorStore, EoaHealth, SubmissionResult};
use crate::metrics::{
EoaMetrics, calculate_duration_seconds, current_timestamp_ms, record_eoa_job_processing_time,
};
Expand Down Expand Up @@ -127,7 +123,7 @@ where

// EOA metrics abstraction with encapsulated configuration
pub eoa_metrics: EoaMetrics,

// KMS client cache for AWS KMS credentials
pub kms_client_cache: KmsClientCache,
}
Expand Down Expand Up @@ -186,7 +182,10 @@ where
let chain_id = chain.chain_id();

// Inject KMS cache into the noop signing credential (after deserialization from Redis)
let noop_signing_credential = data.noop_signing_credential.clone().with_aws_kms_cache(&self.kms_client_cache);
let noop_signing_credential = data
.noop_signing_credential
.clone()
.with_aws_kms_cache(&self.kms_client_cache);

let worker = EoaExecutorWorker {
store: scoped,
Expand All @@ -209,16 +208,31 @@ where
};

let job_start_time = current_timestamp_ms();
let result = worker.execute_main_workflow().await?;
let workflow_result = worker.execute_main_workflow().await;

// Always release lock, regardless of workflow success/failure
if let Err(e) = worker.release_eoa_lock().await {
tracing::error!(error = ?e, worker_id = worker_id, "Error releasing EOA lock");
}

// Propagate workflow error after releasing lock
let result = workflow_result?;

// Record EOA job processing metrics
let job_end_time = current_timestamp_ms();
let job_duration = calculate_duration_seconds(job_start_time, job_end_time);
record_eoa_job_processing_time(data.chain_id, job_duration);

tracing::info!(
eoa = ?data.eoa_address,
chain_id = data.chain_id,
worker_id = worker_id,
job_duration_seconds = job_duration,
work_remaining = result.is_work_remaining(),
result = ?result,
"EOA executor job completed"
);

let delay = if is_minimal_account {
Some(Duration::from_secs(2))
} else {
Expand All @@ -243,65 +257,29 @@ where

async fn on_success(
&self,
job: &BorrowedJob<Self::JobData>,
_job: &BorrowedJob<Self::JobData>,
_success_data: SuccessHookData<'_, Self::Output>,
_tx: &mut TransactionContext<'_>,
) {
self.soft_release_eoa_lock(&job.job.data).await;
// Lock is already released in process() with ownership checking
}

async fn on_nack(
&self,
job: &BorrowedJob<Self::JobData>,
_job: &BorrowedJob<Self::JobData>,
_nack_data: NackHookData<'_, Self::ErrorData>,
_tx: &mut TransactionContext<'_>,
) {
self.soft_release_eoa_lock(&job.job.data).await;
// Lock is already released in process() with ownership checking
}

#[tracing::instrument(name = "eoa_executor_worker_on_fail", skip_all, fields(eoa = ?job.job.data.eoa_address, chain_id = job.job.data.chain_id, job_id = ?job.job.id))]
async fn on_fail(
&self,
job: &BorrowedJob<Self::JobData>,
fail_data: FailHookData<'_, Self::ErrorData>,
_job: &BorrowedJob<Self::JobData>,
_fail_data: FailHookData<'_, Self::ErrorData>,
_tx: &mut TransactionContext<'_>,
) {
if let EoaExecutorWorkerError::StoreError { inner_error, .. } = &fail_data.error {
if let TransactionStoreError::LockLost { .. } = &inner_error {
tracing::error!(
eoa = ?job.job.data.eoa_address,
chain_id = job.job.data.chain_id,
"Encountered lock lost store error, skipping soft release of EOA lock"
);
return;
}
} else {
self.soft_release_eoa_lock(&job.job.data).await;
}
}
}

impl<CS> EoaExecutorJobHandler<CS>
where
CS: ChainService + Send + Sync + 'static,
{
async fn soft_release_eoa_lock(&self, job_data: &EoaExecutorWorkerJobData) {
let keys = EoaExecutorStoreKeys::new(
job_data.eoa_address,
job_data.chain_id,
self.namespace.clone(),
);

let lock_key = keys.eoa_lock_key_name();
let mut conn = self.redis.clone();
if let Err(e) = conn.del::<&str, ()>(&lock_key).await {
tracing::error!(
eoa = ?job_data.eoa_address,
chain_id = job_data.chain_id,
error = ?e,
"Failed to release EOA lock"
);
}
// Lock is already released in process() with ownership checking
}
}

Expand Down
6 changes: 4 additions & 2 deletions executors/src/eoa/worker/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ impl<C: Chain> EoaExecutorWorker<C> {
(_, Err(e)) => {
// Track balance threshold issues

if should_break_on_failure {
failure_occurred = true;
}

if let EoaExecutorWorkerError::TransactionSimulationFailed {
inner_error, ..
} = &e
Expand Down Expand Up @@ -287,8 +291,6 @@ impl<C: Chain> EoaExecutorWorker<C> {
);
// Don't propagate the error, continue processing
}
} else if should_break_on_failure {
failure_occurred = true;
}
}
(true, Ok(_)) => continue,
Expand Down
25 changes: 18 additions & 7 deletions server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
use std::{sync::Arc, time::Duration};

use engine_core::{signer::{EoaSigner, SolanaSigner}, userop::UserOpSigner, credentials::KmsClientCache};
use engine_executors::{eoa::authorization_cache::EoaAuthorizationCache, metrics::{ExecutorMetrics, initialize_metrics}, solana_executor::rpc_cache::{SolanaRpcCache, SolanaRpcUrls}};
use engine_core::{
credentials::KmsClientCache,
signer::{EoaSigner, SolanaSigner},
userop::UserOpSigner,
};
use engine_executors::{
eoa::authorization_cache::EoaAuthorizationCache,
metrics::{ExecutorMetrics, initialize_metrics},
solana_executor::rpc_cache::{SolanaRpcCache, SolanaRpcUrls},
};
use thirdweb_core::{abi::ThirdwebAbiServiceBuilder, auth::ThirdwebAuth, iaw::IAWClient};
use thirdweb_engine::{
chains::ThirdwebChainService,
Expand Down Expand Up @@ -93,7 +101,10 @@ async fn main() -> anyhow::Result<()> {
)
.await?;

tracing::info!("Queue manager initialized");
tracing::info!(
"Queue manager initialized with queue config: {:?}",
config.queue
);

// Start queue workers
tracing::info!("Starting queue workers...");
Expand Down Expand Up @@ -123,12 +134,12 @@ async fn main() -> anyhow::Result<()> {

// Initialize metrics registry and executor metrics
let metrics_registry = Arc::new(prometheus::Registry::new());
let executor_metrics = ExecutorMetrics::new(&metrics_registry)
.expect("Failed to create executor metrics");
let executor_metrics =
ExecutorMetrics::new(&metrics_registry).expect("Failed to create executor metrics");

// Initialize the executor metrics globally
initialize_metrics(executor_metrics);

tracing::info!("Executor metrics initialized");

let mut server = EngineServer::new(EngineServerState {
Expand Down
2 changes: 1 addition & 1 deletion server/src/queue/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl QueueManager {
namespace: queue_config.execution_namespace.clone(),
redis: redis_client.get_connection_manager().await?,
authorization_cache,
max_inflight: 100,
max_inflight: 50,
max_recycled_nonces: 50,
eoa_metrics,
kms_client_cache,
Expand Down