From 89e6eab9116461089e38ca50ece119685663b04d Mon Sep 17 00:00:00 2001 From: lsxliron Date: Fri, 19 Apr 2024 00:36:47 -0400 Subject: [PATCH 1/3] emitting :key_deleted when key is purged --- lib/gnat/jetstream/api/kv.ex | 29 +++++++++++++++++++++++++++- lib/gnat/jetstream/api/kv/watcher.ex | 5 +---- test/jetstream/api/kv_test.exs | 8 ++++++++ 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/lib/gnat/jetstream/api/kv.ex b/lib/gnat/jetstream/api/kv.ex index 00df70a..612ed39 100644 --- a/lib/gnat/jetstream/api/kv.ex +++ b/lib/gnat/jetstream/api/kv.ex @@ -9,6 +9,9 @@ defmodule Gnat.Jetstream.API.KV do @stream_prefix "KV_" @subject_prefix "$KV." @two_minutes_in_nanoseconds 1_200_000_000 + @operation_header "kv-operation" + @operation_del "DEL" + @operation_purge "PURGE" @type bucket_options :: {:history, non_neg_integer()} @@ -248,10 +251,34 @@ defmodule Gnat.Jetstream.API.KV do Gnat.Jetstream.API.KV.Watcher.stop(pid) end + @doc ~S""" + Returns true if operation is `{"kv-operation", "DEL"}` or `{"kv-operation", "PURGE"}` + + ## Parameters + - `headers` - a list of headers to test + + ## Example + + iex> is_delete_operation([{"kv-operation", "DEL"}]) + true + iex> is_delete_operation([{"kv-operation", "PURGE"}]) + true + iex> is_delete_operation([{"kv-operation", "ADD"}]) + true + """ + @spec is_delete_operation?(headers :: Gnat.headers()) :: boolean() + def is_delete_operation?(headers) do + headers + |> Enum.filter(fn {k, v} -> + k == @operation_header and (v == @operation_del or v == @operation_purge) + end) + |> length() > 0 + end + defp receive_keys(keys \\ %{}, bucket_name) do receive do {:msg, %{topic: key, body: body, headers: headers}} -> - if {"kv-operation", "DEL"} in headers do + if is_delete_operation?(headers) do receive_keys(keys, bucket_name) else Map.put(keys, subject_to_key(key, bucket_name), body) |> receive_keys(bucket_name) diff --git a/lib/gnat/jetstream/api/kv/watcher.ex b/lib/gnat/jetstream/api/kv/watcher.ex index 64fdc3d..19c265b 100644 --- a/lib/gnat/jetstream/api/kv/watcher.ex +++ b/lib/gnat/jetstream/api/kv/watcher.ex @@ -10,9 +10,6 @@ defmodule Gnat.Jetstream.API.KV.Watcher do alias Gnat.Jetstream.API.{Consumer, KV, Util} - @operation_header "kv-operation" - @operation_del "DEL" - @type keywatch_handler :: (action :: :key_deleted | :key_added, key :: String.t(), value :: any() -> nil) @@ -54,7 +51,7 @@ defmodule Gnat.Jetstream.API.KV.Watcher do def handle_info({:msg, %{topic: key, body: body, headers: headers}}, state) do key = KV.subject_to_key(key, state.bucket_name) - if {@operation_header, @operation_del} in headers do + if KV.is_delete_operation?(headers) do state.handler.(:key_deleted, key, body) end diff --git a/test/jetstream/api/kv_test.exs b/test/jetstream/api/kv_test.exs index 156e38f..95f482a 100644 --- a/test/jetstream/api/kv_test.exs +++ b/test/jetstream/api/kv_test.exs @@ -104,10 +104,18 @@ defmodule Gnat.Jetstream.API.KVTest do KV.put_value(:gnat, bucket, "baz", "quz") assert_receive({:key_added, "baz", "quz"}) + KV.put_value(:gnat, bucket, "quz", "qux") + assert_receive({:key_added, "quz", "qux"}) + KV.delete_key(:gnat, bucket, "baz") # key deletions don't carry the data removed assert_receive({:key_deleted, "baz", ""}) + # ensure we get delete event on purge + KV.purge_key(:gnat, bucket, "quz") + assert_receive({:key_deleted, "quz", ""}) + + KV.unwatch(watcher_pid) :ok = KV.delete_bucket(:gnat, bucket) From 69ecfdcddca6fb019d33f9167eb19a76fe68bb66 Mon Sep 17 00:00:00 2001 From: lsxliron Date: Fri, 19 Apr 2024 00:40:30 -0400 Subject: [PATCH 2/3] fixed typos --- lib/gnat/jetstream/api/kv.ex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/gnat/jetstream/api/kv.ex b/lib/gnat/jetstream/api/kv.ex index 612ed39..337976f 100644 --- a/lib/gnat/jetstream/api/kv.ex +++ b/lib/gnat/jetstream/api/kv.ex @@ -257,13 +257,13 @@ defmodule Gnat.Jetstream.API.KV do ## Parameters - `headers` - a list of headers to test - ## Example + ## Examples - iex> is_delete_operation([{"kv-operation", "DEL"}]) + iex> is_delete_operation?([{"kv-operation", "DEL"}]) true - iex> is_delete_operation([{"kv-operation", "PURGE"}]) + iex> is_delete_operation?([{"kv-operation", "PURGE"}]) true - iex> is_delete_operation([{"kv-operation", "ADD"}]) + iex> is_delete_operation?([{"kv-operation", "ADD"}]) true """ @spec is_delete_operation?(headers :: Gnat.headers()) :: boolean() From 4941c5bf0a2f14c37ce0f52fc68bcccd8cfcebe9 Mon Sep 17 00:00:00 2001 From: lsxliron Date: Fri, 19 Apr 2024 00:41:17 -0400 Subject: [PATCH 3/3] fixed example return value --- lib/gnat/jetstream/api/kv.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/gnat/jetstream/api/kv.ex b/lib/gnat/jetstream/api/kv.ex index 337976f..deed25e 100644 --- a/lib/gnat/jetstream/api/kv.ex +++ b/lib/gnat/jetstream/api/kv.ex @@ -264,7 +264,7 @@ defmodule Gnat.Jetstream.API.KV do iex> is_delete_operation?([{"kv-operation", "PURGE"}]) true iex> is_delete_operation?([{"kv-operation", "ADD"}]) - true + false """ @spec is_delete_operation?(headers :: Gnat.headers()) :: boolean() def is_delete_operation?(headers) do