diff --git a/assets/svelte/consumers/ShowMessages.svelte b/assets/svelte/consumers/ShowMessages.svelte index 1107dd44b..87ab76700 100644 --- a/assets/svelte/consumers/ShowMessages.svelte +++ b/assets/svelte/consumers/ShowMessages.svelte @@ -17,6 +17,7 @@ import { Label } from "$lib/components/ui/label"; import { toast } from "svelte-sonner"; import TableWithDrawer from "$lib/components/TableWithDrawer.svelte"; + import { routedSinkDocs } from "./dynamicRoutingDocs"; // Receive necessary props export let messages: any[]; @@ -62,6 +63,7 @@ let messageShapeOpen = false; let logsOpen = true; let transformedMessageOpen = false; + let routingOpen = false; // Add computed property for message delivery state $: isMessageDelivered = @@ -341,6 +343,75 @@ }); } + function formatRoutingInfo(routingInfo) { + if (!routingInfo) return "N/A"; + + if (routingInfo.error) { + return "Error"; + } + + const consumerType = consumer.type; + + // Handle sequin_stream as a special case since it's not in routedSinkDocs + if (consumerType === "sequin_stream") { + return "Sequin Stream"; + } + + // Use routedSinkDocs to get the primary field for display + const sinkDocs = routedSinkDocs[consumerType]; + if (!sinkDocs) { + return "N/A"; + } + + // Special handling for sinks that benefit from showing multiple fields + if (consumerType === "http_push") { + const method = routingInfo.method || "POST"; + const path = routingInfo.endpoint_path || "/"; + return `${method} ${path}`; + } + + if (consumerType === "rabbitmq") { + const exchange = routingInfo.exchange || "N/A"; + const routingKey = routingInfo.routing_key || "N/A"; + return `${exchange}/${routingKey}`; + } + + // For other sinks, find the first field that has a value in routingInfo + const fieldName = Object.keys(sinkDocs.fields).find( + (field) => + routingInfo[field] !== undefined && routingInfo[field] !== null, + ); + + if (fieldName) { + return routingInfo[fieldName]; + } + + // Fallback: try to find any field with a value + const anyField = Object.keys(routingInfo).find( + (field) => + routingInfo[field] !== undefined && routingInfo[field] !== null, + ); + + return anyField ? routingInfo[anyField] : "N/A"; + } + + function renderRoutingField( + consumerType: string, + fieldName: string, + fieldValue: any, + ): { label: string; value: string; description: string } { + const sinkDocs = routedSinkDocs[consumerType]; + const fieldDoc = sinkDocs?.fields[fieldName]; + + const label = + fieldDoc?.label || + fieldName.replace(/_/g, " ").replace(/\b\w/g, (l) => l.toUpperCase()); + + const description = fieldDoc?.description || ""; + const value = fieldValue || "N/A"; + + return { label, value, description }; + function handleDiscardMessages() { isDiscardPopoverOpen = false; isDiscarding = true; @@ -523,6 +594,11 @@ > Table + + Routing + @@ -568,6 +644,9 @@ {item.table_schema}.{item.table_name} + + {formatRoutingInfo(item.routing_info)} + {item.record_pks} + +
+ + + {#if routingOpen && !isMessageDelivered} +
+ {#if selectedMessage.routing_info} + {#if selectedMessage.routing_info.error} +
+ Error: {selectedMessage.routing_info.error} +
+ {:else} +
+ {#if routedSinkDocs[consumer.type]} + {#each Object.entries(routedSinkDocs[consumer.type].fields) as [fieldName, fieldDoc]} + {#if selectedMessage.routing_info[fieldName] !== undefined && selectedMessage.routing_info[fieldName] !== null} + {@const field = renderRoutingField( + consumer.type, + fieldName, + selectedMessage.routing_info[fieldName], + )} +
+ + {field.label}: + + + {#if fieldName === "headers" && typeof selectedMessage.routing_info[fieldName] === "object"} + {Object.keys( + selectedMessage.routing_info[fieldName], + ).length} header(s) + {:else} + {field.value} + {/if} + +
+ {/if} + {/each} + {:else if consumer.type === "sequin_stream"} +
+ Stream: + Sequin Stream +
+ {:else} +
+
{JSON.stringify(
+                            selectedMessage.routing_info,
+                            null,
+                            2,
+                          )}
+
+ {/if} +
+ {/if} + {:else} +
+ No routing information available +
+ {/if} +
+ {/if} +
+