Skip to content
Browse files

add support for timing out long-running operations

  • Loading branch information...
1 parent fcd72e5 commit 4960cec600716421007723f09b79806c34639c9a @wg committed May 6, 2009
Showing with 182 additions and 51 deletions.
  1. +5 −1 README
  2. +15 −16 src/pgsql.erl
  3. +121 −33 src/pgsql_connection.erl
  4. +13 −1 src/pgsql_sock.erl
  5. +28 −0 test_src/pgsql_tests.erl
View
6 README
@@ -2,7 +2,7 @@ Erlang PostgreSQL Database Client
* Connect
- {ok, C} = pgsql:connect(Host, [Username], [Password], [Opts]).
+ {ok, C} = pgsql:connect(Host, [Username], [Password], Opts).
Host - host to connect to.
Username - username to connect as, defaults to $USER.
@@ -13,10 +13,14 @@ Erlang PostgreSQL Database Client
+ port
+ ssl (true | false | required)
+ ssl_opts (see ssl docs in OTP)
+ + timeout (milliseconds, defaults to 5000)
{ok, C} = pgsql:connect("localhost", "username", [{database, "test_db"}]).
ok = pgsql:close(C).
+ The timeout parameter is applied to all operations. In the case of equery
+ this means that total execution time may exceed the timeout value.
+
* Simple Query
{ok, Columns, Rows} = pgsql:squery(C, Sql).
View
31 src/pgsql.erl
@@ -1,5 +1,4 @@
%%% Copyright (C) 2008 - Will Glozer. All rights reserved.
-
-module(pgsql).
-export([connect/2, connect/3, connect/4, close/1]).
@@ -11,8 +10,6 @@
-include("pgsql.hrl").
--define(timeout, 5000).
-
%% -- client interface --
connect(Host, Opts) ->
@@ -47,7 +44,7 @@ equery(C, Sql, Parameters) ->
{ok, #statement{types = Types} = S} ->
Typed_Parameters = lists:zip(Types, Parameters),
ok = pgsql_connection:equery(C, S, Typed_Parameters),
- receive_result(C);
+ receive_result(C, undefined);
Error ->
Error
end.
@@ -114,16 +111,18 @@ with_transaction(C, F) ->
%% -- internal functions --
-receive_result(C) ->
- R = receive_result(C, [], []),
- receive
- {pgsql, C, done} -> R
+receive_result(C, Result) ->
+ case receive_result(C, [], []) of
+ done -> Result;
+ timeout -> {error, timeout};
+ R -> receive_result(C, R)
end.
receive_results(C, Results) ->
case receive_result(C, [], []) of
- done -> lists:reverse(Results);
- R -> receive_results(C, [R | Results])
+ done -> lists:reverse(Results);
+ timeout -> lists:reverse([{error, timeout} | Results]);
+ R -> receive_results(C, [R | Results])
end.
receive_result(C, Cols, Rows) ->
@@ -144,9 +143,9 @@ receive_result(C, Cols, Rows) ->
{pgsql, C, {notice, _N}} ->
receive_result(C, Cols, Rows);
{pgsql, C, done} ->
- done
- after
- ?timeout -> {error, timeout}
+ done;
+ {pgsql, C, timeout} ->
+ timeout
end.
receive_extended_result(C)->
@@ -168,7 +167,7 @@ receive_extended_result(C, Rows) ->
{pgsql, C, {complete, _Type}} ->
{ok, lists:reverse(Rows)};
{pgsql, C, {notice, _N}} ->
- receive_extended_result(C, Rows)
- after
- ?timeout -> {error, timeout}
+ receive_extended_result(C, Rows);
+ {pgsql, C, timeout} ->
+ {error, timeout}
end.
View
154 src/pgsql_connection.erl
@@ -14,13 +14,14 @@
-export([startup/3, auth/2, initializing/2, ready/2, ready/3]).
-export([querying/2, parsing/2, binding/2, describing/2]).
--export([executing/2, closing/2, synchronizing/2]).
+-export([executing/2, closing/2, synchronizing/2, timeout/2]).
-include("pgsql.hrl").
-record(state, {
reader,
sock,
+ timeout,
parameters = [],
reply,
reply_to,
@@ -111,31 +112,35 @@ code_change(_Old_Vsn, State_Name, State, _Extra) ->
%% -- states --
startup({connect, Host, Username, Password, Opts}, From, State) ->
+ Timeout = proplists:get_value(timeout, Opts, 5000),
case pgsql_sock:start_link(self(), Host, Username, Opts) of
{ok, Sock} ->
put(username, Username),
put(password, Password),
- State2 = State#state{sock = Sock, reply_to = From},
- {next_state, auth, State2};
+ State2 = State#state{sock = Sock, timeout = Timeout, reply_to = From},
+ {next_state, auth, State2, Timeout};
Error ->
{stop, normal, Error, State}
end.
%% AuthenticationOk
auth({$R, <<0:?int32>>}, State) ->
- {next_state, initializing, State};
+ #state{timeout = Timeout} = State,
+ {next_state, initializing, State, Timeout};
%% AuthenticationCleartextPassword
auth({$R, <<3:?int32>>}, State) ->
+ #state{timeout = Timeout} = State,
send(State, $p, [get(password), 0]),
- {next_state, auth, State};
+ {next_state, auth, State, Timeout};
%% AuthenticationMD5Password
auth({$R, <<5:?int32, Salt:4/binary>>}, State) ->
+ #state{timeout = Timeout} = State,
Digest1 = hex(erlang:md5([get(password), get(username)])),
Str = ["md5", hex(erlang:md5([Digest1, Salt])), 0],
send(State, $p, Str),
- {next_state, auth, State};
+ {next_state, auth, State, Timeout};
auth({$R, <<M:?int32, _/binary>>}, State) ->
case M of
@@ -157,12 +162,17 @@ auth({error, E}, State) ->
Any -> Why = Any
end,
gen_fsm:reply(State#state.reply_to, {error, Why}),
+ {stop, normal, State};
+
+auth(timeout, State) ->
+ gen_fsm:reply(State#state.reply_to, {error, timeout}),
{stop, normal, State}.
%% BackendKeyData
initializing({$K, <<Pid:?int32, Key:?int32>>}, State) ->
+ #state{timeout = Timeout} = State,
State2 = State#state{backend = {Pid, Key}},
- {next_state, initializing, State2};
+ {next_state, initializing, State2, Timeout};
%% ErrorResponse
initializing({error, E}, State) ->
@@ -173,6 +183,10 @@ initializing({error, E}, State) ->
gen_fsm:reply(State#state.reply_to, {error, Why}),
{stop, normal, State};
+initializing(timeout, State) ->
+ gen_fsm:reply(State#state.reply_to, {error, timeout}),
+ {stop, normal, State};
+
%% ReadyForQuery
initializing({$Z, <<Status:8>>}, State) ->
#state{parameters = Parameters, reply_to = Reply_To} = State,
@@ -190,12 +204,14 @@ ready(_Msg, State) ->
%% execute simple query
ready({squery, Sql}, From, State) ->
+ #state{timeout = Timeout} = State,
send(State, $Q, [Sql, 0]),
State2 = State#state{statement = #statement{}, reply_to = From},
- {reply, ok, querying, State2};
+ {reply, ok, querying, State2, Timeout};
%% execute extended query
ready({equery, Statement, Parameters}, From, State) ->
+ #state{timeout = Timeout} = State,
#statement{name = StatementName, columns = Columns} = Statement,
Bin1 = encode_parameters(Parameters),
Bin2 = encode_formats(Columns),
@@ -204,7 +220,7 @@ ready({equery, Statement, Parameters}, From, State) ->
send(State, $C, [$S, "", 0]),
send(State, $S, []),
State2 = State#state{statement = Statement, reply_to = From},
- {reply, ok, querying, State2};
+ {reply, ok, querying, State2, Timeout};
ready({get_parameter, Name}, _From, State) ->
case lists:keysearch(Name, 1, State#state.parameters) of
@@ -214,88 +230,108 @@ ready({get_parameter, Name}, _From, State) ->
{reply, {ok, Value}, ready, State};
ready({parse, Name, Sql, Types}, From, State) ->
+ #state{timeout = Timeout} = State,
Bin = encode_types(Types),
send(State, $P, [Name, 0, Sql, 0, Bin]),
send(State, $D, [$S, Name, 0]),
send(State, $H, []),
S = #statement{name = Name},
- {next_state, parsing, State#state{statement = S, reply_to = From}};
+ State2 = State#state{statement = S, reply_to = From},
+ {next_state, parsing, State2, Timeout};
ready({bind, Statement, PortalName, Parameters}, From, State) ->
+ #state{timeout = Timeout} = State,
#statement{name = StatementName, columns = Columns, types = Types} = Statement,
Typed_Parameters = lists:zip(Types, Parameters),
Bin1 = encode_parameters(Typed_Parameters),
Bin2 = encode_formats(Columns),
send(State, $B, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
send(State, $H, []),
- {next_state, binding, State#state{statement = Statement, reply_to = From}};
+ State2 = State#state{statement = Statement, reply_to = From},
+ {next_state, binding, State2, Timeout};
ready({execute, Statement, PortalName, MaxRows}, From, State) ->
+ #state{timeout = Timeout} = State,
send(State, $E, [PortalName, 0, <<MaxRows:?int32>>]),
send(State, $H, []),
- {reply, ok, executing, State#state{statement = Statement, reply_to = From}};
+ State2 = State#state{statement = Statement, reply_to = From},
+ {reply, ok, executing, State2, Timeout};
ready({describe, Type, Name}, From, State) ->
+ #state{timeout = Timeout} = State,
case Type of
statement -> Type2 = $S;
portal -> Type2 = $P
end,
send(State, $D, [Type2, Name, 0]),
send(State, $H, []),
- {next_state, describing, State#state{reply_to = From}};
+ {next_state, describing, State#state{reply_to = From}, Timeout};
ready({close, Type, Name}, From, State) ->
+ #state{timeout = Timeout} = State,
case Type of
statement -> Type2 = $S;
portal -> Type2 = $P
end,
send(State, $C, [Type2, Name, 0]),
send(State, $H, []),
- {next_state, closing, State#state{reply_to = From}};
+ {next_state, closing, State#state{reply_to = From}, Timeout};
ready(sync, From, State) ->
+ #state{timeout = Timeout} = State,
send(State, $S, []),
- {next_state, synchronizing, State#state{reply = ok, reply_to = From}}.
+ State2 = State#state{reply = ok, reply_to = From},
+ {next_state, synchronizing, State2, Timeout}.
%% BindComplete
querying({$2, <<>>}, State) ->
- #state{statement = #statement{columns = Columns}} = State,
+ #state{timeout = Timeout, statement = #statement{columns = Columns}} = State,
notify(State, {columns, Columns}),
- {next_state, querying, State};
+ {next_state, querying, State, Timeout};
%% CloseComplete
querying({$3, <<>>}, State) ->
- {next_state, querying, State};
+ #state{timeout = Timeout} = State,
+ {next_state, querying, State, Timeout};
%% RowDescription
querying({$T, <<Count:?int16, Bin/binary>>}, State) ->
+ #state{timeout = Timeout} = State,
Columns = decode_columns(Count, Bin),
S2 = (State#state.statement)#statement{columns = Columns},
notify(State, {columns, Columns}),
- {next_state, querying, State#state{statement = S2}};
+ {next_state, querying, State#state{statement = S2}, Timeout};
%% DataRow
querying({$D, <<_Count:?int16, Bin/binary>>}, State) ->
- #state{statement = #statement{columns = Columns}} = State,
+ #state{timeout = Timeout, statement = #statement{columns = Columns}} = State,
Data = decode_data(Columns, Bin),
notify(State, {data, Data}),
- {next_state, querying, State};
+ {next_state, querying, State, Timeout};
%% CommandComplete
querying({$C, Bin}, State) ->
+ #state{timeout = Timeout} = State,
Complete = decode_complete(Bin),
notify(State, {complete, Complete}),
- {next_state, querying, State};
+ {next_state, querying, State, Timeout};
%% EmptyQueryResponse
querying({$I, _Bin}, State) ->
+ #state{timeout = Timeout} = State,
notify(State, {complete, empty}),
- {next_state, querying, State};
+ {next_state, querying, State, Timeout};
+
+querying(timeout, State) ->
+ #state{sock = Sock, timeout = Timeout, backend = {Pid, Key}} = State,
+ pgsql_sock:cancel(Sock, Pid, Key),
+ {next_state, timeout, State, Timeout};
%% ErrorResponse
querying({error, E}, State) ->
+ #state{timeout = Timeout} = State,
notify(State, {error, E}),
- {next_state, querying, State};
+ {next_state, querying, State, Timeout};
%% ReadyForQuery
querying({$Z, <<_Status:8>>}, State) ->
@@ -304,13 +340,21 @@ querying({$Z, <<_Status:8>>}, State) ->
%% ParseComplete
parsing({$1, <<>>}, State) ->
- {next_state, describing, State};
+ #state{timeout = Timeout} = State,
+ {next_state, describing, State, Timeout};
+
+parsing(timeout, State) ->
+ #state{timeout = Timeout} = State,
+ Reply = {error, timeout},
+ send(State, $S, []),
+ {next_state, parsing, State#state{reply = Reply}, Timeout};
%% ErrorResponse
parsing({error, E}, State) ->
+ #state{timeout = Timeout} = State,
Reply = {error, E},
send(State, $S, []),
- {next_state, parsing, State#state{reply = Reply}};
+ {next_state, parsing, State#state{reply = Reply}, Timeout};
%% ReadyForQuery
parsing({$Z, <<Status:8>>}, State) ->
@@ -323,11 +367,18 @@ binding({$2, <<>>}, State) ->
gen_fsm:reply(State#state.reply_to, ok),
{next_state, ready, State};
+binding(timeout, State) ->
+ #state{timeout = Timeout} = State,
+ Reply = {error, timeout},
+ send(State, $S, []),
+ {next_state, binding, State#state{reply = Reply}, Timeout};
+
%% ErrorResponse
binding({error, E}, State) ->
+ #state{timeout = Timeout} = State,
Reply = {error, E},
send(State, $S, []),
- {next_state, binding, State#state{reply = Reply}};
+ {next_state, binding, State#state{reply = Reply}, Timeout};
%% ReadyForQuery
binding({$Z, <<Status:8>>}, State) ->
@@ -337,9 +388,10 @@ binding({$Z, <<Status:8>>}, State) ->
%% ParameterDescription
describing({$t, <<_Count:?int16, Bin/binary>>}, State) ->
+ #state{timeout = Timeout} = State,
Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
S2 = (State#state.statement)#statement{types = Types},
- {next_state, describing, State#state{statement = S2}};
+ {next_state, describing, State#state{statement = S2}, Timeout};
%% RowDescription
describing({$T, <<Count:?int16, Bin/binary>>}, State) ->
@@ -355,11 +407,18 @@ describing({$n, <<>>}, State) ->
gen_fsm:reply(State#state.reply_to, {ok, S2}),
{next_state, ready, State};
+describing(timeout, State) ->
+ #state{timeout = Timeout} = State,
+ Reply = {error, timeout},
+ send(State, $S, []),
+ {next_state, describing, State#state{reply = Reply}, Timeout};
+
%% ErrorResponse
describing({error, E}, State) ->
+ #state{timeout = Timeout} = State,
Reply = {error, E},
send(State, $S, []),
- {next_state, describing, State#state{reply = Reply}};
+ {next_state, describing, State#state{reply = Reply}, Timeout};
%% ReadyForQuery
describing({$Z, <<Status:8>>}, State) ->
@@ -369,10 +428,10 @@ describing({$Z, <<Status:8>>}, State) ->
%% DataRow
executing({$D, <<_Count:?int16, Bin/binary>>}, State) ->
- #state{statement = #statement{columns = Columns}} = State,
+ #state{timeout = Timeout, statement = #statement{columns = Columns}} = State,
Data = decode_data(Columns, Bin),
notify(State, {data, Data}),
- {next_state, executing, State};
+ {next_state, executing, State, Timeout};
%% PortalSuspended
executing({$s, <<>>}, State) ->
@@ -389,16 +448,27 @@ executing({$I, _Bin}, State) ->
notify(State, {complete, empty}),
{next_state, ready, State};
+executing(timeout, State) ->
+ #state{sock = Sock, timeout = Timeout, backend = {Pid, Key}} = State,
+ pgsql_sock:cancel(Sock, Pid, Key),
+ send(State, $S, []),
+ {next_state, timeout, State, Timeout};
+
%% ErrorResponse
executing({error, E}, State) ->
+ #state{timeout = Timeout} = State,
notify(State, {error, E}),
- {next_state, executing, State}.
+ {next_state, executing, State, Timeout}.
%% CloseComplete
closing({$3, <<>>}, State) ->
gen_fsm:reply(State#state.reply_to, ok),
{next_state, ready, State};
+closing(timeout, State) ->
+ gen_fsm:reply(State#state.reply_to, {error, timeout}),
+ {next_state, ready, State};
+
%% ErrorResponse
closing({error, E}, State) ->
Error = {error, E},
@@ -407,15 +477,33 @@ closing({error, E}, State) ->
%% ErrorResponse
synchronizing({error, E}, State) ->
+ #state{timeout = Timeout} = State,
Reply = {error, E},
- {next_state, synchronizing, State#state{reply = Reply}};
+ {next_state, synchronizing, State#state{reply = Reply}, Timeout};
+
+synchronizing(timeout, State) ->
+ #state{timeout = Timeout} = State,
+ Reply = {error, timeout},
+ {next_state, synchronizing, State#state{reply = Reply}, Timeout};
%% ReadyForQuery
synchronizing({$Z, <<Status:8>>}, State) ->
#state{reply = Reply, reply_to = Reply_To} = State,
gen_fsm:reply(Reply_To, Reply),
{next_state, ready, State#state{reply = undefined, txstatus = Status}}.
+timeout({$Z, <<Status:8>>}, State) ->
+ notify(State, timeout),
+ {next_state, ready, State#state{txstatus = Status}};
+
+timeout(timeout, State) ->
+ {stop, timeout, State};
+
+%% ignore events that occur after timeout
+timeout(_Event, State) ->
+ #state{timeout = Timeout} = State,
+ {next_state, timeout, State, Timeout}.
+
%% -- internal functions --
%% decode data
View
14 src/pgsql_sock.erl
@@ -4,7 +4,7 @@
-behavior(gen_server).
--export([start_link/4, send/2, send/3]).
+-export([start_link/4, send/2, send/3, cancel/3]).
-export([decode_string/1, lower_atom/1]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -32,6 +32,9 @@ send(S, Data) ->
Msg = <<(byte_size(Bin) + 4):?int32, Bin/binary>>,
gen_server:cast(S, {send, Msg}).
+cancel(S, Pid, Key) ->
+ gen_server:cast(S, {cancel, Pid, Key}).
+
%% -- gen_server implementation --
init([C, Host, Username, Opts]) ->
@@ -74,6 +77,15 @@ handle_cast({send, Data}, State) ->
ok = Mod:send(Sock, Data),
{noreply, State};
+handle_cast({cancel, Pid, Key}, State) ->
+ {ok, {Addr, Port}} = inet:peername(State#state.sock),
+ SockOpts = [{active, false}, {packet, raw}, binary],
+ {ok, Sock} = gen_tcp:connect(Addr, Port, SockOpts),
+ Msg = <<16:?int32, 80877102:?int32, Pid:?int32, Key:?int32>>,
+ ok = gen_tcp:send(Sock, Msg),
+ gen_tcp:close(Sock),
+ {noreply, State};
+
handle_cast(Cast, State) ->
{stop, {unsupported_cast, Cast}, State}.
View
28 test_src/pgsql_tests.erl
@@ -406,6 +406,31 @@ text_format_test() ->
Select("numeric", "123456")
end).
+connect_timeout_test() ->
+ {error, timeout} = pgsql:connect(?host, [{port, ?port}, {timeout, 0}]).
+
+query_timeout_test() ->
+ with_connection(
+ fun(C) ->
+ {error, timeout} = pgsql:squery(C, "select pg_sleep(1)"),
+ {error, timeout} = pgsql:equery(C, "select pg_sleep(2)"),
+ {ok, _Cols, [{1}]} = pgsql:equery(C, "select 1")
+ end,
+ [{timeout, 10}]).
+
+execute_timeout_test() ->
+ with_connection(
+ fun(C) ->
+ {ok, S} = pgsql:parse(C, "select pg_sleep($1)"),
+ ok = pgsql:bind(C, S, [2]),
+ {error, timeout} = pgsql:execute(C, S, 0),
+ ok = pgsql:bind(C, S, [0]),
+ {ok, [{<<>>}]} = pgsql:execute(C, S, 0),
+ ok = pgsql:close(C, S),
+ ok = pgsql:sync(C)
+ end,
+ [{timeout, 10}]).
+
%% -- run all tests --
run_tests() ->
@@ -423,6 +448,9 @@ connect_only(Args) ->
with_connection(F) ->
with_connection(F, "epgsql_test", []).
+with_connection(F, Args) ->
+ with_connection(F, "epgsql_test", Args).
+
with_connection(F, Username, Args) ->
Args2 = [{port, ?port}, {database, "epgsql_test_db1"} | Args],
{ok, C} = pgsql:connect(?host, Username, Args2),

0 comments on commit 4960cec

Please sign in to comment.
Something went wrong with that request. Please try again.