Skip to content

Commit

Permalink
Add diameter_sctp option packet
Browse files Browse the repository at this point in the history
To determine the wrapping of messages passed to recv callbacks and into
diameter. The default passing of the input stream in transport_data is
probably of no practical use, but has been set since time immemorial.
  • Loading branch information
Anders Svensson committed Jun 12, 2017
1 parent 373cd07 commit c591056
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 12 deletions.
53 changes: 42 additions & 11 deletions lib/diameter/src/transport/diameter_sctp.erl
Expand Up @@ -78,6 +78,7 @@

-type option() :: {sender, boolean()}
| sender
| {packet, boolean() | raw}
| {message_cb, false | diameter:evaluable()}.

-type uint() :: non_neg_integer().
Expand All @@ -101,6 +102,8 @@
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
os = 0 :: uint(), %% next output stream
packet = true :: boolean() %% legacy transport_data?
| raw,
message_cb :: false | diameter:evaluable(),
send = false :: pid() | boolean()}). %% sending process

Expand Down Expand Up @@ -243,7 +246,8 @@ i(#monitor{transport = TPid} = S) ->
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
{Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]),
{Split, Rest}
= proplists:split(Opts, [accept, packet, sender, message_cb]),
OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
Expand All @@ -252,14 +256,15 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
#listener{ref = Ref,
service = SvcPid,
socket = Sock,
opts = [[[M] || {accept, M} <- OwnOpts]
opts = [[[M] || {accept, M} <- OwnOpts],
proplists:get_value(packet, OwnOpts, true)
| [proplists:get_value(K, OwnOpts, false)
|| K <- [sender, message_cb]]]};

%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
{[Ps | Split], Rest}
= proplists:split(Opts, [rport, raddr, sender, message_cb]),
= proplists:split(Opts, [rport, raddr, packet, sender, message_cb]),
OwnOpts = lists:append(Split),
CB = proplists:get_value(message_cb, OwnOpts, false),
false == CB orelse (Pid ! {diameter, ack}),
Expand All @@ -273,6 +278,7 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
mode = {connect, connect(Sock, RAs, RP, [])},
socket = Sock,
message_cb = CB,
packet = proplists:get_value(packet, OwnOpts, true),
send = proplists:get_value(sender, OwnOpts, false)};

%% An accepting transport spawned by diameter, not yet owning an
Expand Down Expand Up @@ -309,12 +315,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
S#transport{parent = Pid};
{K, T, Opts} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
[Matches, Sender, CB] = Opts,
[Matches, Packet, Sender, CB] = Opts,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
false == CB orelse (S#transport.parent ! {diameter, ack}),
t(T, S#transport{socket = Sock,
message_cb = CB,
packet = Packet,
send = Sender});
accept_timeout = T ->
x(T);
Expand Down Expand Up @@ -740,10 +747,9 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}
recv(T, S#transport{assoc_id = Id});

%% Inbound Diameter message.
recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S)
recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S)
when is_binary(Bin) ->
Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}},
message(recv, Pkt, S);
message(recv, Msg, S);

recv({_, #sctp_shutdown_event{}}, _) ->
stop;
Expand Down Expand Up @@ -888,7 +894,9 @@ setopts(Sock) ->

%% A message_cb is invoked whenever a message is sent or received, or
%% to provide acknowledgement of a completed send or discarded
%% request. See diameter_tcp for semantics.
%% request. See diameter_tcp for semantics, the only difference being
%% that a recv callback can get a diameter_packet record as Msg
%% depending on how/if option packet has been specified.

%% message/3

Expand All @@ -898,8 +906,8 @@ message(send, false = M, S) ->
message(ack, _, #transport{message_cb = false} = S) ->
S;

message(Dir, Msg, #transport{message_cb = CB} = S) ->
setopts(S, actions(cb(CB, Dir, Msg), Dir, S)).
message(Dir, Msg, S) ->
setopts(S, actions(cb(S, Dir, Msg), Dir, S)).

%% actions/3

Expand Down Expand Up @@ -935,8 +943,31 @@ actions(CB, _, S) ->

%% cb/3

cb(false, _, Msg) ->
cb(#transport{message_cb = false, packet = P}, recv, Msg) ->
[pkt(P, true, Msg)];

cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) ->
cb(CB, D, pkt(P, false, Msg));

cb(#transport{message_cb = CB}, Dir, Msg) ->
cb(CB, Dir, Msg);

cb(false, send, Msg) ->
[Msg];

cb(CB, Dir, Msg) ->
diameter_lib:eval([CB, Dir, Msg]).

%% pkt/3

pkt(false, _, {_Info, Bin}) ->
Bin;

pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) ->
#diameter_packet{bin = Bin, transport_data = {stream, Id}};

pkt(raw, true, {[Info], Bin}) ->
#diameter_packet{bin = Bin, transport_data = Info};

pkt(raw, false, {[_], _} = Msg) ->
Msg.
6 changes: 5 additions & 1 deletion lib/diameter/test/diameter_traffic_SUITE.erl
Expand Up @@ -466,7 +466,9 @@ add_transports(Config) ->
LRef = ?util:listen(SN,
[T,
{sender, SS},
{message_cb, ST andalso {?MODULE, message, [4]}}],
{message_cb, ST andalso {?MODULE, message, [4]}}
| [{packet, hd(?util:scramble([false, raw]))}
|| T == sctp andalso CS]],
[{capabilities_cb, fun capx/2},
{pool_size, 8},
{spawn_opt, [{min_heap_size, 8096}]},
Expand Down Expand Up @@ -1509,6 +1511,8 @@ request(#diameter_base_RAR{}, _Caps) ->
%% Limit the number of messages received. More can be received if read
%% in the same packet.

message(recv = D, {[_], Bin}, N) ->
message(D, Bin, N);
message(Dir, #diameter_packet{bin = Bin}, N) ->
message(Dir, Bin, N);

Expand Down

0 comments on commit c591056

Please sign in to comment.