diff --git a/src/gtp_path.erl b/src/gtp_path.erl index 47797050..6c8afd33 100644 --- a/src/gtp_path.erl +++ b/src/gtp_path.erl @@ -10,7 +10,7 @@ -behaviour(gen_statem). -compile({parse_transform, cut}). --compile({no_auto_import,[register/2]}). +% -compile({no_auto_import,[register/2]}). %% API -export([start_link/4, all/1, @@ -31,6 +31,10 @@ -include_lib("gtplib/include/gtp_packet.hrl"). -include("include/ergw.hrl"). +% echo_timer is the status of the echo send to the remote peer +-record(state, {peer :: 'UP' | 'DOWN', % State of remote peer + echo_timer :: 'stopped' | 'echo_to_send' | 'awaiting_response'}). + %%%=================================================================== %%% API %%%=================================================================== @@ -69,7 +73,7 @@ bind(#gtp{ie = #{{v2_recovery, 0} := #v2_recovery{restart_counter = RestartCount bind(Request, Context) -> path_recovery(undefined, bind_path(Request, Context)). -unbind(#context{version = Version, control_port = GtpPort, +unbind(#context{version = Version, control_port = GtpPort, remote_control_teid = #fq_teid{ip = RemoteIP}}) -> case get(GtpPort, Version, RemoteIP) of Path when is_pid(Path) -> @@ -127,28 +131,29 @@ ping(GtpPort, Version, IP) -> callback_mode() -> [handle_event_function, state_enter]. -% State = 'UP' | 'DOWN' init([#gtp_port{name = PortName} = GtpPort, Version, RemoteIP, Args]) -> gtp_path_reg:register({PortName, Version, RemoteIP}), + State = #state{peer = 'UP', + echo_timer = 'stopped'}, + Data = #{ % Path Info Keys gtp_port => GtpPort, % #gtp_port{} version => Version, % v1 | v2 handler => get_handler(GtpPort, Version), ip => RemoteIP, - recovery => undefined, % Undefined | non_neg_integer - % Echo Info Keys + recovery => undefined, % undefined | non_neg_integer + % Echo Info values t3 => proplists:get_value(t3, Args, 10 * 1000), %% 10sec n3 => proplists:get_value(n3, Args, 5), echo => proplists:get_value(ping, Args, 60 * 1000), - echo_timer => stopped, % stopped | echo_to_send | awaiting_response % Table Info Keys table => ets_new() % tid - }, + }, - ?LOG(debug, "State = UP, Data: ~p", [Data]), - {ok, 'UP', Data}. + ?LOG(debug, "State: ~p Data: ~p", [State, Data]), + {ok, State, Data}. handle_event(enter, _OldState, _State, _Data) -> keep_state_and_data; @@ -157,21 +162,21 @@ handle_event({call, From}, all, _State, #{table := TID}) -> Reply = ets:tab2list(TID), {keep_state_and_data, [{reply, From, Reply}]}; -handle_event({call, From}, {bind, Pid}, _State, #{recovery := RestartCounter} = Data0) -> - Data = register(Pid, Data0), - {keep_state, Data, [{reply, From, {ok, RestartCounter}}]}; - -handle_event({call, From}, {bind, Pid, RestartCounter}, _State, Data0) -> - Data1 = update_restart_counter(RestartCounter, Data0), - Data = register(Pid, Data1), - {keep_state, Data, [{reply, From, ok}]}; - -handle_event({call, From}, {unbind, Pid}, _State, Data0) -> - Data = unregister(Pid, Data0), - case Data of - #{echo_timer := stopped} -> - Actions = echo_timeout_action([{reply, From, ok}], infinity, send_echo), - {keep_state, Data, Actions}; +handle_event({call, From}, {bind, Pid}, State0, #{recovery := RestartCounter} = Data) -> + State = register(Pid, State0, Data), + {next_state, State, Data, [{reply, From, {ok, RestartCounter}}]}; + +handle_event({call, From}, {bind, Pid, RestartCounter}, State0, Data0) -> + {State1, Data} = update_restart_counter(RestartCounter, State0, Data0), + State = register(Pid, State1, Data), + {next_state, State, Data, [{reply, From, ok}]}; + +handle_event({call, From}, {unbind, Pid}, State0, Data) -> + State = unregister(Pid, State0, Data), + case State of + #state{echo_timer = 'stopped'} -> + Actions = echo_timeout_action([{reply, From, ok}], infinity, 'send_echo'), + {next_state, State, Data, Actions}; _ -> {keep_state, Data, [{reply, From, ok}]} end; @@ -189,17 +194,17 @@ handle_event({call, _From}, Request, _State, Data) -> {keep_state_and_data, [{reply, ok, Data}]}; handle_event(cast, {handle_request, ReqKey, #gtp{type = echo_request} = Msg0}, - _State, #{gtp_port := GtpPort, handler := Handler} = Data0) -> + State0, #{gtp_port := GtpPort, handler := Handler} = Data0) -> ?LOG(debug, "echo_request: ~p", [Msg0]), try gtp_packet:decode_ies(Msg0) of Msg = #gtp{} -> - Data = handle_recovery_ie(Msg, Data0), + {State, Data} = handle_recovery_ie(Msg, State0, Data0), ResponseIEs = Handler:build_recovery(echo_response, GtpPort, true, []), Response = Msg#gtp{type = echo_response, ie = ResponseIEs}, ergw_gtp_c_socket:send_response(ReqKey, Response, false), - {keep_state, Data} + {next_state, State, Data} catch Class:Error -> ?LOG(error, "GTP decoding failed with ~p:~p for ~p", @@ -207,54 +212,59 @@ handle_event(cast, {handle_request, ReqKey, #gtp{type = echo_request} = Msg0}, {keep_state, Data0} end; -handle_event(cast, down, _State, Data0) -> - Data = path_down(undefined, Data0), - {keep_state, Data}; +handle_event(cast, down, State, Data0) -> + {State, Data} = path_down(undefined, State, Data0), + {next_state, State, Data}; -handle_event(cast,{handle_response, echo_request, #gtp{type = echo_response} = Msg}, - State0, #{echo := EchoInterval} =Data0) -> +handle_event(cast,{handle_response, echo_request, #gtp{type = echo_response} = Msg}, + State0, #{echo := EchoInterval} = Data0) -> ?LOG(debug, "echo_response: ~p", [Msg]), - Data1 = handle_recovery_ie(Msg, Data0), - {State, Data} = echo_response(Msg, State0, Data1), - Actions = echo_timeout_action([], EchoInterval, send_echo), - {next_state, State, Data, Actions}; + {State1, Data1} = handle_recovery_ie(Msg, State0, Data0), + {State, Data} = echo_response(Msg, State1, Data1), + case State of + #state{echo_timer = 'echo_to_send'} -> + Actions = echo_timeout_action([], EchoInterval, 'send_echo'), + {next_state, State, Data, Actions}; + _ -> + {next_state, State, Data} + end; handle_event(cast,{handle_response, echo_request, timeout = Msg}, State0, Data0) -> ?LOG(debug, "echo_response: ~p", [Msg]), - Actions = echo_timeout_action([], infinity, send_echo), + Actions = echo_timeout_action([], infinity, 'send_echo'), {State, Data} = echo_response(Msg, State0, Data0), {next_state, State, Data, Actions}; %% test support -handle_event(cast, '$ping', _State, #{echo_timer := awaiting_response}) -> +handle_event(cast, '$ping', #state{echo_timer = 'awaiting_response'}, _Data) -> keep_state_and_data; -handle_event(cast, '$ping', _State, #{echo_timer := echo_to_send} = Data0) -> - Data = send_echo_request(Data0), - Actions = echo_timeout_action([], infinity, send_echo), - {keep_state, Data, Actions}; +handle_event(cast, '$ping', #state{echo_timer = 'echo_to_send'} = State0, Data) -> + State = send_echo_request(State0, Data), + Actions = echo_timeout_action([], infinity, 'send_echo'), + {next_state, State, Data, Actions}; handle_event(cast, Msg, _State, _Data) -> ?LOG(error, "~p: ~w: handle_event(cast, ...): ~p", [self(), ?MODULE, Msg]), keep_state_and_data; -handle_event(info,{'DOWN', _MonitorRef, process, Pid, _Info}, _State, Data0) -> - Data = unregister(Pid, Data0), - case Data of - #{echo_timer := echo_Stopped} -> - Actions = echo_timeout_action([], infinity, send_echo), - {keep_state, Data, Actions}; +handle_event(info,{'DOWN', _MonitorRef, process, Pid, _Info}, State0, Data) -> + State = unregister(Pid, State0, Data), + case State of + #state{echo_timer = 'echo_Stopped'} -> + Actions = echo_timeout_action([], infinity, 'send_echo'), + {next_state, State, Data, Actions}; _ -> {keep_state, Data} end; -handle_event({timeout, echo_timer}, send_echo, _state, - #{echo_timer := echo_to_send} = Data0) -> - ?LOG(debug, "handle_event timeout: ~p", [Data0]), - Data = send_echo_request(Data0), - Actions = echo_timeout_action([], infinity, send_echo), - {keep_state, Data, Actions}; +handle_event({timeout, 'echo_create'}, 'send_echo', + #state{echo_timer = 'echo_to_send'} = State0, Data) -> + ?LOG(debug, "handle_event timeout: ~p", [Data]), + State = send_echo_request(State0, Data), + Actions = echo_timeout_action([], infinity, 'send_echo'), + {next_state, State, Data, Actions}; -handle_event({timeout, echo_timer}, send_echo, _State, Data) -> +handle_event({timeout, 'echo_create'}, 'send_echo', _State, Data) -> ?LOG(debug, "handle_event timeout: ~p", [Data]), {keep_state, Data}; @@ -297,35 +307,37 @@ code_change(_OldVsn, State, Data, _Extra) -> -define(SMALLER(S1, S2), ((S1 < S2 andalso (S2 - S1) < 128) orelse (S1 > S2 andalso (S1 - S2) > 128))). -update_restart_counter(RestartCounter, #{recovery := undefined} = Data) -> - Data#{recovery => RestartCounter}; -update_restart_counter(RestartCounter, #{recovery := RestartCounter} = Data) -> - Data; -update_restart_counter(NewRestartCounter, #{ip := IP, recovery := OldRestartCounter} = Data) +update_restart_counter(RestartCounter, State, #{recovery := undefined} = Data) -> + {State, Data#{recovery => RestartCounter}}; +update_restart_counter(RestartCounter, State, #{recovery := RestartCounter} = Data) -> + {State, Data}; +update_restart_counter(NewRestartCounter, State, + #{ip := IP, recovery := OldRestartCounter} = Data) when ?SMALLER(OldRestartCounter, NewRestartCounter) -> ?LOG(warning, "GSN ~s restarted (~w != ~w)", [inet:ntoa(IP), OldRestartCounter, NewRestartCounter]), - path_down(NewRestartCounter, Data); + path_down(NewRestartCounter, State, Data); -update_restart_counter(NewRestartCounter, #{ip := IP, recovery := OldRestartCounter} = Data) +update_restart_counter(NewRestartCounter, State, + #{ip := IP, recovery := OldRestartCounter} = Data) when not ?SMALLER(OldRestartCounter, NewRestartCounter) -> ?LOG(warning, "possible race on message with restart counter for GSN ~s (old: ~w, new: ~w)", [inet:ntoa(IP), OldRestartCounter, NewRestartCounter]), - Data. + {State, Data}. handle_recovery_ie(#gtp{version = v1, ie = #{{recovery, 0} := #recovery{restart_counter = - RestartCounter}}}, Data) -> - update_restart_counter(RestartCounter, Data); + RestartCounter}}}, State, Data) -> + update_restart_counter(RestartCounter, State, Data); handle_recovery_ie(#gtp{version = v2, ie = #{{v2_recovery, 0} := #v2_recovery{restart_counter = - RestartCounter}}}, Data) -> - update_restart_counter(RestartCounter, Data); -handle_recovery_ie(_Msg, Data) -> - Data. + RestartCounter}}}, State, Data) -> + update_restart_counter(RestartCounter, State, Data); +handle_recovery_ie(_Msg, State, Data) -> + {State, Data}. ets_new() -> ets:new(?MODULE, [public, ordered_set, {keypos, 1}]). @@ -340,22 +352,23 @@ ets_foreach(TID, Fun, {Pids, Continuation}) lists:foreach(fun([Pid]) -> Fun(Pid) end, Pids), ets_foreach(TID, Fun, ets:match_object(Continuation)). -register(Pid, #{table := TID} = Data) -> +register(Pid, State, #{table := TID} = Data) -> ?LOG(debug, "~s: register(~p)", [?MODULE, Pid]), erlang:monitor(process, Pid), ets:insert(TID, {Pid}), - update_path_counter(ets:info(TID, size), Data). + update_path_counter(ets:info(TID, size), State, Data). -unregister(Pid, #{table := TID} = Data) -> +unregister(Pid, State, #{table := TID} = Data) -> ets:delete(TID, Pid), - update_path_counter(ets:info(TID, size), Data). + update_path_counter(ets:info(TID, size), State, Data). -update_path_counter(PathCounter, #{gtp_port := GtpPort, version := Version, ip := IP} = Data) -> +update_path_counter(PathCounter, State, #{gtp_port := GtpPort, version := Version, ip := IP} = + Data) -> ergw_prometheus:gtp_path_contexts(GtpPort, IP, Version, PathCounter), if PathCounter =:= 0 -> - stop_echo_request(Data); + stop_echo_request(State); true -> - start_echo_request(Data) + start_echo_request(State, Data) end. bind_path(#gtp{version = Version}, Context) -> @@ -374,44 +387,45 @@ path_recovery(_RestartCounter, #context{path = Path} = Context) -> {ok, PathRestartCounter} = gen_statem:call(Path, {bind, self()}), Context#context{remote_restart_counter = PathRestartCounter}. -start_echo_request(#{echo_timer := stopped} = Data) -> - send_echo_request(Data); -start_echo_request(Data) -> - Data. +start_echo_request(#state{echo_timer = 'stopped'} = State, Data) -> + send_echo_request(State, Data); +start_echo_request(State, _Data) -> + State. -stop_echo_request(Data) -> - Data#{echo_timer => stopped}. +stop_echo_request(State) -> + State#state{echo_timer = 'stopped'}. -send_echo_request(#{gtp_port := GtpPort, handler := Handler, ip := DstIP, - t3 := T3, n3 := N3} = Data) -> +send_echo_request(State, #{gtp_port := GtpPort, handler := Handler, ip := DstIP, + t3 := T3, n3 := N3}) -> Msg = Handler:build_echo_request(GtpPort), CbInfo = {?MODULE, handle_response, [self(), echo_request]}, ergw_gtp_c_socket:send_request(GtpPort, DstIP, ?GTP1c_PORT, T3, N3, Msg, CbInfo), - Data#{echo_timer => awaiting_response}. + State#state{echo_timer = 'awaiting_response'}. -echo_response(Msg, State0, #{echo_timer := awaiting_response} = Data0) -> +echo_response(Msg, #state{echo_timer = 'awaiting_response'} = State0, Data0) -> {State, Data} = update_path_state(Msg, State0, Data0), - {State, Data#{echo_timer => echo_to_send}}; + {State#state{echo_timer = 'echo_to_send'}, Data}; echo_response(Msg, State, Data) -> update_path_state(Msg, State, Data). -update_path_state(#gtp{}, _State, Data) -> - {'UP', Data}; -update_path_state(_, _State, Data) -> - {'DOWN', path_down(undefined, Data)}. +update_path_state(#gtp{}, State, Data) -> + {State#state{peer = 'UP'}, Data}; +update_path_state(_, State0, Data) -> + State = State0#state{peer = 'DOWN'}, + path_down(undefined, State, Data). -path_down(RestartCounter, #{table := TID} = Data0) -> +path_down(RestartCounter, State, #{table := TID} = Data0) -> Path = self(), proc_lib:spawn(fun() -> ets_foreach(TID, gtp_context:path_restart(_, Path)), ets:delete(TID) end), Data = Data0#{table => ets_new(), recovery => RestartCounter}, - update_path_counter(0, Data). + {update_path_counter(0, State, Data), Data}. echo_timeout_action(Actions, Timeout, Ref) when is_integer(Timeout) orelse Timeout =:= infinity; is_atom(Ref) -> - [{{timeout, echo_timer}, Timeout, Ref} | Actions]; + [{{timeout, 'echo_create'}, Timeout, Ref} | Actions]; echo_timeout_action(Actions, _, _) -> Actions.