Skip to content

Commit

Permalink
handle multipart netlink messages transparently: subcribers only rece…
Browse files Browse the repository at this point in the history
…ive complete blocks of netlink messages
  • Loading branch information
rhaberkorn committed Sep 30, 2011
1 parent b55b2d2 commit 58bff25
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 15 deletions.
12 changes: 7 additions & 5 deletions include/netlink.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@
msg ::ctnetlink_msg()
}).

-type ctnetlink_ev() :: {ctnetlink, [#ctnetlink{} | #ctnetlink_exp{} | #netlink{}, ...]}.
%% type of messages sent to `ct' subscriber
-type netlink_record() :: #rtnetlink{} | #ctnetlink{} | #ctnetlink_exp{}.

-type ctnetlink_ev() :: {ctnetlink, MsgGrp :: [#ctnetlink{} | #ctnetlink_exp{}, ...]}.
%% netlink event message sent to `ct' subscriber
%% `MsgGrp' is a single-part netlink message or a complete
%% multipart netlink message.
-type rtnetlink_ev() :: {rtnetlink, [#rtnetlink{}, ...]}.
%% type of messages sent to `rt' subscriber
%% messages with no subsystem are decoded as #rtnetlink!
%% multipart messages are terminates by #rtnetlink{type = done}
%% netlink event message sent to `rt' subscriber

67 changes: 57 additions & 10 deletions src/netlink.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,19 @@
-include("netlink.hrl").

-define(TAB, ?MODULE).
-record(state, {subscribers::list(),
ct,
rt}).
-record(subscription, {pid::pid(),
types::list(atom())}).

-record(subscription, {
pid :: pid(),
types :: [ct | rt | s]
}).

-record(state, {
subscribers = [] :: [#subscription{}],
ct :: gen_udp:socket(),
rt :: gen_udp:socket(),

msgbuf = [] :: [netlink_record()]
}).

%% netlink info
-define(NETLINK_ADD_MEMBERSHIP, 1).
Expand Down Expand Up @@ -1224,7 +1232,7 @@ init(_Args) ->
%% UDP is close enough (connection less, datagram oriented), so we can use the driver from it
{ok, Rt} = gen_udp:open(0, [binary, {fd, RtNl}]),

{ok, #state{ct = Ct, rt = Rt, subscribers = []}}.
{ok, #state{ct = Ct, rt = Rt}}.

send(SubSys, Msg) ->
gen_server:cast(?MODULE, {send, SubSys, Msg}).
Expand Down Expand Up @@ -1267,15 +1275,19 @@ handle_info({udp, Ct, _IP, _port, Data}, #state{ct = Ct, rt = _Rt, subscribers =
Subs = lists:filter(fun(Elem) ->
lists:member(ct, Elem#subscription.types)
end, Sub),
spawn(?MODULE, notify, [ctnetlink, Subs, nl_ct_dec_udp(Data)]),
{noreply, State};
NewState = handle_messages(nl_ct_dec_udp(Data), ctnetlink, Subs, State),

{noreply, NewState};

handle_info({udp, Rt, _IP, _Port, Data}, #state{rt = Rt, ct = _Ct, subscribers = Sub} = State) ->
%% io:format("got ~p~ndec: ~p~n", [Data, nl_rt_dec_udp(Data)]),
Subs = lists:filter(fun(Elem) ->
lists:member(rt, Elem#subscription.types)
end, Sub),
spawn(?MODULE, notify, [rtnetlink, Subs, nl_rt_dec_udp(Data)]),
{noreply, State};
NewState = handle_messages(nl_rt_dec_udp(Data), rtnetlink, Subs, State),

{noreply, NewState};

handle_info({udp, S, _IP, _Port, _Data}, #state{subscribers = Sub} = State) ->
%% io:format("got on Socket ~p~n", [S]),
Subs = lists:filter(fun(Elem) ->
Expand All @@ -1302,3 +1314,38 @@ code_change(_OldVsn, State, _Extra) ->
notify(SubSys, Pids, Msgs) ->
lists:foreach(fun(Pid) -> Pid#subscription.pid ! {SubSys, Msgs} end, Pids).

-spec process_maybe_multipart(InputMsgs :: [netlink_record() | #netlink{}],
MsgBuf :: [netlink_record()]) ->
{incomplete, NewMsgBuf :: [netlink_record()]} |
{done, MsgGrp :: [netlink_record()], RestInput :: [netlink_record() | #netlink{}]}.

process_maybe_multipart([], MsgBuf) ->
{incomplete, MsgBuf};
process_maybe_multipart([Msg | Rest], MsgBuf) ->
Type = element(2, Msg), % Msg may be arbitrary netlink record
Flags = element(3, Msg),

case Type of
done ->
{done, lists:reverse(MsgBuf), Rest};
_ ->
case proplists:get_bool(multi, Flags) of
false -> {done, [Msg], Rest};
true -> process_maybe_multipart(Rest, [Msg | MsgBuf])
end
end.

-spec handle_messages(InputMsgs :: [netlink_record() | #netlink{}],
atom(), [#subscription{}], #state{}) -> #state{}.

handle_messages([], _SubSys, _Subs, State) ->
State#state{msgbuf = []};
handle_messages(Msgs, SubSys, Subs, State) ->
case process_maybe_multipart(Msgs, State#state.msgbuf) of
{incomplete, MsgBuf} ->
State#state{msgbuf = MsgBuf};
{done, MsgGrp, Rest} ->
spawn(?MODULE, notify, [SubSys, Subs, MsgGrp]),
handle_messages(Rest, SubSys, Subs, State#state{msgbuf = []})
end.

0 comments on commit 58bff25

Please sign in to comment.