Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
a88ea57
implements consumer groups
dcuddeback Feb 3, 2017
3cb335e
improves error handling
dcuddeback May 10, 2017
da7ce51
makes session timeout padding a module attribute
dcuddeback May 10, 2017
6c279d2
refactor to combine common code in sync group functions
dcuddeback May 10, 2017
f77c679
attempt to make ConsumerGroup logic easier to follow
dcuddeback May 11, 2017
32fd4e5
replace GenServer timeout with timer
dcuddeback May 11, 2017
6f40c7d
adds Metadata.Response.partitions_for_topic/2
dcuddeback May 11, 2017
7609388
manage dynamic assignments with ConsumerGroup.Supervisor
dcuddeback May 23, 2017
14793e3
improve readability of deeply-nested GenConsumer.Supervisor.start_link/4
dcuddeback May 23, 2017
9d0b4cc
starts KafkaEx.Server per consumer
dcuddeback May 23, 2017
799dcc3
Merge branch 'master' into consumer-group
dantswain Jul 25, 2017
46a8054
Unbreak merge issues
dantswain Jul 25, 2017
407dffb
Merge branch 'master' into consumer-group
dantswain Jul 26, 2017
d16c15c
Thunk logger calls
dantswain Jul 26, 2017
eb3e1e3
Move supervisors to subdirectories
dantswain Jul 26, 2017
bbf2695
Rename :ack / :commit to :async_commit / :sync_commit
dantswain Jul 27, 2017
013774f
Merge branch 'master' into consumer-group
dantswain Jul 28, 2017
5b85bb4
Have GenConsumer handle message sets
dantswain Jul 28, 2017
e3926aa
Merge branch 'master' into consumer-group
dantswain Jul 28, 2017
69eba79
Strict credo cleanup
dantswain Jul 28, 2017
3f97193
Edit documentation
dantswain Jul 31, 2017
90d93b8
Refactor st ConsumerGroup is the supervisor
dantswain Jul 31, 2017
eff912f
Fix documentation oversight
dantswain Jul 31, 2017
9c32696
Fix default worker name for test helper
dantswain Jul 31, 2017
3a7719c
Create topic on docker up
dantswain Jul 31, 2017
6187e9f
Add a basic integration test for the consumer group
dantswain Jul 31, 2017
574ab4f
Fix topic creation
dantswain Jul 31, 2017
ee89433
Try to fix test on older elixir
dantswain Jul 31, 2017
874c14c
Add PartitionAssignment, fix group_by compat issue
dantswain Jul 31, 2017
3e68eeb
Fix Map.new compat
dantswain Jul 31, 2017
af02302
Remove R17 from build matrix
dantswain Jul 31, 2017
6e984b6
Don't re-run tests if they pass the first time
dantswain Jul 31, 2017
f416a31
Merge branch 'master' into consumer-group
dantswain Aug 1, 2017
fa0232e
Forgot to save the merged file :(
dantswain Aug 1, 2017
f46f6be
Fix default worker name in test helper
dantswain Aug 1, 2017
2dafc01
Test that exiting consumer also commits offsets
dantswain Aug 1, 2017
52dcba9
Test across all partitions on the topic
dantswain Aug 1, 2017
74036e2
Add a second consumer to the test group
dantswain Aug 2, 2017
15bf18f
Refactor TestObserver
dantswain Aug 2, 2017
8a0cd14
Better assignment tests
dantswain Aug 2, 2017
8818e6a
Fix test command for travis
dantswain Aug 2, 2017
1f29f41
Assert all 4 handlers
dantswain Aug 2, 2017
b3e73c0
Test for rebalancing
dantswain Aug 2, 2017
86bdc0b
Move partition assignment out of GenConsumer
dantswain Aug 3, 2017
d15e82b
Factoring/documentation for manager opts
dantswain Aug 3, 2017
dde8f7d
Clean up test, Supervisor usage
dantswain Aug 3, 2017
48cf82f
Add note about multiple topics
dantswain Aug 4, 2017
e122f4c
Add several ops helper functions
dantswain Aug 7, 2017
ae4b964
Test assignments directly
dantswain Aug 7, 2017
d25562b
Add ability for consumer to handle messages
dantswain Aug 7, 2017
ede2b09
More test cleanup
dantswain Aug 8, 2017
ca7ff34
Review feedback
dantswain Aug 9, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,12 @@ KakfaEx supports the following Kafka features:
* Fetch Messages
* Message Compression with Snappy and gzip
* Offset Management (fetch / commit / autocommit)
* Consumer Groups

See [Kafka Protocol Documentation](http://kafka.apache.org/protocol.html) and
[A Guide to the Kafka Protocol](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol)
for details of these features.

KafkaEx **does support** consumer groups for message consumption. This feature
was added in Kafka 0.8.2. This translates to providing a consumer group
name when committing offsets. It is up to the client to assign partitions to
workers in this mode of operation.

KafkaEx currently provides **limited support** for the [Kafka ConsumerGroup
API](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI)
that was added in Kafka 0.9.0. Most of the protocol requests are implemented
in KafkaEx, but we do not yet support automatic joining and management of
consumer group memebership (e.g., automatically assigning partitions to
clients). We are actively working on an implementation for automatic consumer
group management.

## Using KafkaEx in an Elixir project

The standard approach for adding dependencies to an Elixir application applies:
Expand Down Expand Up @@ -90,6 +78,35 @@ You can also override options when creating a worker, see below.

## Usage Examples

### Consumer Groups

To use a consumer group, first implement a handler module using
`KafkaEx.GenConsumer`.

```
defmodule ExampleGenConsumer do
use KafkaEx.GenConsumer

alias KafkaEx.Protocol.Fetch.Message

require Logger

# note - messages are delivered in batches
def handle_message_set(message_set, state) do
for %Message{value: message} <- message_set do
Logger.debug(fn -> "message: " <> inspect(message) end)
end
{:async_commit, state}
end
end
```

Then add a `KafkaEx.ConsumerGroup` to your application's supervision
tree and configure it to use the implementation module.

See the `KafkaEx.GenConsumer` and `KafkaEx.ConsumerGroup` documentation for
details.

### Create a KafkaEx Worker

KafkaEx worker processes manage the state of the connection to the Kafka broker.
Expand Down
5 changes: 5 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ config :kafka_ex,
max_restarts: 10,
# Supervision max_seconds - the time frame in which :max_restarts applies
max_seconds: 60,
# Interval in milliseconds that GenConsumer waits to commit offsets.
commit_interval: 5_000,
# Threshold number of messages consumed for GenConsumer to commit offsets
# to the broker.
commit_threshold: 100,
# This is the flag that enables use of ssl
use_ssl: true,
# see SSL OPTION DESCRIPTIONS - CLIENT SIDE at http://erlang.org/doc/man/ssl.html
Expand Down
50 changes: 49 additions & 1 deletion lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ defmodule KafkaEx do
alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse
alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse
alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest
alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest
alias KafkaEx.Protocol.Heartbeat.Response, as: HeartbeatResponse
alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest
alias KafkaEx.Protocol.JoinGroup.Response, as: JoinGroupResponse
alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest
alias KafkaEx.Protocol.LeaveGroup.Response, as: LeaveGroupResponse
alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
alias KafkaEx.Protocol.Offset.Response, as: OffsetResponse
alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest
Expand All @@ -14,6 +20,8 @@ defmodule KafkaEx do
alias KafkaEx.Protocol.OffsetFetch.Request, as: OffsetFetchRequest
alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
alias KafkaEx.Protocol.Produce.Message
alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest
alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse
alias KafkaEx.Server
alias KafkaEx.Stream

Expand Down Expand Up @@ -75,6 +83,46 @@ defmodule KafkaEx do
Server.call(worker, :consumer_group)
end

@doc """
Sends a request to join a consumer group.
"""
@spec join_group(JoinGroupRequest.t, Keyword.t) :: JoinGroupResponse.t
def join_group(request, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
timeout = Keyword.get(opts, :timeout)
Server.call(worker_name, {:join_group, request, timeout}, opts)
end

@doc """
Sends a request to synchronize with a consumer group.
"""
@spec sync_group(SyncGroupRequest.t, Keyword.t) :: SyncGroupResponse.t
def sync_group(request, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
timeout = Keyword.get(opts, :timeout)
Server.call(worker_name, {:sync_group, request, timeout}, opts)
end

@doc """
Sends a request to leave a consumer group.
"""
@spec leave_group(LeaveGroupRequest.t, Keyword.t) :: LeaveGroupResponse.t
def leave_group(request, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
timeout = Keyword.get(opts, :timeout)
Server.call(worker_name, {:leave_group, request, timeout}, opts)
end

@doc """
Sends a heartbeat to maintain membership in a consumer group.
"""
@spec heartbeat(HeartbeatRequest.t, Keyword.t) :: HeartbeatResponse.t
def heartbeat(request, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
timeout = Keyword.get(opts, :timeout)
Server.call(worker_name, {:heartbeat, request, timeout}, opts)
end

@doc """
Return metadata for the given topic; returns for all topics if topic is empty string

Expand Down Expand Up @@ -197,7 +245,7 @@ defmodule KafkaEx do
}, opts)
end

@spec offset_commit(atom, OffsetCommitRequest.t) :: OffsetCommitResponse.t
@spec offset_commit(atom, OffsetCommitRequest.t) :: [OffsetCommitResponse.t]
def offset_commit(worker_name, offset_commit_request) do
Server.call(worker_name, {:offset_commit, offset_commit_request})
end
Expand Down
Loading