diff --git a/README.md b/README.md
index a1885ebf..66ad7e1b 100644
--- a/README.md
+++ b/README.md
@@ -19,18 +19,76 @@ documentation, KafkaEx supports the following Kafka features:
 
-* Broker and Topic Metadata
-* Produce Messages
-* Fetch Messages
-* Message Compression with Snappy and gzip
-* Offset Management (fetch / commit / autocommit)
-* Consumer Groups
-* Topics Management (create / delete)
+* Broker and Topic Metadata
+* Produce Messages
+* Fetch Messages
+* Message Compression with Snappy and gzip
+* Offset Management (fetch / commit / autocommit)
+* Consumer Groups
+* Topics Management (create / delete)
 
 See [Kafka Protocol Documentation](http://kafka.apache.org/protocol.html) and
 [A Guide to the Kafka Protocol](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol)
 for details of these features.
 
+## IMPORTANT - Kayrock and The Future of KafkaEx
+
+TL;DR:
+
+* This is a new implementation and we need people to test it!
+* Set `kafka_version: "kayrock"` to use the new client implementation.
+* The new client should be compatible with existing code when used this way.
+* Many functions now support an `api_version` parameter; see below for
+  details, e.g., how to store offsets in Kafka instead of Zookeeper.
+* Version 1.0 of KafkaEx will be based on Kayrock and have a cleaner API - you
+  can start testing this API by using modules from the `KafkaEx.New` namespace.
+  See below for details.
+
+To support some oft-requested features (offset storage in Kafka, message
+timestamps), we have integrated KafkaEx with
+[Kayrock](https://github.com/dantswain/kayrock), a library that handles
+serialization and deserialization of the Kafka message protocol in a way that
+can grow as Kafka does.
+
+Unfortunately, the existing KafkaEx API is built in such a way that it doesn't
+easily support this growth. This, combined with a number of other existing
+warts in the current API, has led us to the conclusion that v1.0 of KafkaEx
+should have a new and cleaner API.
+
+The path we have planned to get to v1.0 is:
+
+1. Add a Kayrock compatibility layer for the existing KafkaEx API (DONE, not
+   released).
+2. Expose Kayrock's API versioning through a select handful of KafkaEx API
+   functions so that users can get access to the most-requested features
+   (e.g., offset storage in Kafka and message timestamps) (DONE, not
+   released).
+3. Begin designing and implementing the new API in parallel in the
+   `KafkaEx.New` namespace (EARLY PROGRESS).
+4. Incrementally release the new API alongside the legacy API so that early
+   adopters can test it.
+5. Once the new API is complete and stable, move it to the `KafkaEx` namespace
+   (i.e., drop the `New` part) and it will replace the legacy API. This will
+   be released as v1.0.
+
+Users of KafkaEx can help a lot by testing the new code. At first, we need
+people to test the Kayrock-based client using compatibility mode. You can do
+this by simply setting `kafka_version: "kayrock"` in your configuration, as
+shown below; that should be all you need to change. If you want to test new
+features enabled by `api_versions` options, that is also very valuable to us
+(see below for links to details). Then, as work on the new API ramps up, users
+can contribute feedback to pull requests (or even contribute pull requests!)
+and test out the new API as it becomes available.
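+
+For example, opting in to compatibility mode is a one-line configuration
+change (this mirrors the setup described in kayrock.md):
+
+```
+config :kafka_ex,
+  # opt in to the new Kayrock-based client; all other settings are unchanged
+  kafka_version: "kayrock"
+```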
+
+For more information on using the Kayrock-based client, see
+
+* Github: [kayrock.md](https://github.com/kafka_ex/kafkaex/blob/master/kayrock.md)
+* HexDocs: [kayrock-based client](kayrock.html)
+
+For more information on the v1.0 API, see
+
+* Github:
+  [new_api.md](https://github.com/kafka_ex/kafkaex/blob/master/new_api.md)
+* HexDocs: [New API](new_api.html)
+
 ## Using KafkaEx in an Elixir project
 
 The standard approach for adding dependencies to an Elixir application applies:
@@ -57,7 +115,7 @@ end
 
 Then run `mix deps.get` to fetch dependencies.
 
-### Adding kafka_ex application
+### Adding the kafka_ex application
 
 When using elixir < 1.4, you will need to add kafka_ex to the applications
 list of your mix.exs file.
diff --git a/config/config.exs b/config/config.exs
index 2ca9f28c..a1678e23 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -65,6 +65,7 @@ config :kafka_ex,
   ],
   # set this to the version of the kafka broker that you are using
   # include only major.minor.patch versions. must be at least 0.8.0
+  # use "kayrock" for the new client
   kafka_version: "0.10.1"
 
 env_config = Path.expand("#{Mix.env()}.exs", __DIR__)
diff --git a/kayrock.md b/kayrock.md
new file mode 100644
index 00000000..d7e9e52e
--- /dev/null
+++ b/kayrock.md
@@ -0,0 +1,205 @@
+# Kayrock-Based KafkaEx Client
+
+This document contains information on using the new
+[Kayrock](https://github.com/dantswain/kayrock)-based KafkaEx client.
+
+The client itself is defined in the module `KafkaEx.New.Client`. This will be
+the primary KafkaEx client module going forward.
+
+**NOTE** In many places below we recommend using "version N and up". The
+reality is that in many cases, the KafkaEx legacy API compatibility layer only
+supports versions 0 to 3. Using version 3 should generally be safe and achieve
+the desired outcomes. The new API will be designed to handle newer versions.
+
+Contents:
+
+* [Using the New Client](#using-the-new-client)
+* [Common Use Case - Store Offsets In
+  Kafka](#common-use-case-store-offsets-in-kafka)
+* [Common Use Case - Message Timestamps / New Storage Format](#common-use-case-message-timestamps-new-storage-format)
+
+## Using the New Client
+
+To use the new client in your project, set `kafka_version: "kayrock"` in your
+config file:
+
+```
+config :kafka_ex,
+  kafka_version: "kayrock"
+```
+
+If you want to start a single client instance, supply the new client's module
+name to the `:server_impl` option of `KafkaEx.start_link_worker/2`:
+
+```
+{:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)
+```
+
+The new client should be fully backwards compatible with the legacy API;
+simply use KafkaEx like you normally would. It is also compatible with the new
+API defined in `KafkaEx.New.KafkaExAPI`.
+
+## Common Use Case - Store Offsets In Kafka
+
+Offsets are stored in Kafka (instead of Zookeeper) with offset commit message
+version 1 and up (Kafka v0.10 and up). Message version 1 also includes a
+timestamp parameter that was dropped in version 2, so we recommend using at
+least version 2. We often use version 3 out of convenience because that
+version number achieves the desired results with this and other Kafka API
+messages.
+
+To retrieve offsets committed with version 1 and up, you must also use version
+1 and up of the offset fetch request. A safe move here is to always use the
+same version for offset commit and offset fetch.
+
+### WARNING - OFFSET LOSS
+
+If offsets for a consumer group are stored in Zookeeper (v0 KafkaEx legacy),
+they are unavailable using v1 and up of the offset fetch message. This means
+that if you have an existing KafkaEx consumer group and "upgrade" to the new
+version, **you will lose offsets**. To avoid losing offsets, you should first
+convert the Zookeeper offsets to Kafka storage. This can be achieved using
+command line tools (documentation of which is beyond the scope of this
+document) or using KafkaEx by fetching the offsets using v0 and then
+committing them using v1, as sketched below. We may provide a tool for this
+in the future.
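+
+A minimal sketch of that v0 -> v1 conversion, assuming a started client, an
+existing consumer group, and a single-partition topic (all names here are
+placeholders):
+
+```
+alias KafkaEx.Protocol.OffsetCommit
+alias KafkaEx.Protocol.OffsetFetch
+
+# read the offset from Zookeeper storage (v0)
+[resp] =
+  KafkaEx.offset_fetch(client, %OffsetFetch.Request{
+    topic: topic,
+    consumer_group: consumer_group,
+    partition: 0,
+    api_version: 0
+  })
+
+%OffsetFetch.Response{partitions: [%{offset: offset}]} = resp
+
+# re-commit the same offset using Kafka storage (v1 and up)
+KafkaEx.offset_commit(client, %OffsetCommit.Request{
+  consumer_group: consumer_group,
+  topic: topic,
+  partition: 0,
+  offset: offset,
+  api_version: 1
+})
+```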
+
+Likewise, once you store offsets in Kafka, they cannot be fetched using v0 of
+the offset fetch message. If you need to "roll back" storage from Kafka to
+Zookeeper, you will need to convert the offsets first.
+
+### Examples
+
+`KafkaEx.offset_commit/2` and `KafkaEx.offset_fetch/2` support setting the api
+version via the `api_version` field of their corresponding request structs:
+
+```
+alias KafkaEx.Protocol.OffsetCommit
+alias KafkaEx.Protocol.OffsetFetch
+
+# commit offsets using kafka storage
+KafkaEx.offset_commit(client, %OffsetCommit.Request{
+    consumer_group: consumer_group,
+    topic: topic,
+    partition: 0,
+    offset: offset,
+    api_version: 3
+  })
+
+# fetch an offset stored in kafka
+[resp] = KafkaEx.offset_fetch(client, %OffsetFetch.Request{
+    topic: topic,
+    consumer_group: consumer_group,
+    partition: 0,
+    api_version: 3
+  })
+%KafkaEx.Protocol.OffsetFetch.Response{partitions: [%{offset: offset}]} = resp
+```
+
+When using `KafkaEx.fetch/3` with `auto_commit: true`, you can specify the
+`offset_commit_api_version` option to control how offsets are stored:
+
+```
+# store auto-committed offsets in kafka
+KafkaEx.fetch(topic, partition, auto_commit: true, offset_commit_api_version: 3)
+```
+
+When using `KafkaEx.ConsumerGroup`, you can control offset storage using the
+`api_versions` option (a minimal `MyConsumer` is sketched after this example):
+
+```
+# use kafka offset storage with consumer groups
+# NOTE you must use compatible versions for offset_fetch and offset_commit;
+# using the same value for both should be safe
+KafkaEx.ConsumerGroup.start_link(
+  MyConsumer,
+  consumer_group_name,
+  [topic],
+  api_versions: %{offset_fetch: 3, offset_commit: 3}
+)
+```
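+
+The `MyConsumer` module in these examples is a placeholder for your consumer
+implementation. A minimal sketch, assuming the standard `KafkaEx.GenConsumer`
+behaviour:
+
+```
+defmodule MyConsumer do
+  use KafkaEx.GenConsumer
+
+  # invoked with each message set fetched from the broker
+  def handle_message_set(message_set, state) do
+    for message <- message_set do
+      IO.inspect(message.value)
+    end
+
+    # commit offsets asynchronously, using the configured offset_commit version
+    {:async_commit, state}
+  end
+end
+```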
+
+## Common Use Case - Message Timestamps / New Storage Format
+
+Message timestamps and the [new message storage
+format](https://kafka.apache.org/documentation/#recordbatch) go hand-in-hand
+because they both require setting the message versions for produce and fetch.
+
+Timestamps were added in v1 of the produce/fetch messages, but the storage
+format was replaced in v2 (around Kafka v0.10), so we recommend using v2 and
+up (3 is safe).
+
+### WARNING - Broker Performance
+
+Check with your system administrator. If the broker is configured to use the
+new message format (v2 and up), producing or requesting messages with old
+formats (v0 and v1) can lead to significant load on the brokers because they
+will need to convert messages between versions for each request. If you have
+a relatively modern version of Kafka, we recommend using version 3 for both
+messages.
+
+### Examples
+
+Whenever we use produce API version `>= 2`, the new message format is used
+automatically.
+
+To publish a message with a timestamp and read it back:
+
+```
+{:ok, offset} =
+  KafkaEx.produce(
+    topic,
+    0,
+    msg,
+    worker_name: client,
+    required_acks: 1,
+    timestamp: 12345,
+    api_version: 3
+  )
+
+fetch_responses =
+  KafkaEx.fetch(topic, 0,
+    offset: offset,
+    auto_commit: false,
+    worker_name: client,
+    api_version: 3
+  )
+
+[fetch_response | _] = fetch_responses
+[partition_response | _] = fetch_response.partitions
+message = List.last(partition_response.message_set)
+
+message.value # => msg
+message.offset # => offset
+message.timestamp # => 12345
+```
+
+If a topic has the `message.timestamp.type` setting set to `LogAppendTime`,
+then timestamps will be populated automatically when a produced message is
+received by the broker and appended to the log:
+
+```
+fetch_responses =
+  KafkaEx.fetch(topic_with_log_append_timestamps, 0,
+    offset: offset,
+    auto_commit: false,
+    worker_name: client,
+    api_version: 3
+  )
+
+[fetch_response | _] = fetch_responses
+[partition_response | _] = fetch_response.partitions
+message = List.last(partition_response.message_set)
+
+message.timestamp # => log append timestamp in milliseconds
+```
+
+Note that the `KafkaEx.ConsumerGroup` `api_versions` option also supports
+setting a version for `fetch`:
+
+```
+# use new record batch format AND kafka offset storage with consumer groups
+KafkaEx.ConsumerGroup.start_link(
+  MyConsumer,
+  consumer_group_name,
+  [topic],
+  api_versions: %{offset_fetch: 3, offset_commit: 3, fetch: 3}
+)
+```
diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index 7bb202c5..2c7629f4 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -1,5 +1,20 @@
 defmodule KafkaEx do
-  @moduledoc File.read!(Path.expand("../README.md", __DIR__))
+  @moduledoc """
+  Kafka API
+
+  This module is the main API for users of the KafkaEx library.
+
+  Most of these functions either use the default worker (registered as
+  `:kafka_ex`) by default or can take a registered name or pid via a
+  `worker_name` option.
+
+  ```
+  # create an unnamed worker
+  {:ok, pid} = KafkaEx.create_worker(:no_name)
+
+  KafkaEx.fetch("some_topic", 0, worker_name: pid)
+  ```
+  """
   use Application
 
   alias KafkaEx.Config
@@ -579,8 +594,7 @@ defmodule KafkaEx do
   - `server_impl` - This is the GenServer that will be used for the client
     genserver implementation - e.g., `KafkaEx.Server0P8P0`,
     `KafkaEx.Server0P10AndLater`, `KafkaEx.New.Client`. Defaults to the value
-    of `KafkaEx.Config.server_impl/0` which is determined by the `kafka_version`
-    setting.
+    determined by the `kafka_version` setting.
   """
   @spec start_link_worker(atom, [
           KafkaEx.worker_setting() | {:server_impl, module}
diff --git a/lib/kafka_ex/new/kafka_ex_api.ex b/lib/kafka_ex/new/kafka_ex_api.ex
index 27457d6f..50076f46 100644
--- a/lib/kafka_ex/new/kafka_ex_api.ex
+++ b/lib/kafka_ex/new/kafka_ex_api.ex
@@ -3,6 +3,14 @@ defmodule KafkaEx.New.KafkaExAPI do
   This module interfaces Kafka through the New.Client implementation
 
   This is intended to become the future KafkaEx API
+
+  Most functions here take a client pid as the first argument.
+
+  ```
+  {:ok, client} = KafkaEx.New.Client.start_link()
+
+  KafkaEx.New.KafkaExAPI.latest_offset(client, "some_topic", 0)
+  ```
   """
 
   alias KafkaEx.New.Client
@@ -78,7 +86,10 @@ defmodule KafkaEx.New.KafkaExAPI do
   end
 
   @doc """
-  Set the consumer group name that will be used by the given client for autocommit
+  Set the consumer group name that will be used by the given client for
+  autocommit
+
+  NOTE this function will not be supported after the legacy API is removed
   """
   @spec set_consumer_group_for_auto_commit(client, consumer_group_name) ::
           :ok | {:error, :invalid_consumer_group}
diff --git a/mix.exs b/mix.exs
index 430f1bed..a9606135 100644
--- a/mix.exs
+++ b/mix.exs
@@ -20,7 +20,11 @@ defmodule KafkaEx.Mixfile do
       deps: deps(),
       docs: [
         main: "readme",
-        extras: ["README.md"],
+        extras: [
+          "README.md",
+          "kayrock.md",
+          "new_api.md"
+        ],
         source_url: "https://github.com/kafkaex/kafka_ex"
       ]
     ]
@@ -34,15 +38,22 @@ end
   end
 
   defp deps do
-    [
+    main_deps = [
       {:kayrock, "~> 0.1.8"},
       {:credo, "~> 0.8.10", only: :dev},
       {:dialyxir, "~> 1.0.0-rc.3", only: :dev},
       {:excoveralls, "~> 0.7", only: :test},
-      {:ex_doc, "0.18.3", only: :dev},
       {:snappy,
       git: "https://github.com/fdmanana/snappy-erlang-nif", only: [:dev, :test]}
     ]
+
+    # we need a newer version of ex_doc, but it will cause problems on older
+    # versions of elixir
+    if Version.match?(System.version(), ">= 1.7.0") do
+      main_deps ++ [{:ex_doc, "~> 0.19", only: :dev}]
+    else
+      main_deps
+    end
   end
 
   defp description do
diff --git a/mix.lock b/mix.lock
index 8839f6a6..73170a2a 100644
--- a/mix.lock
+++ b/mix.lock
@@ -5,8 +5,8 @@
   "crc32cer": {:hex, :crc32cer, "0.1.3", "8984906c4b4fae6aa292c48f286a1c83b19ad44bd102287acb94d696015967ce", [:make, :rebar, :rebar3], [], "hexpm"},
   "credo": {:hex, :credo, "0.8.10", "261862bb7363247762e1063713bb85df2bbd84af8d8610d1272cd9c1943bba63", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}], "hexpm"},
   "dialyxir": {:hex, :dialyxir, "1.0.0-rc.3", "774306f84973fc3f1e2e8743eeaa5f5d29b117f3916e5de74c075c02f1b8ef55", [:mix], [], "hexpm"},
-  "earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm"},
-  "ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
+  "earmark": {:hex, :earmark, "1.4.1", "07bb382826ee8d08d575a1981f971ed41bd5d7e86b917fd012a93c51b5d28727", [:mix], [], "hexpm"},
+  "ex_doc": {:hex, :ex_doc, "0.21.2", "caca5bc28ed7b3bdc0b662f8afe2bee1eedb5c3cf7b322feeeb7c6ebbde089d6", [:mix], [{:earmark, "~> 1.3.3 or ~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
   "excoveralls": {:hex, :excoveralls, "0.7.1", "3dd659db19c290692b5e2c4a2365ae6d4488091a1ba58f62dcbdaa0c03da5491", [:mix], [{:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
   "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"},
   "hackney": {:hex, :hackney, "1.8.6", "21a725db3569b3fb11a6af17d5c5f654052ce9624219f1317e8639183de4a423", [:rebar3], [{:certifi, "1.2.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.0.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
diff --git a/new_api.md b/new_api.md
new file mode 100644
index 00000000..ff7e5a02
--- /dev/null
+++ b/new_api.md
@@ -0,0 +1,163 @@
+# KafkaEx 1.0 API
+
+This document describes the design approach and gives an overview of the new
+client API. The API itself is documented in `KafkaEx.New.KafkaExAPI`. The
+current plan is for `KafkaEx.New.KafkaExAPI` to replace `KafkaEx` in the v1.0
+release.
+
+The new API is designed, first and foremost, to continue to provide a useful
+Kafka client, while addressing some of the limitations and inconveniences of
+the existing API (both in terms of usage and maintenance). A central goal of
+the new API is to allow us to support new Kafka features more rapidly than in
+the past.
+
+## Status
+
+The new API is still in very early stages of development. We will try to keep
+this section up-to-date with respect to what features have been implemented.
+`KafkaEx.New.KafkaExAPI` is the source of truth for this summary.
+
+Features implemented:
+
+* Get the latest offset for a partition as `{:ok, offset}` or
+  `{:error, error_code}` (no more fishing through the response structs).
+* Get metadata for an arbitrary list of topics.
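+
+A quick sketch of the offset feature, assuming a started `KafkaEx.New.Client`
+pid (see `KafkaEx.New.KafkaExAPI` for the authoritative signatures):
+
+```
+{:ok, client} = KafkaEx.New.Client.start_link()
+
+# returns {:ok, offset} or {:error, error_code} directly
+{:ok, offset} = KafkaEx.New.KafkaExAPI.latest_offset(client, "some_topic", 0)
+```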
+
+## Major Differences from the Legacy API
+
+* There is currently no supervisor for clients. It is assumed that the user
+  will manage these when not used in a consumer group. (This does not apply
+  to clients started via the legacy `create_worker` API, which are started
+  under the standard supervision tree.)
+* The client does not automatically fetch metadata for all topics, as this
+  can lead to timeouts on large clusters. There should be no observable
+  impact here because the client fetches metadata for specific topics
+  on-demand.
+* A client is no longer "attached" to a specific consumer group. In the
+  legacy implementation this was a consequence of the way autocommit was
+  handled.
+
+## Design Philosophy
+
+Two main design principles in the new client are driven by factors that made
+maintenance of the legacy API difficult:
+
+1. Delegate and genericize API message version handling
+
+   Kafka API message serialization and deserialization has been externalized
+   to a library ([Kayrock](https://github.com/dantswain/kayrock)) that can
+   easily support new message formats as they are released. Kayrock exposes
+   generic serialize/deserialize functionality, which means we do not have to
+   write code to handle specific versions of specific messages at a low level
+   in KafkaEx.
+
+2. Separation of connection state management and API logic
+
+   As much as possible, we avoid putting API logic inside the client
+   GenServer. Instead, we write functions that form Kayrock request structs
+   based on user input, use `KafkaEx.New.Client.send_request/3` to perform
+   communication with the cluster, and then act accordingly based on the
+   response.
+
+### Example: Fetching Kafka Config Values
+
+Suppose that we wanted to implement a function to retrieve Kafka broker
+config settings. This is the DescribeConfigs API and corresponds to the
+`Kayrock.DescribeConfigs` namespace. After some research (and trial and error
+using iex and functions like the ones below), we find that to fetch broker
+settings we need to set `resource_type` to 4 and `resource_name` to the
+string representation of the broker's id number (e.g., `1 => "1"`):
+
+```
+# List all config values for node 1 of the cluster
+{:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)
+
+# resource type 4 is 'broker'
+req = %Kayrock.DescribeConfigs.V0.Request{
+  resources: [%{resource_type: 4, resource_name: "1", config_names: nil}]
+}
+
+KafkaEx.New.Client.send_request(pid, req, KafkaEx.New.NodeSelector.node_id(1))
+
+{:ok,
+ %Kayrock.DescribeConfigs.V0.Response{
+   correlation_id: 15,
+   resources: [
+     %{
+       config_entries: [
+         %{
+           config_name: "advertised.host.name",
+           config_value: nil,
+           is_default: 1,
+           is_sensitive: 0,
+           read_only: 1
+         },
+         %{
+           config_name: "log.cleaner.min.compaction.lag.ms",
+           config_value: "0",
+           is_default: 1,
+           is_sensitive: 0,
+           read_only: 1
+         },
+
+         ...
+
+       ]
+     }
+   ]
+ }}
+```
+
+From the above, we could feasibly write a convenience function to fetch broker
+configs, and doing so would not require us to modify the client code at all.
+Furthermore, as the DescribeConfigs API changes over time, we can easily
+support it by changing only that function. The goal of the new API is that
+most simple functionality can be implemented simply by writing such a
+function:
+
+```
+alias KafkaEx.New.Client
+alias KafkaEx.New.NodeSelector
+
+def get_broker_config_values(client, config_names, broker_id) do
+  request = %Kayrock.DescribeConfigs.V0.Request{
+    resources: [%{
+      resource_type: 4,
+      resource_name: "#{broker_id}",
+      config_names: config_names
+    }]
+  }
+
+  # note this requires no changes to the client itself
+  {:ok, %{
+    resources: [
+      %{
+        error_code: 0,
+        config_entries: config_entries
+      }
+    ]
+  }} = Client.send_request(client, request, NodeSelector.node_id(broker_id))
+
+  config_entries
+end
+```
+
+This implementation only supports version 0 of the DescribeConfigs API. We
+can achieve some level of forward compatibility by adding an `opts` keyword
+list and some code to handle `api_version` in the opts:
+
+```
+def get_broker_config_values(client, config_names, broker_id, opts \\ []) do
+  api_version = Keyword.get(opts, :api_version, 0)
+  # include_synonyms is a setting added in v1
+  include_synonyms = Keyword.get(opts, :include_synonyms, false)
+
+  request = Kayrock.DescribeConfigs.get_request_struct(api_version)
+
+  request =
+    if api_version >= 1 do
+      %{request | include_synonyms: include_synonyms}
+    else
+      request
+    end
+
+  # rest is the same
+end
+```
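+
+A usage sketch, assuming the elided parts above are filled in as in the v0
+version and `client` is a started `KafkaEx.New.Client` pid:
+
+```
+# list the log retention setting for broker 1
+config_entries = get_broker_config_values(client, ["log.retention.hours"], 1)
+
+for %{config_name: name, config_value: value} <- config_entries do
+  IO.puts("#{name} = #{value}")
+end
+```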