Merge pull request #326 from klarna/compare-begin-offset-with-last-stable-offset

brod_consumer should compare begin_offset to last stable offset
zmstone committed Jun 5, 2019
2 parents 01633d0 + f09eb58 commit 5e218d8
Showing 6 changed files with 89 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -1,7 +1,7 @@
KAFKA_VERSION ?= 1.1
PROJECT = brod
PROJECT_DESCRIPTION = Kafka client library in Erlang
-PROJECT_VERSION = 3.7.9
+PROJECT_VERSION = 3.7.10

DEPS = supervisor3 kafka_protocol

6 changes: 6 additions & 0 deletions changelog.md
@@ -157,4 +157,10 @@
* Fix brod-cli sub-record formatting crash
* Upgrade to kafka_protocol 2.2.8 to discard replica_not_available error code in metadata response
* Fix empty responses field in fetch response #323
* 3.7.10
  * Compare begin_offset with the last stable offset before advancing to the
    next offset when an empty batch is received. Prior to this version, fetch
    attempts on unstable messages (messages belonging to open transactions,
    i.e. transactions that are neither committed nor aborted) could result in
    an empty message set, after which `brod_consumer` or `brod_utils:fetch`
    would jump to the next offset (if it was less than the high-watermark
    offset).
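For illustration, the decision this entry describes can be sketched as a
standalone function (hypothetical helper name, not part of the commit):

%% A minimal sketch of the new empty-batch handling: only skip ahead
%% while still below the last stable offset.
-spec next_action(integer(), integer()) -> {refetch, integer()} | delay_poll.
next_action(BeginOffset, StableOffset) when BeginOffset < StableOffset ->
  %% empty batch inside a compaction 'hole': bump the offset and refetch
  {refetch, BeginOffset + 1};
next_action(_BeginOffset, _StableOffset) ->
  %% end of the partition, or ahead of open transactions: wait and poll again
  delay_poll.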

2 changes: 1 addition & 1 deletion src/brod.app.src
@@ -1,7 +1,7 @@
%% -*- mode:erlang -*-
{application,brod,
[{description,"Apache Kafka Erlang client library"},
{vsn,"3.7.9"},
{vsn,"3.7.10"},
{registered,[]},
{applications,[kernel,stdlib,kafka_protocol,supervisor3]},
{env,[]},
16 changes: 8 additions & 8 deletions src/brod_consumer.erl
@@ -376,18 +376,19 @@ handle_batches(_Header, ?incomplete_batch(Size),
State = maybe_send_fetch_request(State1),
{noreply, State};
handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0) ->
-HighWmOffset = kpro:find(high_watermark, Header),
+StableOffset = brod_utils:get_stable_offset(Header),
State =
-case BeginOffset < HighWmOffset of
+case BeginOffset < StableOffset of
true ->
%% There are chances that kafka may return empty message set
-%% when messages are delete from a compacted topic.
+%% when messages are deleted from a compacted topic.
%% Since there is no way to know how big the 'hole' is
%% we can only bump begin_offset with +1 and try again.
State1 = State0#state{begin_offset = BeginOffset + 1},
maybe_send_fetch_request(State1);
false ->
-%% we have reached the end of partition
+%% we have either reached the end of a partition
+%% or are trying to read uncommitted messages
%% try to poll again (maybe after a delay)
maybe_delay_fetch_request(State0)
end,
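To see why the comparison target changed, consider an illustrative offset
layout (numbers are made up, not from this commit):

%% high_watermark     = 110  -- counts messages of open transactions
%% last_stable_offset = 100  -- first offset still held by an open txn
%% A fetch at offset 100 returns an empty batch until that txn commits
%% or aborts. Comparing begin_offset with high_watermark (old code) would
%% bump to 101 and skip message 100 once the transaction commits;
%% comparing with last_stable_offset (new code) waits instead.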
@@ -399,20 +400,19 @@ handle_batches(Header, Batches,
, topic = Topic
, partition = Partition
} = State0) ->
-HighWmOffset = kpro:find(high_watermark, Header),
-%% for API version 4 or higher, use last_stable_offset
-LatestOffset = kpro:find(last_stable_offset, Header, HighWmOffset),
+StableOffset = brod_utils:get_stable_offset(Header),
{NewBeginOffset, Messages} =
brod_utils:flatten_batches(BeginOffset, Header, Batches),
State1 = State0#state{begin_offset = NewBeginOffset},
State =
case Messages =:= [] of
true ->
%% All messages are before requested offset, hence dropped
State1;
false ->
MsgSet = #kafka_message_set{ topic = Topic
, partition = Partition
-, high_wm_offset = LatestOffset
+, high_wm_offset = StableOffset
, messages = Messages
},
ok = cast_to_subscriber(Subscriber, MsgSet),
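Downstream, a subscriber now sees the last stable offset in the
high_wm_offset field of delivered message sets. A minimal receiving-side
sketch, assuming the record definitions from brod.hrl and brod's
{ConsumerPid, MessageSet} delivery shape:

-include_lib("brod/include/brod.hrl").

%% Sketch: high_wm_offset now carries the last stable offset, so it
%% excludes messages that still belong to open transactions.
handle_message_set({_ConsumerPid, #kafka_message_set{ high_wm_offset = Stable
                                                    , messages = Msgs }}) ->
  io:format("up to stable offset ~p: ~p message(s)~n", [Stable, length(Msgs)]).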
19 changes: 14 additions & 5 deletions src/brod_utils.erl
@@ -31,6 +31,7 @@
, get_metadata/1
, get_metadata/2
, get_metadata/3
, get_stable_offset/1
, group_per_key/1
, group_per_key/2
, init_sasl_opt/1
@@ -322,18 +323,16 @@ fetch(Conn, ReqFun, Offset, MaxBytes) ->
{ok, #{batches := ?incomplete_batch(Size)}} ->
fetch(Conn, ReqFun, Offset, Size);
{ok, #{header := Header, batches := Batches}} ->
-HighWm = kpro:find(high_watermark, Header),
-%% for API version 4 or higher, use last_stable_offset
-LatestOffset = kpro:find(last_stable_offset, Header, HighWm),
+StableOffset = get_stable_offset(Header),
{NewBeginOffset, Msgs} = flatten_batches(Offset, Header, Batches),
-case Offset < LatestOffset andalso Msgs =:= [] of
+case Offset < StableOffset andalso Msgs =:= [] of
true ->
%% Not reached the latest stable offset yet,
%% but received an empty batch-set (all messages are dropped).
%% try again with new begin-offset
fetch(Conn, ReqFun, NewBeginOffset, MaxBytes);
false ->
-{ok, {LatestOffset, Msgs}}
+{ok, {StableOffset, Msgs}}
end;
{error, Reason} ->
{error, Reason}
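For callers of the public API, the returned offset is now the stable offset.
A usage sketch mirroring the call in the test suite below (Hosts, Cfg, Topic,
Partition and Offset are placeholders):

{ok, {StableOffset, Messages}} =
  brod:fetch({Hosts, Cfg}, Topic, Partition, Offset,
             #{max_wait_time => 100}).
%% StableOffset =:= Offset together with Messages =:= [] means the fetch
%% caught up with the last stable offset (e.g. an open transaction ahead).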
@@ -467,6 +466,16 @@ make_batch_input(Key, Value) ->
false -> [unify_msg(make_msg_input(Key, Value))]
end.

%% @doc last_stable_offset is added in fetch response version 4.
%% This function falls back to the high-watermark offset in case
%% last_stable_offset is missing from the response header.
%% Offsets are considered 'unstable' if they belong to open transactions.
get_stable_offset(Header) ->
HighWmOffset = kpro:find(high_watermark, Header),
StableOffset = kpro:find(last_stable_offset, Header, HighWmOffset),
StableOffset > HighWmOffset andalso error(unexpected_last_stable_offset),
StableOffset.
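A hedged illustration of the fallback behaviour, assuming fetch-response
headers decode to maps as they do in kafka_protocol 2.x:

%% v4+ fetch response: last_stable_offset is present and is returned.
3 = brod_utils:get_stable_offset(#{high_watermark => 5,
                                   last_stable_offset => 3}),
%% older response without the field: falls back to high_watermark.
5 = brod_utils:get_stable_offset(#{high_watermark => 5}).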

%%%_* Internal functions =======================================================

drop_aborted(#{aborted_transactions := AbortedL}, Batches) ->
59 changes: 59 additions & 0 deletions test/brod_consumer_SUITE.erl
@@ -28,6 +28,7 @@

%% Test cases
-export([ t_drop_aborted/1
, t_wait_for_unstable_offsets/1
, t_fetch_aborted_from_the_middle/1
, t_direct_fetch/1
, t_direct_fetch_with_small_max_bytes/1
@@ -199,6 +200,64 @@ test_drop_aborted(Config, QueryApiVsn) ->
], Msgs)
end.

t_wait_for_unstable_offsets(Config) when is_list(Config) ->
case has_txn() of
true -> t_wait_for_unstable_offsets({run, Config});
false -> ok
end;
t_wait_for_unstable_offsets({run, Config}) ->
Client = ?config(client),
Topic = ?TOPIC,
Partition = 0,
TxnId = make_transactional_id(),
{ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)),
%% ensure we have enough time to test before the transaction expires
TxnOpts = #{txn_timeout => timer:seconds(30)},
{ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId, TxnOpts),
ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]),
%% Send one message in this transaction and return its offset in kafka
ProduceFun =
fun(Seqno, Msg) ->
Vsn = 3, %% lowest API version which supports transactional produce
Opts = #{txn_ctx => TxnCtx, first_sequence => Seqno},
Batch = [#{value => Msg}],
ProduceReq = kpro_req_lib:produce(Vsn, Topic, Partition, Batch, Opts),
{ok, LeaderConn} =
brod_client:get_leader_connection(Client, Topic, Partition),
{ok, ProduceRsp} = kpro:request_sync(LeaderConn, ProduceReq, 5000),
{ok, Offset} = brod_utils:parse_rsp(ProduceRsp),
Offset
end,
Seqnos = lists:seq(0, 100),
Msgs = [{Seqno, iolist_to_binary(make_ts_str())} || Seqno <- Seqnos],
Offsets = [{Seqno, ProduceFun(Seqno, Msg)} || {Seqno, Msg} <- Msgs],
{_, BaseOffset} = hd(Offsets),
{_, LastOffset} = lists:last(Offsets),
Cfg = ?config(client_config),
Fetch = fun(O) ->
{ok, {StableOffset, MsgL}} =
brod:fetch({?HOSTS, Cfg}, Topic, Partition, O,
#{max_wait_time => 100}),
{StableOffset, MsgL}
end,
%% Transaction is not committed yet, fetch should not see anything
?assertMatch({BaseOffset, []}, Fetch(BaseOffset)),
?assertMatch({BaseOffset, []}, Fetch(BaseOffset + 1)),
?assertMatch({BaseOffset, []}, Fetch(LastOffset)),
?assertMatch({BaseOffset, []}, Fetch(LastOffset + 1)),
ok = kpro:txn_commit(TxnCtx),
ok = kpro:close_connection(Conn),
%% A commit marker batch is appended after the last message.
%% The commit batch is empty but occupies one offset, hence + 2
StableOffset = LastOffset + 2,
{FetchedStableOffset, [FetchedFirstMsg | _]} = Fetch(BaseOffset),
{_, ExpectedMsg} = hd(Msgs),
?assertMatch(#kafka_message{value = ExpectedMsg}, FetchedFirstMsg),
?assertEqual(StableOffset, FetchedStableOffset),
?assertMatch({StableOffset, [_ | _]}, Fetch(BaseOffset + 1)),
?assertMatch({StableOffset, []}, Fetch(StableOffset)),
ok.

%% Produce large(-ish) transactional batches, then abort them all.
%% Try to fetch from offsets in the middle of the large batches,
%% expecting no delivery of any aborted batches.
