Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions executors/src/eoa/store/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,14 @@ impl AtomicEoaExecutorStore {
pipeline.hset(&tx_data_key, "failure_reason", error.to_string());
pipeline.hset(&tx_data_key, "status", "failed");

// Add TTL expiration
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
pipeline.expire(&tx_data_key, ttl_seconds);
pipeline.expire(
&self.transaction_attempts_list_name(&pending_transaction.transaction_id),
ttl_seconds,
);

let event = EoaExecutorEvent {
transaction_id: pending_transaction.transaction_id.clone(),
address: pending_transaction.user_request.from,
Expand Down Expand Up @@ -657,6 +665,14 @@ impl AtomicEoaExecutorStore {
pipeline.hset(&tx_data_key, "completed_at", now);
pipeline.hset(&tx_data_key, "failure_reason", error.to_string());
pipeline.hset(&tx_data_key, "status", "failed");

// Add TTL expiration
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
pipeline.expire(&tx_data_key, ttl_seconds);
pipeline.expire(
&self.transaction_attempts_list_name(&pending_transaction.transaction_id),
ttl_seconds,
);
}

// Queue webhooks for all failures
Expand Down Expand Up @@ -715,6 +731,7 @@ impl AtomicEoaExecutorStore {
keys: &self.keys,
webhook_queue,
eoa_metrics: &self.eoa_metrics,
completed_transaction_ttl_seconds: self.store.completed_transaction_ttl_seconds,
})
.await
}
Expand All @@ -732,6 +749,7 @@ impl AtomicEoaExecutorStore {
keys: &self.keys,
webhook_queue,
eoa_metrics: &self.eoa_metrics,
completed_transaction_ttl_seconds: self.store.completed_transaction_ttl_seconds,
})
.await
}
Expand Down
9 changes: 9 additions & 0 deletions executors/src/eoa/store/borrowed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct ProcessBorrowedTransactions<'a> {
pub keys: &'a EoaExecutorStoreKeys,
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
pub eoa_metrics: &'a EoaMetrics,
pub completed_transaction_ttl_seconds: u64,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -226,6 +227,14 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
pipeline.hset(&tx_data_key, "completed_at", now);
pipeline.hset(&tx_data_key, "failure_reason", err.to_string());

// Add TTL expiration
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
pipeline.expire(&tx_data_key, ttl_seconds);
pipeline.expire(
&self.keys.transaction_attempts_list_name(transaction_id),
ttl_seconds,
);

// ask for this nonce to be recycled because we did not consume the nonce
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);

Expand Down
3 changes: 3 additions & 0 deletions executors/src/eoa/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub struct TransactionData {
pub struct EoaExecutorStore {
pub redis: ConnectionManager,
pub keys: EoaExecutorStoreKeys,
pub completed_transaction_ttl_seconds: u64,
}

pub struct EoaExecutorStoreKeys {
Expand Down Expand Up @@ -298,6 +299,7 @@ impl EoaExecutorStore {
namespace: Option<String>,
eoa: Address,
chain_id: u64,
completed_transaction_ttl_seconds: u64,
) -> Self {
Self {
redis,
Expand All @@ -306,6 +308,7 @@ impl EoaExecutorStore {
chain_id,
namespace,
},
completed_transaction_ttl_seconds,
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions executors/src/eoa/store/submitted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ pub struct CleanSubmittedTransactions<'a> {
pub keys: &'a EoaExecutorStoreKeys,
pub webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
pub eoa_metrics: &'a EoaMetrics,
pub completed_transaction_ttl_seconds: u64,
}

impl<'a> CleanSubmittedTransactions<'a> {
Expand Down Expand Up @@ -360,6 +361,11 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
confirmed_tx.receipt_serialized.clone(),
);

// Add TTL expiration
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
pipeline.expire(&data_key_name, ttl_seconds);
pipeline.expire(&self.keys.transaction_attempts_list_name(id), ttl_seconds);

if let SubmittedTransactionHydrated::Real(tx) = tx {
// Record metrics: transaction queued to mined for confirmed transactions
let confirmed_timestamp = current_timestamp_ms();
Expand Down
4 changes: 4 additions & 0 deletions executors/src/eoa/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ where

// KMS client cache for AWS KMS credentials
pub kms_client_cache: KmsClientCache,

// TTL for completed transactions
pub completed_transaction_ttl_seconds: u64,
}

impl<CS> DurableExecution for EoaExecutorJobHandler<CS>
Expand Down Expand Up @@ -161,6 +164,7 @@ where
self.namespace.clone(),
data.eoa_address,
data.chain_id,
self.completed_transaction_ttl_seconds,
)
.acquire_eoa_lock_aggressively(&worker_id, self.eoa_metrics.clone())
.await
Expand Down
20 changes: 10 additions & 10 deletions scripts/redis-cleanup/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ if (!process.env.REDIS_URL) {
const CONFIG = {
redisUrl: process.env.REDIS_URL,
namespace: "engine-cloud" as string | undefined, // Set to your namespace if needed
batchSize: 2000,
dryRun: false, // Set to false when ready to actually delete
progressInterval: 2000, // Report progress every N transactions
batchSize: 10000,
dryRun: true, // Set to false when ready to actually delete
progressInterval: 10000, // Report progress every N transactions
} as const;

// === TYPES ===
Expand Down Expand Up @@ -219,12 +219,12 @@ class EoaRedisCleanup {
return false;
}

// CRITICAL SAFETY CHECK: Only clean transactions that are reasonably old (1 minute minimum)
// CRITICAL SAFETY CHECK: Only clean transactions that are reasonably old (1 month minimum)
// This prevents cleaning transactions that just completed
const oneMinuteAgo = Date.now() - 60 * 1000;
if (completedMs > oneMinuteAgo) {
const oneMonthAgo = Date.now() - 30 * 24 * 60 * 60 * 1000;
if (completedMs > oneMonthAgo) {
this.log(
`🛡️ SAFETY: Skipping tx ${tx.id} - completed too recently (less than 1 minute ago)`
`🛡️ SAFETY: Skipping tx ${tx.id} - completed too recently (less than 1 month ago)`
);
return false;
}
Expand Down Expand Up @@ -302,11 +302,11 @@ class EoaRedisCleanup {
if (tx.status === "failed") this.stats.failed++;

const keysToDelete = this.buildKeysToDelete(tx.id);
this.log(
`🔍 [DRY RUN] Would clean: ${tx.id} (${tx.status}) - ${keysToDelete.length} keys`
);
this.stats.cleaned++;
}
this.log(
`🔍 [DRY RUN] Would clean: ${transactions.length} transactions (${this.stats.confirmed} confirmed, ${this.stats.failed} failed)`
);
return;
}

Expand Down
2 changes: 1 addition & 1 deletion scripts/simple-redis-cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ if (!process.env.REDIS_URL) {
const CONFIG = {
redisUrl: process.env.REDIS_URL,
batchSize: 5000,
dryRun: false, // Set to false to actually delete
dryRun: true, // Set to false to actually delete
} as const;

class SimpleRedisCleanup {
Expand Down
1 change: 1 addition & 0 deletions server/configuration/server_base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ queue:
eoa_send_degradation_threshold_seconds: 30
eoa_confirmation_degradation_threshold_seconds: 60
eoa_stuck_threshold_seconds: 300
completed_transaction_ttl_seconds: 86400 # 1 day
7 changes: 7 additions & 0 deletions server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ pub struct QueueConfig {

#[serde(default)]
pub monitoring: MonitoringConfig,

#[serde(default = "default_completed_transaction_ttl_seconds")]
pub completed_transaction_ttl_seconds: u64,
}

#[derive(Debug, Clone, Deserialize)]
Expand All @@ -70,6 +73,10 @@ impl Default for MonitoringConfig {
}
}

fn default_completed_transaction_ttl_seconds() -> u64 {
86400 // 1 day in seconds
}

#[derive(Debug, Clone, Deserialize)]
pub struct RedisConfig {
pub url: String,
Expand Down
3 changes: 3 additions & 0 deletions server/src/execution_router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ impl ExecutionRouter {
self.namespace.clone(),
eoa_execution_options.from,
base_execution_options.chain_id,
self.eoa_executor_queue
.handler
.completed_transaction_ttl_seconds,
);

// Add transaction to the store
Expand Down
22 changes: 16 additions & 6 deletions server/src/http/routes/admin/eoa_diagnostics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ pub async fn get_eoa_state(
.handler
.namespace
.clone();
let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id);
let ttl = state
.queue_manager
.eoa_executor_queue
.handler
.completed_transaction_ttl_seconds;
let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl);

// Get all the state information using store methods
let cached_nonce = store.get_cached_transaction_count().await.ok();
Expand Down Expand Up @@ -207,7 +212,8 @@ pub async fn get_transaction_detail(

// Get namespace from the config
let namespace = eoa_queue.handler.namespace.clone();
let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id);
let ttl = eoa_queue.handler.completed_transaction_ttl_seconds;
let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl);

// Get transaction data using store method
let transaction_data = store
Expand Down Expand Up @@ -260,8 +266,9 @@ pub async fn get_pending_transactions(

// Get namespace from the config
let namespace = eoa_queue.handler.namespace.clone();
let ttl = eoa_queue.handler.completed_transaction_ttl_seconds;

let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id);
let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl);
let offset = pagination.offset.unwrap_or(0);
let limit = pagination.limit.unwrap_or(1000).min(1000); // Cap at 100

Expand Down Expand Up @@ -323,8 +330,9 @@ pub async fn get_submitted_transactions(

// Get namespace from the config
let namespace = eoa_queue.handler.namespace.clone();
let ttl = eoa_queue.handler.completed_transaction_ttl_seconds;

let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id);
let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl);

// Use store method to get submitted transactions
let submitted_txs = store.get_all_submitted_transactions().await.map_err(|e| {
Expand Down Expand Up @@ -368,8 +376,9 @@ pub async fn get_borrowed_transactions(

// Get namespace from the config
let namespace = eoa_queue.handler.namespace.clone();
let ttl = eoa_queue.handler.completed_transaction_ttl_seconds;

let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id);
let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl);

// Use store method to get borrowed transactions
let borrowed_txs = store.peek_borrowed_transactions().await.map_err(|e| {
Expand Down Expand Up @@ -410,8 +419,9 @@ pub async fn schedule_manual_reset(
let redis_conn = eoa_queue.handler.redis.clone();

let namespace = eoa_queue.handler.namespace.clone();
let ttl = eoa_queue.handler.completed_transaction_ttl_seconds;

let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id);
let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id, ttl);

store.schedule_manual_reset().await.map_err(|e| {
ApiEngineError(engine_core::error::EngineError::InternalError {
Expand Down
1 change: 1 addition & 0 deletions server/src/queue/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ impl QueueManager {
max_recycled_nonces: 50,
eoa_metrics,
kms_client_cache,
completed_transaction_ttl_seconds: queue_config.completed_transaction_ttl_seconds,
};

let eoa_executor_queue = Queue::builder()
Expand Down