diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 1636dba5f0..978991fabc 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -589,48 +589,126 @@ export class MarQS { for (const messageQueue of env.queues) { attemptedQueues++; - try { - const messageData = await this.#callDequeueMessage({ - messageQueue, - parentQueue, - }); - - if (!messageData) { - continue; // Try next queue if no message was dequeued + const result = await this.#trace( + "attemptDequeue", + async (innerSpan) => { + try { + innerSpan.setAttributes({ + [SemanticAttributes.QUEUE]: messageQueue, + [SemanticAttributes.PARENT_QUEUE]: parentQueue, + }); + + const messageData = await this.#trace( + "callDequeueMessage", + async (dequeueSpan) => { + dequeueSpan.setAttributes({ + [SemanticAttributes.QUEUE]: messageQueue, + [SemanticAttributes.PARENT_QUEUE]: parentQueue, + }); + + return await this.#callDequeueMessage({ + messageQueue, + parentQueue, + }); + }, + { + kind: SpanKind.CONSUMER, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "receive", + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, + } + ); + + if (!messageData) { + return null; // Try next queue if no message was dequeued + } + + const message = await this.readMessage(messageData.messageId); + + if (message) { + const attributes = { + [SEMATTRS_MESSAGE_ID]: message.messageId, + [SemanticAttributes.QUEUE]: message.queue, + [SemanticAttributes.MESSAGE_ID]: message.messageId, + [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, + [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, + attempted_queues: attemptedQueues, // How many queues we tried before success + attempted_envs: attemptedEnvs, // How many environments we tried before success + message_timestamp: message.timestamp, + message_age: this.#calculateMessageAge(message), + message_priority: message.priority, + message_enqueue_method: message.enqueueMethod, + message_available_at: message.availableAt, + ...flattenAttributes(message.data, "message.data"), + }; + + span.setAttributes(attributes); + innerSpan.setAttributes(attributes); + + await this.#trace( + "messageDequeued", + async (subscriberSpan) => { + subscriberSpan.setAttributes({ + [SemanticAttributes.MESSAGE_ID]: message.messageId, + [SemanticAttributes.QUEUE]: message.queue, + [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, + }); + + return await this.options.subscriber?.messageDequeued(message); + }, + { + kind: SpanKind.INTERNAL, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "receive", + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, + } + ); + + await this.#trace( + "startHeartbeat", + async (heartbeatSpan) => { + heartbeatSpan.setAttributes({ + [SemanticAttributes.MESSAGE_ID]: messageData.messageId, + visibility_timeout_ms: this.visibilityTimeoutInMs, + }); + + return await this.options.visibilityTimeoutStrategy.startHeartbeat( + messageData.messageId, + this.visibilityTimeoutInMs + ); + }, + { + kind: SpanKind.INTERNAL, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "receive", + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, + } + ); + + return message; + } + } catch (error) { + // Log error but continue trying other queues + logger.warn(`[${this.name}] Failed to dequeue from queue ${messageQueue}`, { + error, + }); + return null; + } + }, + { + kind: SpanKind.CONSUMER, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "receive", + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, } + ); - const message = await this.readMessage(messageData.messageId); - - if (message) { - span.setAttributes({ - [SEMATTRS_MESSAGE_ID]: message.messageId, - [SemanticAttributes.QUEUE]: message.queue, - [SemanticAttributes.MESSAGE_ID]: message.messageId, - [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, - [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, - attempted_queues: attemptedQueues, // How many queues we tried before success - attempted_envs: attemptedEnvs, // How many environments we tried before success - message_timestamp: message.timestamp, - message_age: this.#calculateMessageAge(message), - message_priority: message.priority, - message_enqueue_method: message.enqueueMethod, - message_available_at: message.availableAt, - ...flattenAttributes(message.data, "message.data"), - }); - - await this.options.subscriber?.messageDequeued(message); - - await this.options.visibilityTimeoutStrategy.startHeartbeat( - messageData.messageId, - this.visibilityTimeoutInMs - ); - - return message; - } - } catch (error) { - // Log error but continue trying other queues - logger.warn(`[${this.name}] Failed to dequeue from queue ${messageQueue}`, { error }); - continue; + if (result) { + return result; } } }