Skip to content

Commit

Permalink
Replace erlang:port_command/2 hack with gen_udp:connect/3
Browse files Browse the repository at this point in the history
  • Loading branch information
lpgauth committed Jan 3, 2020
1 parent ae1aa33 commit 12b2f32
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 69 deletions.
4 changes: 1 addition & 3 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
{erl_opts, [
debug_info,
{platform_define, "^21.3|^22", 'ATOMICS'},
{platform_define, "^18|^19|^2", 'ETS_TAKE'},
{platform_define, "^18", 'UDP_HEADER_1'},
{platform_define, "^19|^20|^21|^22.0", 'UDP_HEADER_2'}
{platform_define, "^18|^19|^2", 'ETS_TAKE'}
]}.

{profiles, [
Expand Down
3 changes: 1 addition & 2 deletions src/shackle_tcp_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ terminate(_Reason, {#state {
[E, R, ?GET_STACK(Stacktrace)])
end,
?SERVER_UTILS:reply_all(Id, {error, shutdown}),
shackle_backlog:delete(Id),
ok.
shackle_backlog:delete(Id).

%% private
close(#state {id = Id} = State, ClientState) ->
Expand Down
81 changes: 17 additions & 64 deletions src/shackle_udp_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@

-compile(inline).
-compile({inline_size, 512}).
-compile({nowarn_unused_function, [{uint32, 1}]}).

-ignore_xref([
{?MODULE, uint32, 1}
]).

-export([
start_link/2
Expand All @@ -22,7 +17,6 @@

-record(state, {
client :: client(),
header :: undefined | iodata(),
id :: server_id(),
init_options :: init_options(),
ip :: inet:ip_address() | inet:hostname(),
Expand All @@ -36,9 +30,6 @@
timer_ref :: undefined | reference()
}).

-define(INET_AF_INET, 1).
-define(INT16(X), [((X) bsr 8) band 16#ff, (X) band 16#ff]).

-type state() :: #state {}.

%% public
Expand Down Expand Up @@ -93,15 +84,14 @@ handle_msg({Request, #cast {
timeout = Timeout
} = Cast}, {#state {
client = Client,
header = Header,
id = Id,
pool_name = PoolName,
socket = Socket
} = State, ClientState}) ->

try Client:handle_request(Request, ClientState) of
{ok, ExtRequestId, Data, ClientState2} ->
case send(Socket, Header, Data) of
case gen_udp:send(Socket, Data) of
ok ->
?METRICS(Client, counter, <<"send">>),
Msg = {timeout, ExtRequestId},
Expand All @@ -116,13 +106,11 @@ handle_msg({Request, #cast {
end
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "handle_request error: ~p:~p~n~p~n",
?WARN(PoolName, "handle_request crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)]),
?SERVER_UTILS:reply(Id, {error, client_crash}, Cast),
{ok, {State, ClientState}}
end;
handle_msg({inet_reply, _Socket, ok}, {State, ClientState}) ->
{ok, {State, ClientState}};
handle_msg({udp, Socket, _Ip, _InPortNo, Data}, {#state {
client = Client,
id = Id,
Expand Down Expand Up @@ -181,13 +169,14 @@ handle_msg({timeout, ExtRequestId}, {#state {
end,
{ok, {State, ClientState}}
end;
handle_msg({inet_reply, Socket, {error, Reason}}, {#state {
pool_name = PoolName,
socket = Socket
handle_msg({udp_error, Socket, Reason}, {#state {
socket = Socket,
pool_name = PoolName
} = State, ClientState}) ->

?WARN(PoolName, "send error: ~p", [Reason]),
{ok, {State, ClientState}};
?WARN(PoolName, "connection error: ~p", [Reason]),
gen_udp:close(Socket),
close(State, ClientState);
handle_msg(?MSG_CONNECT, {#state {
client = Client,
id = Id,
Expand All @@ -200,15 +189,14 @@ handle_msg(?MSG_CONNECT, {#state {
} = State, ClientState}) ->

case connect(PoolName, Ip, Port, SocketOptions) of
{ok, Header, Socket} ->
{ok, Socket} ->
case ?SERVER_UTILS:client(Client, PoolName, Init, inet, Socket) of
{ok, ClientState2} ->
ReconnectState2 =
?SERVER_UTILS:reconnect_state_reset(ReconnectState),
shackle_status:enable(Id),

{ok, {State#state {
header = Header,
reconnect_state = ReconnectState2,
socket = Socket
}, ClientState2}};
Expand Down Expand Up @@ -247,14 +235,19 @@ terminate(_Reason, {#state {
shackle_backlog:delete(Id).

%% private
close(#state {id = Id} = State, ClientState) ->
shackle_status:disable(Id),
?SERVER_UTILS:reply_all(Id, {error, socket_closed}),
reconnect(State, ClientState).

connect(PoolName, Ip, Port, SocketOptions) ->
case inet:getaddrs(Ip, inet) of
{ok, Addrs} ->
Ip2 = shackle_utils:random_element(Addrs),
Header = header(Ip2, Port),
case gen_udp:open(0, SocketOptions) of
{ok, Socket} ->
{ok, Header, Socket};
ok = gen_udp:connect(Socket, Ip2, Port),
{ok, Socket};
{error, Reason} ->
?WARN(PoolName, "connect error: ~p", [Reason]),
{error, Reason}
Expand All @@ -264,33 +257,6 @@ connect(PoolName, Ip, Port, SocketOptions) ->
{error, Reason}
end.

close(#state {id = Id} = State, ClientState) ->
shackle_status:disable(Id),
?SERVER_UTILS:reply_all(Id, {error, socket_closed}),
reconnect(State, ClientState).

-ifdef(UDP_HEADER_1).

header(IP, Port) ->
[?INT16(Port), ip4_to_bytes(IP)].

-else.
-ifdef(UDP_HEADER_2).

header(IP, Port) ->
[?INET_AF_INET, ?INT16(Port), ip4_to_bytes(IP)].

-else.

header(IP, Port) ->
[?INET_AF_INET, ?INT16(Port), ip4_to_bytes(IP), uint32(0)].

-endif.
-endif.

ip4_to_bytes({A, B, C, D}) ->
[A band 16#ff, B band 16#ff, C band 16#ff, D band 16#ff].

reconnect(State, undefined) ->
reconnect_timer(State, undefined);
reconnect(#state {
Expand All @@ -301,7 +267,7 @@ reconnect(#state {
try Client:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate error: ~p:~p~n~p~n",
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)])
end,
reconnect_timer(State, ClientState).
Expand All @@ -326,16 +292,3 @@ reconnect_timer(#state {
socket = undefined,
timer_ref = TimerRef
}, ClientState}}.

send(Socket, Header, Data) ->
try
true = erlang:port_command(Socket, [Header, Data]),
ok
catch
Error:Reason ->
{error, {Error, Reason}}
end.

uint32(X) ->
[((X) bsr 24) band 16#ff, ((X) bsr 16) band 16#ff,
((X) bsr 8) band 16#ff, (X) band 16#ff].

0 comments on commit 12b2f32

Please sign in to comment.