From 590e57ab17d130f03c8dfbf2e8953f9f639e6cc8 Mon Sep 17 00:00:00 2001 From: 0xFirekeeper <0xFirekeeper@gmail.com> Date: Fri, 20 Mar 2026 00:01:27 +0700 Subject: [PATCH 1/7] Revert "log clientid, txnid, queueid, chainid (#98)" This reverts commit b5b14a580280462d77fd525b19a1acf0722296e5. --- core/src/chain.rs | 9 ---- executors/src/eip7702_executor/confirm.rs | 53 +++------------------- executors/src/eip7702_executor/send.rs | 55 +++-------------------- executors/src/eoa/store/atomic.rs | 21 +-------- executors/src/eoa/store/borrowed.rs | 44 ++---------------- executors/src/eoa/store/submitted.rs | 18 -------- executors/src/eoa/worker/confirm.rs | 27 +---------- executors/src/eoa/worker/send.rs | 38 ++-------------- server/src/main.rs | 2 +- 9 files changed, 22 insertions(+), 245 deletions(-) diff --git a/core/src/chain.rs b/core/src/chain.rs index 23a2f8f..ee00ec2 100644 --- a/core/src/chain.rs +++ b/core/src/chain.rs @@ -25,15 +25,6 @@ impl RpcCredentials { Ok(header_map) } - - pub fn client_id_for_logs(&self) -> Option<&str> { - match self { - RpcCredentials::Thirdweb(ThirdwebAuth::ClientIdServiceKey(creds)) => { - Some(&creds.client_id) - } - RpcCredentials::Thirdweb(ThirdwebAuth::SecretKey(_)) => None, - } - } } pub trait Chain: Send + Sync { diff --git a/executors/src/eip7702_executor/confirm.rs b/executors/src/eip7702_executor/confirm.rs index 3d26389..65dc21a 100644 --- a/executors/src/eip7702_executor/confirm.rs +++ b/executors/src/eip7702_executor/confirm.rs @@ -29,8 +29,6 @@ use crate::{ }, }; -const EIP7702_CONFIRM_QUEUE_ID: &str = "eip7702_confirm"; - // --- Job Payload --- #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] @@ -170,7 +168,7 @@ where type ErrorData = Eip7702ConfirmationError; type JobData = Eip7702ConfirmationJobData; - #[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, client_id = ?job.job.data.rpc_credentials.client_id_for_logs(), queue_id = EIP7702_CONFIRM_QUEUE_ID, stage = Self::stage_name(), executor = Self::executor_name()))] + #[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))] async fn process( &self, job: &BorrowedJob, @@ -204,14 +202,7 @@ where .await .map_err(|e| { tracing::error!( - transaction_id = %job_data.transaction_id, - chain_id = job_data.chain_id, - client_id = job_data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, - bundler_transaction_id = %job_data.bundler_transaction_id, + bundler_transaction_id = job_data.bundler_transaction_id, sender_details = ?job_data.sender_details, error = ?e, "Failed to get transaction hash from bundler" @@ -330,15 +321,7 @@ where // Send webhook if let Err(e) = self.queue_success_webhook(job, success_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue success webhook" ); @@ -363,15 +346,7 @@ where if should_queue_webhook { if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue nack webhook" ); @@ -395,30 +370,14 @@ where .add_remove_command(tx.pipeline(), &job.job.data.transaction_id); tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?fail_data.error, "EIP-7702 confirmation job failed" ); if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue fail webhook" ); diff --git a/executors/src/eip7702_executor/send.rs b/executors/src/eip7702_executor/send.rs index 78b54b7..f531eab 100644 --- a/executors/src/eip7702_executor/send.rs +++ b/executors/src/eip7702_executor/send.rs @@ -36,9 +36,6 @@ use crate::{ use super::confirm::{Eip7702ConfirmationHandler, Eip7702ConfirmationJobData}; -const EIP7702_SEND_QUEUE_ID: &str = "eip7702_send"; -const EIP7702_CONFIRM_QUEUE_ID: &str = "eip7702_confirm"; - // --- Job Payload --- #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] @@ -178,7 +175,7 @@ where type ErrorData = Eip7702SendError; type JobData = Eip7702SendJobData; - #[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, client_id = ?job.job.data.rpc_credentials.client_id_for_logs(), queue_id = EIP7702_SEND_QUEUE_ID, stage = Self::stage_name(), executor = Self::executor_name()))] + #[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))] async fn process( &self, job: &BorrowedJob, @@ -389,15 +386,7 @@ where if let Err(e) = tx.queue_job(confirmation_job) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to enqueue confirmation job" ); @@ -406,15 +395,7 @@ where // Send webhook if let Err(e) = self.queue_success_webhook(job, success_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_SEND_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue success webhook" ); @@ -430,15 +411,7 @@ where // Don't modify transaction registry on NACK - job will be retried if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_SEND_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue nack webhook" ); @@ -456,30 +429,14 @@ where .add_remove_command(tx.pipeline(), &job.job.data.transaction_id); tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_SEND_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?fail_data.error, "EIP-7702 send job failed" ); if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_SEND_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue fail webhook" ); diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index d85c044..92333ef 100644 --- a/executors/src/eoa/store/atomic.rs +++ b/executors/src/eoa/store/atomic.rs @@ -31,7 +31,6 @@ use crate::{ const MAX_RETRIES: u32 = 10; const RETRY_BASE_DELAY_MS: u64 = 10; -const EOA_QUEUE_ID: &str = "eoa_executor"; pub trait SafeRedisTransaction: Send + Sync { type ValidationData; @@ -614,18 +613,7 @@ impl AtomicEoaExecutorStore { &mut tx_context, webhook_queue.clone(), ) { - tracing::error!( - transaction_id = %pending_transaction.transaction_id, - chain_id = pending_transaction.user_request.chain_id, - client_id = pending_transaction - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to queue webhook for fail: {}", - e - ); + tracing::error!("Failed to queue webhook for fail: {}", e); } } @@ -707,13 +695,6 @@ impl AtomicEoaExecutorStore { ) { tracing::error!( transaction_id = %pending_transaction.transaction_id, - chain_id = pending_transaction.user_request.chain_id, - client_id = pending_transaction - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, error = ?e, "Failed to queue webhook for batch fail" ); diff --git a/executors/src/eoa/store/borrowed.rs b/executors/src/eoa/store/borrowed.rs index 87b5823..3c1260a 100644 --- a/executors/src/eoa/store/borrowed.rs +++ b/executors/src/eoa/store/borrowed.rs @@ -16,8 +16,6 @@ use crate::eoa::{ use crate::metrics::{EoaMetrics, calculate_duration_seconds, current_timestamp_ms}; use crate::webhook::{WebhookJobHandler, queue_webhook_envelopes}; -const EOA_QUEUE_ID: &str = "eoa_executor"; - #[derive(Debug, Clone)] pub enum SubmissionResultType { Success, @@ -175,19 +173,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { &mut tx_context, self.webhook_queue.clone(), ) { - tracing::error!( - transaction_id = transaction_id, - chain_id = result.transaction.user_request.chain_id, - client_id = result - .transaction - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to queue webhook for success: {}", - e - ); + tracing::error!("Failed to queue webhook for success: {}", e); } else { report.webhook_events_queued += 1; } @@ -227,19 +213,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { &mut tx_context, self.webhook_queue.clone(), ) { - tracing::error!( - transaction_id = transaction_id, - chain_id = result.transaction.user_request.chain_id, - client_id = result - .transaction - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to queue webhook for nack: {}", - e - ); + tracing::error!("Failed to queue webhook for nack: {}", e); } else { report.webhook_events_queued += 1; } @@ -281,19 +255,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { &mut tx_context, self.webhook_queue.clone(), ) { - tracing::error!( - transaction_id = transaction_id, - chain_id = result.transaction.user_request.chain_id, - client_id = result - .transaction - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to queue webhook for fail: {}", - e - ); + tracing::error!("Failed to queue webhook for fail: {}", e); } else { report.webhook_events_queued += 1; } diff --git a/executors/src/eoa/store/submitted.rs b/executors/src/eoa/store/submitted.rs index f0776c4..5da4c2b 100644 --- a/executors/src/eoa/store/submitted.rs +++ b/executors/src/eoa/store/submitted.rs @@ -22,8 +22,6 @@ use crate::{ webhook::{WebhookJobHandler, queue_webhook_envelopes}, }; -const EOA_QUEUE_ID: &str = "eoa_executor"; - #[derive(Debug, Clone)] pub struct SubmittedTransaction { pub data: SubmittedTransactionDehydrated, @@ -399,14 +397,6 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> { self.webhook_queue.clone(), ) { tracing::error!( - transaction_id = %tx.transaction_id, - chain_id = tx.user_request.chain_id, - client_id = tx - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, "Failed to queue webhook for confirmed transaction: {}", e ); @@ -446,14 +436,6 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> { self.webhook_queue.clone(), ) { tracing::error!( - transaction_id = %tx.transaction_id, - chain_id = tx.user_request.chain_id, - client_id = tx - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, "Failed to queue webhook for replaced transaction: {}", e ); diff --git a/executors/src/eoa/worker/confirm.rs b/executors/src/eoa/worker/confirm.rs index 46d9f10..beb3feb 100644 --- a/executors/src/eoa/worker/confirm.rs +++ b/executors/src/eoa/worker/confirm.rs @@ -18,7 +18,6 @@ use crate::{ }; const NONCE_STALL_LIMIT_MS: u64 = 60_000; // 1 minute in milliseconds - after this time, attempt gas bump -const EOA_QUEUE_ID: &str = "eoa_executor"; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -407,35 +406,13 @@ impl EoaExecutorWorker { if should_update_balance_threshold(inner_error) && let Err(e) = self.update_balance_threshold().await { - tracing::error!( - transaction_id = ?newest_transaction_data.transaction_id, - chain_id = newest_transaction_data.user_request.chain_id, - client_id = newest_transaction_data - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to update balance threshold: {}", - e - ); + tracing::error!("Failed to update balance threshold: {}", e); } } else if let EoaExecutorWorkerError::RpcError { inner_error, .. } = &e && should_update_balance_threshold(inner_error) && let Err(e) = self.update_balance_threshold().await { - tracing::error!( - transaction_id = ?newest_transaction_data.transaction_id, - chain_id = newest_transaction_data.user_request.chain_id, - client_id = newest_transaction_data - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to update balance threshold: {}", - e - ); + tracing::error!("Failed to update balance threshold: {}", e); } // Check if nonce has moved ahead since we started the gas bump // This handles the race condition where the original transaction diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs index 8e48967..9742ee6 100644 --- a/executors/src/eoa/worker/send.rs +++ b/executors/src/eoa/worker/send.rs @@ -17,7 +17,6 @@ use crate::{ }; const HEALTH_CHECK_INTERVAL_MS: u64 = 60 * 5 * 1000; // 5 minutes in milliseconds -const EOA_QUEUE_ID: &str = "eoa_executor"; impl EoaExecutorWorker { // ========== SEND FLOW ========== @@ -349,14 +348,7 @@ impl EoaExecutorWorker { if !is_retryable_preparation_error(&e) { tracing::error!( error = ?e, - transaction_id = %pending.transaction_id, - chain_id = pending.user_request.chain_id, - client_id = pending - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, + transaction_id = pending.transaction_id, "Transaction permanently failed due to non-retryable preparation error", ); non_retryable_failures.push((pending, e.clone())); @@ -556,35 +548,11 @@ impl EoaExecutorWorker { match &result.result { SubmissionResultType::Success => result, SubmissionResultType::Nack(e) => { - tracing::error!( - error = ?e, - transaction_id = %borrowed_tx.transaction_id, - chain_id = borrowed_tx.user_request.chain_id, - client_id = borrowed_tx - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - nonce = borrowed_tx.data.signed_transaction.nonce(), - "Transaction nack error during send" - ); + tracing::error!(error = ?e, transaction_id = borrowed_tx.transaction_id, nonce = borrowed_tx.data.signed_transaction.nonce(), "Transaction nack error during send"); result } SubmissionResultType::Fail(e) => { - tracing::error!( - error = ?e, - transaction_id = %borrowed_tx.transaction_id, - chain_id = borrowed_tx.user_request.chain_id, - client_id = borrowed_tx - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - nonce = borrowed_tx.data.signed_transaction.nonce(), - "Transaction failed during send" - ); + tracing::error!(error = ?e, transaction_id = borrowed_tx.transaction_id, nonce = borrowed_tx.data.signed_transaction.nonce(), "Transaction failed during send"); result } } diff --git a/server/src/main.rs b/server/src/main.rs index c5a7858..758b26a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -39,7 +39,7 @@ async fn main() -> anyhow::Result<()> { match config.server.log_format { config::LogFormat::Json => subscriber - .with(tracing_subscriber::fmt::layer().json().flatten_event(false)) + .with(tracing_subscriber::fmt::layer().json()) .init(), config::LogFormat::Pretty => subscriber.with(tracing_subscriber::fmt::layer()).init(), } From e27651fd9197eb6a7c752479d5d12310ceaac658 Mon Sep 17 00:00:00 2001 From: 0xFirekeeper <0xFirekeeper@gmail.com> Date: Fri, 20 Mar 2026 00:01:35 +0700 Subject: [PATCH 2/7] Revert "valkey integration (#97)" This reverts commit 6bf1f2f0a408896ad687236f5c77d4c57490c41b. --- Cargo.toml | 14 +- executors/src/eoa/store/atomic.rs | 7 +- executors/src/eoa/store/borrowed.rs | 5 +- executors/src/eoa/store/mod.rs | 117 +++++---------- executors/src/eoa/store/pending.rs | 7 +- executors/src/eoa/store/submitted.rs | 7 +- executors/src/eoa/worker/mod.rs | 4 +- executors/src/external_bundler/deployment.rs | 39 ++--- executors/src/solana_executor/storage.rs | 19 +-- executors/src/transaction_registry.rs | 11 +- integration-tests/tests/setup.rs | 13 +- server/Cargo.toml | 1 - server/src/execution_router/mod.rs | 5 +- server/src/main.rs | 16 +- server/src/queue/manager.rs | 8 +- twmq/src/lib.rs | 148 ++++++++---------- twmq/src/multilane.rs | 149 ++++++++----------- twmq/src/queue.rs | 33 ++-- twmq/tests/basic.rs | 6 +- twmq/tests/basic_hook.rs | 6 +- twmq/tests/delay.rs | 14 +- twmq/tests/idempotency_modes.rs | 6 +- twmq/tests/lease_expiry.rs | 6 +- twmq/tests/multilane_batch_pop.rs | 5 +- twmq/tests/nack.rs | 6 +- twmq/tests/prune_race_condition.rs | 6 +- twmq/tests/prune_race_random_ids.rs | 8 +- 27 files changed, 259 insertions(+), 407 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2e0aec2..b8fc7cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,17 +98,7 @@ config = "0.15.11" aws-arn = "0.3.1" # Redis -redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager", "cluster", "cluster-async", "tls-rustls", "tokio-rustls-comp"] } +redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] } # Dev dependencies -criterion = { version = "0.6", features = ["html_reports", "async_tokio"] } - -# Rustls -# -# NOTE: rustls 0.23 requires selecting exactly one process-wide crypto provider -# (features: `ring` or `aws_lc_rs` / `aws-lc-rs`). Some dependency graphs (e.g. via -# redis-rs' rustls integration) can end up with *no* provider enabled, which causes a -# runtime panic when building TLS client/server configs. -# -# We explicitly enable the `ring` provider here to make TLS work reliably. -rustls = { version = "0.23.32", default-features = false, features = ["std", "ring"] } \ No newline at end of file +criterion = { version = "0.6", features = ["html_reports", "async_tokio"] } \ No newline at end of file diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index 92333ef..0ac7797 100644 --- a/executors/src/eoa/store/atomic.rs +++ b/executors/src/eoa/store/atomic.rs @@ -4,8 +4,7 @@ use alloy::{ consensus::{Signed, TypedTransaction}, primitives::Address, }; -use twmq::redis::{AsyncCommands, Pipeline}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; use crate::{ eoa::{ @@ -44,7 +43,7 @@ pub trait SafeRedisTransaction: Send + Sync { ) -> Self::OperationResult; fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, store: &EoaExecutorStore, ) -> impl Future> + Send; fn watch_keys(&self) -> Vec; @@ -816,7 +815,7 @@ impl SafeRedisTransaction for ResetNoncesTransaction<'_> { async fn validation( &self, - _conn: &mut ClusterConnection, + _conn: &mut ConnectionManager, store: &EoaExecutorStore, ) -> Result { let now = chrono::Utc::now().timestamp_millis().max(0) as u64; diff --git a/executors/src/eoa/store/borrowed.rs b/executors/src/eoa/store/borrowed.rs index 3c1260a..8797a78 100644 --- a/executors/src/eoa/store/borrowed.rs +++ b/executors/src/eoa/store/borrowed.rs @@ -1,8 +1,7 @@ use std::sync::Arc; use twmq::Queue; -use twmq::redis::{AsyncCommands, Pipeline}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; use crate::eoa::EoaExecutorStore; use crate::eoa::{ @@ -72,7 +71,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { async fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, _store: &EoaExecutorStore, ) -> Result { // Get all borrowed transaction IDs diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs index 5ec9749..2cb59b4 100644 --- a/executors/src/eoa/store/mod.rs +++ b/executors/src/eoa/store/mod.rs @@ -9,8 +9,7 @@ use engine_core::transaction::TransactionTypeData; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::ops::Deref; -use twmq::redis::AsyncCommands; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, aio::ConnectionManager}; mod atomic; mod borrowed; @@ -99,7 +98,7 @@ pub struct TransactionData { /// Transaction store focused on transaction_id operations and nonce indexing pub struct EoaExecutorStore { - pub redis: ClusterConnection, + pub redis: ConnectionManager, pub keys: EoaExecutorStoreKeys, pub completed_transaction_ttl_seconds: u64, } @@ -122,14 +121,8 @@ impl EoaExecutorStoreKeys { /// Lock key name for EOA processing pub fn eoa_lock_key_name(&self) -> String { match &self.namespace { - Some(ns) => format!( - "{ns}:{}:eoa_executor:lock:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:lock:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), + Some(ns) => format!("{ns}:eoa_executor:lock:{}:{}", self.chain_id, self.eoa), + None => format!("eoa_executor:lock:{}:{}", self.chain_id, self.eoa), } } @@ -144,14 +137,8 @@ impl EoaExecutorStoreKeys { /// - "failure_reason": String failure reason (optional) pub fn transaction_data_key_name(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => format!( - "{ns}:{}:eoa_executor:tx_data:{transaction_id}", - twmq::ENGINE_HASH_TAG - ), - None => format!( - "{}:eoa_executor:tx_data:{transaction_id}", - twmq::ENGINE_HASH_TAG - ), + Some(ns) => format!("{ns}:eoa_executor:tx_data:{transaction_id}"), + None => format!("eoa_executor:tx_data:{transaction_id}"), } } @@ -161,14 +148,8 @@ impl EoaExecutorStoreKeys { /// of a TransactionAttempt. This allows efficient append operations. pub fn transaction_attempts_list_name(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => format!( - "{ns}:{}:eoa_executor:tx_attempts:{transaction_id}", - twmq::ENGINE_HASH_TAG - ), - None => format!( - "{}:eoa_executor:tx_attempts:{transaction_id}", - twmq::ENGINE_HASH_TAG - ), + Some(ns) => format!("{ns}:eoa_executor:tx_attempts:{transaction_id}"), + None => format!("eoa_executor:tx_attempts:{transaction_id}"), } } @@ -178,13 +159,10 @@ impl EoaExecutorStoreKeys { pub fn pending_transactions_zset_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:pending_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:pending_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:pending_txs:{}:{}", + self.chain_id, self.eoa ), + None => format!("eoa_executor:pending_txs:{}:{}", self.chain_id, self.eoa), } } @@ -194,27 +172,18 @@ impl EoaExecutorStoreKeys { pub fn submitted_transactions_zset_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:submitted_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:submitted_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:submitted_txs:{}:{}", + self.chain_id, self.eoa ), + None => format!("eoa_executor:submitted_txs:{}:{}", self.chain_id, self.eoa), } } /// Name of the key that maps transaction hash to transaction id pub fn transaction_hash_to_id_key_name(&self, hash: &str) -> String { match &self.namespace { - Some(ns) => format!( - "{ns}:{}:eoa_executor:tx_hash_to_id:{hash}", - twmq::ENGINE_HASH_TAG - ), - None => format!( - "{}:eoa_executor:tx_hash_to_id:{hash}", - twmq::ENGINE_HASH_TAG - ), + Some(ns) => format!("{ns}:eoa_executor:tx_hash_to_id:{hash}"), + None => format!("eoa_executor:tx_hash_to_id:{hash}"), } } @@ -228,13 +197,10 @@ impl EoaExecutorStoreKeys { pub fn borrowed_transactions_hashmap_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:borrowed_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:borrowed_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:borrowed_txs:{}:{}", + self.chain_id, self.eoa ), + None => format!("eoa_executor:borrowed_txs:{}:{}", self.chain_id, self.eoa), } } @@ -248,12 +214,12 @@ impl EoaExecutorStoreKeys { pub fn recycled_nonces_zset_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:recycled_nonces:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:recycled_nonces:{}:{}", + self.chain_id, self.eoa ), None => format!( - "{}:eoa_executor:recycled_nonces:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "eoa_executor:recycled_nonces:{}:{}", + self.chain_id, self.eoa ), } } @@ -270,12 +236,12 @@ impl EoaExecutorStoreKeys { pub fn optimistic_transaction_count_key_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:optimistic_nonce:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:optimistic_nonce:{}:{}", + self.chain_id, self.eoa ), None => format!( - "{}:eoa_executor:optimistic_nonce:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "eoa_executor:optimistic_nonce:{}:{}", + self.chain_id, self.eoa ), } } @@ -290,13 +256,10 @@ impl EoaExecutorStoreKeys { pub fn last_transaction_count_key_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:last_tx_nonce:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:last_tx_nonce:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:last_tx_nonce:{}:{}", + self.chain_id, self.eoa ), + None => format!("eoa_executor:last_tx_nonce:{}:{}", self.chain_id, self.eoa), } } @@ -308,14 +271,8 @@ impl EoaExecutorStoreKeys { /// - timestamp of the last 5 nonce resets pub fn eoa_health_key_name(&self) -> String { match &self.namespace { - Some(ns) => format!( - "{ns}:{}:eoa_executor:health:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:health:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), + Some(ns) => format!("{ns}:eoa_executor:health:{}:{}", self.chain_id, self.eoa), + None => format!("eoa_executor:health:{}:{}", self.chain_id, self.eoa), } } @@ -325,12 +282,12 @@ impl EoaExecutorStoreKeys { pub fn manual_reset_key_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:pending_manual_reset:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:pending_manual_reset:{}:{}", + self.chain_id, self.eoa ), None => format!( - "{}:eoa_executor:pending_manual_reset:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "eoa_executor:pending_manual_reset:{}:{}", + self.chain_id, self.eoa ), } } @@ -338,7 +295,7 @@ impl EoaExecutorStoreKeys { impl EoaExecutorStore { pub fn new( - redis: ClusterConnection, + redis: ConnectionManager, namespace: Option, eoa: Address, chain_id: u64, diff --git a/executors/src/eoa/store/pending.rs b/executors/src/eoa/store/pending.rs index d3be479..637e010 100644 --- a/executors/src/eoa/store/pending.rs +++ b/executors/src/eoa/store/pending.rs @@ -1,8 +1,7 @@ use std::collections::HashSet; use alloy::{consensus::Transaction, primitives::Address}; -use twmq::redis::{AsyncCommands, Pipeline}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; use crate::eoa::{ EoaExecutorStore, @@ -47,7 +46,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> { async fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, _store: &EoaExecutorStore, ) -> Result { if self.transactions.is_empty() { @@ -182,7 +181,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithRecycledNonces<'_> { async fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, _store: &EoaExecutorStore, ) -> Result { if self.transactions.is_empty() { diff --git a/executors/src/eoa/store/submitted.rs b/executors/src/eoa/store/submitted.rs index 5da4c2b..fb16739 100644 --- a/executors/src/eoa/store/submitted.rs +++ b/executors/src/eoa/store/submitted.rs @@ -5,8 +5,7 @@ use std::{ }; use serde::{Deserialize, Serialize}; -use twmq::redis::{AsyncCommands, Pipeline}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; use crate::{ TransactionCounts, @@ -280,7 +279,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> { async fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, store: &EoaExecutorStore, ) -> Result { // Fetch transactions up to the latest confirmed nonce for replacements @@ -593,7 +592,7 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> { async fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, _store: &EoaExecutorStore, ) -> Result { // get the highest submitted nonce diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index c235453..33e47bf 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -11,7 +11,7 @@ use engine_eip7702_core::delegated_account::DelegatedAccount; use serde::{Deserialize, Serialize}; use std::{sync::Arc, time::Duration}; use twmq::Queue; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::aio::ConnectionManager; use twmq::{ DurableExecution, FailHookData, NackHookData, SuccessHookData, hooks::TransactionContext, @@ -114,7 +114,7 @@ where pub webhook_queue: Arc>, pub authorization_cache: EoaAuthorizationCache, - pub redis: ClusterConnection, + pub redis: ConnectionManager, pub namespace: Option, pub eoa_signer: Arc, diff --git a/executors/src/external_bundler/deployment.rs b/executors/src/external_bundler/deployment.rs index ac19502..ac8a969 100644 --- a/executors/src/external_bundler/deployment.rs +++ b/executors/src/external_bundler/deployment.rs @@ -7,10 +7,8 @@ use serde::{Deserialize, Serialize}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use twmq::{ error::TwmqError, - redis::{AsyncCommands, Pipeline}, + redis::{AsyncCommands, Pipeline, aio::ConnectionManager}, }; -use twmq::redis::cluster_async::ClusterConnection; -use twmq::redis::cluster::ClusterClient; use uuid::Uuid; const CACHE_PREFIX: &str = "deployment_cache"; @@ -18,12 +16,12 @@ const LOCK_PREFIX: &str = "deployment_lock"; #[derive(Clone)] pub struct RedisDeploymentCache { - connection: ClusterConnection, + connection_manager: twmq::redis::aio::ConnectionManager, } #[derive(Clone)] pub struct RedisDeploymentLock { - connection: ClusterConnection, + connection_manager: twmq::redis::aio::ConnectionManager, } #[derive(Serialize, Deserialize)] @@ -33,21 +31,18 @@ struct LockData { } impl RedisDeploymentCache { - pub async fn new(client: ClusterClient) -> Result { + pub async fn new(client: twmq::redis::Client) -> Result { Ok(Self { - connection: client.get_async_connection().await?, + connection_manager: ConnectionManager::new(client).await?, }) } - pub fn conn(&self) -> &ClusterConnection { - &self.connection + pub fn conn(&self) -> &ConnectionManager { + &self.connection_manager } fn cache_key(&self, chain_id: u64, account_address: &Address) -> String { - format!( - "{}:{CACHE_PREFIX}:{chain_id}:{account_address}", - twmq::ENGINE_HASH_TAG - ) + format!("{CACHE_PREFIX}:{chain_id}:{account_address}") } } @@ -65,28 +60,22 @@ impl DeploymentCache for RedisDeploymentCache { } impl RedisDeploymentLock { - pub async fn new(client: ClusterClient) -> Result { + pub async fn new(client: twmq::redis::Client) -> Result { Ok(Self { - connection: client.get_async_connection().await?, + connection_manager: ConnectionManager::new(client).await?, }) } - pub fn conn(&self) -> &ClusterConnection { - &self.connection + pub fn conn(&self) -> &ConnectionManager { + &self.connection_manager } fn lock_key(&self, chain_id: u64, account_address: &Address) -> String { - format!( - "{}:{LOCK_PREFIX}:{chain_id}:{account_address}", - twmq::ENGINE_HASH_TAG - ) + format!("{LOCK_PREFIX}:{chain_id}:{account_address}") } fn cache_key(&self, chain_id: u64, account_address: &Address) -> String { - format!( - "{}:{CACHE_PREFIX}:{chain_id}:{account_address}", - twmq::ENGINE_HASH_TAG - ) + format!("{CACHE_PREFIX}:{chain_id}:{account_address}") } /// Release a deployment lock using the provided pipeline diff --git a/executors/src/solana_executor/storage.rs b/executors/src/solana_executor/storage.rs index 510e22d..8a88302 100644 --- a/executors/src/solana_executor/storage.rs +++ b/executors/src/solana_executor/storage.rs @@ -3,9 +3,8 @@ use serde_with::{DisplayFromStr, serde_as}; use solana_sdk::{hash::Hash, signature::Signature}; use twmq::{ redis, - redis::AsyncCommands, + redis::{AsyncCommands, aio::ConnectionManager}, }; -use twmq::redis::cluster_async::ClusterConnection; /// Represents a single attempt to send a Solana transaction /// This is stored in Redis BEFORE sending to prevent duplicate transactions @@ -46,7 +45,7 @@ impl SolanaTransactionAttempt { /// Represents a lock held on a transaction /// When dropped, the lock is automatically released pub struct TransactionLock { - redis: ClusterConnection, + redis: ConnectionManager, lock_key: String, lock_value: String, } @@ -122,30 +121,28 @@ impl From for LockError { /// Storage for Solana transaction attempts /// Provides atomic operations to prevent duplicate transactions pub struct SolanaTransactionStorage { - redis: ClusterConnection, + redis: ConnectionManager, namespace: Option, } impl SolanaTransactionStorage { - pub fn new(redis: ClusterConnection, namespace: Option) -> Self { + pub fn new(redis: ConnectionManager, namespace: Option) -> Self { Self { redis, namespace } } /// Get the Redis key for a transaction's attempt fn attempt_key(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => { - format!("{ns}:{}:solana_tx_attempt:{transaction_id}", twmq::ENGINE_HASH_TAG) - } - None => format!("{}:solana_tx_attempt:{transaction_id}", twmq::ENGINE_HASH_TAG), + Some(ns) => format!("{ns}:solana_tx_attempt:{transaction_id}"), + None => format!("solana_tx_attempt:{transaction_id}"), } } /// Get the Redis key for a transaction's lock fn lock_key(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => format!("{ns}:{}:solana_tx_lock:{transaction_id}", twmq::ENGINE_HASH_TAG), - None => format!("{}:solana_tx_lock:{transaction_id}", twmq::ENGINE_HASH_TAG), + Some(ns) => format!("{ns}:solana_tx_lock:{transaction_id}"), + None => format!("solana_tx_lock:{transaction_id}"), } } diff --git a/executors/src/transaction_registry.rs b/executors/src/transaction_registry.rs index 9754495..a63584c 100644 --- a/executors/src/transaction_registry.rs +++ b/executors/src/transaction_registry.rs @@ -1,7 +1,6 @@ use engine_core::error::EngineError; use thiserror::Error; -use twmq::redis::{AsyncCommands, Pipeline}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; #[derive(Debug, Error)] pub enum TransactionRegistryError { @@ -21,19 +20,19 @@ impl From for EngineError { } pub struct TransactionRegistry { - redis: ClusterConnection, + redis: ConnectionManager, namespace: Option, } impl TransactionRegistry { - pub fn new(redis: ClusterConnection, namespace: Option) -> Self { + pub fn new(redis: ConnectionManager, namespace: Option) -> Self { Self { redis, namespace } } fn registry_key(&self) -> String { match &self.namespace { - Some(ns) => format!("{ns}:{}:tx_registry", twmq::ENGINE_HASH_TAG), - None => format!("{}:tx_registry", twmq::ENGINE_HASH_TAG), + Some(ns) => format!("{ns}:tx_registry"), + None => "tx_registry".to_string(), } } diff --git a/integration-tests/tests/setup.rs b/integration-tests/tests/setup.rs index e8bdffa..d105cff 100644 --- a/integration-tests/tests/setup.rs +++ b/integration-tests/tests/setup.rs @@ -168,15 +168,8 @@ impl TestEnvironment { let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client)); // Setup Redis - let initial_nodes: Vec<&str> = config - .redis - .url - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect(); - let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes) - .context("Failed to connect to Valkey Cluster")?; + let redis_client = twmq::redis::Client::open(config.redis.url.as_str()) + .context("Failed to connect to Redis")?; let authorization_cache = EoaAuthorizationCache::new( moka::future::Cache::builder() @@ -260,7 +253,7 @@ impl TestEnvironment { let execution_router = thirdweb_engine::ExecutionRouter { namespace: queue_config.execution_namespace.clone(), - redis: redis_client.get_async_connection().await?, + redis: redis_client.get_connection_manager().await?, authorization_cache, webhook_queue: queue_manager.webhook_queue.clone(), external_bundler_send_queue: queue_manager.external_bundler_send_queue.clone(), diff --git a/server/Cargo.toml b/server/Cargo.toml index 0b411b9..d568ffb 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -27,7 +27,6 @@ futures = { workspace = true } serde-bool = { workspace = true } base64 = { workspace = true } bincode = { workspace = true } -rustls = { workspace = true } solana-sdk = { workspace = true } solana-client = { workspace = true } aide = { workspace = true, features = [ diff --git a/server/src/execution_router/mod.rs b/server/src/execution_router/mod.rs index 66d0cc0..ba41c46 100644 --- a/server/src/execution_router/mod.rs +++ b/server/src/execution_router/mod.rs @@ -31,8 +31,7 @@ use engine_executors::{ transaction_registry::TransactionRegistry, webhook::WebhookJobHandler, }; -use twmq::{Queue, error::TwmqError}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::{Queue, error::TwmqError, redis::aio::ConnectionManager}; use vault_sdk::VaultClient; use vault_types::{ RegexRule, Rule, @@ -43,7 +42,7 @@ use vault_types::{ use crate::chains::ThirdwebChainService; pub struct ExecutionRouter { - pub redis: ClusterConnection, + pub redis: ConnectionManager, pub namespace: Option, pub webhook_queue: Arc>, pub external_bundler_send_queue: Arc>>, diff --git a/server/src/main.rs b/server/src/main.rs index 758b26a..4376aec 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -22,11 +22,6 @@ use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt, util::Subscrib #[tokio::main] async fn main() -> anyhow::Result<()> { - // rustls 0.23 requires selecting a process-wide crypto provider (ring or aws-lc-rs). - // Some dependency graphs do not enable either by default, which causes a runtime panic - // when a TLS client config is constructed (e.g. when connecting to `rediss://`). - let _ = rustls::crypto::ring::default_provider().install_default(); - let config = config::get_config(); let subscriber = tracing_subscriber::registry() @@ -75,14 +70,7 @@ async fn main() -> anyhow::Result<()> { }); let eoa_signer = Arc::new(EoaSigner::new(vault_client.clone(), iaw_client.clone())); let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client)); - let initial_nodes: Vec<&str> = config - .redis - .url - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect(); - let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes)?; + let redis_client = twmq::redis::Client::open(config.redis.url.as_str())?; let authorization_cache = EoaAuthorizationCache::new( moka::future::Cache::builder() @@ -130,7 +118,7 @@ async fn main() -> anyhow::Result<()> { let execution_router = ExecutionRouter { namespace: config.queue.execution_namespace.clone(), - redis: redis_client.get_async_connection().await?, + redis: redis_client.get_connection_manager().await?, authorization_cache, webhook_queue: queue_manager.webhook_queue.clone(), external_bundler_send_queue: queue_manager.external_bundler_send_queue.clone(), diff --git a/server/src/queue/manager.rs b/server/src/queue/manager.rs index deb042f..3e61575 100644 --- a/server/src/queue/manager.rs +++ b/server/src/queue/manager.rs @@ -50,7 +50,7 @@ const EOA_EXECUTOR_QUEUE_NAME: &str = "eoa_executor"; impl QueueManager { pub async fn new( - redis_client: twmq::redis::cluster::ClusterClient, + redis_client: twmq::redis::Client, queue_config: &QueueConfig, solana_config: &crate::config::SolanaConfig, chain_service: Arc, @@ -61,7 +61,7 @@ impl QueueManager { ) -> Result { // Create transaction registry let transaction_registry = Arc::new(TransactionRegistry::new( - redis_client.get_async_connection().await?, + redis_client.get_connection_manager().await?, queue_config.execution_namespace.clone(), )); @@ -255,7 +255,7 @@ impl QueueManager { eoa_signer: eoa_signer.clone(), webhook_queue: webhook_queue.clone(), namespace: queue_config.execution_namespace.clone(), - redis: redis_client.get_async_connection().await?, + redis: redis_client.get_connection_manager().await?, authorization_cache, max_inflight: 50, max_recycled_nonces: 50, @@ -286,7 +286,7 @@ impl QueueManager { }; let solana_rpc_cache = Arc::new(SolanaRpcCache::new(solana_rpc_urls)); let solana_storage = SolanaTransactionStorage::new( - redis_client.get_async_connection().await?, + redis_client.get_connection_manager().await?, queue_config.execution_namespace.clone(), ); let solana_executor_handler = SolanaExecutorJobHandler { diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index 24dc446..164d234 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ -18,8 +18,7 @@ use job::{ pub use multilane::{MultilanePushableJob, MultilaneQueue}; use queue::QueueOptions; use redis::Pipeline; -use redis::{AsyncCommands, RedisResult}; -use redis::cluster_async::ClusterConnection; +use redis::{AsyncCommands, RedisResult, aio::ConnectionManager}; use serde::{Serialize, de::DeserializeOwned}; use shutdown::WorkerHandle; use tokio::sync::Semaphore; @@ -29,10 +28,6 @@ pub use queue::IdempotencyMode; pub use redis; use tracing::Instrument; -/// Global hash tag used to force all keys into the same cluster hash slot. -/// This is the "easiest" Valkey Cluster strategy: correctness over sharding. -pub const ENGINE_HASH_TAG: &str = "{engine}"; - // Trait for error types to implement user cancellation pub trait UserCancellable { fn user_cancelled() -> Self; @@ -126,7 +121,7 @@ pub struct Queue where H: DurableExecution, { - pub redis: ClusterConnection, + pub redis: ConnectionManager, pub handler: Arc, pub options: QueueOptions, // concurrency: usize, @@ -141,13 +136,8 @@ impl Queue { options: Option, handler: H, ) -> Result { - let initial_nodes: Vec<&str> = redis_url - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect(); - let client = redis::cluster::ClusterClient::new(initial_nodes)?; - let redis = client.get_async_connection().await?; + let client = redis::Client::open(redis_url)?; + let redis = client.get_connection_manager().await?; let queue = Self { redis, @@ -186,60 +176,51 @@ impl Queue { } pub fn pending_list_name(&self) -> String { - format!("twmq:{}:{}:pending", ENGINE_HASH_TAG, self.name()) + format!("twmq:{}:pending", self.name()) } pub fn active_hash_name(&self) -> String { - format!("twmq:{}:{}:active", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:active", self.name) } pub fn delayed_zset_name(&self) -> String { - format!("twmq:{}:{}:delayed", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:delayed", self.name) } pub fn success_list_name(&self) -> String { - format!("twmq:{}:{}:success", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:success", self.name) } pub fn failed_list_name(&self) -> String { - format!("twmq:{}:{}:failed", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:failed", self.name) } pub fn job_data_hash_name(&self) -> String { - format!("twmq:{}:{}:jobs:data", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:jobs:data", self.name) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!("twmq:{}:{}:job:{}:meta", ENGINE_HASH_TAG, self.name, job_id) + format!("twmq:{}:job:{}:meta", self.name, job_id) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!( - "twmq:{}:{}:job:{}:errors", - ENGINE_HASH_TAG, self.name, job_id - ) + format!("twmq:{}:job:{}:errors", self.name, job_id) } pub fn job_result_hash_name(&self) -> String { - format!("twmq:{}:{}:jobs:result", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:jobs:result", self.name) } pub fn dedupe_set_name(&self) -> String { - format!("twmq:{}:{}:dedup", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:dedup", self.name) } pub fn pending_cancellation_set_name(&self) -> String { - format!( - "twmq:{}:{}:pending_cancellations", - ENGINE_HASH_TAG, self.name - ) + format!("twmq:{}:pending_cancellations", self.name) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { - format!( - "twmq:{}:{}:job:{}:lease:{}", - ENGINE_HASH_TAG, self.name, job_id, lease_token - ) + format!("twmq:{}:job:{}:lease:{}", self.name, job_id, lease_token) } pub async fn push( @@ -249,20 +230,20 @@ impl Queue { // Check for duplicates and handle job creation with deduplication let script = redis::Script::new( r#" - local queue_id = ARGV[1] - local job_id = ARGV[2] - local job_data = ARGV[3] - local now = ARGV[4] - local delay = ARGV[5] - local reentry_position = ARGV[6] -- "first" or "last" + local job_id = ARGV[1] + local job_data = ARGV[2] + local now = ARGV[3] + local delay = ARGV[4] + local reentry_position = ARGV[5] -- "first" or "last" - local delayed_zset_name = KEYS[1] - local pending_list_name = KEYS[2] + local queue_id = KEYS[1] + local delayed_zset_name = KEYS[2] + local pending_list_name = KEYS[3] - local job_data_hash_name = KEYS[3] - local job_meta_hash_name = KEYS[4] + local job_data_hash_name = KEYS[4] + local job_meta_hash_name = KEYS[5] - local dedupe_set_name = KEYS[5] + local dedupe_set_name = KEYS[6] -- Check if job already exists in any queue if redis.call('SISMEMBER', dedupe_set_name, job_id) == 1 then @@ -320,12 +301,12 @@ impl Queue { let position_string = delay.position.to_string(); let _result: (i32, String) = script + .key(&self.name) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.job_data_hash_name()) .key(self.job_meta_hash_name(&job.id)) .key(self.dedupe_set_name()) - .arg(self.name()) .arg(job_options.id) .arg(job_data) .arg(now) @@ -607,15 +588,14 @@ impl Queue { local batch_size = tonumber(ARGV[3]) local lease_seconds = tonumber(ARGV[4]) - local queue_id = ARGV[5] - - local delayed_zset_name = KEYS[1] - local pending_list_name = KEYS[2] - local active_hash_name = KEYS[3] - local job_data_hash_name = KEYS[4] - local pending_cancellation_set = KEYS[5] - local failed_list_name = KEYS[6] - local success_list_name = KEYS[7] + local queue_id = KEYS[1] + local delayed_zset_name = KEYS[2] + local pending_list_name = KEYS[3] + local active_hash_name = KEYS[4] + local job_data_hash_name = KEYS[5] + local pending_cancellation_set = KEYS[6] + local failed_list_name = KEYS[7] + local success_list_name = KEYS[8] local result_jobs = {} local timed_out_jobs = {} @@ -630,14 +610,14 @@ impl Queue { for i = 1, #active_jobs, 2 do local job_id = active_jobs[i] local attempts = active_jobs[i + 1] - local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' -- Get the current lease token from job metadata local current_lease_token = redis.call('HGET', job_meta_hash_name, 'lease_token') if current_lease_token then -- Build the lease key and check if it exists (Redis auto-expires) - local lease_key = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token + local lease_key = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token local lease_exists = redis.call('EXISTS', lease_key) -- If lease doesn't exist (expired), move job back to pending @@ -677,7 +657,7 @@ impl Queue { -- Job not successful, cancel it now redis.call('LPUSH', failed_list_name, job_id) -- Add cancellation timestamp - local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' redis.call('HSET', job_meta_hash_name, 'finished_at', now) table.insert(cancelled_jobs, job_id) end @@ -689,7 +669,7 @@ impl Queue { -- Step 3: Move expired delayed jobs to pending local delayed_jobs = redis.call('ZRANGEBYSCORE', delayed_zset_name, 0, now) for i, job_id in ipairs(delayed_jobs) do - local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' local reentry_position = redis.call('HGET', job_meta_hash_name, 'reentry_position') or 'last' -- Remove from delayed @@ -724,7 +704,7 @@ impl Queue { -- Only process if we have data if job_data then -- Update metadata - local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' redis.call('HSET', job_meta_hash_name, 'processed_at', now) local created_at = redis.call('HGET', job_meta_hash_name, 'created_at') or now local attempts = redis.call('HINCRBY', job_meta_hash_name, 'attempts', 1) @@ -733,7 +713,7 @@ impl Queue { local lease_token = now .. '_' .. job_id .. '_' .. attempts .. '_' .. pop_id -- Create separate lease key with TTL - local lease_key = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token + local lease_key = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token redis.call('SET', lease_key, '1') redis.call('EXPIRE', lease_key, lease_seconds) @@ -762,6 +742,7 @@ impl Queue { Vec, Vec, ) = script + .key(self.name()) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.active_hash_name()) @@ -773,7 +754,6 @@ impl Queue { .arg(pop_id) .arg(batch_size) .arg(self.options.lease_duration.as_secs()) - .arg(self.name()) .invoke_async(&mut self.redis.clone()) .await?; @@ -962,14 +942,14 @@ impl Queue { // Separate call for pruning with data deletion using Lua let trim_script = redis::Script::new( r#" - local queue_id = ARGV[2] - local list_name = KEYS[1] - local job_data_hash = KEYS[2] - local results_hash = KEYS[3] -- e.g., "myqueue:results" - local dedupe_set_name = KEYS[4] - local active_hash = KEYS[5] - local pending_list = KEYS[6] - local delayed_zset = KEYS[7] + local queue_id = KEYS[1] + local list_name = KEYS[2] + local job_data_hash = KEYS[3] + local results_hash = KEYS[4] -- e.g., "myqueue:results" + local dedupe_set_name = KEYS[5] + local active_hash = KEYS[6] + local pending_list = KEYS[7] + local delayed_zset = KEYS[8] local max_len = tonumber(ARGV[1]) @@ -991,8 +971,8 @@ impl Queue { -- Only delete if the job is NOT currently in the system if not is_active and not is_pending and not is_delayed then - local job_meta_hash = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' - local errors_list_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' + local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta' + local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1010,6 +990,7 @@ impl Queue { ); let trimmed_count: usize = trim_script + .key(self.name()) .key(self.success_list_name()) .key(self.job_data_hash_name()) .key(self.job_result_hash_name()) // results_hash @@ -1018,7 +999,6 @@ impl Queue { .key(self.pending_list_name()) // Check if job is pending .key(self.delayed_zset_name()) // Check if job is delayed .arg(self.options.max_success) // max_len (LTRIM is 0 to max_success-1) - .arg(self.name()) .invoke_async(&mut self.redis.clone()) .await?; @@ -1143,13 +1123,13 @@ impl Queue { // Separate call for pruning with data deletion using Lua let trim_script = redis::Script::new( r#" - local queue_id = ARGV[2] - local list_name = KEYS[1] - local job_data_hash = KEYS[2] - local dedupe_set_name = KEYS[3] - local active_hash = KEYS[4] - local pending_list = KEYS[5] - local delayed_zset = KEYS[6] + local queue_id = KEYS[1] + local list_name = KEYS[2] + local job_data_hash = KEYS[3] + local dedupe_set_name = KEYS[4] + local active_hash = KEYS[5] + local pending_list = KEYS[6] + local delayed_zset = KEYS[7] local max_len = tonumber(ARGV[1]) @@ -1171,8 +1151,8 @@ impl Queue { -- Only delete if the job is NOT currently in the system if not is_active and not is_pending and not is_delayed then - local errors_list_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' - local job_meta_hash = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' + local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors' + local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1188,6 +1168,7 @@ impl Queue { ); let trimmed_count: usize = trim_script + .key(self.name()) .key(self.failed_list_name()) .key(self.job_data_hash_name()) .key(self.dedupe_set_name()) @@ -1195,7 +1176,6 @@ impl Queue { .key(self.pending_list_name()) // Check if job is pending .key(self.delayed_zset_name()) // Check if job is delayed .arg(self.options.max_failed) - .arg(self.name()) .invoke_async(&mut self.redis.clone()) .await?; diff --git a/twmq/src/multilane.rs b/twmq/src/multilane.rs index f79c414..5921e0a 100644 --- a/twmq/src/multilane.rs +++ b/twmq/src/multilane.rs @@ -2,8 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use redis::{AsyncCommands, Pipeline, RedisResult}; -use redis::cluster_async::ClusterConnection; +use redis::{AsyncCommands, Pipeline, RedisResult, aio::ConnectionManager}; use tokio::sync::Semaphore; use tokio::time::sleep; use tracing::Instrument; @@ -11,7 +10,6 @@ use tracing::Instrument; use crate::{ CancelResult, DurableExecution, FailHookData, NackHookData, QueueInternalErrorHookData, SuccessHookData, UserCancellable, - ENGINE_HASH_TAG, error::TwmqError, hooks::TransactionContext, job::{ @@ -28,7 +26,7 @@ pub struct MultilaneQueue where H: DurableExecution, { - pub redis: ClusterConnection, + pub redis: ConnectionManager, handler: Arc, options: QueueOptions, /// Unique identifier for this multilane queue instance @@ -52,13 +50,8 @@ impl MultilaneQueue { options: Option, handler: H, ) -> Result { - let initial_nodes: Vec<&str> = redis_url - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect(); - let client = redis::cluster::ClusterClient::new(initial_nodes)?; - let redis = client.get_async_connection().await?; + let client = redis::Client::open(redis_url)?; + let redis = client.get_connection_manager().await?; let queue = Self { redis, @@ -93,81 +86,57 @@ impl MultilaneQueue { // Redis key naming methods with proper multilane namespacing pub fn lanes_zset_name(&self) -> String { - format!("twmq_multilane:{}:{}:lanes", ENGINE_HASH_TAG, self.queue_id) + format!("twmq_multilane:{}:lanes", self.queue_id) } pub fn lane_pending_list_name(&self, lane_id: &str) -> String { - format!( - "twmq_multilane:{}:{}:lane:{}:pending", - ENGINE_HASH_TAG, self.queue_id, lane_id - ) + format!("twmq_multilane:{}:lane:{}:pending", self.queue_id, lane_id) } pub fn lane_delayed_zset_name(&self, lane_id: &str) -> String { - format!( - "twmq_multilane:{}:{}:lane:{}:delayed", - ENGINE_HASH_TAG, self.queue_id, lane_id - ) + format!("twmq_multilane:{}:lane:{}:delayed", self.queue_id, lane_id) } pub fn lane_active_hash_name(&self, lane_id: &str) -> String { - format!( - "twmq_multilane:{}:{}:lane:{}:active", - ENGINE_HASH_TAG, self.queue_id, lane_id - ) + format!("twmq_multilane:{}:lane:{}:active", self.queue_id, lane_id) } pub fn success_list_name(&self) -> String { - format!("twmq_multilane:{}:{}:success", ENGINE_HASH_TAG, self.queue_id) + format!("twmq_multilane:{}:success", self.queue_id) } pub fn failed_list_name(&self) -> String { - format!("twmq_multilane:{}:{}:failed", ENGINE_HASH_TAG, self.queue_id) + format!("twmq_multilane:{}:failed", self.queue_id) } pub fn job_data_hash_name(&self) -> String { - format!( - "twmq_multilane:{}:{}:jobs:data", - ENGINE_HASH_TAG, self.queue_id - ) + format!("twmq_multilane:{}:jobs:data", self.queue_id) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!( - "twmq_multilane:{}:{}:job:{}:meta", - ENGINE_HASH_TAG, self.queue_id, job_id - ) + format!("twmq_multilane:{}:job:{}:meta", self.queue_id, job_id) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!( - "twmq_multilane:{}:{}:job:{}:errors", - ENGINE_HASH_TAG, self.queue_id, job_id - ) + format!("twmq_multilane:{}:job:{}:errors", self.queue_id, job_id) } pub fn job_result_hash_name(&self) -> String { - format!( - "twmq_multilane:{}:{}:jobs:result", - ENGINE_HASH_TAG, self.queue_id - ) + format!("twmq_multilane:{}:jobs:result", self.queue_id) } pub fn dedupe_set_name(&self) -> String { - format!("twmq_multilane:{}:{}:dedup", ENGINE_HASH_TAG, self.queue_id) + format!("twmq_multilane:{}:dedup", self.queue_id) } pub fn pending_cancellation_set_name(&self) -> String { - format!( - "twmq_multilane:{}:{}:pending_cancellations", - ENGINE_HASH_TAG, self.queue_id - ) + format!("twmq_multilane:{}:pending_cancellations", self.queue_id) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { format!( - "twmq_multilane:{}:{}:job:{}:lease:{}", - ENGINE_HASH_TAG, self.queue_id, job_id, lease_token + "twmq_multilane:{}:job:{}:lease:{}", + self.queue_id, job_id, lease_token ) } @@ -403,9 +372,9 @@ impl MultilaneQueue { return "not_found" end - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' -- Try to remove from pending queue if redis.call('LREM', lane_pending_list, 0, job_id) > 0 then @@ -594,20 +563,20 @@ impl MultilaneQueue { -- Helper function to cleanup expired leases for a specific lane local function cleanup_lane_leases(lane_id) - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' local active_jobs = redis.call('HGETALL', lane_active_hash) for i = 1, #active_jobs, 2 do local job_id = active_jobs[i] local attempts = active_jobs[i + 1] - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' local current_lease_token = redis.call('HGET', job_meta_hash, 'lease_token') if current_lease_token then - local lease_key = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token + local lease_key = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token local lease_exists = redis.call('EXISTS', lease_key) if lease_exists == 0 then @@ -628,12 +597,12 @@ impl MultilaneQueue { -- Helper function to move delayed jobs to pending for a specific lane local function process_delayed_jobs(lane_id) - local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' local delayed_jobs = redis.call('ZRANGEBYSCORE', lane_delayed_zset, 0, now) for i, job_id in ipairs(delayed_jobs) do - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' local reentry_position = redis.call('HGET', job_meta_hash, 'reentry_position') or 'last' redis.call('ZREM', lane_delayed_zset, job_id) @@ -649,8 +618,8 @@ impl MultilaneQueue { -- Helper function to pop one job from a lane local function pop_job_from_lane(lane_id) - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' local job_id = redis.call('RPOP', lane_pending_list) if not job_id then @@ -662,13 +631,13 @@ impl MultilaneQueue { return nil end - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' redis.call('HSET', job_meta_hash, 'processed_at', now) local created_at = redis.call('HGET', job_meta_hash, 'created_at') or now local attempts = redis.call('HINCRBY', job_meta_hash, 'attempts', 1) local lease_token = now .. '_' .. job_id .. '_' .. attempts - local lease_key = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token + local lease_key = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token redis.call('SET', lease_key, '1') redis.call('EXPIRE', lease_key, lease_seconds) @@ -682,11 +651,11 @@ impl MultilaneQueue { local cancel_requests = redis.call('SMEMBERS', pending_cancellation_set) for i, job_id in ipairs(cancel_requests) do - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' local lane_id = redis.call('HGET', job_meta_hash, 'lane_id') if lane_id then - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' if redis.call('HEXISTS', lane_active_hash, job_id) == 1 then -- Still processing, keep in cancellation set @@ -751,9 +720,9 @@ impl MultilaneQueue { empty_lanes_count = empty_lanes_count + 1 -- Check if lane should be removed from Redis - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' local pending_count = redis.call('LLEN', lane_pending_list) local delayed_count = redis.call('ZCARD', lane_delayed_zset) @@ -987,12 +956,12 @@ impl MultilaneQueue { async fn post_success_completion(&self) -> Result<(), TwmqError> { let trim_script = redis::Script::new( r#" - local queue_id = ARGV[2] - local list_name = KEYS[1] - local job_data_hash = KEYS[2] - local results_hash = KEYS[3] - local dedupe_set_name = KEYS[4] - local lanes_zset = KEYS[5] + local queue_id = KEYS[1] + local list_name = KEYS[2] + local job_data_hash = KEYS[3] + local results_hash = KEYS[4] + local dedupe_set_name = KEYS[5] + local lanes_zset = KEYS[6] local max_len = tonumber(ARGV[1]) @@ -1002,16 +971,16 @@ impl MultilaneQueue { if #job_ids_to_delete > 0 then for _, j_id in ipairs(job_ids_to_delete) do -- Get the lane_id for this job to check if it's active/pending/delayed - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':meta' local lane_id = redis.call('HGET', job_meta_hash, 'lane_id') local should_delete = true if lane_id then -- Check if job is in any active state for this lane - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' local is_active = redis.call('HEXISTS', lane_active_hash, j_id) == 1 local is_pending = redis.call('LPOS', lane_pending_list, j_id) ~= nil @@ -1024,7 +993,7 @@ impl MultilaneQueue { end if should_delete then - local errors_list_name = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' + local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1041,13 +1010,13 @@ impl MultilaneQueue { ); let trimmed_count: usize = trim_script + .key(self.queue_id()) .key(self.success_list_name()) .key(self.job_data_hash_name()) .key(self.job_result_hash_name()) .key(self.dedupe_set_name()) .key(self.lanes_zset_name()) // Need to check lanes .arg(self.options.max_success) - .arg(self.queue_id()) .invoke_async(&mut self.redis.clone()) .await?; @@ -1137,10 +1106,10 @@ impl MultilaneQueue { async fn post_fail_completion(&self) -> Result<(), TwmqError> { let trim_script = redis::Script::new( r#" - local queue_id = ARGV[2] - local list_name = KEYS[1] - local job_data_hash = KEYS[2] - local dedupe_set_name = KEYS[3] + local queue_id = KEYS[1] + local list_name = KEYS[2] + local job_data_hash = KEYS[3] + local dedupe_set_name = KEYS[4] local max_len = tonumber(ARGV[1]) @@ -1150,16 +1119,16 @@ impl MultilaneQueue { if #job_ids_to_delete > 0 then for _, j_id in ipairs(job_ids_to_delete) do -- Get the lane_id for this job to check if it's active/pending/delayed - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':meta' local lane_id = redis.call('HGET', job_meta_hash, 'lane_id') local should_delete = true if lane_id then -- Check if job is in any active state for this lane - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' local is_active = redis.call('HEXISTS', lane_active_hash, j_id) == 1 local is_pending = redis.call('LPOS', lane_pending_list, j_id) ~= nil @@ -1172,7 +1141,7 @@ impl MultilaneQueue { end if should_delete then - local errors_list_name = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' + local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1188,11 +1157,11 @@ impl MultilaneQueue { ); let trimmed_count: usize = trim_script + .key(self.queue_id()) .key(self.failed_list_name()) .key(self.job_data_hash_name()) .key(self.dedupe_set_name()) .arg(self.options.max_failed) - .arg(self.queue_id()) .invoke_async(&mut self.redis.clone()) .await?; diff --git a/twmq/src/queue.rs b/twmq/src/queue.rs index b4571e8..288b89f 100644 --- a/twmq/src/queue.rs +++ b/twmq/src/queue.rs @@ -1,7 +1,6 @@ use std::{marker::PhantomData, sync::Arc, time::Duration}; -use redis::cluster::ClusterClient; -use redis::cluster_async::ClusterConnection; +use redis::{Client, aio::ConnectionManager}; use serde::{Deserialize, Serialize}; use crate::{DurableExecution, Queue, error::TwmqError}; @@ -66,8 +65,8 @@ pub struct HasHandler; enum RedisSource { Url(String), - ClusterClient(ClusterClient), - ClusterConnection(ClusterConnection), + Client(Client), + ConnectionManager(ConnectionManager), } // Builder with typestate pattern @@ -115,9 +114,9 @@ impl QueueBuilder { } /// Set Redis connection from existing client - pub fn redis_client(self, client: ClusterClient) -> QueueBuilder { + pub fn redis_client(self, client: Client) -> QueueBuilder { QueueBuilder { - redis_source: Some(RedisSource::ClusterClient(client)), + redis_source: Some(RedisSource::Client(client)), name: self.name, options: self.options, handler: self.handler, @@ -125,10 +124,13 @@ impl QueueBuilder { } } - /// Set Redis connection from an existing cluster connection - pub fn redis_connection(self, conn: ClusterConnection) -> QueueBuilder { + /// Set Redis connection from existing connection manager + pub fn redis_connection_manager( + self, + manager: ConnectionManager, + ) -> QueueBuilder { QueueBuilder { - redis_source: Some(RedisSource::ClusterConnection(conn)), + redis_source: Some(RedisSource::ConnectionManager(manager)), name: self.name, options: self.options, handler: self.handler, @@ -190,16 +192,11 @@ impl QueueBuilder { let redis = match redis_source { RedisSource::Url(url) => { - let initial_nodes: Vec<&str> = url - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect(); - let client = ClusterClient::new(initial_nodes)?; - client.get_async_connection().await? + let client = Client::open(url)?; + client.get_connection_manager().await? } - RedisSource::ClusterClient(client) => client.get_async_connection().await?, - RedisSource::ClusterConnection(conn) => conn, + RedisSource::Client(client) => client.get_connection_manager().await?, + RedisSource::ConnectionManager(manager) => manager, }; Ok(Queue { diff --git a/twmq/tests/basic.rs b/twmq/tests/basic.rs index 8a10a1f..0c5d2f6 100644 --- a/twmq/tests/basic.rs +++ b/twmq/tests/basic.rs @@ -11,14 +11,14 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use twmq::job::{JobOptions, JobStatus}; // Assuming JobStatus is in twmq::job -use twmq::redis::cluster_async::ClusterConnection; // For cleanup utility +use twmq::redis::aio::ConnectionManager; // For cleanup utility const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys for a given queue name pattern -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/basic_hook.rs b/twmq/tests/basic_hook.rs index b4f17ef..0306ac2 100644 --- a/twmq/tests/basic_hook.rs +++ b/twmq/tests/basic_hook.rs @@ -1,7 +1,7 @@ // Add this to tests/basic.rs mod fixtures; use fixtures::{TestJobErrorData, TestJobOutput}; -use redis::cluster_async::ClusterConnection; +use redis::aio::ConnectionManager; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use std::{ @@ -21,9 +21,9 @@ use twmq::{ }; // Helper to clean up Redis keys for a given queue name pattern -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/delay.rs b/twmq/tests/delay.rs index 4f93078..5245188 100644 --- a/twmq/tests/delay.rs +++ b/twmq/tests/delay.rs @@ -15,15 +15,15 @@ use twmq::{ hooks::TransactionContext, job::{BorrowedJob, DelayOptions, JobResult, JobStatus, RequeuePosition}, queue::QueueOptions, - redis::cluster_async::ClusterConnection, + redis::aio::ConnectionManager, }; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) @@ -131,8 +131,8 @@ async fn test_job_delay_basic() { }; // Create Redis connection for the execution context - let redis_client = redis::cluster::ClusterClient::new(vec![REDIS_URL]).unwrap(); - let redis_conn = Arc::new(redis_client.get_async_connection().await.unwrap()); + let redis_client = redis::Client::open(REDIS_URL).unwrap(); + let redis_conn = Arc::new(redis_client.get_connection_manager().await.unwrap()); let handler = DelayTestJobHandler; @@ -307,8 +307,8 @@ async fn test_delay_position_ordering() { }; // Create Redis connection for the execution context - let redis_client = redis::cluster::ClusterClient::new(vec![REDIS_URL]).unwrap(); - let redis_conn = Arc::new(redis_client.get_async_connection().await.unwrap()); + let redis_client = redis::Client::open(REDIS_URL).unwrap(); + let redis_conn = Arc::new(redis_client.get_connection_manager().await.unwrap()); let handler = DelayTestJobHandler; diff --git a/twmq/tests/idempotency_modes.rs b/twmq/tests/idempotency_modes.rs index 7194207..ce1d0f3 100644 --- a/twmq/tests/idempotency_modes.rs +++ b/twmq/tests/idempotency_modes.rs @@ -8,7 +8,7 @@ use twmq::{ DurableExecution, Queue, job::{BorrowedJob, JobResult, JobStatus}, queue::{IdempotencyMode, QueueOptions}, - redis::cluster_async::ClusterConnection, + redis::aio::ConnectionManager, }; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; @@ -66,9 +66,9 @@ impl DurableExecution for TestJobHandler { } // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/lease_expiry.rs b/twmq/tests/lease_expiry.rs index 6ed420b..773c100 100644 --- a/twmq/tests/lease_expiry.rs +++ b/twmq/tests/lease_expiry.rs @@ -12,7 +12,7 @@ use twmq::{ hooks::TransactionContext, job::{BorrowedJob, JobResult, JobStatus}, queue::QueueOptions, - redis::cluster_async::ClusterConnection, + redis::aio::ConnectionManager, }; mod fixtures; @@ -21,9 +21,9 @@ use fixtures::TestJobErrorData; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/multilane_batch_pop.rs b/twmq/tests/multilane_batch_pop.rs index e167123..3f8c398 100644 --- a/twmq/tests/multilane_batch_pop.rs +++ b/twmq/tests/multilane_batch_pop.rs @@ -90,7 +90,7 @@ impl MultilaneTestHarness { /// Clean up all Redis keys for this test async fn cleanup(&self) { let mut conn = self.queue.redis.clone(); - let keys_pattern = format!("twmq_multilane:{}:{}:*", twmq::ENGINE_HASH_TAG, self.queue_id); + let keys_pattern = format!("twmq_multilane:{}:*", self.queue_id); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -165,8 +165,7 @@ impl Drop for MultilaneTestHarness { tokio::spawn(async move { let mut conn = redis; - let keys_pattern = - format!("twmq_multilane:{}:{queue_id}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq_multilane:{queue_id}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/nack.rs b/twmq/tests/nack.rs index 1c58eec..f66cd9f 100644 --- a/twmq/tests/nack.rs +++ b/twmq/tests/nack.rs @@ -15,7 +15,7 @@ use twmq::{ hooks::TransactionContext, job::{BorrowedJob, JobError, JobResult, JobStatus, RequeuePosition}, queue::QueueOptions, - redis::cluster_async::ClusterConnection, + redis::aio::ConnectionManager, }; mod fixtures; @@ -24,9 +24,9 @@ use fixtures::TestJobErrorData; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/prune_race_condition.rs b/twmq/tests/prune_race_condition.rs index bbbf7cb..c9898f9 100644 --- a/twmq/tests/prune_race_condition.rs +++ b/twmq/tests/prune_race_condition.rs @@ -6,7 +6,7 @@ mod fixtures; use fixtures::TestJobErrorData; -use redis::cluster_async::ClusterConnection; +use redis::aio::ConnectionManager; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use std::{ @@ -146,9 +146,9 @@ impl DurableExecution for EoaSimulatorJobHandler { } // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/prune_race_random_ids.rs b/twmq/tests/prune_race_random_ids.rs index 143bba5..fd5e9ea 100644 --- a/twmq/tests/prune_race_random_ids.rs +++ b/twmq/tests/prune_race_random_ids.rs @@ -4,7 +4,7 @@ mod fixtures; use fixtures::TestJobErrorData; -use redis::{AsyncCommands, cluster_async::ClusterConnection}; +use redis::{AsyncCommands, aio::ConnectionManager}; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use std::{ @@ -121,9 +121,9 @@ impl DurableExecution for RandomJobHandler { } // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -239,7 +239,7 @@ async fn test_prune_with_random_ids() { let success_job_ids: Vec = conn.lrange(queue.success_list_name(), 0, -1).await.unwrap(); // Count how many job metadata hashes still exist (should match success list length if pruning works) - let meta_pattern = format!("twmq:{}:{}:job:*:meta", twmq::ENGINE_HASH_TAG, queue.name()); + let meta_pattern = format!("twmq:{}:job:*:meta", queue.name()); let meta_keys: Vec = redis::cmd("KEYS") .arg(&meta_pattern) .query_async(&mut conn) From 5c261c61030b16ef703f5549ee15135f646850e5 Mon Sep 17 00:00:00 2001 From: 0xFirekeeper <0xFirekeeper@gmail.com> Date: Fri, 20 Mar 2026 00:01:43 +0700 Subject: [PATCH 3/7] Reapply "redis tls support (#95)" (#96) This reverts commit e570bc831dec4d73aa6746f48764e99b2ef99523. --- Cargo.toml | 6 ++- README.md | 4 ++ server/Cargo.toml | 3 +- server/DOCKER.md | 6 +++ server/src/main.rs | 8 ++++ twmq/Cargo.toml | 1 + twmq/src/lib.rs | 47 ++++++++++++++------- twmq/src/multilane.rs | 65 ++++++++++++++++++++++------- twmq/tests/basic.rs | 3 +- twmq/tests/basic_hook.rs | 3 +- twmq/tests/delay.rs | 3 +- twmq/tests/idempotency_modes.rs | 3 +- twmq/tests/lease_expiry.rs | 3 +- twmq/tests/multilane_batch_pop.rs | 5 ++- twmq/tests/nack.rs | 3 +- twmq/tests/prune_race_condition.rs | 3 +- twmq/tests/prune_race_random_ids.rs | 5 ++- 17 files changed, 126 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b8fc7cd..bf3a2e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,7 +98,11 @@ config = "0.15.11" aws-arn = "0.3.1" # Redis -redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] } +redis = { version = "0.31.0", features = ["connection-manager", "tls-rustls", "tls-rustls-webpki-roots", "tokio-rustls-comp"] } + +# Rustls (required for TLS crypto provider selection). +# Rustls 0.23 requires exactly one crypto provider feature (ring or aws-lc-rs). +rustls = { version = "0.23.32", default-features = false, features = ["ring"] } # Dev dependencies criterion = { version = "0.6", features = ["html_reports", "async_tokio"] } \ No newline at end of file diff --git a/README.md b/README.md index 41631d5..8884e48 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,8 @@ thirdweb: redis: url: "redis://localhost:6379" +# For Redis over TLS, use the `rediss://` scheme: +# url: "rediss://localhost:6379" queue: webhook_workers: 50 @@ -166,6 +168,8 @@ export APP__QUEUE__LOCAL_CONCURRENCY=500 # Custom Redis configuration export APP__REDIS__URL="redis://redis-cluster:6379" +# For Redis over TLS, use the `rediss://` scheme: +# export APP__REDIS__URL="rediss://redis-cluster:6379" # Debug logging for development export RUST_LOG="thirdweb_engine=debug,twmq=debug" diff --git a/server/Cargo.toml b/server/Cargo.toml index d568ffb..f368839 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -50,4 +50,5 @@ aws-arn = { workspace = true } moka = { workspace = true } engine-eip7702-core = { path = "../eip7702-core" } prometheus = { workspace = true } -thiserror = { workspace = true } \ No newline at end of file +thiserror = { workspace = true } +rustls = { workspace = true } \ No newline at end of file diff --git a/server/DOCKER.md b/server/DOCKER.md index e50a245..6f6e066 100644 --- a/server/DOCKER.md +++ b/server/DOCKER.md @@ -25,6 +25,8 @@ The following environment variables must be set when running the container: ```bash # Redis Configuration APP__REDIS__URL=redis://localhost:6379 +# For Redis over TLS, use the `rediss://` scheme: +# APP__REDIS__URL=rediss://localhost:6379 # Thirdweb Configuration APP__THIRDWEB__SECRET=your_secret_key_here @@ -68,6 +70,8 @@ Create a `.env` file with your configuration: ```bash # .env file APP__REDIS__URL=redis://localhost:6379 +# For Redis over TLS, use the `rediss://` scheme: +# APP__REDIS__URL=rediss://localhost:6379 APP__THIRDWEB__SECRET=your_secret_key_here APP__THIRDWEB__CLIENT_ID=your_client_id_here APP__THIRDWEB__URLS__RPC=https://your-rpc-url.com @@ -128,6 +132,8 @@ services: - "8080:8080" environment: - APP__REDIS__URL=redis://redis:6379 +# For Redis over TLS, use the `rediss://` scheme: +# - APP__REDIS__URL=rediss://redis:6379 - APP__THIRDWEB__SECRET=${APP__THIRDWEB__SECRET} - APP__THIRDWEB__CLIENT_ID=${APP__THIRDWEB__CLIENT_ID} - APP__THIRDWEB__URLS__RPC=${APP__THIRDWEB__URLS__RPC} diff --git a/server/src/main.rs b/server/src/main.rs index 4376aec..e6dd8a7 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -70,6 +70,14 @@ async fn main() -> anyhow::Result<()> { }); let eoa_signer = Arc::new(EoaSigner::new(vault_client.clone(), iaw_client.clone())); let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client)); + + // Rustls 0.23 requires selecting a process-level CryptoProvider (ring or aws-lc-rs) + // before any TLS client configuration is created (e.g. when using `rediss://`). + // If another crate already installed a provider, this will be a no-op error. + if let Err(e) = rustls::crypto::ring::default_provider().install_default() { + tracing::debug!(error = ?e, "Rustls CryptoProvider already installed"); + } + let redis_client = twmq::redis::Client::open(config.redis.url.as_str())?; let authorization_cache = EoaAuthorizationCache::new( diff --git a/twmq/Cargo.toml b/twmq/Cargo.toml index 9d40caa..3f6227f 100644 --- a/twmq/Cargo.toml +++ b/twmq/Cargo.toml @@ -14,6 +14,7 @@ thiserror = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } futures = { workspace = true } +rustls = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["full"] } diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index 164d234..da022ba 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ -175,52 +175,63 @@ impl Queue { &self.name } + /// Redis Cluster hash tag used to keep all queue keys in the same slot. + /// See: https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/#hash-tags + fn redis_hash_tag(&self) -> String { + format!("{{{}}}", self.name()) + } + pub fn pending_list_name(&self) -> String { - format!("twmq:{}:pending", self.name()) + format!("twmq:{}:pending", self.redis_hash_tag()) } pub fn active_hash_name(&self) -> String { - format!("twmq:{}:active", self.name) + format!("twmq:{}:active", self.redis_hash_tag()) } pub fn delayed_zset_name(&self) -> String { - format!("twmq:{}:delayed", self.name) + format!("twmq:{}:delayed", self.redis_hash_tag()) } pub fn success_list_name(&self) -> String { - format!("twmq:{}:success", self.name) + format!("twmq:{}:success", self.redis_hash_tag()) } pub fn failed_list_name(&self) -> String { - format!("twmq:{}:failed", self.name) + format!("twmq:{}:failed", self.redis_hash_tag()) } pub fn job_data_hash_name(&self) -> String { - format!("twmq:{}:jobs:data", self.name) + format!("twmq:{}:jobs:data", self.redis_hash_tag()) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!("twmq:{}:job:{}:meta", self.name, job_id) + format!("twmq:{}:job:{}:meta", self.redis_hash_tag(), job_id) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!("twmq:{}:job:{}:errors", self.name, job_id) + format!("twmq:{}:job:{}:errors", self.redis_hash_tag(), job_id) } pub fn job_result_hash_name(&self) -> String { - format!("twmq:{}:jobs:result", self.name) + format!("twmq:{}:jobs:result", self.redis_hash_tag()) } pub fn dedupe_set_name(&self) -> String { - format!("twmq:{}:dedup", self.name) + format!("twmq:{}:dedup", self.redis_hash_tag()) } pub fn pending_cancellation_set_name(&self) -> String { - format!("twmq:{}:pending_cancellations", self.name) + format!("twmq:{}:pending_cancellations", self.redis_hash_tag()) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { - format!("twmq:{}:job:{}:lease:{}", self.name, job_id, lease_token) + format!( + "twmq:{}:job:{}:lease:{}", + self.redis_hash_tag(), + job_id, + lease_token + ) } pub async fn push( @@ -301,7 +312,8 @@ impl Queue { let position_string = delay.position.to_string(); let _result: (i32, String) = script - .key(&self.name) + // Redis Cluster: all KEYS must be in the same slot + .key(self.redis_hash_tag()) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.job_data_hash_name()) @@ -742,7 +754,8 @@ impl Queue { Vec, Vec, ) = script - .key(self.name()) + // Redis Cluster: all KEYS must be in the same slot + .key(self.redis_hash_tag()) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.active_hash_name()) @@ -990,7 +1003,8 @@ impl Queue { ); let trimmed_count: usize = trim_script - .key(self.name()) + // Redis Cluster: all KEYS must be in the same slot + .key(self.redis_hash_tag()) .key(self.success_list_name()) .key(self.job_data_hash_name()) .key(self.job_result_hash_name()) // results_hash @@ -1168,7 +1182,8 @@ impl Queue { ); let trimmed_count: usize = trim_script - .key(self.name()) + // Redis Cluster: all KEYS must be in the same slot + .key(self.redis_hash_tag()) .key(self.failed_list_name()) .key(self.job_data_hash_name()) .key(self.dedupe_set_name()) diff --git a/twmq/src/multilane.rs b/twmq/src/multilane.rs index 5921e0a..a965e5d 100644 --- a/twmq/src/multilane.rs +++ b/twmq/src/multilane.rs @@ -84,59 +84,89 @@ impl MultilaneQueue { &self.queue_id } + /// Redis Cluster hash tag used to keep all multilane keys in the same slot. + fn redis_hash_tag(&self) -> String { + format!("{{{}}}", self.queue_id()) + } + // Redis key naming methods with proper multilane namespacing pub fn lanes_zset_name(&self) -> String { - format!("twmq_multilane:{}:lanes", self.queue_id) + format!("twmq_multilane:{}:lanes", self.redis_hash_tag()) } pub fn lane_pending_list_name(&self, lane_id: &str) -> String { - format!("twmq_multilane:{}:lane:{}:pending", self.queue_id, lane_id) + format!( + "twmq_multilane:{}:lane:{}:pending", + self.redis_hash_tag(), + lane_id + ) } pub fn lane_delayed_zset_name(&self, lane_id: &str) -> String { - format!("twmq_multilane:{}:lane:{}:delayed", self.queue_id, lane_id) + format!( + "twmq_multilane:{}:lane:{}:delayed", + self.redis_hash_tag(), + lane_id + ) } pub fn lane_active_hash_name(&self, lane_id: &str) -> String { - format!("twmq_multilane:{}:lane:{}:active", self.queue_id, lane_id) + format!( + "twmq_multilane:{}:lane:{}:active", + self.redis_hash_tag(), + lane_id + ) } pub fn success_list_name(&self) -> String { - format!("twmq_multilane:{}:success", self.queue_id) + format!("twmq_multilane:{}:success", self.redis_hash_tag()) } pub fn failed_list_name(&self) -> String { - format!("twmq_multilane:{}:failed", self.queue_id) + format!("twmq_multilane:{}:failed", self.redis_hash_tag()) } pub fn job_data_hash_name(&self) -> String { - format!("twmq_multilane:{}:jobs:data", self.queue_id) + format!("twmq_multilane:{}:jobs:data", self.redis_hash_tag()) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!("twmq_multilane:{}:job:{}:meta", self.queue_id, job_id) + format!( + "twmq_multilane:{}:job:{}:meta", + self.redis_hash_tag(), + job_id + ) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!("twmq_multilane:{}:job:{}:errors", self.queue_id, job_id) + format!( + "twmq_multilane:{}:job:{}:errors", + self.redis_hash_tag(), + job_id + ) } pub fn job_result_hash_name(&self) -> String { - format!("twmq_multilane:{}:jobs:result", self.queue_id) + format!("twmq_multilane:{}:jobs:result", self.redis_hash_tag()) } pub fn dedupe_set_name(&self) -> String { - format!("twmq_multilane:{}:dedup", self.queue_id) + format!("twmq_multilane:{}:dedup", self.redis_hash_tag()) } pub fn pending_cancellation_set_name(&self) -> String { - format!("twmq_multilane:{}:pending_cancellations", self.queue_id) + format!( + "twmq_multilane:{}:pending_cancellations", + self.redis_hash_tag() + ) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { format!( "twmq_multilane:{}:job:{}:lease:{}", - self.queue_id, job_id, lease_token + self.redis_hash_tag(), + job_id, + lease_token ) } @@ -229,7 +259,8 @@ impl MultilaneQueue { .key(self.job_data_hash_name()) .key(self.job_meta_hash_name(&job.id)) .key(self.dedupe_set_name()) - .arg(&self.queue_id) + // Redis Cluster: ensure constructed keys match hash-tagged names + .arg(self.redis_hash_tag()) .arg(lane_id) .arg(&job_options.id) .arg(job_data) @@ -414,7 +445,8 @@ impl MultilaneQueue { .key(self.pending_cancellation_set_name()) .key(self.job_meta_hash_name(job_id)) .key(self.job_data_hash_name()) - .arg(&self.queue_id) + // Redis Cluster: ensure constructed keys match hash-tagged names + .arg(self.redis_hash_tag()) .arg(job_id) .arg(now) .invoke_async(&mut self.redis.clone()) @@ -760,7 +792,8 @@ impl MultilaneQueue { .key(self.pending_cancellation_set_name()) .key(self.failed_list_name()) .key(self.success_list_name()) - .arg(&self.queue_id) + // Redis Cluster: ensure constructed keys match hash-tagged names + .arg(self.redis_hash_tag()) .arg(now) .arg(batch_size) .arg(self.options.lease_duration.as_secs()) diff --git a/twmq/tests/basic.rs b/twmq/tests/basic.rs index 0c5d2f6..66eae0b 100644 --- a/twmq/tests/basic.rs +++ b/twmq/tests/basic.rs @@ -18,7 +18,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys for a given queue name pattern async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/basic_hook.rs b/twmq/tests/basic_hook.rs index 0306ac2..73435ce 100644 --- a/twmq/tests/basic_hook.rs +++ b/twmq/tests/basic_hook.rs @@ -23,7 +23,8 @@ use twmq::{ // Helper to clean up Redis keys for a given queue name pattern async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/delay.rs b/twmq/tests/delay.rs index 5245188..1c74ac1 100644 --- a/twmq/tests/delay.rs +++ b/twmq/tests/delay.rs @@ -23,7 +23,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/idempotency_modes.rs b/twmq/tests/idempotency_modes.rs index ce1d0f3..7799e6f 100644 --- a/twmq/tests/idempotency_modes.rs +++ b/twmq/tests/idempotency_modes.rs @@ -68,7 +68,8 @@ impl DurableExecution for TestJobHandler { // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/lease_expiry.rs b/twmq/tests/lease_expiry.rs index 773c100..5822c66 100644 --- a/twmq/tests/lease_expiry.rs +++ b/twmq/tests/lease_expiry.rs @@ -23,7 +23,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/multilane_batch_pop.rs b/twmq/tests/multilane_batch_pop.rs index 3f8c398..189ce21 100644 --- a/twmq/tests/multilane_batch_pop.rs +++ b/twmq/tests/multilane_batch_pop.rs @@ -90,7 +90,8 @@ impl MultilaneTestHarness { /// Clean up all Redis keys for this test async fn cleanup(&self) { let mut conn = self.queue.redis.clone(); - let keys_pattern = format!("twmq_multilane:{}:*", self.queue_id); + // twmq multilane keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq_multilane:{{{}}}:*", self.queue_id); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -165,7 +166,7 @@ impl Drop for MultilaneTestHarness { tokio::spawn(async move { let mut conn = redis; - let keys_pattern = format!("twmq_multilane:{queue_id}:*"); + let keys_pattern = format!("twmq_multilane:{{{queue_id}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/nack.rs b/twmq/tests/nack.rs index f66cd9f..8b0fd39 100644 --- a/twmq/tests/nack.rs +++ b/twmq/tests/nack.rs @@ -26,7 +26,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/prune_race_condition.rs b/twmq/tests/prune_race_condition.rs index c9898f9..3245cc8 100644 --- a/twmq/tests/prune_race_condition.rs +++ b/twmq/tests/prune_race_condition.rs @@ -148,7 +148,8 @@ impl DurableExecution for EoaSimulatorJobHandler { // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/prune_race_random_ids.rs b/twmq/tests/prune_race_random_ids.rs index fd5e9ea..b09ad89 100644 --- a/twmq/tests/prune_race_random_ids.rs +++ b/twmq/tests/prune_race_random_ids.rs @@ -123,7 +123,8 @@ impl DurableExecution for RandomJobHandler { // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -239,7 +240,7 @@ async fn test_prune_with_random_ids() { let success_job_ids: Vec = conn.lrange(queue.success_list_name(), 0, -1).await.unwrap(); // Count how many job metadata hashes still exist (should match success list length if pruning works) - let meta_pattern = format!("twmq:{}:job:*:meta", queue.name()); + let meta_pattern = format!("twmq:{{{}}}:job:*:meta", queue.name()); let meta_keys: Vec = redis::cmd("KEYS") .arg(&meta_pattern) .query_async(&mut conn) From e146fa8731fb6cc2869e96a0c57885c9c2098454 Mon Sep 17 00:00:00 2001 From: 0xFirekeeper <0xFirekeeper@gmail.com> Date: Fri, 20 Mar 2026 00:01:51 +0700 Subject: [PATCH 4/7] Revert "redis tls support (#95)" This reverts commit d729f5f72a4988a5edcde79f2f2e577224b241da. --- Cargo.toml | 6 +-- README.md | 4 -- server/Cargo.toml | 3 +- server/DOCKER.md | 6 --- server/src/main.rs | 8 ---- twmq/Cargo.toml | 1 - twmq/src/lib.rs | 47 +++++++-------------- twmq/src/multilane.rs | 65 +++++++---------------------- twmq/tests/basic.rs | 3 +- twmq/tests/basic_hook.rs | 3 +- twmq/tests/delay.rs | 3 +- twmq/tests/idempotency_modes.rs | 3 +- twmq/tests/lease_expiry.rs | 3 +- twmq/tests/multilane_batch_pop.rs | 5 +-- twmq/tests/nack.rs | 3 +- twmq/tests/prune_race_condition.rs | 3 +- twmq/tests/prune_race_random_ids.rs | 5 +-- 17 files changed, 45 insertions(+), 126 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bf3a2e8..b8fc7cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,11 +98,7 @@ config = "0.15.11" aws-arn = "0.3.1" # Redis -redis = { version = "0.31.0", features = ["connection-manager", "tls-rustls", "tls-rustls-webpki-roots", "tokio-rustls-comp"] } - -# Rustls (required for TLS crypto provider selection). -# Rustls 0.23 requires exactly one crypto provider feature (ring or aws-lc-rs). -rustls = { version = "0.23.32", default-features = false, features = ["ring"] } +redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] } # Dev dependencies criterion = { version = "0.6", features = ["html_reports", "async_tokio"] } \ No newline at end of file diff --git a/README.md b/README.md index 8884e48..41631d5 100644 --- a/README.md +++ b/README.md @@ -147,8 +147,6 @@ thirdweb: redis: url: "redis://localhost:6379" -# For Redis over TLS, use the `rediss://` scheme: -# url: "rediss://localhost:6379" queue: webhook_workers: 50 @@ -168,8 +166,6 @@ export APP__QUEUE__LOCAL_CONCURRENCY=500 # Custom Redis configuration export APP__REDIS__URL="redis://redis-cluster:6379" -# For Redis over TLS, use the `rediss://` scheme: -# export APP__REDIS__URL="rediss://redis-cluster:6379" # Debug logging for development export RUST_LOG="thirdweb_engine=debug,twmq=debug" diff --git a/server/Cargo.toml b/server/Cargo.toml index f368839..d568ffb 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -50,5 +50,4 @@ aws-arn = { workspace = true } moka = { workspace = true } engine-eip7702-core = { path = "../eip7702-core" } prometheus = { workspace = true } -thiserror = { workspace = true } -rustls = { workspace = true } \ No newline at end of file +thiserror = { workspace = true } \ No newline at end of file diff --git a/server/DOCKER.md b/server/DOCKER.md index 6f6e066..e50a245 100644 --- a/server/DOCKER.md +++ b/server/DOCKER.md @@ -25,8 +25,6 @@ The following environment variables must be set when running the container: ```bash # Redis Configuration APP__REDIS__URL=redis://localhost:6379 -# For Redis over TLS, use the `rediss://` scheme: -# APP__REDIS__URL=rediss://localhost:6379 # Thirdweb Configuration APP__THIRDWEB__SECRET=your_secret_key_here @@ -70,8 +68,6 @@ Create a `.env` file with your configuration: ```bash # .env file APP__REDIS__URL=redis://localhost:6379 -# For Redis over TLS, use the `rediss://` scheme: -# APP__REDIS__URL=rediss://localhost:6379 APP__THIRDWEB__SECRET=your_secret_key_here APP__THIRDWEB__CLIENT_ID=your_client_id_here APP__THIRDWEB__URLS__RPC=https://your-rpc-url.com @@ -132,8 +128,6 @@ services: - "8080:8080" environment: - APP__REDIS__URL=redis://redis:6379 -# For Redis over TLS, use the `rediss://` scheme: -# - APP__REDIS__URL=rediss://redis:6379 - APP__THIRDWEB__SECRET=${APP__THIRDWEB__SECRET} - APP__THIRDWEB__CLIENT_ID=${APP__THIRDWEB__CLIENT_ID} - APP__THIRDWEB__URLS__RPC=${APP__THIRDWEB__URLS__RPC} diff --git a/server/src/main.rs b/server/src/main.rs index e6dd8a7..4376aec 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -70,14 +70,6 @@ async fn main() -> anyhow::Result<()> { }); let eoa_signer = Arc::new(EoaSigner::new(vault_client.clone(), iaw_client.clone())); let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client)); - - // Rustls 0.23 requires selecting a process-level CryptoProvider (ring or aws-lc-rs) - // before any TLS client configuration is created (e.g. when using `rediss://`). - // If another crate already installed a provider, this will be a no-op error. - if let Err(e) = rustls::crypto::ring::default_provider().install_default() { - tracing::debug!(error = ?e, "Rustls CryptoProvider already installed"); - } - let redis_client = twmq::redis::Client::open(config.redis.url.as_str())?; let authorization_cache = EoaAuthorizationCache::new( diff --git a/twmq/Cargo.toml b/twmq/Cargo.toml index 3f6227f..9d40caa 100644 --- a/twmq/Cargo.toml +++ b/twmq/Cargo.toml @@ -14,7 +14,6 @@ thiserror = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } futures = { workspace = true } -rustls = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["full"] } diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index da022ba..164d234 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ -175,63 +175,52 @@ impl Queue { &self.name } - /// Redis Cluster hash tag used to keep all queue keys in the same slot. - /// See: https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/#hash-tags - fn redis_hash_tag(&self) -> String { - format!("{{{}}}", self.name()) - } - pub fn pending_list_name(&self) -> String { - format!("twmq:{}:pending", self.redis_hash_tag()) + format!("twmq:{}:pending", self.name()) } pub fn active_hash_name(&self) -> String { - format!("twmq:{}:active", self.redis_hash_tag()) + format!("twmq:{}:active", self.name) } pub fn delayed_zset_name(&self) -> String { - format!("twmq:{}:delayed", self.redis_hash_tag()) + format!("twmq:{}:delayed", self.name) } pub fn success_list_name(&self) -> String { - format!("twmq:{}:success", self.redis_hash_tag()) + format!("twmq:{}:success", self.name) } pub fn failed_list_name(&self) -> String { - format!("twmq:{}:failed", self.redis_hash_tag()) + format!("twmq:{}:failed", self.name) } pub fn job_data_hash_name(&self) -> String { - format!("twmq:{}:jobs:data", self.redis_hash_tag()) + format!("twmq:{}:jobs:data", self.name) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!("twmq:{}:job:{}:meta", self.redis_hash_tag(), job_id) + format!("twmq:{}:job:{}:meta", self.name, job_id) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!("twmq:{}:job:{}:errors", self.redis_hash_tag(), job_id) + format!("twmq:{}:job:{}:errors", self.name, job_id) } pub fn job_result_hash_name(&self) -> String { - format!("twmq:{}:jobs:result", self.redis_hash_tag()) + format!("twmq:{}:jobs:result", self.name) } pub fn dedupe_set_name(&self) -> String { - format!("twmq:{}:dedup", self.redis_hash_tag()) + format!("twmq:{}:dedup", self.name) } pub fn pending_cancellation_set_name(&self) -> String { - format!("twmq:{}:pending_cancellations", self.redis_hash_tag()) + format!("twmq:{}:pending_cancellations", self.name) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { - format!( - "twmq:{}:job:{}:lease:{}", - self.redis_hash_tag(), - job_id, - lease_token - ) + format!("twmq:{}:job:{}:lease:{}", self.name, job_id, lease_token) } pub async fn push( @@ -312,8 +301,7 @@ impl Queue { let position_string = delay.position.to_string(); let _result: (i32, String) = script - // Redis Cluster: all KEYS must be in the same slot - .key(self.redis_hash_tag()) + .key(&self.name) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.job_data_hash_name()) @@ -754,8 +742,7 @@ impl Queue { Vec, Vec, ) = script - // Redis Cluster: all KEYS must be in the same slot - .key(self.redis_hash_tag()) + .key(self.name()) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.active_hash_name()) @@ -1003,8 +990,7 @@ impl Queue { ); let trimmed_count: usize = trim_script - // Redis Cluster: all KEYS must be in the same slot - .key(self.redis_hash_tag()) + .key(self.name()) .key(self.success_list_name()) .key(self.job_data_hash_name()) .key(self.job_result_hash_name()) // results_hash @@ -1182,8 +1168,7 @@ impl Queue { ); let trimmed_count: usize = trim_script - // Redis Cluster: all KEYS must be in the same slot - .key(self.redis_hash_tag()) + .key(self.name()) .key(self.failed_list_name()) .key(self.job_data_hash_name()) .key(self.dedupe_set_name()) diff --git a/twmq/src/multilane.rs b/twmq/src/multilane.rs index a965e5d..5921e0a 100644 --- a/twmq/src/multilane.rs +++ b/twmq/src/multilane.rs @@ -84,89 +84,59 @@ impl MultilaneQueue { &self.queue_id } - /// Redis Cluster hash tag used to keep all multilane keys in the same slot. - fn redis_hash_tag(&self) -> String { - format!("{{{}}}", self.queue_id()) - } - // Redis key naming methods with proper multilane namespacing pub fn lanes_zset_name(&self) -> String { - format!("twmq_multilane:{}:lanes", self.redis_hash_tag()) + format!("twmq_multilane:{}:lanes", self.queue_id) } pub fn lane_pending_list_name(&self, lane_id: &str) -> String { - format!( - "twmq_multilane:{}:lane:{}:pending", - self.redis_hash_tag(), - lane_id - ) + format!("twmq_multilane:{}:lane:{}:pending", self.queue_id, lane_id) } pub fn lane_delayed_zset_name(&self, lane_id: &str) -> String { - format!( - "twmq_multilane:{}:lane:{}:delayed", - self.redis_hash_tag(), - lane_id - ) + format!("twmq_multilane:{}:lane:{}:delayed", self.queue_id, lane_id) } pub fn lane_active_hash_name(&self, lane_id: &str) -> String { - format!( - "twmq_multilane:{}:lane:{}:active", - self.redis_hash_tag(), - lane_id - ) + format!("twmq_multilane:{}:lane:{}:active", self.queue_id, lane_id) } pub fn success_list_name(&self) -> String { - format!("twmq_multilane:{}:success", self.redis_hash_tag()) + format!("twmq_multilane:{}:success", self.queue_id) } pub fn failed_list_name(&self) -> String { - format!("twmq_multilane:{}:failed", self.redis_hash_tag()) + format!("twmq_multilane:{}:failed", self.queue_id) } pub fn job_data_hash_name(&self) -> String { - format!("twmq_multilane:{}:jobs:data", self.redis_hash_tag()) + format!("twmq_multilane:{}:jobs:data", self.queue_id) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!( - "twmq_multilane:{}:job:{}:meta", - self.redis_hash_tag(), - job_id - ) + format!("twmq_multilane:{}:job:{}:meta", self.queue_id, job_id) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!( - "twmq_multilane:{}:job:{}:errors", - self.redis_hash_tag(), - job_id - ) + format!("twmq_multilane:{}:job:{}:errors", self.queue_id, job_id) } pub fn job_result_hash_name(&self) -> String { - format!("twmq_multilane:{}:jobs:result", self.redis_hash_tag()) + format!("twmq_multilane:{}:jobs:result", self.queue_id) } pub fn dedupe_set_name(&self) -> String { - format!("twmq_multilane:{}:dedup", self.redis_hash_tag()) + format!("twmq_multilane:{}:dedup", self.queue_id) } pub fn pending_cancellation_set_name(&self) -> String { - format!( - "twmq_multilane:{}:pending_cancellations", - self.redis_hash_tag() - ) + format!("twmq_multilane:{}:pending_cancellations", self.queue_id) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { format!( "twmq_multilane:{}:job:{}:lease:{}", - self.redis_hash_tag(), - job_id, - lease_token + self.queue_id, job_id, lease_token ) } @@ -259,8 +229,7 @@ impl MultilaneQueue { .key(self.job_data_hash_name()) .key(self.job_meta_hash_name(&job.id)) .key(self.dedupe_set_name()) - // Redis Cluster: ensure constructed keys match hash-tagged names - .arg(self.redis_hash_tag()) + .arg(&self.queue_id) .arg(lane_id) .arg(&job_options.id) .arg(job_data) @@ -445,8 +414,7 @@ impl MultilaneQueue { .key(self.pending_cancellation_set_name()) .key(self.job_meta_hash_name(job_id)) .key(self.job_data_hash_name()) - // Redis Cluster: ensure constructed keys match hash-tagged names - .arg(self.redis_hash_tag()) + .arg(&self.queue_id) .arg(job_id) .arg(now) .invoke_async(&mut self.redis.clone()) @@ -792,8 +760,7 @@ impl MultilaneQueue { .key(self.pending_cancellation_set_name()) .key(self.failed_list_name()) .key(self.success_list_name()) - // Redis Cluster: ensure constructed keys match hash-tagged names - .arg(self.redis_hash_tag()) + .arg(&self.queue_id) .arg(now) .arg(batch_size) .arg(self.options.lease_duration.as_secs()) diff --git a/twmq/tests/basic.rs b/twmq/tests/basic.rs index 66eae0b..0c5d2f6 100644 --- a/twmq/tests/basic.rs +++ b/twmq/tests/basic.rs @@ -18,8 +18,7 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys for a given queue name pattern async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - // twmq queue keys are hash-tagged for Redis Cluster compatibility - let keys_pattern = format!("twmq:{{{queue_name}}}:*"); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/basic_hook.rs b/twmq/tests/basic_hook.rs index 73435ce..0306ac2 100644 --- a/twmq/tests/basic_hook.rs +++ b/twmq/tests/basic_hook.rs @@ -23,8 +23,7 @@ use twmq::{ // Helper to clean up Redis keys for a given queue name pattern async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - // twmq queue keys are hash-tagged for Redis Cluster compatibility - let keys_pattern = format!("twmq:{{{queue_name}}}:*"); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/delay.rs b/twmq/tests/delay.rs index 1c74ac1..5245188 100644 --- a/twmq/tests/delay.rs +++ b/twmq/tests/delay.rs @@ -23,8 +23,7 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - // twmq queue keys are hash-tagged for Redis Cluster compatibility - let keys_pattern = format!("twmq:{{{queue_name}}}:*"); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/idempotency_modes.rs b/twmq/tests/idempotency_modes.rs index 7799e6f..ce1d0f3 100644 --- a/twmq/tests/idempotency_modes.rs +++ b/twmq/tests/idempotency_modes.rs @@ -68,8 +68,7 @@ impl DurableExecution for TestJobHandler { // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - // twmq queue keys are hash-tagged for Redis Cluster compatibility - let keys_pattern = format!("twmq:{{{queue_name}}}:*"); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/lease_expiry.rs b/twmq/tests/lease_expiry.rs index 5822c66..773c100 100644 --- a/twmq/tests/lease_expiry.rs +++ b/twmq/tests/lease_expiry.rs @@ -23,8 +23,7 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - // twmq queue keys are hash-tagged for Redis Cluster compatibility - let keys_pattern = format!("twmq:{{{queue_name}}}:*"); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/multilane_batch_pop.rs b/twmq/tests/multilane_batch_pop.rs index 189ce21..3f8c398 100644 --- a/twmq/tests/multilane_batch_pop.rs +++ b/twmq/tests/multilane_batch_pop.rs @@ -90,8 +90,7 @@ impl MultilaneTestHarness { /// Clean up all Redis keys for this test async fn cleanup(&self) { let mut conn = self.queue.redis.clone(); - // twmq multilane keys are hash-tagged for Redis Cluster compatibility - let keys_pattern = format!("twmq_multilane:{{{}}}:*", self.queue_id); + let keys_pattern = format!("twmq_multilane:{}:*", self.queue_id); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -166,7 +165,7 @@ impl Drop for MultilaneTestHarness { tokio::spawn(async move { let mut conn = redis; - let keys_pattern = format!("twmq_multilane:{{{queue_id}}}:*"); + let keys_pattern = format!("twmq_multilane:{queue_id}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/nack.rs b/twmq/tests/nack.rs index 8b0fd39..f66cd9f 100644 --- a/twmq/tests/nack.rs +++ b/twmq/tests/nack.rs @@ -26,8 +26,7 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - // twmq queue keys are hash-tagged for Redis Cluster compatibility - let keys_pattern = format!("twmq:{{{queue_name}}}:*"); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/prune_race_condition.rs b/twmq/tests/prune_race_condition.rs index 3245cc8..c9898f9 100644 --- a/twmq/tests/prune_race_condition.rs +++ b/twmq/tests/prune_race_condition.rs @@ -148,8 +148,7 @@ impl DurableExecution for EoaSimulatorJobHandler { // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - // twmq queue keys are hash-tagged for Redis Cluster compatibility - let keys_pattern = format!("twmq:{{{queue_name}}}:*"); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/prune_race_random_ids.rs b/twmq/tests/prune_race_random_ids.rs index b09ad89..fd5e9ea 100644 --- a/twmq/tests/prune_race_random_ids.rs +++ b/twmq/tests/prune_race_random_ids.rs @@ -123,8 +123,7 @@ impl DurableExecution for RandomJobHandler { // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - // twmq queue keys are hash-tagged for Redis Cluster compatibility - let keys_pattern = format!("twmq:{{{queue_name}}}:*"); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -240,7 +239,7 @@ async fn test_prune_with_random_ids() { let success_job_ids: Vec = conn.lrange(queue.success_list_name(), 0, -1).await.unwrap(); // Count how many job metadata hashes still exist (should match success list length if pruning works) - let meta_pattern = format!("twmq:{{{}}}:job:*:meta", queue.name()); + let meta_pattern = format!("twmq:{}:job:*:meta", queue.name()); let meta_keys: Vec = redis::cmd("KEYS") .arg(&meta_pattern) .query_async(&mut conn) From 5e625eac211b8dbc1d681665e74d9a17850658af Mon Sep 17 00:00:00 2001 From: 0xFirekeeper <0xFirekeeper@gmail.com> Date: Fri, 20 Mar 2026 00:08:47 +0700 Subject: [PATCH 5/7] Fix sub-second queue delays in TWMQ Sub-second delayed requeues in TWMQ were being truncated to 0s because queue scheduling used Duration::as_secs(). That meant values like 200ms were effectively treated as immediate retries, which could cause hot-looping at queue poll cadence. This change adds a small helper that rounds any non-zero sub-second delay up to 1s and uses it consistently in: - queue push scheduling - hook scheduling - multilane queue scheduling --- executors/src/solana_executor/worker.rs | 3 +-- twmq/src/hooks.rs | 4 ++-- twmq/src/lib.rs | 16 ++++++++++++++-- twmq/src/multilane.rs | 6 +++--- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/executors/src/solana_executor/worker.rs b/executors/src/solana_executor/worker.rs index c1ec06e..8fd3a0f 100644 --- a/executors/src/solana_executor/worker.rs +++ b/executors/src/solana_executor/worker.rs @@ -221,7 +221,7 @@ impl DurableExecution for SolanaExecutorJobHandler { type JobData = SolanaExecutorJobData; #[tracing::instrument( - skip(self, job), + skip(self, job), fields( transaction_id = job.job.id, stage = Self::stage_name() @@ -648,7 +648,6 @@ impl SolanaExecutorJobHandler { let has_signatures = versioned_tx.signatures.iter().any(|sig| { sig.as_ref() != [0u8; 64] }); - if has_signatures { error!( transaction_id = %transaction_id, diff --git a/twmq/src/hooks.rs b/twmq/src/hooks.rs index 57b03e7..44fc44a 100644 --- a/twmq/src/hooks.rs +++ b/twmq/src/hooks.rs @@ -1,6 +1,6 @@ use std::time::{SystemTime, UNIX_EPOCH}; -use crate::{DurableExecution, error::TwmqError, job::PushableJob}; +use crate::{DurableExecution, delay_to_queue_seconds, error::TwmqError, job::PushableJob}; // A minimal transaction context that hooks can use pub struct TransactionContext<'a> { @@ -51,7 +51,7 @@ impl<'a> TransactionContext<'a> { .sadd(job.queue.dedupe_set_name(), &job.options.id); if let Some(delay_options) = job.options.delay { - let process_at = now + delay_options.delay.as_secs(); + let process_at = now + delay_to_queue_seconds(delay_options.delay); self.pipeline .hset( job.queue.job_meta_hash_name(&job.options.id), diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index 164d234..34fe5d6 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ -28,6 +28,18 @@ pub use queue::IdempotencyMode; pub use redis; use tracing::Instrument; +pub(crate) fn delay_to_queue_seconds(delay: Duration) -> u64 { + let delay_secs = delay.as_secs(); + + if delay.is_zero() { + 0 + } else if delay_secs == 0 { + 1 + } else { + delay_secs + } +} + // Trait for error types to implement user cancellation pub trait UserCancellable { fn user_cancelled() -> Self; @@ -297,7 +309,7 @@ impl Queue { position: RequeuePosition::Last, }); - let delay_secs = delay.delay.as_secs(); + let delay_secs = delay_to_queue_seconds(delay.delay); let position_string = delay.position.to_string(); let _result: (i32, String) = script @@ -1049,7 +1061,7 @@ impl Queue { // Add to proper queue based on delay and position if let Some(delay_duration) = delay { - let delay_until = now + delay_duration.as_secs(); + let delay_until = now + delay_to_queue_seconds(delay_duration); let pos_str = position.to_string(); pipeline diff --git a/twmq/src/multilane.rs b/twmq/src/multilane.rs index 5921e0a..4e1cb06 100644 --- a/twmq/src/multilane.rs +++ b/twmq/src/multilane.rs @@ -9,7 +9,7 @@ use tracing::Instrument; use crate::{ CancelResult, DurableExecution, FailHookData, NackHookData, QueueInternalErrorHookData, - SuccessHookData, UserCancellable, + SuccessHookData, UserCancellable, delay_to_queue_seconds, error::TwmqError, hooks::TransactionContext, job::{ @@ -219,7 +219,7 @@ impl MultilaneQueue { position: RequeuePosition::Last, }); - let delay_secs = delay.delay.as_secs(); + let delay_secs = delay_to_queue_seconds(delay.delay); let position_string = delay.position.to_string(); let _result: (i32, String) = script @@ -1226,7 +1226,7 @@ impl MultilaneQueue { .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); - let delay_until = now + delay_duration.as_secs(); + let delay_until = now + delay_to_queue_seconds(*delay_duration); let pos_str = position.to_string(); hook_pipeline From dd0b108ca10fea32d4ab22f74e1deab6eabcfacd Mon Sep 17 00:00:00 2001 From: 0xFirekeeper <0xFirekeeper@gmail.com> Date: Fri, 20 Mar 2026 00:22:58 +0700 Subject: [PATCH 6/7] bump tarpaulin timeout --- .github/workflows/coverage-twmq.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/coverage-twmq.yaml b/.github/workflows/coverage-twmq.yaml index 8c20654..cedd9ec 100644 --- a/.github/workflows/coverage-twmq.yaml +++ b/.github/workflows/coverage-twmq.yaml @@ -53,7 +53,7 @@ jobs: # Run coverage with tarpaulin - name: Run coverage - run: cargo tarpaulin -p twmq --skip-clean --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*" + run: cargo tarpaulin -p twmq --skip-clean --timeout 300 --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*" # Upload coverage to Codecov # TODO: Uncomment once we have open-sourced the repo From 90081a797d94a8a57ee4716f8a210849bfff2f7c Mon Sep 17 00:00:00 2001 From: 0xFirekeeper <0xFirekeeper@gmail.com> Date: Fri, 20 Mar 2026 00:42:43 +0700 Subject: [PATCH 7/7] safe cargo clippy changes --- executors/src/eoa/store/atomic.rs | 4 ++-- executors/src/eoa/store/borrowed.rs | 2 +- executors/src/eoa/store/submitted.rs | 2 +- executors/src/solana_executor/rpc_cache.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index 0ac7797..505366c 100644 --- a/executors/src/eoa/store/atomic.rs +++ b/executors/src/eoa/store/atomic.rs @@ -593,7 +593,7 @@ impl AtomicEoaExecutorStore { let ttl_seconds = self.completed_transaction_ttl_seconds as i64; pipeline.expire(&tx_data_key, ttl_seconds); pipeline.expire( - &self.transaction_attempts_list_name(&pending_transaction.transaction_id), + self.transaction_attempts_list_name(&pending_transaction.transaction_id), ttl_seconds, ); @@ -670,7 +670,7 @@ impl AtomicEoaExecutorStore { let ttl_seconds = self.completed_transaction_ttl_seconds as i64; pipeline.expire(&tx_data_key, ttl_seconds); pipeline.expire( - &self.transaction_attempts_list_name(&pending_transaction.transaction_id), + self.transaction_attempts_list_name(&pending_transaction.transaction_id), ttl_seconds, ); } diff --git a/executors/src/eoa/store/borrowed.rs b/executors/src/eoa/store/borrowed.rs index 8797a78..99b3ce6 100644 --- a/executors/src/eoa/store/borrowed.rs +++ b/executors/src/eoa/store/borrowed.rs @@ -231,7 +231,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { let ttl_seconds = self.completed_transaction_ttl_seconds as i64; pipeline.expire(&tx_data_key, ttl_seconds); pipeline.expire( - &self.keys.transaction_attempts_list_name(transaction_id), + self.keys.transaction_attempts_list_name(transaction_id), ttl_seconds, ); diff --git a/executors/src/eoa/store/submitted.rs b/executors/src/eoa/store/submitted.rs index fb16739..ff01f23 100644 --- a/executors/src/eoa/store/submitted.rs +++ b/executors/src/eoa/store/submitted.rs @@ -364,7 +364,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> { // Add TTL expiration let ttl_seconds = self.completed_transaction_ttl_seconds as i64; pipeline.expire(&data_key_name, ttl_seconds); - pipeline.expire(&self.keys.transaction_attempts_list_name(id), ttl_seconds); + pipeline.expire(self.keys.transaction_attempts_list_name(id), ttl_seconds); if let SubmittedTransactionHydrated::Real(tx) = tx { // Record metrics: transaction queued to mined for confirmed transactions diff --git a/executors/src/solana_executor/rpc_cache.rs b/executors/src/solana_executor/rpc_cache.rs index f625a44..8bbf44d 100644 --- a/executors/src/solana_executor/rpc_cache.rs +++ b/executors/src/solana_executor/rpc_cache.rs @@ -45,7 +45,7 @@ impl SolanaRpcCache { }; let key = RpcCacheKey { - chain_id: chain_id.clone(), + chain_id, rpc_url: rpc_url.clone(), };