Skip to content

Commit

Permalink
Merge pull request emqx#9922 from keynslug/fix/EMQX-8876/inflight-stu…
Browse files Browse the repository at this point in the history
…ck-full

fix(bufworker): do not avoid retry if inflight table is full
  • Loading branch information
zmstone committed Feb 8, 2023
2 parents cec8afe + 81b1bab commit 73d5592
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 27 deletions.
2 changes: 1 addition & 1 deletion apps/emqx_resource/src/emqx_resource.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_resource, [
{description, "Manager for all external resources"},
{vsn, "0.1.6"},
{vsn, "0.1.7"},
{registered, []},
{mod, {emqx_resource_app, []}},
{applications, [
Expand Down
14 changes: 2 additions & 12 deletions apps/emqx_resource/src/emqx_resource_buffer_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -334,25 +334,15 @@ resume_from_blocked(Data) ->
{single, Ref, Query} ->
%% We retry msgs in inflight window sync, as if we send them
%% async, they will be appended to the end of inflight window again.
case is_inflight_full(InflightTID) of
true ->
{keep_state, Data};
false ->
retry_inflight_sync(Ref, Query, Data)
end;
retry_inflight_sync(Ref, Query, Data);
{batch, Ref, NotExpired, Expired} ->
update_inflight_item(InflightTID, Ref, NotExpired),
NumExpired = length(Expired),
emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}),
%% We retry msgs in inflight window sync, as if we send them
%% async, they will be appended to the end of inflight window again.
case is_inflight_full(InflightTID) of
true ->
{keep_state, Data};
false ->
retry_inflight_sync(Ref, NotExpired, Data)
end
retry_inflight_sync(Ref, NotExpired, Data)
end.

retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
Expand Down
33 changes: 33 additions & 0 deletions apps/emqx_resource/test/emqx_connector_demo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
{ReqRef, Num} -> {ok, Num}
after 1000 ->
{error, timeout}
end;
on_query(_InstId, {sleep, For}, #{pid := Pid}) ->
?tp(connector_demo_sleep, #{mode => sync, for => For}),
ReqRef = make_ref(),
From = {self(), ReqRef},
Pid ! {From, {sleep, For}},
receive
{ReqRef, Result} ->
Result
after 1000 ->
{error, timeout}
end.

on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
Expand All @@ -147,6 +158,10 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
{ok, Pid};
on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
Pid ! {big_payload, Payload, ReplyFun},
{ok, Pid};
on_query_async(_InstId, {sleep, For}, ReplyFun, #{pid := Pid}) ->
?tp(connector_demo_sleep, #{mode => async, for => For}),
Pid ! {{sleep, For}, ReplyFun},
{ok, Pid}.

on_batch_query(InstId, BatchReq, State) ->
Expand Down Expand Up @@ -283,10 +298,28 @@ counter_loop(
State;
{{FromPid, ReqRef}, get} ->
FromPid ! {ReqRef, Num},
State;
{{sleep, _} = SleepQ, ReplyFun} ->
apply_reply(ReplyFun, handle_query(async, SleepQ, Status)),
State;
{{FromPid, ReqRef}, {sleep, _} = SleepQ} ->
FromPid ! {ReqRef, handle_query(sync, SleepQ, Status)},
State
end,
counter_loop(NewState).

handle_query(Mode, {sleep, For} = Query, Status) ->
ok = timer:sleep(For),
Result =
case Status of
running -> ok;
blocked -> {error, {recoverable_error, blocked}}
end,
?tp(connector_demo_sleep_handled, #{
mode => Mode, query => Query, slept => For, result => Result
}),
Result.

maybe_register(Name, Pid, true) ->
ct:pal("---- Register Name: ~p", [Name]),
ct:pal("---- whereis(): ~p", [whereis(Name)]),
Expand Down
84 changes: 70 additions & 14 deletions apps/emqx_resource/test/emqx_resource_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1452,6 +1452,61 @@ t_retry_async_inflight(_Config) ->
),
ok.

t_retry_async_inflight_full(_Config) ->
ResumeInterval = 1_000,
AsyncInflightWindow = 5,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => ?FUNCTION_NAME},
#{
query_mode => async,
async_inflight_window => AsyncInflightWindow,
batch_size => 1,
batch_time => 20,
worker_pool_size => 1,
resume_interval => ResumeInterval
}
),
?check_trace(
#{timetrap => 15_000},
begin
%% block
ok = emqx_resource:simple_sync_query(?ID, block),

{ok, {ok, _}} =
?wait_async_action(
inc_counter_in_parallel(
AsyncInflightWindow * 2,
fun() ->
For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4),
{sleep, For}
end,
#{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}}
),
#{?snk_kind := buffer_worker_flush_but_inflight_full},
ResumeInterval * 2
),

%% will reply with success after the resource is healed
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_enter_running}
),
ok
end,
[
fun(Trace) ->
?assertMatch([#{} | _], ?of_kind(buffer_worker_flush_but_inflight_full, Trace))
end
]
),
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
ok.

t_retry_async_inflight_batch(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(async_if_possible),
Expand Down Expand Up @@ -2241,18 +2296,16 @@ t_expiration_retry_batch_multiple_times(_Config) ->
%%------------------------------------------------------------------------------

inc_counter_in_parallel(N) ->
inc_counter_in_parallel(N, #{}).
inc_counter_in_parallel(N, {inc_counter, 1}, #{}).

inc_counter_in_parallel(N, Opts0) ->
inc_counter_in_parallel(N, {inc_counter, 1}, Opts0).

inc_counter_in_parallel(N, Query, Opts) ->
Parent = self(),
Pids = [
erlang:spawn(fun() ->
Opts =
case is_function(Opts0) of
true -> Opts0();
false -> Opts0
end,
emqx_resource:query(?ID, {inc_counter, 1}, Opts),
emqx_resource:query(?ID, maybe_apply(Query), maybe_apply(Opts)),
Parent ! {complete, self()}
end)
|| _ <- lists:seq(1, N)
Expand All @@ -2267,16 +2320,11 @@ inc_counter_in_parallel(N, Opts0) ->
],
ok.

inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
inc_counter_in_parallel_increasing(N, StartN, Opts) ->
Parent = self(),
Pids = [
erlang:spawn(fun() ->
Opts =
case is_function(Opts0) of
true -> Opts0();
false -> Opts0
end,
emqx_resource:query(?ID, {inc_counter, M}, Opts),
emqx_resource:query(?ID, {inc_counter, M}, maybe_apply(Opts)),
Parent ! {complete, self()}
end)
|| M <- lists:seq(StartN, StartN + N - 1)
Expand All @@ -2290,6 +2338,14 @@ inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
|| Pid <- Pids
].

maybe_apply(FunOrTerm) ->
maybe_apply(FunOrTerm, []).

maybe_apply(Fun, Args) when is_function(Fun) ->
erlang:apply(Fun, Args);
maybe_apply(Term, _Args) ->
Term.

bin_config() ->
<<"\"name\": \"test_resource\"">>.

Expand Down
1 change: 1 addition & 0 deletions changes/v5.0.17/fix-9922.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix the issue with the bridge resource buffer where it might become stuck if enough async queries fill the inflight window full before failing with retryable errors.
1 change: 1 addition & 0 deletions changes/v5.0.17/fix-9922.zh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
修复桥接资源缓冲区的问题,如果足够多的异步查询在失败并出现可重试错误之前将机上窗口填满,则可能会卡住。

0 comments on commit 73d5592

Please sign in to comment.