Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add expiration and message retention options to Subscription #38

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions lib/kane/subscription.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
defmodule Kane.Subscription do
defstruct name: nil, topic: nil, ack_deadline: 10, filter: nil
defstruct name: nil,
topic: nil,
ack_deadline: 10,
filter: nil,
expires_in: nil,
message_retention_duration: nil

alias Kane.Topic
alias Kane.Message
alias Kane.Client
Expand Down Expand Up @@ -112,19 +118,29 @@ defmodule Kane.Subscription do
end
end

def data(%__MODULE__{ack_deadline: ack, topic: %Topic{} = topic, filter: nil}, :create) do
%{
"topic" => Topic.full_name(topic),
"ackDeadlineSeconds" => ack
}
end
def data(%__MODULE__{} = sub, :create) do
sub
|> Map.from_struct()
|> Enum.map(fn
{:topic, topic} ->
{"topic", Topic.full_name(topic)}

def data(%__MODULE__{ack_deadline: ack, topic: %Topic{} = topic, filter: filter}, :create) do
%{
"topic" => Topic.full_name(topic),
"ackDeadlineSeconds" => ack,
"filter" => filter
}
{:ack_deadline, ack} ->
{"ackDeadlineSeconds", ack}

{:filter, filter} when is_binary(filter) ->
{"filter", filter}

{:expires_in, expires_in} when is_integer(expires_in) ->
{"expirationPolicy", %{"ttl" => "#{expires_in}s"}}

{:message_retention_duration, retention} when is_integer(retention) ->
{"messageRetentionDuration", "#{retention}s"}

{key_to_drop, _v} -> {key_to_drop, nil}
end)
|> Enum.reject(fn {_k, value} -> is_nil(value) end)
|> Map.new()
end

def data(%__MODULE__{}, :pull, options) do
Expand Down Expand Up @@ -180,10 +196,20 @@ defmodule Kane.Subscription do
name: strip!(subscription_name),
ack_deadline: Map.get(data, "ackDeadlineSeconds"),
topic: %Topic{name: Topic.strip!(topic_name)},
filter: Map.get(data, "filter")
filter: Map.get(data, "filter"),
message_retention_duration: data |> Map.get("messageRetentionDuration") |> parse_seconds(),
expires_in: data |> get_in(["expirationPolicy", "ttl"]) |> parse_seconds()
}
end

def parse_seconds(nil), do: nil

def parse_seconds(string) when is_binary(string) do
{seconds, "s"} = Integer.parse(string)

seconds
end

defp http_options(options) do
case Keyword.get(options, :return_immediately, true) do
false -> [recv_timeout: :infinity]
Expand Down
56 changes: 56 additions & 0 deletions test/kane/subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,60 @@ defmodule Kane.SubscriptionTest do
}} = Subscription.create(sub)
end

test "includes expiration keys when creating a subscription", %{
bypass: bypass
} do
name = "create-sub"
topic = "topic-to-sub"
expires_in = 24 |> :timer.hours() |> to_seconds()
message_retention = 10 |> :timer.minutes() |> to_seconds()

sub = %Subscription{
name: name,
topic: %Topic{name: topic},
expires_in: expires_in,
message_retention_duration: message_retention
}

sname = Subscription.full_name(sub)
tname = Topic.full_name(sub.topic)

Bypass.expect(bypass, fn conn ->
{:ok, body, conn} = Plug.Conn.read_body(conn)

assert body ==
%{
"topic" => tname,
"ackDeadlineSeconds" => sub.ack_deadline,
"messageRetentionDuration" => "#{message_retention}s",
"expirationPolicy" => %{
"ttl" => "#{expires_in}s"
}
}
|> Jason.encode!()

assert conn.method == "PUT"
assert_content_type(conn, "application/json")

Plug.Conn.send_resp(conn, 201, ~s({
"name": "#{sname}",
"topic": "#{tname}",
"ackDeadlineSeconds": 10,
"messageRetentionDuration": "#{message_retention}s",
"expirationPolicy": {"ttl": "#{expires_in}s"}
}))
end)

assert {:ok,
%Subscription{
topic: %Topic{name: ^topic},
name: ^name,
ack_deadline: 10,
expires_in: ^expires_in,
message_retention_duration: ^message_retention
}} = Subscription.create(sub)
end

test "deleting a subscription", %{bypass: bypass, project: project} do
name = "delete-me"

Expand Down Expand Up @@ -313,4 +367,6 @@ defmodule Kane.SubscriptionTest do

assert String.contains?(content_type, type)
end

defp to_seconds(milliseconds), do: div(milliseconds, 1000)
end