diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index e780b23..0ac7797 100644 --- a/executors/src/eoa/store/atomic.rs +++ b/executors/src/eoa/store/atomic.rs @@ -589,6 +589,14 @@ impl AtomicEoaExecutorStore { pipeline.hset(&tx_data_key, "failure_reason", error.to_string()); pipeline.hset(&tx_data_key, "status", "failed"); + // Add TTL expiration + let ttl_seconds = self.completed_transaction_ttl_seconds as i64; + pipeline.expire(&tx_data_key, ttl_seconds); + pipeline.expire( + &self.transaction_attempts_list_name(&pending_transaction.transaction_id), + ttl_seconds, + ); + let event = EoaExecutorEvent { transaction_id: pending_transaction.transaction_id.clone(), address: pending_transaction.user_request.from, @@ -657,6 +665,14 @@ impl AtomicEoaExecutorStore { pipeline.hset(&tx_data_key, "completed_at", now); pipeline.hset(&tx_data_key, "failure_reason", error.to_string()); pipeline.hset(&tx_data_key, "status", "failed"); + + // Add TTL expiration + let ttl_seconds = self.completed_transaction_ttl_seconds as i64; + pipeline.expire(&tx_data_key, ttl_seconds); + pipeline.expire( + &self.transaction_attempts_list_name(&pending_transaction.transaction_id), + ttl_seconds, + ); } // Queue webhooks for all failures @@ -715,6 +731,7 @@ impl AtomicEoaExecutorStore { keys: &self.keys, webhook_queue, eoa_metrics: &self.eoa_metrics, + completed_transaction_ttl_seconds: self.store.completed_transaction_ttl_seconds, }) .await } @@ -732,6 +749,7 @@ impl AtomicEoaExecutorStore { keys: &self.keys, webhook_queue, eoa_metrics: &self.eoa_metrics, + completed_transaction_ttl_seconds: self.store.completed_transaction_ttl_seconds, }) .await } diff --git a/executors/src/eoa/store/borrowed.rs b/executors/src/eoa/store/borrowed.rs index 83d9e54..8797a78 100644 --- a/executors/src/eoa/store/borrowed.rs +++ b/executors/src/eoa/store/borrowed.rs @@ -41,6 +41,7 @@ pub struct ProcessBorrowedTransactions<'a> { pub keys: &'a EoaExecutorStoreKeys, pub webhook_queue: Arc>, pub eoa_metrics: &'a EoaMetrics, + pub completed_transaction_ttl_seconds: u64, } #[derive(Debug, Default)] @@ -226,6 +227,14 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { pipeline.hset(&tx_data_key, "completed_at", now); pipeline.hset(&tx_data_key, "failure_reason", err.to_string()); + // Add TTL expiration + let ttl_seconds = self.completed_transaction_ttl_seconds as i64; + pipeline.expire(&tx_data_key, ttl_seconds); + pipeline.expire( + &self.keys.transaction_attempts_list_name(transaction_id), + ttl_seconds, + ); + // ask for this nonce to be recycled because we did not consume the nonce pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce); diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs index 876253c..2cb59b4 100644 --- a/executors/src/eoa/store/mod.rs +++ b/executors/src/eoa/store/mod.rs @@ -100,6 +100,7 @@ pub struct TransactionData { pub struct EoaExecutorStore { pub redis: ConnectionManager, pub keys: EoaExecutorStoreKeys, + pub completed_transaction_ttl_seconds: u64, } pub struct EoaExecutorStoreKeys { @@ -298,6 +299,7 @@ impl EoaExecutorStore { namespace: Option, eoa: Address, chain_id: u64, + completed_transaction_ttl_seconds: u64, ) -> Self { Self { redis, @@ -306,6 +308,7 @@ impl EoaExecutorStore { chain_id, namespace, }, + completed_transaction_ttl_seconds, } } } diff --git a/executors/src/eoa/store/submitted.rs b/executors/src/eoa/store/submitted.rs index a2f91bf..fb16739 100644 --- a/executors/src/eoa/store/submitted.rs +++ b/executors/src/eoa/store/submitted.rs @@ -200,6 +200,7 @@ pub struct CleanSubmittedTransactions<'a> { pub keys: &'a EoaExecutorStoreKeys, pub webhook_queue: Arc>, pub eoa_metrics: &'a EoaMetrics, + pub completed_transaction_ttl_seconds: u64, } impl<'a> CleanSubmittedTransactions<'a> { @@ -360,6 +361,11 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> { confirmed_tx.receipt_serialized.clone(), ); + // Add TTL expiration + let ttl_seconds = self.completed_transaction_ttl_seconds as i64; + pipeline.expire(&data_key_name, ttl_seconds); + pipeline.expire(&self.keys.transaction_attempts_list_name(id), ttl_seconds); + if let SubmittedTransactionHydrated::Real(tx) = tx { // Record metrics: transaction queued to mined for confirmed transactions let confirmed_timestamp = current_timestamp_ms(); diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index 84ad60a..33e47bf 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -126,6 +126,9 @@ where // KMS client cache for AWS KMS credentials pub kms_client_cache: KmsClientCache, + + // TTL for completed transactions + pub completed_transaction_ttl_seconds: u64, } impl DurableExecution for EoaExecutorJobHandler @@ -161,6 +164,7 @@ where self.namespace.clone(), data.eoa_address, data.chain_id, + self.completed_transaction_ttl_seconds, ) .acquire_eoa_lock_aggressively(&worker_id, self.eoa_metrics.clone()) .await diff --git a/scripts/redis-cleanup/index.tsx b/scripts/redis-cleanup/index.tsx index 945e226..0e1b6d9 100644 --- a/scripts/redis-cleanup/index.tsx +++ b/scripts/redis-cleanup/index.tsx @@ -12,9 +12,9 @@ if (!process.env.REDIS_URL) { const CONFIG = { redisUrl: process.env.REDIS_URL, namespace: "engine-cloud" as string | undefined, // Set to your namespace if needed - batchSize: 2000, - dryRun: false, // Set to false when ready to actually delete - progressInterval: 2000, // Report progress every N transactions + batchSize: 10000, + dryRun: true, // Set to false when ready to actually delete + progressInterval: 10000, // Report progress every N transactions } as const; // === TYPES === @@ -219,12 +219,12 @@ class EoaRedisCleanup { return false; } - // CRITICAL SAFETY CHECK: Only clean transactions that are reasonably old (1 minute minimum) + // CRITICAL SAFETY CHECK: Only clean transactions that are reasonably old (1 month minimum) // This prevents cleaning transactions that just completed - const oneMinuteAgo = Date.now() - 60 * 1000; - if (completedMs > oneMinuteAgo) { + const oneMonthAgo = Date.now() - 30 * 24 * 60 * 60 * 1000; + if (completedMs > oneMonthAgo) { this.log( - `🛡️ SAFETY: Skipping tx ${tx.id} - completed too recently (less than 1 minute ago)` + `🛡️ SAFETY: Skipping tx ${tx.id} - completed too recently (less than 1 month ago)` ); return false; } @@ -302,11 +302,11 @@ class EoaRedisCleanup { if (tx.status === "failed") this.stats.failed++; const keysToDelete = this.buildKeysToDelete(tx.id); - this.log( - `🔍 [DRY RUN] Would clean: ${tx.id} (${tx.status}) - ${keysToDelete.length} keys` - ); this.stats.cleaned++; } + this.log( + `🔍 [DRY RUN] Would clean: ${transactions.length} transactions (${this.stats.confirmed} confirmed, ${this.stats.failed} failed)` + ); return; } diff --git a/scripts/simple-redis-cleanup.ts b/scripts/simple-redis-cleanup.ts index 602323f..9db2de4 100644 --- a/scripts/simple-redis-cleanup.ts +++ b/scripts/simple-redis-cleanup.ts @@ -10,7 +10,7 @@ if (!process.env.REDIS_URL) { const CONFIG = { redisUrl: process.env.REDIS_URL, batchSize: 5000, - dryRun: false, // Set to false to actually delete + dryRun: true, // Set to false to actually delete } as const; class SimpleRedisCleanup { diff --git a/server/configuration/server_base.yaml b/server/configuration/server_base.yaml index 7006d1a..f677c27 100644 --- a/server/configuration/server_base.yaml +++ b/server/configuration/server_base.yaml @@ -40,3 +40,4 @@ queue: eoa_send_degradation_threshold_seconds: 30 eoa_confirmation_degradation_threshold_seconds: 60 eoa_stuck_threshold_seconds: 300 + completed_transaction_ttl_seconds: 86400 # 1 day diff --git a/server/src/config.rs b/server/src/config.rs index 50584e8..30476f8 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -50,6 +50,9 @@ pub struct QueueConfig { #[serde(default)] pub monitoring: MonitoringConfig, + + #[serde(default = "default_completed_transaction_ttl_seconds")] + pub completed_transaction_ttl_seconds: u64, } #[derive(Debug, Clone, Deserialize)] @@ -70,6 +73,10 @@ impl Default for MonitoringConfig { } } +fn default_completed_transaction_ttl_seconds() -> u64 { + 86400 // 1 day in seconds +} + #[derive(Debug, Clone, Deserialize)] pub struct RedisConfig { pub url: String, diff --git a/server/src/execution_router/mod.rs b/server/src/execution_router/mod.rs index 5a246b7..ba41c46 100644 --- a/server/src/execution_router/mod.rs +++ b/server/src/execution_router/mod.rs @@ -453,6 +453,9 @@ impl ExecutionRouter { self.namespace.clone(), eoa_execution_options.from, base_execution_options.chain_id, + self.eoa_executor_queue + .handler + .completed_transaction_ttl_seconds, ); // Add transaction to the store diff --git a/server/src/http/routes/admin/eoa_diagnostics.rs b/server/src/http/routes/admin/eoa_diagnostics.rs index bdc1834..b0fe81e 100644 --- a/server/src/http/routes/admin/eoa_diagnostics.rs +++ b/server/src/http/routes/admin/eoa_diagnostics.rs @@ -123,7 +123,12 @@ pub async fn get_eoa_state( .handler .namespace .clone(); - let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + let ttl = state + .queue_manager + .eoa_executor_queue + .handler + .completed_transaction_ttl_seconds; + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl); // Get all the state information using store methods let cached_nonce = store.get_cached_transaction_count().await.ok(); @@ -207,7 +212,8 @@ pub async fn get_transaction_detail( // Get namespace from the config let namespace = eoa_queue.handler.namespace.clone(); - let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + let ttl = eoa_queue.handler.completed_transaction_ttl_seconds; + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl); // Get transaction data using store method let transaction_data = store @@ -260,8 +266,9 @@ pub async fn get_pending_transactions( // Get namespace from the config let namespace = eoa_queue.handler.namespace.clone(); + let ttl = eoa_queue.handler.completed_transaction_ttl_seconds; - let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl); let offset = pagination.offset.unwrap_or(0); let limit = pagination.limit.unwrap_or(1000).min(1000); // Cap at 100 @@ -323,8 +330,9 @@ pub async fn get_submitted_transactions( // Get namespace from the config let namespace = eoa_queue.handler.namespace.clone(); + let ttl = eoa_queue.handler.completed_transaction_ttl_seconds; - let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl); // Use store method to get submitted transactions let submitted_txs = store.get_all_submitted_transactions().await.map_err(|e| { @@ -368,8 +376,9 @@ pub async fn get_borrowed_transactions( // Get namespace from the config let namespace = eoa_queue.handler.namespace.clone(); + let ttl = eoa_queue.handler.completed_transaction_ttl_seconds; - let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl); // Use store method to get borrowed transactions let borrowed_txs = store.peek_borrowed_transactions().await.map_err(|e| { @@ -410,8 +419,9 @@ pub async fn schedule_manual_reset( let redis_conn = eoa_queue.handler.redis.clone(); let namespace = eoa_queue.handler.namespace.clone(); + let ttl = eoa_queue.handler.completed_transaction_ttl_seconds; - let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl); store.schedule_manual_reset().await.map_err(|e| { ApiEngineError(engine_core::error::EngineError::InternalError { diff --git a/server/src/queue/manager.rs b/server/src/queue/manager.rs index 23c40f7..3e61575 100644 --- a/server/src/queue/manager.rs +++ b/server/src/queue/manager.rs @@ -261,6 +261,7 @@ impl QueueManager { max_recycled_nonces: 50, eoa_metrics, kms_client_cache, + completed_transaction_ttl_seconds: queue_config.completed_transaction_ttl_seconds, }; let eoa_executor_queue = Queue::builder()