Skip to content

Commit

Permalink
add ipc zmq support based on gen_socket
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Schultz committed Dec 7, 2011
1 parent 99653f2 commit 13eb642
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 48 deletions.
1 change: 1 addition & 0 deletions src/gen_zmq.app.src
Expand Up @@ -7,6 +7,7 @@
kernel,
stdlib,
sasl,
gen_socket,
gen_listener_tcp
]},
{mod, { gen_zmq_app, []}},
Expand Down
66 changes: 52 additions & 14 deletions src/gen_zmq.erl
Expand Up @@ -30,7 +30,7 @@

%% API scheduler
-export([start_link/1, start/1, socket_link/1, socket/1]).
-export([bind/3, connect/4, close/1]).
-export([bind/4, connect/4, connect/5, close/1]).
-export([recv/1, recv/2]).
-export([send/2]).
-export([setopts/2]).
Expand All @@ -45,7 +45,7 @@
terminate/2, code_change/3]).

-define(SERVER, ?MODULE).
-record(cargs, {address, port, tcpopts, timeout, failcnt}).
-record(cargs, {family, address, port, tcpopts, timeout, failcnt}).

-ifdef(debug).
-define(SERVER_OPTS,{debug,[trace]}).
Expand Down Expand Up @@ -75,12 +75,18 @@ socket_link(Opts) when is_list(Opts) ->
socket(Opts) when is_list(Opts) ->
start(Opts).

bind(Socket, Port, Opts) ->
bind(Socket, tcp, Port, Opts) ->
%%TODO: socket options
gen_server:call(Socket, {bind, Port, Opts}).
gen_server:call(Socket, {bind, tcp, Port, Opts});

connect(Socket, Address, Port, Opts) ->
gen_server:call(Socket, {connect, Address, Port, Opts}).
bind(Socket, unix, Path, Opts) ->
%%TODO: socket options
gen_server:call(Socket, {bind, unix, Path, Opts}).

connect(Socket, tcp, Address, Port, Opts) ->
gen_server:call(Socket, {connect, tcp, Address, Port, Opts}).
connect(Socket, unix, Path, Opts) ->
gen_server:call(Socket, {connect, unix, Path, Opts}).

close(Socket) ->
gen_server:call(Socket, close).
Expand Down Expand Up @@ -231,7 +237,7 @@ init_socket(Owner, Type, Opts) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call({bind, Port, Opts}, _From, MqSState = #gen_zmq_socket{identity = Identity}) ->
handle_call({bind, tcp, Port, Opts}, _From, MqSState = #gen_zmq_socket{identity = Identity}) ->
TcpOpts0 = [binary,inet, {active,false}, {send_timeout,5000}, {backlog,10}, {nodelay,true}, {packet,raw}, {reuseaddr,true}],
TcpOpts1 = case proplists:get_value(ip, Opts) of
undefined -> TcpOpts0;
Expand All @@ -240,16 +246,41 @@ handle_call({bind, Port, Opts}, _From, MqSState = #gen_zmq_socket{identity = Ide
?DEBUG("bind: ~p~n", [TcpOpts1]),
case gen_zmq_tcp_socket:start_link(Identity, Port, TcpOpts1) of
{ok, Pid} ->
Listen = orddict:append(Pid, {Port, Opts}, MqSState#gen_zmq_socket.listen_trans),
Listen = orddict:append(Pid, {tcp, Port, Opts}, MqSState#gen_zmq_socket.listen_trans),
{reply, ok, MqSState#gen_zmq_socket{listen_trans = Listen}};
Reply ->
{reply, Reply, MqSState}
end;

handle_call({connect, Address, Port, Opts}, _From, State) ->
handle_call({bind, unix, Path, Opts}, _From, MqSState = #gen_zmq_socket{identity = Identity}) ->
TcpOpts0 = [binary, {active,false}, {send_timeout,5000}, {backlog,10}, {nodelay,true}, {packet,raw}, {reuseaddr,true}],
?DEBUG("bind: ~p~n", [TcpOpts0]),

{ok, Fd} = gen_socket:socket(unix, stream, 0),
case gen_socket:bind(Fd, gen_socket:sockaddr_unix(Path)) of
ok -> case gen_zmq_tcp_socket:start_link(Identity, 0, [{fd, Fd}|TcpOpts0]) of
{ok, Pid} ->
Listen = orddict:append(Pid, {unix, Path, Opts}, MqSState#gen_zmq_socket.listen_trans),
{reply, ok, MqSState#gen_zmq_socket{listen_trans = Listen}};
Reply ->
{reply, Reply, MqSState}
end;
Reply ->
{reply, Reply, MqSState}
end;

handle_call({connect, tcp, Address, Port, Opts}, _From, State) ->
TcpOpts = [binary, inet, {active,false}, {send_timeout,5000}, {nodelay,true}, {packet,raw}, {reuseaddr,true}],
Timeout = proplists:get_value(timeout, Opts, 5000),
ConnectArgs = #cargs{address = Address, port = Port, tcpopts = TcpOpts,
ConnectArgs = #cargs{family = tcp, address = Address, port = Port, tcpopts = TcpOpts,
timeout = Timeout, failcnt = 0},
NewState = do_connect(ConnectArgs, State),
{reply, ok, NewState};

handle_call({connect, unix, Path, Opts}, _From, State) ->
TcpOpts = [binary, {active,false}, {send_timeout,5000}, {nodelay,true}, {packet,raw}, {reuseaddr,true}],
Timeout = proplists:get_value(timeout, Opts, 5000),
ConnectArgs = #cargs{family = unix, address = Path, tcpopts = TcpOpts,
timeout = Timeout, failcnt = 0},
NewState = do_connect(ConnectArgs, State),
{reply, ok, NewState};
Expand Down Expand Up @@ -309,15 +340,13 @@ handle_call({setopts, Opts}, _From, State) ->
%%--------------------------------------------------------------------

handle_cast({deliver_accept, Transport, RemoteId}, State) ->
ct:pal("deliver_accept: ~p ~p~n", [Transport, RemoteId]),
link(Transport),
State1 = transports_activate(Transport, RemoteId, State),
?DEBUG("DELIVER_ACCPET: ~p~n", [State1]),
State2 = send_queue_run(State1),
{noreply, State2};

handle_cast({deliver_connect, Transport, {ok, RemoteId}}, State) ->
ct:pal("deliver_connect: ~p ~p~n", [Transport, RemoteId]),
State1 = transports_activate(Transport, RemoteId, State),
State2 = send_queue_run(State1),
{noreply, State2};
Expand Down Expand Up @@ -419,12 +448,21 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================

do_connect(ConnectArgs, MqSState = #gen_zmq_socket{identity = Identity}) ->
do_connect(ConnectArgs = #cargs{family = tcp}, MqSState = #gen_zmq_socket{identity = Identity}) ->
?DEBUG("starting connect: ~w~n", [ConnectArgs]),
#cargs{address = Address, port = Port, tcpopts = TcpOpts,
timeout = Timeout, failcnt = _FailCnt} = ConnectArgs,
{ok, Transport} = gen_zmq_link:start_connection(),
gen_zmq_link:connect(Identity, Transport, Address, Port, TcpOpts, Timeout),
gen_zmq_link:connect(Identity, Transport, tcp, Address, Port, TcpOpts, Timeout),
Connecting = orddict:store(Transport, ConnectArgs, MqSState#gen_zmq_socket.connecting),
MqSState#gen_zmq_socket{connecting = Connecting};

do_connect(ConnectArgs = #cargs{family = unix}, MqSState = #gen_zmq_socket{identity = Identity}) ->
?DEBUG("starting connect: ~w~n", [ConnectArgs]),
#cargs{address = Path, tcpopts = TcpOpts,
timeout = Timeout, failcnt = _FailCnt} = ConnectArgs,
{ok, Transport} = gen_zmq_link:start_connection(),
gen_zmq_link:connect(Identity, Transport, unix, Path, TcpOpts, Timeout),
Connecting = orddict:store(Transport, ConnectArgs, MqSState#gen_zmq_socket.connecting),
MqSState#gen_zmq_socket{connecting = Connecting}.

Expand Down
33 changes: 29 additions & 4 deletions src/gen_zmq_link.erl
Expand Up @@ -29,7 +29,7 @@

%% API
-export([start_link/0]).
-export([start_connection/0, accept/4, connect/6, close/1]).
-export([start_connection/0, accept/4, connect/6, connect/7, close/1]).

%% gen_fsm callbacks
-export([init/1, handle_event/3,
Expand Down Expand Up @@ -89,8 +89,11 @@ accept(MqSocket, Identity, Server, Socket) ->
gen_tcp:controlling_process(Socket, Server),
gen_fsm:send_event(Server, {accept, MqSocket, Identity, Socket}).

connect(Identity, Server, Address, Port, TcpOpts, Timeout) ->
gen_fsm:send_event(Server, {connect, self(), Identity, Address, Port, TcpOpts, Timeout}).
connect(Identity, Server, unix, Path, TcpOpts, Timeout) ->
gen_fsm:send_event(Server, {connect, self(), Identity, unix, Path, TcpOpts, Timeout}).

connect(Identity, Server, tcp, Address, Port, TcpOpts, Timeout) ->
gen_fsm:send_event(Server, {connect, self(), Identity, tcp, Address, Port, TcpOpts, Timeout}).

send(Server, Msg) ->
gen_fsm:send_event(Server, {send, Msg}).
Expand Down Expand Up @@ -140,7 +143,7 @@ setup({accept, MqSocket, Identity, Socket}, State) ->
?DEBUG("NewState: ~p~n", [NewState]),
send_frames([Identity], {next_state, open, NewState, ?CONNECT_TIMEOUT});

setup({connect, MqSocket, Identity, Address, Port, TcpOpts, Timeout}, State) ->
setup({connect, MqSocket, Identity, tcp, Address, Port, TcpOpts, Timeout}, State) ->
?DEBUG("got connect: ~w, ~w~n", [Address, Port]),

%%TODO: socket options
Expand All @@ -152,6 +155,28 @@ setup({connect, MqSocket, Identity, Address, Port, TcpOpts, Timeout}, State) ->
Reply ->
gen_zmq:deliver_connect(MqSocket, Reply),
{stop, normal, State}
end;

setup({connect, MqSocket, Identity, unix, Path, TcpOpts, _Timeout}, State) ->
?DEBUG("got unix connect: ~p~n", [Path]),

%%TODO: socket options
{ok, Fd} = gen_socket:socket(unix, stream, 0),
case gen_socket:connect(Fd, gen_socket:sockaddr_unix(Path)) of
ok -> case gen_tcp:fdopen(Fd, TcpOpts) of
{ok, Socket} ->
NewState = State#state{mqsocket = MqSocket, identity = Identity, socket = Socket},
ok = inet:setopts(Socket, [{active, once}]),
?DEBUG("unix connect ok~n"),
{next_state, connecting, NewState, ?CONNECT_TIMEOUT};
Reply ->
gen_zmq:deliver_connect(MqSocket, Reply),
?DEBUG("unix connect fail ~p,~p~n", [Reply, TcpOpts]),
{stop, normal, State}
end;
Reply ->
gen_zmq:deliver_connect(MqSocket, Reply),
{stop, normal, State}
end.

connecting(timeout, State = #state{mqsocket = MqSocket}) ->
Expand Down

0 comments on commit 13eb642

Please sign in to comment.