Merge pull request #316 from klarna/drop-aborted-txn-msgs
Drop batches in aborted transactions
zmstone committed Apr 25, 2019
2 parents 037c4ac + 1909ec2 commit e2c4133
Showing 10 changed files with 339 additions and 70 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -1,7 +1,7 @@
 KAFKA_VERSION ?= 1.1
 PROJECT = brod
 PROJECT_DESCRIPTION = Kafka client library in Erlang
-PROJECT_VERSION = 3.7.7
+PROJECT_VERSION = 3.7.8
 
 DEPS = supervisor3 kafka_protocol
4 changes: 4 additions & 0 deletions changelog.md
@@ -149,3 +149,7 @@
 * 3.7.7
   * Fix `badrecord` race: message-set is delivered to `brod_group_subscriber` after
     unsubscribed from `brod_consumer`.
+* 3.7.8
+  * Drop batches in aborted transactions (and all control batches);
+    also improve offset fast-forwarding when empty batches are received.
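For context: a transactional producer writes data batches followed by a control batch carrying a commit or abort marker, and a fetch response whose header lists `aborted_transactions` (as `{producer_id, first_offset}` pairs) tells the consumer which data batches must not be delivered. The sketch below is a simplified per-batch predicate illustrating that rule, assuming batch metadata maps shaped like the ones kpro decodes (module and function names are made up); the real implementation in this commit (`src/brod_utils.erl` below) instead walks the batch list once per aborted transaction and stops at the abort marker.

```erlang
-module(aborted_filter_sketch).
-export([should_drop/3]).

%% Control batches (commit/abort markers) carry no application data
%% and are never delivered to subscribers.
should_drop(#{is_control := true}, _BaseOffset, _AbortedL) ->
    true;
%% A transactional data batch is dropped when the fetch response
%% header listed its producer id with a first_offset at or below
%% this batch's base offset.
should_drop(#{is_transaction := true, producer_id := Id},
            BaseOffset, AbortedL) ->
    lists:any(fun(#{producer_id := P, first_offset := First}) ->
                  P =:= Id andalso BaseOffset >= First
              end, AbortedL);
%% Anything else (e.g. plain non-transactional batches) is kept.
should_drop(_Meta, _BaseOffset, _AbortedL) ->
    false.
```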

8 changes: 8 additions & 0 deletions elvis.config
@@ -61,6 +61,14 @@
          }}
        , {elvis_style, no_debug_call}
        ]
+    },
+    #{ dirs => ["test"]
+     , filter => "*.erl"
+     , rules => [ {elvis_style, line_length,
+                   #{ limit => 80
+                    , skip_comments => false
+                    }}
+               ]
     }
   ]
 }
2 changes: 1 addition & 1 deletion src/brod.app.src
@@ -1,7 +1,7 @@
 %% -*- mode:erlang -*-
 {application,brod,
  [{description,"Apache Kafka Erlang client library"},
-  {vsn,"3.7.7"},
+  {vsn,"3.7.8"},
   {registered,[]},
   {applications,[kernel,stdlib,kafka_protocol,supervisor3]},
   {env,[]},
46 changes: 21 additions & 25 deletions src/brod_consumer.erl
@@ -397,31 +397,27 @@ handle_batches(Header, Batches,
                       , partition = Partition
                       } = State0) ->
   HighWmOffset = kpro:find(high_watermark, Header),
-  State1 = case brod_utils:flatten_batches(BeginOffset, Batches) of
-             [] ->
-               %% flatten_batches might remove all actual messages
-               %% (if they were all before BeginOffset), leaving us
-               %% with nothing in this batch. Since there is no way
-               %% to know how big the 'hole' is we can only bump
-               %% begin_offset with +1 and try again.
-               State0#state{ begin_offset = BeginOffset + 1 };
-             Messages ->
-               MsgSet = #kafka_message_set{ topic = Topic
-                                          , partition = Partition
-                                          , high_wm_offset = HighWmOffset
-                                          , messages = Messages
-                                          },
-               ok = cast_to_subscriber(Subscriber, MsgSet),
-               NewPendingAcks = add_pending_acks(PendingAcks, Messages),
-               {value, ?PENDING(LastOffset, _LastMsgSize)} =
-                 queue:peek_r(NewPendingAcks#pending_acks.queue),
-               State2 = State0#state{ pending_acks = NewPendingAcks
-                                    , begin_offset = LastOffset + 1
-                                    },
-               maybe_shrink_max_bytes(State2, MsgSet#kafka_message_set.messages)
-           end,
-  State = maybe_send_fetch_request(State1),
-  {noreply, State}.
+  %% for API version 4 or higher, use last_stable_offset
+  LatestOffset = kpro:find(last_stable_offset, Header, HighWmOffset),
+  {NewBeginOffset, Messages} =
+    brod_utils:flatten_batches(BeginOffset, Header, Batches),
+  State1 = State0#state{begin_offset = NewBeginOffset},
+  State =
+    case Messages =:= [] of
+      true ->
+        State1;
+      false ->
+        MsgSet = #kafka_message_set{ topic = Topic
+                                   , partition = Partition
+                                   , high_wm_offset = LatestOffset
+                                   , messages = Messages
+                                   },
+        ok = cast_to_subscriber(Subscriber, MsgSet),
+        NewPendingAcks = add_pending_acks(PendingAcks, Messages),
+        State2 = State1#state{pending_acks = NewPendingAcks},
+        maybe_shrink_max_bytes(State2, MsgSet#kafka_message_set.messages)
+    end,
+  {noreply, maybe_send_fetch_request(State)}.
 
 %% Add received offsets to pending queue.
 add_pending_acks(PendingAcks, Messages) ->
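The consumer now reports the last stable offset (LSO) instead of the high watermark whenever the broker provides one: fetch responses of API version 4 or higher include `last_stable_offset`, below which every transaction is already committed or aborted, while older responses omit the field. Since `kpro:find/3` returns its third argument as a default, the fallback amounts to the small helper below (a sketch; the module name is made up):

```erlang
-module(lso_sketch).
-export([latest_offset/1]).

%% Prefer the last stable offset (fetch API v4+); fall back to the
%% high watermark when the broker response has no such field.
latest_offset(FetchRespHeader) ->
    HighWm = kpro:find(high_watermark, FetchRespHeader),
    kpro:find(last_stable_offset, FetchRespHeader, HighWm).
```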
89 changes: 77 additions & 12 deletions src/brod_utils.erl
@@ -27,7 +27,7 @@
   , fetch/5
   , fetch_committed_offsets/3
   , fetch_committed_offsets/4
-  , flatten_batches/2
+  , flatten_batches/3
   , get_metadata/1
   , get_metadata/2
   , get_metadata/3
@@ -173,10 +173,35 @@ assert_topic(Topic) ->
   ok_when(is_binary(Topic) andalso size(Topic) > 0, {bad_topic, Topic}).
 
 %% @doc Make a flat message list from decoded batch list.
--spec flatten_batches(offset(), [kpro:batch()]) -> [kpro:message()].
-flatten_batches(BeginOffset, Batches) ->
-  MsgList = lists:append([Msgs || {_Meta, Msgs} <- Batches]),
-  drop_old_messages(BeginOffset, MsgList).
+%% Return the next begin-offset together with the messages.
+-spec flatten_batches(offset(), map(), [kpro:batch()]) ->
+        {offset(), [kpro:message()]}.
+flatten_batches(BeginOffset, _, []) ->
+  %% An empty batch list implies we have reached the end of a partition;
+  %% do not advance begin-offset here, instead try again
+  %% (after a delay) with the same offset.
+  {BeginOffset, []};
+flatten_batches(BeginOffset, Header, Batches0) ->
+  {LastMeta, _} = lists:last(Batches0),
+  Batches = drop_aborted(Header, Batches0),
+  MsgList = lists:append([Msgs || {Meta, Msgs} <- Batches,
+                                  not is_control(Meta)]),
+  case LastMeta of
+    #{last_offset := LastOffset} ->
+      %% For magic v2 messages, the batch metadata carries the
+      %% last offset of the batch.
+      %% Make use of this information to fast-forward to the next
+      %% batch's base offset.
+      {LastOffset + 1, drop_old_messages(BeginOffset, MsgList)};
+    _ when MsgList =/= [] ->
+      %% This is an old version (magic v0 or v1) message set.
+      %% Use OffsetOfLastMessage + 1 as begin_offset in the next fetch request.
+      #kafka_message{offset = Offset} = lists:last(MsgList),
+      {Offset + 1, drop_old_messages(BeginOffset, MsgList)};
+    _ ->
+      %% Not much info about offsets; try the very next offset.
+      {BeginOffset + 1, []}
+  end.
 
 %% @doc Fetch a single message set from the given topic-partition.
 -spec fetch(connection() | {[endpoint()], conn_config()},
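To make the new fast-forward behaviour concrete, here is a hypothetical eunit sketch (not part of this commit; offsets and batch shapes are invented) exercising `brod_utils:flatten_batches/3`. With magic v2 batches, the metadata's `last_offset` lets the next begin-offset jump past the whole batch even when trailing messages were compacted away, and an empty batch list leaves the offset untouched so the caller retries later:

```erlang
-module(flatten_batches_sketch_tests).
-include_lib("eunit/include/eunit.hrl").
-include_lib("brod/include/brod.hrl").

fast_forward_test() ->
    Msg = fun(O) -> #kafka_message{offset = O} end,
    Header = #{},  %% no aborted_transactions in this response
    %% Magic v2: metadata carries last_offset = 9, so the next fetch
    %% should start at 10 even though only offsets 4 and 5 survived.
    Batches = [{#{last_offset => 9}, [Msg(4), Msg(5)]}],
    ?assertEqual({10, [Msg(4), Msg(5)]},
                 brod_utils:flatten_batches(0, Header, Batches)),
    %% End of partition: no batches, keep the same begin-offset.
    ?assertEqual({0, []}, brod_utils:flatten_batches(0, Header, [])).
```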
@@ -236,7 +261,7 @@
     kpro:connect_coordinator(BootstrapEndpoints, ConnCfg, Args),
     fun(Pid) -> do_fetch_committed_offsets(Pid, GroupId, Topics) end).
 
-%% @doc Fetch commited offsts for the given topics in a consumer group.
+%% @doc Fetch committed offsets for the given topics in a consumer group.
 %% 1. Get broker endpoint by calling
 %%    `brod_client:get_group_coordinator'.
 %% 2. Establish a connection to the discovered endpoint.
@@ -296,12 +321,19 @@ fetch(Conn, ReqFun, Offset, MaxBytes) ->
       {error, ErrorCode};
     {ok, #{batches := ?incomplete_batch(Size)}} ->
       fetch(Conn, ReqFun, Offset, Size);
-    {ok, #{header := Header, batches := Batches0}} ->
-      HighWmOffset = kpro:find(high_watermark, Header),
-      Batches = flatten_batches(Offset, Batches0),
-      case Offset < HighWmOffset andalso Batches =:= [] of
-        true -> fetch(Conn, ReqFun, Offset + 1, MaxBytes);
-        false -> {ok, {HighWmOffset, Batches}}
+    {ok, #{header := Header, batches := Batches}} ->
+      HighWm = kpro:find(high_watermark, Header),
+      %% for API version 4 or higher, use last_stable_offset
+      LatestOffset = kpro:find(last_stable_offset, Header, HighWm),
+      {NewBeginOffset, Msgs} = flatten_batches(Offset, Header, Batches),
+      case Offset < LatestOffset andalso Msgs =:= [] of
+        true ->
+          %% Have not reached the latest stable offset yet,
+          %% but received an empty batch-set (all messages were dropped);
+          %% try again with the new begin-offset.
+          fetch(Conn, ReqFun, NewBeginOffset, MaxBytes);
+        false ->
+          {ok, {LatestOffset, Msgs}}
       end;
     {error, Reason} ->
       {error, Reason}
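From a caller's point of view the retry loop stays invisible: when a fetched region contains nothing deliverable (aborted data, control batches), `fetch` advances its begin-offset internally until it finds messages or reaches the last stable offset, and only then returns. A hedged usage sketch of tailing a partition on top of this, assuming `brod:fetch/4` with the `{ok, {StableOffset, Messages}}` return shape shown above (endpoints and topic are placeholders):

```erlang
-module(tail_poller_sketch).
-include_lib("brod/include/brod.hrl").
-export([poll/4]).

%% Tail a partition; an empty result now reliably means "caught up
%% with the last stable offset", not "stuck on an aborted batch".
poll(Bootstrap, Topic, Partition, Offset) ->
    case brod:fetch(Bootstrap, Topic, Partition, Offset) of
        {ok, {_StableOffset, []}} ->
            timer:sleep(1000),  %% caught up; poll again later
            poll(Bootstrap, Topic, Partition, Offset);
        {ok, {_StableOffset, Messages}} ->
            lists:foreach(fun print/1, Messages),
            #kafka_message{offset = Last} = lists:last(Messages),
            poll(Bootstrap, Topic, Partition, Last + 1);
        {error, Reason} ->
            {error, Reason}
    end.

print(#kafka_message{offset = O, value = V}) ->
    io:format("~p: ~p~n", [O, V]).
```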
@@ -437,6 +469,39 @@ make_batch_input(Key, Value) ->
 
 %%%_* Internal functions =======================================================
 
+drop_aborted(#{aborted_transactions := AbortedL}, Batches) ->
+  %% Drop batches for each aborted transaction
+  lists:foldl(
+    fun(#{producer_id := ProducerId, first_offset := FirstOffset}, BatchesIn) ->
+        do_drop_aborted(ProducerId, FirstOffset, BatchesIn, [])
+    end, Batches, AbortedL);
+drop_aborted(_, Batches) ->
+  %% old version, no aborted_transactions field
+  Batches.
+
+do_drop_aborted(_, _, [], Acc) -> lists:reverse(Acc);
+do_drop_aborted(ProducerId, FirstOffset, [{Meta, Msgs} | Batches], Acc) ->
+  #kafka_message{offset = BaseOffset} = hd(Msgs),
+  case {is_txn(Meta, ProducerId), is_control(Meta)} of
+    {true, true} ->
+      %% this is the end of a transaction,
+      %% no need to scan the remaining batches
+      lists:reverse(Acc) ++ Batches;
+    {true, false} when BaseOffset >= FirstOffset ->
+      %% this batch is part of an aborted transaction, drop it
+      do_drop_aborted(ProducerId, FirstOffset, Batches, Acc);
+    _ ->
+      do_drop_aborted(ProducerId, FirstOffset, Batches, [{Meta, Msgs} | Acc])
+  end.
+
+%% Return true if a batch is in a transaction from the given producer.
+is_txn(#{is_transaction := true,
+         producer_id := Id}, Id) -> true;
+is_txn(_Meta, _ProducerId) -> false.
+
+is_control(#{is_control := true}) -> true;
+is_control(_) -> false.
+
 %% Make a function to build fetch requests.
 %% The function takes offset and max_bytes as input as these two parameters
 %% are variant when continuously polling a specific topic-partition.
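A worked example of the filtering above, with invented offsets and producer ids: producer 7's transaction starting at offset 100 was aborted, so its data batches and its abort marker disappear, and an unrelated batch at offset 103 is all that survives. The eunit-style sketch below goes through the exported `brod_utils:flatten_batches/3`, which calls `drop_aborted/2` and the `is_control/1` filter internally:

```erlang
-module(drop_aborted_sketch_tests).
-include_lib("eunit/include/eunit.hrl").
-include_lib("brod/include/brod.hrl").

aborted_txn_test() ->
    Msg = fun(O) -> #kafka_message{offset = O} end,
    %% Fetch response header: the transaction from producer 7 that
    %% began at offset 100 was aborted.
    Header = #{aborted_transactions =>
                   [#{producer_id => 7, first_offset => 100}]},
    Batches =
        [ {#{is_transaction => true, producer_id => 7}, [Msg(100), Msg(101)]}
        , {#{is_transaction => true, producer_id => 7,
             is_control => true}, [Msg(102)]}   %% abort marker
        , {#{}, [Msg(103)]}                     %% unrelated batch
        ],
    %% Only the unrelated message survives; the begin-offset for the
    %% next fetch moves to 104 (offset of the last kept message + 1).
    ?assertEqual({104, [Msg(103)]},
                 brod_utils:flatten_batches(100, Header, Batches)).
```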