Skip to content

Commit

Permalink
Log errors from ranch:handshake
Browse files Browse the repository at this point in the history
Fixes #11171

An MQTT user encountered TLS handshake timeouts with their IoT device,
and the actual error from `ssl:handshake` / `ranch:handshake` was not
caught and logged.

At this time, `ranch` uses `exit(normal)` in the case of timeouts, but
that should change in the future
(ninenines/ranch#336)
  • Loading branch information
lukebakken committed May 6, 2024
1 parent 746c06f commit 620fff2
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 80 deletions.
22 changes: 15 additions & 7 deletions deps/rabbit/src/rabbit_networking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ failed_to_recv_proxy_header(Ref, Error) ->
end,
rabbit_log:debug(Msg, [Error]),
% The following call will clean up resources then exit
_ = ranch:handshake(Ref),
_ = catch ranch:handshake(Ref),
exit({shutdown, failed_to_recv_proxy_header}).

handshake(Ref, ProxyProtocolEnabled) ->
Expand All @@ -572,14 +572,22 @@ handshake(Ref, ProxyProtocolEnabled) ->
{error, protocol_error, Error} ->
failed_to_recv_proxy_header(Ref, Error);
{ok, ProxyInfo} ->
{ok, Sock} = ranch:handshake(Ref),
setup_socket(Sock),
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
case catch ranch:handshake(Ref) of
{'EXIT', normal} ->
{error, handshake_failed};
{ok, Sock} ->
setup_socket(Sock),
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
end
end;
false ->
{ok, Sock} = ranch:handshake(Ref),
setup_socket(Sock),
{ok, Sock}
case catch ranch:handshake(Ref) of
{'EXIT', normal} ->
{error, handshake_failed};
{ok, Sock} ->
setup_socket(Sock),
{ok, Sock}
end
end.

setup_socket(Sock) ->
Expand Down
4 changes: 4 additions & 0 deletions deps/rabbit/src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ shutdown(Pid, Explanation) ->
no_return().
init(Parent, HelperSups, Ref) ->
?LG_PROCESS_TYPE(reader),
%% Note:
%% This function could return an error if the handshake times out.
%% It is less likely to happen here as compared to MQTT, so
%% crashing with a `badmatch` seems appropriate.
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(rabbit, proxy_protocol, false)),
Deb = sys:debug_options([]),
Expand Down
59 changes: 32 additions & 27 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,39 @@ close_connection(Pid, Reason) ->
init(Ref) ->
process_flag(trap_exit, true),
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [mqtt]}),
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(?APP_NAME, proxy_protocol, false)),
RealSocket = rabbit_net:unwrap_socket(Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
erlang:send_after(LoginTimeout, self(), login_timeout),
State0 = #state{socket = RealSocket,
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
conn_name = ConnName,
await_recv = false,
connection_state = running,
conserve = false,
parse_state = rabbit_mqtt_packet:init_state()},
State1 = control_throttle(State0),
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
gen_server:enter_loop(?MODULE, [], State);
{error, Reason = enotconn} ->
?LOG_INFO("MQTT could not get connection string: ~s", [Reason]),
rabbit_net:fast_close(RealSocket),
ignore;
ProxyProtocolEnabled = application:get_env(?APP_NAME, proxy_protocol, false),
case rabbit_networking:handshake(Ref, ProxyProtocolEnabled) of
{error, Reason} ->
?LOG_ERROR("MQTT could not get connection string: ~p", [Reason]),
rabbit_net:fast_close(RealSocket),
{stop, Reason}
?LOG_ERROR("MQTT could not establish connection: ~s", [Reason]),
{stop, Reason};
{ok, Sock} ->
RealSocket = rabbit_net:unwrap_socket(Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
erlang:send_after(LoginTimeout, self(), login_timeout),
State0 = #state{socket = RealSocket,
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
conn_name = ConnName,
await_recv = false,
connection_state = running,
conserve = false,
parse_state = rabbit_mqtt_packet:init_state()},
State1 = control_throttle(State0),
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
gen_server:enter_loop(?MODULE, [], State);
{error, Reason = enotconn} ->
?LOG_INFO("MQTT could not get connection string: ~s", [Reason]),
rabbit_net:fast_close(RealSocket),
ignore;
{error, Reason} ->
?LOG_ERROR("MQTT could not get connection string: ~p", [Reason]),
rabbit_net:fast_close(RealSocket),
{stop, Reason}
end
end.

handle_call({info, InfoItems}, _From, State) ->
Expand Down
88 changes: 46 additions & 42 deletions deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,51 +63,55 @@ close_connection(Pid, Reason) ->

init([SupHelperPid, Ref, Configuration]) ->
process_flag(trap_exit, true),
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(rabbitmq_stomp, proxy_protocol, false)),
RealSocket = rabbit_net:unwrap_socket(Sock),

case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
ProcInitArgs = processor_args(Configuration, Sock),
ProcState = rabbit_stomp_processor:initial_state(Configuration,
ProcInitArgs),

rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)",
[self(), ConnName]),

ParseState = rabbit_stomp_frame:initial_state(),
_ = register_resource_alarm(),

LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
erlang:send_after(LoginTimeout, self(), login_timeout),

gen_server2:enter_loop(?MODULE, [],
rabbit_event:init_stats_timer(
run_socket(control_throttle(
#reader_state{socket = RealSocket,
conn_name = ConnName,
parse_state = ParseState,
processor_state = ProcState,
heartbeat_sup = SupHelperPid,
heartbeat = {none, none},
max_frame_size = MaxFrameSize,
current_frame_size = 0,
state = running,
conserve_resources = false,
recv_outstanding = false})), #reader_state.stats_timer),
{backoff, 1000, 1000, 10000});
{error, enotconn} ->
rabbit_net:fast_close(RealSocket),
terminate(shutdown, undefined);
ProxyProtocolEnabled = application:get_env(rabbitmq_stomp, proxy_protocol, false),
case rabbit_networking:handshake(Ref, ProxyProtocolEnabled) of
{error, Reason} ->
rabbit_net:fast_close(RealSocket),
terminate({network_error, Reason}, undefined)
rabbit_log_connection:error(
"STOMP could not establish connection: ~s", [Reason]),
{stop, Reason};
{ok, Sock} ->
RealSocket = rabbit_net:unwrap_socket(Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
ProcInitArgs = processor_args(Configuration, Sock),
ProcState = rabbit_stomp_processor:initial_state(Configuration,
ProcInitArgs),

rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)",
[self(), ConnName]),

ParseState = rabbit_stomp_frame:initial_state(),
_ = register_resource_alarm(),

LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
erlang:send_after(LoginTimeout, self(), login_timeout),

gen_server2:enter_loop(?MODULE, [],
rabbit_event:init_stats_timer(
run_socket(control_throttle(
#reader_state{socket = RealSocket,
conn_name = ConnName,
parse_state = ParseState,
processor_state = ProcState,
heartbeat_sup = SupHelperPid,
heartbeat = {none, none},
max_frame_size = MaxFrameSize,
current_frame_size = 0,
state = running,
conserve_resources = false,
recv_outstanding = false})), #reader_state.stats_timer),
{backoff, 1000, 1000, 10000});
{error, enotconn} ->
rabbit_net:fast_close(RealSocket),
terminate(shutdown, undefined);
{error, Reason} ->
rabbit_net:fast_close(RealSocket),
terminate({network_error, Reason}, undefined)
end
end.


handle_call({info, InfoItems}, _From, State) ->
Infos = lists:map(
fun(InfoItem) ->
Expand Down
11 changes: 7 additions & 4 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,13 @@ init([KeepaliveSup,
heartbeat := Heartbeat,
transport := ConnTransport}]) ->
process_flag(trap_exit, true),
{ok, Sock} =
rabbit_networking:handshake(Ref,
application:get_env(rabbitmq_stream,
proxy_protocol, false)),
ProxyProtocolEnabled =
application:get_env(rabbitmq_stream, proxy_protocol, false),
%% Note:
%% This function could return an error if the handshake times out.
%% It is less likely to happen here as compared to MQTT, so
%% crashing with a `badmatch` seems appropriate.
{ok, Sock} = rabbit_networking:handshake(Ref, ProxyProtocolEnabled),
RealSocket = rabbit_net:unwrap_socket(Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
Expand Down

0 comments on commit 620fff2

Please sign in to comment.