Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions assets/svelte/consumers/ShowMessages.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
Check,
ChevronDown,
Info,
Trash2,
} from "lucide-svelte";
import * as Popover from "$lib/components/ui/popover";
import * as RadioGroup from "$lib/components/ui/radio-group";
import { Label } from "$lib/components/ui/label";
import { toast } from "svelte-sonner";
import TableWithDrawer from "$lib/components/TableWithDrawer.svelte";

Expand Down Expand Up @@ -50,6 +53,11 @@
let isResettingAll = false;
let isPopoverOpen = false;

// State for discard dialog
let isDiscardPopoverOpen = false;
let isDiscarding = false;
let discardMode: "failing" | "all" = "failing";

// Add these new state variables
let messageShapeOpen = false;
let logsOpen = true;
Expand Down Expand Up @@ -332,6 +340,20 @@
isResettingAll = false;
});
}

function handleDiscardMessages() {
isDiscardPopoverOpen = false;
isDiscarding = true;
live.pushEvent(
"discard_messages",
{ discard_type: discardMode },
(reply) => {
isDiscarding = false;
// Reset to default after operation
discardMode = "failing";
},
);
}
</script>

<TableWithDrawer
Expand Down Expand Up @@ -399,6 +421,71 @@
</Popover.Content>
</Popover.Root>

<Popover.Root bind:open={isDiscardPopoverOpen}>
<Popover.Trigger asChild let:builder>
<Button
builders={[builder]}
variant="outline"
size="sm"
disabled={totalCount === 0 || isDiscarding}
class="flex items-center space-x-2 text-red-600 hover:text-red-700 hover:bg-red-50"
>
{#if isDiscarding}
<Loader2 class="h-4 w-4 mr-1 animate-spin" />
{:else}
<Trash2 class="h-4 w-4 mr-1" />
{/if}
<span>Discard All</span>
</Button>
</Popover.Trigger>
<Popover.Content class="w-80 p-4">
<div class="grid gap-4">
<div class="space-y-2">
<h4 class="font-medium leading-none">Discard messages</h4>
<p class="text-sm text-muted-foreground">
Choose which messages to discard. This action cannot be undone.
</p>
</div>
<RadioGroup.Root bind:value={discardMode}>
<div class="flex items-center space-x-2 mb-2">
<RadioGroup.Item value="failing" id="r1" />
<Label
for="r1"
class="text-sm font-medium leading-none peer-disabled:cursor-not-allowed peer-disabled:opacity-70"
>
Discard <i>failing</i> messages
</Label>
</div>
<div class="flex items-center space-x-2">
<RadioGroup.Item value="all" id="r2" />
<Label
for="r2"
class="text-sm font-medium leading-none peer-disabled:cursor-not-allowed peer-disabled:opacity-70"
>
Discard <i>all</i> messages
</Label>
</div>
</RadioGroup.Root>
<div class="flex justify-end gap-2">
<Button
variant="outline"
size="sm"
on:click={() => (isDiscardPopoverOpen = false)}
>
Cancel
</Button>
<Button
variant="destructive"
size="sm"
on:click={handleDiscardMessages}
>
{discardMode === "failing" ? "Discard Failing" : "Discard All"}
</Button>
</div>
</div>
</Popover.Content>
</Popover.Root>

<Button
variant={paused ? "default" : "outline"}
size="sm"
Expand Down
59 changes: 59 additions & 0 deletions lib/sequin/runtime/slot_message_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,22 @@ defmodule Sequin.Runtime.SlotMessageStore do
{:error, exit_to_sequin_error(e)}
end

@doc """
Discards all messages in the message store.
"""
@spec discard_all_messages(SinkConsumer.t()) :: {:ok, non_neg_integer()} | {:error, Exception.t()}
def discard_all_messages(consumer) do
call_all_partitions(consumer, :discard_all_messages)
end

@doc """
Discards only messages that are currently in a failed state (have not_visible_until set in the future).
"""
@spec discard_failing_messages(SinkConsumer.t()) :: {:ok, non_neg_integer()} | {:error, Exception.t()}
def discard_failing_messages(consumer) do
call_all_partitions(consumer, :discard_failing_messages)
end

@doc """
Should raise so SlotProcessorServer cannot continue if this fails.
"""
Expand Down Expand Up @@ -746,6 +762,34 @@ defmodule Sequin.Runtime.SlotMessageStore do
{:reply, :ok, state}
end

@decorate track_metrics("discard_all_messages")
def handle_call(:discard_all_messages, _from, state) do
all_messages = State.all_messages(state)
all_ack_ids = Enum.map(all_messages, & &1.ack_id)

case handle_call({:messages_succeeded, all_ack_ids, false}, nil, state) do
{:reply, {:ok, count, _ack_ids}, new_state} ->
{:reply, {:ok, count}, new_state}

other ->
other
end
end

@decorate track_metrics("discard_failing_messages")
def handle_call(:discard_failing_messages, _from, state) do
failing_messages = State.failing_messages(state)
failing_ack_ids = Enum.map(failing_messages, & &1.ack_id)

case handle_call({:messages_succeeded, failing_ack_ids, false}, nil, state) do
{:reply, {:ok, count, _ack_ids}, new_state} ->
{:reply, {:ok, count}, new_state}

other ->
other
end
end

@decorate track_metrics("min_unpersisted_wal_cursor")
def handle_call({:min_unpersisted_wal_cursor, monitor_ref}, _from, state) do
if monitor_ref == state.slot_processor_monitor_ref do
Expand Down Expand Up @@ -990,6 +1034,21 @@ defmodule Sequin.Runtime.SlotMessageStore do
Error.invariant(message: "[SlotMessageStore] exited with #{inspect(e)}")
end

# Helper function to call a function on all partitions and aggregate the results
defp call_all_partitions(consumer, call_name) do
consumer
|> partitions()
|> Enum.reduce_while({:ok, 0}, fn partition, {:ok, acc_count} ->
case GenServer.call(via_tuple(consumer.id, partition), call_name) do
{:ok, count} -> {:cont, {:ok, acc_count + count}}
error -> {:halt, error}
end
end)
catch
:exit, e ->
{:error, exit_to_sequin_error(e)}
end

@decorate track_metrics("upsert_messages")
defp upsert_messages(%State{}, []), do: :ok

Expand Down
18 changes: 18 additions & 0 deletions lib/sequin/runtime/slot_message_store_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -544,4 +544,22 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
end

defp group_id(msg), do: {msg.table_oid, msg.group_id}

@doc """
Returns all messages from the state.
"""
@spec all_messages(State.t()) :: list(message())
def all_messages(%State{} = state) do
Map.values(state.messages)
end

@doc """
Returns messages that are currently failing (have been delivered at least once).
"""
@spec failing_messages(State.t()) :: list(message())
def failing_messages(%State{} = state) do
state.messages
|> Map.values()
|> Enum.filter(fn msg -> msg.deliver_count > 0 end)
end
end
40 changes: 40 additions & 0 deletions lib/sequin_web/live/sink_consumers/show.ex
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,46 @@ defmodule SequinWeb.SinkConsumersLive.Show do
end
end

def handle_event("discard_messages", %{"discard_type" => discard_type}, socket) do
consumer = socket.assigns.consumer

{result, success_message} =
case discard_type do
"failing" ->
case SlotMessageStore.discard_failing_messages(consumer) do
{:ok, count} ->
{{:ok, count}, "Discarded #{count} failing message(s)"}

error ->
{error, nil}
end

"all" ->
case SlotMessageStore.discard_all_messages(consumer) do
{:ok, count} ->
{{:ok, count}, "Discarded #{count} message(s)"}

error ->
{error, nil}
end
end

case result do
{:ok, _count} ->
{:reply, %{ok: true},
socket
|> load_consumer_messages()
|> put_flash(:toast, %{kind: :success, title: success_message})}

{:error, reason} ->
{:reply, %{ok: false},
put_flash(socket, :toast, %{
kind: :error,
title: "Failed to discard messages: #{inspect(reason)}"
})}
end
end

def handle_event("trace_stop", _params, socket) do
Trace.unsubscribe(socket.assigns.consumer.id)
{:noreply, update(socket, :trace, fn trace -> %{trace | paused: true} end)}
Expand Down
Loading
Loading