Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

handle multipart netlink messages transparently: subcribers only rece…

…ive complete blocks of netlink messages
  • Loading branch information...
commit 58bff25745689575765a6b77d9b90149d41d3b3d 1 parent b55b2d2
Robin Haberkorn authored
Showing with 64 additions and 15 deletions.
  1. +7 −5 include/netlink.hrl
  2. +57 −10 src/netlink.erl
12 include/netlink.hrl
View
@@ -70,10 +70,12 @@
msg ::ctnetlink_msg()
}).
--type ctnetlink_ev() :: {ctnetlink, [#ctnetlink{} | #ctnetlink_exp{} | #netlink{}, ...]}.
- %% type of messages sent to `ct' subscriber
+-type netlink_record() :: #rtnetlink{} | #ctnetlink{} | #ctnetlink_exp{}.
+
+-type ctnetlink_ev() :: {ctnetlink, MsgGrp :: [#ctnetlink{} | #ctnetlink_exp{}, ...]}.
+ %% netlink event message sent to `ct' subscriber
+ %% `MsgGrp' is a single-part netlink message or a complete
+ %% multipart netlink message.
-type rtnetlink_ev() :: {rtnetlink, [#rtnetlink{}, ...]}.
- %% type of messages sent to `rt' subscriber
- %% messages with no subsystem are decoded as #rtnetlink!
- %% multipart messages are terminates by #rtnetlink{type = done}
+ %% netlink event message sent to `rt' subscriber
67 src/netlink.erl
View
@@ -15,11 +15,19 @@
-include("netlink.hrl").
-define(TAB, ?MODULE).
--record(state, {subscribers::list(),
- ct,
- rt}).
--record(subscription, {pid::pid(),
- types::list(atom())}).
+
+-record(subscription, {
+ pid :: pid(),
+ types :: [ct | rt | s]
+}).
+
+-record(state, {
+ subscribers = [] :: [#subscription{}],
+ ct :: gen_udp:socket(),
+ rt :: gen_udp:socket(),
+
+ msgbuf = [] :: [netlink_record()]
+}).
%% netlink info
-define(NETLINK_ADD_MEMBERSHIP, 1).
@@ -1224,7 +1232,7 @@ init(_Args) ->
%% UDP is close enough (connection less, datagram oriented), so we can use the driver from it
{ok, Rt} = gen_udp:open(0, [binary, {fd, RtNl}]),
- {ok, #state{ct = Ct, rt = Rt, subscribers = []}}.
+ {ok, #state{ct = Ct, rt = Rt}}.
send(SubSys, Msg) ->
gen_server:cast(?MODULE, {send, SubSys, Msg}).
@@ -1267,15 +1275,19 @@ handle_info({udp, Ct, _IP, _port, Data}, #state{ct = Ct, rt = _Rt, subscribers =
Subs = lists:filter(fun(Elem) ->
lists:member(ct, Elem#subscription.types)
end, Sub),
- spawn(?MODULE, notify, [ctnetlink, Subs, nl_ct_dec_udp(Data)]),
- {noreply, State};
+ NewState = handle_messages(nl_ct_dec_udp(Data), ctnetlink, Subs, State),
+
+ {noreply, NewState};
+
handle_info({udp, Rt, _IP, _Port, Data}, #state{rt = Rt, ct = _Ct, subscribers = Sub} = State) ->
%% io:format("got ~p~ndec: ~p~n", [Data, nl_rt_dec_udp(Data)]),
Subs = lists:filter(fun(Elem) ->
lists:member(rt, Elem#subscription.types)
end, Sub),
- spawn(?MODULE, notify, [rtnetlink, Subs, nl_rt_dec_udp(Data)]),
- {noreply, State};
+ NewState = handle_messages(nl_rt_dec_udp(Data), rtnetlink, Subs, State),
+
+ {noreply, NewState};
+
handle_info({udp, S, _IP, _Port, _Data}, #state{subscribers = Sub} = State) ->
%% io:format("got on Socket ~p~n", [S]),
Subs = lists:filter(fun(Elem) ->
@@ -1302,3 +1314,38 @@ code_change(_OldVsn, State, _Extra) ->
notify(SubSys, Pids, Msgs) ->
lists:foreach(fun(Pid) -> Pid#subscription.pid ! {SubSys, Msgs} end, Pids).
+-spec process_maybe_multipart(InputMsgs :: [netlink_record() | #netlink{}],
+ MsgBuf :: [netlink_record()]) ->
+ {incomplete, NewMsgBuf :: [netlink_record()]} |
+ {done, MsgGrp :: [netlink_record()], RestInput :: [netlink_record() | #netlink{}]}.
+
+process_maybe_multipart([], MsgBuf) ->
+ {incomplete, MsgBuf};
+process_maybe_multipart([Msg | Rest], MsgBuf) ->
+ Type = element(2, Msg), % Msg may be arbitrary netlink record
+ Flags = element(3, Msg),
+
+ case Type of
+ done ->
+ {done, lists:reverse(MsgBuf), Rest};
+ _ ->
+ case proplists:get_bool(multi, Flags) of
+ false -> {done, [Msg], Rest};
+ true -> process_maybe_multipart(Rest, [Msg | MsgBuf])
+ end
+ end.
+
+-spec handle_messages(InputMsgs :: [netlink_record() | #netlink{}],
+ atom(), [#subscription{}], #state{}) -> #state{}.
+
+handle_messages([], _SubSys, _Subs, State) ->
+ State#state{msgbuf = []};
+handle_messages(Msgs, SubSys, Subs, State) ->
+ case process_maybe_multipart(Msgs, State#state.msgbuf) of
+ {incomplete, MsgBuf} ->
+ State#state{msgbuf = MsgBuf};
+ {done, MsgGrp, Rest} ->
+ spawn(?MODULE, notify, [SubSys, Subs, MsgGrp]),
+ handle_messages(Rest, SubSys, Subs, State#state{msgbuf = []})
+ end.
+
Please sign in to comment.
Something went wrong with that request. Please try again.