Skip to content

Commit

Permalink
Merge pull request emqx#12338 from thalesmg/ds-message-gc-20240115
Browse files Browse the repository at this point in the history
feat(ds): add message GC
  • Loading branch information
thalesmg committed Jan 22, 2024
2 parents 39090d3 + 7035b4c commit d122340
Show file tree
Hide file tree
Showing 16 changed files with 1,084 additions and 68 deletions.
1 change: 1 addition & 0 deletions apps/emqx/priv/bpapi.versions
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
{emqx_delayed,3}.
{emqx_ds,1}.
{emqx_ds,2}.
{emqx_ds,3}.
{emqx_eviction_agent,1}.
{emqx_eviction_agent,2}.
{emqx_exhook,1}.
Expand Down
157 changes: 157 additions & 0 deletions apps/emqx/src/emqx_persistent_message_ds_gc_worker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_message_ds_gc_worker).

-behaviour(gen_server).

-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/qlc.hrl").
-include_lib("stdlib/include/ms_transform.hrl").

-include("emqx_persistent_session_ds.hrl").

%% API
-export([
start_link/0,
gc/0
]).

%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2
]).

%% call/cast/info records
-record(gc, {}).

%%--------------------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------------------

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% For testing or manual ops
gc() ->
gen_server:call(?MODULE, #gc{}, infinity).

%%--------------------------------------------------------------------------------
%% `gen_server' API
%%--------------------------------------------------------------------------------

init(_Opts) ->
ensure_gc_timer(),
State = #{},
{ok, State}.

handle_call(#gc{}, _From, State) ->
maybe_gc(),
{reply, ok, State};
handle_call(_Call, _From, State) ->
{reply, error, State}.

handle_cast(_Cast, State) ->
{noreply, State}.

handle_info(#gc{}, State) ->
try_gc(),
ensure_gc_timer(),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.

%%--------------------------------------------------------------------------------
%% Internal fns
%%--------------------------------------------------------------------------------

ensure_gc_timer() ->
Timeout = emqx_config:get([session_persistence, message_retention_period]),
_ = erlang:send_after(Timeout, self(), #gc{}),
ok.

try_gc() ->
%% Only cores should run GC.
CoreNodes = mria_membership:running_core_nodelist(),
Res = global:trans(
{?MODULE, self()},
fun maybe_gc/0,
CoreNodes,
%% Note: we set retries to 1 here because, in rare occasions, GC might start at the
%% same time in more than one node, and each one will abort the other. By allowing
%% one retry, at least one node will (hopefully) get to enter the transaction and
%% the other will abort. If GC runs too fast, both nodes might run in sequence.
%% But, in that case, GC is clearly not too costly, and that shouldn't be a problem,
%% resource-wise.
_Retries = 1
),
case Res of
aborted ->
?tp(ds_message_gc_lock_taken, #{}),
ok;
ok ->
ok
end.

now_ms() ->
erlang:system_time(millisecond).

maybe_gc() ->
AllGens = emqx_ds:list_generations_with_lifetimes(?PERSISTENT_MESSAGE_DB),
NowMS = now_ms(),
RetentionPeriod = emqx_config:get([session_persistence, message_retention_period]),
TimeThreshold = NowMS - RetentionPeriod,
maybe_create_new_generation(AllGens, TimeThreshold),
?tp_span(
ps_message_gc,
#{},
begin
ExpiredGens =
maps:filter(
fun(_GenId, #{until := Until}) ->
is_number(Until) andalso Until =< TimeThreshold
end,
AllGens
),
ExpiredGenIds = maps:keys(ExpiredGens),
lists:foreach(
fun(GenId) ->
ok = emqx_ds:drop_generation(?PERSISTENT_MESSAGE_DB, GenId),
?tp(message_gc_generation_dropped, #{gen_id => GenId})
end,
ExpiredGenIds
)
end
).

maybe_create_new_generation(AllGens, TimeThreshold) ->
NeedNewGen =
lists:all(
fun({_GenId, #{created_at := CreatedAt}}) ->
CreatedAt =< TimeThreshold
end,
maps:to_list(AllGens)
),
case NeedNewGen of
false ->
?tp(ps_message_gc_too_early, #{}),
ok;
true ->
ok = emqx_ds:add_generation(?PERSISTENT_MESSAGE_DB),
?tp(ps_message_gc_added_gen, #{})
end.
5 changes: 3 additions & 2 deletions apps/emqx/src/emqx_persistent_session_ds_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ init(Opts) ->

do_init(_Opts) ->
SupFlags = #{
strategy => rest_for_one,
strategy => one_for_one,
intensity => 10,
period => 2,
auto_shutdown => never
},
CoreChildren = [
worker(gc_worker, emqx_persistent_session_ds_gc_worker, [])
worker(session_gc_worker, emqx_persistent_session_ds_gc_worker, []),
worker(message_gc_worker, emqx_persistent_message_ds_gc_worker, [])
],
Children =
case mria_rlog:role() of
Expand Down
8 changes: 8 additions & 0 deletions apps/emqx/src/emqx_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1855,6 +1855,14 @@ fields("session_persistence") ->
desc => ?DESC(session_ds_session_gc_batch_size)
}
)},
{"message_retention_period",
sc(
timeout_duration(),
#{
default => <<"1d">>,
desc => ?DESC(session_ds_message_retention_period)
}
)},
{"force_persistence",
sc(
boolean(),
Expand Down
87 changes: 85 additions & 2 deletions apps/emqx/test/emqx_persistent_messages_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").

-compile(export_all).
Expand All @@ -45,10 +46,20 @@ init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
Cluster = cluster(),
Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
[{nodes, Nodes} | Config];
init_per_testcase(t_message_gc = TestCase, Config) ->
Opts = #{
extra_emqx_conf =>
"\n session_persistence.message_retention_period = 1s"
"\n session_persistence.storage.builtin.n_shards = 3"
},
common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config, _Opts = #{}).

common_init_per_testcase(TestCase, Config, Opts) ->
ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
Apps = emqx_cth_suite:start(
app_specs(),
app_specs(Opts),
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
),
[{apps, Apps} | Config].
Expand Down Expand Up @@ -379,6 +390,66 @@ t_publish_empty_topic_levels(_Config) ->
emqtt:stop(Pub)
end.

t_message_gc_too_young(_Config) ->
%% Check that GC doesn't attempt to create a new generation if there are fresh enough
%% generations around. The stability of this test relies on the default value for
%% message retention being long enough. Currently, the default is 1 hour.
?check_trace(
ok = emqx_persistent_message_ds_gc_worker:gc(),
fun(Trace) ->
?assertMatch([_], ?of_kind(ps_message_gc_too_early, Trace)),
ok
end
),
ok.

t_message_gc(Config) ->
%% Check that, after GC runs, a new generation is created, retaining messages, and
%% older messages no longer are accessible.
NShards = ?config(n_shards, Config),
?check_trace(
#{timetrap => 10_000},
begin
%% ensure some messages are in the first generation
?force_ordering(
#{?snk_kind := inserted_batch},
#{?snk_kind := ps_message_gc_added_gen}
),
Msgs0 = [
message(<<"foo/bar">>, <<"1">>, 0),
message(<<"foo/baz">>, <<"2">>, 1)
],
ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0),
?tp(inserted_batch, #{}),
{ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}),

Now = emqx_message:timestamp_now(),
Msgs1 = [
message(<<"foo/bar">>, <<"3">>, Now + 100),
message(<<"foo/baz">>, <<"4">>, Now + 101)
],
ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1),

{ok, _} = snabbkaffe:block_until(
?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}),
infinity
),

TopicFilter = emqx_topic:words(<<"#">>),
StartTime = 0,
Msgs = consume(TopicFilter, StartTime),
%% only "1" and "2" should have been GC'ed
?assertEqual(
sets:from_list([<<"3">>, <<"4">>], [{version, 2}]),
sets:from_list([emqx_message:payload(Msg) || Msg <- Msgs], [{version, 2}])
),

ok
end,
[]
),
ok.

%%

connect(ClientId, CleanStart, EI) ->
Expand Down Expand Up @@ -438,9 +509,13 @@ publish(Node, Message) ->
erpc:call(Node, emqx, publish, [Message]).

app_specs() ->
app_specs(_Opts = #{}).

app_specs(Opts) ->
ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
[
emqx_durable_storage,
{emqx, "session_persistence {enable = true}"}
{emqx, "session_persistence {enable = true}" ++ ExtraEMQXConf}
].

cluster() ->
Expand All @@ -459,3 +534,11 @@ clear_db() ->
mria:stop(),
ok = mnesia:delete_schema([node()]),
ok.

message(Topic, Payload, PublishedAt) ->
#message{
topic = Topic,
payload = Payload,
timestamp = PublishedAt,
id = emqx_guid:gen()
}.

0 comments on commit d122340

Please sign in to comment.