diff --git a/lib/gnat/jetstream/api/kv.ex b/lib/gnat/jetstream/api/kv.ex index 00df70a..deed25e 100644 --- a/lib/gnat/jetstream/api/kv.ex +++ b/lib/gnat/jetstream/api/kv.ex @@ -9,6 +9,9 @@ defmodule Gnat.Jetstream.API.KV do @stream_prefix "KV_" @subject_prefix "$KV." @two_minutes_in_nanoseconds 1_200_000_000 + @operation_header "kv-operation" + @operation_del "DEL" + @operation_purge "PURGE" @type bucket_options :: {:history, non_neg_integer()} @@ -248,10 +251,34 @@ defmodule Gnat.Jetstream.API.KV do Gnat.Jetstream.API.KV.Watcher.stop(pid) end + @doc ~S""" + Returns true if operation is `{"kv-operation", "DEL"}` or `{"kv-operation", "PURGE"}` + + ## Parameters + - `headers` - a list of headers to test + + ## Examples + + iex> is_delete_operation?([{"kv-operation", "DEL"}]) + true + iex> is_delete_operation?([{"kv-operation", "PURGE"}]) + true + iex> is_delete_operation?([{"kv-operation", "ADD"}]) + false + """ + @spec is_delete_operation?(headers :: Gnat.headers()) :: boolean() + def is_delete_operation?(headers) do + headers + |> Enum.filter(fn {k, v} -> + k == @operation_header and (v == @operation_del or v == @operation_purge) + end) + |> length() > 0 + end + defp receive_keys(keys \\ %{}, bucket_name) do receive do {:msg, %{topic: key, body: body, headers: headers}} -> - if {"kv-operation", "DEL"} in headers do + if is_delete_operation?(headers) do receive_keys(keys, bucket_name) else Map.put(keys, subject_to_key(key, bucket_name), body) |> receive_keys(bucket_name) diff --git a/lib/gnat/jetstream/api/kv/watcher.ex b/lib/gnat/jetstream/api/kv/watcher.ex index 64fdc3d..19c265b 100644 --- a/lib/gnat/jetstream/api/kv/watcher.ex +++ b/lib/gnat/jetstream/api/kv/watcher.ex @@ -10,9 +10,6 @@ defmodule Gnat.Jetstream.API.KV.Watcher do alias Gnat.Jetstream.API.{Consumer, KV, Util} - @operation_header "kv-operation" - @operation_del "DEL" - @type keywatch_handler :: (action :: :key_deleted | :key_added, key :: String.t(), value :: any() -> nil) @@ -54,7 +51,7 @@ defmodule Gnat.Jetstream.API.KV.Watcher do def handle_info({:msg, %{topic: key, body: body, headers: headers}}, state) do key = KV.subject_to_key(key, state.bucket_name) - if {@operation_header, @operation_del} in headers do + if KV.is_delete_operation?(headers) do state.handler.(:key_deleted, key, body) end diff --git a/test/jetstream/api/kv_test.exs b/test/jetstream/api/kv_test.exs index 156e38f..95f482a 100644 --- a/test/jetstream/api/kv_test.exs +++ b/test/jetstream/api/kv_test.exs @@ -104,10 +104,18 @@ defmodule Gnat.Jetstream.API.KVTest do KV.put_value(:gnat, bucket, "baz", "quz") assert_receive({:key_added, "baz", "quz"}) + KV.put_value(:gnat, bucket, "quz", "qux") + assert_receive({:key_added, "quz", "qux"}) + KV.delete_key(:gnat, bucket, "baz") # key deletions don't carry the data removed assert_receive({:key_deleted, "baz", ""}) + # ensure we get delete event on purge + KV.purge_key(:gnat, bucket, "quz") + assert_receive({:key_deleted, "quz", ""}) + + KV.unwatch(watcher_pid) :ok = KV.delete_bucket(:gnat, bucket)