Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/db/wallets/walletNonce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export const deleteAllNonces = async () => {
const keys = [
...(await redis.keys("nonce:*")),
...(await redis.keys("nonce-recycled:*")),
...(await redis.keys("sent-nonce:*")),
];
if (keys.length > 0) {
await redis.del(keys);
Expand Down
15 changes: 12 additions & 3 deletions src/server/routes/transaction/retry-failed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { eth_getTransactionReceipt, getRpcClient } from "thirdweb";
import { TransactionDB } from "../../../db/transactions/db";
import { getChain } from "../../../utils/chain";
import { thirdwebClient } from "../../../utils/sdk";
import { MineTransactionQueue } from "../../../worker/queues/mineTransactionQueue";
import { SendTransactionQueue } from "../../../worker/queues/sendTransactionQueue";
import { createCustomError } from "../../middleware/error";
import { standardResponseSchema } from "../../schemas/sharedApiSchemas";
Expand Down Expand Up @@ -108,15 +109,23 @@ export async function retryFailedTransaction(fastify: FastifyInstance) {
}
}

const job = await SendTransactionQueue.q.getJob(
const sendJob = await SendTransactionQueue.q.getJob(
SendTransactionQueue.jobId({
queueId: transaction.queueId,
resendCount: 0,
}),
);
if (sendJob) {
await sendJob.remove();
}

if (job) {
await job.remove();
const mineJob = await MineTransactionQueue.q.getJob(
MineTransactionQueue.jobId({
queueId: transaction.queueId,
}),
);
if (mineJob) {
await mineJob.remove();
}

await SendTransactionQueue.add({
Expand Down
18 changes: 7 additions & 11 deletions src/worker/queues/mineTransactionQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,8 @@ export type MineTransactionData = {
export class MineTransactionQueue {
static q = new Queue<string>("transactions-2-mine", {
connection: redis,
defaultJobOptions: {
...defaultJobOptions,
// Delay confirming the tx by 500ms.
delay: 500,
// Retry after 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 512s, 1024s (17 minutes)
// This needs to be long enough to handle transactions stuck in mempool.
// @TODO: This can be more optimized based on the chain block time.
attempts: 10,
backoff: { type: "exponential", delay: 2_000 },
},
// Backoff strategy is defined on the worker (`BackeoffStrategy`) and when adding to the queue (`attempts`).
defaultJobOptions,
});

// There must be a worker to poll the result for every transaction hash,
Expand All @@ -29,7 +21,11 @@ export class MineTransactionQueue {
static add = async (data: MineTransactionData) => {
const serialized = superjson.stringify(data);
const jobId = this.jobId(data);
await this.q.add(jobId, serialized, { jobId });
await this.q.add(jobId, serialized, {
jobId,
attempts: 200, // > 30 minutes with the backoffStrategy defined on the worker
backoff: { type: "custom" },
});
};

static length = async () => this.q.getWaitingCount();
Expand Down
28 changes: 16 additions & 12 deletions src/worker/tasks/mineTransactionWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,17 @@ const _mineTransaction = async (

// Resend the transaction (after some initial delay).
const config = await getConfig();
if (resendCount < config.maxRetriesPerTx) {
const blockNumber = await getBlockNumberish(chainId);
const ellapsedBlocks = blockNumber - sentAtBlock;
if (ellapsedBlocks >= config.minEllapsedBlocksBeforeRetry) {
const message = `Resending transaction after ${ellapsedBlocks} blocks. blockNumber=${blockNumber} sentAtBlock=${sentAtBlock}`;
job.log(message);
logger({ service: "worker", level: "info", queueId, message });
const blockNumber = await getBlockNumberish(chainId);
const ellapsedBlocks = blockNumber - sentAtBlock;
if (ellapsedBlocks >= config.minEllapsedBlocksBeforeRetry) {
const message = `Resending transaction after ${ellapsedBlocks} blocks. blockNumber=${blockNumber} sentAtBlock=${sentAtBlock}`;
job.log(message);
logger({ service: "worker", level: "info", queueId, message });

await SendTransactionQueue.add({
queueId,
resendCount: resendCount + 1,
});
}
await SendTransactionQueue.add({
queueId,
resendCount: resendCount + 1,
});
}

return null;
Expand Down Expand Up @@ -229,6 +227,12 @@ export const initMineTransactionWorker = () => {
const _worker = new Worker(MineTransactionQueue.q.name, handler, {
concurrency: env.CONFIRM_TRANSACTION_QUEUE_CONCURRENCY,
connection: redis,
settings: {
backoffStrategy: (attemptsMade: number) => {
// Retries after: 2s, 4s, 6s, 8s, 10s, 10s, 10s, 10s, ...
return Math.min(attemptsMade * 2_000, 10_000);
},
},
});

// If a transaction fails to mine after all retries, set it as errored and release the nonce.
Expand Down
11 changes: 10 additions & 1 deletion src/worker/tasks/sendTransactionWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
isReplacementGasFeeTooLow,
prettifyError,
} from "../../utils/error";
import { logger } from "../../utils/logger";
import { getChecksumAddress } from "../../utils/primitiveTypes";
import { redis } from "../../utils/redis/redis";
import { thirdwebClient } from "../../utils/sdk";
Expand Down Expand Up @@ -181,7 +182,15 @@ const _sendTransaction = async (

// Acquire an unused nonce for this transaction.
const { nonce, isRecycledNonce } = await acquireNonce(chainId, from);
job.log(`Acquired nonce ${nonce}. isRecycledNonce=${isRecycledNonce}`);
job.log(
`Acquired nonce ${nonce} for transaction ${queuedTransaction.queueId}. isRecycledNonce=${isRecycledNonce}`,
);
logger({
level: "info",
message: `Acquired nonce ${nonce} for transaction ${queuedTransaction.queueId}. isRecycledNonce=${isRecycledNonce}`,
service: "worker",
});

populatedTransaction.nonce = nonce;
job.log(`Sending transaction: ${stringify(populatedTransaction)}`);

Expand Down