Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/coverage-twmq.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:

# Run coverage with tarpaulin
- name: Run coverage
run: cargo tarpaulin -p twmq --skip-clean --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*"
run: cargo tarpaulin -p twmq --skip-clean --timeout 300 --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*"

# Upload coverage to Codecov
# TODO: Uncomment once we have open-sourced the repo
Expand Down
14 changes: 2 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,7 @@ config = "0.15.11"
aws-arn = "0.3.1"

# Redis
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager", "cluster", "cluster-async", "tls-rustls", "tokio-rustls-comp"] }
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] }

# Dev dependencies
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }

# Rustls
#
# NOTE: rustls 0.23 requires selecting exactly one process-wide crypto provider
# (features: `ring` or `aws_lc_rs` / `aws-lc-rs`). Some dependency graphs (e.g. via
# redis-rs' rustls integration) can end up with *no* provider enabled, which causes a
# runtime panic when building TLS client/server configs.
#
# We explicitly enable the `ring` provider here to make TLS work reliably.
rustls = { version = "0.23.32", default-features = false, features = ["std", "ring"] }
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }
9 changes: 0 additions & 9 deletions core/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,6 @@ impl RpcCredentials {

Ok(header_map)
}

pub fn client_id_for_logs(&self) -> Option<&str> {
match self {
RpcCredentials::Thirdweb(ThirdwebAuth::ClientIdServiceKey(creds)) => {
Some(&creds.client_id)
}
RpcCredentials::Thirdweb(ThirdwebAuth::SecretKey(_)) => None,
}
}
}

pub trait Chain: Send + Sync {
Expand Down
53 changes: 6 additions & 47 deletions executors/src/eip7702_executor/confirm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ use crate::{
},
};

const EIP7702_CONFIRM_QUEUE_ID: &str = "eip7702_confirm";

// --- Job Payload ---
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -170,7 +168,7 @@ where
type ErrorData = Eip7702ConfirmationError;
type JobData = Eip7702ConfirmationJobData;

#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, client_id = ?job.job.data.rpc_credentials.client_id_for_logs(), queue_id = EIP7702_CONFIRM_QUEUE_ID, stage = Self::stage_name(), executor = Self::executor_name()))]
#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))]
async fn process(
&self,
job: &BorrowedJob<Self::JobData>,
Expand Down Expand Up @@ -204,14 +202,7 @@ where
.await
.map_err(|e| {
tracing::error!(
transaction_id = %job_data.transaction_id,
chain_id = job_data.chain_id,
client_id = job_data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
bundler_transaction_id = %job_data.bundler_transaction_id,
bundler_transaction_id = job_data.bundler_transaction_id,
sender_details = ?job_data.sender_details,
error = ?e,
"Failed to get transaction hash from bundler"
Expand Down Expand Up @@ -330,15 +321,7 @@ where
// Send webhook
if let Err(e) = self.queue_success_webhook(job, success_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue success webhook"
);
Expand All @@ -363,15 +346,7 @@ where
if should_queue_webhook {
if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue nack webhook"
);
Expand All @@ -395,30 +370,14 @@ where
.add_remove_command(tx.pipeline(), &job.job.data.transaction_id);

tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?fail_data.error,
"EIP-7702 confirmation job failed"
);

if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue fail webhook"
);
Expand Down
55 changes: 6 additions & 49 deletions executors/src/eip7702_executor/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ use crate::{

use super::confirm::{Eip7702ConfirmationHandler, Eip7702ConfirmationJobData};

const EIP7702_SEND_QUEUE_ID: &str = "eip7702_send";
const EIP7702_CONFIRM_QUEUE_ID: &str = "eip7702_confirm";

// --- Job Payload ---
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -178,7 +175,7 @@ where
type ErrorData = Eip7702SendError;
type JobData = Eip7702SendJobData;

#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, client_id = ?job.job.data.rpc_credentials.client_id_for_logs(), queue_id = EIP7702_SEND_QUEUE_ID, stage = Self::stage_name(), executor = Self::executor_name()))]
#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))]
async fn process(
&self,
job: &BorrowedJob<Self::JobData>,
Expand Down Expand Up @@ -389,15 +386,7 @@ where

if let Err(e) = tx.queue_job(confirmation_job) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to enqueue confirmation job"
);
Expand All @@ -406,15 +395,7 @@ where
// Send webhook
if let Err(e) = self.queue_success_webhook(job, success_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_SEND_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue success webhook"
);
Expand All @@ -430,15 +411,7 @@ where
// Don't modify transaction registry on NACK - job will be retried
if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_SEND_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue nack webhook"
);
Expand All @@ -456,30 +429,14 @@ where
.add_remove_command(tx.pipeline(), &job.job.data.transaction_id);

tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_SEND_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?fail_data.error,
"EIP-7702 send job failed"
);

if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_SEND_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue fail webhook"
);
Expand Down
32 changes: 6 additions & 26 deletions executors/src/eoa/store/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use alloy::{
consensus::{Signed, TypedTransaction},
primitives::Address,
};
use twmq::redis::{AsyncCommands, Pipeline};
use twmq::redis::cluster_async::ClusterConnection;
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};

use crate::{
eoa::{
Expand All @@ -31,7 +30,6 @@ use crate::{

const MAX_RETRIES: u32 = 10;
const RETRY_BASE_DELAY_MS: u64 = 10;
const EOA_QUEUE_ID: &str = "eoa_executor";

pub trait SafeRedisTransaction: Send + Sync {
type ValidationData;
Expand All @@ -45,7 +43,7 @@ pub trait SafeRedisTransaction: Send + Sync {
) -> Self::OperationResult;
fn validation(
&self,
conn: &mut ClusterConnection,
conn: &mut ConnectionManager,
store: &EoaExecutorStore,
) -> impl Future<Output = Result<Self::ValidationData, TransactionStoreError>> + Send;
fn watch_keys(&self) -> Vec<String>;
Expand Down Expand Up @@ -595,7 +593,7 @@ impl AtomicEoaExecutorStore {
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
pipeline.expire(&tx_data_key, ttl_seconds);
pipeline.expire(
&self.transaction_attempts_list_name(&pending_transaction.transaction_id),
self.transaction_attempts_list_name(&pending_transaction.transaction_id),
ttl_seconds,
);

Expand All @@ -614,18 +612,7 @@ impl AtomicEoaExecutorStore {
&mut tx_context,
webhook_queue.clone(),
) {
tracing::error!(
transaction_id = %pending_transaction.transaction_id,
chain_id = pending_transaction.user_request.chain_id,
client_id = pending_transaction
.user_request
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EOA_QUEUE_ID,
"Failed to queue webhook for fail: {}",
e
);
tracing::error!("Failed to queue webhook for fail: {}", e);
}
}

Expand Down Expand Up @@ -683,7 +670,7 @@ impl AtomicEoaExecutorStore {
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
pipeline.expire(&tx_data_key, ttl_seconds);
pipeline.expire(
&self.transaction_attempts_list_name(&pending_transaction.transaction_id),
self.transaction_attempts_list_name(&pending_transaction.transaction_id),
ttl_seconds,
);
}
Expand All @@ -707,13 +694,6 @@ impl AtomicEoaExecutorStore {
) {
tracing::error!(
transaction_id = %pending_transaction.transaction_id,
chain_id = pending_transaction.user_request.chain_id,
client_id = pending_transaction
.user_request
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EOA_QUEUE_ID,
error = ?e,
"Failed to queue webhook for batch fail"
);
Expand Down Expand Up @@ -835,7 +815,7 @@ impl SafeRedisTransaction for ResetNoncesTransaction<'_> {

async fn validation(
&self,
_conn: &mut ClusterConnection,
_conn: &mut ConnectionManager,
store: &EoaExecutorStore,
) -> Result<Self::ValidationData, TransactionStoreError> {
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
Expand Down
Loading
Loading