diff --git a/src/netlink.erl b/src/netlink.erl index 9f8b092..bfa20f2 100644 --- a/src/netlink.erl +++ b/src/netlink.erl @@ -2,15 +2,21 @@ -behaviour(gen_server). -export([start_link/0]). --export([init/1, handle_info/2, handle_cast/2]). +-export([init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]). -export([nl_ct_dec/1, nl_rt_dec/1, dec_netlink/2, create_table/0, gen_const/1, define_consts/0, enc_nlmsghdr/5, rtnl_wilddump/2, send/1]). -export([sockaddr_nl/3, setsockoption/4]). +-export([subscribe/2, notify/3]). -include("gen_socket.hrl"). -include("netlink.hrl"). -define(TAB, ?MODULE). +-record(state, {subscribers::list(), + ct, + rt}). +-record(subscription, {pid::pid(), + types::list(atom())}). %% netlink info -define(NETLINK_ADD_MEMBERSHIP, 1). @@ -827,12 +833,26 @@ init(_Args) -> %% UDP is close enough (connection less, datagram oriented), so we can use the driver from it {ok, Rt} = gen_udp:open(0, [binary, {fd, RtNl}]), - {ok, {Ct, Rt}}. + {ok, #state{ct = Ct, rt = Rt, subscribers = []}}. send(Msg) -> gen_server:cast(?MODULE, {send, Msg}). -handle_cast({send, Msg}, {_Ct, Rt} = State) -> +subscribe(Pid, Types) -> + gen_server:call(?MODULE, {subscribe, #subscription{pid = Pid, types = Types}}). + +handle_call({subscribe, #subscription{pid = Pid} = Subscription}, _From, #state{subscribers = Sub} = State) -> + case lists:keymember(Pid, 2, Sub) of + true -> + NewSub = lists:keyreplace(Pid, 2, Sub, Subscription), + {reply, ok, State#state{subscribers = NewSub}}; + false -> + io:format("~p:Subscribe ~p~n", [?MODULE, Pid]), + monitor(process, Pid), + {reply, ok, State#state{subscribers = [Subscription|Sub]}} + end. + +handle_cast({send, Msg}, #state{rt = Rt} = State) -> R = case inet:getfd(Rt) of {ok, Fd} -> gen_socket:send(Fd, Msg, 0); _ -> {error, invalid} @@ -840,16 +860,46 @@ handle_cast({send, Msg}, {_Ct, Rt} = State) -> io:format("Send: ~p~n", [R]), {noreply, State}. -handle_info({udp, Ct, _IP, _port, Data}, {Ct, _Rt} = State) -> - io:format("got ~p~ndec: ~p~n", [Data, nl_ct_dec(Data)]), +handle_info({udp, Ct, _IP, _port, Data}, #state{ct = Ct, rt = _Rt, subscribers = Sub} = State) -> + %% io:format("got ~p~ndec: ~p~n", [Data, nl_ct_dec(Data)]), + Subs = lists:filter(fun(Elem) -> + lists:member(ct, Elem#subscription.types) + end, Sub), + spawn(?MODULE, notify, [ct, Subs, nl_ct_dec(Data)]), {noreply, State}; -handle_info({udp, Rt, _IP, _Port, Data}, {_Ct, Rt} = State) -> - io:format("got ~p~ndec: ~p~n", [Data, nl_rt_dec_udp(Data)]), +handle_info({udp, Rt, _IP, _Port, Data}, #state{rt = Rt, ct = _Ct, subscribers = Sub} = State) -> + %% io:format("got ~p~ndec: ~p~n", [Data, nl_rt_dec_udp(Data)]), + Subs = lists:filter(fun(Elem) -> + lists:member(rt, Elem#subscription.types) + end, Sub), + spawn(?MODULE, notify, [rt, Subs, nl_rt_dec_udp(Data)]), {noreply, State}; -handle_info({udp, S, _IP, _Port, _Data}, {_Ct, _Rt} = State) -> - io:format("got on Socket ~p~n", [S]), +handle_info({udp, S, _IP, _Port, _Data}, #state{subscribers = Sub} = State) -> + %% io:format("got on Socket ~p~n", [S]), + Subs = lists:filter(fun(Elem) -> + lists:member(s, Elem#subscription.types) + end, Sub), + spawn(?MODULE, notify, [s, Subs, S]), {noreply, State}; + +handle_info({'DOWN', _Ref, process, Pid, _Reason}, #state{subscribers = Sub} = State) -> + io:format("~p:Unsubscribe ~p~n", [?MODULE, Pid]), + {noreply, State#state{subscribers = lists:delete(Pid, Sub)}}; + handle_info(Msg, {_Ct, _Rt} = State) -> io:format("got Message ~p~n", [Msg]), {noreply, State}. - + +terminate(Reason, _State) -> + io:format("~p terminate:~p~n", [?MODULE, Reason]), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +notify(ct, Pids, Msg) -> + [Pid#subscription.pid ! Msg || Pid <- Pids]; +notify(rt, Pids, Msg) -> + [Pid#subscription.pid ! Msg || Pid <- Pids]; +notify(s, Pids, Msg) -> + [Pid#subscription.pid ! Msg || Pid <- Pids].