Skip to content

Commit

Permalink
perf(ds): inherit only LTS paths containing wildcards when adding a new generation
Browse files Browse the repository at this point in the history

Fixes emqx#12338 (comment)
  • Loading branch information
thalesmg committed Jan 23, 2024
1 parent d122340 commit 1eb47d0
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 23 deletions.
148 changes: 143 additions & 5 deletions apps/emqx_durable_storage/src/emqx_ds_lts.erl
Expand Up @@ -20,7 +20,7 @@
-export([
trie_create/1, trie_create/0,
trie_restore/2,
trie_restore_existing/2,
trie_copy_learned_paths/2,
topic_key/3,
match_topics/2,
lookup_topic_key/2
Expand Down Expand Up @@ -120,10 +120,6 @@ trie_create() ->
-spec trie_restore(options(), [{_Key, _Val}]) -> trie().
trie_restore(Options, Dump) ->
Trie = trie_create(Options),
trie_restore_existing(Trie, Dump).

-spec trie_restore_existing(trie(), [{_Key, _Val}]) -> trie().
trie_restore_existing(Trie, Dump) ->
lists:foreach(
fun({{StateFrom, Token}, StateTo}) ->
trie_insert(Trie, StateFrom, Token, StateTo)
Expand All @@ -132,6 +128,17 @@ trie_restore_existing(Trie, Dump) ->
),
Trie.

%% @doc Copy the wildcard-containing paths of `OldTrie' into `NewTrie'.
%% Every transition of every path for which `contains_wildcard/1' holds is
%% inserted into `NewTrie' with `trie_insert/4'; paths without a wildcard
%% edge are not copied. Returns `NewTrie'.
-spec trie_copy_learned_paths(trie(), trie()) -> trie().
trie_copy_learned_paths(OldTrie, NewTrie) ->
    %% Paths are lists of `{{StateFrom, Token}, StateTo}' tuples, so a single
    %% level of concatenation yields the list of transitions to insert.
    Transitions = [
        Transition
     || Path <- paths(OldTrie),
        contains_wildcard(Path),
        Transition <- Path
    ],
    lists:foreach(
        fun({{From, Token}, To}) ->
            trie_insert(NewTrie, From, Token, To)
        end,
        Transitions
    ),
    NewTrie.

%% @doc Lookup the topic key. Create a new one, if not found.
-spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key().
topic_key(Trie, ThresholdFun, Tokens) ->
Expand Down Expand Up @@ -385,6 +392,41 @@ emanating(#trie{trie = Tab}, State, Token) when is_binary(Token); Token =:= '' -
ets:lookup(Tab, {State, Token})
].

%% Return every transition leaving `State', regardless of its token, as a
%% list of `{{State, Edge}, Next}' tuples selected from the trie's ETS table.
all_emanating(#trie{trie = Tab}, State) ->
    MatchSpec = ets:fun2ms(
        fun(#trans{key = {From, Token}, next = To}) when From == State ->
            {{From, Token}, To}
        end
    ),
    ets:select(Tab, MatchSpec).

%% Enumerate the paths stored in the trie. The walk starts at the transitions
%% leaving `?PREFIX' and continues with `follow_path/3'. Each returned path is
%% a list of `{{State, Edge}, Next}' transitions in root-to-leaf order.
paths(#trie{} = Trie) ->
    FromRoot = fun({_Key, Next} = Root) ->
        follow_path(Trie, Next, [Root])
    end,
    lists:flatmap(FromRoot, all_emanating(Trie, ?PREFIX)).

%% Depth-first walk from `State'. `Acc' holds the transitions taken so far in
%% reverse order. When an `?EOT' edge is reached the accumulated transitions
%% (including that edge) are reversed and emitted as one complete path;
%% any other edge is followed recursively.
follow_path(#trie{} = Trie, State, Acc) ->
    Outgoing = all_emanating(Trie, State),
    lists:flatmap(
        fun({{_From, Edge}, Next} = Segment) ->
            Extended = [Segment | Acc],
            case Edge of
                ?EOT -> [lists:reverse(Extended)];
                _ -> follow_path(Trie, Next, Extended)
            end
        end,
        Outgoing
    ).

%% True when at least one transition of the path is taken over a `?PLUS'
%% edge; false otherwise (including for the empty path).
contains_wildcard(Path) ->
    lists:any(
        fun
            ({{_From, ?PLUS}, _To}) -> true;
            (_Other) -> false
        end,
        Path
    ).

%%================================================================================
%% Tests
%%================================================================================
Expand Down Expand Up @@ -636,4 +678,100 @@ test_key(Trie, Threshold, Topic0) ->
{ok, Ret} = lookup_topic_key(Trie, Topic),
Ret.

%% Exercises `paths/1' and `contains_wildcard/1':
%%  1. populates a trie through `test_key/3';
%%  2. checks that the expected literal and wildcard paths are enumerated;
%%  3. checks that filtering with `contains_wildcard/1' yields exactly the
%%     expected wildcard paths;
%%  4. checks that re-inserting every enumerated transition into a fresh trie
%%     reproduces the same ETS contents.
paths_test() ->
    T = trie_create(),
    Threshold = 4,
    %% Returns 1000 for argument 0 and `Threshold' for anything else.
    %% NOTE(review): presumably the argument is the topic level, so that the
    %% first level never collapses into a wildcard -- confirm against topic_key/3.
    ThresholdFun = fun
        (0) -> 1000;
        (_) -> Threshold
    end,
    PathsToInsert =
        [
            [''],
            [1],
            [2, 2],
            [3, 3, 3],
            [2, 3, 4]
        ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold + 2)] ++
            [['', I, ''] || I <- lists:seq(1, Threshold + 2)],
    lists:foreach(
        fun(PathSpec) ->
            test_key(T, ThresholdFun, PathSpec)
        end,
        PathsToInsert
    ),

    %% Test that the paths we've inserted are produced in the output
    Paths = paths(T),
    FormattedPaths = lists:map(fun format_path/1, Paths),
    ExpectedWildcardPaths =
        [
            [4, '+', 4],
            ['', '+', '']
        ],
    ExpectedPaths =
        [
            [''],
            [1],
            [2, 2],
            [3, 3, 3]
        ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold)] ++
            [['', I, ''] || I <- lists:seq(1, Threshold)] ++
            ExpectedWildcardPaths,
    %% Turns a path spec into the edge list produced by format_path/1:
    %% integers become binaries, other terms are kept, and `?EOT' is appended.
    FormatPathSpec =
        fun(PathSpec) ->
            lists:map(
                fun
                    (I) when is_integer(I) -> integer_to_binary(I);
                    (A) -> A
                end,
                PathSpec
            ) ++ [?EOT]
        end,
    lists:foreach(
        fun(PathSpec) ->
            Path = FormatPathSpec(PathSpec),
            ?assert(
                lists:member(Path, FormattedPaths),
                #{
                    paths => FormattedPaths,
                    expected_path => Path
                }
            )
        end,
        ExpectedPaths
    ),

    %% Test filter function for paths containing wildcards
    WildcardPaths = lists:filter(fun contains_wildcard/1, Paths),
    FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths),
    %% ?assertEqual takes the expected value first and the tested expression
    %% second, so that a failure report labels the two values correctly.
    ?assertEqual(
        sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]),
        sets:from_list(FormattedWildcardPaths, [{version, 2}]),
        #{
            expected => ExpectedWildcardPaths,
            wildcards => FormattedWildcardPaths
        }
    ),

    %% Test that we're able to reconstruct the same trie from the paths
    T2 = trie_create(),
    [
        trie_insert(T2, State, Edge, Next)
     || Path <- Paths,
        {{State, Edge}, Next} <- Path
    ],
    #trie{trie = Tab1} = T,
    #trie{trie = Tab2} = T2,
    %% Compare as sets: the order of the table dumps is not part of the check.
    Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]),
    Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]),
    ?assertEqual(Dump1, Dump2),

    ok.

%% Reduce a path (a list of `{{State, Edge}, Next}' transitions) to the list
%% of its edges, preserving order.
format_path(Path) ->
    lists:map(
        fun({{_From, Edge}, _To}) -> Edge end,
        Path
    ).

-endif.
27 changes: 9 additions & 18 deletions apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
Expand Up @@ -205,17 +205,15 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
s().
post_creation_actions(
#{
db := DBHandle,
old_gen_id := OldGenId,
old_cf_refs := OldCFRefs,
new_gen_runtime_data := NewGenData0
new_gen_runtime_data := NewGenData,
old_gen_runtime_data := OldGenData
}
) ->
{_, OldTrieCF} = lists:keyfind(trie_cf(OldGenId), 1, OldCFRefs),
#s{trie = NewTrie0} = NewGenData0,
NewTrie = copy_previous_trie(DBHandle, NewTrie0, OldTrieCF),
#s{trie = OldTrie} = OldGenData,
#s{trie = NewTrie0} = NewGenData,
NewTrie = copy_previous_trie(OldTrie, NewTrie0),
?tp(bitfield_lts_inherited_trie, #{}),
NewGenData0#s{trie = NewTrie}.
NewGenData#s{trie = NewTrie}.

-spec drop(
emqx_ds_storage_layer:shard_id(),
Expand Down Expand Up @@ -533,16 +531,9 @@ restore_trie(TopicIndexBytes, DB, CF) ->
rocksdb:iterator_close(IT)
end.

%% Reads the trie persisted in the column family `OldCF' (starting from the
%% first entry of a RocksDB iterator) and inserts it into `NewTrie' via
%% `emqx_ds_lts:trie_restore_existing/2'. The iterator is closed whether or
%% not the read succeeds.
-spec copy_previous_trie(rocksdb:db_handle(), emqx_ds_lts:trie(), rocksdb:cf_handle()) ->
    emqx_ds_lts:trie().
copy_previous_trie(DBHandle, NewTrie, OldCF) ->
    {ok, Iter} = rocksdb:iterator(DBHandle, OldCF, []),
    try
        FirstEntry = rocksdb:iterator_move(Iter, first),
        PersistedDump = read_persisted_trie(Iter, FirstEntry),
        emqx_ds_lts:trie_restore_existing(NewTrie, PersistedDump)
    after
        rocksdb:iterator_close(Iter)
    end.
%% Delegates to `emqx_ds_lts:trie_copy_learned_paths/2', passing the previous
%% generation's trie first and the new generation's trie second, and returns
%% its result.
-spec copy_previous_trie(emqx_ds_lts:trie(), emqx_ds_lts:trie()) -> emqx_ds_lts:trie().
copy_previous_trie(PreviousTrie, FreshTrie) ->
    emqx_ds_lts:trie_copy_learned_paths(PreviousTrie, FreshTrie).

read_persisted_trie(IT, {ok, KeyB, ValB}) ->
[
Expand Down

0 comments on commit 1eb47d0

Please sign in to comment.