Skip to content
Browse files

generic netlink request wrapper based on multipart message abstraction

request/2 can be used to synchronously do a netlink request, returning all the reply messages
  • Loading branch information...
1 parent 58bff25 commit 80463bcd545ad9d165c0e7119cbb2dda852a1a86 @rhaberkorn rhaberkorn committed Sep 30, 2011
Showing with 99 additions and 19 deletions.
  1. +99 −19 src/netlink.erl
View
118 src/netlink.erl
@@ -1,15 +1,17 @@
-module(netlink).
-
-behaviour(gen_server).
+
-export([start/0, start_link/0, stop/0]).
+-export([subscribe/2, send/2, request/2]).
+
-export([init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]).
-export([nl_ct_dec_udp/1, nl_ct_dec/1, nl_rt_dec_udp/1, nl_rt_dec/1,
nl_rt_enc/1, nl_ct_enc/1,
dec_netlink/2,
- create_table/0, gen_const/1, define_consts/0, enc_nlmsghdr/5, rtnl_wilddump/2, send/2]).
+ create_table/0, gen_const/1, define_consts/0, enc_nlmsghdr/5, rtnl_wilddump/2]).
-export([sockaddr_nl/3, setsockoption/4]).
--export([subscribe/2, notify/3]).
+-export([notify/3]).
-include("gen_socket.hrl").
-include("netlink.hrl").
@@ -26,7 +28,9 @@
ct :: gen_udp:socket(),
rt :: gen_udp:socket(),
- msgbuf = [] :: [netlink_record()]
+ msgbuf = [] :: [netlink_record()],
+ curseq = 16#FF :: non_neg_integer(),
+ requests :: gb_tree()
}).
%% netlink info
@@ -1189,6 +1193,10 @@ rtnl_wilddump(Family, Type) ->
NumFamily = gen_socket:family(Family),
enc_nlmsghdr(Type, [root, match, request], 0, 0, << NumFamily:8 >>).
+%%
+%% API implementation
+%%
+
start() ->
gen_server:start({local, ?MODULE}, ?MODULE, [], []).
@@ -1198,6 +1206,21 @@ start_link() ->
stop() ->
gen_server:cast(?MODULE, stop).
+-spec send(atom(), binary()) -> ok.
+send(SubSys, Msg) ->
+ gen_server:cast(?MODULE, {send, SubSys, Msg}).
+
+subscribe(Pid, Types) ->
+ gen_server:call(?MODULE, {subscribe, #subscription{pid = Pid, types = Types}}).
+
+-spec request(atom(), netlink_record()) -> {ok, [netlink_record(), ...]} | {error, term()}.
+request(SubSys, Msg) ->
+ gen_server:call(?MODULE, {request, SubSys, Msg}).
+
+%%
+%% gen_server callbacks
+%%
+
init(_Args) ->
create_table(),
gen_const(define_consts()),
@@ -1232,13 +1255,10 @@ init(_Args) ->
%% UDP is close enough (connection less, datagram oriented), so we can use the driver from it
{ok, Rt} = gen_udp:open(0, [binary, {fd, RtNl}]),
- {ok, #state{ct = Ct, rt = Rt}}.
-
-send(SubSys, Msg) ->
- gen_server:cast(?MODULE, {send, SubSys, Msg}).
-
-subscribe(Pid, Types) ->
- gen_server:call(?MODULE, {subscribe, #subscription{pid = Pid, types = Types}}).
+ {ok, #state{
+ ct = Ct, rt = Rt,
+ requests = gb_trees:empty()
+ }}.
handle_call({subscribe, #subscription{pid = Pid} = Subscription}, _From, #state{subscribers = Sub} = State) ->
case lists:keymember(Pid, #subscription.pid, Sub) of
@@ -1249,6 +1269,28 @@ handle_call({subscribe, #subscription{pid = Pid} = Subscription}, _From, #state{
io:format("~p:Subscribe ~p~n", [?MODULE, Pid]),
monitor(process, Pid),
{reply, ok, State#state{subscribers = [Subscription|Sub]}}
+ end;
+
+handle_call({request, rt, Msg}, From, #state{rt = Rt, curseq = Seq} = State) ->
+ Req = nl_rt_enc(prepare_request(Msg, Seq)),
+ case inet:getfd(Rt) of
+ {ok, Fd} ->
+ NewState = register_request(Seq, From, State),
+ gen_socket:send(Fd, Req, 0),
+ {noreply, NewState};
+ _ ->
+ {reply, {error, socket}, State}
+ end;
+
+handle_call({request, ct, Msg}, From, #state{ct = Ct, curseq = Seq} = State) ->
+ Req = nl_ct_enc(prepare_request(Msg, Seq)),
+ case inet:getfd(Ct) of
+ {ok, Fd} ->
+ NewState = register_request(Seq, From, State),
+ gen_socket:send(Fd, Req, 0),
+ {noreply, NewState};
+ _ ->
+ {reply, {error, socket}, State}
end.
handle_cast({send, rt, Msg}, #state{rt = Rt} = State) ->
@@ -1275,7 +1317,7 @@ handle_info({udp, Ct, _IP, _port, Data}, #state{ct = Ct, rt = _Rt, subscribers =
Subs = lists:filter(fun(Elem) ->
lists:member(ct, Elem#subscription.types)
end, Sub),
- NewState = handle_messages(nl_ct_dec_udp(Data), ctnetlink, Subs, State),
+ NewState = handle_messages(ctnetlink, nl_ct_dec_udp(Data), Subs, State),
{noreply, NewState};
@@ -1284,7 +1326,7 @@ handle_info({udp, Rt, _IP, _Port, Data}, #state{rt = Rt, ct = _Ct, subscribers =
Subs = lists:filter(fun(Elem) ->
lists:member(rt, Elem#subscription.types)
end, Sub),
- NewState = handle_messages(nl_rt_dec_udp(Data), rtnetlink, Subs, State),
+ NewState = handle_messages(rtnetlink, nl_rt_dec_udp(Data), Subs, State),
{noreply, NewState};
@@ -1311,6 +1353,10 @@ terminate(Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+%%
+%% gen_server internal functions
+%%
+
notify(SubSys, Pids, Msgs) ->
lists:foreach(fun(Pid) -> Pid#subscription.pid ! {SubSys, Msgs} end, Pids).
@@ -1335,17 +1381,51 @@ process_maybe_multipart([Msg | Rest], MsgBuf) ->
end
end.
--spec handle_messages(InputMsgs :: [netlink_record() | #netlink{}],
- atom(), [#subscription{}], #state{}) -> #state{}.
+-spec handle_messages(atom(), InputMsgs :: [netlink_record() | #netlink{}],
+ [#subscription{}], #state{}) -> #state{}.
-handle_messages([], _SubSys, _Subs, State) ->
+handle_messages(_SubSys, [], _Subs, State) ->
State#state{msgbuf = []};
-handle_messages(Msgs, SubSys, Subs, State) ->
+handle_messages(SubSys, Msgs, Subs, State) ->
case process_maybe_multipart(Msgs, State#state.msgbuf) of
{incomplete, MsgBuf} ->
State#state{msgbuf = MsgBuf};
{done, MsgGrp, Rest} ->
- spawn(?MODULE, notify, [SubSys, Subs, MsgGrp]),
- handle_messages(Rest, SubSys, Subs, State#state{msgbuf = []})
+ NewState = case is_request_reply(MsgGrp, State) of
+ true -> send_request_reply(MsgGrp, State);
+ false -> spawn(?MODULE, notify, [SubSys, Subs, MsgGrp]),
+ State
+ end,
+ handle_messages(SubSys, Rest, Subs, NewState#state{msgbuf = []})
end.
+-spec prepare_request(netlink_record(), non_neg_integer()) -> netlink_record().
+prepare_request(Msg0, Seq) ->
+ % Msg0 may be arbitrary netlink record
+ Flags = element(3, Msg0),
+ Msg1 = setelement(3, Msg0, [request | Flags]),
+ setelement(4, Msg1, Seq).
+
+register_request(Seq, From, #state{requests = Requests} = State) ->
+ NextSeq = case (Seq + 1) rem 16#FFFFFFFF of
+ 0 -> 16#FF;
+ X -> X
+ end,
+ State#state{
+ requests = gb_trees:insert(Seq, From, Requests),
+ curseq = NextSeq
+ }.
+
+-spec is_request_reply(MaybeReply :: [netlink_record(), ...], #state{}) -> boolean().
+is_request_reply([Msg | _Rest], #state{requests = Request}) ->
+ MsgSeq = element(4, Msg),
+ gb_trees:is_defined(MsgSeq, Request).
+
+-spec send_request_reply([netlink_record(), ...], #state{}) -> #state{}.
+send_request_reply([Msg | _Rest] = Reply, #state{requests = Requests} = State) ->
+ ReqSeq = element(4, Msg),
+ From = gb_trees:get(ReqSeq, Requests),
+
+ gen_server:reply(From, {ok, Reply}),
+ State#state{requests = gb_trees:delete(ReqSeq, Requests)}.
+

0 comments on commit 80463bc

Please sign in to comment.
Something went wrong with that request. Please try again.