
Fix sub-second queue delays in TWMQ #99

Merged
0xFirekeeper merged 7 commits into main from firekeeper/twmq-delay
Mar 19, 2026

Conversation

0xFirekeeper (Member) commented Mar 19, 2026

Sub-second delayed requeues in TWMQ were being truncated to 0s because queue scheduling used Duration::as_secs().

That meant values like 200ms were effectively treated as immediate retries, which could cause hot-looping at queue poll cadence.

This change adds a small helper that rounds any non-zero sub-second delay up to 1s and uses it consistently in:

  • queue push scheduling
  • hook scheduling
  • multilane queue scheduling
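
Based on that description (the walkthrough gives the signature as delay_to_queue_seconds(Duration) -> u64), a minimal sketch of such a helper might look like the following; the actual implementation in the PR may differ:

```rust
use std::time::Duration;

/// Sketch of the normalization described above: zero stays zero, any
/// non-zero sub-second delay rounds up to one second, and longer delays
/// keep the previous whole-second truncation via `as_secs()`.
fn delay_to_queue_seconds(delay: Duration) -> u64 {
    if delay.is_zero() {
        0
    } else if delay < Duration::from_secs(1) {
        1
    } else {
        delay.as_secs()
    }
}

fn main() {
    assert_eq!(delay_to_queue_seconds(Duration::ZERO), 0);
    // Previously 200ms truncated to 0s and became an immediate retry.
    assert_eq!(delay_to_queue_seconds(Duration::from_millis(200)), 1);
    assert_eq!(delay_to_queue_seconds(Duration::from_secs(5)), 5);
    println!("ok");
}
```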

Summary by CodeRabbit

  • Chores

    • Switched Redis usage from cluster clients to a single connection-manager model and simplified Redis key namespaces (removed engine hash-tag).
    • Removed TLS- and clustering-related dependency features and cleaned dependency configs.
    • Added a timeout to the coverage CI run.
  • Bug Fixes / Behavior

    • Normalized delay-to-queue scheduling to avoid zero/near-zero delay inconsistencies.
  • Tests

    • Updated tests and cleanup helpers to match Redis/key changes.

coderabbitai bot commented Mar 19, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4669c887-186a-414b-9a77-2d6d5215c6c1

📥 Commits

Reviewing files that changed from the base of the PR and between dd0b108 and 90081a7.

📒 Files selected for processing (4)
  • executors/src/eoa/store/atomic.rs
  • executors/src/eoa/store/borrowed.rs
  • executors/src/eoa/store/submitted.rs
  • executors/src/solana_executor/rpc_cache.rs

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.


Walkthrough

This PR migrates Redis usage from cluster-mode to single-client connection managers across the codebase, removes the ENGINE_HASH_TAG key namespace, simplifies structured logging spans/fields, adds delay normalization via delay_to_queue_seconds, and updates Cargo/dependency and CI coverage command options.

Changes

  • Dependency & CI (Cargo.toml, server/Cargo.toml, .github/workflows/coverage-twmq.yaml): Removed workspace rustls entry and Redis cluster/TLS feature flags from the redis dependency; updated the CI coverage command to add --timeout 300.
  • Core RPC (core/src/chain.rs): Removed the RpcCredentials::client_id_for_logs(&self) -> Option<&str> helper.
  • TWMQ core & queue infra (twmq/src/lib.rs, twmq/src/queue.rs, twmq/src/multilane.rs, twmq/src/hooks.rs): Replaced cluster client/connection types with redis::Client/redis::aio::ConnectionManager; removed the ENGINE_HASH_TAG constant and updated Redis key formats; added delay_to_queue_seconds(Duration) -> u64; adjusted Lua scripts to take queue_id via KEYS.
  • Executors: EOA stores & workers (executors/src/eoa/store/..., executors/src/eoa/worker/...): Changed Redis types from ClusterConnection to ConnectionManager across stores and the job handler; removed EOA_QUEUE_ID usage; simplified webhook and error logging by removing the chain_id, client_id, and queue_id structured fields.
  • Executors: EIP7702 & transaction registry (executors/src/eip7702_executor/..., executors/src/transaction_registry.rs): Removed the EIP7702 queue ID constants; reduced tracing span fields and error log fields; switched the registry to ConnectionManager and removed ENGINE_HASH_TAG from registry keys.
  • Executors: Solana & external bundler (executors/src/solana_executor/..., executors/src/external_bundler/deployment.rs): Switched to ConnectionManager for storage/locks/cache; removed ENGINE_HASH_TAG from key construction; constructors now accept redis::Client where appropriate and build a ConnectionManager.
  • Server & queue manager (server/src/main.rs, server/src/execution_router/mod.rs, server/src/queue/manager.rs): Replaced cluster client/connection creation with redis::Client::open(...) + get_connection_manager(); ExecutionRouter now holds a ConnectionManager; removed the explicit rustls provider install; adjusted a JSON tracing call.
  • Integration tests & test harnesses (integration-tests/tests/setup.rs, twmq/tests/*): Updated test setup and cleanup helpers to use ConnectionManager and to match the simplified Redis key patterns that omit ENGINE_HASH_TAG.
  • Misc small changes (executors/src/solana_executor/worker.rs, executors/src/solana_executor/rpc_cache.rs, other logging tweaks): Whitespace/formatting tweaks; avoided an unnecessary clone in RpcCacheKey construction; simplified several tracing messages and removed queue-related constants from logs.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Description Check — ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: The title directly and clearly describes the main change: fixing sub-second queue delays in TWMQ by adding a delay normalization helper.
  • Docstring Coverage — ✅ Passed: Docstring coverage is 94.44%, which meets the required threshold of 80.00%.


coderabbitai bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
executors/src/eoa/store/mod.rs (1)

122-293: ⚠️ Potential issue | 🟠 Major

Add a migration path for the EOA key rename.

Lines 122-293 change every persisted key this store uses. That makes already-pending/submitted/borrowed transactions, cached nonce state, and health data invisible after rollout, so a worker restarted against a warm Redis can come up with an empty view of in-flight work and re-sync/reprocess unnecessarily.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/eoa/store/mod.rs` around lines 122 - 293, The change renames
every persisted Redis key (e.g., eoa_lock_key_name, transaction_data_key_name,
transaction_attempts_list_name, pending_transactions_zset_name,
submitted_transactions_zset_name, transaction_hash_to_id_key_name,
borrowed_transactions_hashmap_name, recycled_nonces_zset_name,
optimistic_transaction_count_key_name, last_transaction_count_key_name,
eoa_health_key_name, manual_reset_key_name), which will make existing in-flight
state invisible; add a migration step run at store initialization that detects
and migrates data from the old key names to the new names (or atomically RENAME
where supported): implement a routine that for each of the above name generators
computes both old and new key names, checks existence (or scans for patterns for
lists/zsets/hashes), copies or RENAMEs the data preserving types and TTLs, and
deletes the old keys only after successful copy; ensure this runs once per EOA
(use eoa_lock_key_name to guard concurrent migrations) and log/metric failures
and fall back to checking both old and new keys in getters until migration is
confirmed.
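
The dual-read fallback this prompt suggests could be sketched as follows. All key formats and the ENGINE_HASH_TAG value here are illustrative assumptions, not the repository's actual key layout, and a HashMap stands in for Redis:

```rust
use std::collections::HashMap;

// Illustrative only: both key formats and the legacy hash-tag value are
// assumptions for demonstration, not the repository's real constants.
fn eoa_health_key_names(eoa: &str) -> (String, String) {
    const ENGINE_HASH_TAG: &str = "{engine}"; // hypothetical legacy namespace tag
    let new_key = format!("eoa_executor:health:{eoa}");
    let old_key = format!("{ENGINE_HASH_TAG}:eoa_executor:health:{eoa}");
    (new_key, old_key)
}

/// Read the new key first, then fall back to the legacy key so in-flight
/// state written before the rename stays visible after rollout.
fn read_with_fallback<'a>(store: &'a HashMap<String, String>, eoa: &str) -> Option<&'a str> {
    let (new_key, old_key) = eoa_health_key_names(eoa);
    store
        .get(&new_key)
        .or_else(|| store.get(&old_key))
        .map(String::as_str)
}

fn main() {
    let mut store = HashMap::new();
    // Only the legacy key exists, as on a warm Redis right after deploy.
    store.insert(
        "{engine}:eoa_executor:health:0xabc".to_string(),
        "healthy".to_string(),
    );
    assert_eq!(read_with_fallback(&store, "0xabc"), Some("healthy"));
    println!("ok");
}
```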
🧹 Nitpick comments (5)
twmq/tests/basic.rs (1)

19-35: Move this Redis cleanup helper into shared fixtures.

You already have a fixtures module here, and the same KEYS/DEL helper now exists in several tests (basic.rs, basic_hook.rs, idempotency_modes.rs, nack.rs, prune_race_condition.rs, prune_race_random_ids.rs). Centralizing it would avoid another repo-wide edit the next time the key prefix or connection type changes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/tests/basic.rs` around lines 19 - 35, Extract the async helper
cleanup_redis_keys into the existing fixtures module as a shared public async
function (preserve its behavior and signature so it accepts a &ConnectionManager
and queue_name: &str), remove the duplicate copies in basic.rs, basic_hook.rs,
idempotency_modes.rs, nack.rs, prune_race_condition.rs, and
prune_race_random_ids.rs, and update those tests to call
fixtures::cleanup_redis_keys(...) instead; ensure the fixtures version logs the
same pattern, uses the same redis KEYs/DEL logic, and remains async so existing
await call sites need no changes.
executors/src/eoa/worker/send.rs (1)

349-353: Keep chain_id on these terminal error events.

Dropping client_id / queue_id is fine, but these are the logs you'll alert on when preparation or broadcast fails. With only transaction_id / nonce, multi-chain incidents are much harder to segment once the surrounding lifecycle logs are absent or sampled.

Suggested tweak
                         tracing::error!(
                             error = ?e,
                             transaction_id = pending.transaction_id,
+                            chain_id = self.chain_id,
                             "Transaction permanently failed due to non-retryable preparation error",
                         );
...
-                            tracing::error!(error = ?e, transaction_id = borrowed_tx.transaction_id, nonce = borrowed_tx.data.signed_transaction.nonce(), "Transaction nack error during send");
+                            tracing::error!(
+                                error = ?e,
+                                transaction_id = borrowed_tx.transaction_id,
+                                nonce = borrowed_tx.data.signed_transaction.nonce(),
+                                chain_id = self.chain_id,
+                                "Transaction nack error during send"
+                            );
...
-                            tracing::error!(error = ?e, transaction_id = borrowed_tx.transaction_id, nonce = borrowed_tx.data.signed_transaction.nonce(), "Transaction failed during send");
+                            tracing::error!(
+                                error = ?e,
+                                transaction_id = borrowed_tx.transaction_id,
+                                nonce = borrowed_tx.data.signed_transaction.nonce(),
+                                chain_id = self.chain_id,
+                                "Transaction failed during send"
+                            );

Also applies to: 550-556

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/eoa/worker/send.rs` around lines 349 - 353, The terminal error
logs (the tracing::error! call that currently sets error = ?e and transaction_id
= pending.transaction_id) must also include the chain identifier so multi-chain
incidents can be segmented; update that tracing::error! invocation to add
chain_id = pending.chain_id (or the correct chain id field on the
PendingTransaction struct) and do the same for the equivalent terminal error
block around the later tracing::error! at lines ~550-556 so both preparation and
broadcast terminal errors include chain_id along with transaction_id and error.
executors/src/eoa/worker/confirm.rs (1)

409-409: Logging simplification removes potentially useful context.

The simplified error logs lose structured fields like transaction_id, chain_id, and client_id that could aid debugging. However, the method is already instrumented with worker_id (line 33), and the error itself contains relevant details.

This is acceptable if it's a deliberate decision to reduce log verbosity consistently across the codebase, but consider whether the lost context might impact production debugging.

Also applies to: 415-415

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/eoa/worker/confirm.rs` at line 409, Restore the structured
fields to the error logs where you currently call tracing::error!("Failed to
update balance threshold: {}", e); — include transaction_id, chain_id, client_id
(and keep worker_id) as key-value pairs and attach the error (e) as the message
or error field so the logs remain queryable; update both occurrences (the
tracing::error call at the update-balance-threshold block and the second
occurrence around the same logic) to use
tracing::error!(transaction_id=%transaction_id, chain_id=%chain_id,
client_id=%client_id, worker_id=%worker_id, error=%e, "Failed to update balance
threshold") so structured context is preserved.
server/src/queue/manager.rs (1)

258-258: Multiple get_connection_manager() calls are acceptable but worth noting.

Each call to get_connection_manager().await? creates a new ConnectionManager instance. This is fine since ConnectionManager internally shares the underlying connection pool via Arc, making clones cheap. However, you could alternatively create a single ConnectionManager upfront and clone it for each use if you prefer explicit resource sharing.

Also applies to: 289-289

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/src/queue/manager.rs` at line 258, Multiple calls to
redis_client.get_connection_manager().await? create separate ConnectionManager
instances (though cheap); change to create a single ConnectionManager once,
assign it to a local variable (e.g., let conn_mgr =
redis_client.get_connection_manager().await? ) and then reuse/clone conn_mgr for
each struct field or function that currently calls
redis_client.get_connection_manager().await?, replacing those direct calls so
all places (references to get_connection_manager(), redis_client, and the struct
field redis) use the single conn_mgr cloned/shared value.
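
The cheap-clone point can be illustrated with a toy type mimicking ConnectionManager's internal shape (an Arc around shared connection state). This is a simplification for illustration, not redis-rs's actual definition:

```rust
use std::sync::Arc;

// Toy stand-in: redis::aio::ConnectionManager clones are cheap because they
// duplicate a shared handle rather than opening a new connection.
#[derive(Clone)]
struct FakeConnectionManager {
    shared: Arc<String>, // stands in for the multiplexed connection state
}

fn main() {
    let conn_mgr = FakeConnectionManager {
        shared: Arc::new("tcp://redis:6379".to_string()),
    };
    // Hand one clone to each consumer, as the comment suggests.
    let for_router = conn_mgr.clone();
    let for_registry = conn_mgr.clone();
    // All three handles point at the same underlying state.
    assert_eq!(Arc::strong_count(&conn_mgr.shared), 3);
    assert!(Arc::ptr_eq(&for_router.shared, &for_registry.shared));
    println!("ok");
}
```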
executors/src/eip7702_executor/confirm.rs (1)

171-171: Logging fields reduced - consider retaining chain_id for observability.

The chain_id field was removed from the tracing instrument. While transaction_id is the primary correlation key, chain_id is valuable for filtering logs by chain during operational debugging. Consider keeping it:

-#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))]
+#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, stage = Self::stage_name(), executor = Self::executor_name()))]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/eip7702_executor/confirm.rs` at line 171, The tracing attribute
on the method (#[tracing::instrument(...)] with fields(transaction_id =
job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))
removed chain_id; restore it by adding a chain_id field (e.g.,
fields(transaction_id = job.job.id, chain_id = job.job.chain_id, stage =
Self::stage_name(), executor = Self::executor_name())) so logs include the chain
identifier for better observability.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@executors/src/solana_executor/storage.rs`:
- Around line 133-146: The rename of Redis keys in attempt_key and lock_key
breaks compatibility with existing in-flight transaction keys; update the code
to support legacy keys by creating helpers (e.g., attempt_keys and lock_keys)
that return both the new key (solana_tx_attempt:{transaction_id} /
solana_tx_lock:{transaction_id} with namespace) and the legacy key format, then
change all call sites that read or acquire locks/attempts to check for an
existing key among the returned keys (use the first existing key) and when
writing/set the value write to both keys (or migrate the existing legacy key to
the new key) so rolling deploys do not reopen the duplicate-send window. Ensure
you update functions/methods that call attempt_key and lock_key to use these new
helpers (or adjust attempt_key/lock_key to return both formats) and handle
namespace variants consistently.

In `@twmq/src/lib.rs`:
- Around line 190-236: The key-naming change in functions like
pending_list_name, active_hash_name, delayed_zset_name, job_data_hash_name,
job_meta_hash_name, job_errors_list_name, job_result_hash_name, dedupe_set_name,
pending_cancellation_set_name and lease_key_name will leave existing jobs under
the old prefix inaccessible; restore compatibility by implementing either a
dual-read/dual-write mode (write new keys but read both old and new prefixes
when enqueuing/consuming) or add a one-off migration that iterates old-prefixed
keys and renames/migrates them to the new prefix before deploy; update the queue
client code paths that call these functions to attempt reads from the old prefix
fallback (e.g., check both pending_list_name() and the legacy "twmq:{}:pending"
prefix) and write to both prefixes during the transition, or perform the
migration using the same helper name functions to atomically move keys.
- Around line 31-41: The bug is that rounding sub-second delays to 1s and
separately using now.as_secs() moves the target into the next whole-second
bucket too early; change the logic so you compute the target second by adding
delay to the current instant and taking the ceiling to whole seconds instead of
separately rounding delay then using now.as_secs(). Concretely: update
delay_to_queue_seconds to return the ceiling of delay in seconds (use
delay.as_secs() + 1 when delay.subsec_nanos() > 0, but return 0 for zero delay),
and replace places that derive now with as_secs() (the code that computes the
queue/expiry second using now.as_secs(), referenced in the same modules) with
logic that computes target_sec = (now + delay).as_secs() plus one if (now +
delay).subsec_nanos() > 0 (i.e., compute ceil(now + delay) to whole seconds) so
sub-second delays are scheduled against the correct future second bucket.
- Around line 21-22: The code clones redis::aio::ConnectionManager into spawned
tasks and uses WATCH/MULTI/EXEC on that multiplexed manager, which is unsafe
because WATCH state is connection-local; replace those clones by either (A)
opening a dedicated stateful async connection for each job completion task (use
client.get_async_connection() / redis::aio::Connection and run WATCH/MULTI/EXEC
on that single connection, then drop it when done) or (B) eliminate
WATCH/MULTI/EXEC and implement the lease-check+update as a single atomic Redis
Lua script invoked with EVAL/EVALSHA from the spawned tasks; update the sections
that currently clone ConnectionManager and call WATCH/MULTI/EXEC to use one of
these two approaches (refer to uses of ConnectionManager, WATCH, MULTI, EXEC in
the job-completion/spawned-task code).

In `@twmq/src/multilane.rs`:
- Around line 222-223: The multilane requeue logic uses
delay_to_queue_seconds(delay.delay) and truncates current time with as_secs(),
causing sub-second delays (e.g., 200ms) to collapse into the next whole-second
bucket; change the math to use sub-second precision (milliseconds or
nanoseconds) for both the delay computation and the current-time comparison.
Concretely, replace delay_to_queue_seconds(delay.delay) and any uses of
as_secs() in the multilane requeue path with a millisecond/nanosecond variant
(e.g., delay_to_queue_millis or compute delay.delay.as_millis()/as_nanos()) and
compare against now.as_millis()/as_nanos() (or compute queue_instant = now +
delay and compare Instants) so that 200ms delays scheduled near second
boundaries are honored; update uses around delay.position and the matching
requeue/selection code (the places calling delay_to_queue_seconds and as_secs())
to the same higher-resolution unit so both sides use identical precision.
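
The ceiling-based scheduling the two twmq prompts above describe could be sketched like this (the helper name and plumbing are hypothetical; the repository's actual fix may differ):

```rust
use std::time::Duration;

/// Hypothetical sketch of the reviewer's suggestion: compute the target
/// second as ceil(now + delay) instead of rounding the delay in isolation,
/// so a sub-second delay issued late within a second still lands in the
/// correct future bucket.
fn target_queue_second(now_since_epoch: Duration, delay: Duration) -> u64 {
    let target = now_since_epoch + delay;
    if target.subsec_nanos() > 0 {
        target.as_secs() + 1
    } else {
        target.as_secs()
    }
}

fn main() {
    // now = 100.9s, delay = 200ms → target 101.1s → scheduled at second 102.
    assert_eq!(
        target_queue_second(Duration::from_millis(100_900), Duration::from_millis(200)),
        102
    );
    // Whole-second sums need no rounding up.
    assert_eq!(
        target_queue_second(Duration::from_secs(10), Duration::from_secs(5)),
        15
    );
    println!("ok");
}
```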

---

Outside diff comments:
In `@executors/src/eoa/store/mod.rs`:
- Around line 122-293: The change renames every persisted Redis key (e.g.,
eoa_lock_key_name, transaction_data_key_name, transaction_attempts_list_name,
pending_transactions_zset_name, submitted_transactions_zset_name,
transaction_hash_to_id_key_name, borrowed_transactions_hashmap_name,
recycled_nonces_zset_name, optimistic_transaction_count_key_name,
last_transaction_count_key_name, eoa_health_key_name, manual_reset_key_name),
which will make existing in-flight state invisible; add a migration step run at
store initialization that detects and migrates data from the old key names to
the new names (or atomically RENAMEn where supported): implement a routine that
for each of the above name generators computes both old and new key names,
checks existence (or scans for patterns for lists/zsets/hashes), copies or
RENAMEs the data preserving types and TTLs, and deletes the old keys only after
successful copy; ensure this runs once per EOA (use eoa_lock_key_name to guard
concurrent migrations) and log/metric failures and fall back to checking both
old and new keys in getters until migration is confirmed.

---

Nitpick comments:
In `@executors/src/eip7702_executor/confirm.rs`:
- Line 171: The tracing attribute on the method (#[tracing::instrument(...)]
with fields(transaction_id = job.job.id, stage = Self::stage_name(), executor =
Self::executor_name())) removed chain_id; restore it by adding a chain_id field
(e.g., fields(transaction_id = job.job.id, chain_id = job.job.chain_id, stage =
Self::stage_name(), executor = Self::executor_name())) so logs include the chain
identifier for better observability.

In `@executors/src/eoa/worker/confirm.rs`:
- Line 409: Restore the structured fields to the error logs where you currently
call tracing::error!("Failed to update balance threshold: {}", e); — include
transaction_id, chain_id, client_id (and keep worker_id) as key-value pairs and
attach the error (e) as the message or error field so the logs remain queryable;
update both occurrences (the tracing::error call at the update-balance-threshold
block and the second occurrence around the same logic) to use
tracing::error!(transaction_id=%transaction_id, chain_id=%chain_id,
client_id=%client_id, worker_id=%worker_id, error=%e, "Failed to update balance
threshold") so structured context is preserved.

In `@executors/src/eoa/worker/send.rs`:
- Around line 349-353: The terminal error logs (the tracing::error! call that
currently sets error = ?e and transaction_id = pending.transaction_id) must also
include the chain identifier so multi-chain incidents can be segmented; update
that tracing::error! invocation to add chain_id = pending.chain_id (or the
correct chain id field on the PendingTransaction struct) and do the same for the
equivalent terminal error block around the later tracing::error! at lines
~550-556 so both preparation and broadcast terminal errors include chain_id
along with transaction_id and error.

In `@server/src/queue/manager.rs`:
- Line 258: Multiple calls to redis_client.get_connection_manager().await?
create separate ConnectionManager instances (though cheap); change to create a
single ConnectionManager once, assign it to a local variable (e.g., let conn_mgr
= redis_client.get_connection_manager().await? ) and then reuse/clone conn_mgr
for each struct field or function that currently calls
redis_client.get_connection_manager().await?, replacing those direct calls so
all places (references to get_connection_manager(), redis_client, and the struct
field redis) use the single conn_mgr cloned/shared value.

In `@twmq/tests/basic.rs`:
- Around line 19-35: Extract the async helper cleanup_redis_keys into the
existing fixtures module as a shared public async function (preserve its
behavior and signature so it accepts a &ConnectionManager and queue_name: &str),
remove the duplicate copies in basic.rs, basic_hook.rs, idempotency_modes.rs,
nack.rs, prune_race_condition.rs, and prune_race_random_ids.rs, and update those
tests to call fixtures::cleanup_redis_keys(...) instead; ensure the fixtures
version logs the same pattern, uses the same redis KEYs/DEL logic, and remains
async so existing await call sites need no changes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 577b9dc4-b2e9-4dd7-a9c1-cae931a219eb

📥 Commits

Reviewing files that changed from the base of the PR and between b5b14a5 and dd0b108.

📒 Files selected for processing (35)
  • .github/workflows/coverage-twmq.yaml
  • Cargo.toml
  • core/src/chain.rs
  • executors/src/eip7702_executor/confirm.rs
  • executors/src/eip7702_executor/send.rs
  • executors/src/eoa/store/atomic.rs
  • executors/src/eoa/store/borrowed.rs
  • executors/src/eoa/store/mod.rs
  • executors/src/eoa/store/pending.rs
  • executors/src/eoa/store/submitted.rs
  • executors/src/eoa/worker/confirm.rs
  • executors/src/eoa/worker/mod.rs
  • executors/src/eoa/worker/send.rs
  • executors/src/external_bundler/deployment.rs
  • executors/src/solana_executor/storage.rs
  • executors/src/solana_executor/worker.rs
  • executors/src/transaction_registry.rs
  • integration-tests/tests/setup.rs
  • server/Cargo.toml
  • server/src/execution_router/mod.rs
  • server/src/main.rs
  • server/src/queue/manager.rs
  • twmq/src/hooks.rs
  • twmq/src/lib.rs
  • twmq/src/multilane.rs
  • twmq/src/queue.rs
  • twmq/tests/basic.rs
  • twmq/tests/basic_hook.rs
  • twmq/tests/delay.rs
  • twmq/tests/idempotency_modes.rs
  • twmq/tests/lease_expiry.rs
  • twmq/tests/multilane_batch_pop.rs
  • twmq/tests/nack.rs
  • twmq/tests/prune_race_condition.rs
  • twmq/tests/prune_race_random_ids.rs
💤 Files with no reviewable changes (2)
  • server/Cargo.toml
  • core/src/chain.rs

Comment on lines 133 to 146

    /// Get the Redis key for a transaction's attempt
    fn attempt_key(&self, transaction_id: &str) -> String {
        match &self.namespace {
-           Some(ns) => {
-               format!("{ns}:{}:solana_tx_attempt:{transaction_id}", twmq::ENGINE_HASH_TAG)
-           }
-           None => format!("{}:solana_tx_attempt:{transaction_id}", twmq::ENGINE_HASH_TAG),
+           Some(ns) => format!("{ns}:solana_tx_attempt:{transaction_id}"),
+           None => format!("solana_tx_attempt:{transaction_id}"),
        }
    }

    /// Get the Redis key for a transaction's lock
    fn lock_key(&self, transaction_id: &str) -> String {
        match &self.namespace {
-           Some(ns) => format!("{ns}:{}:solana_tx_lock:{transaction_id}", twmq::ENGINE_HASH_TAG),
-           None => format!("{}:solana_tx_lock:{transaction_id}", twmq::ENGINE_HASH_TAG),
+           Some(ns) => format!("{ns}:solana_tx_lock:{transaction_id}"),
+           None => format!("solana_tx_lock:{transaction_id}"),
        }
    }

⚠️ Potential issue | 🟠 Major

This key rename reopens the duplicate-send window during rollout.

Lines 136-145 rename the attempt and lock keys without any compatibility path. If Redis already contains an old-format attempt or lock for an in-flight Solana transaction, the new code won't see it and can reacquire/send the same transaction again after deploy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/solana_executor/storage.rs` around lines 133 - 146, The rename
of Redis keys in attempt_key and lock_key breaks compatibility with existing
in-flight transaction keys; update the code to support legacy keys by creating
helpers (e.g., attempt_keys and lock_keys) that return both the new key
(solana_tx_attempt:{transaction_id} / solana_tx_lock:{transaction_id} with
namespace) and the legacy key format, then change all call sites that read or
acquire locks/attempts to check for an existing key among the returned keys (use
the first existing key) and when writing/set the value write to both keys (or
migrate the existing legacy key to the new key) so rolling deploys do not reopen
the duplicate-send window. Ensure you update functions/methods that call
attempt_key and lock_key to use these new helpers (or adjust
attempt_key/lock_key to return both formats) and handle namespace variants
consistently.
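
The suggested attempt_keys helper might be sketched as below: return both the new and the legacy key so callers can probe for an existing legacy entry during a rolling deploy. The ENGINE_HASH_TAG value shown is an illustrative placeholder, not the repository's real constant:

```rust
// Sketch of a dual-key helper; the legacy hash-tag value is hypothetical.
fn attempt_keys(namespace: Option<&str>, transaction_id: &str) -> (String, String) {
    const ENGINE_HASH_TAG: &str = "{engine}"; // hypothetical legacy value
    let new_key = match namespace {
        Some(ns) => format!("{ns}:solana_tx_attempt:{transaction_id}"),
        None => format!("solana_tx_attempt:{transaction_id}"),
    };
    let legacy_key = match namespace {
        Some(ns) => format!("{ns}:{ENGINE_HASH_TAG}:solana_tx_attempt:{transaction_id}"),
        None => format!("{ENGINE_HASH_TAG}:solana_tx_attempt:{transaction_id}"),
    };
    (new_key, legacy_key)
}

fn main() {
    let (new_key, legacy_key) = attempt_keys(Some("ns1"), "tx42");
    assert_eq!(new_key, "ns1:solana_tx_attempt:tx42");
    assert_eq!(legacy_key, "ns1:{engine}:solana_tx_attempt:tx42");
    println!("ok");
}
```

Call sites would read whichever of the two keys exists (preferring the new one) and write to both, or migrate the legacy key forward, until no pre-rename state remains.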

Comment on lines +21 to 22
use redis::{AsyncCommands, RedisResult, aio::ConnectionManager};
use serde::{Serialize, de::DeserializeOwned};

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, explore the repository structure
find . -type f -name "*.rs" | head -20

Repository: thirdweb-dev/engine-core

Length of output: 668


🏁 Script executed:

# Read the specific file to understand the context
wc -l twmq/src/lib.rs

Repository: thirdweb-dev/engine-core

Length of output: 88


🏁 Script executed:

# Read the imports and the complete_job function
head -30 twmq/src/lib.rs

Repository: thirdweb-dev/engine-core

Length of output: 829


🏁 Script executed:

# Read lines 136-152 to see the complete_job function
sed -n '130,160p' twmq/src/lib.rs

Repository: thirdweb-dev/engine-core

Length of output: 839


🏁 Script executed:

# Read the complete_job function and surrounding context
sed -n '125,165p' twmq/src/lib.rs

Repository: thirdweb-dev/engine-core

Length of output: 1025


🏁 Script executed:

# Search for the complete_job function
grep -n "fn complete_job" twmq/src/lib.rs

Repository: thirdweb-dev/engine-core

Length of output: 143


🏁 Script executed:

# Read the complete_job function at line 1207
sed -n '1207,1270p' twmq/src/lib.rs

Repository: thirdweb-dev/engine-core

Length of output: 2515


🏁 Script executed:

# Continue reading the complete_job function
sed -n '1270,1305p' twmq/src/lib.rs

Repository: thirdweb-dev/engine-core

Length of output: 1539


🏁 Script executed:

# Check if there are other usages of complete_job to understand concurrency patterns
grep -n "complete_job" twmq/src/lib.rs | head -20

Repository: thirdweb-dev/engine-core

Length of output: 348


🏁 Script executed:

# Look for where complete_job is called and understand task execution context
grep -B5 "complete_job(" twmq/src/lib.rs | head -30

Repository: thirdweb-dev/engine-core

Length of output: 662


🏁 Script executed:

# Check the task execution context to understand concurrency
sed -n '500,550p' twmq/src/lib.rs

Repository: thirdweb-dev/engine-core

Length of output: 2815


🏁 Script executed:

# Look for where queue_clone comes from to understand cloning patterns
sed -n '480,530p' twmq/src/lib.rs

Repository: thirdweb-dev/engine-core

Length of output: 2917


🏁 Script executed:

# Check for any comments in the code about ConnectionManager thread-safety or state
grep -n "ConnectionManager\|WATCH\|MULTI\|EXEC" twmq/src/lib.rs | head -30

Repository: thirdweb-dev/engine-core

Length of output: 966


🏁 Script executed:

# Look at the redis crate version to verify documentation claims
cat Cargo.toml | grep -A5 "^\[dependencies\]" | grep redis

Repository: thirdweb-dev/engine-core

Length of output: 50


🏁 Script executed:

# Find and read Cargo.toml for the twmq package
find . -path "*/twmq/Cargo.toml" -type f

Repository: thirdweb-dev/engine-core

Length of output: 85


🏁 Script executed:

# List files in twmq directory
ls -la twmq/

Repository: thirdweb-dev/engine-core

Length of output: 403


🏁 Script executed:

# Read the Cargo.toml to check redis version
cat twmq/Cargo.toml


Length of output: 680


🏁 Script executed:

# Check the workspace Cargo.toml for redis version
find . -maxdepth 1 -name "Cargo.toml" -type f | xargs cat | grep -A100 "^\[workspace"


Length of output: 2282


🏁 Script executed:

# Check if there are any mutex locks or synchronization around WATCH operations
grep -B10 -A10 "WATCH" twmq/src/lib.rs | head -50


Length of output: 2048


🏁 Script executed:

# Look for documentation or comments about ConnectionManager usage
grep -n "ConnectionManager\|TODO\|FIXME\|NOTE" twmq/src/lib.rs | head -20


Length of output: 173


🏁 Script executed:

# Check redis-rs source or documentation about ConnectionManager and WATCH
# Let's search for any usage patterns or tests that might show the intended usage
grep -r "WATCH" twmq/ | head -20


Length of output: 1419


🏁 Script executed:

# Check if there are any tests that demonstrate the expected behavior
find twmq -name "*.rs" -type f -exec grep -l "WATCH\|complete_job" {} \;


Length of output: 105


🏁 Script executed:

# Let's check the multilane.rs file to see if there's a different pattern there
sed -n '1,100p' twmq/src/multilane.rs


Length of output: 2925


🏁 Script executed:

# Look for any synchronization primitives (Mutex, RwLock, etc.) in the Queue implementation
grep -n "Mutex\|RwLock\|Arc\|lock\|synchron" twmq/src/lib.rs | head -30


Length of output: 485


🏁 Script executed:

# Let's verify the exact behavior by checking redis-rs documentation patterns
# Search for any Connection (non-multiplexed) usage or alternative patterns
grep -n "Connection\|RedisResult" twmq/src/lib.rs | head -20


Length of output: 230


🏁 Script executed:

# Check if there are integration tests that might show expected behavior
find twmq/tests -name "*.rs" -type f | xargs cat 2>/dev/null | head -100


Length of output: 3236


🏁 Script executed:

# Let me understand how redis-rs ConnectionManager handles concurrent WATCH operations
# by checking if there are any safeguards or documentation
web_search "redis-rs ConnectionManager WATCH MULTI EXEC concurrent safe"


Length of output: 118


🏁 Script executed:

# Also check if the code has any mutex or locking around WATCH/MULTI/EXEC blocks
grep -B20 -A5 "let mut conn = self.redis.clone()" twmq/src/lib.rs | head -60


Length of output: 2237


ConnectionManager with WATCH/MULTI/EXEC requires exclusive connection-level isolation.

The concurrent job processing pattern (lines 1254-1297 and 1360-1400) clones ConnectionManager in spawned tasks without connection-level synchronization. Redis WATCH/UNWATCH state is tied to the underlying connection, and ConnectionManager's multiplexed design means concurrent clones can interfere with transaction state. While the retry loop on WATCH failure masks some race conditions, it does not guarantee correctness.

Replace with either:

  • A dedicated stateful async connection per job completion
  • A Lua script that atomically handles the lease check and updates without WATCH
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/src/lib.rs` around lines 21 - 22, The code clones
redis::aio::ConnectionManager into spawned tasks and uses WATCH/MULTI/EXEC on
that multiplexed manager, which is unsafe because WATCH state is
connection-local; replace those clones by either (A) opening a dedicated
stateful async connection for each job completion task (use
client.get_async_connection() / redis::aio::Connection and run WATCH/MULTI/EXEC
on that single connection, then drop it when done) or (B) eliminate
WATCH/MULTI/EXEC and implement the lease-check+update as a single atomic Redis
Lua script invoked with EVAL/EVALSHA from the spawned tasks; update the sections
that currently clone ConnectionManager and call WATCH/MULTI/EXEC to use one of
these two approaches (refer to uses of ConnectionManager, WATCH, MULTI, EXEC in
the job-completion/spawned-task code).
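The Lua-script route can be illustrated with a pure-Rust model of the atomic check a single EVAL would have to perform. This is a sketch under stated assumptions, not the queue's actual implementation: `complete_if_leased` and the key shapes are hypothetical, and in-process maps stand in for the Redis keys that a real script would read and write in one atomic invocation.

```rust
use std::collections::HashMap;

// Models the lease-check-and-complete a Lua script would perform in one EVAL:
// verify the lease is still held by this worker, then finalize the job.
// HashMaps stand in for the Redis keys (names here are hypothetical).
fn complete_if_leased(
    leases: &mut HashMap<String, String>, // lease_key -> lease_token
    active: &mut HashMap<String, String>, // job_id -> job payload
    done: &mut Vec<String>,               // completed job ids
    job_id: &str,
    lease_key: &str,
    lease_token: &str,
) -> bool {
    // Step 1: the lease must still exist and carry our token.
    match leases.get(lease_key) {
        Some(token) if token.as_str() == lease_token => {}
        _ => return false, // lease expired or stolen: do nothing
    }
    // Step 2: finalize atomically (in Redis, the whole script is atomic).
    leases.remove(lease_key);
    if active.remove(job_id).is_some() {
        done.push(job_id.to_string());
    }
    true
}

fn main() {
    let mut leases = HashMap::new();
    let mut active = HashMap::new();
    let mut done = Vec::new();
    leases.insert("lease:job1:tok".to_string(), "tok".to_string());
    active.insert("job1".to_string(), "payload".to_string());

    // Correct token: completion succeeds.
    assert!(complete_if_leased(&mut leases, &mut active, &mut done, "job1", "lease:job1:tok", "tok"));
    // Second attempt fails: the lease was consumed by the first completion.
    assert!(!complete_if_leased(&mut leases, &mut active, &mut done, "job1", "lease:job1:tok", "tok"));
    println!("done = {:?}", done);
}
```

Because the whole check-then-finalize runs inside one script, no WATCH state needs to survive on a shared connection, which is what makes this option compatible with a multiplexed ConnectionManager.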

Comment on lines +31 to +41
pub(crate) fn delay_to_queue_seconds(delay: Duration) -> u64 {
    let delay_secs = delay.as_secs();

    if delay.is_zero() {
        0
    } else if delay_secs == 0 {
        1
    } else {
        delay_secs
    }
}

⚠️ Potential issue | 🟠 Major

The sub-second delay fix still fires early at second boundaries.

Lines 291 and 1037 still derive `now` with `as_secs()`, so rounding a non-zero sub-second delay up to 1s only moves the job into the next whole-second bucket. If a nack happens at xx.999, a 200ms delay is still stored for second xx+1 and can become eligible almost immediately on the next poll.

Also applies to: 291-325, 1037-1073

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/src/lib.rs` around lines 31 - 41, The bug is that rounding sub-second
delays to 1s and separately using now.as_secs() moves the target into the next
whole-second bucket too early; change the logic so you compute the target second
by adding delay to the current instant and taking the ceiling to whole seconds
instead of separately rounding delay then using now.as_secs(). Concretely:
update delay_to_queue_seconds to return the ceiling of delay in seconds (use
delay.as_secs() + 1 when delay.subsec_nanos() > 0, but return 0 for zero delay),
and replace places that derive now with as_secs() (the code that computes the
queue/expiry second using now.as_secs(), referenced in the same modules) with
logic that computes target_sec = (now + delay).as_secs() plus one if (now +
delay).subsec_nanos() > 0 (i.e., compute ceil(now + delay) to whole seconds) so
sub-second delays are scheduled against the correct future second bucket.
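The ceiling-based scheduling the prompt describes can be sketched as follows. `target_second` is a hypothetical helper name, and epoch-relative `Duration`s stand in for the queue's own clock; the point is that `now + delay` is rounded up as one quantity rather than truncating `now` and the delay separately.

```rust
use std::time::Duration;

// Schedule against ceil(now + delay) in whole seconds, instead of rounding the
// delay up and truncating the current time independently.
fn target_second(now: Duration, delay: Duration) -> u64 {
    let target = now + delay;
    if target.subsec_nanos() > 0 {
        target.as_secs() + 1
    } else {
        target.as_secs()
    }
}

fn main() {
    // The review's example: a nack at xx.999 with a 200ms delay.
    let now = Duration::from_millis(10_999);
    let delay = Duration::from_millis(200);

    // Old scheme: now.as_secs() = 10, delay rounded up to 1 -> bucket 11,
    // eligible just 1ms later. New scheme: ceil(11.199s) = 12.
    assert_eq!(target_second(now, delay), 12);

    // Exact-second targets are not pushed out unnecessarily.
    assert_eq!(target_second(Duration::from_secs(10), Duration::from_secs(2)), 12);
}
```

With this shape, a sub-second delay always lands in a bucket strictly after the full delay has elapsed, at the cost of at most one extra second of latency.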

Comment on lines 190 to 236
     pub fn pending_list_name(&self) -> String {
-        format!("twmq:{}:{}:pending", ENGINE_HASH_TAG, self.name())
+        format!("twmq:{}:pending", self.name())
     }

     pub fn active_hash_name(&self) -> String {
-        format!("twmq:{}:{}:active", ENGINE_HASH_TAG, self.name)
+        format!("twmq:{}:active", self.name)
     }

     pub fn delayed_zset_name(&self) -> String {
-        format!("twmq:{}:{}:delayed", ENGINE_HASH_TAG, self.name)
+        format!("twmq:{}:delayed", self.name)
     }

     pub fn success_list_name(&self) -> String {
-        format!("twmq:{}:{}:success", ENGINE_HASH_TAG, self.name)
+        format!("twmq:{}:success", self.name)
     }

     pub fn failed_list_name(&self) -> String {
-        format!("twmq:{}:{}:failed", ENGINE_HASH_TAG, self.name)
+        format!("twmq:{}:failed", self.name)
     }

     pub fn job_data_hash_name(&self) -> String {
-        format!("twmq:{}:{}:jobs:data", ENGINE_HASH_TAG, self.name)
+        format!("twmq:{}:jobs:data", self.name)
     }

     pub fn job_meta_hash_name(&self, job_id: &str) -> String {
-        format!("twmq:{}:{}:job:{}:meta", ENGINE_HASH_TAG, self.name, job_id)
+        format!("twmq:{}:job:{}:meta", self.name, job_id)
     }

     pub fn job_errors_list_name(&self, job_id: &str) -> String {
-        format!(
-            "twmq:{}:{}:job:{}:errors",
-            ENGINE_HASH_TAG, self.name, job_id
-        )
+        format!("twmq:{}:job:{}:errors", self.name, job_id)
     }

     pub fn job_result_hash_name(&self) -> String {
-        format!("twmq:{}:{}:jobs:result", ENGINE_HASH_TAG, self.name)
+        format!("twmq:{}:jobs:result", self.name)
     }

     pub fn dedupe_set_name(&self) -> String {
-        format!("twmq:{}:{}:dedup", ENGINE_HASH_TAG, self.name)
+        format!("twmq:{}:dedup", self.name)
     }

     pub fn pending_cancellation_set_name(&self) -> String {
-        format!(
-            "twmq:{}:{}:pending_cancellations",
-            ENGINE_HASH_TAG, self.name
-        )
+        format!("twmq:{}:pending_cancellations", self.name)
     }

     pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String {
-        format!(
-            "twmq:{}:{}:job:{}:lease:{}",
-            ENGINE_HASH_TAG, self.name, job_id, lease_token
-        )
+        format!("twmq:{}:job:{}:lease:{}", self.name, job_id, lease_token)
     }

⚠️ Potential issue | 🟠 Major

This namespace change strands existing queued jobs on deploy.

Lines 190-236 rename every list/hash/zset/lease key the queue uses. Without a dual-read/write phase or a one-off migration, workers from the new version will see an empty queue while older pending/delayed/active job state is still sitting under the previous prefix.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/src/lib.rs` around lines 190 - 236, The key-naming change in functions
like pending_list_name, active_hash_name, delayed_zset_name, job_data_hash_name,
job_meta_hash_name, job_errors_list_name, job_result_hash_name, dedupe_set_name,
pending_cancellation_set_name and lease_key_name will leave existing jobs under
the old prefix inaccessible; restore compatibility by implementing either a
dual-read/dual-write mode (write new keys but read both old and new prefixes
when enqueuing/consuming) or add a one-off migration that iterates old-prefixed
keys and renames/migrates them to the new prefix before deploy; update the queue
client code paths that call these functions to attempt reads from the old prefix
fallback (e.g., check both pending_list_name() and the legacy "twmq:{}:pending"
prefix) and write to both prefixes during the transition, or perform the
migration using the same helper name functions to atomically move keys.
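A one-off migration could be driven by a key-pair generator like the sketch below. The `ENGINE_HASH_TAG` value and the `migration_pairs` helper are assumptions for illustration; a real migration would issue `RENAME old new` for each pair (plus a `SCAN`-based pass for the per-job meta, errors, and lease keys) before the new workers start.

```rust
// Builds (old, new) key pairs for the static queue keys so a one-off migration
// can RENAME them before deploy. The hash-tag value is an assumption;
// substitute whatever ENGINE_HASH_TAG held before this PR.
const ENGINE_HASH_TAG: &str = "{engine}";

fn migration_pairs(queue: &str) -> Vec<(String, String)> {
    [
        "pending", "active", "delayed", "success", "failed",
        "jobs:data", "jobs:result", "dedup", "pending_cancellations",
    ]
    .iter()
    .map(|suffix| {
        (
            format!("twmq:{}:{}:{}", ENGINE_HASH_TAG, queue, suffix),
            format!("twmq:{}:{}", queue, suffix),
        )
    })
    .collect()
}

fn main() {
    let pairs = migration_pairs("webhooks");
    assert_eq!(pairs[0].0, "twmq:{engine}:webhooks:pending");
    assert_eq!(pairs[0].1, "twmq:webhooks:pending");
    // Per-job keys (job:{id}:meta, job:{id}:errors, leases) need a SCAN pass.
    println!("{} static keys to rename", pairs.len());
}
```

`RENAME` fails if the source key does not exist, so the migration should skip missing keys (or use `RENAMENX` and log collisions) rather than abort.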

Comment on lines +222 to 223
+    let delay_secs = delay_to_queue_seconds(delay.delay);
     let position_string = delay.position.to_string();

⚠️ Potential issue | 🟠 Major

Multilane requeues still collapse into the next whole-second bucket.

Line 222 and line 1229 reuse the same whole-second delay rounding while the surrounding code still truncates the current time with `as_secs()`. A 200ms lane delay scheduled near a second boundary can therefore be picked up almost immediately, so the hot-loop fix remains incomplete for multilane jobs.

Also applies to: 1224-1242

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/src/multilane.rs` around lines 222 - 223, The multilane requeue logic
uses delay_to_queue_seconds(delay.delay) and truncates current time with
as_secs(), causing sub-second delays (e.g., 200ms) to collapse into the next
whole-second bucket; change the math to use sub-second precision (milliseconds
or nanoseconds) for both the delay computation and the current-time comparison.
Concretely, replace delay_to_queue_seconds(delay.delay) and any uses of
as_secs() in the multilane requeue path with a millisecond/nanosecond variant
(e.g., delay_to_queue_millis or compute delay.delay.as_millis()/as_nanos()) and
compare against now.as_millis()/as_nanos() (or compute queue_instant = now +
delay and compare Instants) so that 200ms delays scheduled near second
boundaries are honored; update uses around delay.position and the matching
requeue/selection code (the places calling delay_to_queue_seconds and as_secs())
to the same higher-resolution unit so both sides use identical precision.
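The higher-resolution variant the prompt suggests can be sketched like this. `delay_to_queue_millis` and `is_eligible` are hypothetical names; the point is that the zset score and the poll-time comparison use the same millisecond precision, so a 200ms delay near a second boundary is honored exactly.

```rust
use std::time::Duration;

// Score delayed jobs in milliseconds so both sides of the eligibility
// comparison share the same precision.
fn delay_to_queue_millis(now: Duration, delay: Duration) -> u128 {
    (now + delay).as_millis()
}

fn is_eligible(now: Duration, score_millis: u128) -> bool {
    now.as_millis() >= score_millis
}

fn main() {
    // A 200ms lane delay scheduled at xx.999.
    let now = Duration::from_millis(10_999);
    let score = delay_to_queue_millis(now, Duration::from_millis(200)); // 11_199 ms

    // Polling at 11.000s (the next whole second) must NOT pick the job up yet.
    assert!(!is_eligible(Duration::from_millis(11_000), score));
    // At 11.199s the full 200ms has elapsed.
    assert!(is_eligible(Duration::from_millis(11_199), score));
}
```

Unlike the ceiling-to-seconds approach, this keeps sub-second delays exact, but it requires migrating existing zset scores since seconds and milliseconds cannot be mixed in the same sorted set.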

@0xFirekeeper
Member Author

Review is mostly false positives from the valkey revert, or cosmetic.
Noting the WATCH/MULTI/EXEC on the multiplexed ConnectionManager for later; not for this PR.

@0xFirekeeper 0xFirekeeper merged commit 3ede6ab into main Mar 19, 2026
3 of 4 checks passed
@0xFirekeeper 0xFirekeeper deleted the firekeeper/twmq-delay branch March 19, 2026 17:43