diff --git a/.github/workflows/coverage-twmq.yaml b/.github/workflows/coverage-twmq.yaml index 8c20654..cedd9ec 100644 --- a/.github/workflows/coverage-twmq.yaml +++ b/.github/workflows/coverage-twmq.yaml @@ -53,7 +53,7 @@ jobs: # Run coverage with tarpaulin - name: Run coverage - run: cargo tarpaulin -p twmq --skip-clean --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*" + run: cargo tarpaulin -p twmq --skip-clean --timeout 300 --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*" # Upload coverage to Codecov # TODO: Uncomment once we have open-sourced the repo diff --git a/Cargo.toml b/Cargo.toml index 2e0aec2..b8fc7cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,17 +98,7 @@ config = "0.15.11" aws-arn = "0.3.1" # Redis -redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager", "cluster", "cluster-async", "tls-rustls", "tokio-rustls-comp"] } +redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] } # Dev dependencies -criterion = { version = "0.6", features = ["html_reports", "async_tokio"] } - -# Rustls -# -# NOTE: rustls 0.23 requires selecting exactly one process-wide crypto provider -# (features: `ring` or `aws_lc_rs` / `aws-lc-rs`). Some dependency graphs (e.g. via -# redis-rs' rustls integration) can end up with *no* provider enabled, which causes a -# runtime panic when building TLS client/server configs. -# -# We explicitly enable the `ring` provider here to make TLS work reliably. -rustls = { version = "0.23.32", default-features = false, features = ["std", "ring"] } \ No newline at end of file +criterion = { version = "0.6", features = ["html_reports", "async_tokio"] } \ No newline at end of file diff --git a/core/src/chain.rs b/core/src/chain.rs index 23a2f8f..ee00ec2 100644 --- a/core/src/chain.rs +++ b/core/src/chain.rs @@ -25,15 +25,6 @@ impl RpcCredentials { Ok(header_map) } - - pub fn client_id_for_logs(&self) -> Option<&str> { - match self { - RpcCredentials::Thirdweb(ThirdwebAuth::ClientIdServiceKey(creds)) => { - Some(&creds.client_id) - } - RpcCredentials::Thirdweb(ThirdwebAuth::SecretKey(_)) => None, - } - } } pub trait Chain: Send + Sync { diff --git a/executors/src/eip7702_executor/confirm.rs b/executors/src/eip7702_executor/confirm.rs index 3d26389..65dc21a 100644 --- a/executors/src/eip7702_executor/confirm.rs +++ b/executors/src/eip7702_executor/confirm.rs @@ -29,8 +29,6 @@ use crate::{ }, }; -const EIP7702_CONFIRM_QUEUE_ID: &str = "eip7702_confirm"; - // --- Job Payload --- #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] @@ -170,7 +168,7 @@ where type ErrorData = Eip7702ConfirmationError; type JobData = Eip7702ConfirmationJobData; - #[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, client_id = ?job.job.data.rpc_credentials.client_id_for_logs(), queue_id = EIP7702_CONFIRM_QUEUE_ID, stage = Self::stage_name(), executor = Self::executor_name()))] + #[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))] async fn process( &self, job: &BorrowedJob, @@ -204,14 +202,7 @@ where .await .map_err(|e| { tracing::error!( - transaction_id = %job_data.transaction_id, - chain_id = job_data.chain_id, - client_id = job_data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, - bundler_transaction_id = %job_data.bundler_transaction_id, + bundler_transaction_id = job_data.bundler_transaction_id, sender_details = ?job_data.sender_details, error = ?e, "Failed to get transaction hash from bundler" @@ -330,15 +321,7 @@ where // Send webhook if let Err(e) = self.queue_success_webhook(job, success_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue success webhook" ); @@ -363,15 +346,7 @@ where if should_queue_webhook { if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue nack webhook" ); @@ -395,30 +370,14 @@ where .add_remove_command(tx.pipeline(), &job.job.data.transaction_id); tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?fail_data.error, "EIP-7702 confirmation job failed" ); if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue fail webhook" ); diff --git a/executors/src/eip7702_executor/send.rs b/executors/src/eip7702_executor/send.rs index 78b54b7..f531eab 100644 --- a/executors/src/eip7702_executor/send.rs +++ b/executors/src/eip7702_executor/send.rs @@ -36,9 +36,6 @@ use crate::{ use super::confirm::{Eip7702ConfirmationHandler, Eip7702ConfirmationJobData}; -const EIP7702_SEND_QUEUE_ID: &str = "eip7702_send"; -const EIP7702_CONFIRM_QUEUE_ID: &str = "eip7702_confirm"; - // --- Job Payload --- #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] @@ -178,7 +175,7 @@ where type ErrorData = Eip7702SendError; type JobData = Eip7702SendJobData; - #[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, client_id = ?job.job.data.rpc_credentials.client_id_for_logs(), queue_id = EIP7702_SEND_QUEUE_ID, stage = Self::stage_name(), executor = Self::executor_name()))] + #[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))] async fn process( &self, job: &BorrowedJob, @@ -389,15 +386,7 @@ where if let Err(e) = tx.queue_job(confirmation_job) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_CONFIRM_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to enqueue confirmation job" ); @@ -406,15 +395,7 @@ where // Send webhook if let Err(e) = self.queue_success_webhook(job, success_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_SEND_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue success webhook" ); @@ -430,15 +411,7 @@ where // Don't modify transaction registry on NACK - job will be retried if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_SEND_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue nack webhook" ); @@ -456,30 +429,14 @@ where .add_remove_command(tx.pipeline(), &job.job.data.transaction_id); tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_SEND_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?fail_data.error, "EIP-7702 send job failed" ); if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - chain_id = job.job.data.chain_id, - client_id = job - .job - .data - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EIP7702_SEND_QUEUE_ID, + transaction_id = job.job.data.transaction_id, error = ?e, "Failed to queue fail webhook" ); diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index d85c044..505366c 100644 --- a/executors/src/eoa/store/atomic.rs +++ b/executors/src/eoa/store/atomic.rs @@ -4,8 +4,7 @@ use alloy::{ consensus::{Signed, TypedTransaction}, primitives::Address, }; -use twmq::redis::{AsyncCommands, Pipeline}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; use crate::{ eoa::{ @@ -31,7 +30,6 @@ use crate::{ const MAX_RETRIES: u32 = 10; const RETRY_BASE_DELAY_MS: u64 = 10; -const EOA_QUEUE_ID: &str = "eoa_executor"; pub trait SafeRedisTransaction: Send + Sync { type ValidationData; @@ -45,7 +43,7 @@ pub trait SafeRedisTransaction: Send + Sync { ) -> Self::OperationResult; fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, store: &EoaExecutorStore, ) -> impl Future> + Send; fn watch_keys(&self) -> Vec; @@ -595,7 +593,7 @@ impl AtomicEoaExecutorStore { let ttl_seconds = self.completed_transaction_ttl_seconds as i64; pipeline.expire(&tx_data_key, ttl_seconds); pipeline.expire( - &self.transaction_attempts_list_name(&pending_transaction.transaction_id), + self.transaction_attempts_list_name(&pending_transaction.transaction_id), ttl_seconds, ); @@ -614,18 +612,7 @@ impl AtomicEoaExecutorStore { &mut tx_context, webhook_queue.clone(), ) { - tracing::error!( - transaction_id = %pending_transaction.transaction_id, - chain_id = pending_transaction.user_request.chain_id, - client_id = pending_transaction - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to queue webhook for fail: {}", - e - ); + tracing::error!("Failed to queue webhook for fail: {}", e); } } @@ -683,7 +670,7 @@ impl AtomicEoaExecutorStore { let ttl_seconds = self.completed_transaction_ttl_seconds as i64; pipeline.expire(&tx_data_key, ttl_seconds); pipeline.expire( - &self.transaction_attempts_list_name(&pending_transaction.transaction_id), + self.transaction_attempts_list_name(&pending_transaction.transaction_id), ttl_seconds, ); } @@ -707,13 +694,6 @@ impl AtomicEoaExecutorStore { ) { tracing::error!( transaction_id = %pending_transaction.transaction_id, - chain_id = pending_transaction.user_request.chain_id, - client_id = pending_transaction - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, error = ?e, "Failed to queue webhook for batch fail" ); @@ -835,7 +815,7 @@ impl SafeRedisTransaction for ResetNoncesTransaction<'_> { async fn validation( &self, - _conn: &mut ClusterConnection, + _conn: &mut ConnectionManager, store: &EoaExecutorStore, ) -> Result { let now = chrono::Utc::now().timestamp_millis().max(0) as u64; diff --git a/executors/src/eoa/store/borrowed.rs b/executors/src/eoa/store/borrowed.rs index 87b5823..99b3ce6 100644 --- a/executors/src/eoa/store/borrowed.rs +++ b/executors/src/eoa/store/borrowed.rs @@ -1,8 +1,7 @@ use std::sync::Arc; use twmq::Queue; -use twmq::redis::{AsyncCommands, Pipeline}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; use crate::eoa::EoaExecutorStore; use crate::eoa::{ @@ -16,8 +15,6 @@ use crate::eoa::{ use crate::metrics::{EoaMetrics, calculate_duration_seconds, current_timestamp_ms}; use crate::webhook::{WebhookJobHandler, queue_webhook_envelopes}; -const EOA_QUEUE_ID: &str = "eoa_executor"; - #[derive(Debug, Clone)] pub enum SubmissionResultType { Success, @@ -74,7 +71,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { async fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, _store: &EoaExecutorStore, ) -> Result { // Get all borrowed transaction IDs @@ -175,19 +172,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { &mut tx_context, self.webhook_queue.clone(), ) { - tracing::error!( - transaction_id = transaction_id, - chain_id = result.transaction.user_request.chain_id, - client_id = result - .transaction - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to queue webhook for success: {}", - e - ); + tracing::error!("Failed to queue webhook for success: {}", e); } else { report.webhook_events_queued += 1; } @@ -227,19 +212,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { &mut tx_context, self.webhook_queue.clone(), ) { - tracing::error!( - transaction_id = transaction_id, - chain_id = result.transaction.user_request.chain_id, - client_id = result - .transaction - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to queue webhook for nack: {}", - e - ); + tracing::error!("Failed to queue webhook for nack: {}", e); } else { report.webhook_events_queued += 1; } @@ -258,7 +231,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { let ttl_seconds = self.completed_transaction_ttl_seconds as i64; pipeline.expire(&tx_data_key, ttl_seconds); pipeline.expire( - &self.keys.transaction_attempts_list_name(transaction_id), + self.keys.transaction_attempts_list_name(transaction_id), ttl_seconds, ); @@ -281,19 +254,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { &mut tx_context, self.webhook_queue.clone(), ) { - tracing::error!( - transaction_id = transaction_id, - chain_id = result.transaction.user_request.chain_id, - client_id = result - .transaction - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to queue webhook for fail: {}", - e - ); + tracing::error!("Failed to queue webhook for fail: {}", e); } else { report.webhook_events_queued += 1; } diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs index 5ec9749..2cb59b4 100644 --- a/executors/src/eoa/store/mod.rs +++ b/executors/src/eoa/store/mod.rs @@ -9,8 +9,7 @@ use engine_core::transaction::TransactionTypeData; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::ops::Deref; -use twmq::redis::AsyncCommands; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, aio::ConnectionManager}; mod atomic; mod borrowed; @@ -99,7 +98,7 @@ pub struct TransactionData { /// Transaction store focused on transaction_id operations and nonce indexing pub struct EoaExecutorStore { - pub redis: ClusterConnection, + pub redis: ConnectionManager, pub keys: EoaExecutorStoreKeys, pub completed_transaction_ttl_seconds: u64, } @@ -122,14 +121,8 @@ impl EoaExecutorStoreKeys { /// Lock key name for EOA processing pub fn eoa_lock_key_name(&self) -> String { match &self.namespace { - Some(ns) => format!( - "{ns}:{}:eoa_executor:lock:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:lock:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), + Some(ns) => format!("{ns}:eoa_executor:lock:{}:{}", self.chain_id, self.eoa), + None => format!("eoa_executor:lock:{}:{}", self.chain_id, self.eoa), } } @@ -144,14 +137,8 @@ impl EoaExecutorStoreKeys { /// - "failure_reason": String failure reason (optional) pub fn transaction_data_key_name(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => format!( - "{ns}:{}:eoa_executor:tx_data:{transaction_id}", - twmq::ENGINE_HASH_TAG - ), - None => format!( - "{}:eoa_executor:tx_data:{transaction_id}", - twmq::ENGINE_HASH_TAG - ), + Some(ns) => format!("{ns}:eoa_executor:tx_data:{transaction_id}"), + None => format!("eoa_executor:tx_data:{transaction_id}"), } } @@ -161,14 +148,8 @@ impl EoaExecutorStoreKeys { /// of a TransactionAttempt. This allows efficient append operations. pub fn transaction_attempts_list_name(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => format!( - "{ns}:{}:eoa_executor:tx_attempts:{transaction_id}", - twmq::ENGINE_HASH_TAG - ), - None => format!( - "{}:eoa_executor:tx_attempts:{transaction_id}", - twmq::ENGINE_HASH_TAG - ), + Some(ns) => format!("{ns}:eoa_executor:tx_attempts:{transaction_id}"), + None => format!("eoa_executor:tx_attempts:{transaction_id}"), } } @@ -178,13 +159,10 @@ impl EoaExecutorStoreKeys { pub fn pending_transactions_zset_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:pending_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:pending_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:pending_txs:{}:{}", + self.chain_id, self.eoa ), + None => format!("eoa_executor:pending_txs:{}:{}", self.chain_id, self.eoa), } } @@ -194,27 +172,18 @@ impl EoaExecutorStoreKeys { pub fn submitted_transactions_zset_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:submitted_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:submitted_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:submitted_txs:{}:{}", + self.chain_id, self.eoa ), + None => format!("eoa_executor:submitted_txs:{}:{}", self.chain_id, self.eoa), } } /// Name of the key that maps transaction hash to transaction id pub fn transaction_hash_to_id_key_name(&self, hash: &str) -> String { match &self.namespace { - Some(ns) => format!( - "{ns}:{}:eoa_executor:tx_hash_to_id:{hash}", - twmq::ENGINE_HASH_TAG - ), - None => format!( - "{}:eoa_executor:tx_hash_to_id:{hash}", - twmq::ENGINE_HASH_TAG - ), + Some(ns) => format!("{ns}:eoa_executor:tx_hash_to_id:{hash}"), + None => format!("eoa_executor:tx_hash_to_id:{hash}"), } } @@ -228,13 +197,10 @@ impl EoaExecutorStoreKeys { pub fn borrowed_transactions_hashmap_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:borrowed_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:borrowed_txs:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:borrowed_txs:{}:{}", + self.chain_id, self.eoa ), + None => format!("eoa_executor:borrowed_txs:{}:{}", self.chain_id, self.eoa), } } @@ -248,12 +214,12 @@ impl EoaExecutorStoreKeys { pub fn recycled_nonces_zset_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:recycled_nonces:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:recycled_nonces:{}:{}", + self.chain_id, self.eoa ), None => format!( - "{}:eoa_executor:recycled_nonces:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "eoa_executor:recycled_nonces:{}:{}", + self.chain_id, self.eoa ), } } @@ -270,12 +236,12 @@ impl EoaExecutorStoreKeys { pub fn optimistic_transaction_count_key_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:optimistic_nonce:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:optimistic_nonce:{}:{}", + self.chain_id, self.eoa ), None => format!( - "{}:eoa_executor:optimistic_nonce:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "eoa_executor:optimistic_nonce:{}:{}", + self.chain_id, self.eoa ), } } @@ -290,13 +256,10 @@ impl EoaExecutorStoreKeys { pub fn last_transaction_count_key_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:last_tx_nonce:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:last_tx_nonce:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:last_tx_nonce:{}:{}", + self.chain_id, self.eoa ), + None => format!("eoa_executor:last_tx_nonce:{}:{}", self.chain_id, self.eoa), } } @@ -308,14 +271,8 @@ impl EoaExecutorStoreKeys { /// - timestamp of the last 5 nonce resets pub fn eoa_health_key_name(&self) -> String { match &self.namespace { - Some(ns) => format!( - "{ns}:{}:eoa_executor:health:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), - None => format!( - "{}:eoa_executor:health:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa - ), + Some(ns) => format!("{ns}:eoa_executor:health:{}:{}", self.chain_id, self.eoa), + None => format!("eoa_executor:health:{}:{}", self.chain_id, self.eoa), } } @@ -325,12 +282,12 @@ impl EoaExecutorStoreKeys { pub fn manual_reset_key_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:{}:eoa_executor:pending_manual_reset:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "{ns}:eoa_executor:pending_manual_reset:{}:{}", + self.chain_id, self.eoa ), None => format!( - "{}:eoa_executor:pending_manual_reset:{}:{}", - twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + "eoa_executor:pending_manual_reset:{}:{}", + self.chain_id, self.eoa ), } } @@ -338,7 +295,7 @@ impl EoaExecutorStoreKeys { impl EoaExecutorStore { pub fn new( - redis: ClusterConnection, + redis: ConnectionManager, namespace: Option, eoa: Address, chain_id: u64, diff --git a/executors/src/eoa/store/pending.rs b/executors/src/eoa/store/pending.rs index d3be479..637e010 100644 --- a/executors/src/eoa/store/pending.rs +++ b/executors/src/eoa/store/pending.rs @@ -1,8 +1,7 @@ use std::collections::HashSet; use alloy::{consensus::Transaction, primitives::Address}; -use twmq::redis::{AsyncCommands, Pipeline}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; use crate::eoa::{ EoaExecutorStore, @@ -47,7 +46,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> { async fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, _store: &EoaExecutorStore, ) -> Result { if self.transactions.is_empty() { @@ -182,7 +181,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithRecycledNonces<'_> { async fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, _store: &EoaExecutorStore, ) -> Result { if self.transactions.is_empty() { diff --git a/executors/src/eoa/store/submitted.rs b/executors/src/eoa/store/submitted.rs index f0776c4..ff01f23 100644 --- a/executors/src/eoa/store/submitted.rs +++ b/executors/src/eoa/store/submitted.rs @@ -5,8 +5,7 @@ use std::{ }; use serde::{Deserialize, Serialize}; -use twmq::redis::{AsyncCommands, Pipeline}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; use crate::{ TransactionCounts, @@ -22,8 +21,6 @@ use crate::{ webhook::{WebhookJobHandler, queue_webhook_envelopes}, }; -const EOA_QUEUE_ID: &str = "eoa_executor"; - #[derive(Debug, Clone)] pub struct SubmittedTransaction { pub data: SubmittedTransactionDehydrated, @@ -282,7 +279,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> { async fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, store: &EoaExecutorStore, ) -> Result { // Fetch transactions up to the latest confirmed nonce for replacements @@ -367,7 +364,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> { // Add TTL expiration let ttl_seconds = self.completed_transaction_ttl_seconds as i64; pipeline.expire(&data_key_name, ttl_seconds); - pipeline.expire(&self.keys.transaction_attempts_list_name(id), ttl_seconds); + pipeline.expire(self.keys.transaction_attempts_list_name(id), ttl_seconds); if let SubmittedTransactionHydrated::Real(tx) = tx { // Record metrics: transaction queued to mined for confirmed transactions @@ -399,14 +396,6 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> { self.webhook_queue.clone(), ) { tracing::error!( - transaction_id = %tx.transaction_id, - chain_id = tx.user_request.chain_id, - client_id = tx - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, "Failed to queue webhook for confirmed transaction: {}", e ); @@ -446,14 +435,6 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> { self.webhook_queue.clone(), ) { tracing::error!( - transaction_id = %tx.transaction_id, - chain_id = tx.user_request.chain_id, - client_id = tx - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, "Failed to queue webhook for replaced transaction: {}", e ); @@ -611,7 +592,7 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> { async fn validation( &self, - conn: &mut ClusterConnection, + conn: &mut ConnectionManager, _store: &EoaExecutorStore, ) -> Result { // get the highest submitted nonce diff --git a/executors/src/eoa/worker/confirm.rs b/executors/src/eoa/worker/confirm.rs index 46d9f10..beb3feb 100644 --- a/executors/src/eoa/worker/confirm.rs +++ b/executors/src/eoa/worker/confirm.rs @@ -18,7 +18,6 @@ use crate::{ }; const NONCE_STALL_LIMIT_MS: u64 = 60_000; // 1 minute in milliseconds - after this time, attempt gas bump -const EOA_QUEUE_ID: &str = "eoa_executor"; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -407,35 +406,13 @@ impl EoaExecutorWorker { if should_update_balance_threshold(inner_error) && let Err(e) = self.update_balance_threshold().await { - tracing::error!( - transaction_id = ?newest_transaction_data.transaction_id, - chain_id = newest_transaction_data.user_request.chain_id, - client_id = newest_transaction_data - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to update balance threshold: {}", - e - ); + tracing::error!("Failed to update balance threshold: {}", e); } } else if let EoaExecutorWorkerError::RpcError { inner_error, .. } = &e && should_update_balance_threshold(inner_error) && let Err(e) = self.update_balance_threshold().await { - tracing::error!( - transaction_id = ?newest_transaction_data.transaction_id, - chain_id = newest_transaction_data.user_request.chain_id, - client_id = newest_transaction_data - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - "Failed to update balance threshold: {}", - e - ); + tracing::error!("Failed to update balance threshold: {}", e); } // Check if nonce has moved ahead since we started the gas bump // This handles the race condition where the original transaction diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index c235453..33e47bf 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -11,7 +11,7 @@ use engine_eip7702_core::delegated_account::DelegatedAccount; use serde::{Deserialize, Serialize}; use std::{sync::Arc, time::Duration}; use twmq::Queue; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::aio::ConnectionManager; use twmq::{ DurableExecution, FailHookData, NackHookData, SuccessHookData, hooks::TransactionContext, @@ -114,7 +114,7 @@ where pub webhook_queue: Arc>, pub authorization_cache: EoaAuthorizationCache, - pub redis: ClusterConnection, + pub redis: ConnectionManager, pub namespace: Option, pub eoa_signer: Arc, diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs index 8e48967..9742ee6 100644 --- a/executors/src/eoa/worker/send.rs +++ b/executors/src/eoa/worker/send.rs @@ -17,7 +17,6 @@ use crate::{ }; const HEALTH_CHECK_INTERVAL_MS: u64 = 60 * 5 * 1000; // 5 minutes in milliseconds -const EOA_QUEUE_ID: &str = "eoa_executor"; impl EoaExecutorWorker { // ========== SEND FLOW ========== @@ -349,14 +348,7 @@ impl EoaExecutorWorker { if !is_retryable_preparation_error(&e) { tracing::error!( error = ?e, - transaction_id = %pending.transaction_id, - chain_id = pending.user_request.chain_id, - client_id = pending - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, + transaction_id = pending.transaction_id, "Transaction permanently failed due to non-retryable preparation error", ); non_retryable_failures.push((pending, e.clone())); @@ -556,35 +548,11 @@ impl EoaExecutorWorker { match &result.result { SubmissionResultType::Success => result, SubmissionResultType::Nack(e) => { - tracing::error!( - error = ?e, - transaction_id = %borrowed_tx.transaction_id, - chain_id = borrowed_tx.user_request.chain_id, - client_id = borrowed_tx - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - nonce = borrowed_tx.data.signed_transaction.nonce(), - "Transaction nack error during send" - ); + tracing::error!(error = ?e, transaction_id = borrowed_tx.transaction_id, nonce = borrowed_tx.data.signed_transaction.nonce(), "Transaction nack error during send"); result } SubmissionResultType::Fail(e) => { - tracing::error!( - error = ?e, - transaction_id = %borrowed_tx.transaction_id, - chain_id = borrowed_tx.user_request.chain_id, - client_id = borrowed_tx - .user_request - .rpc_credentials - .client_id_for_logs() - .unwrap_or("unknown"), - queue_id = EOA_QUEUE_ID, - nonce = borrowed_tx.data.signed_transaction.nonce(), - "Transaction failed during send" - ); + tracing::error!(error = ?e, transaction_id = borrowed_tx.transaction_id, nonce = borrowed_tx.data.signed_transaction.nonce(), "Transaction failed during send"); result } } diff --git a/executors/src/external_bundler/deployment.rs b/executors/src/external_bundler/deployment.rs index ac19502..ac8a969 100644 --- a/executors/src/external_bundler/deployment.rs +++ b/executors/src/external_bundler/deployment.rs @@ -7,10 +7,8 @@ use serde::{Deserialize, Serialize}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use twmq::{ error::TwmqError, - redis::{AsyncCommands, Pipeline}, + redis::{AsyncCommands, Pipeline, aio::ConnectionManager}, }; -use twmq::redis::cluster_async::ClusterConnection; -use twmq::redis::cluster::ClusterClient; use uuid::Uuid; const CACHE_PREFIX: &str = "deployment_cache"; @@ -18,12 +16,12 @@ const LOCK_PREFIX: &str = "deployment_lock"; #[derive(Clone)] pub struct RedisDeploymentCache { - connection: ClusterConnection, + connection_manager: twmq::redis::aio::ConnectionManager, } #[derive(Clone)] pub struct RedisDeploymentLock { - connection: ClusterConnection, + connection_manager: twmq::redis::aio::ConnectionManager, } #[derive(Serialize, Deserialize)] @@ -33,21 +31,18 @@ struct LockData { } impl RedisDeploymentCache { - pub async fn new(client: ClusterClient) -> Result { + pub async fn new(client: twmq::redis::Client) -> Result { Ok(Self { - connection: client.get_async_connection().await?, + connection_manager: ConnectionManager::new(client).await?, }) } - pub fn conn(&self) -> &ClusterConnection { - &self.connection + pub fn conn(&self) -> &ConnectionManager { + &self.connection_manager } fn cache_key(&self, chain_id: u64, account_address: &Address) -> String { - format!( - "{}:{CACHE_PREFIX}:{chain_id}:{account_address}", - twmq::ENGINE_HASH_TAG - ) + format!("{CACHE_PREFIX}:{chain_id}:{account_address}") } } @@ -65,28 +60,22 @@ impl DeploymentCache for RedisDeploymentCache { } impl RedisDeploymentLock { - pub async fn new(client: ClusterClient) -> Result { + pub async fn new(client: twmq::redis::Client) -> Result { Ok(Self { - connection: client.get_async_connection().await?, + connection_manager: ConnectionManager::new(client).await?, }) } - pub fn conn(&self) -> &ClusterConnection { - &self.connection + pub fn conn(&self) -> &ConnectionManager { + &self.connection_manager } fn lock_key(&self, chain_id: u64, account_address: &Address) -> String { - format!( - "{}:{LOCK_PREFIX}:{chain_id}:{account_address}", - twmq::ENGINE_HASH_TAG - ) + format!("{LOCK_PREFIX}:{chain_id}:{account_address}") } fn cache_key(&self, chain_id: u64, account_address: &Address) -> String { - format!( - "{}:{CACHE_PREFIX}:{chain_id}:{account_address}", - twmq::ENGINE_HASH_TAG - ) + format!("{CACHE_PREFIX}:{chain_id}:{account_address}") } /// Release a deployment lock using the provided pipeline diff --git a/executors/src/solana_executor/rpc_cache.rs b/executors/src/solana_executor/rpc_cache.rs index f625a44..8bbf44d 100644 --- a/executors/src/solana_executor/rpc_cache.rs +++ b/executors/src/solana_executor/rpc_cache.rs @@ -45,7 +45,7 @@ impl SolanaRpcCache { }; let key = RpcCacheKey { - chain_id: chain_id.clone(), + chain_id, rpc_url: rpc_url.clone(), }; diff --git a/executors/src/solana_executor/storage.rs b/executors/src/solana_executor/storage.rs index 510e22d..8a88302 100644 --- a/executors/src/solana_executor/storage.rs +++ b/executors/src/solana_executor/storage.rs @@ -3,9 +3,8 @@ use serde_with::{DisplayFromStr, serde_as}; use solana_sdk::{hash::Hash, signature::Signature}; use twmq::{ redis, - redis::AsyncCommands, + redis::{AsyncCommands, aio::ConnectionManager}, }; -use twmq::redis::cluster_async::ClusterConnection; /// Represents a single attempt to send a Solana transaction /// This is stored in Redis BEFORE sending to prevent duplicate transactions @@ -46,7 +45,7 @@ impl SolanaTransactionAttempt { /// Represents a lock held on a transaction /// When dropped, the lock is automatically released pub struct TransactionLock { - redis: ClusterConnection, + redis: ConnectionManager, lock_key: String, lock_value: String, } @@ -122,30 +121,28 @@ impl From for LockError { /// Storage for Solana transaction attempts /// Provides atomic operations to prevent duplicate transactions pub struct SolanaTransactionStorage { - redis: ClusterConnection, + redis: ConnectionManager, namespace: Option, } impl SolanaTransactionStorage { - pub fn new(redis: ClusterConnection, namespace: Option) -> Self { + pub fn new(redis: ConnectionManager, namespace: Option) -> Self { Self { redis, namespace } } /// Get the Redis key for a transaction's attempt fn attempt_key(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => { - format!("{ns}:{}:solana_tx_attempt:{transaction_id}", twmq::ENGINE_HASH_TAG) - } - None => format!("{}:solana_tx_attempt:{transaction_id}", twmq::ENGINE_HASH_TAG), + Some(ns) => format!("{ns}:solana_tx_attempt:{transaction_id}"), + None => format!("solana_tx_attempt:{transaction_id}"), } } /// Get the Redis key for a transaction's lock fn lock_key(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => format!("{ns}:{}:solana_tx_lock:{transaction_id}", twmq::ENGINE_HASH_TAG), - None => format!("{}:solana_tx_lock:{transaction_id}", twmq::ENGINE_HASH_TAG), + Some(ns) => format!("{ns}:solana_tx_lock:{transaction_id}"), + None => format!("solana_tx_lock:{transaction_id}"), } } diff --git a/executors/src/solana_executor/worker.rs b/executors/src/solana_executor/worker.rs index c1ec06e..8fd3a0f 100644 --- a/executors/src/solana_executor/worker.rs +++ b/executors/src/solana_executor/worker.rs @@ -221,7 +221,7 @@ impl DurableExecution for SolanaExecutorJobHandler { type JobData = SolanaExecutorJobData; #[tracing::instrument( - skip(self, job), + skip(self, job), fields( transaction_id = job.job.id, stage = Self::stage_name() @@ -648,7 +648,6 @@ impl SolanaExecutorJobHandler { let has_signatures = versioned_tx.signatures.iter().any(|sig| { sig.as_ref() != [0u8; 64] }); - if has_signatures { error!( transaction_id = %transaction_id, diff --git a/executors/src/transaction_registry.rs b/executors/src/transaction_registry.rs index 9754495..a63584c 100644 --- a/executors/src/transaction_registry.rs +++ b/executors/src/transaction_registry.rs @@ -1,7 +1,6 @@ use engine_core::error::EngineError; use thiserror::Error; -use twmq::redis::{AsyncCommands, Pipeline}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; #[derive(Debug, Error)] pub enum TransactionRegistryError { @@ -21,19 +20,19 @@ impl From for EngineError { } pub struct TransactionRegistry { - redis: ClusterConnection, + redis: ConnectionManager, namespace: Option, } impl TransactionRegistry { - pub fn new(redis: ClusterConnection, namespace: Option) -> Self { + pub fn new(redis: ConnectionManager, namespace: Option) -> Self { Self { redis, namespace } } fn registry_key(&self) -> String { match &self.namespace { - Some(ns) => format!("{ns}:{}:tx_registry", twmq::ENGINE_HASH_TAG), - None => format!("{}:tx_registry", twmq::ENGINE_HASH_TAG), + Some(ns) => format!("{ns}:tx_registry"), + None => "tx_registry".to_string(), } } diff --git a/integration-tests/tests/setup.rs b/integration-tests/tests/setup.rs index e8bdffa..d105cff 100644 --- a/integration-tests/tests/setup.rs +++ b/integration-tests/tests/setup.rs @@ -168,15 +168,8 @@ impl TestEnvironment { let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client)); // Setup Redis - let initial_nodes: Vec<&str> = config - .redis - .url - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect(); - let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes) - .context("Failed to connect to Valkey Cluster")?; + let redis_client = twmq::redis::Client::open(config.redis.url.as_str()) + .context("Failed to connect to Redis")?; let authorization_cache = EoaAuthorizationCache::new( moka::future::Cache::builder() @@ -260,7 +253,7 @@ impl TestEnvironment { let execution_router = thirdweb_engine::ExecutionRouter { namespace: queue_config.execution_namespace.clone(), - redis: redis_client.get_async_connection().await?, + redis: redis_client.get_connection_manager().await?, authorization_cache, webhook_queue: queue_manager.webhook_queue.clone(), external_bundler_send_queue: queue_manager.external_bundler_send_queue.clone(), diff --git a/server/Cargo.toml b/server/Cargo.toml index 0b411b9..d568ffb 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -27,7 +27,6 @@ futures = { workspace = true } serde-bool = { workspace = true } base64 = { workspace = true } bincode = { workspace = true } -rustls = { workspace = true } solana-sdk = { workspace = true } solana-client = { workspace = true } aide = { workspace = true, features = [ diff --git a/server/src/execution_router/mod.rs b/server/src/execution_router/mod.rs index 66d0cc0..ba41c46 100644 --- a/server/src/execution_router/mod.rs +++ b/server/src/execution_router/mod.rs @@ -31,8 +31,7 @@ use engine_executors::{ transaction_registry::TransactionRegistry, webhook::WebhookJobHandler, }; -use twmq::{Queue, error::TwmqError}; -use twmq::redis::cluster_async::ClusterConnection; +use twmq::{Queue, error::TwmqError, redis::aio::ConnectionManager}; use vault_sdk::VaultClient; use vault_types::{ RegexRule, Rule, @@ -43,7 +42,7 @@ use vault_types::{ use crate::chains::ThirdwebChainService; pub struct ExecutionRouter { - pub redis: ClusterConnection, + pub redis: ConnectionManager, pub namespace: Option, pub webhook_queue: Arc>, pub external_bundler_send_queue: Arc>>, diff --git a/server/src/main.rs b/server/src/main.rs index c5a7858..4376aec 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -22,11 +22,6 @@ use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt, util::Subscrib #[tokio::main] async fn main() -> anyhow::Result<()> { - // rustls 0.23 requires selecting a process-wide crypto provider (ring or aws-lc-rs). - // Some dependency graphs do not enable either by default, which causes a runtime panic - // when a TLS client config is constructed (e.g. when connecting to `rediss://`). - let _ = rustls::crypto::ring::default_provider().install_default(); - let config = config::get_config(); let subscriber = tracing_subscriber::registry() @@ -39,7 +34,7 @@ async fn main() -> anyhow::Result<()> { match config.server.log_format { config::LogFormat::Json => subscriber - .with(tracing_subscriber::fmt::layer().json().flatten_event(false)) + .with(tracing_subscriber::fmt::layer().json()) .init(), config::LogFormat::Pretty => subscriber.with(tracing_subscriber::fmt::layer()).init(), } @@ -75,14 +70,7 @@ async fn main() -> anyhow::Result<()> { }); let eoa_signer = Arc::new(EoaSigner::new(vault_client.clone(), iaw_client.clone())); let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client)); - let initial_nodes: Vec<&str> = config - .redis - .url - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect(); - let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes)?; + let redis_client = twmq::redis::Client::open(config.redis.url.as_str())?; let authorization_cache = EoaAuthorizationCache::new( moka::future::Cache::builder() @@ -130,7 +118,7 @@ async fn main() -> anyhow::Result<()> { let execution_router = ExecutionRouter { namespace: config.queue.execution_namespace.clone(), - redis: redis_client.get_async_connection().await?, + redis: redis_client.get_connection_manager().await?, authorization_cache, webhook_queue: queue_manager.webhook_queue.clone(), external_bundler_send_queue: queue_manager.external_bundler_send_queue.clone(), diff --git a/server/src/queue/manager.rs b/server/src/queue/manager.rs index deb042f..3e61575 100644 --- a/server/src/queue/manager.rs +++ b/server/src/queue/manager.rs @@ -50,7 +50,7 @@ const EOA_EXECUTOR_QUEUE_NAME: &str = "eoa_executor"; impl QueueManager { pub async fn new( - redis_client: twmq::redis::cluster::ClusterClient, + redis_client: twmq::redis::Client, queue_config: &QueueConfig, solana_config: &crate::config::SolanaConfig, chain_service: Arc, @@ -61,7 +61,7 @@ impl QueueManager { ) -> Result { // Create transaction registry let transaction_registry = Arc::new(TransactionRegistry::new( - redis_client.get_async_connection().await?, + redis_client.get_connection_manager().await?, queue_config.execution_namespace.clone(), )); @@ -255,7 +255,7 @@ impl QueueManager { eoa_signer: eoa_signer.clone(), webhook_queue: webhook_queue.clone(), namespace: queue_config.execution_namespace.clone(), - redis: redis_client.get_async_connection().await?, + redis: redis_client.get_connection_manager().await?, authorization_cache, max_inflight: 50, max_recycled_nonces: 50, @@ -286,7 +286,7 @@ impl QueueManager { }; let solana_rpc_cache = Arc::new(SolanaRpcCache::new(solana_rpc_urls)); let solana_storage = SolanaTransactionStorage::new( - redis_client.get_async_connection().await?, + redis_client.get_connection_manager().await?, queue_config.execution_namespace.clone(), ); let solana_executor_handler = SolanaExecutorJobHandler { diff --git a/twmq/src/hooks.rs b/twmq/src/hooks.rs index 57b03e7..44fc44a 100644 --- a/twmq/src/hooks.rs +++ b/twmq/src/hooks.rs @@ -1,6 +1,6 @@ use std::time::{SystemTime, UNIX_EPOCH}; -use crate::{DurableExecution, error::TwmqError, job::PushableJob}; +use crate::{DurableExecution, delay_to_queue_seconds, error::TwmqError, job::PushableJob}; // A minimal transaction context that hooks can use pub struct TransactionContext<'a> { @@ -51,7 +51,7 @@ impl<'a> TransactionContext<'a> { .sadd(job.queue.dedupe_set_name(), &job.options.id); if let Some(delay_options) = job.options.delay { - let process_at = now + delay_options.delay.as_secs(); + let process_at = now + delay_to_queue_seconds(delay_options.delay); self.pipeline .hset( job.queue.job_meta_hash_name(&job.options.id), diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index 24dc446..34fe5d6 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ -18,8 +18,7 @@ use job::{ pub use multilane::{MultilanePushableJob, MultilaneQueue}; use queue::QueueOptions; use redis::Pipeline; -use redis::{AsyncCommands, RedisResult}; -use redis::cluster_async::ClusterConnection; +use redis::{AsyncCommands, RedisResult, aio::ConnectionManager}; use serde::{Serialize, de::DeserializeOwned}; use shutdown::WorkerHandle; use tokio::sync::Semaphore; @@ -29,9 +28,17 @@ pub use queue::IdempotencyMode; pub use redis; use tracing::Instrument; -/// Global hash tag used to force all keys into the same cluster hash slot. -/// This is the "easiest" Valkey Cluster strategy: correctness over sharding. -pub const ENGINE_HASH_TAG: &str = "{engine}"; +pub(crate) fn delay_to_queue_seconds(delay: Duration) -> u64 { + let delay_secs = delay.as_secs(); + + if delay.is_zero() { + 0 + } else if delay_secs == 0 { + 1 + } else { + delay_secs + } +} // Trait for error types to implement user cancellation pub trait UserCancellable { @@ -126,7 +133,7 @@ pub struct Queue where H: DurableExecution, { - pub redis: ClusterConnection, + pub redis: ConnectionManager, pub handler: Arc, pub options: QueueOptions, // concurrency: usize, @@ -141,13 +148,8 @@ impl Queue { options: Option, handler: H, ) -> Result { - let initial_nodes: Vec<&str> = redis_url - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect(); - let client = redis::cluster::ClusterClient::new(initial_nodes)?; - let redis = client.get_async_connection().await?; + let client = redis::Client::open(redis_url)?; + let redis = client.get_connection_manager().await?; let queue = Self { redis, @@ -186,60 +188,51 @@ impl Queue { } pub fn pending_list_name(&self) -> String { - format!("twmq:{}:{}:pending", ENGINE_HASH_TAG, self.name()) + format!("twmq:{}:pending", self.name()) } pub fn active_hash_name(&self) -> String { - format!("twmq:{}:{}:active", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:active", self.name) } pub fn delayed_zset_name(&self) -> String { - format!("twmq:{}:{}:delayed", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:delayed", self.name) } pub fn success_list_name(&self) -> String { - format!("twmq:{}:{}:success", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:success", self.name) } pub fn failed_list_name(&self) -> String { - format!("twmq:{}:{}:failed", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:failed", self.name) } pub fn job_data_hash_name(&self) -> String { - format!("twmq:{}:{}:jobs:data", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:jobs:data", self.name) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!("twmq:{}:{}:job:{}:meta", ENGINE_HASH_TAG, self.name, job_id) + format!("twmq:{}:job:{}:meta", self.name, job_id) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!( - "twmq:{}:{}:job:{}:errors", - ENGINE_HASH_TAG, self.name, job_id - ) + format!("twmq:{}:job:{}:errors", self.name, job_id) } pub fn job_result_hash_name(&self) -> String { - format!("twmq:{}:{}:jobs:result", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:jobs:result", self.name) } pub fn dedupe_set_name(&self) -> String { - format!("twmq:{}:{}:dedup", ENGINE_HASH_TAG, self.name) + format!("twmq:{}:dedup", self.name) } pub fn pending_cancellation_set_name(&self) -> String { - format!( - "twmq:{}:{}:pending_cancellations", - ENGINE_HASH_TAG, self.name - ) + format!("twmq:{}:pending_cancellations", self.name) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { - format!( - "twmq:{}:{}:job:{}:lease:{}", - ENGINE_HASH_TAG, self.name, job_id, lease_token - ) + format!("twmq:{}:job:{}:lease:{}", self.name, job_id, lease_token) } pub async fn push( @@ -249,20 +242,20 @@ impl Queue { // Check for duplicates and handle job creation with deduplication let script = redis::Script::new( r#" - local queue_id = ARGV[1] - local job_id = ARGV[2] - local job_data = ARGV[3] - local now = ARGV[4] - local delay = ARGV[5] - local reentry_position = ARGV[6] -- "first" or "last" + local job_id = ARGV[1] + local job_data = ARGV[2] + local now = ARGV[3] + local delay = ARGV[4] + local reentry_position = ARGV[5] -- "first" or "last" - local delayed_zset_name = KEYS[1] - local pending_list_name = KEYS[2] + local queue_id = KEYS[1] + local delayed_zset_name = KEYS[2] + local pending_list_name = KEYS[3] - local job_data_hash_name = KEYS[3] - local job_meta_hash_name = KEYS[4] + local job_data_hash_name = KEYS[4] + local job_meta_hash_name = KEYS[5] - local dedupe_set_name = KEYS[5] + local dedupe_set_name = KEYS[6] -- Check if job already exists in any queue if redis.call('SISMEMBER', dedupe_set_name, job_id) == 1 then @@ -316,16 +309,16 @@ impl Queue { position: RequeuePosition::Last, }); - let delay_secs = delay.delay.as_secs(); + let delay_secs = delay_to_queue_seconds(delay.delay); let position_string = delay.position.to_string(); let _result: (i32, String) = script + .key(&self.name) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.job_data_hash_name()) .key(self.job_meta_hash_name(&job.id)) .key(self.dedupe_set_name()) - .arg(self.name()) .arg(job_options.id) .arg(job_data) .arg(now) @@ -607,15 +600,14 @@ impl Queue { local batch_size = tonumber(ARGV[3]) local lease_seconds = tonumber(ARGV[4]) - local queue_id = ARGV[5] - - local delayed_zset_name = KEYS[1] - local pending_list_name = KEYS[2] - local active_hash_name = KEYS[3] - local job_data_hash_name = KEYS[4] - local pending_cancellation_set = KEYS[5] - local failed_list_name = KEYS[6] - local success_list_name = KEYS[7] + local queue_id = KEYS[1] + local delayed_zset_name = KEYS[2] + local pending_list_name = KEYS[3] + local active_hash_name = KEYS[4] + local job_data_hash_name = KEYS[5] + local pending_cancellation_set = KEYS[6] + local failed_list_name = KEYS[7] + local success_list_name = KEYS[8] local result_jobs = {} local timed_out_jobs = {} @@ -630,14 +622,14 @@ impl Queue { for i = 1, #active_jobs, 2 do local job_id = active_jobs[i] local attempts = active_jobs[i + 1] - local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' -- Get the current lease token from job metadata local current_lease_token = redis.call('HGET', job_meta_hash_name, 'lease_token') if current_lease_token then -- Build the lease key and check if it exists (Redis auto-expires) - local lease_key = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token + local lease_key = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token local lease_exists = redis.call('EXISTS', lease_key) -- If lease doesn't exist (expired), move job back to pending @@ -677,7 +669,7 @@ impl Queue { -- Job not successful, cancel it now redis.call('LPUSH', failed_list_name, job_id) -- Add cancellation timestamp - local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' redis.call('HSET', job_meta_hash_name, 'finished_at', now) table.insert(cancelled_jobs, job_id) end @@ -689,7 +681,7 @@ impl Queue { -- Step 3: Move expired delayed jobs to pending local delayed_jobs = redis.call('ZRANGEBYSCORE', delayed_zset_name, 0, now) for i, job_id in ipairs(delayed_jobs) do - local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' local reentry_position = redis.call('HGET', job_meta_hash_name, 'reentry_position') or 'last' -- Remove from delayed @@ -724,7 +716,7 @@ impl Queue { -- Only process if we have data if job_data then -- Update metadata - local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' redis.call('HSET', job_meta_hash_name, 'processed_at', now) local created_at = redis.call('HGET', job_meta_hash_name, 'created_at') or now local attempts = redis.call('HINCRBY', job_meta_hash_name, 'attempts', 1) @@ -733,7 +725,7 @@ impl Queue { local lease_token = now .. '_' .. job_id .. '_' .. attempts .. '_' .. pop_id -- Create separate lease key with TTL - local lease_key = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token + local lease_key = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token redis.call('SET', lease_key, '1') redis.call('EXPIRE', lease_key, lease_seconds) @@ -762,6 +754,7 @@ impl Queue { Vec, Vec, ) = script + .key(self.name()) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.active_hash_name()) @@ -773,7 +766,6 @@ impl Queue { .arg(pop_id) .arg(batch_size) .arg(self.options.lease_duration.as_secs()) - .arg(self.name()) .invoke_async(&mut self.redis.clone()) .await?; @@ -962,14 +954,14 @@ impl Queue { // Separate call for pruning with data deletion using Lua let trim_script = redis::Script::new( r#" - local queue_id = ARGV[2] - local list_name = KEYS[1] - local job_data_hash = KEYS[2] - local results_hash = KEYS[3] -- e.g., "myqueue:results" - local dedupe_set_name = KEYS[4] - local active_hash = KEYS[5] - local pending_list = KEYS[6] - local delayed_zset = KEYS[7] + local queue_id = KEYS[1] + local list_name = KEYS[2] + local job_data_hash = KEYS[3] + local results_hash = KEYS[4] -- e.g., "myqueue:results" + local dedupe_set_name = KEYS[5] + local active_hash = KEYS[6] + local pending_list = KEYS[7] + local delayed_zset = KEYS[8] local max_len = tonumber(ARGV[1]) @@ -991,8 +983,8 @@ impl Queue { -- Only delete if the job is NOT currently in the system if not is_active and not is_pending and not is_delayed then - local job_meta_hash = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' - local errors_list_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' + local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta' + local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1010,6 +1002,7 @@ impl Queue { ); let trimmed_count: usize = trim_script + .key(self.name()) .key(self.success_list_name()) .key(self.job_data_hash_name()) .key(self.job_result_hash_name()) // results_hash @@ -1018,7 +1011,6 @@ impl Queue { .key(self.pending_list_name()) // Check if job is pending .key(self.delayed_zset_name()) // Check if job is delayed .arg(self.options.max_success) // max_len (LTRIM is 0 to max_success-1) - .arg(self.name()) .invoke_async(&mut self.redis.clone()) .await?; @@ -1069,7 +1061,7 @@ impl Queue { // Add to proper queue based on delay and position if let Some(delay_duration) = delay { - let delay_until = now + delay_duration.as_secs(); + let delay_until = now + delay_to_queue_seconds(delay_duration); let pos_str = position.to_string(); pipeline @@ -1143,13 +1135,13 @@ impl Queue { // Separate call for pruning with data deletion using Lua let trim_script = redis::Script::new( r#" - local queue_id = ARGV[2] - local list_name = KEYS[1] - local job_data_hash = KEYS[2] - local dedupe_set_name = KEYS[3] - local active_hash = KEYS[4] - local pending_list = KEYS[5] - local delayed_zset = KEYS[6] + local queue_id = KEYS[1] + local list_name = KEYS[2] + local job_data_hash = KEYS[3] + local dedupe_set_name = KEYS[4] + local active_hash = KEYS[5] + local pending_list = KEYS[6] + local delayed_zset = KEYS[7] local max_len = tonumber(ARGV[1]) @@ -1171,8 +1163,8 @@ impl Queue { -- Only delete if the job is NOT currently in the system if not is_active and not is_pending and not is_delayed then - local errors_list_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' - local job_meta_hash = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' + local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors' + local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1188,6 +1180,7 @@ impl Queue { ); let trimmed_count: usize = trim_script + .key(self.name()) .key(self.failed_list_name()) .key(self.job_data_hash_name()) .key(self.dedupe_set_name()) @@ -1195,7 +1188,6 @@ impl Queue { .key(self.pending_list_name()) // Check if job is pending .key(self.delayed_zset_name()) // Check if job is delayed .arg(self.options.max_failed) - .arg(self.name()) .invoke_async(&mut self.redis.clone()) .await?; diff --git a/twmq/src/multilane.rs b/twmq/src/multilane.rs index f79c414..4e1cb06 100644 --- a/twmq/src/multilane.rs +++ b/twmq/src/multilane.rs @@ -2,16 +2,14 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use redis::{AsyncCommands, Pipeline, RedisResult}; -use redis::cluster_async::ClusterConnection; +use redis::{AsyncCommands, Pipeline, RedisResult, aio::ConnectionManager}; use tokio::sync::Semaphore; use tokio::time::sleep; use tracing::Instrument; use crate::{ CancelResult, DurableExecution, FailHookData, NackHookData, QueueInternalErrorHookData, - SuccessHookData, UserCancellable, - ENGINE_HASH_TAG, + SuccessHookData, UserCancellable, delay_to_queue_seconds, error::TwmqError, hooks::TransactionContext, job::{ @@ -28,7 +26,7 @@ pub struct MultilaneQueue where H: DurableExecution, { - pub redis: ClusterConnection, + pub redis: ConnectionManager, handler: Arc, options: QueueOptions, /// Unique identifier for this multilane queue instance @@ -52,13 +50,8 @@ impl MultilaneQueue { options: Option, handler: H, ) -> Result { - let initial_nodes: Vec<&str> = redis_url - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect(); - let client = redis::cluster::ClusterClient::new(initial_nodes)?; - let redis = client.get_async_connection().await?; + let client = redis::Client::open(redis_url)?; + let redis = client.get_connection_manager().await?; let queue = Self { redis, @@ -93,81 +86,57 @@ impl MultilaneQueue { // Redis key naming methods with proper multilane namespacing pub fn lanes_zset_name(&self) -> String { - format!("twmq_multilane:{}:{}:lanes", ENGINE_HASH_TAG, self.queue_id) + format!("twmq_multilane:{}:lanes", self.queue_id) } pub fn lane_pending_list_name(&self, lane_id: &str) -> String { - format!( - "twmq_multilane:{}:{}:lane:{}:pending", - ENGINE_HASH_TAG, self.queue_id, lane_id - ) + format!("twmq_multilane:{}:lane:{}:pending", self.queue_id, lane_id) } pub fn lane_delayed_zset_name(&self, lane_id: &str) -> String { - format!( - "twmq_multilane:{}:{}:lane:{}:delayed", - ENGINE_HASH_TAG, self.queue_id, lane_id - ) + format!("twmq_multilane:{}:lane:{}:delayed", self.queue_id, lane_id) } pub fn lane_active_hash_name(&self, lane_id: &str) -> String { - format!( - "twmq_multilane:{}:{}:lane:{}:active", - ENGINE_HASH_TAG, self.queue_id, lane_id - ) + format!("twmq_multilane:{}:lane:{}:active", self.queue_id, lane_id) } pub fn success_list_name(&self) -> String { - format!("twmq_multilane:{}:{}:success", ENGINE_HASH_TAG, self.queue_id) + format!("twmq_multilane:{}:success", self.queue_id) } pub fn failed_list_name(&self) -> String { - format!("twmq_multilane:{}:{}:failed", ENGINE_HASH_TAG, self.queue_id) + format!("twmq_multilane:{}:failed", self.queue_id) } pub fn job_data_hash_name(&self) -> String { - format!( - "twmq_multilane:{}:{}:jobs:data", - ENGINE_HASH_TAG, self.queue_id - ) + format!("twmq_multilane:{}:jobs:data", self.queue_id) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!( - "twmq_multilane:{}:{}:job:{}:meta", - ENGINE_HASH_TAG, self.queue_id, job_id - ) + format!("twmq_multilane:{}:job:{}:meta", self.queue_id, job_id) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!( - "twmq_multilane:{}:{}:job:{}:errors", - ENGINE_HASH_TAG, self.queue_id, job_id - ) + format!("twmq_multilane:{}:job:{}:errors", self.queue_id, job_id) } pub fn job_result_hash_name(&self) -> String { - format!( - "twmq_multilane:{}:{}:jobs:result", - ENGINE_HASH_TAG, self.queue_id - ) + format!("twmq_multilane:{}:jobs:result", self.queue_id) } pub fn dedupe_set_name(&self) -> String { - format!("twmq_multilane:{}:{}:dedup", ENGINE_HASH_TAG, self.queue_id) + format!("twmq_multilane:{}:dedup", self.queue_id) } pub fn pending_cancellation_set_name(&self) -> String { - format!( - "twmq_multilane:{}:{}:pending_cancellations", - ENGINE_HASH_TAG, self.queue_id - ) + format!("twmq_multilane:{}:pending_cancellations", self.queue_id) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { format!( - "twmq_multilane:{}:{}:job:{}:lease:{}", - ENGINE_HASH_TAG, self.queue_id, job_id, lease_token + "twmq_multilane:{}:job:{}:lease:{}", + self.queue_id, job_id, lease_token ) } @@ -250,7 +219,7 @@ impl MultilaneQueue { position: RequeuePosition::Last, }); - let delay_secs = delay.delay.as_secs(); + let delay_secs = delay_to_queue_seconds(delay.delay); let position_string = delay.position.to_string(); let _result: (i32, String) = script @@ -403,9 +372,9 @@ impl MultilaneQueue { return "not_found" end - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' -- Try to remove from pending queue if redis.call('LREM', lane_pending_list, 0, job_id) > 0 then @@ -594,20 +563,20 @@ impl MultilaneQueue { -- Helper function to cleanup expired leases for a specific lane local function cleanup_lane_leases(lane_id) - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' local active_jobs = redis.call('HGETALL', lane_active_hash) for i = 1, #active_jobs, 2 do local job_id = active_jobs[i] local attempts = active_jobs[i + 1] - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' local current_lease_token = redis.call('HGET', job_meta_hash, 'lease_token') if current_lease_token then - local lease_key = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token + local lease_key = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token local lease_exists = redis.call('EXISTS', lease_key) if lease_exists == 0 then @@ -628,12 +597,12 @@ impl MultilaneQueue { -- Helper function to move delayed jobs to pending for a specific lane local function process_delayed_jobs(lane_id) - local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' local delayed_jobs = redis.call('ZRANGEBYSCORE', lane_delayed_zset, 0, now) for i, job_id in ipairs(delayed_jobs) do - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' local reentry_position = redis.call('HGET', job_meta_hash, 'reentry_position') or 'last' redis.call('ZREM', lane_delayed_zset, job_id) @@ -649,8 +618,8 @@ impl MultilaneQueue { -- Helper function to pop one job from a lane local function pop_job_from_lane(lane_id) - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' local job_id = redis.call('RPOP', lane_pending_list) if not job_id then @@ -662,13 +631,13 @@ impl MultilaneQueue { return nil end - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' redis.call('HSET', job_meta_hash, 'processed_at', now) local created_at = redis.call('HGET', job_meta_hash, 'created_at') or now local attempts = redis.call('HINCRBY', job_meta_hash, 'attempts', 1) local lease_token = now .. '_' .. job_id .. '_' .. attempts - local lease_key = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token + local lease_key = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token redis.call('SET', lease_key, '1') redis.call('EXPIRE', lease_key, lease_seconds) @@ -682,11 +651,11 @@ impl MultilaneQueue { local cancel_requests = redis.call('SMEMBERS', pending_cancellation_set) for i, job_id in ipairs(cancel_requests) do - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' local lane_id = redis.call('HGET', job_meta_hash, 'lane_id') if lane_id then - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' if redis.call('HEXISTS', lane_active_hash, job_id) == 1 then -- Still processing, keep in cancellation set @@ -751,9 +720,9 @@ impl MultilaneQueue { empty_lanes_count = empty_lanes_count + 1 -- Check if lane should be removed from Redis - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' local pending_count = redis.call('LLEN', lane_pending_list) local delayed_count = redis.call('ZCARD', lane_delayed_zset) @@ -987,12 +956,12 @@ impl MultilaneQueue { async fn post_success_completion(&self) -> Result<(), TwmqError> { let trim_script = redis::Script::new( r#" - local queue_id = ARGV[2] - local list_name = KEYS[1] - local job_data_hash = KEYS[2] - local results_hash = KEYS[3] - local dedupe_set_name = KEYS[4] - local lanes_zset = KEYS[5] + local queue_id = KEYS[1] + local list_name = KEYS[2] + local job_data_hash = KEYS[3] + local results_hash = KEYS[4] + local dedupe_set_name = KEYS[5] + local lanes_zset = KEYS[6] local max_len = tonumber(ARGV[1]) @@ -1002,16 +971,16 @@ impl MultilaneQueue { if #job_ids_to_delete > 0 then for _, j_id in ipairs(job_ids_to_delete) do -- Get the lane_id for this job to check if it's active/pending/delayed - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':meta' local lane_id = redis.call('HGET', job_meta_hash, 'lane_id') local should_delete = true if lane_id then -- Check if job is in any active state for this lane - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' local is_active = redis.call('HEXISTS', lane_active_hash, j_id) == 1 local is_pending = redis.call('LPOS', lane_pending_list, j_id) ~= nil @@ -1024,7 +993,7 @@ impl MultilaneQueue { end if should_delete then - local errors_list_name = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' + local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1041,13 +1010,13 @@ impl MultilaneQueue { ); let trimmed_count: usize = trim_script + .key(self.queue_id()) .key(self.success_list_name()) .key(self.job_data_hash_name()) .key(self.job_result_hash_name()) .key(self.dedupe_set_name()) .key(self.lanes_zset_name()) // Need to check lanes .arg(self.options.max_success) - .arg(self.queue_id()) .invoke_async(&mut self.redis.clone()) .await?; @@ -1137,10 +1106,10 @@ impl MultilaneQueue { async fn post_fail_completion(&self) -> Result<(), TwmqError> { let trim_script = redis::Script::new( r#" - local queue_id = ARGV[2] - local list_name = KEYS[1] - local job_data_hash = KEYS[2] - local dedupe_set_name = KEYS[3] + local queue_id = KEYS[1] + local list_name = KEYS[2] + local job_data_hash = KEYS[3] + local dedupe_set_name = KEYS[4] local max_len = tonumber(ARGV[1]) @@ -1150,16 +1119,16 @@ impl MultilaneQueue { if #job_ids_to_delete > 0 then for _, j_id in ipairs(job_ids_to_delete) do -- Get the lane_id for this job to check if it's active/pending/delayed - local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' + local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':meta' local lane_id = redis.call('HGET', job_meta_hash, 'lane_id') local should_delete = true if lane_id then -- Check if job is in any active state for this lane - local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' - local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' local is_active = redis.call('HEXISTS', lane_active_hash, j_id) == 1 local is_pending = redis.call('LPOS', lane_pending_list, j_id) ~= nil @@ -1172,7 +1141,7 @@ impl MultilaneQueue { end if should_delete then - local errors_list_name = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' + local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1188,11 +1157,11 @@ impl MultilaneQueue { ); let trimmed_count: usize = trim_script + .key(self.queue_id()) .key(self.failed_list_name()) .key(self.job_data_hash_name()) .key(self.dedupe_set_name()) .arg(self.options.max_failed) - .arg(self.queue_id()) .invoke_async(&mut self.redis.clone()) .await?; @@ -1257,7 +1226,7 @@ impl MultilaneQueue { .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); - let delay_until = now + delay_duration.as_secs(); + let delay_until = now + delay_to_queue_seconds(*delay_duration); let pos_str = position.to_string(); hook_pipeline diff --git a/twmq/src/queue.rs b/twmq/src/queue.rs index b4571e8..288b89f 100644 --- a/twmq/src/queue.rs +++ b/twmq/src/queue.rs @@ -1,7 +1,6 @@ use std::{marker::PhantomData, sync::Arc, time::Duration}; -use redis::cluster::ClusterClient; -use redis::cluster_async::ClusterConnection; +use redis::{Client, aio::ConnectionManager}; use serde::{Deserialize, Serialize}; use crate::{DurableExecution, Queue, error::TwmqError}; @@ -66,8 +65,8 @@ pub struct HasHandler; enum RedisSource { Url(String), - ClusterClient(ClusterClient), - ClusterConnection(ClusterConnection), + Client(Client), + ConnectionManager(ConnectionManager), } // Builder with typestate pattern @@ -115,9 +114,9 @@ impl QueueBuilder { } /// Set Redis connection from existing client - pub fn redis_client(self, client: ClusterClient) -> QueueBuilder { + pub fn redis_client(self, client: Client) -> QueueBuilder { QueueBuilder { - redis_source: Some(RedisSource::ClusterClient(client)), + redis_source: Some(RedisSource::Client(client)), name: self.name, options: self.options, handler: self.handler, @@ -125,10 +124,13 @@ impl QueueBuilder { } } - /// Set Redis connection from an existing cluster connection - pub fn redis_connection(self, conn: ClusterConnection) -> QueueBuilder { + /// Set Redis connection from existing connection manager + pub fn redis_connection_manager( + self, + manager: ConnectionManager, + ) -> QueueBuilder { QueueBuilder { - redis_source: Some(RedisSource::ClusterConnection(conn)), + redis_source: Some(RedisSource::ConnectionManager(manager)), name: self.name, options: self.options, handler: self.handler, @@ -190,16 +192,11 @@ impl QueueBuilder { let redis = match redis_source { RedisSource::Url(url) => { - let initial_nodes: Vec<&str> = url - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect(); - let client = ClusterClient::new(initial_nodes)?; - client.get_async_connection().await? + let client = Client::open(url)?; + client.get_connection_manager().await? } - RedisSource::ClusterClient(client) => client.get_async_connection().await?, - RedisSource::ClusterConnection(conn) => conn, + RedisSource::Client(client) => client.get_connection_manager().await?, + RedisSource::ConnectionManager(manager) => manager, }; Ok(Queue { diff --git a/twmq/tests/basic.rs b/twmq/tests/basic.rs index 8a10a1f..0c5d2f6 100644 --- a/twmq/tests/basic.rs +++ b/twmq/tests/basic.rs @@ -11,14 +11,14 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use twmq::job::{JobOptions, JobStatus}; // Assuming JobStatus is in twmq::job -use twmq::redis::cluster_async::ClusterConnection; // For cleanup utility +use twmq::redis::aio::ConnectionManager; // For cleanup utility const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys for a given queue name pattern -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/basic_hook.rs b/twmq/tests/basic_hook.rs index b4f17ef..0306ac2 100644 --- a/twmq/tests/basic_hook.rs +++ b/twmq/tests/basic_hook.rs @@ -1,7 +1,7 @@ // Add this to tests/basic.rs mod fixtures; use fixtures::{TestJobErrorData, TestJobOutput}; -use redis::cluster_async::ClusterConnection; +use redis::aio::ConnectionManager; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use std::{ @@ -21,9 +21,9 @@ use twmq::{ }; // Helper to clean up Redis keys for a given queue name pattern -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/delay.rs b/twmq/tests/delay.rs index 4f93078..5245188 100644 --- a/twmq/tests/delay.rs +++ b/twmq/tests/delay.rs @@ -15,15 +15,15 @@ use twmq::{ hooks::TransactionContext, job::{BorrowedJob, DelayOptions, JobResult, JobStatus, RequeuePosition}, queue::QueueOptions, - redis::cluster_async::ClusterConnection, + redis::aio::ConnectionManager, }; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) @@ -131,8 +131,8 @@ async fn test_job_delay_basic() { }; // Create Redis connection for the execution context - let redis_client = redis::cluster::ClusterClient::new(vec![REDIS_URL]).unwrap(); - let redis_conn = Arc::new(redis_client.get_async_connection().await.unwrap()); + let redis_client = redis::Client::open(REDIS_URL).unwrap(); + let redis_conn = Arc::new(redis_client.get_connection_manager().await.unwrap()); let handler = DelayTestJobHandler; @@ -307,8 +307,8 @@ async fn test_delay_position_ordering() { }; // Create Redis connection for the execution context - let redis_client = redis::cluster::ClusterClient::new(vec![REDIS_URL]).unwrap(); - let redis_conn = Arc::new(redis_client.get_async_connection().await.unwrap()); + let redis_client = redis::Client::open(REDIS_URL).unwrap(); + let redis_conn = Arc::new(redis_client.get_connection_manager().await.unwrap()); let handler = DelayTestJobHandler; diff --git a/twmq/tests/idempotency_modes.rs b/twmq/tests/idempotency_modes.rs index 7194207..ce1d0f3 100644 --- a/twmq/tests/idempotency_modes.rs +++ b/twmq/tests/idempotency_modes.rs @@ -8,7 +8,7 @@ use twmq::{ DurableExecution, Queue, job::{BorrowedJob, JobResult, JobStatus}, queue::{IdempotencyMode, QueueOptions}, - redis::cluster_async::ClusterConnection, + redis::aio::ConnectionManager, }; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; @@ -66,9 +66,9 @@ impl DurableExecution for TestJobHandler { } // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/lease_expiry.rs b/twmq/tests/lease_expiry.rs index 6ed420b..773c100 100644 --- a/twmq/tests/lease_expiry.rs +++ b/twmq/tests/lease_expiry.rs @@ -12,7 +12,7 @@ use twmq::{ hooks::TransactionContext, job::{BorrowedJob, JobResult, JobStatus}, queue::QueueOptions, - redis::cluster_async::ClusterConnection, + redis::aio::ConnectionManager, }; mod fixtures; @@ -21,9 +21,9 @@ use fixtures::TestJobErrorData; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/multilane_batch_pop.rs b/twmq/tests/multilane_batch_pop.rs index e167123..3f8c398 100644 --- a/twmq/tests/multilane_batch_pop.rs +++ b/twmq/tests/multilane_batch_pop.rs @@ -90,7 +90,7 @@ impl MultilaneTestHarness { /// Clean up all Redis keys for this test async fn cleanup(&self) { let mut conn = self.queue.redis.clone(); - let keys_pattern = format!("twmq_multilane:{}:{}:*", twmq::ENGINE_HASH_TAG, self.queue_id); + let keys_pattern = format!("twmq_multilane:{}:*", self.queue_id); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -165,8 +165,7 @@ impl Drop for MultilaneTestHarness { tokio::spawn(async move { let mut conn = redis; - let keys_pattern = - format!("twmq_multilane:{}:{queue_id}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq_multilane:{queue_id}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/nack.rs b/twmq/tests/nack.rs index 1c58eec..f66cd9f 100644 --- a/twmq/tests/nack.rs +++ b/twmq/tests/nack.rs @@ -15,7 +15,7 @@ use twmq::{ hooks::TransactionContext, job::{BorrowedJob, JobError, JobResult, JobStatus, RequeuePosition}, queue::QueueOptions, - redis::cluster_async::ClusterConnection, + redis::aio::ConnectionManager, }; mod fixtures; @@ -24,9 +24,9 @@ use fixtures::TestJobErrorData; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/prune_race_condition.rs b/twmq/tests/prune_race_condition.rs index bbbf7cb..c9898f9 100644 --- a/twmq/tests/prune_race_condition.rs +++ b/twmq/tests/prune_race_condition.rs @@ -6,7 +6,7 @@ mod fixtures; use fixtures::TestJobErrorData; -use redis::cluster_async::ClusterConnection; +use redis::aio::ConnectionManager; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use std::{ @@ -146,9 +146,9 @@ impl DurableExecution for EoaSimulatorJobHandler { } // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/prune_race_random_ids.rs b/twmq/tests/prune_race_random_ids.rs index 143bba5..fd5e9ea 100644 --- a/twmq/tests/prune_race_random_ids.rs +++ b/twmq/tests/prune_race_random_ids.rs @@ -4,7 +4,7 @@ mod fixtures; use fixtures::TestJobErrorData; -use redis::{AsyncCommands, cluster_async::ClusterConnection}; +use redis::{AsyncCommands, aio::ConnectionManager}; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use std::{ @@ -121,9 +121,9 @@ impl DurableExecution for RandomJobHandler { } // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); + let keys_pattern = format!("twmq:{queue_name}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -239,7 +239,7 @@ async fn test_prune_with_random_ids() { let success_job_ids: Vec = conn.lrange(queue.success_list_name(), 0, -1).await.unwrap(); // Count how many job metadata hashes still exist (should match success list length if pruning works) - let meta_pattern = format!("twmq:{}:{}:job:*:meta", twmq::ENGINE_HASH_TAG, queue.name()); + let meta_pattern = format!("twmq:{}:job:*:meta", queue.name()); let meta_keys: Vec = redis::cmd("KEYS") .arg(&meta_pattern) .query_async(&mut conn)