diff --git a/Cargo.lock b/Cargo.lock index 598d09d..db2b8a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2449,6 +2449,7 @@ dependencies = [ "aws-credential-types", "aws-sdk-kms", "engine-aa-types", + "moka", "schemars 0.8.22", "serde", "serde_json", diff --git a/core/Cargo.toml b/core/Cargo.toml index 5594326..0cd1787 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -23,3 +23,4 @@ alloy-signer-aws = { version = "1.0.23", features = ["eip712"] } aws-config = "1.8.2" aws-sdk-kms = "1.79.0" aws-credential-types = "1.2.4" +moka = { version = "0.12", features = ["future"] } diff --git a/core/src/credentials.rs b/core/src/credentials.rs index 8871d8b..6d0169f 100644 --- a/core/src/credentials.rs +++ b/core/src/credentials.rs @@ -5,17 +5,33 @@ use aws_config::{BehaviorVersion, Region}; use aws_credential_types::provider::future::ProvideCredentials as ProvideCredentialsFuture; use aws_sdk_kms::config::{Credentials, ProvideCredentials}; use serde::{Deserialize, Serialize}; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; +use std::ops::Deref; use thirdweb_core::auth::ThirdwebAuth; use thirdweb_core::iaw::AuthToken; use vault_types::enclave::auth::Auth as VaultAuth; use crate::error::EngineError; +/// Cache for AWS KMS clients to avoid recreating connections +pub type KmsClientCache = moka::future::Cache; + impl SigningCredential { /// Create a random private key credential for testing pub fn random_local() -> Self { SigningCredential::PrivateKey(PrivateKeySigner::random()) } + + /// Inject KMS cache into AWS KMS credentials (useful after deserialization) + pub fn with_aws_kms_cache(self, kms_client_cache: &KmsClientCache) -> Self { + match self { + SigningCredential::AwsKms(creds) => { + SigningCredential::AwsKms(creds.with_cache(kms_client_cache.clone())) + } + other => other, + } + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -38,6 +54,18 @@ pub struct AwsKmsCredential { pub secret_access_key: String, pub key_id: String, pub region: String, + #[serde(skip)] + pub kms_client_cache: Option, +} + +impl Hash for AwsKmsCredential { + fn hash(&self, state: &mut H) { + self.access_key_id.hash(state); + self.secret_access_key.hash(state); + self.key_id.hash(state); + self.region.hash(state); + // Don't hash the cache - it's not part of the credential identity + } } impl ProvideCredentials for AwsKmsCredential { @@ -57,14 +85,71 @@ impl ProvideCredentials for AwsKmsCredential { } impl AwsKmsCredential { - pub async fn get_signer(&self, chain_id: Option) -> Result { + /// Create a new AwsKmsCredential with cache + pub fn new( + access_key_id: String, + secret_access_key: String, + key_id: String, + region: String, + kms_client_cache: KmsClientCache, + ) -> Self { + Self { + access_key_id, + secret_access_key, + key_id, + region, + kms_client_cache: Some(kms_client_cache), + } + } + + /// Inject cache into this credential (useful after deserialization) + pub fn with_cache(mut self, kms_client_cache: KmsClientCache) -> Self { + self.kms_client_cache = Some(kms_client_cache); + self + } + + /// Create a cache key from the credential + fn cache_key(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + self.hash(&mut hasher); + hasher.finish() + } + + /// Create a new AWS KMS client (without caching) + async fn create_kms_client(&self) -> Result { let config = aws_config::defaults(BehaviorVersion::latest()) .credentials_provider(self.clone()) .region(Region::new(self.region.clone())) .load() .await; - let client = aws_sdk_kms::Client::new(&config); + Ok(aws_sdk_kms::Client::new(&config)) + } + /// Get a cached AWS KMS client, creating one if it doesn't exist + async fn get_cached_kms_client(&self) -> Result { + match &self.kms_client_cache { + Some(cache) => { + let cache_key = self.cache_key(); + + cache + .try_get_with(cache_key, async { + tracing::debug!("Creating new KMS client for key: {}", cache_key); + self.create_kms_client().await + }) + .await + .map_err(|e| e.deref().clone()) + } + None => { + // Fallback to creating a new client without caching + tracing::debug!("No cache available, creating new KMS client"); + self.create_kms_client().await + } + } + } + + /// Get signer (uses cache if available) + pub async fn get_signer(&self, chain_id: Option) -> Result { + let client = self.get_cached_kms_client().await?; let signer = AwsSigner::new(client, self.key_id.clone(), chain_id).await?; Ok(signer) } diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index 364c2ee..3156ccd 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -3,7 +3,7 @@ use alloy::primitives::{Address, U256}; use alloy::providers::Provider; use engine_core::{ chain::{Chain, ChainService}, - credentials::SigningCredential, + credentials::{SigningCredential, KmsClientCache}, error::AlloyRpcErrorToEngineError, signer::EoaSigner, }; @@ -127,6 +127,9 @@ where // EOA metrics abstraction with encapsulated configuration pub eoa_metrics: EoaMetrics, + + // KMS client cache for AWS KMS credentials + pub kms_client_cache: KmsClientCache, } impl DurableExecution for EoaExecutorJobHandler @@ -182,12 +185,15 @@ where let chain_id = chain.chain_id(); + // Inject KMS cache into the noop signing credential (after deserialization from Redis) + let noop_signing_credential = data.noop_signing_credential.clone().with_aws_kms_cache(&self.kms_client_cache); + let worker = EoaExecutorWorker { store: scoped, chain, eoa: data.eoa_address, chain_id: data.chain_id, - noop_signing_credential: data.noop_signing_credential.clone(), + noop_signing_credential, max_inflight: if is_minimal_account { 1 @@ -199,6 +205,7 @@ where max_recycled_nonces: self.max_recycled_nonces, webhook_queue: self.webhook_queue.clone(), signer: self.eoa_signer.clone(), + kms_client_cache: self.kms_client_cache.clone(), }; let job_start_time = current_timestamp_ms(); @@ -311,6 +318,7 @@ pub struct EoaExecutorWorker { pub webhook_queue: Arc>, pub signer: Arc, + pub kms_client_cache: KmsClientCache, } impl EoaExecutorWorker { diff --git a/executors/src/eoa/worker/transaction.rs b/executors/src/eoa/worker/transaction.rs index 7d825c6..fa234bc 100644 --- a/executors/src/eoa/worker/transaction.rs +++ b/executors/src/eoa/worker/transaction.rs @@ -176,7 +176,7 @@ impl EoaExecutorWorker { // Try EIP-1559 fees first, fall back to legacy if unsupported match self.chain.provider().estimate_eip1559_fees().await { Ok(eip1559_fees) => { - tracing::debug!( + tracing::trace!( "Using EIP-1559 fees: max_fee={}, max_priority_fee={}", eip1559_fees.max_fee_per_gas, eip1559_fees.max_priority_fee_per_gas @@ -397,7 +397,11 @@ impl EoaExecutorWorker { nonce: u64, ) -> Result, EoaExecutorWorkerError> { let typed_tx = self.build_typed_transaction(request, nonce).await?; - self.sign_transaction(typed_tx, &request.signing_credential) + + // Inject KMS cache into the signing credential (after deserialization from Redis) + let credential_with_cache = request.signing_credential.clone().with_aws_kms_cache(&self.kms_client_cache); + + self.sign_transaction(typed_tx, &credential_with_cache) .await } diff --git a/justfile b/justfile new file mode 100644 index 0000000..628d057 --- /dev/null +++ b/justfile @@ -0,0 +1,42 @@ +# Start local chain with anvil + speedbump proxy (300ms variable latency) +local-chain: + #!/usr/bin/env bash + set -e + + echo "๐Ÿงน Cleaning up existing processes..." + # Kill any existing anvil or speedbump processes + pkill -f "anvil.*8546" || true + pkill -f "speedbump.*8545" || true + lsof -ti:8545 | xargs kill -9 2>/dev/null || true + lsof -ti:8546 | xargs kill -9 2>/dev/null || true + + echo "๐Ÿ”จ Starting anvil on port 8546 (1s blocktime)..." + anvil --port 8546 --block-time 1 & + ANVIL_PID=$! + + # Cleanup function + cleanup() { + echo "" + echo "๐Ÿ›‘ Stopping services..." + kill $ANVIL_PID 2>/dev/null || true + pkill -f "speedbump.*8545" || true + exit 0 + } + + trap cleanup INT TERM EXIT + + # Wait a moment for anvil to start + sleep 2 + + echo "๐ŸŒ Starting speedbump proxy on port 8545 (โ†’ localhost:8546)" + echo " Latency: 300ms base + 150ms sine wave (150-450ms variable)" + echo " Connect to: http://localhost:8545" + echo "" + speedbump --port=8545 --latency=300ms --sine-amplitude=150ms --sine-period=1m localhost:8546 + +# Fund an address with 1 ETH (bypasses speedbump for faster setup) +fund address: + @echo "๐Ÿ’ฐ Funding {{address}} with 1 ETH..." + @cast rpc anvil_setBalance {{address}} $(cast to-wei 1) --rpc-url http://localhost:8546 + @echo "โœ… Done!" + diff --git a/scripts/benchmarks/eoa.ts b/scripts/benchmarks/eoa.ts index a2e3dad..a7ca9a5 100644 --- a/scripts/benchmarks/eoa.ts +++ b/scripts/benchmarks/eoa.ts @@ -1,14 +1,3 @@ -/// - -// Bun globals (runtime will provide these) -declare const Bun: any; -declare const process: any; - -// Extend ImportMeta for Bun -interface ImportMeta { - dir: string; -} - // Types based on events.rs interface WebhookEvent { transactionId: string; diff --git a/scripts/simple-redis-cleanup.ts b/scripts/simple-redis-cleanup.ts new file mode 100644 index 0000000..0d69684 --- /dev/null +++ b/scripts/simple-redis-cleanup.ts @@ -0,0 +1,145 @@ +#!/usr/bin/env bun + +import Redis from "ioredis"; + +if (!process.env.REDIS_URL) { + throw new Error("REDIS_URL is not set"); +} + +// Configuration +const CONFIG = { + redisUrl: process.env.REDIS_URL, + batchSize: 5000, + dryRun: false, // Set to false to actually delete +} as const; + +class SimpleRedisCleanup { + private redis: Redis; + private stats = { + useropErrors: 0, + eip7702Errors: 0, + totalDeleted: 0, + errors: 0, + }; + + constructor() { + this.redis = new Redis(CONFIG.redisUrl); + } + + async run(): Promise { + console.log(`๐Ÿš€ Starting cleanup (DRY_RUN: ${CONFIG.dryRun})`); + console.log("๐ŸŽฏ Target patterns:"); + console.log(" - twmq:engine-cloud_userop_confirm:job:*:errors"); + console.log(" - twmq:engine-cloud_eip7702_send:job:*:errors"); + console.log(""); + + try { + // Clean userop confirm error keys + await this.cleanPattern("twmq:engine-cloud_userop_confirm:job:*:errors"); + + // Clean eip7702 send error keys + await this.cleanPattern("twmq:engine-cloud_eip7702_send:job:*:errors"); + + this.printFinalStats(); + } catch (error) { + console.error(`๐Ÿ’ฅ Fatal error: ${error}`); + throw error; + } finally { + await this.redis.quit(); + } + } + + private async cleanPattern(pattern: string): Promise { + console.log(`๐Ÿ” Scanning pattern: ${pattern}`); + + let cursor = "0"; + let totalFound = 0; + + do { + const [newCursor, keys] = await this.redis.scan( + cursor, + "MATCH", + pattern, + "COUNT", + CONFIG.batchSize + ); + cursor = newCursor; + + if (keys.length > 0) { + totalFound += keys.length; + console.log(` Found ${keys.length} keys (total: ${totalFound})`); + + if (CONFIG.dryRun) { + console.log(` [DRY RUN] Would delete ${keys.length} keys`); + this.updateStats(pattern, keys.length); + } else { + await this.deleteKeys(keys); + this.updateStats(pattern, keys.length); + } + } + } while (cursor !== "0"); + + console.log(`โœ… Pattern complete: ${pattern} (found ${totalFound} keys)`); + console.log(""); + } + + private async deleteKeys(keys: string[]): Promise { + try { + const pipeline = this.redis.pipeline(); + for (const key of keys) { + pipeline.del(key); + } + + const results = await pipeline.exec(); + const deletedCount = results?.filter(([err]) => err === null).length || 0; + const failedCount = keys.length - deletedCount; + + console.log(` โœ… Deleted ${deletedCount} keys`); + if (failedCount > 0) { + console.log(` โŒ Failed to delete ${failedCount} keys`); + this.stats.errors += failedCount; + } + + this.stats.totalDeleted += deletedCount; + } catch (error) { + console.error(` ๐Ÿ’ฅ Error deleting batch: ${error}`); + this.stats.errors += keys.length; + } + } + + private updateStats(pattern: string, count: number): void { + if (pattern.includes("userop_confirm")) { + this.stats.useropErrors += count; + } else if (pattern.includes("eip7702_send")) { + this.stats.eip7702Errors += count; + } + } + + private printFinalStats(): void { + console.log("๐Ÿ“ˆ Final Statistics:"); + console.log(` Userop Confirm Errors: ${this.stats.useropErrors.toLocaleString()}`); + console.log(` EIP-7702 Send Errors: ${this.stats.eip7702Errors.toLocaleString()}`); + console.log(` Total ${CONFIG.dryRun ? 'Would Delete' : 'Deleted'}: ${this.stats.totalDeleted.toLocaleString()}`); + if (this.stats.errors > 0) { + console.log(` Errors: ${this.stats.errors}`); + } + console.log(""); + + if (CONFIG.dryRun) { + console.log("๐Ÿ’ก This was a DRY RUN - no data was actually deleted"); + console.log("๐Ÿ’ก Set CONFIG.dryRun = false to actually delete the keys"); + } else { + console.log("โœ… CLEANUP COMPLETED - Data has been permanently deleted"); + } + } +} + +// Main execution +async function main() { + const cleaner = new SimpleRedisCleanup(); + await cleaner.run(); +} + +if (import.meta.main) { + main().catch(console.error); +} diff --git a/server/configuration/server_production.yaml b/server/configuration/server_production.yaml index 3c7bdce..e8062e2 100644 --- a/server/configuration/server_production.yaml +++ b/server/configuration/server_production.yaml @@ -19,7 +19,7 @@ redis: queue: execution_namespace: "engine-cloud" - webhook_workers: 100 + webhook_workers: 300 external_bundler_send_workers: 100 userop_confirm_workers: 100 local_concurrency: 100 diff --git a/server/src/http/extractors.rs b/server/src/http/extractors.rs index 4a1c7d1..2492505 100644 --- a/server/src/http/extractors.rs +++ b/server/src/http/extractors.rs @@ -7,7 +7,7 @@ use axum::{ }; use engine_core::{ chain::RpcCredentials, - credentials::{AwsKmsCredential, SigningCredential}, + credentials::{AwsKmsCredential, SigningCredential, KmsClientCache}, error::EngineError, }; use thirdweb_core::auth::ThirdwebAuth; @@ -101,15 +101,15 @@ where #[derive(OperationIo)] pub struct SigningCredentialsExtractor(pub SigningCredential); -impl FromRequestParts for SigningCredentialsExtractor -where - S: Send + Sync, -{ +impl FromRequestParts for SigningCredentialsExtractor { type Rejection = ApiEngineError; - async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { - // Try AWS KMS credentials first - if let Some(aws_kms) = Self::try_extract_aws_kms(parts)? { + async fn from_request_parts( + parts: &mut Parts, + state: &EngineServerState, + ) -> Result { + // Try AWS KMS credentials first (with cache) + if let Some(aws_kms) = Self::try_extract_aws_kms_with_cache(parts, state.kms_client_cache.clone())? { return Ok(SigningCredentialsExtractor(SigningCredential::AwsKms( aws_kms, ))); @@ -145,8 +145,11 @@ impl SigningCredentialsExtractor { parts.headers.get(header_name).and_then(|v| v.to_str().ok()) } - /// Try to extract AWS KMS credentials from headers - fn try_extract_aws_kms(parts: &Parts) -> Result, ApiEngineError> { + /// Try to extract AWS KMS credentials from headers with cache + fn try_extract_aws_kms_with_cache( + parts: &Parts, + kms_cache: KmsClientCache, + ) -> Result, ApiEngineError> { let arn = Self::get_header_value(parts, HEADER_AWS_KMS_ARN); let access_key_id = Self::get_header_value(parts, HEADER_AWS_ACCESS_KEY_ID); let secret_access_key = Self::get_header_value(parts, HEADER_AWS_SECRET_ACCESS_KEY); @@ -154,12 +157,13 @@ impl SigningCredentialsExtractor { match (arn, access_key_id, secret_access_key) { (Some(arn), Some(access_key_id), Some(secret_access_key)) => { let (key_id, region) = Self::parse_kms_arn(arn)?; - Ok(Some(AwsKmsCredential { - access_key_id: access_key_id.to_string(), - secret_access_key: secret_access_key.to_string(), + Ok(Some(AwsKmsCredential::new( + access_key_id.to_string(), + secret_access_key.to_string(), key_id, region, - })) + kms_cache, + ))) } _ => Ok(None), } diff --git a/server/src/http/server.rs b/server/src/http/server.rs index d77af32..4598ec4 100644 --- a/server/src/http/server.rs +++ b/server/src/http/server.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use axum::{Json, Router, routing::get}; -use engine_core::{signer::EoaSigner, userop::UserOpSigner}; +use engine_core::{signer::EoaSigner, userop::UserOpSigner, credentials::KmsClientCache}; use serde_json::json; use thirdweb_core::abi::ThirdwebAbiService; use tokio::{sync::watch, task::JoinHandle}; @@ -32,6 +32,7 @@ pub struct EngineServerState { pub diagnostic_access_password: Option, pub metrics_registry: Arc, + pub kms_client_cache: KmsClientCache, } pub struct EngineServer { diff --git a/server/src/main.rs b/server/src/main.rs index 5872797..4b7f9be 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, time::Duration}; -use engine_core::{signer::EoaSigner, userop::UserOpSigner}; +use engine_core::{signer::EoaSigner, userop::UserOpSigner, credentials::KmsClientCache}; use engine_executors::{eoa::authorization_cache::EoaAuthorizationCache, metrics::{ExecutorMetrics, initialize_metrics}}; use thirdweb_core::{abi::ThirdwebAbiServiceBuilder, auth::ThirdwebAuth, iaw::IAWClient}; use thirdweb_engine::{ @@ -19,7 +19,8 @@ async fn main() -> anyhow::Result<()> { let subscriber = tracing_subscriber::registry() .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| { // Default to info level if RUST_LOG environment variable is not set - "thirdweb_engine=debug,tower_http=debug,axum=debug,twmq=debug,engine_executors=debug,thirdweb_core=debug" + // Note: engine_executors::webhook=warn overrides the general engine_executors=debug + "thirdweb_engine=debug,tower_http=debug,axum=debug,twmq=info,engine_executors=debug,engine_executors::webhook=warn,thirdweb_core=debug" .into() })); @@ -47,6 +48,14 @@ async fn main() -> anyhow::Result<()> { let iaw_client = IAWClient::new(&config.thirdweb.urls.iaw_service)?; tracing::info!("IAW client initialized"); + let kms_client_cache: KmsClientCache = moka::future::Cache::builder() + .max_capacity(100) // Limit number of KMS clients cached + .time_to_live(Duration::from_secs(60 * 60)) // 1 hour TTL + .time_to_idle(Duration::from_secs(60 * 30)) // 30 minutes idle timeout + .build(); + + tracing::info!("KMS client cache initialized"); + let signer = Arc::new(UserOpSigner { vault_client: vault_client.clone(), iaw_client: iaw_client.clone(), @@ -69,6 +78,7 @@ async fn main() -> anyhow::Result<()> { signer.clone(), eoa_signer.clone(), authorization_cache.clone(), + kms_client_cache.clone(), ) .await?; @@ -119,6 +129,7 @@ async fn main() -> anyhow::Result<()> { queue_manager: Arc::new(queue_manager), diagnostic_access_password: config.server.diagnostic_access_password, metrics_registry, + kms_client_cache: kms_client_cache.clone(), }) .await; diff --git a/server/src/queue/manager.rs b/server/src/queue/manager.rs index cb61bb6..e0435de 100644 --- a/server/src/queue/manager.rs +++ b/server/src/queue/manager.rs @@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration}; use alloy::transports::http::reqwest; use engine_core::error::EngineError; +use engine_core::credentials::KmsClientCache; use engine_executors::{ eip7702_executor::{confirm::Eip7702ConfirmationHandler, send::Eip7702SendHandler}, eoa::{EoaExecutorJobHandler, authorization_cache::EoaAuthorizationCache}, @@ -50,6 +51,7 @@ impl QueueManager { userop_signer: Arc, eoa_signer: Arc, authorization_cache: EoaAuthorizationCache, + kms_client_cache: KmsClientCache, ) -> Result { // Create transaction registry let transaction_registry = Arc::new(TransactionRegistry::new( @@ -247,6 +249,7 @@ impl QueueManager { max_inflight: 100, max_recycled_nonces: 50, eoa_metrics, + kms_client_cache, }; let eoa_executor_queue = Queue::builder()