Skip to content

Commit

Permalink
Improve test stability
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Dec 13, 2018
1 parent 187596e commit c1aa956
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions test/brod_topic_subscriber_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -198,42 +198,42 @@ t_async_acks(Config) when is_list(Config) ->
ok.

t_begin_offset(Config) when is_list(Config) ->
MaxSeqNo = 100,
ConsumerConfig = [ {prefetch_count, MaxSeqNo}
ConsumerConfig = [ {prefetch_count, 100}
, {prefetch_bytes, 0} %% as discard
, {sleep_timeout, 0}
, {max_wait_time, 1000}
],
CaseRef = t_begin_offset,
CasePid = self(),
Partition = 0,
{ok, StartPartitionOffset} = brod:resolve_offset(?BOOTSTRAP_HOSTS, ?TOPIC, Partition, latest),
SendFun =
fun(I) ->
Value = integer_to_binary(I),
ok = brod:produce_sync(?CLIENT_ID, ?TOPIC, Partition, <<>>, Value)
{ok, Offset} = brod:produce_sync_offset(?CLIENT_ID, ?TOPIC, Partition, <<>>, Value),
Offset
end,
RecvFun =
fun F(Pid, Timeout, Acc) ->
receive
{CaseRef, Partition, Offset, Value} ->
ok = brod_topic_subscriber:ack(Pid, Partition, Offset),
I = binary_to_integer(Value),
F(Pid, 0, [I | Acc]);
F(Pid, 0, [{Offset, I} | Acc]);
Msg ->
erlang:error({unexpected_msg, Msg})
after Timeout ->
Acc
end
end,
ok = SendFun(111),
ok = SendFun(222),
ok = SendFun(333),
InitArgs = {CaseRef, CasePid, _IsAsyncAck = true, _ConsumerOffsets = [{0, StartPartitionOffset + 1}]},
_Offset0 = SendFun(111),
Offset1 = SendFun(222),
Offset2 = SendFun(333),
%% Start as if committed Offset1, expect it to start fetching from Offset2
InitArgs = {CaseRef, CasePid, _IsAsyncAck = true, _ConsumerOffsets = [{0, Offset1}]},
{ok, SubscriberPid} =
brod:start_link_topic_subscriber(?CLIENT_ID, ?TOPIC, ConsumerConfig,
?MODULE, InitArgs),
[333] = RecvFun(SubscriberPid, 5000, []),
?assertEqual([{Offset2, 333}], RecvFun(SubscriberPid, 5000, [])),
ok = brod_topic_subscriber:stop(SubscriberPid),
ok.

Expand Down

0 comments on commit c1aa956

Please sign in to comment.