From b9349c0dfc707f063bd523c42cbdee1764539c05 Mon Sep 17 00:00:00 2001 From: thisisgoldman Date: Wed, 16 Jul 2025 19:02:27 -0700 Subject: [PATCH 1/5] Add routing to messages tab --- assets/svelte/consumers/ShowMessages.svelte | 392 ++++++++++++++++++++ assets/svelte/functions/Edit.svelte | 3 +- lib/sequin_web/live/sink_consumers/show.ex | 128 ++++++- 3 files changed, 519 insertions(+), 4 deletions(-) diff --git a/assets/svelte/consumers/ShowMessages.svelte b/assets/svelte/consumers/ShowMessages.svelte index 761af256d..f573e29ea 100644 --- a/assets/svelte/consumers/ShowMessages.svelte +++ b/assets/svelte/consumers/ShowMessages.svelte @@ -54,6 +54,7 @@ let messageShapeOpen = false; let logsOpen = true; let transformedMessageOpen = false; + let routingOpen = false; // Add computed property for message delivery state $: isMessageDelivered = @@ -332,6 +333,60 @@ isResettingAll = false; }); } + + function formatRoutingInfo(routingInfo) { + if (!routingInfo) return "N/A"; + + if (routingInfo.error) { + return "Error"; + } + + // Format based on sink type + const consumerType = consumer.type; + + switch (consumerType) { + case "http_push": + const method = routingInfo.method || "POST"; + const path = routingInfo.endpoint_path || "/"; + return `${method} ${path}`; + case "kafka": + return routingInfo.topic || "N/A"; + case "redis_string": + return routingInfo.key || "N/A"; + case "redis_stream": + return routingInfo.stream_key || "N/A"; + case "nats": + return routingInfo.subject || "N/A"; + case "gcp_pubsub": + return routingInfo.topic_id || "N/A"; + case "azure_event_hub": + return routingInfo.partition_key || "N/A"; + case "sns": + return routingInfo.topic_arn || "N/A"; + case "kinesis": + return routingInfo.stream_arn || "N/A"; + case "typesense": + return routingInfo.collection_name || "N/A"; + case "meilisearch": + return routingInfo.index_name || "N/A"; + case "elasticsearch": + return routingInfo.index_name || "N/A"; + case "s2": + const basin = routingInfo.basin || "N/A"; + const stream = routingInfo.stream || "N/A"; + return `${basin}/${stream}`; + case "rabbitmq": + const exchange = routingInfo.exchange || "N/A"; + const routingKey = routingInfo.routing_key || "N/A"; + return `${exchange}/${routingKey}`; + case "sqs": + return routingInfo.queue_url || "N/A"; + case "sequin_stream": + return "Sequin Stream"; + default: + return "N/A"; + } + } PKs + + Routing + @@ -482,6 +542,9 @@ >{item.table_schema}.{item.table_name} {item.record_pks} + + {formatRoutingInfo(item.routing_info)} + + +
+ + + +
+

About Routing

+

+ This shows the outcome of the routing function applied + to this message. The routing determines where this + message will be sent based on the sink type and + configuration. +

+
+
+ +
+ +
+
+ {/if} + + {#if isMessageDelivered} + + Routing information is not available for acknowledged messages + + {/if} + + + {#if routingOpen && !isMessageDelivered} +
+ {#if selectedMessage.routing_info} + {#if selectedMessage.routing_info.error} +
+ Error: {selectedMessage.routing_info.error} +
+ {:else} +
+ {#if consumer.type === "http_push"} +
+ Method: + {selectedMessage.routing_info.method || "POST"} +
+
+ Endpoint: + {selectedMessage.routing_info.endpoint_path || + "/"} +
+ {#if selectedMessage.routing_info.headers && Object.keys(selectedMessage.routing_info.headers).length > 0} +
+ Headers: + {Object.keys(selectedMessage.routing_info.headers) + .length} header(s) +
+ {/if} + {:else if consumer.type === "kafka"} +
+ Topic: + {selectedMessage.routing_info.topic || "N/A"} +
+
+ Message Key: + {selectedMessage.routing_info.message_key || + "N/A"} +
+ {:else if consumer.type === "redis_string"} +
+ Key: + {selectedMessage.routing_info.key || "N/A"} +
+
+ Action: + {selectedMessage.routing_info.action || "N/A"} +
+ {#if selectedMessage.routing_info.expire_ms} +
+ Expire (ms): + {selectedMessage.routing_info.expire_ms} +
+ {/if} + {:else if consumer.type === "redis_stream"} +
+ Stream Key: + {selectedMessage.routing_info.stream_key || + "N/A"} +
+ {:else if consumer.type === "nats"} +
+ Subject: + {selectedMessage.routing_info.subject || "N/A"} +
+ {#if selectedMessage.routing_info.headers && Object.keys(selectedMessage.routing_info.headers).length > 0} +
+ Headers: + {Object.keys(selectedMessage.routing_info.headers) + .length} header(s) +
+ {/if} + {:else if consumer.type === "gcp_pubsub"} +
+ Topic ID: + {selectedMessage.routing_info.topic_id || + "N/A"} +
+ {:else if consumer.type === "azure_event_hub"} +
+ Partition Key: + {selectedMessage.routing_info.partition_key || + "N/A"} +
+ {:else if consumer.type === "sns"} +
+ Topic ARN: + {selectedMessage.routing_info.topic_arn || + "N/A"} +
+ {:else if consumer.type === "kinesis"} +
+ Stream ARN: + {selectedMessage.routing_info.stream_arn || + "N/A"} +
+ {:else if consumer.type === "typesense"} +
+ Collection: + {selectedMessage.routing_info.collection_name || + "N/A"} +
+
+ Action: + {selectedMessage.routing_info.action || "N/A"} +
+ {:else if consumer.type === "meilisearch"} +
+ Index: + {selectedMessage.routing_info.index_name || + "N/A"} +
+
+ Action: + {selectedMessage.routing_info.action || "N/A"} +
+ {:else if consumer.type === "elasticsearch"} +
+ Index: + {selectedMessage.routing_info.index_name || + "N/A"} +
+ {:else if consumer.type === "s2"} +
+ Basin: + {selectedMessage.routing_info.basin || "N/A"} +
+
+ Stream: + {selectedMessage.routing_info.stream || "N/A"} +
+ {:else if consumer.type === "rabbitmq"} +
+ Exchange: + {selectedMessage.routing_info.exchange || + "N/A"} +
+
+ Routing Key: + {selectedMessage.routing_info.routing_key || + "N/A"} +
+ {#if selectedMessage.routing_info.headers && Object.keys(selectedMessage.routing_info.headers).length > 0} +
+ Headers: + {Object.keys(selectedMessage.routing_info.headers) + .length} header(s) +
+ {/if} + {:else if consumer.type === "sqs"} +
+ Queue URL: + {selectedMessage.routing_info.queue_url || + "N/A"} +
+ {:else if consumer.type === "sequin_stream"} +
+ Stream: + Sequin Stream +
+ {:else} +
+
{JSON.stringify(
+                            selectedMessage.routing_info,
+                            null,
+                            2,
+                          )}
+
+ {/if} +
+ {/if} + {:else} +
+ No routing information available +
+ {/if} +
+ {/if} + +
- - -
-

About Routing

-

- This shows the outcome of the routing function applied - to this message. The routing determines where this - message will be sent based on the sink type and - configuration. -

-
-
-
{:else}
- {#if consumer.type === "http_push"} -
- Method: - {selectedMessage.routing_info.method || "POST"} -
-
- Endpoint: - {selectedMessage.routing_info.endpoint_path || - "/"} -
- {#if selectedMessage.routing_info.headers && Object.keys(selectedMessage.routing_info.headers).length > 0} -
- Headers: - {Object.keys(selectedMessage.routing_info.headers) - .length} header(s) -
- {/if} - {:else if consumer.type === "kafka"} -
- Topic: - {selectedMessage.routing_info.topic || "N/A"} -
-
- Message Key: - {selectedMessage.routing_info.message_key || - "N/A"} -
- {:else if consumer.type === "redis_string"} -
- Key: - {selectedMessage.routing_info.key || "N/A"} -
-
- Action: - {selectedMessage.routing_info.action || "N/A"} -
- {#if selectedMessage.routing_info.expire_ms} -
- Expire (ms): - {selectedMessage.routing_info.expire_ms} -
- {/if} - {:else if consumer.type === "redis_stream"} -
- Stream Key: - {selectedMessage.routing_info.stream_key || - "N/A"} -
- {:else if consumer.type === "nats"} -
- Subject: - {selectedMessage.routing_info.subject || "N/A"} -
- {#if selectedMessage.routing_info.headers && Object.keys(selectedMessage.routing_info.headers).length > 0} -
- Headers: - {Object.keys(selectedMessage.routing_info.headers) - .length} header(s) -
- {/if} - {:else if consumer.type === "gcp_pubsub"} -
- Topic ID: - {selectedMessage.routing_info.topic_id || - "N/A"} -
- {:else if consumer.type === "azure_event_hub"} -
- Partition Key: - {selectedMessage.routing_info.partition_key || - "N/A"} -
- {:else if consumer.type === "sns"} -
- Topic ARN: - {selectedMessage.routing_info.topic_arn || - "N/A"} -
- {:else if consumer.type === "kinesis"} -
- Stream ARN: - {selectedMessage.routing_info.stream_arn || - "N/A"} -
- {:else if consumer.type === "typesense"} -
- Collection: - {selectedMessage.routing_info.collection_name || - "N/A"} -
-
- Action: - {selectedMessage.routing_info.action || "N/A"} -
- {:else if consumer.type === "meilisearch"} -
- Index: - {selectedMessage.routing_info.index_name || - "N/A"} -
-
- Action: - {selectedMessage.routing_info.action || "N/A"} -
- {:else if consumer.type === "elasticsearch"} -
- Index: - {selectedMessage.routing_info.index_name || - "N/A"} -
- {:else if consumer.type === "s2"} -
- Basin: - {selectedMessage.routing_info.basin || "N/A"} -
-
- Stream: - {selectedMessage.routing_info.stream || "N/A"} -
- {:else if consumer.type === "rabbitmq"} -
- Exchange: - {selectedMessage.routing_info.exchange || - "N/A"} -
-
- Routing Key: - {selectedMessage.routing_info.routing_key || - "N/A"} -
- {#if selectedMessage.routing_info.headers && Object.keys(selectedMessage.routing_info.headers).length > 0} -
- Headers: - {Object.keys(selectedMessage.routing_info.headers) - .length} header(s) -
- {/if} - {:else if consumer.type === "sqs"} -
- Queue URL: - {selectedMessage.routing_info.queue_url || - "N/A"} -
+ {#if routedSinkDocs[consumer.type]} + {#each Object.entries(routedSinkDocs[consumer.type].fields) as [fieldName, fieldDoc]} + {#if selectedMessage.routing_info[fieldName] !== undefined && selectedMessage.routing_info[fieldName] !== null} + {@const field = renderRoutingField( + consumer.type, + fieldName, + selectedMessage.routing_info[fieldName], + )} +
+ + {field.label}: + + + {#if fieldName === "headers" && typeof selectedMessage.routing_info[fieldName] === "object"} + {Object.keys( + selectedMessage.routing_info[fieldName], + ).length} header(s) + {:else} + {field.value} + {/if} + +
+ {/if} + {/each} {:else if consumer.type === "sequin_stream"}