Share MQTT connections between processes #1287
Conversation
Refactored Carrier.Messaging.Connection so multiple processes can share a single connection. The connection keeps track of subscribers and GenMqtt-style reply endpoints and ensures correct delivery of messages. These changes are in preparation for the new executor design based on GenStage. Attempting to share a single connection between stages for the same pipeline didn't integrate well with GenMqtt, causing Cog to use transient MQTT connections and incurring a small, but noticeable, performance penalty. Centralizing connection-sharing logic in the connection itself resolves all previous problems and results in a more testable design compared to my previous hackish attempt.
Force-pushed from 41ed2f4 to 757dd9c
"name" => "permission"}]}} = json_response(conn, 200)
assert Enum.count(commands) == 2
command_names = [Map.get(Enum.at(commands, 0), "name"), Map.get(Enum.at(commands, 1), "name")]
assert command_names == ["bar", "foo"] or command_names == ["foo", "bar"]
Maybe this instead of lines 73-75?
assert ["bar", "foo"] == commands |> Enum.map(&(&1["name"])) |> Enum.sort
"""

@internal_mq_username Cog.Util.Misc.internal_mq_username
@default_connect_timeout 5000 # 5 seconds
@default_log_level :error

defstruct [:id, :conn, :call_reply]
defstruct [:conn, :tracker, :owner]

# Note: This type is what we get from emqttc; if we change
We should probably change this comment, as it's no longer "what we get from emqttc".
@@ -47,14 +50,85 @@ defmodule Carrier.Messaging.Connection do

"""
# Again, this spec is what comes from emqttc
Ditto for this comment
@spec call(conn::connection(), topic::String.t, endpoint::String.t, message::Map.t, opts::call_opts()) :: Map.t | {:error, atom()}
def call(conn, topic, endpoint, message, opts \\ []) do
  subscriber = Keyword.get(opts, :subscriber, self())
  timeout = Keyword.get(opts, :timeout, 5000)
Can we replace 5000 with a @default_timeout attribute, defined up at the top of the file, like we do with @default_connect_timeout?
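A minimal sketch of the suggested change, with the attribute name @default_timeout as proposed (the surrounding module is illustrative, not the real Connection):

```elixir
defmodule Carrier.Messaging.ConnectionSketch do
  # Hypothetical module showing the suggested attribute pattern:
  # the magic number 5000 moves to a named module attribute,
  # alongside the existing @default_connect_timeout.
  @default_connect_timeout 5000 # 5 seconds
  @default_timeout 5000         # replaces the bare 5000 in call/5

  # Mirrors how call/5 would pick its timeout.
  def call_timeout(opts \\ []) do
    Keyword.get(opts, :timeout, @default_timeout)
  end

  def connect_timeout, do: @default_connect_timeout
end
```

Callers that pass an explicit :timeout still win; everything else falls back to the attribute.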
"""
@spec cast(conn::connection(), topic::String.t, endpoint::String.t, message::Map.t) :: :ok | {:error, atom()}
def cast(conn, topic, endpoint, message) do
  GenServer.call(conn, {:cast, topic, endpoint, message}, :infinity)
A cast that is a call with an :infinity timeout? I think my brain just broke.
EDIT: and :infinity actually means an hour? I'm very confused now.
1 hour is effectively infinity :)
The design predates this PR. Let's tackle it separately.
Sounds good... this was just the first time I noticed infinity = 1 hour.
Some infinities are larger than other infinities, though, so it's cool 👅
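For context on the distinction being debated, here is a sketch (module and function names are hypothetical): GenServer.cast/2 is fire-and-forget, while a "cast" implemented via GenServer.call/3, as in the diff above, blocks the caller until the server has actually processed the message — a deliberate form of backpressure:

```elixir
defmodule PublishSketch do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, [])

  # Hypothetical API mirroring the pattern under discussion: the
  # function is named `cast` because no reply payload is expected,
  # but it uses GenServer.call so the caller blocks until the
  # connection has handed the message off (e.g. published it).
  def cast(pid, message), do: GenServer.call(pid, {:cast, message}, :infinity)

  def init(state), do: {:ok, state}

  def handle_call({:cast, message}, _from, state) do
    # A real connection would publish `message` over MQTT here.
    {:reply, :ok, [message | state]}
  end
end
```

With a plain GenServer.cast the caller could race ahead of the connection; the call-based version guarantees ordering at the cost of the confusing name.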
|> unmonitor_subscriber(subscriber)
end

@spec unused?(Tracker.t) :: boolean()
It doesn't look like this is used in non-testing code... could this become a testing helper instead?
This will be used with GenStage code. I want connections to close themselves when they aren't being used. I tried enabling the behavior now but it broke tests since the connection would close itself when all the subscriptions were removed during test teardown.

@spec find_subscribers(Tracker.t, String.t) :: [] | [pid()]
def find_subscribers(%__MODULE__{subscriptions: subs}, topic) do
  Enum.reduce(subs, [], &(find_matching_subscriptions(&1, topic, &2)))
We could simplify find_matching_subscriptions (and this) by rewriting in terms of Enum.flat_map instead.
After thinking about this some, I'm not sure the simplification is worth the loss of efficiency. I'm assuming find_subscribers/2 will be called very frequently.
If I understand correctly, using an empty list to represent an empty result is potentially inefficient, performing unnecessary allocations and collections:
Enum.flat_map(subs, &(find_matching_subscriptions(&1, topic)))
defp find_matching_subscriptions({_, {matcher, subscribed}}, topic) do
if Regex.match?(matcher, topic) do
subscribed
else
# This is the inefficient bit as it allocates an empty list here
# and a new list in the caller. The caller's list allocation is required
# to handle appending the list returned from here to the end of the accumulator
[]
end
end
In contrast, Enum.reduce incurs the allocation and subsequent collection overhead only when a topic is matched.
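To make the trade-off concrete, here is a side-by-side sketch of the two approaches. The subscription shape (%{key => {compiled_regex, [pids]}}) is assumed from the snippets above; the module name is hypothetical:

```elixir
defmodule SubscriberLookup do
  # Assumed subscription shape: %{topic_key => {compiled_regex, [pids]}}

  # Enum.reduce version: builds onto the accumulator only when a
  # topic actually matches; non-matching entries cost nothing.
  def find_subscribers_reduce(subs, topic) do
    Enum.reduce(subs, [], fn {_, {matcher, subscribed}}, acc ->
      if Regex.match?(matcher, topic), do: subscribed ++ acc, else: acc
    end)
  end

  # Enum.flat_map version: shorter, but returns (and then discards)
  # an empty list for every non-matching subscription.
  def find_subscribers_flat_map(subs, topic) do
    Enum.flat_map(subs, fn {_, {matcher, subscribed}} ->
      if Regex.match?(matcher, topic), do: subscribed, else: []
    end)
  end
end
```

Both return the same subscribers; the disagreement is purely about per-call allocation on the hot path.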
end

defp has_subscriptions?(tracker, subscriber) do
  if Enum.reduce_while(tracker.reply_endpoints, false, &(has_endpoint(&1, subscriber, &2))) do
Since reply_endpoints is bidirectional, could this be replaced with Map.has_key?(reply_endpoints, subscriber)?
Ah yeah. Whoops.
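A sketch of the suggested simplification. Because the map is assumed to hold entries keyed both ways (endpoint => pid and pid => endpoint), a single Map.has_key?/2 lookup replaces the reduce over all entries:

```elixir
# Hypothetical bidirectional reply-endpoint map: both
# endpoint => pid and pid => endpoint entries are stored, so
# "does this subscriber have a reply endpoint?" is a constant-time
# key lookup rather than a full traversal.
reply_endpoints = %{"bot/reply/1" => :some_pid, :some_pid => "bot/reply/1"}

Map.has_key?(reply_endpoints, :some_pid)  # matching subscriber => true
Map.has_key?(reply_endpoints, :other_pid) # unknown subscriber => false
```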
if Enum.reduce_while(tracker.reply_endpoints, false, &(has_endpoint(&1, subscriber, &2))) do
  true
else
  Enum.reduce_while(tracker.subscriptions, false, &(has_any_subscription(&1, subscriber, &2)))
I think this could be simplified if recast in terms of Enum.any?
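A sketch of what the Enum.any? version might look like, assuming the data shapes visible in the snippets above (reply_endpoints as an endpoint-to-pid map, subscriptions as %{topic => {matcher, pids}}); names are illustrative:

```elixir
defmodule HasSubscriptions do
  # Equivalent to the nested Enum.reduce_while calls above: true if
  # the subscriber owns any reply endpoint, or appears in the pid
  # list of any subscription. Enum.any?/2 short-circuits on the
  # first match, just like reduce_while with a {:halt, true}.
  def has_subscriptions?(tracker, subscriber) do
    Enum.any?(tracker.reply_endpoints, fn {_endpoint, pid} -> pid == subscriber end) or
      Enum.any?(tracker.subscriptions, fn {_topic, {_matcher, pids}} -> subscriber in pids end)
  end
end
```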

defp delete_subscription(topic, subscriber, subs) do
  case Map.get(subs, topic) do
    {_, []} ->
If nobody is subscribed, why do we keep the entry?
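One way the empty-entry case could be handled, as a sketch (the real unsubscribe path also has to tear down the underlying MQTT subscription; names and argument order are illustrative):

```elixir
defmodule SubscriptionCleanup do
  # When the last subscriber for a topic is removed, drop the whole
  # {matcher, pids} entry instead of leaving a {matcher, []} tombstone
  # behind in the map.
  def delete_subscription(subs, topic, subscriber) do
    case Map.get(subs, topic) do
      nil ->
        subs

      {matcher, pids} ->
        case List.delete(pids, subscriber) do
          [] -> Map.delete(subs, topic)
          remaining -> Map.put(subs, topic, {matcher, remaining})
        end
    end
  end
end
```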
Force-pushed from 461cdde to 66baaaa
Refactored Carrier.Messaging.Connection so multiple Erlang/Elixir processes can share a single connection. The connection keeps track of subscribers and GenMqtt-style reply endpoints and ensures correct delivery of messages, including support for + and * topic wildcards. Connection places monitors on all subscribing processes, allowing it to clean up subscriptions after subscribers crash.

These changes are in preparation for the new executor design based on GenStage. Attempting to share a single connection between stages for the same pipeline didn't integrate well with GenMqtt, causing Cog to use transient MQTT connections and incurring a small, but noticeable, performance penalty. Centralizing connection-sharing logic in the connection itself resolves all previous problems and results in a more testable design compared to my earlier attempts at layering multiplexing on top of Connection.
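The monitor-based cleanup described above can be sketched like this (a simplified stand-in for the real Connection process; names are illustrative):

```elixir
defmodule MonitoringConnection do
  use GenServer

  # State: %{topic => [subscriber_pids]}
  def start_link, do: GenServer.start_link(__MODULE__, %{})
  def subscribe(conn, topic), do: GenServer.call(conn, {:subscribe, topic, self()})
  def subscribers(conn, topic), do: GenServer.call(conn, {:subscribers, topic})

  def init(state), do: {:ok, state}

  def handle_call({:subscribe, topic, pid}, _from, subs) do
    # Monitor the subscriber so the connection hears about crashes.
    Process.monitor(pid)
    {:reply, :ok, Map.update(subs, topic, [pid], &[pid | &1])}
  end

  def handle_call({:subscribers, topic}, _from, subs),
    do: {:reply, Map.get(subs, topic, []), subs}

  # When a monitored subscriber goes down, scrub it from every topic
  # so dead pids never receive (or block) message delivery.
  def handle_info({:DOWN, _ref, :process, pid, _reason}, subs) do
    {:noreply, Map.new(subs, fn {t, pids} -> {t, List.delete(pids, pid)} end)}
  end
end
```

The real implementation additionally tracks reply endpoints and closes the MQTT session when the tracker reports no remaining subscribers, but the monitor-then-scrub pattern is the core of the crash cleanup.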