From 187596e24ad21d85faa72650c1d24b16b6725d8f Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 13 Dec 2018 17:35:59 +0100 Subject: [PATCH 1/2] Discard stale ack messages in group subscriber Group subscriber should not crash when receiving stale ack messages --- Makefile | 6 +++--- changelog.md | 1 + rebar.config | 2 +- src/brod_group_subscriber.erl | 12 +++++++----- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index 57a803a1..d10ea95c 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,14 @@ KAFKA_VERSION ?= 1.1 PROJECT = brod PROJECT_DESCRIPTION = Kafka client library in Erlang -PROJECT_VERSION = 3.7.2 +PROJECT_VERSION = 3.7.3 DEPS = supervisor3 kafka_protocol ERLC_OPTS = -Werror +warn_unused_vars +warn_shadow_vars +warn_unused_import +warn_obsolete_guard +debug_info -Dbuild_brod_cli -dep_supervisor3_commit = 1.1.7 -dep_kafka_protocol_commit = 2.2.2 +dep_supervisor3_commit = 1.1.8 +dep_kafka_protocol_commit = 2.2.3 dep_kafka_protocol = git https://github.com/klarna/kafka_protocol.git $(dep_kafka_protocol_commit) EDOC_OPTS = preprocess, {macros, [{build_brod_cli, true}]} diff --git a/changelog.md b/changelog.md index b3825b6b..21c9792d 100644 --- a/changelog.md +++ b/changelog.md @@ -135,3 +135,4 @@ * Pr #299: Fix topic subscriber returning partition offsets from callback module's init. * 3.7.3 * Bump kafka_protocol version to 2.2.3 + * Discard stale async-ack messages to group subscriber diff --git a/rebar.config b/rebar.config index 3fe623ac..7d928e5a 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ -{deps, [ {supervisor3, "1.1.7"} +{deps, [ {supervisor3, "1.1.8"} , {kafka_protocol, "2.2.3"} ]}. {edoc_opts, [{preprocess, true}, {macros, [{build_brod_cli, true}]}]}. diff --git a/src/brod_group_subscriber.erl b/src/brod_group_subscriber.erl index 2b9438e5..a096440a 100644 --- a/src/brod_group_subscriber.erl +++ b/src/brod_group_subscriber.erl @@ -552,16 +552,18 @@ handle_ack(AckRef, #state{ generationId = GenerationId , coordinator = Coordinator } = State, CommitNow) -> {Topic, Partition, Offset} = AckRef, - Consumer = get_consumer({Topic, Partition}, Consumers), - #consumer{consumer_pid = ConsumerPid} = Consumer, - ok = consume_ack(ConsumerPid, Offset), - case CommitNow of - true -> + case get_consumer({Topic, Partition}, Consumers) of + #consumer{consumer_pid = ConsumerPid} = Consumer when CommitNow -> + ok = consume_ack(ConsumerPid, Offset), ok = do_commit_ack(Coordinator, GenerationId, Topic, Partition, Offset), NewConsumer = Consumer#consumer{acked_offset = Offset}, NewConsumers = put_consumer(NewConsumer, Consumers), State#state{consumers = NewConsumers}; + #consumer{consumer_pid = ConsumerPid} -> + ok = consume_ack(ConsumerPid, Offset), + State; false -> + %% Stale async-ack, discard. State end. From c1aa95695cfbc1d08280b3a9fc71b2c343c1b1eb Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 13 Dec 2018 18:38:59 +0100 Subject: [PATCH 2/2] Improve test stability --- test/brod_topic_subscriber_SUITE.erl | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/test/brod_topic_subscriber_SUITE.erl b/test/brod_topic_subscriber_SUITE.erl index 7b4c00f2..6f480ce7 100644 --- a/test/brod_topic_subscriber_SUITE.erl +++ b/test/brod_topic_subscriber_SUITE.erl @@ -198,8 +198,7 @@ t_async_acks(Config) when is_list(Config) -> ok. t_begin_offset(Config) when is_list(Config) -> - MaxSeqNo = 100, - ConsumerConfig = [ {prefetch_count, MaxSeqNo} + ConsumerConfig = [ {prefetch_count, 100} , {prefetch_bytes, 0} %% as discard , {sleep_timeout, 0} , {max_wait_time, 1000} @@ -207,11 +206,11 @@ t_begin_offset(Config) when is_list(Config) -> CaseRef = t_begin_offset, CasePid = self(), Partition = 0, - {ok, StartPartitionOffset} = brod:resolve_offset(?BOOTSTRAP_HOSTS, ?TOPIC, Partition, latest), SendFun = fun(I) -> Value = integer_to_binary(I), - ok = brod:produce_sync(?CLIENT_ID, ?TOPIC, Partition, <<>>, Value) + {ok, Offset} = brod:produce_sync_offset(?CLIENT_ID, ?TOPIC, Partition, <<>>, Value), + Offset end, RecvFun = fun F(Pid, Timeout, Acc) -> @@ -219,21 +218,22 @@ t_begin_offset(Config) when is_list(Config) -> {CaseRef, Partition, Offset, Value} -> ok = brod_topic_subscriber:ack(Pid, Partition, Offset), I = binary_to_integer(Value), - F(Pid, 0, [I | Acc]); + F(Pid, 0, [{Offset, I} | Acc]); Msg -> erlang:error({unexpected_msg, Msg}) after Timeout -> Acc end end, - ok = SendFun(111), - ok = SendFun(222), - ok = SendFun(333), - InitArgs = {CaseRef, CasePid, _IsAsyncAck = true, _ConsumerOffsets = [{0, StartPartitionOffset + 1}]}, + _Offset0 = SendFun(111), + Offset1 = SendFun(222), + Offset2 = SendFun(333), + %% Start as if committed Offset1, expect it to start fetching from Offset2 + InitArgs = {CaseRef, CasePid, _IsAsyncAck = true, _ConsumerOffsets = [{0, Offset1}]}, {ok, SubscriberPid} = brod:start_link_topic_subscriber(?CLIENT_ID, ?TOPIC, ConsumerConfig, ?MODULE, InitArgs), - [333] = RecvFun(SubscriberPid, 5000, []), + ?assertEqual([{Offset2, 333}], RecvFun(SubscriberPid, 5000, [])), ok = brod_topic_subscriber:stop(SubscriberPid), ok.