Skip to content

Commit

Permalink
Merge c1aa956 into 43a6bd5
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Dec 13, 2018
2 parents 43a6bd5 + c1aa956 commit 999f22e
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 19 deletions.
6 changes: 3 additions & 3 deletions Makefile
@@ -1,14 +1,14 @@
KAFKA_VERSION ?= 1.1
PROJECT = brod
PROJECT_DESCRIPTION = Kafka client library in Erlang
PROJECT_VERSION = 3.7.2
PROJECT_VERSION = 3.7.3

DEPS = supervisor3 kafka_protocol

ERLC_OPTS = -Werror +warn_unused_vars +warn_shadow_vars +warn_unused_import +warn_obsolete_guard +debug_info -Dbuild_brod_cli

dep_supervisor3_commit = 1.1.7
dep_kafka_protocol_commit = 2.2.2
dep_supervisor3_commit = 1.1.8
dep_kafka_protocol_commit = 2.2.3
dep_kafka_protocol = git https://github.com/klarna/kafka_protocol.git $(dep_kafka_protocol_commit)

EDOC_OPTS = preprocess, {macros, [{build_brod_cli, true}]}
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Expand Up @@ -135,3 +135,4 @@
* Pr #299: Fix topic subscriber returning partition offsets from callback module's init.
* 3.7.3
* Bump kafka_protocol version to 2.2.3
* Discard stale async-ack messages to group subscriber
2 changes: 1 addition & 1 deletion rebar.config
@@ -1,4 +1,4 @@
{deps, [ {supervisor3, "1.1.7"}
{deps, [ {supervisor3, "1.1.8"}
, {kafka_protocol, "2.2.3"}
]}.
{edoc_opts, [{preprocess, true}, {macros, [{build_brod_cli, true}]}]}.
Expand Down
12 changes: 7 additions & 5 deletions src/brod_group_subscriber.erl
Expand Up @@ -552,16 +552,18 @@ handle_ack(AckRef, #state{ generationId = GenerationId
, coordinator = Coordinator
} = State, CommitNow) ->
{Topic, Partition, Offset} = AckRef,
Consumer = get_consumer({Topic, Partition}, Consumers),
#consumer{consumer_pid = ConsumerPid} = Consumer,
ok = consume_ack(ConsumerPid, Offset),
case CommitNow of
true ->
case get_consumer({Topic, Partition}, Consumers) of
#consumer{consumer_pid = ConsumerPid} = Consumer when CommitNow ->
ok = consume_ack(ConsumerPid, Offset),
ok = do_commit_ack(Coordinator, GenerationId, Topic, Partition, Offset),
NewConsumer = Consumer#consumer{acked_offset = Offset},
NewConsumers = put_consumer(NewConsumer, Consumers),
State#state{consumers = NewConsumers};
#consumer{consumer_pid = ConsumerPid} ->
ok = consume_ack(ConsumerPid, Offset),
State;
false ->
%% Stale async-ack, discard.
State
end.

Expand Down
20 changes: 10 additions & 10 deletions test/brod_topic_subscriber_SUITE.erl
Expand Up @@ -198,42 +198,42 @@ t_async_acks(Config) when is_list(Config) ->
ok.

t_begin_offset(Config) when is_list(Config) ->
MaxSeqNo = 100,
ConsumerConfig = [ {prefetch_count, MaxSeqNo}
ConsumerConfig = [ {prefetch_count, 100}
, {prefetch_bytes, 0} %% as discard
, {sleep_timeout, 0}
, {max_wait_time, 1000}
],
CaseRef = t_begin_offset,
CasePid = self(),
Partition = 0,
{ok, StartPartitionOffset} = brod:resolve_offset(?BOOTSTRAP_HOSTS, ?TOPIC, Partition, latest),
SendFun =
fun(I) ->
Value = integer_to_binary(I),
ok = brod:produce_sync(?CLIENT_ID, ?TOPIC, Partition, <<>>, Value)
{ok, Offset} = brod:produce_sync_offset(?CLIENT_ID, ?TOPIC, Partition, <<>>, Value),
Offset
end,
RecvFun =
fun F(Pid, Timeout, Acc) ->
receive
{CaseRef, Partition, Offset, Value} ->
ok = brod_topic_subscriber:ack(Pid, Partition, Offset),
I = binary_to_integer(Value),
F(Pid, 0, [I | Acc]);
F(Pid, 0, [{Offset, I} | Acc]);
Msg ->
erlang:error({unexpected_msg, Msg})
after Timeout ->
Acc
end
end,
ok = SendFun(111),
ok = SendFun(222),
ok = SendFun(333),
InitArgs = {CaseRef, CasePid, _IsAsyncAck = true, _ConsumerOffsets = [{0, StartPartitionOffset + 1}]},
_Offset0 = SendFun(111),
Offset1 = SendFun(222),
Offset2 = SendFun(333),
%% Start as if committed Offset1, expect it to start fetching from Offset2
InitArgs = {CaseRef, CasePid, _IsAsyncAck = true, _ConsumerOffsets = [{0, Offset1}]},
{ok, SubscriberPid} =
brod:start_link_topic_subscriber(?CLIENT_ID, ?TOPIC, ConsumerConfig,
?MODULE, InitArgs),
[333] = RecvFun(SubscriberPid, 5000, []),
?assertEqual([{Offset2, 333}], RecvFun(SubscriberPid, 5000, [])),
ok = brod_topic_subscriber:stop(SubscriberPid),
ok.

Expand Down

0 comments on commit 999f22e

Please sign in to comment.