Skip to content

Commit

Permalink
[diameter_srv] rework peer selection logic
Browse files Browse the repository at this point in the history
backport from travelping/erccn@/212347714950f3ebfee79a26d86f0800c0f2001e

* avoid creating intermediate maps and sorting where possible
* fix connection selection (use least loaded instead of most loaded)
  • Loading branch information
RoadRunnr committed May 13, 2024
1 parent bffe076 commit 85348b6
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 23 deletions.
90 changes: 68 additions & 22 deletions src/ergw_aaa_diameter_srv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
terminate/2, code_change/3]).

-ifdef(TEST).
-export([peers/0]).
-export([peers/0, pick_connection/2, pick_peer_h/3]).
-endif.

-include_lib("kernel/include/logger.hrl").
Expand Down Expand Up @@ -322,45 +322,91 @@ get_peer(Host, Peers) ->
update_bucket(Peer).

%% relative_connection_load/3
connection_load({PeerRef, _} = Peer, CM, Peers) ->
Cnt = maps:get(PeerRef, Peers, 0),
maps:update_with(-Cnt, [Peer|_], [Peer], CM).
connection_load({PeerRef, _} = Peer, {Load, Cands} = State, Peers) ->
PeerLoad = maps:get(PeerRef, Peers, 0),
if PeerLoad < Load -> {PeerLoad, [Peer]};
PeerLoad =:= Load -> {Load, [Peer|Cands]};
true -> State
end.

%% pick_connection/2
pick_connection(Connections, Peers) ->
CMap =
lists:foldl(
connection_load(_, _, Peers), #{}, Connections),
{_, Cands} = hd(lists:keysort(1, maps:to_list(CMap))),
{_, Cands} = lists:foldl(connection_load(_, _, Peers), {infinity, []}, Connections),
N = rand:uniform(length(Cands)),
lists:nth(N, Cands).

%% relative_peer_load/4
relative_peer_load(Host, _, LM, Peers) ->
Peer = get_peer(Host, Peers),
relative_peer_load(OH, _, {Load, Cands} = State, Peers) ->
#peer{outstanding = Cnt,
capacity = Cap,
rate = Rate,
tokens = Tokens} = Peer,
Load =
rate = Rate,
tokens = Tokens} = get_peer(OH, Peers),
PeerLoad =
erlang:floor((Cnt / Cap) * ?LoadBuckets) +
erlang:floor(((1.0 - Tokens / Rate) * ?LoadBuckets)),
maps:update_with(Load, [Host|_], [Host], LM).
if PeerLoad < Load -> {PeerLoad, [OH]};
PeerLoad =:= Load -> {Load, [OH|Cands]};
true -> State
end.

pick_peers(CandidateMap, Peers) ->
{_, Cands} = maps:fold(relative_peer_load(_, _, _, Peers), {infinity, []}, CandidateMap),
N = rand:uniform(length(Cands)),
OH = lists:nth(N, Cands),
maps:get(OH, CandidateMap).

-if(?OTP_RELEASE >= 25).

%% pick_peer_h/3
pick_peer_h([], _SvcName, _Peers) ->
false;
pick_peer_h(Candidates, _SvcName, Peers) ->
CandMap = lists:foldl(
fun({_, #diameter_caps{origin_host = {_, OH}}} = Peer, CM) ->
maps:update_with(OH, [Peer|_], [Peer], CM)
end, #{}, Candidates),
LoadMap = maps:fold(relative_peer_load(_, _, _, Peers), #{}, CandMap),
[{_, Cands} | _] = lists:keysort(1, maps:to_list(LoadMap)),
N = rand:uniform(length(Cands)),
Connection = pick_connection(maps:get(lists:nth(N, Cands), CandMap), Peers),
CandidateMap =
maps:groups_from_list(
fun({_, #diameter_caps{origin_host = {_, OH}}}) -> OH end,
Candidates),
PeerCandidateList = pick_peers(CandidateMap, Peers),
Connection = pick_connection(PeerCandidateList, Peers),
{ok, Connection}.

-else.

%% from OTP-26.2
maps_groups_from_list(Fun, List0) when is_function(Fun, 1) ->
try lists:reverse(List0) of
List ->
groups_from_list_1(Fun, List, #{})
catch
error:_ ->
erlang:error(badarg, [Fun, List0])
end;
maps_groups_from_list(Fun, List) ->
erlang:error(badarg, [Fun, List]).

maps_groups_from_list_1(Fun, [H | Tail], Acc) ->
K = Fun(H),
NewAcc = case Acc of
#{K := Vs} -> Acc#{K := [H | Vs]};
#{} -> Acc#{K => [H]}
end,
maps_groups_from_list_1(Fun, Tail, NewAcc);
maps_groups_from_list_1(_Fun, [], Acc) ->
Acc.

%% pick_peer_h/3
pick_peer_h([], _SvcName, _Peers) ->
false;
pick_peer_h(Candidates, _SvcName, Peers) ->
CandidateMap =
maps_groups_from_list(
fun({_, #diameter_caps{origin_host = {_, OH}}}) -> OH end,
Candidates),
PeerCandidateList = pick_peers(CandidateMap, Peers),
Connection = pick_connection(PeerCandidateList, Peers),
{ok, Connection}.

-endif.

inc_element(Element, Stats) ->
%% inc with wrap around to 0 at 2^64
setelement(Element, Stats, (element(Element, Stats) + 1) rem (1 bsl 64)).
Expand Down
43 changes: 42 additions & 1 deletion test/diameter_Gy_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-include("../include/diameter_3gpp_ts32_299.hrl").
-include("../include/ergw_aaa_session.hrl").
-include("ergw_aaa_test_lib.hrl").
-include("ergw_aaa_internal.hrl").

-define(HUT, ergw_aaa_ro).
-define(SERVICE, <<"diam-test">>).
Expand Down Expand Up @@ -123,7 +124,8 @@
%%%===================================================================

all() ->
[simple_session,
[diameter_srv,
simple_session,
simple_session_async,
abort_session_request,
tarif_time_change,
Expand Down Expand Up @@ -272,6 +274,45 @@ wait_for_outstanding_reqs(Cnt) ->
%%% Test cases
%%%===================================================================

diameter_srv() ->
[{doc, "Test some diameter_srv functions"}].
diameter_srv(_Config) ->
Ref0 = make_ref(),
Ref1 = make_ref(),

?equal({Ref0, 'conn-0'},
ergw_aaa_diameter_srv:pick_connection(
[{Ref1, 'conn-1'}, {Ref0, 'conn-0'}],
#{Ref0 => 0, Ref1 => 1})),

?match({_, 'conn-0'},
ergw_aaa_diameter_srv:pick_connection(
[{Ref1, 'conn-1'}, {Ref0, 'conn-0'}, {make_ref(), 'conn-0'}],
#{Ref0 => 0, Ref1 => 1})),

?match({ok, {Ref0, _}},
ergw_aaa_diameter_srv:pick_peer_h(
[{Ref0, #diameter_caps{origin_host = {undefined, <<"conn-0">>}}},
{Ref1, #diameter_caps{origin_host = {undefined, <<"conn-1">>}}}],
undefined,
#{<<"conn-0">> => #peer{outstanding = 0, capacity = 100, rate = 100, tokens = 100},
<<"conn-1">> => #peer{outstanding = 50, capacity = 100, rate = 100, tokens = 100}})),
?match({ok, {Ref1, _}},
ergw_aaa_diameter_srv:pick_peer_h(
[{Ref0, #diameter_caps{origin_host = {undefined, <<"conn-0">>}}},
{Ref1, #diameter_caps{origin_host = {undefined, <<"conn-1">>}}}],
undefined,
#{<<"conn-0">> => #peer{outstanding = 50, capacity = 100, rate = 100, tokens = 100},
<<"conn-1">> => #peer{outstanding = 0, capacity = 100, rate = 100, tokens = 100}})),
?match({ok, {Ref1, _}},
ergw_aaa_diameter_srv:pick_peer_h(
[{Ref0, #diameter_caps{origin_host = {undefined, <<"conn-0">>}}},
{Ref1, #diameter_caps{origin_host = {undefined, <<"conn-0">>}}}],
undefined,
#{Ref0 => 10,
<<"conn-0">> => #peer{outstanding = 0, capacity = 100, rate = 100, tokens = 100}})),
ok.

simple_session() ->
[{doc, "Simple Gy session"}].
simple_session(Config) ->
Expand Down

0 comments on commit 85348b6

Please sign in to comment.