Skip to content

Commit

Permalink
utilize connection_id in publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
radwo committed Nov 13, 2023
1 parent 4b71c2f commit deebe4c
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 32 deletions.
19 changes: 13 additions & 6 deletions lib/tackle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,38 @@ defmodule Tackle do
exchange = options[:exchange]
routing_key = options[:routing_key]
exchange_opts = options[:exchange_opts] || []
connection_id = options[:connection_id] || :default

{_exchange_type, exchange_name} =
exchange
|> Tackle.Util.parse_exchange()

Logger.debug("Connecting to '#{Tackle.Util.scrub_url(url)}'")
{:ok, connection} = Tackle.Connection.open(url)
{:ok, connection} = Tackle.Connection.open(connection_id, url)
channel = Tackle.Channel.create(connection)

try do
Tackle.Exchange.create(channel, exchange, exchange_opts)
Tackle.Exchange.publish(channel, exchange_name, message, routing_key)
after
AMQP.Channel.close(channel)
AMQP.Connection.close(connection)
Tackle.Util.cleanup(connection_id, connection, channel)
end
end

def republish(options) do
url = options[:url]
queue = options[:queue]
exchange = options[:exchange]
exchange_name = options[:exchange]
routing_key = options[:routing_key]
count = options[:count] || 1
connection_id = options[:connection_id] || :default

{:ok, connection} = Tackle.Connection.open(connection_id, url)
channel = Tackle.Channel.create(connection)

Tackle.Republisher.republish(url, queue, exchange, routing_key, count)
try do
Tackle.Republisher.republish(url, queue, exchange_name, routing_key, count)
after
Tackle.Util.cleanup(connection_id, connection, channel)
end
end
end
13 changes: 12 additions & 1 deletion lib/tackle/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,18 @@ defmodule Tackle.Connection do
open_(name, url)
end

def open(url), do: AMQP.Connection.open(url)
def open(url) do
Logger.debug("Connecting to '#{scrub_url(url)}'")

AMQP.Connection.open(url)
end

defp scrub_url(url) do
url
|> URI.parse()
|> Map.put(:userinfo, nil)
|> URI.to_string()
end

@doc """
Get a list of opened connections
Expand Down
4 changes: 3 additions & 1 deletion lib/tackle/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ defmodule Tackle.Consumer do
delay_queue: delay_queue,
dead_queue: dead_queue,
retry_limit: retry_limit,
consumer_tag: consumer_tag
consumer_tag: consumer_tag,
connection_id: connection_id
}

{:ok, state}
Expand Down Expand Up @@ -189,6 +190,7 @@ defmodule Tackle.Consumer do
retry_count = Tackle.DelayedRetry.retry_count_from_headers(headers)

options = [
connection_id: state.connection_id,
persistent: true,
headers: [
retry_count: retry_count + 1
Expand Down
7 changes: 3 additions & 4 deletions lib/tackle/delayed_retry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ defmodule Tackle.DelayedRetry do
def retry_count_from_headers([_ | tail]), do: retry_count_from_headers(tail)

def publish(url, queue, payload, options) do
Logger.debug("Connecting to '#{Tackle.Util.scrub_url(url)}'")
connection_id = options[:connection_id] || :default

{:ok, connection} = Tackle.Connection.open(url)
{:ok, connection} = Tackle.Connection.open(connection_id, url)
{:ok, channel} = Channel.open(connection)

try do
:ok = AMQP.Basic.publish(channel, "", queue, payload, options)
after
AMQP.Channel.close(channel)
AMQP.Connection.close(connection)
Tackle.Util.cleanup(connection_id, connection, channel)
end
end
end
23 changes: 13 additions & 10 deletions lib/tackle/republisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@ defmodule Tackle.Republisher do
use AMQP
require Logger

def republish(url, queue, exchange, routing_key, count) do
Logger.debug("Connecting to '#{Tackle.Util.scrub_url(url)}'")

{:ok, connection} = Tackle.Connection.open(url)
@deprecated "Use Tackle.republish/1 instead"
def republish(url, queue, exchange, routing_key, count) when is_binary(url) do
connection_id = :default
{:ok, connection} = Tackle.Connection.open(connection_id, url)
channel = Tackle.Channel.create(connection)

try do
0..(count - 1)
|> Enum.each(fn idx ->
republish_one_message(channel, queue, exchange, routing_key, idx)
end)
republish(channel, queue, exchange, routing_key, count)
after
AMQP.Channel.close(channel)
AMQP.Connection.close(connection)
Tackle.Util.cleanup(connection_id, connection, channel)
end
end

def republish(channel, queue, exchange, routing_key, count) do
0..(count - 1)
|> Enum.each(fn idx ->
republish_one_message(channel, queue, exchange, routing_key, idx)
end)
end

defp republish_one_message(channel, queue, exchange, routing_key, idx) do
Logger.info("(#{idx}) Fetching message... from '#{inspect(queue)}' queue")

Expand Down
12 changes: 7 additions & 5 deletions lib/tackle/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ defmodule Tackle.Util do
end
end

def scrub_url(url) do
url
|> URI.parse()
|> Map.put(:userinfo, nil)
|> URI.to_string()
def cleanup(:default, connection, channel) do
AMQP.Channel.close(channel)
AMQP.Connection.close(connection)
end

def cleanup(_, _, channel) do
AMQP.Channel.close(channel)
end
end
7 changes: 2 additions & 5 deletions test/integration/republish_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ defmodule Tackle.RepublishTest do
#

MessageTrace.clear("fixed-service")
{:ok, _} = FixedConsumer.start_link()
{:ok, fixed_consumer} = FixedConsumer.start_link()
:timer.sleep(1000)

Tackle.republish(%{
Expand All @@ -99,13 +99,10 @@ defmodule Tackle.RepublishTest do
count: 2
})

GenServer.stop(fixed_consumer)
:timer.sleep(2000)
end

# Since bumping the `amqp` dependency from 1.1.0 - the process is not connecting fast enough to the queue.
# This causes the test to fail. I'm not sure why this is happening, but I'm skipping the test for now.
@tag :skip
@tag :fixme
test "consumes only two messages" do
assert MessageTrace.content("fixed-service") == "Hi there!"
end
Expand Down

0 comments on commit deebe4c

Please sign in to comment.