From f5eb5415007bebd304334085c9558770019ea8a3 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 21 Aug 2025 12:58:10 +0100 Subject: [PATCH 1/3] Add attemptDequeue span to marqs More telemetry to figure out why dequeuing is slow --- apps/webapp/app/v3/marqs/index.server.ts | 106 ++++++++++++++--------- 1 file changed, 66 insertions(+), 40 deletions(-) diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 1636dba5f0..ac03ae9743 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -589,48 +589,74 @@ export class MarQS { for (const messageQueue of env.queues) { attemptedQueues++; - try { - const messageData = await this.#callDequeueMessage({ - messageQueue, - parentQueue, - }); - - if (!messageData) { - continue; // Try next queue if no message was dequeued + const result = await this.#trace( + "attemptDequeue", + async (innerSpan) => { + try { + innerSpan.setAttributes({ + [SemanticAttributes.QUEUE]: messageQueue, + [SemanticAttributes.PARENT_QUEUE]: parentQueue, + }); + + const messageData = await this.#callDequeueMessage({ + messageQueue, + parentQueue, + }); + + if (!messageData) { + return null; // Try next queue if no message was dequeued + } + + const message = await this.readMessage(messageData.messageId); + + if (message) { + const attributes = { + [SEMATTRS_MESSAGE_ID]: message.messageId, + [SemanticAttributes.QUEUE]: message.queue, + [SemanticAttributes.MESSAGE_ID]: message.messageId, + [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, + [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, + attempted_queues: attemptedQueues, // How many queues we tried before success + attempted_envs: attemptedEnvs, // How many environments we tried before success + message_timestamp: message.timestamp, + message_age: this.#calculateMessageAge(message), + message_priority: message.priority, + message_enqueue_method: message.enqueueMethod, + message_available_at: message.availableAt, + ...flattenAttributes(message.data, "message.data"), + }; + + span.setAttributes(attributes); + innerSpan.setAttributes(attributes); + + await this.options.subscriber?.messageDequeued(message); + + await this.options.visibilityTimeoutStrategy.startHeartbeat( + messageData.messageId, + this.visibilityTimeoutInMs + ); + + return message; + } + } catch (error) { + // Log error but continue trying other queues + logger.warn(`[${this.name}] Failed to dequeue from queue ${messageQueue}`, { + error, + }); + return null; + } + }, + { + kind: SpanKind.CONSUMER, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "dequeue", + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, } + ); - const message = await this.readMessage(messageData.messageId); - - if (message) { - span.setAttributes({ - [SEMATTRS_MESSAGE_ID]: message.messageId, - [SemanticAttributes.QUEUE]: message.queue, - [SemanticAttributes.MESSAGE_ID]: message.messageId, - [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, - [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, - attempted_queues: attemptedQueues, // How many queues we tried before success - attempted_envs: attemptedEnvs, // How many environments we tried before success - message_timestamp: message.timestamp, - message_age: this.#calculateMessageAge(message), - message_priority: message.priority, - message_enqueue_method: message.enqueueMethod, - message_available_at: message.availableAt, - ...flattenAttributes(message.data, "message.data"), - }); - - await this.options.subscriber?.messageDequeued(message); - - await this.options.visibilityTimeoutStrategy.startHeartbeat( - messageData.messageId, - this.visibilityTimeoutInMs - ); - - return message; - } - } catch (error) { - // Log error but continue trying other queues - logger.warn(`[${this.name}] Failed to dequeue from queue ${messageQueue}`, { error }); - continue; + if (result) { + return result; } } } From ea54b8a643067a339e128a80fb53a68dd0a18550 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 21 Aug 2025 13:01:23 +0100 Subject: [PATCH 2/3] More spans --- apps/webapp/app/v3/marqs/index.server.ts | 68 +++++++++++++++++++++--- 1 file changed, 60 insertions(+), 8 deletions(-) diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index ac03ae9743..5d318d1a4c 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -598,10 +598,27 @@ export class MarQS { [SemanticAttributes.PARENT_QUEUE]: parentQueue, }); - const messageData = await this.#callDequeueMessage({ - messageQueue, - parentQueue, - }); + const messageData = await this.#trace( + "callDequeueMessage", + async (dequeueSpan) => { + dequeueSpan.setAttributes({ + [SemanticAttributes.QUEUE]: messageQueue, + [SemanticAttributes.PARENT_QUEUE]: parentQueue, + }); + + return await this.#callDequeueMessage({ + messageQueue, + parentQueue, + }); + }, + { + kind: SpanKind.CONSUMER, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "dequeue", + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, + } + ); if (!messageData) { return null; // Try next queue if no message was dequeued @@ -629,11 +646,46 @@ export class MarQS { span.setAttributes(attributes); innerSpan.setAttributes(attributes); - await this.options.subscriber?.messageDequeued(message); + await this.#trace( + "messageDequeued", + async (subscriberSpan) => { + subscriberSpan.setAttributes({ + [SemanticAttributes.MESSAGE_ID]: message.messageId, + [SemanticAttributes.QUEUE]: message.queue, + [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, + }); + + return await this.options.subscriber?.messageDequeued(message); + }, + { + kind: SpanKind.INTERNAL, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "notify", + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, + } + ); - await this.options.visibilityTimeoutStrategy.startHeartbeat( - messageData.messageId, - this.visibilityTimeoutInMs + await this.#trace( + "startHeartbeat", + async (heartbeatSpan) => { + heartbeatSpan.setAttributes({ + [SemanticAttributes.MESSAGE_ID]: messageData.messageId, + visibility_timeout_ms: this.visibilityTimeoutInMs, + }); + + return await this.options.visibilityTimeoutStrategy.startHeartbeat( + messageData.messageId, + this.visibilityTimeoutInMs + ); + }, + { + kind: SpanKind.INTERNAL, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "heartbeat", + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, + } ); return message; From 3464147955686deb23b556fdce5d65f4a4fcfd7a Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 21 Aug 2025 13:07:03 +0100 Subject: [PATCH 3/3] use "receive" for all operation names --- apps/webapp/app/v3/marqs/index.server.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 5d318d1a4c..978991fabc 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -614,7 +614,7 @@ export class MarQS { { kind: SpanKind.CONSUMER, attributes: { - [SEMATTRS_MESSAGING_OPERATION]: "dequeue", + [SEMATTRS_MESSAGING_OPERATION]: "receive", [SEMATTRS_MESSAGING_SYSTEM]: "marqs", }, } @@ -660,7 +660,7 @@ export class MarQS { { kind: SpanKind.INTERNAL, attributes: { - [SEMATTRS_MESSAGING_OPERATION]: "notify", + [SEMATTRS_MESSAGING_OPERATION]: "receive", [SEMATTRS_MESSAGING_SYSTEM]: "marqs", }, } @@ -682,7 +682,7 @@ export class MarQS { { kind: SpanKind.INTERNAL, attributes: { - [SEMATTRS_MESSAGING_OPERATION]: "heartbeat", + [SEMATTRS_MESSAGING_OPERATION]: "receive", [SEMATTRS_MESSAGING_SYSTEM]: "marqs", }, } @@ -701,7 +701,7 @@ export class MarQS { { kind: SpanKind.CONSUMER, attributes: { - [SEMATTRS_MESSAGING_OPERATION]: "dequeue", + [SEMATTRS_MESSAGING_OPERATION]: "receive", [SEMATTRS_MESSAGING_SYSTEM]: "marqs", }, }