Consumer Group #195
Conversation
Hello, @dcuddeback! This is your first Pull Request that will be reviewed by Ebert, an automatic Code Review service. It will leave comments on this diff with potential issues and style violations found in the code as you push new commits. You can also see all the issues found on this Pull Request on its review page. Please check our documentation for more information.
```elixir
defp sync_follower(%State{worker_name: worker_name, session_timeout: session_timeout,
                          group_name: group_name, generation_id: generation_id, member_id: member_id} = state) do
  KafkaEx.sync_group(group_name, generation_id, member_id, [], timeout: session_timeout + 5000, worker_name: worker_name)
  |> update_assignments(state)
```
Use a function call when a pipeline is only one function long
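For reference, the style Ebert is asking for replaces a one-step pipeline with a plain call, e.g. (a sketch based on the hunk above):

```elixir
# A one-step pipeline reads more clearly as a direct function call:
update_assignments(
  KafkaEx.sync_group(group_name, generation_id, member_id, [],
    timeout: session_timeout + 5000, worker_name: worker_name),
  state
)
```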
```elixir
assignments = assign_partitions(state, members)

KafkaEx.sync_group(group_name, generation_id, member_id, assignments, timeout: session_timeout + 5000, worker_name: worker_name)
|> update_assignments(state)
```
Use a function call when a pipeline is only one function long
```elixir
case Elixir.Supervisor.start_link(__MODULE__, {consumer_module, group_name, assignments, opts}) do
  {:ok, pid} ->
    Enum.each(assignments, fn ({topic, partition}) ->
      case Elixir.Supervisor.start_child(pid, [topic, partition, opts]) do
```
Function body is nested too deep (max depth is 2, was 3).
lib/kafka_ex/gen_consumer.ex (outdated)
```elixir
{:commit, new_state} ->
  %State{state | consumer_state: new_state, acked_offset: offset + 1, current_offset: offset + 1}
  |> commit()
```
Use a function call when a pipeline is only one function long
lib/kafka_ex/gen_consumer.ex (outdated)
```elixir
case consumer_module.handle_message(message, consumer_state) do
  {:ack, new_state} ->
    %State{state | consumer_state: new_state, acked_offset: offset + 1, current_offset: offset + 1}
    |> auto_commit()
```
Use a function call when a pipeline is only one function long
```elixir
def assign_partitions(members, partitions) do
  Enum.zip(Stream.cycle(members), partitions)
  |> Enum.group_by(&(elem(&1, 0)), &(elem(&1, 1)))
```
Use a function call when a pipeline is only one function long
```elixir
  end
end

defmodule Supervisor do
```
Modules should have a @moduledoc tag.
lib/kafka_ex/server_0_p_8_p_2.ex (outdated)
```elixir
def kafka_server_leave_group(_, _, _state), do: raise "Leave Group is not supported in 0.8.0 version of Kafka"
def kafka_server_heartbeat(_, _, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka"
def kafka_server_join_group(_, _, _, _, _, _state), do: raise "Join Group is not supported in 0.8.0 version of kafka"
def kafka_server_sync_group(_, _, _, _, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka"
```
Function takes too many parameters (arity is 6, max is 5).
lib/kafka_ex/server_0_p_8_p_2.ex (outdated)
```elixir
def kafka_server_sync_group(_, _, _, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka"
def kafka_server_leave_group(_, _, _state), do: raise "Leave Group is not supported in 0.8.0 version of Kafka"
def kafka_server_heartbeat(_, _, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka"
def kafka_server_join_group(_, _, _, _, _, _state), do: raise "Join Group is not supported in 0.8.0 version of kafka"
```
Function takes too many parameters (arity is 6, max is 5).
lib/kafka_ex/server_0_p_9_p_0.ex (outdated)
```diff
  end

- def kafka_server_sync_group(group_name, generation_id, member_id, assignments, state) do
+ def kafka_server_sync_group(group_name, generation_id, member_id, assignments, network_timeout, state) do
```
Function takes too many parameters (arity is 6, max is 5).
lib/kafka_ex/server_0_p_9_p_0.ex (outdated)
```diff
  end

- def kafka_server_join_group(topics, session_timeout, state) do
+ def kafka_server_join_group(group_name, topics, member_id, session_timeout, network_timeout, state) do
```
Function takes too many parameters (arity is 6, max is 5).
lib/kafka_ex/server_0_p_8_p_0.ex (outdated)
```elixir
def kafka_server_leave_group(_, _, _state), do: raise "Leave Group is not supported in 0.8.0 version of Kafka"
def kafka_server_heartbeat(_, _, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka"
def kafka_server_join_group(_, _, _, _, _, _state), do: raise "Join Group is not supported in 0.8.0 version of kafka"
def kafka_server_sync_group(_, _, _, _, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka"
```
Function takes too many parameters (arity is 6, max is 5).
lib/kafka_ex/server_0_p_8_p_0.ex (outdated)
```elixir
def kafka_server_sync_group(_, _, _, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka"
def kafka_server_leave_group(_, _, _state), do: raise "Leave Group is not supported in 0.8.0 version of Kafka"
def kafka_server_heartbeat(_, _, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka"
def kafka_server_join_group(_, _, _, _, _, _state), do: raise "Join Group is not supported in 0.8.0 version of kafka"
```
Function takes too many parameters (arity is 6, max is 5).
Force-pushed 3aa8ae3 to 868aeae.
lib/kafka_ex/gen_consumer.ex (outdated)
| """ | ||
| @type partition :: {topic, partition_id} | ||
|
|
||
|
|
There should be no more than 1 consecutive blank lines.
Force-pushed f07452b to 07e9b4d.
I've been testing this branch with some data (a few hundred records/sec) and it's been working pretty well. Let me know if there's anything I can do to help get this merged.

@briankereszturi Thanks. I'm working through the issues reported by @ebert. I've written much of the documentation already. The README could use an update, and I think it could use some tests, too. I could use a hand with either of those.

I'll take a crack at those within the next day or two.
```diff
@@ -1,12 +1,22 @@
 defmodule KafkaEx.Protocol.LeaveGroup do
   defmodule Request do
```
Modules should have a @moduledoc tag.
```elixir
Implementation of the Kafka Heartbeat request and response APIs
"""

defmodule Request do
```
Modules should have a @moduledoc tag.
| """ | ||
| @member_assignment_version 0 | ||
|
|
||
| defmodule Request do |
Modules should have a @moduledoc tag.
Force-pushed a4b9d2f to 3d4281a.
Ebert has finished reviewing this Pull Request and has found:

…

You can see more details about this review at https://ebertapp.io/github/kafkaex/kafka_ex/pulls/195.
Force-pushed 3d4281a to 4af9ae2.
Force-pushed 4af9ae2 to ba1c82e.
Rebased to incorporate the changes from #196.
@dcuddeback Thanks for your patience (or at least hiding your impatience ;) ). I'll start by responding to your questions before diving into the code.

I think the cleanest solution right now is to use one KafkaEx.Server per … Connection pools may be something we need to do eventually, and I think …

Your observation here is astute. KafkaEx is really generally a low-level … When I go through the code I'll try to keep this in mind.

What is your motivation to kill the child workers in parallel? Depending on …

If they do need to handle cleanup in parallel, one solution might be to use a … This actually sounds like a pretty good use case for …

I'm a little torn on … Is there a use case for wanting to have a consumer be able to decide, on a …? The only other return value I can think of would be some kind of …

Check out #196 - I haven't gotten into the code yet, but hopefully that's …
dantswain left a comment
I've only really gotten through consumer_group.ex so far, but it's a lot to digest, so I wanted to do this in chunks (I think the only way I can realistically do it is in chunks).

What do you think about this as a possible alternate structure wrt the supervision tree? (A sketch follows this list.)

ConsumerGroup.start_link starts a supervisor, and the supervisor has these children:

a) the ConsumerGroup worker you have here (some care will have to go into naming :/ )

b) a ConsumerGroup.WorkerSupervisor that has :simple_one_for_one and child specs for the actual consumer workers

Then, on join/rebalance, we reap/restart the consumer workers. That way the supervision tree is all started at once, and we don't have to have any processes dangling outside the supervision tree.
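A minimal sketch of that structure, with hypothetical module names (this illustrates the proposal; it is not code from the PR):

```elixir
defmodule KafkaEx.ConsumerGroup.TreeSupervisor do
  # Hypothetical top-level supervisor: owns both the ConsumerGroup worker and
  # a :simple_one_for_one supervisor for the per-partition consumer workers.
  use Supervisor

  def start_link(consumer_module, group_name, opts) do
    Supervisor.start_link(__MODULE__, {consumer_module, group_name, opts})
  end

  def init({consumer_module, group_name, opts}) do
    children = [
      worker(KafkaEx.ConsumerGroup, [consumer_module, group_name, opts]),
      supervisor(KafkaEx.ConsumerGroup.WorkerSupervisor, [consumer_module, opts])
    ]

    # :rest_for_one: if the ConsumerGroup crashes, the workers restart too.
    supervise(children, strategy: :rest_for_one)
  end
end
```

On join/rebalance, the ConsumerGroup would then call Supervisor.start_child/2 and Supervisor.terminate_child/2 on the WorkerSupervisor to reap and restart consumer workers.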
```elixir
  ]
end

@heartbeat_interval 5_000
```
Maybe call these @default_heartbeat_interval and @default_session_timeout just to be really clear?
Makes sense.
```elixir
def handle_info(:timeout, %State{generation_id: nil, member_id: ""} = state) do
  new_state = join(state)

  {:noreply, new_state, new_state.heartbeat_interval}
```
It might be better to do this with a timer. Timeouts get canceled if the process receives any messages (call, cast, info, etc). You can use :timer.send_after/2 or :timer.send_interval/2 and hold onto the timer reference in the state in case it needs to be canceled. send_interval is fine as long as the action being done is much shorter than the interval between calls, otherwise you might need to use send_after to make sure you don't end up with a process mailbox backlog.
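A sketch of the timer-based approach (the field and helper names are illustrative, not the PR's code):

```elixir
# Illustrative sketch: drive heartbeats with :timer.send_interval/2 instead of
# GenServer timeouts, so unrelated messages can't cancel the next heartbeat.
def init(state) do
  {:ok, timer} = :timer.send_interval(state.heartbeat_interval, :heartbeat)
  {:ok, %{state | heartbeat_timer: timer}}
end

def handle_info(:heartbeat, state) do
  {:noreply, heartbeat(state)}   # heartbeat/1 is an assumed helper
end

def terminate(_reason, %{heartbeat_timer: timer}) do
  # Cancel the timer if this process shuts down before the group does.
  :timer.cancel(timer)
end
```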
Thanks for the suggestion. That seems much cleaner than using GenServer timeouts.
```elixir
  {:stop, reason, new_state}
end

def handle_info({:EXIT, _pid, _reason}, %State{} = state) do
```
I don't think this will ever match unless you change the previous clause to, say, when is_pid(pid) so that it only matches when pid is set.
The previous clause uses pid twice, so it only matches when pid matches consumer_pid (the GenConsumer.Supervisor). I think this clause was matching exit signals from a supervisor that was shut down gracefully during a rebalance (because a new supervisor is assigned to consumer_pid).
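For reference, the pair of clauses being discussed looks roughly like this (reconstructed for illustration; not the exact diff):

```elixir
# Matches only when the exiting pid IS the current supervisor: `pid` is bound
# both in the message and in consumer_pid, so the two must be the same process.
def handle_info({:EXIT, pid, reason}, %State{consumer_pid: pid} = state) do
  {:stop, reason, state}
end

# Catches exits from an old supervisor that was already replaced on rebalance.
def handle_info({:EXIT, _pid, _reason}, %State{} = state) do
  {:noreply, state}
end
```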
```elixir
  end
end

defp sync_leader(%State{worker_name: worker_name, topics: topics, partitions: nil} = state, members) do
```
Could you add some comments to describe what each of these functions is doing? The logic is fairly involved and that would help a lot with understanding the code.
Sure.
```elixir
%MetadataResponse{topic_metadatas: topic_metadatas} = KafkaEx.metadata(worker_name: worker_name)

partitions = Enum.flat_map(topics, fn (topic) ->
  %TopicMetadata{error_code: :no_error, partition_metadatas: partition_metadatas} = Enum.find(topic_metadatas, &(&1.topic == topic))
```
This could be out of scope but it might clean this up a little if there was a function like MetadataResponse.for_topic(metadata_response, topic)
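Such a helper might look like this (hypothetical; not part of the PR):

```elixir
# Hypothetical convenience function on the metadata response module:
def for_topic(%MetadataResponse{topic_metadatas: topic_metadatas}, topic) do
  Enum.find(topic_metadatas, &(&1.topic == topic))
end
```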
```elixir
}

sync_request
|> KafkaEx.sync_group(worker_name: worker_name, timeout: session_timeout + 5000)
```
5000 should get a module attribute or var so that it isn't a magic number
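For example (the attribute name is just a suggestion):

```elixir
@session_timeout_padding 5_000

# ...later, when building the request:
timeout: session_timeout + @session_timeout_padding
```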
Makes sense.
```elixir
  Enum.map(partition_ids, &({topic, &1}))
end)

{:ok, pid} = KafkaEx.GenConsumer.Supervisor.start_link(consumer_module, group_name, assignments, consumer_opts)
```
It might be better to have this already started and awaiting dispatch. I'll address this in an overall comment.
```elixir
|> Enum.map(fn ({member, topic_partitions}) ->
  assigns =
    topic_partitions
    |> Enum.group_by(&(elem(&1, 0)), &(elem(&1, 1)))
```
This is a bit dense - could it be split to a function that has a name indicating what it's doing? Maybe just |> Enum.map(fn({k, v}) -> {k, [v]} end) (if I'm reading correctly)
I'll see what I can do to make it easier to follow. This is transforming a list of partitions into the format expected by SyncGroupRequest. That is, [{topic1, part1}, {topic1, part2}, {topic2, part1}] gets transformed into [{topic1, [part1, part2]}, {topic2, [part1]}].
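Pulled into a named helper, that transformation might look like this (a sketch, not the PR's code):

```elixir
# Turn a flat list of {topic, partition} pairs into the per-topic shape
# expected by SyncGroupRequest: [{topic, [partitions...]}, ...]
defp group_partitions_by_topic(topic_partitions) do
  topic_partitions
  |> Enum.group_by(fn {topic, _} -> topic end, fn {_, partition} -> partition end)
  |> Map.to_list()
end

# group_partitions_by_topic([{"t1", 1}, {"t1", 2}, {"t2", 1}])
# #=> [{"t1", [1, 2]}, {"t2", [1]}]  (map ordering is not guaranteed)
```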
@dantswain Thanks for the detailed feedback. Unfortunately, our availability isn't overlapping very well on this PR. As of yesterday, my focus is on moving my family to another state. I expect to be preoccupied with that through May. I'm still interested in finishing this PR. I just want you to be aware of my preoccupation as it will impact my responsiveness.
That makes sense to me. How do you see the server processes being managed in this case? The simplest way I can think of is to start and link a server process to a …
Thanks for confirming this. My biggest worry was that my approach might be completely off the mark in this regard.
Just that the delay was noticeable when shutting down one-by-one. The consumer processes are shut down before rejoining the group on a rebalance. The time to rejoin a group is potentially the heartbeat interval plus the time to shut down consumers and commit their offsets. I wanted to reduce the chance that this could encroach on the session timeout.
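One hypothetical way to parallelize that shutdown (illustrative only; Task.async_stream requires Elixir 1.4+):

```elixir
# Terminate all children of a supervisor concurrently, so that each consumer's
# terminate/2 callback (which commits offsets) runs in parallel, not serially.
defp stop_children_in_parallel(supervisor, timeout \\ 10_000) do
  supervisor
  |> Supervisor.which_children()
  |> Task.async_stream(
    fn {_id, pid, _type, _modules} ->
      Supervisor.terminate_child(supervisor, pid)
    end,
    timeout: timeout
  )
  |> Stream.run()
end
```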
This honestly felt like the least well-defined part of the feature to me. I don't feel strongly about keeping those names. I had also thought of calling …

I'm actually not sure if you're asking about the use case of choosing between …

I was thinking along the lines of retry in X seconds with exponential back-off, but that could also be handled inside the consumer. Having discussed the above three points regarding the return value for …
I saw that PR come in and already rebased to be consistent with it. I did add a parameter to allow overriding …
I didn't mean to have a process outside of the supervision tree. This is probably my misunderstanding of OTP. I thought that starting the supervisor with …

I like your suggested structure, too. I'm not partial to starting/stopping the supervisor on rebalance. I modeled it as starting a supervisor with a list of partition assignments, because I started with …
@dcuddeback Do you think you'll have time to revisit this? I'd hate for all the effort to go to waste, so if you don't think you'll have time I want to make sure we mark it as something someone else could pick up and make contributions to.

@dantswain I'd love to. I was working on this for my former employer, but I'm on a break now for a couple of months while I move out of state. My move-in date is in about three weeks. I could use a break from packing boxes, so maybe I can set aside a day next week to work on this.

Closing in favor of #207.
I've taken a shot at implementing the consumer group API (#67). There's still some more to do for this PR, but I wanted to get some feedback on some specific questions before going further. First, I'll explain the implementation.
At a high level, this PR provides an API that allows one to write a consumer like this:
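(The code sample here was lost in formatting; the sketch below reconstructs the shape of such a consumer from the API description that follows. The module and message-struct names are assumptions.)

```elixir
defmodule ExampleConsumer do
  # Reconstructed sketch, not the PR's original example.
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  def init(_topic, _partition) do
    {:ok, %{message_count: 0}}
  end

  def handle_message(%Message{value: value}, state) do
    IO.puts("message: #{inspect(value)}")
    {:ack, %{state | message_count: state.message_count + 1}}
  end
end
```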
The API is modeled after GenServer: init/2 is expected to return {:ok, state}, and handle_message/2 is passed a message and state and is expected to return an instruction (:ack, :commit, etc.) and a new state.

The consumer can be launched as a consumer group like this:
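(Again reconstructing the lost sample; the exact start_link signature is an assumption.)

```elixir
# Assumed arguments: consumer module, group name, topics (plus optional opts)
{:ok, pid} = KafkaEx.ConsumerGroup.start_link(ExampleConsumer, "example_group", ["example_topic"])
```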
It's also possible to launch an individual consumer manually, if not using the consumer group protocol, but this is not the intended use case:
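(A sketch under the same caveat; the argument list for a standalone consumer is assumed.)

```elixir
# Assumed arguments: consumer module, group name, topic, partition
{:ok, pid} = KafkaEx.GenConsumer.start_link(ExampleConsumer, "example_group", "example_topic", 0)
```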
Design
I think this is pretty close to what we brainstormed on Slack.
The implementation uses the KafkaEx module as a low-level API, which picks up four methods (join_group/3, sync_group/5, leave_group/3, and heartbeat/4) that implement the individual network calls. The rest of the high-level consumer API is implemented as a layer on top of the interface provided by the KafkaEx module. The Kafka client-side assignment protocol is implemented by
KafkaEx.ConsumerGroup. This module manages an individual BEAM node's membership in a single consumer group. After receiving a SyncGroupResponse from the broker, it launches a supervisor to manage consumption from its assigned partitions. When shut down by its supervisor, or when notified by a Kafka broker that the consumer group needs to rebalance, it shuts down the process tree starting at KafkaEx.GenConsumer.Supervisor, which triggers offset commits.

KafkaEx.GenConsumer.Supervisor is given a list of assigned partitions when it's started, which it uses to manage a group of KafkaEx.GenConsumer processes. It launches one GenConsumer per assigned partition, as shown in the diagram above.

A single instance of KafkaEx.GenConsumer consumes from a single partition. It requires an application to implement handle_message/2, which is called for each message consumed from the partition. The implementation is required to return an atom to indicate how the message should be handled:

:ack means that the message was consumed successfully. The highest acked offset will be committed at the next auto-commit point. Auto-committing can be configured based on a number of acked messages or time since the last commit.

:commit means that the message was consumed successfully and the offset should be committed synchronously, before the next message is handled. This would obviously hurt throughput, but could be used sparingly when a consumer makes non-idempotent changes, for example.

When shut down, a GenConsumer will commit the last acked offset.

The GenConsumer will also be consulted by ConsumerGroup when determining partition assignments. GenConsumer.assign_partitions/2 implements round-robin assignment by default, but can be overridden by a client application to provide a different assignment strategy.

Questions

While working on this, I ran into a few issues that I would like feedback on:

In the diagram above, there's a red box that says, "All requests serialized here." Each call to GenServer.call() is synchronous and will block until previous calls finish. This is going to limit the available concurrency, because only one request can be in flight per KafkaEx.Server at any time, even though each KafkaEx.Server holds a connection to each broker. All other NetworkClient connections owned by a KafkaEx.Server will be idle during a network request.

This can be partially mitigated by having a pool of KafkaEx.Server workers and checking out a worker for each call, but there would still be a lot of active NetworkClient connections that can't be used concurrently. It seems a more efficient option would be to pool NetworkClient connections on a per-broker basis. Or maybe using GenServer.reply() to reply asynchronously, instead of returning {:reply, ...}, would be enough to allow multiple simultaneous calls.

I'm curious what your thoughts are on this issue. I don't feel like I know this library well enough yet to know the best way to approach this. Ultimately, I'm wondering what would be considered good enough for this PR that would be compatible with tackling this issue later, if possible.

My biggest stumbling block to understanding the library well is that I'm not sure what level of abstraction is intended for KafkaEx.Server. The servers implement a lot of calls that map one-to-one to requests in the Kafka network protocol. I added four more methods that follow that pattern and treated it as a low-level API interface for the rest of this PR. Is that the intended way to use KafkaEx.Server?

I see that it also has state for consumer groups and metadata, as well as functionality for streaming. Have you considered separating this functionality out into high-level modules and keeping KafkaEx.Server as a low-level interface for the network protocol? That approach is what felt most natural to me in working on this PR. ConsumerGroup is a separate GenServer which uses KafkaEx.Server for its network protocol, but it feels like it's duplicating some of the state that's currently kept in the KafkaEx.Server implementations.

When shutting down KafkaEx.GenConsumer.Supervisor, I want to shut down all of the supervised consumers in parallel, so that they can commit their offsets concurrently. The only way I could find to get a supervisor to shut down its children in parallel was to use the :simple_one_for_one restart strategy. From the Erlang docs:

…

:simple_one_for_one seems like it's geared more towards dynamically managed children, so the resulting code is a little messy IMO. You can see the difference in commit 3aa8ae3. I'm still learning OTP, so I'm wondering if you know a cleaner or more idiomatic way to shut down a supervisor's children in parallel.

I haven't fleshed out all of the possible responses for GenConsumer.handle_message/2. All the use cases I have are fine with just :ack, and :commit is a natural extension of :ack. I could use your help defining the initial return values. I don't know enough about other people's use cases to know what's useful here. It will be easier to add new return values than to change existing ones, so I'm erring on the side of fewer to start with. Do you have any use cases that aren't satisfied by :ack and :commit?

I could see adding :noreply with GenConsumer.ack/2 and GenConsumer.commit/2 methods (analogous to GenServer.reply/2) to allow offloading message handling to another process, such as a GenStage, but I don't have a use case to motivate it.

I made the network and GenServer timeout an option that can be passed to KafkaEx.join_group/3 and friends. This was necessary for join_group, because the broker doesn't respond until all group members join or the session timeout expires. Since the default minimum for session timeouts (group.min.session.timeout.ms) is 6 seconds, and a JoinGroupRequest can potentially block for as long as the session timeout, this can easily exceed the default GenServer timeout of 5 seconds.

There's a ticket related to this (Sync timeouts greater than 5 seconds are not useful #94). If you like the way I handled this for join_group/3, sync_group/5, leave_group/3, and heartbeat/4, I could propagate the change to the rest of the KafkaEx methods that use GenServer.call. Please let me know if you think that's the right direction.