Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

handle persistent connection failures

  • Loading branch information...
commit 99653f2b014a267a64528308a0a92ab0a716ee58 1 parent 165a9b8
Andreas Schultz authored
View
52 src/gen_zmq.erl
@@ -168,6 +168,9 @@ transports_deactivate(Transport, MqSState = #gen_zmq_socket{transports = Transpo
transports_while(Fun, Data, Default, #gen_zmq_socket{transports = Transports}) ->
do_transports_while(Fun, Data, Transports, Default).
+transports_connected(#gen_zmq_socket{transports = Transports}) ->
+ Transports /= [].
+
%% walk the list of transports
%% this is intended to hide the details of the transports impl.
do_transports_while(_Fun, _Data, [], Default) ->
@@ -272,10 +275,12 @@ handle_call({send, Msg}, From, State) ->
State2 = gen_zmq_socket_fsm:do(queue_send, State1),
case Action of
return ->
- {reply, ok, State2};
+ State3 = check_send_queue(State2),
+ {reply, ok, State3};
block ->
State3 = State2#gen_zmq_socket{pending_send = From},
- {noreply, State3}
+ State4 = check_send_queue(State3),
+ {noreply, State4}
end;
{drop, Reply} ->
{reply, Reply, State};
@@ -317,12 +322,21 @@ handle_cast({deliver_connect, Transport, {ok, RemoteId}}, State) ->
State2 = send_queue_run(State1),
{noreply, State2};
-handle_cast({deliver_connect, Transport, _Reply}, State = #gen_zmq_socket{connecting = Connecting}) ->
- ConnectArgs = orddict:fetch(Transport, Connecting),
- ?DEBUG("CArgs: ~w~n", [ConnectArgs]),
- erlang:send_after(3000, self(), {reconnect, ConnectArgs#cargs{failcnt = ConnectArgs#cargs.failcnt + 1}}),
- State2 = State#gen_zmq_socket{connecting = orddict:erase(Transport, Connecting)},
- {noreply, State2};
+handle_cast({deliver_connect, Transport, Reply}, State = #gen_zmq_socket{connecting = Connecting}) ->
+ case Reply of
+ %% transient errors
+ {error, Reason} when Reason == eagain; Reason == ealready;
+ Reason == econnrefused; Reason == econnreset ->
+ ConnectArgs = orddict:fetch(Transport, Connecting),
+ ?DEBUG("CArgs: ~w~n", [ConnectArgs]),
+ erlang:send_after(3000, self(), {reconnect, ConnectArgs#cargs{failcnt = ConnectArgs#cargs.failcnt + 1}}),
+ State2 = State#gen_zmq_socket{connecting = orddict:erase(Transport, Connecting)},
+ {noreply, State2};
+ _ ->
+ State1 = State#gen_zmq_socket{connecting = orddict:erase(Transport, Connecting)},
+ State2 = check_send_queue(State1),
+ {noreply, State2}
+ end;
handle_cast({deliver_close, Transport}, State = #gen_zmq_socket{connecting = Connecting}) ->
unlink(Transport),
@@ -334,7 +348,7 @@ handle_cast({deliver_close, Transport}, State = #gen_zmq_socket{connecting = Con
erlang:send_after(3000, self(), {reconnect, ConnectArgs#cargs{failcnt = 0}}),
State2#gen_zmq_socket{connecting =orddict:erase(Transport, Connecting)};
_ ->
- State2
+ check_send_queue(State2)
end,
State4 = queue_run(State3),
?DEBUG("DELIVER_CLOSE: ~p~n", [State4]),
@@ -414,6 +428,26 @@ do_connect(ConnectArgs, MqSState = #gen_zmq_socket{identity = Identity}) ->
Connecting = orddict:store(Transport, ConnectArgs, MqSState#gen_zmq_socket.connecting),
MqSState#gen_zmq_socket{connecting = Connecting}.
+check_send_queue(MqSState = #gen_zmq_socket{send_q = []}) ->
+ MqSState;
+check_send_queue(MqSState = #gen_zmq_socket{connecting = Connecting, listen_trans = Listen}) ->
+ case {transports_connected(MqSState), orddict:size(Connecting), orddict:size(Listen)} of
+ {false, 0, 0} -> clear_send_queue(MqSState);
+ _ -> MqSState
+ end.
+
+clear_send_queue(State = #gen_zmq_socket{send_q = []}) ->
+ State;
+clear_send_queue(State = #gen_zmq_socket{send_q = [_Msg], pending_send = From})
+ when From /= none ->
+ gen_server:reply(From, {error, no_connection}),
+ State1 = gen_zmq_socket_fsm:do({deliver_send, abort}, State),
+ State1#gen_zmq_socket{send_q = [], pending_send = none};
+
+clear_send_queue(State = #gen_zmq_socket{send_q = [_Msg|Rest]}) ->
+ State1 = gen_zmq_socket_fsm:do({deliver_send, abort}, State),
+ clear_send_queue(State1#gen_zmq_socket{send_q = Rest}).
+
send_queue_run(State = #gen_zmq_socket{send_q = []}) ->
State;
send_queue_run(State = #gen_zmq_socket{send_q = [Msg], pending_send = From})
View
4 src/gen_zmq_socket_req.erl
@@ -69,6 +69,8 @@ idle(check, {send, _Msg}, #gen_zmq_socket{transports = [Head|_]}, _State) ->
idle(check, _, _MqSState, _State) ->
{error, fsm};
+idle(do, {deliver_send, abort}, MqSState, State) ->
+ {next_state, idle, MqSState, State};
idle(do, {deliver_send, Transport}, MqSState, State) ->
State1 = State#state{last_send = Transport},
MqSState1 = gen_zmq:lb(Transport, MqSState),
@@ -88,6 +90,8 @@ send_queued(check, dequeue_send, _MqSState, _State) ->
send_queued(check, _, _MqSState, _State) ->
{error, fsm};
+send_queued(do, {deliver_send, abort}, MqSState, State) ->
+ {next_state, idle, MqSState, State};
send_queued(do, {deliver_send, Transport}, MqSState, State) ->
State1 = State#state{last_send = Transport},
MqSState1 = gen_zmq:lb(Transport, MqSState),
View
2  src/gen_zmq_socket_router.erl
@@ -80,6 +80,8 @@ idle(check, _, _MqSState, _State) ->
idle(do, queue_send, MqSState, State) ->
{next_state, idle, MqSState, State};
+idle(do, {deliver_send, abort}, MqSState, State) ->
+ {next_state, idle, MqSState, State};
idle(do, {deliver_send, Transport}, MqSState, State) ->
MqSState1 = gen_zmq:lb(Transport, MqSState),
{next_state, idle, MqSState1, State};
View
7 test/gen_zmq_SUITE.erl
@@ -44,6 +44,12 @@ req_tcp_connect_close(_Config) ->
ok = gen_zmq:connect(S, {127,0,0,1}, 5555, []),
gen_zmq:close(S).
+req_tcp_connect_fail(_Config) ->
+ {ok, S} = gen_zmq:socket([{type, req}, {active, false}]),
+ ok = gen_zmq:connect(S, "undefined.undefined", 5555, []),
+ {error, _Reason} = gen_zmq:send(S, ["XXX"]),
+ gen_zmq:close(S).
+
req_tcp_connect_timeout(_Config) ->
{ok, S} = gen_zmq:socket([{type, req}, {active, false}]),
ok = gen_zmq:connect(S, {127,0,0,1}, 5555, [{timeout, 1000}]),
@@ -400,6 +406,7 @@ all() ->
[reqrep_tcp_test_active, reqrep_tcp_test_passive,
reqrep_tcp_large_active, reqrep_tcp_large_passive,
shutdown_no_blocking_test,
+ req_tcp_connect_fail,
req_tcp_bind_close, req_tcp_connect_close, req_tcp_connect_timeout,
req_tcp_connecting_timeout, req_tcp_connecting_trash,
rep_tcp_connecting_timeout, rep_tcp_connecting_trash,
Please sign in to comment.
Something went wrong with that request. Please try again.