Skip to content

Commit

Permalink
generic netlink request wrapper based on multipart message abstraction
Browse files Browse the repository at this point in the history
request/2 can be used to synchronously do a netlink request, returning all the reply messages
  • Loading branch information
rhaberkorn committed Sep 30, 2011
1 parent 58bff25 commit 80463bc
Showing 1 changed file with 99 additions and 19 deletions.
118 changes: 99 additions & 19 deletions src/netlink.erl
@@ -1,15 +1,17 @@
-module(netlink).

-behaviour(gen_server).

-export([start/0, start_link/0, stop/0]).
-export([subscribe/2, send/2, request/2]).

-export([init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]).

-export([nl_ct_dec_udp/1, nl_ct_dec/1, nl_rt_dec_udp/1, nl_rt_dec/1,
nl_rt_enc/1, nl_ct_enc/1,
dec_netlink/2,
create_table/0, gen_const/1, define_consts/0, enc_nlmsghdr/5, rtnl_wilddump/2, send/2]).
create_table/0, gen_const/1, define_consts/0, enc_nlmsghdr/5, rtnl_wilddump/2]).
-export([sockaddr_nl/3, setsockoption/4]).
-export([subscribe/2, notify/3]).
-export([notify/3]).

-include("gen_socket.hrl").
-include("netlink.hrl").
Expand All @@ -26,7 +28,9 @@
ct :: gen_udp:socket(),
rt :: gen_udp:socket(),

msgbuf = [] :: [netlink_record()]
msgbuf = [] :: [netlink_record()],
curseq = 16#FF :: non_neg_integer(),
requests :: gb_tree()
}).

%% netlink info
Expand Down Expand Up @@ -1189,6 +1193,10 @@ rtnl_wilddump(Family, Type) ->
NumFamily = gen_socket:family(Family),
enc_nlmsghdr(Type, [root, match, request], 0, 0, << NumFamily:8 >>).

%%
%% API implementation
%%

start() ->
gen_server:start({local, ?MODULE}, ?MODULE, [], []).

Expand All @@ -1198,6 +1206,21 @@ start_link() ->
stop() ->
gen_server:cast(?MODULE, stop).

-spec send(atom(), binary()) -> ok.
send(SubSys, Msg) ->
gen_server:cast(?MODULE, {send, SubSys, Msg}).

subscribe(Pid, Types) ->
gen_server:call(?MODULE, {subscribe, #subscription{pid = Pid, types = Types}}).

-spec request(atom(), netlink_record()) -> {ok, [netlink_record(), ...]} | {error, term()}.
request(SubSys, Msg) ->
gen_server:call(?MODULE, {request, SubSys, Msg}).

%%
%% gen_server callbacks
%%

init(_Args) ->
create_table(),
gen_const(define_consts()),
Expand Down Expand Up @@ -1232,13 +1255,10 @@ init(_Args) ->
%% UDP is close enough (connection less, datagram oriented), so we can use the driver from it
{ok, Rt} = gen_udp:open(0, [binary, {fd, RtNl}]),

{ok, #state{ct = Ct, rt = Rt}}.

send(SubSys, Msg) ->
gen_server:cast(?MODULE, {send, SubSys, Msg}).

subscribe(Pid, Types) ->
gen_server:call(?MODULE, {subscribe, #subscription{pid = Pid, types = Types}}).
{ok, #state{
ct = Ct, rt = Rt,
requests = gb_trees:empty()
}}.

handle_call({subscribe, #subscription{pid = Pid} = Subscription}, _From, #state{subscribers = Sub} = State) ->
case lists:keymember(Pid, #subscription.pid, Sub) of
Expand All @@ -1249,6 +1269,28 @@ handle_call({subscribe, #subscription{pid = Pid} = Subscription}, _From, #state{
io:format("~p:Subscribe ~p~n", [?MODULE, Pid]),
monitor(process, Pid),
{reply, ok, State#state{subscribers = [Subscription|Sub]}}
end;

handle_call({request, rt, Msg}, From, #state{rt = Rt, curseq = Seq} = State) ->
Req = nl_rt_enc(prepare_request(Msg, Seq)),
case inet:getfd(Rt) of
{ok, Fd} ->
NewState = register_request(Seq, From, State),
gen_socket:send(Fd, Req, 0),
{noreply, NewState};
_ ->
{reply, {error, socket}, State}
end;

handle_call({request, ct, Msg}, From, #state{ct = Ct, curseq = Seq} = State) ->
Req = nl_ct_enc(prepare_request(Msg, Seq)),
case inet:getfd(Ct) of
{ok, Fd} ->
NewState = register_request(Seq, From, State),
gen_socket:send(Fd, Req, 0),
{noreply, NewState};
_ ->
{reply, {error, socket}, State}
end.

handle_cast({send, rt, Msg}, #state{rt = Rt} = State) ->
Expand All @@ -1275,7 +1317,7 @@ handle_info({udp, Ct, _IP, _port, Data}, #state{ct = Ct, rt = _Rt, subscribers =
Subs = lists:filter(fun(Elem) ->
lists:member(ct, Elem#subscription.types)
end, Sub),
NewState = handle_messages(nl_ct_dec_udp(Data), ctnetlink, Subs, State),
NewState = handle_messages(ctnetlink, nl_ct_dec_udp(Data), Subs, State),

{noreply, NewState};

Expand All @@ -1284,7 +1326,7 @@ handle_info({udp, Rt, _IP, _Port, Data}, #state{rt = Rt, ct = _Ct, subscribers =
Subs = lists:filter(fun(Elem) ->
lists:member(rt, Elem#subscription.types)
end, Sub),
NewState = handle_messages(nl_rt_dec_udp(Data), rtnetlink, Subs, State),
NewState = handle_messages(rtnetlink, nl_rt_dec_udp(Data), Subs, State),

{noreply, NewState};

Expand All @@ -1311,6 +1353,10 @@ terminate(Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%
%% gen_server internal functions
%%

notify(SubSys, Pids, Msgs) ->
lists:foreach(fun(Pid) -> Pid#subscription.pid ! {SubSys, Msgs} end, Pids).

Expand All @@ -1335,17 +1381,51 @@ process_maybe_multipart([Msg | Rest], MsgBuf) ->
end
end.

-spec handle_messages(InputMsgs :: [netlink_record() | #netlink{}],
atom(), [#subscription{}], #state{}) -> #state{}.
-spec handle_messages(atom(), InputMsgs :: [netlink_record() | #netlink{}],
[#subscription{}], #state{}) -> #state{}.

handle_messages([], _SubSys, _Subs, State) ->
handle_messages(_SubSys, [], _Subs, State) ->
State#state{msgbuf = []};
handle_messages(Msgs, SubSys, Subs, State) ->
handle_messages(SubSys, Msgs, Subs, State) ->
case process_maybe_multipart(Msgs, State#state.msgbuf) of
{incomplete, MsgBuf} ->
State#state{msgbuf = MsgBuf};
{done, MsgGrp, Rest} ->
spawn(?MODULE, notify, [SubSys, Subs, MsgGrp]),
handle_messages(Rest, SubSys, Subs, State#state{msgbuf = []})
NewState = case is_request_reply(MsgGrp, State) of
true -> send_request_reply(MsgGrp, State);
false -> spawn(?MODULE, notify, [SubSys, Subs, MsgGrp]),
State
end,
handle_messages(SubSys, Rest, Subs, NewState#state{msgbuf = []})
end.

-spec prepare_request(netlink_record(), non_neg_integer()) -> netlink_record().
prepare_request(Msg0, Seq) ->
% Msg0 may be arbitrary netlink record
Flags = element(3, Msg0),
Msg1 = setelement(3, Msg0, [request | Flags]),
setelement(4, Msg1, Seq).

register_request(Seq, From, #state{requests = Requests} = State) ->
NextSeq = case (Seq + 1) rem 16#FFFFFFFF of
0 -> 16#FF;
X -> X
end,
State#state{
requests = gb_trees:insert(Seq, From, Requests),
curseq = NextSeq
}.

-spec is_request_reply(MaybeReply :: [netlink_record(), ...], #state{}) -> boolean().
is_request_reply([Msg | _Rest], #state{requests = Request}) ->
MsgSeq = element(4, Msg),
gb_trees:is_defined(MsgSeq, Request).

-spec send_request_reply([netlink_record(), ...], #state{}) -> #state{}.
send_request_reply([Msg | _Rest] = Reply, #state{requests = Requests} = State) ->
ReqSeq = element(4, Msg),
From = gb_trees:get(ReqSeq, Requests),

gen_server:reply(From, {ok, Reply}),
State#state{requests = gb_trees:delete(ReqSeq, Requests)}.

0 comments on commit 80463bc

Please sign in to comment.