Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions assets/svelte/consumers/ShowMessages.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import { Label } from "$lib/components/ui/label";
import { toast } from "svelte-sonner";
import TableWithDrawer from "$lib/components/TableWithDrawer.svelte";
import { routedSinkDocs } from "./dynamicRoutingDocs";

// Receive necessary props
export let messages: any[];
Expand Down Expand Up @@ -62,6 +63,7 @@
let messageShapeOpen = false;
let logsOpen = true;
let transformedMessageOpen = false;
let routingOpen = false;

// Add computed property for message delivery state
$: isMessageDelivered =
Expand Down Expand Up @@ -341,6 +343,75 @@
});
}

function formatRoutingInfo(routingInfo) {
if (!routingInfo) return "N/A";

if (routingInfo.error) {
return "Error";
}

const consumerType = consumer.type;

// Handle sequin_stream as a special case since it's not in routedSinkDocs
if (consumerType === "sequin_stream") {
return "Sequin Stream";
}

// Use routedSinkDocs to get the primary field for display
const sinkDocs = routedSinkDocs[consumerType];
if (!sinkDocs) {
return "N/A";
}

// Special handling for sinks that benefit from showing multiple fields
if (consumerType === "http_push") {
const method = routingInfo.method || "POST";
const path = routingInfo.endpoint_path || "/";
return `${method} ${path}`;
}

if (consumerType === "rabbitmq") {
const exchange = routingInfo.exchange || "N/A";
const routingKey = routingInfo.routing_key || "N/A";
return `${exchange}/${routingKey}`;
}

// For other sinks, find the first field that has a value in routingInfo
const fieldName = Object.keys(sinkDocs.fields).find(
(field) =>
routingInfo[field] !== undefined && routingInfo[field] !== null,
);

if (fieldName) {
return routingInfo[fieldName];
}

// Fallback: try to find any field with a value
const anyField = Object.keys(routingInfo).find(
(field) =>
routingInfo[field] !== undefined && routingInfo[field] !== null,
);

return anyField ? routingInfo[anyField] : "N/A";
}

function renderRoutingField(
consumerType: string,
fieldName: string,
fieldValue: any,
): { label: string; value: string; description: string } {
const sinkDocs = routedSinkDocs[consumerType];
const fieldDoc = sinkDocs?.fields[fieldName];

const label =
fieldDoc?.label ||
fieldName.replace(/_/g, " ").replace(/\b\w/g, (l) => l.toUpperCase());

const description = fieldDoc?.description || "";
const value = fieldValue || "N/A";

return { label, value, description };

function handleDiscardMessages() {
isDiscardPopoverOpen = false;
isDiscarding = true;
Expand Down Expand Up @@ -523,6 +594,11 @@
>
Table
</th>
<th
class="px-2 py-1 text-left text-2xs font-medium text-gray-500 uppercase tracking-wider"
>
Routing
</th>
<th
class="px-2 py-1 text-left text-2xs font-medium text-gray-500 uppercase tracking-wider"
>
Expand Down Expand Up @@ -568,6 +644,9 @@
<td class="px-2 py-1 whitespace-nowrap text-2xs text-gray-500"
>{item.table_schema}.{item.table_name}</td
>
<td class="px-2 py-1 whitespace-nowrap text-2xs text-gray-500">
{formatRoutingInfo(item.routing_info)}
</td>
<td class="px-2 py-1 whitespace-nowrap text-2xs">{item.record_pks}</td>
<td
class="px-2 py-1 whitespace-nowrap text-2xs"
Expand Down Expand Up @@ -699,6 +778,97 @@
</div>
</div>

<!-- Routing Section -->
<div class="border rounded-lg">
<button
class="w-full px-4 py-2 flex flex-col items-start"
on:click={() => !isMessageDelivered && (routingOpen = !routingOpen)}
class:opacity-50={isMessageDelivered}
class:cursor-not-allowed={isMessageDelivered}
>
<div class="w-full flex justify-between items-center">
<span class="font-medium">Routing</span>
{#if !isMessageDelivered}
<div class="flex items-center gap-2">
<div class:rotate-180={routingOpen}>
<ChevronDown
class="h-4 w-4 transform transition-transform"
/>
</div>
</div>
{/if}
</div>
{#if isMessageDelivered}
<span class="text-sm text-gray-500 mt-1">
Routing information is not available for acknowledged messages
</span>
{/if}
</button>

{#if routingOpen && !isMessageDelivered}
<div class="px-4 pb-4">
{#if selectedMessage.routing_info}
{#if selectedMessage.routing_info.error}
<div class="text-sm text-red-600">
Error: {selectedMessage.routing_info.error}
</div>
{:else}
<div class="space-y-2">
{#if routedSinkDocs[consumer.type]}
{#each Object.entries(routedSinkDocs[consumer.type].fields) as [fieldName, fieldDoc]}
{#if selectedMessage.routing_info[fieldName] !== undefined && selectedMessage.routing_info[fieldName] !== null}
{@const field = renderRoutingField(
consumer.type,
fieldName,
selectedMessage.routing_info[fieldName],
)}
<div class="flex justify-between items-center">
<span
class="text-sm font-medium text-gray-500"
title={field.description}
>
{field.label}:
</span>
<span class="text-sm text-gray-900">
{#if fieldName === "headers" && typeof selectedMessage.routing_info[fieldName] === "object"}
{Object.keys(
selectedMessage.routing_info[fieldName],
).length} header(s)
{:else}
{field.value}
{/if}
</span>
</div>
{/if}
{/each}
{:else if consumer.type === "sequin_stream"}
<div class="flex justify-between items-center">
<span class="text-sm font-medium text-gray-500"
>Stream:</span
>
<span class="text-sm text-gray-900">Sequin Stream</span>
</div>
{:else}
<div class="text-sm text-gray-500">
<pre
class="bg-gray-100 p-2 rounded text-xs overflow-x-auto">{JSON.stringify(
selectedMessage.routing_info,
null,
2,
)}</pre>
</div>
{/if}
</div>
{/if}
{:else}
<div class="text-sm text-gray-500">
No routing information available
</div>
{/if}
</div>
{/if}
</div>

<!-- Message Shape Accordion -->
<div class="border rounded-lg">
<button
Expand Down
6 changes: 5 additions & 1 deletion assets/svelte/functions/Edit.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,11 @@ Please help me create or modify the Elixir function transform to achieve the des
<h2 class="text-lg font-semibold tracking-tight">
Function Configuration
</h2>
<a href={functionDocsUrl} target="_blank">
<a
href={functionDocsUrl ||
"https://sequinstream.com/docs/reference/transforms"}
target="_blank"
>
<Button variant="outline" size="sm">
<BookText class="h-3 w-3 mr-1" />
Docs
Expand Down
20 changes: 17 additions & 3 deletions lib/sequin_web/live/sink_consumers/show.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,7 @@ defmodule SequinWeb.SinkConsumersLive.Show do

defp encode_message(consumer, message) do
state = get_message_state(consumer, message)
routing_info = maybe_routing_info(consumer, message)

case message do
%ConsumerRecord{} = message ->
Expand All @@ -1247,7 +1248,8 @@ defmodule SequinWeb.SinkConsumersLive.Show do
table_schema: message.data.metadata.table_schema,
table_oid: message.table_oid,
trace_id: message.replication_message_trace_id,
functioned_message: maybe_function_message(consumer, message)
functioned_message: maybe_function_message(consumer, message),
routing_info: routing_info
}

%ConsumerEvent{} = message ->
Expand All @@ -1271,7 +1273,8 @@ defmodule SequinWeb.SinkConsumersLive.Show do
table_schema: message.data.metadata.table_schema,
table_oid: message.table_oid,
trace_id: message.replication_message_trace_id,
functioned_message: maybe_function_message(consumer, message)
functioned_message: maybe_function_message(consumer, message),
routing_info: routing_info
}

%AcknowledgedMessage{} = message ->
Expand All @@ -1295,7 +1298,8 @@ defmodule SequinWeb.SinkConsumersLive.Show do
table_name: message.table_name,
table_schema: message.table_schema,
table_oid: message.table_oid,
trace_id: message.trace_id
trace_id: message.trace_id,
routing_info: routing_info
}
end
end
Expand All @@ -1310,6 +1314,16 @@ defmodule SequinWeb.SinkConsumersLive.Show do
"Error functioning message: #{Exception.message(error)}"
end

defp maybe_routing_info(_consumer, %AcknowledgedMessage{}), do: nil

defp maybe_routing_info(consumer, message) do
routing_info = Sequin.Runtime.Routing.route_message(consumer, message)
Map.from_struct(routing_info)
rescue
error ->
%{error: "Error calculating routing: #{Exception.message(error)}"}
end

defp get_message_state(%{type: :sequin_stream}, %AcknowledgedMessage{}), do: "acknowledged"
defp get_message_state(_consumer, %AcknowledgedMessage{state: "discarded"}), do: "discarded"
defp get_message_state(_consumer, %AcknowledgedMessage{}), do: "delivered"
Expand Down
Loading