Skip to content

Commit

Permalink
Merge pull request #18 from rabbitmq/osiris-15
Browse files Browse the repository at this point in the history
Secure replica listener using a one time token
  • Loading branch information
kjnilsson committed Feb 19, 2021
2 parents c0658d7 + 3123126 commit ba67f2b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 7 deletions.
57 changes: 51 additions & 6 deletions src/osiris_replica.erl
Expand Up @@ -44,8 +44,13 @@
event_formatter :: undefined | mfa(),
counter :: counters:counters_ref()}).

-type token() :: binary().

-type parse_state() ::
undefined | binary() |
undefined |
{awaiting_token, token()} |
{awaiting_token, token(), binary()} |
binary() |
{non_neg_integer(), iolist(), non_neg_integer()}.

-record(?MODULE,
Expand All @@ -64,6 +69,7 @@
-define(C_COMMITTED_OFFSET, ?C_NUM_LOG_FIELDS + 1).
-define(C_FORCED_GCS, ?C_NUM_LOG_FIELDS + 2).
-define(C_PACKETS, ?C_NUM_LOG_FIELDS + 3).
-define(DEFAULT_ONE_TIME_TOKEN_TIMEOUT, 30000).

%%%===================================================================
%%% API functions
Expand Down Expand Up @@ -170,14 +176,17 @@ init(#{name := Name,
%% spawn reader process on leader node
{ok, HostName} = inet:gethostname(),
{ok, Ip} = inet:getaddr(HostName, inet),
rand:seed(exsplus),
Token = list_to_binary([rand:uniform(256) - 1 || _ <- lists:seq(1, 256)]),
ReplicaReaderConf =
#{host => Ip,
port => Port,
name => Name,
replica_pid => self(),
leader_pid => LeaderPid,
start_offset => TailInfo,
external_ref => ExtRef},
external_ref => ExtRef,
connection_token => Token},
_RRPid =
case supervisor:start_child({osiris_replica_reader_sup, Node},
#{id => make_ref(),
Expand Down Expand Up @@ -214,7 +223,8 @@ init(#{name := Name,
offset_ref = ORef,
event_formatter = EvtFmt,
counter = CntRef},
log = Log}}.
log = Log,
parse_state = {awaiting_token, Token}}}.

open_tcp_port(M, M) ->
throw({error, all_busy});
Expand Down Expand Up @@ -336,8 +346,43 @@ handle_info(force_gc,
{noreply, State};
handle_info({socket, Socket}, #?MODULE{cfg = Cfg} = State) ->
ok = inet:setopts(Socket, [{active, 5}]),

{noreply, State#?MODULE{cfg = Cfg#cfg{socket = Socket}}};
Timeout = application:get_env(osiris, one_time_token_timeout, ?DEFAULT_ONE_TIME_TOKEN_TIMEOUT),
{noreply, State#?MODULE{cfg = Cfg#cfg{socket = Socket}}, Timeout};
handle_info(timeout, #?MODULE{parse_state = {awaiting_token, _}} = State) ->
{stop, missing_token, State};
handle_info(timeout, State) ->
{noreply, State};
handle_info({tcp, Socket, <<Token:256/binary, Rem/binary>>},
#?MODULE{cfg = #cfg{socket = Socket},
parse_state = {awaiting_token, Token}} = State) ->
ok = inet:setopts(Socket, [{active, 1}]),
{noreply, State#?MODULE{parse_state = Rem}};
handle_info({tcp, Socket, <<_Other:256/binary, _Rem/binary>>},
#?MODULE{cfg = #cfg{socket = Socket},
parse_state = {awaiting_token, _Token}} = State) ->
{stop, invalid_token, State};
handle_info({tcp, Socket, Bin},
#?MODULE{cfg = #cfg{socket = Socket},
parse_state = {awaiting_token, Token}} = State) ->
ok = inet:setopts(Socket, [{active, 1}]),
{noreply, State#?MODULE{parse_state = {awaiting_token, Token, Bin}}};
handle_info({tcp, Socket, Bin0},
#?MODULE{cfg = #cfg{socket = Socket},
parse_state = {awaiting_token, Token, PartialBin}} = State) ->
Bin = <<PartialBin/binary, Bin0/binary>>,
case byte_size(Bin) of
S when S >= 256 ->
case Bin of
<<Token:256/binary, Rem/binary>> ->
ok = inet:setopts(Socket, [{active, 1}]),
{noreply, State#?MODULE{parse_state = Rem}};
_ ->
{stop, invalid_token, State}
end;
_ ->
ok = inet:setopts(Socket, [{active, 1}]),
{noreply, State#?MODULE{parse_state = {awaiting_token, Token, Bin}}}
end;
handle_info({tcp, Socket, Bin},
#?MODULE{cfg =
#cfg{socket = Socket,
Expand Down Expand Up @@ -372,7 +417,7 @@ handle_info({tcp_passive, Socket},
handle_info({tcp_closed, Socket},
#?MODULE{cfg = #cfg{name = Name, socket = Socket}} = State) ->
?DEBUG("osiris_replica: ~s Socket closed. Exiting...", [Name]),
{stop, tcp_closed, State};
{stop, normal, State};
handle_info({tcp_error, Socket, Error},
#?MODULE{cfg = #cfg{name = Name, socket = Socket}} = State) ->
?DEBUG("osiris_replica: ~s Socket error ~p. Exiting...",
Expand Down
4 changes: 3 additions & 1 deletion src/osiris_replica_reader.erl
Expand Up @@ -82,7 +82,8 @@ init(#{host := Host,
replica_pid := ReplicaPid,
leader_pid := LeaderPid,
start_offset := {StartOffset, _} = TailInfo,
external_ref := ExtRef} =
external_ref := ExtRef,
connection_token := Token} =
Args) ->
CntId = {?MODULE, ExtRef, Host, Port},
CntRef = osiris_counters:new(CntId, ?COUNTER_FIELDS),
Expand All @@ -97,6 +98,7 @@ init(#{host := Host,
{packet, 0},
{nodelay, true},
{sndbuf, SndBuf}]),
ok = gen_tcp:send(Sock, Token),
%% register data listener with osiris_proc
ok = osiris_writer:register_data_listener(LeaderPid, StartOffset),
MRef = monitor(process, LeaderPid),
Expand Down

0 comments on commit ba67f2b

Please sign in to comment.