74 changes: 66 additions & 8 deletions README.md
@@ -19,18 +19,76 @@ documentation,

KafkaEx supports the following Kafka features:

* Broker and Topic Metadata
* Produce Messages
* Fetch Messages
* Message Compression with Snappy and gzip
* Offset Management (fetch / commit / autocommit)
* Consumer Groups
* Topics Management (create / delete)

See [Kafka Protocol Documentation](http://kafka.apache.org/protocol.html) and
[A Guide to the Kafka Protocol](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol)
for details of these features.

## IMPORTANT - Kayrock and The Future of KafkaEx

TL;DR:

* This is a new implementation and we need people to test it!
* Set `kafka_version: "kayrock"` to use the new client implementation.
* The new client should be compatible with existing code when used this way.
* Many functions now support an `api_version` parameter; see below for details,
  e.g., how to store offsets in Kafka instead of Zookeeper.
* Version 1.0 of KafkaEx will be based on Kayrock and have a cleaner API - you
can start testing this API by using modules from the `KafkaEx.New` namespace.
See below for details.

To support some oft-requested features (offset storage in Kafka, message
timestamps), we have integrated KafkaEx with
[Kayrock](https://github.com/dantswain/kayrock), a library that handles
serialization and deserialization of the Kafka message protocol in a way that
can grow as Kafka does.

Unfortunately, the existing KafkaEx API is built in such a way that it doesn't
easily support this growth. This, combined with a number of other existing
warts in the current API, has led us to the conclusion that v1.0 of KafkaEx
should have a new and cleaner API.

The path we have planned to get to v1.0 is:

1. Add a Kayrock compatibility layer for the existing KafkaEx API (DONE, not released).
2. Expose Kayrock's API versioning through a select handful of KafkaEx API
functions so that users can get access to the most-requested features (e.g.,
offset storage in Kafka and message timestamps) (DONE, not released).
3. Begin designing and implementing the new API in parallel in the `KafkaEx.New`
namespace (EARLY PROGRESS).
4. Incrementally release the new API alongside the legacy API so that early
adopters can test it.
5. Once the new API is complete and stable, move it to the `KafkaEx` namespace
(i.e., drop the `New` part) and it will replace the legacy API. This will be
released as v1.0.

Users of KafkaEx can help a lot by testing the new code. At first, we need
people to test the Kayrock-based client using compatibility mode. You can do
this by simply setting `kafka_version: "kayrock"` in your configuration. That
should be all you need to change. If you want to test the new features enabled
by the `api_versions` options, that is also very valuable to us (see below for
links to details). Then, as work on the new API ramps up, users can contribute
feedback to pull requests (or even contribute pull requests!) and test out the
new API as it becomes available.
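
For reference, compatibility mode is a one-line config change (the same
snippet appears in the kayrock.md document):

```
# config/config.exs - opt in to the Kayrock-based client
config :kafka_ex,
  kafka_version: "kayrock"
```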

For more information on using the Kayrock-based client, see

* GitHub: [kayrock.md](https://github.com/kafkaex/kafka_ex/blob/master/kayrock.md)
* HexDocs: [kayrock-based client](kayrock.html)

For more information on the v1.0 API, see

* GitHub:
  [new_api.md](https://github.com/kafkaex/kafka_ex/blob/master/new_api.md)
* HexDocs: [New API](new_api.html)

## Using KafkaEx in an Elixir project

The standard approach for adding dependencies to an Elixir application applies:
@@ -57,7 +115,7 @@ end

Then run `mix deps.get` to fetch dependencies.

### Adding the kafka_ex application

When using Elixir < 1.4, you will need to add kafka_ex to the applications list of your mix.exs file.
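
For example, a minimal sketch of that entry (other project-specific
applications omitted):

```
# mix.exs - only needed on Elixir < 1.4
def application do
  [applications: [:kafka_ex]]
end
```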

1 change: 1 addition & 0 deletions config/config.exs
@@ -65,6 +65,7 @@ config :kafka_ex,
],
# set this to the version of the kafka broker that you are using
# include only major.minor.patch versions. must be at least 0.8.0
# use "kayrock" for the new client
kafka_version: "0.10.1"

env_config = Path.expand("#{Mix.env()}.exs", __DIR__)
205 changes: 205 additions & 0 deletions kayrock.md
@@ -0,0 +1,205 @@
# Kayrock-Based KafkaEx Client

This document contains information on using the new
[Kayrock](https://github.com/dantswain/kayrock)-based KafkaEx client.

The client itself is defined in the module `KafkaEx.New.Client`. This will be
the primary KafkaEx client module going forward.

**NOTE** In many places below we recommend using "version N and up". The
reality is that in many cases, the KafkaEx legacy API compatibility only
supports versions 0 to 3. Using version 3 should generally be safe and achieve
the desired outcomes. The new API will be designed to handle newer versions.

Contents:

* [Using the New Client](#using-the-new-client)
* [Common Use Case - Store Offsets In
Kafka](#common-use-case-store-offsets-in-kafka)
* [Common Use Case - Message Timestamps / New Storage Format](#common-use-case-message-timestamps-new-storage-format)

## Using the New Client

To use the new client in your project, set `kafka_version: "kayrock"` in your
config file:

```
config :kafka_ex,
kafka_version: "kayrock"
```

If you want to start a single client instance, supply the new client's module
name to the `:server_impl` option of `KafkaEx.start_link_worker/2`:

```
{:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)
```

The new client should be totally backwards compatible with the legacy API;
simply use KafkaEx like you normally would. It is also compatible with the new
API defined in `KafkaEx.New.KafkaExAPI`.
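
For example, a quick sketch mixing the two APIs against one client (the
`latest_offset/3` call is taken from the `KafkaEx.New.KafkaExAPI` docs;
`"some_topic"` is a placeholder):

```
# assumes a reachable broker and the config shown above
{:ok, client} = KafkaEx.New.Client.start_link()

# legacy API calls work against the new client ...
KafkaEx.metadata(worker_name: client)

# ... as do calls to the new API
KafkaEx.New.KafkaExAPI.latest_offset(client, "some_topic", 0)
```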

## Common Use Case - Store Offsets In Kafka

Offsets are stored in Kafka (instead of Zookeeper) with offset commit message
version 1 and up (Kafka v0.10 and up). Message version 1 also includes a
timestamp parameter that was dropped in version 2, so we recommend using at
least version 2. We often use version 3 out of convenience, because that
version number achieves the desired results with this and other Kafka API
messages.

To retrieve offsets committed with version 1 and up, you must also use version 1
and up of the offset fetch request. A safe move here is to always use the same
version for offset commit and offset fetch.

### WARNING - OFFSET LOSS

If offsets for a consumer group are stored in Zookeeper (v0 KafkaEx legacy),
they are unavailable using v1 and up of the offset fetch message. This means
that if you have an existing KafkaEx consumer group and "upgrade" to the new
version, **you will lose offsets**. To avoid losing offsets, you should first
convert the Zookeeper offsets to Kafka storage. This can be achieved using
command line tools (documentation of which is beyond the scope of this document)
or using KafkaEx by fetching the offsets using v0 and then committing them using
v1. We may provide a tool for this in the future.
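
As a rough sketch of the KafkaEx-based conversion (same request structs as the
examples below; `client`, `topic`, and `consumer_group` assumed bound, error
handling omitted):

```
alias KafkaEx.Protocol.{OffsetCommit, OffsetFetch}

# fetch the Zookeeper-stored offset with v0 ...
[resp] =
  KafkaEx.offset_fetch(client, %OffsetFetch.Request{
    topic: topic,
    consumer_group: consumer_group,
    partition: 0,
    api_version: 0
  })

%OffsetFetch.Response{partitions: [%{offset: offset}]} = resp

# ... then commit it back with v1 so it lands in Kafka storage
KafkaEx.offset_commit(client, %OffsetCommit.Request{
  consumer_group: consumer_group,
  topic: topic,
  partition: 0,
  offset: offset,
  api_version: 1
})
```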

Likewise, once you store offsets in Kafka, they cannot be fetched using v0 of
the offset fetch message. If you need to "roll back" storage from Kafka to
Zookeeper, you will need to convert the offsets first.

### Examples

`KafkaEx.offset_commit/2` and `KafkaEx.offset_fetch/2` support setting the api
version via the `api_version` field of their corresponding request structs:

```
alias KafkaEx.Protocol.{OffsetCommit, OffsetFetch}

# commit offsets using kafka storage
KafkaEx.offset_commit(client, %OffsetCommit.Request{
consumer_group: consumer_group,
topic: topic,
partition: 0,
offset: offset,
api_version: 3
})

# fetch an offset stored in kafka
[resp] = KafkaEx.offset_fetch(client, %OffsetFetch.Request{
topic: topic,
consumer_group: consumer_group,
partition: 0,
api_version: 3
})
%KafkaEx.Protocol.OffsetFetch.Response{partitions: [%{offset: offset}]} = resp
```

When using `KafkaEx.fetch/3` with `auto_commit: true`, you can specify the
`offset_commit_api_version` option to control how offsets are stored:

```
# store auto-committed offsets in kafka
KafkaEx.fetch(topic, partition, auto_commit: true, offset_commit_api_version: 3)
```

When using `KafkaEx.ConsumerGroup`, you can control offset storage using the
`api_versions` option:

```
# use kafka offset storage with consumer groups
# NOTE you must use compatible versions for offset_fetch and offset_commit;
# using the same value for both should be safe
KafkaEx.ConsumerGroup.start_link(
MyConsumer,
consumer_group_name,
[topic],
api_versions: %{offset_fetch: 3, offset_commit: 3}
)
```

## Common Use Case - Message Timestamps / New Storage Format

Message timestamps and the [new message storage
format](https://kafka.apache.org/documentation/#recordbatch) go hand-in-hand
because they both require setting the message versions for produce and fetch.

Timestamps were added in v1 of the produce/fetch messages, but the storage
format was replaced in v2 (around Kafka v0.10), so we recommend using 2 and up
(3 is safe).

### WARNING - Broker Performance

Check with your system administrator. If the broker is configured to use the
new message format (v2 and up), producing or requesting messages with old
formats (v0 and v1) can lead to significant load on the brokers because they
will need to convert messages between versions for each request. If you have a
relatively modern version of Kafka, we recommend using version 3 for both
messages.

### Examples

Whenever we use produce API version `>= 2`, the new message format is used automatically.

To publish a message with a timestamp:

```
{:ok, offset} =
KafkaEx.produce(
topic,
0,
msg,
worker_name: client,
required_acks: 1,
timestamp: 12345,
api_version: 3
)

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 3
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

message.value # => msg
message.offset # => offset
message.timestamp # => 12345
```

If a topic has the `message.timestamp.type` setting set to `LogAppendTime`, then
timestamps will be populated automatically when a produced message is received
by the broker and appended to the log.

```
fetch_responses =
KafkaEx.fetch(topic_with_log_append_timestamps, 0,
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 3
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

message.timestamp # => log append timestamp in milliseconds
```

Note that the `KafkaEx.ConsumerGroup` `api_versions` option also supports
setting a version for `fetch`:

```
# use new record batch format AND kafka offset storage with consumer groups
KafkaEx.ConsumerGroup.start_link(
MyConsumer,
consumer_group_name,
[topic],
api_versions: %{offset_fetch: 3, offset_commit: 3, fetch: 3}
)
```
20 changes: 17 additions & 3 deletions lib/kafka_ex.ex
@@ -1,5 +1,20 @@
defmodule KafkaEx do
@moduledoc """
Kafka API

This module is the main API for users of the KafkaEx library.

Most of these functions either use the default worker (registered as
`:kafka_ex`) by default or can take a registered name or pid via a
`worker_name` option.

```
# create an unnamed worker
{:ok, pid} = KafkaEx.create_worker(:no_name)

KafkaEx.fetch("some_topic", 0, worker_name: pid)
```
"""

use Application
alias KafkaEx.Config
@@ -579,8 +594,7 @@ defmodule KafkaEx do
- `server_impl` - This is the GenServer that will be used for the
client genserver implementation - e.g., `KafkaEx.Server0P8P0`,
`KafkaEx.Server0P10AndLater`, `KafkaEx.New.Client`. Defaults to the value
determined by the `kafka_version` setting.
"""
@spec start_link_worker(atom, [
KafkaEx.worker_setting() | {:server_impl, module}
13 changes: 12 additions & 1 deletion lib/kafka_ex/new/kafka_ex_api.ex
@@ -3,6 +3,14 @@ defmodule KafkaEx.New.KafkaExAPI do
This module interfaces with Kafka through the New.Client implementation

This is intended to become the future KafkaEx API

Most functions here take a client pid as the first argument.

```
{:ok, client} = KafkaEx.New.Client.start_link()

KafkaEx.New.KafkaExAPI.latest_offset(client, "some_topic", 0)
```
"""

alias KafkaEx.New.Client
@@ -78,7 +86,10 @@ defmodule KafkaEx.New.KafkaExAPI do
end

@doc """
Set the consumer group name that will be used by the given client for
autocommit

NOTE this function will not be supported after the legacy API is removed
"""
@spec set_consumer_group_for_auto_commit(client, consumer_group_name) ::
:ok | {:error, :invalid_consumer_group}