Skip to content

Commit

Permalink
moved echo_timer from map to state_rec
Browse files Browse the repository at this point in the history
  • Loading branch information
vijay-hirani committed Jun 29, 2020
1 parent 402b7f0 commit c8c02a6
Showing 1 changed file with 108 additions and 94 deletions.
202 changes: 108 additions & 94 deletions src/gtp_path.erl
Expand Up @@ -10,7 +10,7 @@
-behaviour(gen_statem).

-compile({parse_transform, cut}).
-compile({no_auto_import,[register/2]}).
% -compile({no_auto_import,[register/2]}).

%% API
-export([start_link/4, all/1,
Expand All @@ -31,6 +31,10 @@
-include_lib("gtplib/include/gtp_packet.hrl").
-include("include/ergw.hrl").

% echo_timer is the status of the echo send to the remote peer
-record(state, {peer :: 'UP' | 'DOWN', % State of remote peer
echo_timer :: 'stopped' | 'echo_to_send' | 'awaiting_response'}).

%%%===================================================================
%%% API
%%%===================================================================
Expand Down Expand Up @@ -69,7 +73,7 @@ bind(#gtp{ie = #{{v2_recovery, 0} := #v2_recovery{restart_counter = RestartCount
bind(Request, Context) ->
path_recovery(undefined, bind_path(Request, Context)).

unbind(#context{version = Version, control_port = GtpPort,
unbind(#context{version = Version, control_port = GtpPort,
remote_control_teid = #fq_teid{ip = RemoteIP}}) ->
case get(GtpPort, Version, RemoteIP) of
Path when is_pid(Path) ->
Expand Down Expand Up @@ -127,28 +131,29 @@ ping(GtpPort, Version, IP) ->

callback_mode() -> [handle_event_function, state_enter].

% State = 'UP' | 'DOWN'
init([#gtp_port{name = PortName} = GtpPort, Version, RemoteIP, Args]) ->
gtp_path_reg:register({PortName, Version, RemoteIP}),

State = #state{peer = 'UP',
echo_timer = 'stopped'},

Data = #{
% Path Info Keys
gtp_port => GtpPort, % #gtp_port{}
version => Version, % v1 | v2
handler => get_handler(GtpPort, Version),
ip => RemoteIP,
recovery => undefined, % Undefined | non_neg_integer
% Echo Info Keys
recovery => undefined, % undefined | non_neg_integer
% Echo Info values
t3 => proplists:get_value(t3, Args, 10 * 1000), %% 10sec
n3 => proplists:get_value(n3, Args, 5),
echo => proplists:get_value(ping, Args, 60 * 1000),
echo_timer => stopped, % stopped | echo_to_send | awaiting_response
% Table Info Keys
table => ets_new() % tid
},
},

?LOG(debug, "State = UP, Data: ~p", [Data]),
{ok, 'UP', Data}.
?LOG(debug, "State: ~p Data: ~p", [State, Data]),
{ok, State, Data}.

handle_event(enter, _OldState, _State, _Data) ->
keep_state_and_data;
Expand All @@ -157,21 +162,21 @@ handle_event({call, From}, all, _State, #{table := TID}) ->
Reply = ets:tab2list(TID),
{keep_state_and_data, [{reply, From, Reply}]};

handle_event({call, From}, {bind, Pid}, _State, #{recovery := RestartCounter} = Data0) ->
Data = register(Pid, Data0),
{keep_state, Data, [{reply, From, {ok, RestartCounter}}]};

handle_event({call, From}, {bind, Pid, RestartCounter}, _State, Data0) ->
Data1 = update_restart_counter(RestartCounter, Data0),
Data = register(Pid, Data1),
{keep_state, Data, [{reply, From, ok}]};

handle_event({call, From}, {unbind, Pid}, _State, Data0) ->
Data = unregister(Pid, Data0),
case Data of
#{echo_timer := stopped} ->
Actions = echo_timeout_action([{reply, From, ok}], infinity, send_echo),
{keep_state, Data, Actions};
handle_event({call, From}, {bind, Pid}, State0, #{recovery := RestartCounter} = Data) ->
State = register(Pid, State0, Data),
{next_state, State, Data, [{reply, From, {ok, RestartCounter}}]};

handle_event({call, From}, {bind, Pid, RestartCounter}, State0, Data0) ->
{State1, Data} = update_restart_counter(RestartCounter, State0, Data0),
State = register(Pid, State1, Data),
{next_state, State, Data, [{reply, From, ok}]};

handle_event({call, From}, {unbind, Pid}, State0, Data) ->
State = unregister(Pid, State0, Data),
case State of
#state{echo_timer = 'stopped'} ->
Actions = echo_timeout_action([{reply, From, ok}], infinity, 'send_echo'),
{next_state, State, Data, Actions};
_ ->
{keep_state, Data, [{reply, From, ok}]}
end;
Expand All @@ -189,72 +194,77 @@ handle_event({call, _From}, Request, _State, Data) ->
{keep_state_and_data, [{reply, ok, Data}]};

handle_event(cast, {handle_request, ReqKey, #gtp{type = echo_request} = Msg0},
_State, #{gtp_port := GtpPort, handler := Handler} = Data0) ->
State0, #{gtp_port := GtpPort, handler := Handler} = Data0) ->
?LOG(debug, "echo_request: ~p", [Msg0]),
try gtp_packet:decode_ies(Msg0) of
Msg = #gtp{} ->

Data = handle_recovery_ie(Msg, Data0),
{State, Data} = handle_recovery_ie(Msg, State0, Data0),

ResponseIEs = Handler:build_recovery(echo_response, GtpPort, true, []),
Response = Msg#gtp{type = echo_response, ie = ResponseIEs},
ergw_gtp_c_socket:send_response(ReqKey, Response, false),
{keep_state, Data}
{next_state, State, Data}
catch
Class:Error ->
?LOG(error, "GTP decoding failed with ~p:~p for ~p",
[Class, Error, Msg0]),
{keep_state, Data0}
end;

handle_event(cast, down, _State, Data0) ->
Data = path_down(undefined, Data0),
{keep_state, Data};
handle_event(cast, down, State, Data0) ->
{State, Data} = path_down(undefined, State, Data0),
{next_state, State, Data};

handle_event(cast,{handle_response, echo_request, #gtp{type = echo_response} = Msg},
State0, #{echo := EchoInterval} =Data0) ->
handle_event(cast,{handle_response, echo_request, #gtp{type = echo_response} = Msg},
State0, #{echo := EchoInterval} = Data0) ->
?LOG(debug, "echo_response: ~p", [Msg]),
Data1 = handle_recovery_ie(Msg, Data0),
{State, Data} = echo_response(Msg, State0, Data1),
Actions = echo_timeout_action([], EchoInterval, send_echo),
{next_state, State, Data, Actions};
{State1, Data1} = handle_recovery_ie(Msg, State0, Data0),
{State, Data} = echo_response(Msg, State1, Data1),
case State of
#state{echo_timer = 'echo_to_send'} ->
Actions = echo_timeout_action([], EchoInterval, 'send_echo'),
{next_state, State, Data, Actions};
_ ->
{next_state, State, Data}
end;

handle_event(cast,{handle_response, echo_request, timeout = Msg}, State0, Data0) ->
?LOG(debug, "echo_response: ~p", [Msg]),
Actions = echo_timeout_action([], infinity, send_echo),
Actions = echo_timeout_action([], infinity, 'send_echo'),
{State, Data} = echo_response(Msg, State0, Data0),
{next_state, State, Data, Actions};

%% test support
handle_event(cast, '$ping', _State, #{echo_timer := awaiting_response}) ->
handle_event(cast, '$ping', #state{echo_timer = 'awaiting_response'}, _Data) ->
keep_state_and_data;
handle_event(cast, '$ping', _State, #{echo_timer := echo_to_send} = Data0) ->
Data = send_echo_request(Data0),
Actions = echo_timeout_action([], infinity, send_echo),
{keep_state, Data, Actions};
handle_event(cast, '$ping', #state{echo_timer = 'echo_to_send'} = State0, Data) ->
State = send_echo_request(State0, Data),
Actions = echo_timeout_action([], infinity, 'send_echo'),
{next_state, State, Data, Actions};

handle_event(cast, Msg, _State, _Data) ->
?LOG(error, "~p: ~w: handle_event(cast, ...): ~p", [self(), ?MODULE, Msg]),
keep_state_and_data;

handle_event(info,{'DOWN', _MonitorRef, process, Pid, _Info}, _State, Data0) ->
Data = unregister(Pid, Data0),
case Data of
#{echo_timer := echo_Stopped} ->
Actions = echo_timeout_action([], infinity, send_echo),
{keep_state, Data, Actions};
handle_event(info,{'DOWN', _MonitorRef, process, Pid, _Info}, State0, Data) ->
State = unregister(Pid, State0, Data),
case State of
#state{echo_timer = 'echo_Stopped'} ->

This comment has been minimized.

Copy link
@ebengt

ebengt Jun 29, 2020

Contributor

'echo_Stopped' is not part of the allowed states:
'stopped' | 'echo_to_send' | 'awaiting_response'

Actions = echo_timeout_action([], infinity, 'send_echo'),
{next_state, State, Data, Actions};
_ ->
{keep_state, Data}
end;

handle_event({timeout, echo_timer}, send_echo, _state,
#{echo_timer := echo_to_send} = Data0) ->
?LOG(debug, "handle_event timeout: ~p", [Data0]),
Data = send_echo_request(Data0),
Actions = echo_timeout_action([], infinity, send_echo),
{keep_state, Data, Actions};
handle_event({timeout, 'echo_create'}, 'send_echo',
#state{echo_timer = 'echo_to_send'} = State0, Data) ->
?LOG(debug, "handle_event timeout: ~p", [Data]),
State = send_echo_request(State0, Data),
Actions = echo_timeout_action([], infinity, 'send_echo'),
{next_state, State, Data, Actions};

handle_event({timeout, echo_timer}, send_echo, _State, Data) ->
handle_event({timeout, 'echo_create'}, 'send_echo', _State, Data) ->
?LOG(debug, "handle_event timeout: ~p", [Data]),
{keep_state, Data};

Expand Down Expand Up @@ -297,35 +307,37 @@ code_change(_OldVsn, State, Data, _Extra) ->

-define(SMALLER(S1, S2), ((S1 < S2 andalso (S2 - S1) < 128) orelse (S1 > S2 andalso (S1 - S2) > 128))).

update_restart_counter(RestartCounter, #{recovery := undefined} = Data) ->
Data#{recovery => RestartCounter};
update_restart_counter(RestartCounter, #{recovery := RestartCounter} = Data) ->
Data;
update_restart_counter(NewRestartCounter, #{ip := IP, recovery := OldRestartCounter} = Data)
update_restart_counter(RestartCounter, State, #{recovery := undefined} = Data) ->
{State, Data#{recovery => RestartCounter}};
update_restart_counter(RestartCounter, State, #{recovery := RestartCounter} = Data) ->
{State, Data};
update_restart_counter(NewRestartCounter, State,
#{ip := IP, recovery := OldRestartCounter} = Data)
when ?SMALLER(OldRestartCounter, NewRestartCounter) ->
?LOG(warning, "GSN ~s restarted (~w != ~w)",
[inet:ntoa(IP), OldRestartCounter, NewRestartCounter]),
path_down(NewRestartCounter, Data);
path_down(NewRestartCounter, State, Data);

update_restart_counter(NewRestartCounter, #{ip := IP, recovery := OldRestartCounter} = Data)
update_restart_counter(NewRestartCounter, State,
#{ip := IP, recovery := OldRestartCounter} = Data)
when not ?SMALLER(OldRestartCounter, NewRestartCounter) ->
?LOG(warning, "possible race on message with restart counter for GSN ~s (old: ~w, new: ~w)",
[inet:ntoa(IP), OldRestartCounter, NewRestartCounter]),
Data.
{State, Data}.

handle_recovery_ie(#gtp{version = v1,
ie = #{{recovery, 0} :=
#recovery{restart_counter =
RestartCounter}}}, Data) ->
update_restart_counter(RestartCounter, Data);
RestartCounter}}}, State, Data) ->
update_restart_counter(RestartCounter, State, Data);

handle_recovery_ie(#gtp{version = v2,
ie = #{{v2_recovery, 0} :=
#v2_recovery{restart_counter =
RestartCounter}}}, Data) ->
update_restart_counter(RestartCounter, Data);
handle_recovery_ie(_Msg, Data) ->
Data.
RestartCounter}}}, State, Data) ->
update_restart_counter(RestartCounter, State, Data);
handle_recovery_ie(_Msg, State, Data) ->
{State, Data}.

ets_new() ->
ets:new(?MODULE, [public, ordered_set, {keypos, 1}]).
Expand All @@ -340,22 +352,23 @@ ets_foreach(TID, Fun, {Pids, Continuation})
lists:foreach(fun([Pid]) -> Fun(Pid) end, Pids),
ets_foreach(TID, Fun, ets:match_object(Continuation)).

register(Pid, #{table := TID} = Data) ->
register(Pid, State, #{table := TID} = Data) ->
?LOG(debug, "~s: register(~p)", [?MODULE, Pid]),
erlang:monitor(process, Pid),
ets:insert(TID, {Pid}),
update_path_counter(ets:info(TID, size), Data).
update_path_counter(ets:info(TID, size), State, Data).

unregister(Pid, #{table := TID} = Data) ->
unregister(Pid, State, #{table := TID} = Data) ->
ets:delete(TID, Pid),
update_path_counter(ets:info(TID, size), Data).
update_path_counter(ets:info(TID, size), State, Data).

update_path_counter(PathCounter, #{gtp_port := GtpPort, version := Version, ip := IP} = Data) ->
update_path_counter(PathCounter, State, #{gtp_port := GtpPort, version := Version, ip := IP} =
Data) ->
ergw_prometheus:gtp_path_contexts(GtpPort, IP, Version, PathCounter),
if PathCounter =:= 0 ->
stop_echo_request(Data);
stop_echo_request(State);
true ->
start_echo_request(Data)
start_echo_request(State, Data)
end.

bind_path(#gtp{version = Version}, Context) ->
Expand All @@ -374,44 +387,45 @@ path_recovery(_RestartCounter, #context{path = Path} = Context) ->
{ok, PathRestartCounter} = gen_statem:call(Path, {bind, self()}),
Context#context{remote_restart_counter = PathRestartCounter}.

start_echo_request(#{echo_timer := stopped} = Data) ->
send_echo_request(Data);
start_echo_request(Data) ->
Data.
start_echo_request(#state{echo_timer = 'stopped'} = State, Data) ->
send_echo_request(State, Data);
start_echo_request(State, _Data) ->
State.

stop_echo_request(Data) ->
Data#{echo_timer => stopped}.
stop_echo_request(State) ->
State#state{echo_timer = 'stopped'}.

send_echo_request(#{gtp_port := GtpPort, handler := Handler, ip := DstIP,
t3 := T3, n3 := N3} = Data) ->
send_echo_request(State, #{gtp_port := GtpPort, handler := Handler, ip := DstIP,
t3 := T3, n3 := N3}) ->
Msg = Handler:build_echo_request(GtpPort),
CbInfo = {?MODULE, handle_response, [self(), echo_request]},
ergw_gtp_c_socket:send_request(GtpPort, DstIP, ?GTP1c_PORT, T3, N3, Msg, CbInfo),
Data#{echo_timer => awaiting_response}.
State#state{echo_timer = 'awaiting_response'}.

echo_response(Msg, State0, #{echo_timer := awaiting_response} = Data0) ->
echo_response(Msg, #state{echo_timer = 'awaiting_response'} = State0, Data0) ->
{State, Data} = update_path_state(Msg, State0, Data0),
{State, Data#{echo_timer => echo_to_send}};
{State#state{echo_timer = 'echo_to_send'}, Data};
echo_response(Msg, State, Data) ->
update_path_state(Msg, State, Data).

update_path_state(#gtp{}, _State, Data) ->
{'UP', Data};
update_path_state(_, _State, Data) ->
{'DOWN', path_down(undefined, Data)}.
update_path_state(#gtp{}, State, Data) ->
{State#state{peer = 'UP'}, Data};
update_path_state(_, State0, Data) ->
State = State0#state{peer = 'DOWN'},
path_down(undefined, State, Data).

path_down(RestartCounter, #{table := TID} = Data0) ->
path_down(RestartCounter, State, #{table := TID} = Data0) ->
Path = self(),
proc_lib:spawn(fun() ->
ets_foreach(TID, gtp_context:path_restart(_, Path)),
ets:delete(TID)
end),
Data = Data0#{table => ets_new(), recovery => RestartCounter},
update_path_counter(0, Data).
{update_path_counter(0, State, Data), Data}.

echo_timeout_action(Actions, Timeout, Ref)
when is_integer(Timeout) orelse Timeout =:= infinity;
is_atom(Ref) ->
[{{timeout, echo_timer}, Timeout, Ref} | Actions];
[{{timeout, 'echo_create'}, Timeout, Ref} | Actions];
echo_timeout_action(Actions, _, _) ->
Actions.

0 comments on commit c8c02a6

Please sign in to comment.