Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix and test for race condition on connection timeout

  • Loading branch information...
commit 661d02c694c02bbf9ce17abd0954242c4efe99e0 1 parent aaa1c0b
Ransom Richardson authored
Showing with 91 additions and 22 deletions.
  1. +58 −21 src/emysql_conn_mgr.erl
  2. +33 −1 test/conn_mgr_SUITE.erl
View
79 src/emysql_conn_mgr.erl
@@ -84,7 +84,15 @@ wait_for_connection(PoolId)->
Connection
after lock_timeout() ->
%-% io:format("~p gets no connection and times out -> EXIT~n~n", [self()]),
- exit(connection_lock_timeout)
+ case do_gen_call({end_wait, PoolId}) of
+ ok ->
+ exit(connection_lock_timeout);
+ not_waiting ->
+ %% If we aren't waiting, then someone must
+ %% have sent us a connection mssage at the
+ %% same time that we timed out.
+ receive_connection_not_waiting()
+ end
end;
Connection ->
%-% io:format("~p gets connection~n", [self()]),
@@ -185,13 +193,13 @@ handle_call({remove_connections, PoolId, Num}, _From, State) ->
end;
handle_call({lock_connection_or_wait, PoolId}, {From, _Mref}, State) ->
- %% place to calling pid at the end of the waiting queue of its pool
case find_pool(PoolId, State#state.pools) of
{Pool, OtherPools} ->
case lock_next_connection(State, Pool, OtherPools) of
{ok, NewConn, State1} ->
{reply, NewConn, State1};
unavailable ->
+ %% place the calling pid at the end of the waiting queue of its pool
PoolNow = Pool#pool{ waiting = queue:in(From, Pool#pool.waiting) },
{reply, unavailable, State#state{pools=[PoolNow|OtherPools]}}
end;
@@ -199,6 +207,28 @@ handle_call({lock_connection_or_wait, PoolId}, {From, _Mref}, State) ->
{reply, {error, pool_not_found}, State}
end;
+handle_call({end_wait, PoolId}, {From, _Mref}, State) ->
+ case find_pool(PoolId, State#state.pools) of
+ {Pool, OtherPools} ->
+ %% Remove From from the wait queue
+ QueueNow = queue:filter(
+ fun(Pid) -> Pid =/= From end,
+ Pool#pool.waiting),
+ PoolNow = Pool#pool{ waiting = QueueNow },
+ %% See if the length changed to know if From was removed.
+ OldLen = queue:len(Pool#pool.waiting),
+ NewLen = queue:len(QueueNow),
+ if
+ OldLen =:= NewLen ->
+ Reply = not_waiting;
+ true ->
+ Reply = ok
+ end,
+ {reply, Reply, State#state{pools=[PoolNow|OtherPools]}};
+ undefined ->
+ {reply, {error, pool_not_found}, State}
+ end;
+
handle_call({lock_connection, PoolId}, _From, State) ->
%% find the next available connection in the pool identified by PoolId
%-% io:format("gen srv: lock connection for pool ~p~n", [PoolId]),
@@ -367,29 +397,36 @@ pass_on_or_queue_as_available(State, Connection) ->
end;
%% if the waiting queue is not empty then remove the head of
- %% the queue and check if that process is still waiting
- %% for a connection. If so, send the connection. Regardless,
- %% update the pool & queue in state once the head has been removed.
- false ->
+ %% the queue and send it the connection.
+ %% Update the pool & queue in state once the head has been removed.
+ false ->
- {{value, Pid}, OtherWaiting} = queue:out(Waiting),
+ {{value, Pid}, OtherWaiting} = queue:out(Waiting),
PoolNow = Pool#pool{ waiting = OtherWaiting },
StateNow = State#state{ pools = [PoolNow|OtherPools] },
+ erlang:send(Pid, {connection, Connection}),
+ {ok, StateNow}
+ end;
- case erlang:is_process_alive(Pid) of
- true ->
- erlang:send(Pid, {connection, Connection}),
- {ok, StateNow};
- _ ->
- % loop, to traverse queue to find a healthy candidate, until empty.
- pass_on_or_queue_as_available(StateNow, Connection)
- end
- end;
-
- %% pool not found
- undefined ->
- {{error, pool_not_found}, State}
- end.
+ %% pool not found
+ undefined ->
+ {{error, pool_not_found}, State}
+ end.
lock_timeout() ->
emysql_app:lock_timeout().
+
+
+%% This is called after we timed out, but discovered that we weren't waiting for a
+%% connection.
+receive_connection_not_waiting() ->
+ receive
+ {connection, Connection} ->
+ %%-% io:format("~p gets a connection after timeout in queue~n", [self()]),
+ Connection
+ after
+ %% This should never happen, as we should only be here if we had been sent a connection
+ lock_timeout() ->
+ %%-% io:format("~p gets no connection and times out again -> EXIT~n~n", [self()]),
+ exit(connection_lock_second_timeout)
+ end.
View
34 test/conn_mgr_SUITE.erl
@@ -23,7 +23,7 @@
% Test cases have self explanatory names.
%%--------------------------------------------------------------------
all() ->
- [two_procs].
+ [two_procs, no_lock_timeout].
%%--------------------------------------------------------------------
@@ -57,9 +57,41 @@ two_procs(_) ->
|| _ <- lists:seq(1,Num)],
ok.
+%% Process that will do a bunch of requests against mysql
test_proc() ->
[
#result_packet{} = emysql:execute(test_pool, "describe hello_table;")
|| _ <- lists:seq(1,1000)
].
+%% Test Case: Make sure that the pool is still usable after a lock timeout
+%% Test for race on connection where the connection is sent to a process
+%% at the exact same time it hits the timeout.
+%% See ransomr's first comment on Issue 9
+%%--------------------------------------------------------------------
+no_lock_timeout(_) ->
+ Num = 2,
+ OldEnv = application:get_env(emysql, lock_timeout),
+ %% This is really hard to repro - the timing is very tricky.
+ %% On my machine a timeout of 1 seems to reproduce it after about 10 attempts.
+ %% Once it happens the connection is lost from the pool, so the pool needs to be reset.
+ application:set_env(emysql, lock_timeout, 1),
+ process_flag(trap_exit, true),
+ [spawn_link(fun test_proc/0)
+ || _ <- lists:seq(1,Num)],
+ %% We expect one process to timeout, but the other should exit normally
+ receive
+ {'EXIT', _, Reason1} ->
+ connection_lock_timeout = Reason1
+ end,
+ receive
+ {'EXIT', _, Reason2} ->
+ normal = Reason2
+ end,
+ case OldEnv of
+ undefined ->
+ application:unset_env(emysql, lock_timeout);
+ {ok, Timeout} ->
+ application:set_env(emysql, lock_timeout, Timeout)
+ end,
+ ok.
Please sign in to comment.
Something went wrong with that request. Please try again.