diff --git a/lib/gnat/jetstream/api/consumer.ex b/lib/gnat/jetstream/api/consumer.ex index e24002d..10e8411 100644 --- a/lib/gnat/jetstream/api/consumer.ex +++ b/lib/gnat/jetstream/api/consumer.ex @@ -113,7 +113,8 @@ defmodule Gnat.Jetstream.API.Consumer do deliver_policy: :all, max_ack_pending: 20_000, max_deliver: -1, - replay_policy: :instant + replay_policy: :instant, + pause_until: nil ] @type t :: %__MODULE__{ @@ -142,7 +143,8 @@ defmodule Gnat.Jetstream.API.Consumer do opt_start_time: nil | DateTime.t(), rate_limit_bps: nil | non_neg_integer(), replay_policy: :instant | :original, - sample_freq: nil | binary() + sample_freq: nil | binary(), + pause_until: nil | DateTime.t() } @type info :: %{ @@ -177,7 +179,9 @@ defmodule Gnat.Jetstream.API.Consumer do num_redelivered: non_neg_integer(), num_waiting: non_neg_integer(), push_bound: nil | boolean(), - stream_name: binary() + stream_name: binary(), + paused: boolean(), + pause_remaining: nil | Time.t() # TODO: Verify type here } @type config :: %{ @@ -275,6 +279,34 @@ defmodule Gnat.Jetstream.API.Consumer do end end + @doc """ + Pause a consumer until a specific time. + + ## Examples + + iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]}) + iex> {:ok, _response} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream"}) + iex> pause_until = DateTime.add(DateTime.utc_now(), 1, :hour) + iex> Gnat.Jetstream.API.Consumer.pause(:gnat, "astream", "consumer", pause_until) + :ok + + """ + @spec pause( + conn :: Gnat.t(), + stream_name :: binary(), + consumer_name :: binary(), + pause_until :: DateTime.t(), + opts :: [domain: nil | binary()] + ) :: + :ok | {:error, any()} +def pause(conn, stream_name, consumer_name, pause_until, opts \\ []) do + topic = "#{js_api(opts[:domain])}.CONSUMER.PAUSE.#{stream_name}.#{consumer_name}" + payload = Jason.encode!(%{ pause_until: pause_until }) + with {:ok, _response} <- request(conn, topic, payload) do + :ok + end +end + @doc """ Information about the consumer.