Skip to content

Commit

Permalink
Update kafka_protocol to get ready for kafka 1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Jul 19, 2018
1 parent bb2d950 commit 7b78fc9
Show file tree
Hide file tree
Showing 58 changed files with 1,678 additions and 3,069 deletions.
9 changes: 3 additions & 6 deletions .travis.yml
Expand Up @@ -11,21 +11,18 @@ before_install:
- curl -L https://github.com/docker/compose/releases/download/1.6.0/docker-compose-`uname -s`-`uname -m` > docker-compose
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin/
- git clone https://github.com/erlang/rebar3.git; cd rebar3; ./bootstrap; sudo cp rebar3 /usr/bin; cd ..
- git clone https://github.com/inaka/elvis.git; cd elvis; rebar3 escriptize; sudo cp _build/default/bin/elvis /usr/bin; cd ..

notifications:
email: false

otp_release:
- 20.0
- 21.0
- 20.3
- 19.3
- 18.2.1
- 17.5

script:
- set -e
- make vsn-check
- elvis rock

- echo "verifying rebar compilation"
Expand All @@ -43,5 +40,5 @@ script:
- make xref
- make test-env
- make t
- if [ "$(erl -noshell -eval 'io:format(erlang:system_info(otp_release)), halt(0)')" -gt 18 ]; then make dialyze; fi
- if [ "$(erl -noshell -eval 'io:format(erlang:system_info(otp_release)), halt(0)')" -eq 21 ]; then make dialyze; fi

27 changes: 9 additions & 18 deletions Makefile
@@ -1,22 +1,26 @@
PROJECT = brod
PROJECT_DESCRIPTION = Kafka client library in Erlang
PROJECT_VERSION = 3.5.2
PROJECT_VERSION = 3.6.0

DEPS = supervisor3 kafka_protocol
TEST_DEPS = docopt jsone meck proper
REL_DEPS = docopt jsone

ERLC_OPTS = -Werror +warn_unused_vars +warn_shadow_vars +warn_unused_import +warn_obsolete_guard +debug_info
TEST_ERLC_OPTS = -Werror +warn_unused_vars +warn_shadow_vars +warn_unused_import +warn_obsolete_guard +debug_info
COMMON_ERLC_OPTS = -Werror +warn_unused_vars +warn_shadow_vars +warn_unused_import +warn_obsolete_guard +debug_info -Dbuild_brod_cli
ERLC_OPTS = $(COMMON_ERLC_OPTS)
TEST_ERLC_OPTS = $(COMMON_ERLC_OPTS)


dep_supervisor3_commit = 1.1.5
dep_kafka_protocol_commit = 1.1.2
dep_kafka_protocol_commit = 2.0.0
dep_kafka_protocol = git https://github.com/klarna/kafka_protocol.git $(dep_kafka_protocol_commit)
dep_docopt = git https://github.com/zmstone/docopt-erl.git 0.1.3

ERTS_VSN = $(shell erl -noshell -eval 'io:put_chars(erlang:system_info(version)), halt()')
ESCRIPT_FILE = scripts/brod_cli
ESCRIPT_EMU_ARGS = -sname brod_cli

EDOC_OPTS = preprocess, {macros, [{build_brod_cli, true}]}
COVER = true

EUNIT_OPTS = verbose
Expand All @@ -25,19 +29,6 @@ CT_OPTS = -ct_use_short_names true

ERL_LIBS := $(ERL_LIBS):$(CURDIR)

ifeq ($(MAKECMDGOALS),)
export BROD_CLI=true
else ifneq ($(filter rel,$(MAKECMDGOALS)),)
export BROD_CLI=true
else ifneq ($(filter escript,$(MAKECMDGOALS)),)
export BROD_CLI=true
endif

ifeq ($(BROD_CLI),true)
ERLC_OPTS += -DBROD_CLI
TEST_ERLC_OPTS += -DBROD_CLI
endif

## Make app the default target
## To avoid building a release when brod is used as a erlang.mk project's dependency
app::
Expand All @@ -51,7 +42,7 @@ rel:: escript
@tar -pczf _rel/brod.tar.gz -C _rel brod

test-env:
./scripts/setup-test-env.sh
./scripts/setup-test-env.sh 1.1

t: eunit ct
./scripts/cover-summary.escript eunit.coverdata ct.coverdata
Expand Down
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -505,3 +505,8 @@ NOTE: This feature is designed for force overwriting commits, not for regular us
./scripts/brod commits -b localhost:9092 -i the-group-id -t topic-name -o "0:10000" --protocol range
```

## TODOs

* HTML tagged EDoc
* Support scram-sasl in brod-cli

13 changes: 13 additions & 0 deletions changelog.md
Expand Up @@ -89,3 +89,16 @@
* 3.5.2
* Fix issue #263: Kafka 0.11 may send empty batch in fetch response when messages are deleted in
compacted topics.
* 3.6.0
* Moved 3 modules to kafka_protocol:
- `brod_sock` -> `kpro_connection`
- `brod_auth_backed` -> `kpro_auth_backend`
- `brod_kafka_requests` -> `kpro_sent_reqs`
* `#kafka_message.key` and `#kafka_message.value` are now always `binary()`
(they were of spec `undefined | binary()` prior to this version).
i.e. empty bytes are now decoded as `<<>>` instead of `undefined`.
This may cause dialyzer check failures.
* `brod_client` no longer logs about metadata socket down, it had been confusing rather than being helpful
* There is no more cool-down delay for metadata socket re-establishment
* `brod_group_coordinator` default session timeout changed from 10 seconds to 30,
and heartbeat interval changed from 2 seconds to 5.
18 changes: 11 additions & 7 deletions docker/Dockerfile
@@ -1,17 +1,20 @@
FROM java:openjdk-8-jre

ENV SCALA_VERSION 2.11
ARG KAFKA_VERSION

# can also be 0.11.0.2 or 0.10.2.1
ENV KAFKA_VERSION 1.0.0
ENV KAFKA_DOWNLOAD_URL="https://apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
#ENV KAFKA_VERSION 0.9.0.0
#ENV KAFKA_DOWNLOAD_URL="https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
ENV KAFKA_VERSION ${KAFKA_VERSION}
ENV SCALA_VERSION 2.11

RUN apt-get update && \
apt-get install -y zookeeper wget supervisor dnsutils && \
rm -rf /var/lib/apt/lists/* && \
apt-get clean && \
if [[ "$KAFKA_VERSION" = 0.9* ]]; then \
DOWNLOAD_URL_PREFIX="https://archive.apache.org/dist/kafka"; \
else \
DOWNLOAD_URL_PREFIX="https://apache.org/dist/kafka/"; \
fi && \
KAFKA_DOWNLOAD_URL="$DOWNLOAD_URL_PREFIX/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" && \
wget -q "${KAFKA_DOWNLOAD_URL}" -O /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && \
tar xfz /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -C /opt && \
rm /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && \
Expand All @@ -22,7 +25,8 @@ COPY docker-entrypoint.sh /docker-entrypoint.sh
COPY server.properties /etc/kafka/server.properties
COPY server.jks /etc/kafka/server.jks
COPY truststore.jks /etc/kafka/truststore.jks
COPY jaas.conf /etc/kafka/jaas.conf
COPY jaas-plain.conf /etc/kafka/jaas-plain.conf
COPY jaas-plain-scram.conf /etc/kafka/jaas-plain-scram.conf

ENTRYPOINT ["/docker-entrypoint.sh"]

Expand Down
2 changes: 2 additions & 0 deletions docker/docker-compose-basic.yml
Expand Up @@ -4,6 +4,8 @@ services:
kafka:
build:
context: .
args:
KAFKA_VERSION: "${KAFKA_VERSION}"
zookeeper:
image: docker_kafka
ports:
Expand Down
10 changes: 6 additions & 4 deletions docker/docker-compose-kafka-1.yml
Expand Up @@ -10,11 +10,13 @@ services:
container_name: kafka_1
ports:
- "9092:9092"
- "9192:9192"
- "9292:9292"
- "9093:9093"
- "9094:9094"
- "9095:9095"
environment:
BROKER_ID: 0
PLAINTEXT_PORT: 9092
SSL_PORT: 9192
SASL_SSL_PORT: 9292
SSL_PORT: 9093
SASL_SSL_PORT: 9094
SASL_PLAINTEXT_PORT: 9095

24 changes: 14 additions & 10 deletions docker/docker-compose-kafka-2.yml
Expand Up @@ -10,23 +10,27 @@ services:
container_name: kafka_1
ports:
- "9092:9092"
- "9192:9192"
- "9292:9292"
- "9093:9093"
- "9094:9094"
- "9095:9095"
environment:
BROKER_ID: 0
PLAINTEXT_PORT: 9092
SSL_PORT: 9192
SASL_SSL_PORT: 9292
SSL_PORT: 9093
SASL_SSL_PORT: 9094
SASL_PLAINTEXT_PORT: 9095
kafka_2:
image: docker_kafka
container_name: kafka_2
ports:
- "9093:9093"
- "9193:9193"
- "9293:9293"
- "9192:9092"
- "9193:9093"
- "9194:9094"
- "9195:9095"
environment:
BROKER_ID: 1
PLAINTEXT_PORT: 9093
SSL_PORT: 9193
SASL_SSL_PORT: 9293
PLAINTEXT_PORT: 9092
SSL_PORT: 9093
SASL_SSL_PORT: 9094
SASL_PLAINTEXT_PORT: 9095

17 changes: 10 additions & 7 deletions docker/docker-entrypoint.sh
Expand Up @@ -7,8 +7,7 @@ fi

## run zookeeper
if [ "$2" = "zookeeper" ]; then
/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
exit $?
exec /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
fi

if [ "$2" != "kafka" ]; then
Expand All @@ -32,19 +31,23 @@ if [[ "$KAFKA_VERSION" = 0.9* ]]; then
sed -r -i "s/^(advertised.listeners)=(.*)/\1=PLAINTEXT:\/\/$ipaddress:$PLAINTEXT_PORT,SSL:\/\/$ipaddress:$SSL_PORT/g" $prop_file
sed -r -i "s/^(listeners)=(.*)/\1=PLAINTEXT:\/\/:$PLAINTEXT_PORT,SSL:\/\/:$SSL_PORT/g" $prop_file
else
sed -r -i "s/^(advertised.listeners)=(.*)/\1=PLAINTEXT:\/\/$ipaddress:$PLAINTEXT_PORT,SSL:\/\/$ipaddress:$SSL_PORT,SASL_SSL:\/\/$ipaddress:$SASL_SSL_PORT/g" $prop_file
sed -r -i "s/^(listeners)=(.*)/\1=PLAINTEXT:\/\/:$PLAINTEXT_PORT,SSL:\/\/:$SSL_PORT,SASL_SSL:\/\/:$SASL_SSL_PORT/g" $prop_file
sed -r -i "s/^(advertised.listeners)=(.*)/\1=PLAINTEXT:\/\/$ipaddress:$PLAINTEXT_PORT,SSL:\/\/$ipaddress:$SSL_PORT,SASL_SSL:\/\/$ipaddress:$SASL_SSL_PORT,SASL_PLAINTEXT:\/\/$ipaddress:$SASL_PLAINTEXT_PORT/g" $prop_file
sed -r -i "s/^(listeners)=(.*)/\1=PLAINTEXT:\/\/:$PLAINTEXT_PORT,SSL:\/\/:$SSL_PORT,SASL_SSL:\/\/:$SASL_SSL_PORT,SASL_PLAINTEXT:\/\/:$SASL_PLAINTEXT_PORT/g" $prop_file
echo "sasl.enabled.mechanisms=PLAIN" >> $prop_file
fi

echo "sasl.enabled.mechanisms=PLAIN" >> $prop_file
echo "sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512" >> $prop_file
echo "offsets.topic.replication.factor=1" >> $prop_file
echo "transaction.state.log.min.isr=1" >> $prop_file
echo "transaction.state.log.replication.factor=1" >> $prop_file

if [[ "$KAFKA_VERSION" = 0.9* ]]; then
JAAS_CONF=""
elif [[ "$KAFKA_VERSION" = 0.10* ]]; then
JAAS_CONF="-Djava.security.auth.login.config=/etc/kafka/jaas-plain.conf"
else
JAAS_CONF="-Djava.security.auth.login.config=/etc/kafka/jaas.conf"
JAAS_CONF="-Djava.security.auth.login.config=/etc/kafka/jaas-plain-scram.conf"
fi

#-Djavax.net.debug=all
KAFKA_HEAP_OPTS="-Xmx1G -Xms1G $JAAS_CONF" /opt/kafka/bin/kafka-server-start.sh $prop_file

9 changes: 9 additions & 0 deletions docker/jaas-plain-scram.conf
@@ -0,0 +1,9 @@
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
user_admin="nimda"
user_alice="ecila";

org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="nimda";
};
4 changes: 2 additions & 2 deletions docker/jaas.conf → docker/jaas-plain.conf
@@ -1,5 +1,5 @@
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
user_admin="admin-secret"
user_alice="alice-secret";
user_admin="nimda"
user_alice="ecila";
};
4 changes: 3 additions & 1 deletion elvis.config
Expand Up @@ -30,6 +30,8 @@
#{ level => 3,
ignore => [ brod_group_coordinator
, brod_utils
, brod_cli
, brod_cli_pipe
]
}}
, {elvis_style, god_modules,
Expand All @@ -38,7 +40,7 @@
}}
, {elvis_style, no_nested_try_catch}
, {elvis_style, invalid_dynamic_call,
#{ignore => [brod_sock, brod_group_coordinator]
#{ignore => [brod_group_coordinator]
}}
, {elvis_style, used_ignored_variable}
, {elvis_style, no_behavior_info}
Expand Down
8 changes: 0 additions & 8 deletions erlang.mk
Expand Up @@ -2171,14 +2171,6 @@ pkg_kafka_fetch = git
pkg_kafka_repo = https://github.com/wooga/kafka-erlang
pkg_kafka_commit = master

PACKAGES += kafka_protocol
pkg_kafka_protocol_name = kafka_protocol
pkg_kafka_protocol_description = Kafka protocol Erlang library
pkg_kafka_protocol_homepage = https://github.com/klarna/kafka_protocol
pkg_kafka_protocol_fetch = git
pkg_kafka_protocol_repo = https://github.com/klarna/kafka_protocol.git
pkg_kafka_protocol_commit = master

PACKAGES += kai
pkg_kai_name = kai
pkg_kai_description = DHT storage by Takeshi Inoue
Expand Down
8 changes: 4 additions & 4 deletions include/brod.hrl
@@ -1,5 +1,5 @@
%%%
%%% Copyright (c) 2014-2017, Klarna AB
%%% Copyright (c) 2014-2018, Klarna Bank AB (publ)
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,15 +28,15 @@
, partition :: brod:partition()
, high_wm_offset :: integer() %% max offset of the partition
, messages :: [brod:message()] %% exposed to brod user
| kpro:incomplete_message() %% this union member
%% is internal only
| kpro:incomplete_batch() %% this union member
%% is internal only
}).

-record(kafka_fetch_error,
{ topic :: brod:topic()
, partition :: brod:partition()
, error_code :: brod:error_code()
, error_desc :: binary()
, error_desc = ""
}).

-record(brod_call_ref, { caller :: pid()
Expand Down
4 changes: 2 additions & 2 deletions include/brod_int.hrl
@@ -1,5 +1,5 @@
%%%
%%% Copyright (c) 2014-2017, Klarna AB
%%% Copyright (c) 2014-2018, Klarna Bank AB (publ)
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,7 +43,7 @@
| ?undef.

%% Is kafka error code
-define(IS_ERROR(EC), kpro_error_code:is_error(EC)).
-define(IS_ERROR(EC), ((EC) =/= ?no_error)).

-define(KV(Key, Value), {Key, Value}).
-define(TKV(Ts, Key, Value), {Ts, Key, Value}).
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
@@ -1,4 +1,4 @@
{deps, [ {supervisor3, "1.1.5"}
, {kafka_protocol, "1.1.2"}
, {kafka_protocol, "2.0.0"}
]}.
{erl_opts, [warn_unused_vars,warn_shadow_vars,warn_unused_import,warn_obsolete_guard,debug_info]}.
29 changes: 16 additions & 13 deletions rebar.config.script
@@ -1,17 +1,20 @@
IsRebar3 = erlang:function_exported(rebar3, main, 1),

case IsRebar3 of
true ->
CONFIG;
false ->
Rebar3Deps = proplists:get_value(deps, CONFIG),
Rebar2Deps = [
{supervisor3, ".*",
{git, "https://github.com/klarna/supervisor3",
{tag, proplists:get_value(supervisor3, Rebar3Deps)}}},
{kafka_protocol, ".*",
{git, "https://github.com/klarna/kafka_protocol.git",
{tag, proplists:get_value(kafka_protocol, Rebar3Deps)}}}
],
lists:keyreplace(deps, 1, CONFIG, {deps, Rebar2Deps})
true ->
CONFIG;
false ->
URLs = [ {supervisor3, "https://github.com/klarna/supervisor3.git"}
, {kafka_protocol, "https://github.com/klarna/kafka_protocol.git"}
],
Rebar3Deps = proplists:get_value(deps, CONFIG),
Rebar2Deps =
lists:map(
fun({Name, URL}) ->
case proplists:get_value(Name, Rebar3Deps) of
{git, _, _} = Git -> {Name, ".*", Git};
Vsn -> {Name, ".*", {git, URL, {tag, Vsn}}}
end
end, URLs),
lists:keyreplace(deps, 1, CONFIG, {deps, Rebar2Deps})
end.

0 comments on commit 7b78fc9

Please sign in to comment.