diff --git a/src/db/transactions/cleanTxs.ts b/src/db/transactions/cleanTxs.ts index 5875c2d1e..12f4ccba9 100644 --- a/src/db/transactions/cleanTxs.ts +++ b/src/db/transactions/cleanTxs.ts @@ -3,6 +3,7 @@ import { Static } from "@sinclair/typebox"; import { transactionResponseSchema } from "../../server/schemas/transaction"; // TODO: This shouldn't need to exist with zod +// @deprecated - use toTransactionSchema export const cleanTxs = ( txs: Transactions[], ): Static[] => { diff --git a/src/db/transactions/getTxByIds.ts b/src/db/transactions/getTxByIds.ts index 5ef87a9e2..cff7f98c4 100644 --- a/src/db/transactions/getTxByIds.ts +++ b/src/db/transactions/getTxByIds.ts @@ -1,30 +1,12 @@ -import { Static } from "@sinclair/typebox"; -import { PrismaTransaction } from "../../schema/prisma"; -import { transactionResponseSchema } from "../../server/schemas/transaction"; +import { Transactions } from "@prisma/client"; import { prisma } from "../client"; -import { cleanTxs } from "./cleanTxs"; -interface GetTxByIdsParams { - queueIds: string[]; - pgtx?: PrismaTransaction; -} -export const getTxByIds = async ({ - queueIds, -}: GetTxByIdsParams): Promise< - Static[] | null -> => { - const tx = await prisma.transactions.findMany({ +export const getTransactionsByQueueIds = async ( + queueIds: string[], +): Promise => { + return await prisma.transactions.findMany({ where: { - id: { - in: queueIds, - }, + id: { in: queueIds }, }, }); - - if (!tx || tx.length === 0) { - return null; - } - - const cleanedTx = cleanTxs(tx); - return cleanedTx; }; diff --git a/src/db/webhooks/getAllWebhooks.ts b/src/db/webhooks/getAllWebhooks.ts index d122fbc42..85e93eefc 100644 --- a/src/db/webhooks/getAllWebhooks.ts +++ b/src/db/webhooks/getAllWebhooks.ts @@ -3,6 +3,9 @@ import { prisma } from "../client"; export const getAllWebhooks = async (): Promise => { return await prisma.webhooks.findMany({ + where: { + revokedAt: null, + }, orderBy: { id: "asc", }, diff --git a/src/server/schemas/transaction/index.ts b/src/server/schemas/transaction/index.ts index 11fc00f0f..26fe4e88f 100644 --- a/src/server/schemas/transaction/index.ts +++ b/src/server/schemas/transaction/index.ts @@ -1,5 +1,7 @@ -import { Type } from "@sinclair/typebox"; +import { Transactions } from "@prisma/client"; +import { Static, Type } from "@sinclair/typebox"; +// @TODO: rename to TransactionSchema export const transactionResponseSchema = Type.Object({ queueId: Type.Union([ Type.String({ @@ -198,3 +200,23 @@ export enum TransactionStatus { // Tx was cancelled and will not be re-attempted. Cancelled = "cancelled", } + +export const toTransactionSchema = ( + transaction: Transactions, +): Static => ({ + ...transaction, + queueId: transaction.id, + queuedAt: transaction.queuedAt.toISOString(), + sentAt: transaction.sentAt?.toISOString() || null, + minedAt: transaction.minedAt?.toISOString() || null, + cancelledAt: transaction.cancelledAt?.toISOString() || null, + status: transaction.errorMessage + ? TransactionStatus.Errored + : transaction.minedAt + ? TransactionStatus.Mined + : transaction.cancelledAt + ? TransactionStatus.Cancelled + : transaction.sentAt + ? TransactionStatus.Sent + : TransactionStatus.Queued, +}); diff --git a/src/utils/cache/getWebhook.ts b/src/utils/cache/getWebhook.ts index 3dea8a19b..423fcfddf 100644 --- a/src/utils/cache/getWebhook.ts +++ b/src/utils/cache/getWebhook.ts @@ -14,14 +14,10 @@ export const getWebhooksByEventType = async ( return webhookCache.get(cacheKey) as Webhooks[]; } - const webhookConfig = await getAllWebhooks(); + const filteredWebhooks = (await getAllWebhooks()).filter( + (webhook) => webhook.eventType === eventType, + ); - const eventTypeWebhookDetails = webhookConfig.filter((webhook) => { - if (!webhook.revokedAt && webhook.eventType === eventType) { - return webhook; - } - }); - - webhookCache.set(cacheKey, eventTypeWebhookDetails); - return eventTypeWebhookDetails; + webhookCache.set(cacheKey, filteredWebhooks); + return filteredWebhooks; }; diff --git a/src/utils/webhook.ts b/src/utils/webhook.ts index 1906e606c..8ba8571ff 100644 --- a/src/utils/webhook.ts +++ b/src/utils/webhook.ts @@ -1,11 +1,15 @@ import { Webhooks } from "@prisma/client"; import crypto from "crypto"; -import { getTxByIds } from "../db/transactions/getTxByIds"; +import { getTransactionsByQueueIds } from "../db/transactions/getTxByIds"; import { WalletBalanceWebhookSchema, WebhooksEventTypes, } from "../schema/webhooks"; -import { TransactionStatus } from "../server/schemas/transaction"; +import { + TransactionStatus, + toTransactionSchema, +} from "../server/schemas/transaction"; +import { enqueueWebhook } from "../worker/queues/sendWebhookQueue"; import { getWebhooksByEventType } from "./cache/getWebhook"; import { logger } from "./logger"; @@ -85,59 +89,28 @@ export interface WebhookData { status: TransactionStatus; } -export const sendWebhooks = async (webhooks: WebhookData[]) => { - const queueIds = webhooks.map((webhook) => webhook.queueId); - const txs = await getTxByIds({ queueIds }); - if (!txs || txs.length === 0) { - return; - } +export const sendWebhooks = async (data: WebhookData[]) => { + const queueIds = data.map((d) => d.queueId); + const transactions = await getTransactionsByQueueIds(queueIds); - const webhooksWithTxs = webhooks - .map((webhook) => { - const tx = txs.find((tx) => tx.queueId === webhook.queueId); - return { - ...webhook, - tx, - }; - }) - .filter((webhook) => !!webhook.tx); - - for (const webhook of webhooksWithTxs) { - const webhookStatus = - webhook.status === TransactionStatus.Queued + for (const transaction of transactions) { + const transactionResponse = toTransactionSchema(transaction); + const type = + transactionResponse.status === TransactionStatus.Queued ? WebhooksEventTypes.QUEUED_TX - : webhook.status === TransactionStatus.Sent + : transactionResponse.status === TransactionStatus.Sent ? WebhooksEventTypes.SENT_TX - : webhook.status === TransactionStatus.Mined + : transactionResponse.status === TransactionStatus.Mined ? WebhooksEventTypes.MINED_TX - : webhook.status === TransactionStatus.Errored + : transactionResponse.status === TransactionStatus.Errored ? WebhooksEventTypes.ERRORED_TX - : webhook.status === TransactionStatus.Cancelled + : transactionResponse.status === TransactionStatus.Cancelled ? WebhooksEventTypes.CANCELLED_TX : undefined; - const webhookConfigs = await Promise.all([ - ...((await getWebhooksByEventType(WebhooksEventTypes.ALL_TX)) || []), - ...(webhookStatus ? await getWebhooksByEventType(webhookStatus) : []), - ]); - - await Promise.all( - webhookConfigs.map(async (webhookConfig) => { - if (webhookConfig.revokedAt) { - logger({ - service: "server", - level: "debug", - message: "No webhook set or active, skipping webhook send", - }); - return; - } - - await sendWebhookRequest( - webhookConfig, - webhook.tx as Record, - ); - }), - ); + if (type) { + await enqueueWebhook({ type, transaction }); + } } }; diff --git a/src/worker/queues/sendWebhookQueue.ts b/src/worker/queues/sendWebhookQueue.ts index 94162a871..b5457efe3 100644 --- a/src/worker/queues/sendWebhookQueue.ts +++ b/src/worker/queues/sendWebhookQueue.ts @@ -1,11 +1,13 @@ import { ContractEventLogs, ContractTransactionReceipts, + Transactions, Webhooks, } from "@prisma/client"; import { Queue } from "bullmq"; import SuperJSON from "superjson"; import { WebhooksEventTypes } from "../../schema/webhooks"; +import { getWebhooksByEventType } from "../../utils/cache/getWebhook"; import { logger } from "../../utils/logger"; import { redis } from "../../utils/redis/redis"; import { defaultJobOptions } from "./queues"; @@ -26,8 +28,22 @@ export type EnqueueContractSubscriptionWebhookData = { eventLog?: ContractEventLogs; transactionReceipt?: ContractTransactionReceipts; }; + +export type EnqueueTransactionWebhookData = { + type: + | WebhooksEventTypes.ALL_TX + | WebhooksEventTypes.QUEUED_TX + | WebhooksEventTypes.SENT_TX + | WebhooksEventTypes.MINED_TX + | WebhooksEventTypes.ERRORED_TX + | WebhooksEventTypes.CANCELLED_TX; + transaction: Transactions; +}; + // TODO: Add other webhook event types here. -type EnqueueWebhookData = EnqueueContractSubscriptionWebhookData; +type EnqueueWebhookData = + | EnqueueContractSubscriptionWebhookData + | EnqueueTransactionWebhookData; export interface WebhookJob { data: EnqueueWebhookData; @@ -38,15 +54,26 @@ export const enqueueWebhook = async (data: EnqueueWebhookData) => { switch (data.type) { case WebhooksEventTypes.CONTRACT_SUBSCRIPTION: return enqueueContractSubscriptionWebhook(data); + case WebhooksEventTypes.ALL_TX: + case WebhooksEventTypes.QUEUED_TX: + case WebhooksEventTypes.SENT_TX: + case WebhooksEventTypes.MINED_TX: + case WebhooksEventTypes.ERRORED_TX: + case WebhooksEventTypes.CANCELLED_TX: + return enqueueTransactionWebhook(data); default: logger({ service: "worker", level: "warn", - message: `Unexpected webhook type: ${data.type}`, + message: `Unexpected webhook type: ${(data as any).type}`, }); } }; +/** + * Contract Subscriptions webhooks + */ + const enqueueContractSubscriptionWebhook = async ( data: EnqueueContractSubscriptionWebhookData, ) => { @@ -88,3 +115,36 @@ const getContractSubscriptionWebhookIdempotencyKey = (args: { } throw 'Must provide "eventLog" or "transactionReceipt".'; }; + +/** + * Transaction webhooks + */ + +const enqueueTransactionWebhook = async ( + data: EnqueueTransactionWebhookData, +) => { + if (!_queue) return; + + const webhooks = [ + ...(await getWebhooksByEventType(WebhooksEventTypes.ALL_TX)), + ...(await getWebhooksByEventType(data.type)), + ]; + + for (const webhook of webhooks) { + const job: WebhookJob = { data, webhook }; + const serialized = SuperJSON.stringify(job); + await _queue.add(`${data.type}:${webhook.id}`, serialized, { + jobId: getTransactionWebhookIdempotencyKey({ + webhook, + eventType: data.type, + queueId: data.transaction.id, + }), + }); + } +}; + +const getTransactionWebhookIdempotencyKey = (args: { + webhook: Webhooks; + eventType: WebhooksEventTypes; + queueId: string; +}) => `${args.webhook.url}:${args.eventType}:${args.queueId}`; diff --git a/src/worker/tasks/sendWebhookWorker.ts b/src/worker/tasks/sendWebhookWorker.ts index 992d93463..2eeb37006 100644 --- a/src/worker/tasks/sendWebhookWorker.ts +++ b/src/worker/tasks/sendWebhookWorker.ts @@ -1,7 +1,12 @@ +import { Static } from "@sinclair/typebox"; import { Job, Processor, Worker } from "bullmq"; import superjson from "superjson"; import { WebhooksEventTypes } from "../../schema/webhooks"; import { toEventLogSchema } from "../../server/schemas/eventLog"; +import { + toTransactionSchema, + transactionResponseSchema, +} from "../../server/schemas/transaction"; import { toTransactionReceiptSchema } from "../../server/schemas/transactionReceipt"; import { redis } from "../../utils/redis/redis"; import { WebhookResponse, sendWebhookRequest } from "../../utils/webhook"; @@ -11,33 +16,46 @@ import { WebhookJob, } from "../queues/sendWebhookQueue"; -interface WebhookBody { - type: "event-log" | "transaction-receipt"; - data: any; -} - const handler: Processor = async (job: Job) => { const { data, webhook } = superjson.parse(job.data); let resp: WebhookResponse | undefined; - if (data.type === WebhooksEventTypes.CONTRACT_SUBSCRIPTION) { - let webhookBody: WebhookBody; - if (data.eventLog) { - webhookBody = { - type: "event-log", - data: toEventLogSchema(data.eventLog), + switch (data.type) { + case WebhooksEventTypes.CONTRACT_SUBSCRIPTION: { + let webhookBody: { + type: "event-log" | "transaction-receipt"; + data: any; }; - } else if (data.transactionReceipt) { - webhookBody = { - type: "transaction-receipt", - data: toTransactionReceiptSchema(data.transactionReceipt), - }; - } else { - throw new Error( - 'Missing "eventLog" or "transactionReceipt" for CONTRACT_SUBSCRIPTION webhook.', - ); + if (data.eventLog) { + webhookBody = { + type: "event-log", + data: toEventLogSchema(data.eventLog), + }; + } else if (data.transactionReceipt) { + webhookBody = { + type: "transaction-receipt", + data: toTransactionReceiptSchema(data.transactionReceipt), + }; + } else { + throw new Error( + 'Missing "eventLog" or "transactionReceipt" for CONTRACT_SUBSCRIPTION webhook.', + ); + } + resp = await sendWebhookRequest(webhook, webhookBody); + break; + } + + case WebhooksEventTypes.ALL_TX: + case WebhooksEventTypes.QUEUED_TX: + case WebhooksEventTypes.SENT_TX: + case WebhooksEventTypes.MINED_TX: + case WebhooksEventTypes.ERRORED_TX: + case WebhooksEventTypes.CANCELLED_TX: { + const webhookBody: Static = + toTransactionSchema(data.transaction); + resp = await sendWebhookRequest(webhook, webhookBody); + break; } - resp = await sendWebhookRequest(webhook, webhookBody); } if (resp && !resp.ok) { @@ -52,7 +70,7 @@ const handler: Processor = async (job: Job) => { let _worker: Worker | null = null; if (redis) { _worker = new Worker(SEND_WEBHOOK_QUEUE_NAME, handler, { - concurrency: 1, + concurrency: 10, connection: redis, }); logWorkerEvents(_worker);