Skip to content
This repository has been archived by the owner on Nov 18, 2020. It is now read-only.

Commit

Permalink
Merge pull request #105 from rabbitmq/rabbitmq-server-589
Browse files Browse the repository at this point in the history
Add support for proxy protocol
  • Loading branch information
michaelklishin committed Feb 10, 2017
2 parents c114f59 + 0f5786e commit a3d0934
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 13 deletions.
5 changes: 3 additions & 2 deletions Makefile
Expand Up @@ -19,15 +19,16 @@ define PROJECT_ENV
%% see rabbitmq/rabbitmq-stomp#39
{trailing_lf, true},
%% see rabbitmq/rabbitmq-stomp#57
{hide_server_info, false}
{hide_server_info, false},
{proxy_protocol, false}
]
endef

define PROJECT_APP_EXTRA_KEYS
{broker_version_requirements, []}
endef

DEPS = ranch rabbit_common rabbit amqp_client
DEPS = ranch rabbit_common rabbit amqp_client ranch_proxy_protocol
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers

DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk
Expand Down
1 change: 1 addition & 0 deletions rabbitmq-components.mk
Expand Up @@ -106,6 +106,7 @@ dep_cowboy_commit = 1.1.0
dep_mochiweb = git git://github.com/basho/mochiweb.git v2.9.0p2
dep_ranch_commit = 1.3.1
dep_webmachine_commit = 1.10.8p2
dep_ranch_proxy_protocol = git git@github.com:heroku/ranch_proxy_protocol.git 1.4.1

RABBITMQ_COMPONENTS = amqp_client \
rabbit \
Expand Down
20 changes: 11 additions & 9 deletions src/rabbit_stomp_reader.erl
Expand Up @@ -67,7 +67,8 @@ info(Pid, InfoItems) ->

init([SupHelperPid, Ref, Sock, Configuration]) ->
process_flag(trap_exit, true),
rabbit_net:accept_ack(Ref, Sock),
RealSocket = rabbit_net:unwrap_socket(Sock),
rabbit_net:accept_ack(Ref, RealSocket),

case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
Expand All @@ -83,7 +84,7 @@ init([SupHelperPid, Ref, Sock, Configuration]) ->
gen_server2:enter_loop(?MODULE, [],
rabbit_event:init_stats_timer(
run_socket(control_throttle(
#reader_state{socket = Sock,
#reader_state{socket = RealSocket,
conn_name = ConnStr,
parse_state = ParseState,
processor_state = ProcState,
Expand All @@ -94,13 +95,13 @@ init([SupHelperPid, Ref, Sock, Configuration]) ->
recv_outstanding = false})), #reader_state.stats_timer),
{backoff, 1000, 1000, 10000});
{network_error, Reason} ->
rabbit_net:fast_close(Sock),
rabbit_net:fast_close(RealSocket),
terminate({shutdown, Reason}, undefined);
{error, enotconn} ->
rabbit_net:fast_close(Sock),
rabbit_net:fast_close(RealSocket),
terminate(shutdown, undefined);
{error, Reason} ->
rabbit_net:fast_close(Sock),
rabbit_net:fast_close(RealSocket),
terminate({network_error, Reason}, undefined)
end.

Expand Down Expand Up @@ -339,20 +340,21 @@ log_reason(Reason, #reader_state{ processor_state = ProcState }) ->
%%----------------------------------------------------------------------------

processor_args(Configuration, Sock) ->
RealSocket = rabbit_net:unwrap_socket(Sock),
SendFun = fun (sync, IoData) ->
%% no messages emitted
catch rabbit_net:send(Sock, IoData);
catch rabbit_net:send(RealSocket, IoData);
(async, IoData) ->
%% {inet_reply, _, _} will appear soon
%% We ignore certain errors here, as we will be
%% receiving an asynchronous notification of the
%% same (or a related) fault shortly anyway. See
%% bug 21365.
catch rabbit_net:port_command(Sock, IoData)
catch rabbit_net:port_command(RealSocket, IoData)
end,
{ok, {PeerAddr, _PeerPort}} = rabbit_net:sockname(Sock),
{ok, {PeerAddr, _PeerPort}} = rabbit_net:sockname(RealSocket),
{SendFun, adapter_info(Sock),
ssl_login_name(Sock, Configuration), PeerAddr}.
ssl_login_name(RealSocket, Configuration), PeerAddr}.

adapter_info(Sock) ->
amqp_connection:socket_adapter_info(Sock, {'STOMP', 0}).
Expand Down
13 changes: 11 additions & 2 deletions src/rabbit_stomp_sup.erl
Expand Up @@ -50,11 +50,20 @@ listener_specs(Fun, Args, Listeners) ->
tcp_listener_spec([Address, SocketOpts, Configuration, NumAcceptors]) ->
rabbit_networking:tcp_listener_spec(
rabbit_stomp_listener_sup, Address, SocketOpts,
ranch_tcp, rabbit_stomp_client_sup, Configuration,
transport(stomp), rabbit_stomp_client_sup, Configuration,
stomp, NumAcceptors, "STOMP TCP Listener").

ssl_listener_spec([Address, SocketOpts, SslOpts, Configuration, NumAcceptors]) ->
rabbit_networking:tcp_listener_spec(
rabbit_stomp_listener_sup, Address, SocketOpts ++ SslOpts,
ranch_ssl, rabbit_stomp_client_sup, Configuration,
transport('stomp/ssl'), rabbit_stomp_client_sup, Configuration,
'stomp/ssl', NumAcceptors, "STOMP SSL Listener").

transport(Protocol) ->
ProxyProtocol = application:get_env(rabbitmq_stomp, proxy_protocol, false),
case {Protocol, ProxyProtocol} of
{stomp, false} -> ranch_tcp;
{stomp, true} -> ranch_proxy;
{'stomp/ssl', false} -> ranch_ssl;
{'stomp/ssl', true} -> ranch_proxy_ssl
end.
97 changes: 97 additions & 0 deletions test/proxy_protocol_SUITE.erl
@@ -0,0 +1,97 @@
-module(proxy_protocol_SUITE).
-compile([export_all]).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

-define(TIMEOUT, 5000).

all() ->
[
{group, non_parallel_tests}
].

groups() ->
[
{non_parallel_tests, [], [
proxy_protocol,
proxy_protocol_tls
]}
].

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Suffix},
{rmq_certspwd, "bunnychow"},
{rabbitmq_ct_tls_verify, verify_none}
]),
MqttConfig = stomp_config(),
rabbit_ct_helpers:run_setup_steps(Config1,
[ fun(Conf) -> merge_app_env(MqttConfig, Conf) end ] ++
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

stomp_config() ->
{rabbitmq_stomp, [
{proxy_protocol, true}
]}.

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(_, Config) -> Config.
end_per_group(_, Config) -> Config.

init_per_testcase(_Testcase, Config) ->
Config.

end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).

proxy_protocol(Config) ->
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
[binary, {active, false}, {packet, raw}]),
ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"),
ok = inet:send(Socket, stomp_connect_frame()),
{ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, connection_name, []),
match = re:run(ConnectionName, <<"^192.168.1.1:80 ">>, [{capture, none}]),
gen_tcp:close(Socket),
ok.

proxy_protocol_tls(Config) ->
app_utils:start_applications([asn1, crypto, public_key, ssl]),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp_tls),
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
[binary, {active, false}, {packet, raw}]),
ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"),
{ok, SslSocket} = ssl:connect(Socket, [], ?TIMEOUT),
ok = ssl:send(SslSocket, stomp_connect_frame()),
{ok, _Packet} = ssl:recv(SslSocket, 0, ?TIMEOUT),
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, connection_name, []),
match = re:run(ConnectionName, <<"^192.168.1.1:80 ">>, [{capture, none}]),
gen_tcp:close(Socket),
ok.

connection_name() ->
Connections = ets:tab2list(connection_created),
{_Key, Values} = lists:nth(1, Connections),
{_, Name} = lists:keyfind(name, 1, Values),
Name.

merge_app_env(MqttConfig, Config) ->
rabbit_ct_helpers:merge_app_env(Config, MqttConfig).

stomp_connect_frame() ->
<<"CONNECT\n",
"login:guest\n",
"passcode:guest\n",
"\n",
0>>.

0 comments on commit a3d0934

Please sign in to comment.