Permalink
Browse files

add pub/sub.

  • Loading branch information...
Helge Sychla
Helge Sychla committed Jan 4, 2011
1 parent a1c7e01 commit ebe1395d18a86c1d33362311dd8ea85263dcd946
Showing with 60 additions and 10 deletions.
  1. +60 −10 src/netlink.erl
View
@@ -2,15 +2,21 @@
-behaviour(gen_server).
-export([start_link/0]).
--export([init/1, handle_info/2, handle_cast/2]).
+-export([init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]).
-export([nl_ct_dec/1, nl_rt_dec/1, dec_netlink/2, create_table/0, gen_const/1, define_consts/0, enc_nlmsghdr/5, rtnl_wilddump/2, send/1]).
-export([sockaddr_nl/3, setsockoption/4]).
+-export([subscribe/2, notify/3]).
-include("gen_socket.hrl").
-include("netlink.hrl").
-define(TAB, ?MODULE).
+-record(state, {subscribers::list(),
+ ct,
+ rt}).
+-record(subscription, {pid::pid(),
+ types::list(atom())}).
%% netlink info
-define(NETLINK_ADD_MEMBERSHIP, 1).
@@ -827,29 +833,73 @@ init(_Args) ->
%% UDP is close enough (connection less, datagram oriented), so we can use the driver from it
{ok, Rt} = gen_udp:open(0, [binary, {fd, RtNl}]),
- {ok, {Ct, Rt}}.
+ {ok, #state{ct = Ct, rt = Rt, subscribers = []}}.
send(Msg) ->
gen_server:cast(?MODULE, {send, Msg}).
-handle_cast({send, Msg}, {_Ct, Rt} = State) ->
+subscribe(Pid, Types) ->
+ gen_server:call(?MODULE, {subscribe, #subscription{pid = Pid, types = Types}}).
+
+handle_call({subscribe, #subscription{pid = Pid} = Subscription}, _From, #state{subscribers = Sub} = State) ->
+ case lists:keymember(Pid, 2, Sub) of
+ true ->
+ NewSub = lists:keyreplace(Pid, 2, Sub, Subscription),
+ {reply, ok, State#state{subscribers = NewSub}};
+ false ->
+ io:format("~p:Subscribe ~p~n", [?MODULE, Pid]),
+ monitor(process, Pid),
+ {reply, ok, State#state{subscribers = [Subscription|Sub]}}
+ end.
+
+handle_cast({send, Msg}, #state{rt = Rt} = State) ->
R = case inet:getfd(Rt) of
{ok, Fd} -> gen_socket:send(Fd, Msg, 0);
_ -> {error, invalid}
end,
io:format("Send: ~p~n", [R]),
{noreply, State}.
-handle_info({udp, Ct, _IP, _port, Data}, {Ct, _Rt} = State) ->
- io:format("got ~p~ndec: ~p~n", [Data, nl_ct_dec(Data)]),
+handle_info({udp, Ct, _IP, _port, Data}, #state{ct = Ct, rt = _Rt, subscribers = Sub} = State) ->
+ %% io:format("got ~p~ndec: ~p~n", [Data, nl_ct_dec(Data)]),
+ Subs = lists:filter(fun(Elem) ->
+ lists:member(ct, Elem#subscription.types)
+ end, Sub),
+ spawn(?MODULE, notify, [ct, Subs, nl_ct_dec(Data)]),
{noreply, State};
-handle_info({udp, Rt, _IP, _Port, Data}, {_Ct, Rt} = State) ->
- io:format("got ~p~ndec: ~p~n", [Data, nl_rt_dec_udp(Data)]),
+handle_info({udp, Rt, _IP, _Port, Data}, #state{rt = Rt, ct = _Ct, subscribers = Sub} = State) ->
+ %% io:format("got ~p~ndec: ~p~n", [Data, nl_rt_dec_udp(Data)]),
+ Subs = lists:filter(fun(Elem) ->
+ lists:member(rt, Elem#subscription.types)
+ end, Sub),
+ spawn(?MODULE, notify, [rt, Subs, nl_rt_dec_udp(Data)]),
{noreply, State};
-handle_info({udp, S, _IP, _Port, _Data}, {_Ct, _Rt} = State) ->
- io:format("got on Socket ~p~n", [S]),
+handle_info({udp, S, _IP, _Port, _Data}, #state{subscribers = Sub} = State) ->
+ %% io:format("got on Socket ~p~n", [S]),
+ Subs = lists:filter(fun(Elem) ->
+ lists:member(s, Elem#subscription.types)
+ end, Sub),
+ spawn(?MODULE, notify, [s, Subs, S]),
{noreply, State};
+
+handle_info({'DOWN', _Ref, process, Pid, _Reason}, #state{subscribers = Sub} = State) ->
+ io:format("~p:Unsubscribe ~p~n", [?MODULE, Pid]),
+ {noreply, State#state{subscribers = lists:delete(Pid, Sub)}};
+
handle_info(Msg, {_Ct, _Rt} = State) ->
io:format("got Message ~p~n", [Msg]),
{noreply, State}.
-
+
+terminate(Reason, _State) ->
+ io:format("~p terminate:~p~n", [?MODULE, Reason]),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+notify(ct, Pids, Msg) ->
+ [Pid#subscription.pid ! Msg || Pid <- Pids];
+notify(rt, Pids, Msg) ->
+ [Pid#subscription.pid ! Msg || Pid <- Pids];
+notify(s, Pids, Msg) ->
+ [Pid#subscription.pid ! Msg || Pid <- Pids].

0 comments on commit ebe1395

Please sign in to comment.