Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added optional timeout argument for (most) database functions. Defaul…

…ts to ?PGSQL_TIMEOUT. Fixes #301
  • Loading branch information...
commit 691694a46b37e4e5f112a9605e31f7d5c84884ef 1 parent 28b2550
@mworrell mworrell authored
View
129 src/dbdrivers/postgresql/epgsql/pgsql.erl
@@ -7,9 +7,22 @@
-module(pgsql).
--export([connect/2, connect/3, connect/4, close/1]).
--export([last_id/2, reset_id/2, squery1/2, equery1/2, equery1/3, assoc/2, assoc/3]).
--export([get_parameter/2, squery/2, equery/2, equery/3]).
+-export([
+ connect/2, connect/3, connect/4, connect/5,
+ close/1
+]).
+-export([
+ last_id/2, last_id/3,
+ reset_id/2, reset_id/3,
+ squery1/2, squery1/3,
+ equery1/2, equery1/3, equery1/4,
+ assoc/2, assoc/3, assoc/4
+]).
+-export([
+ get_parameter/2,
+ squery/2, squery/3,
+ equery/2, equery/3, equery/4
+]).
-export([parse/2, parse/3, parse/4, describe/2, describe/3]).
-export([bind/3, bind/4, execute/2, execute/3, execute/4]).
-export([close/2, close/3, sync/1]).
@@ -27,16 +40,18 @@ connect(Host, Opts) ->
connect(Host, Username, Opts) ->
connect(Host, Username, "", Opts).
-
connect(Host, Username, Password, Opts) ->
+ connect(Host, Username, Password, Opts, ?TIMEOUT).
+
+connect(Host, Username, Password, Opts, Timeout) ->
{ok, C} = pgsql_connection:start_link(),
- case pgsql_connection:connect(C, Host, Username, Password, Opts) of
+ case pgsql_connection:connect(C, Host, Username, Password, Opts, Timeout) of
{ok, Conn} ->
case proplists:get_value(schema, Opts) of
undefined ->
{ok, Conn};
Schema when is_list(Schema) ->
- case squery(Conn, "SET search_path TO " ++ Schema) of
+ case squery(Conn, "SET search_path TO " ++ Schema, Timeout) of
{ok, [], []} ->
{ok, Conn};
Error ->
@@ -55,21 +70,33 @@ close(C) when is_pid(C) ->
get_parameter(C, Name) ->
pgsql_connection:get_parameter(C, Name).
-last_id(C, Table) when is_atom(Table) ->
- last_id(C, atom_to_list(Table));
last_id(C, Table) ->
- equery1(C, "select currval(pg_get_serial_sequence($1, 'id'))", [Table]).
+ last_id(C, Table, ?TIMEOUT).
+
+last_id(C, Table, Timeout) when is_atom(Table) ->
+ last_id(C, atom_to_list(Table), Timeout);
+last_id(C, Table, Timeout) ->
+ equery1(C, "select currval(pg_get_serial_sequence($1, 'id'))", [Table], Timeout).
-reset_id(C, Table) when is_atom(Table) ->
- reset_id(C, atom_to_list(Table));
reset_id(C, Table) ->
- {ok, Max} = equery1(C, "select max(id) from \""++Table++"\""),
- equery1(C, "select setval(pg_get_serial_sequence($1, 'id'), $2)", [Table, Max+1]).
+ reset_id(C, Table, ?TIMEOUT).
+
+reset_id(C, Table, Timeout) when is_atom(Table) ->
+ reset_id(C, atom_to_list(Table), Timeout);
+reset_id(C, Table, Timeout) ->
+ {ok, Max} = equery1(C, "select max(id) from \""++Table++"\"", Timeout),
+ equery1(C, "select setval(pg_get_serial_sequence($1, 'id'), $2)", [Table, Max+1], Timeout).
assoc(C, Sql) ->
- assoc(C, Sql, []).
-assoc(C, Sql, Parameters) ->
- case equery(C, Sql, Parameters) of
+ assoc(C, Sql, [], ?TIMEOUT).
+
+assoc(C, Sql, Timeout) when is_integer(Timeout) ->
+ assoc(C, Sql, [], Timeout);
+assoc(C, Sql, Parameters) when is_list(Parameters); is_tuple(Parameters) ->
+ assoc(C, Sql, Parameters, ?TIMEOUT).
+
+assoc(C, Sql, Parameters, Timeout) ->
+ case equery(C, Sql, Parameters, Timeout) of
{ok, Columns, Rows} ->
Names = [ list_to_atom(binary_to_list(Name)) || #column{name=Name} <- Columns ],
Rows1 = [ lists:zip(Names, tuple_to_list(Row)) || Row <- Rows ],
@@ -77,8 +104,12 @@ assoc(C, Sql, Parameters) ->
Other -> Other
end.
+
squery1(C, Sql) ->
- case squery(C,Sql) of
+ squery1(C, Sql, ?TIMEOUT).
+
+squery1(C, Sql, Timeout) ->
+ case squery(C,Sql, Timeout) of
{ok, _Columns, []} -> {error, noresult};
{ok, _RowCount, _Columns, []} -> {error, noresult};
{ok, _Columns, [Row|_]} -> {ok, element(1, Row)};
@@ -87,9 +118,15 @@ squery1(C, Sql) ->
end.
equery1(C, Sql) ->
- equery1(C, Sql, []).
-equery1(C, Sql, Parameters) ->
- case equery(C, Sql, Parameters) of
+ equery1(C, Sql, [], ?TIMEOUT).
+
+equery1(C, Sql, Parameters) when is_list(Parameters); is_tuple(Parameters) ->
+ equery1(C, Sql, Parameters, ?TIMEOUT);
+equery1(C, Sql, Timeout) when is_integer(Timeout) ->
+ equery1(C, Sql, [], Timeout).
+
+equery1(C, Sql, Parameters, Timeout) ->
+ case equery(C, Sql, Parameters, Timeout) of
{ok, _Columns, []} -> {error, noresult};
{ok, _RowCount, _Columns, []} -> {error, noresult};
{ok, _Columns, [Row|_]} -> {ok, element(1, Row)};
@@ -100,24 +137,33 @@ equery1(C, Sql, Parameters) ->
squery(C, Sql) ->
- ok = pgsql_connection:squery(C, Sql),
- case receive_results(C, []) of
+ squery(C, Sql, ?TIMEOUT).
+
+squery(C, Sql, Timeout) ->
+ ok = pgsql_connection:squery(C, Sql, Timeout),
+ case receive_results(C, [], Timeout) of
[Result] -> Result;
Results -> Results
end.
equery(C, Sql) ->
- equery(C, Sql, []).
-
+ equery(C, Sql, [], ?TIMEOUT).
+equery(C, Sql, Timeout) when is_integer(Timeout) ->
+ equery(C, Sql, [], Timeout);
equery(C, Sql, Parameters) when is_tuple(Parameters) ->
- equery(C, Sql, tuple_to_list(Parameters));
-equery(C, Sql, Parameters) ->
- case pgsql_connection:parse(C, "", Sql, []) of
+ equery(C, Sql, tuple_to_list(Parameters), ?TIMEOUT);
+equery(C, Sql, Parameters) when is_list(Parameters) ->
+ equery(C, Sql, Parameters, ?TIMEOUT).
+
+equery(C, Sql, Parameters, Timeout) when is_tuple(Parameters) ->
+ equery(C, Sql, tuple_to_list(Parameters), Timeout);
+equery(C, Sql, Parameters, Timeout) when is_list(Parameters) ->
+ case pgsql_connection:parse(C, "", Sql, [], Timeout) of
{ok, #statement{types = Types} = S} ->
Typed_Parameters = lists:zip(Types, Parameters),
- ok = pgsql_connection:equery(C, S, Typed_Parameters),
- receive_result(C);
+ ok = pgsql_connection:equery(C, S, Typed_Parameters, Timeout),
+ receive_result(C, Timeout);
Error ->
?LOG("SQL error ~p : ~p", [Error, Sql]),
Error
@@ -187,24 +233,24 @@ with_transaction(C, F) ->
%% -- internal functions --
-receive_result(C) ->
- R = receive_result(C, [], []),
+receive_result(C, Timeout) ->
+ R = receive_result(C, [], [], Timeout),
receive
{pgsql, C, done} -> R
end.
-receive_results(C, Results) ->
- case receive_result(C, [], []) of
+receive_results(C, Results, Timeout) ->
+ case receive_result(C, [], [], Timeout) of
done -> lists:reverse(Results);
- R -> receive_results(C, [R | Results])
+ R -> receive_results(C, [R | Results], Timeout)
end.
-receive_result(C, Cols, Rows) ->
+receive_result(C, Cols, Rows, Timeout) ->
receive
{pgsql, C, {columns, Cols2}} ->
- receive_result(C, Cols2, Rows);
+ receive_result(C, Cols2, Rows, Timeout);
{pgsql, C, {data, Row}} ->
- receive_result(C, Cols, [Row | Rows]);
+ receive_result(C, Cols, [Row | Rows], Timeout);
{pgsql, C, {error, _E} = Error} ->
Error;
{pgsql, C, {complete, {_Type, Count}}} ->
@@ -215,17 +261,20 @@ receive_result(C, Cols, Rows) ->
{pgsql, C, {complete, _Type}} ->
{ok, Cols, lists:reverse(Rows)};
{pgsql, C, {notice, _N}} ->
- receive_result(C, Cols, Rows);
+ receive_result(C, Cols, Rows, Timeout);
{pgsql, C, done} ->
done
after
- ?TIMEOUT -> {error, timeout}
+ Timeout -> {error, timeout}
end.
receive_extended_result(C)->
receive_extended_result(C, []).
receive_extended_result(C, Rows) ->
+ receive_extended_result(C, Rows, ?TIMEOUT).
+
+receive_extended_result(C, Rows, Timeout) ->
receive
{pgsql, C, {data, Row}} ->
receive_extended_result(C, [Row | Rows]);
@@ -243,5 +292,5 @@ receive_extended_result(C, Rows) ->
{pgsql, C, {notice, _N}} ->
receive_extended_result(C, Rows)
after
- ?TIMEOUT -> {error, timeout}
+ Timeout -> {error, timeout}
end.
View
75 src/dbdrivers/postgresql/epgsql/pgsql_connection.erl
@@ -5,11 +5,23 @@
-behavior(gen_fsm).
--export([start_link/0, stop/1, connect/5, get_parameter/2]).
--export([squery/2, equery/3]).
--export([parse/4, bind/4, execute/4, describe/3]).
--export([close/3, sync/1]).
--export([database/1]).
+-export([start_link/0, stop/1]).
+-export([
+ connect/5, get_parameter/2,
+ connect/6, get_parameter/3,
+
+ squery/2, equery/3,
+ squery/3, equery/4,
+
+ parse/4, bind/4, execute/4, describe/3,
+ parse/5, bind/5, execute/5, describe/4,
+
+ close/3, sync/1,
+ close/4, sync/2,
+
+ database/1,
+ database/2
+]).
-export([init/1, handle_event/3, handle_sync_event/4]).
-export([handle_info/3, terminate/3, code_change/4]).
@@ -47,37 +59,68 @@ stop(C) ->
gen_fsm:send_all_state_event(C, stop).
connect(C, Host, Username, Password, Opts) ->
- gen_fsm:sync_send_event(C, {connect, Host, Username, Password, Opts}, ?PGSQL_TIMEOUT).
+ connect(C, Host, Username, Password, Opts, ?PGSQL_TIMEOUT).
+
+connect(C, Host, Username, Password, Opts, Timeout) ->
+ gen_fsm:sync_send_event(C, {connect, Host, Username, Password, Opts}, Timeout).
get_parameter(C, Name) ->
- gen_fsm:sync_send_event(C, {get_parameter, to_binary(Name)}, ?PGSQL_TIMEOUT).
+ get_parameter(C, Name, ?PGSQL_TIMEOUT).
+
+get_parameter(C, Name, Timeout) ->
+ gen_fsm:sync_send_event(C, {get_parameter, to_binary(Name)}, Timeout).
squery(C, Sql) ->
- gen_fsm:sync_send_event(C, {squery, Sql}, ?PGSQL_TIMEOUT).
+ squery(C, Sql, ?PGSQL_TIMEOUT).
+
+squery(C, Sql, Timeout) ->
+ gen_fsm:sync_send_event(C, {squery, Sql}, Timeout).
equery(C, Statement, Parameters) ->
- gen_fsm:sync_send_event(C, {equery, Statement, Parameters}, ?PGSQL_TIMEOUT).
+ equery(C, Statement, Parameters, ?PGSQL_TIMEOUT).
+
+equery(C, Statement, Parameters, Timeout) ->
+ gen_fsm:sync_send_event(C, {equery, Statement, Parameters}, Timeout).
parse(C, Name, Sql, Types) ->
- gen_fsm:sync_send_event(C, {parse, Name, Sql, Types}, ?PGSQL_TIMEOUT).
+ parse(C, Name, Sql, Types, ?PGSQL_TIMEOUT).
+
+parse(C, Name, Sql, Types, Timeout) ->
+ gen_fsm:sync_send_event(C, {parse, Name, Sql, Types}, Timeout).
bind(C, Statement, PortalName, Parameters) ->
- gen_fsm:sync_send_event(C, {bind, Statement, PortalName, Parameters}, ?PGSQL_TIMEOUT).
+ bind(C, Statement, PortalName, Parameters, ?PGSQL_TIMEOUT).
+
+bind(C, Statement, PortalName, Parameters, Timeout) ->
+ gen_fsm:sync_send_event(C, {bind, Statement, PortalName, Parameters}, Timeout).
execute(C, Statement, PortalName, MaxRows) ->
- gen_fsm:sync_send_event(C, {execute, Statement, PortalName, MaxRows}, ?PGSQL_TIMEOUT).
+ execute(C, Statement, PortalName, MaxRows, ?PGSQL_TIMEOUT).
+
+execute(C, Statement, PortalName, MaxRows, Timeout) ->
+ gen_fsm:sync_send_event(C, {execute, Statement, PortalName, MaxRows}, Timeout).
describe(C, Type, Name) ->
- gen_fsm:sync_send_event(C, {describe, Type, Name}, ?PGSQL_TIMEOUT).
+ describe(C, Type, Name, ?PGSQL_TIMEOUT).
+
+describe(C, Type, Name, Timeout) ->
+ gen_fsm:sync_send_event(C, {describe, Type, Name}, Timeout).
close(C, Type, Name) ->
- gen_fsm:sync_send_event(C, {close, Type, Name}, ?PGSQL_TIMEOUT).
+ close(C, Type, Name, ?PGSQL_TIMEOUT).
+close(C, Type, Name, Timeout) ->
+ gen_fsm:sync_send_event(C, {close, Type, Name}, Timeout).
sync(C) ->
- gen_fsm:sync_send_event(C, sync, ?PGSQL_TIMEOUT).
+ sync(C, ?PGSQL_TIMEOUT).
+sync(C, Timeout) ->
+ gen_fsm:sync_send_event(C, sync, Timeout).
database(C) ->
- gen_fsm:sync_send_event(C, database, ?PGSQL_TIMEOUT).
+ database(C, ?PGSQL_TIMEOUT).
+
+database(C, Timeout) ->
+ gen_fsm:sync_send_event(C, database, Timeout).
%% -- gen_fsm implementation --
View
60 src/dbdrivers/postgresql/z_db.erl
@@ -36,12 +36,16 @@
assoc_props_row/3,
assoc/2,
assoc/3,
+ assoc/4,
assoc_props/2,
assoc_props/3,
+ assoc_props/4,
q/2,
q/3,
+ q/4,
q1/2,
q1/3,
+ q1/4,
q_row/2,
q_row/3,
equery/2,
@@ -226,10 +230,15 @@ get_parameter(Parameter, Context) ->
assoc(Sql, Context) ->
assoc(Sql, [], Context).
-assoc(Sql, Parameters, Context) ->
+assoc(Sql, Parameters, #context{} = Context) ->
+ assoc(Sql, Parameters, Context, ?PGSQL_TIMEOUT);
+assoc(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
+ assoc(Sql, [], Context, Timeout).
+
+assoc(Sql, Parameters, Context, Timeout) ->
F = fun(C) when C =:= none -> [];
(C) ->
- {ok, Result} = pgsql:assoc(C, Sql, Parameters),
+ {ok, Result} = pgsql:assoc(C, Sql, Parameters, Timeout),
Result
end,
with_connection(F, Context).
@@ -238,22 +247,32 @@ assoc(Sql, Parameters, Context) ->
assoc_props(Sql, Context) ->
assoc_props(Sql, [], Context).
-assoc_props(Sql, Parameters, Context) ->
+assoc_props(Sql, Parameters, #context{} = Context) ->
+ assoc_props(Sql, Parameters, Context, ?PGSQL_TIMEOUT);
+assoc_props(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
+ assoc_props(Sql, [], Context, Timeout).
+
+assoc_props(Sql, Parameters, Context, Timeout) ->
F = fun(C) when C =:= none -> [];
(C) ->
- {ok, Result} = pgsql:assoc(C, Sql, Parameters),
+ {ok, Result} = pgsql:assoc(C, Sql, Parameters, Timeout),
merge_props(Result)
end,
with_connection(F, Context).
q(Sql, Context) ->
- q(Sql, [], Context).
+ q(Sql, [], Context, ?PGSQL_TIMEOUT).
+
+q(Sql, Parameters, #context{} = Context) ->
+ q(Sql, Parameters, Context, ?PGSQL_TIMEOUT);
+q(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
+ q(Sql, [], Context, Timeout).
-q(Sql, Parameters, Context) ->
+q(Sql, Parameters, Context, Timeout) ->
F = fun(C) when C =:= none -> [];
(C) ->
- case pgsql:equery(C, Sql, Parameters) of
+ case pgsql:equery(C, Sql, Parameters, Timeout) of
{ok, _Affected, _Cols, Rows} -> Rows;
{ok, _Cols, Rows} -> Rows;
{ok, Rows} -> Rows
@@ -264,16 +283,21 @@ q(Sql, Parameters, Context) ->
q1(Sql, Context) ->
q1(Sql, [], Context).
-q1(Sql, Parameters, Context) ->
+q1(Sql, Parameters, #context{} = Context) ->
+ q1(Sql, Parameters, Context, ?PGSQL_TIMEOUT);
+q1(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
+ q1(Sql, [], Context, Timeout).
+
+q1(Sql, Parameters, Context, Timeout) ->
F = fun(C) when C =:= none -> undefined;
- (C) ->
- case pgsql:equery1(C, Sql, Parameters) of
+ (C) ->
+ case pgsql:equery1(C, Sql, Parameters, Timeout) of
{ok, Value} -> Value;
{error, noresult} -> undefined
end
- end,
+ end,
with_connection(F, Context).
-
+
q_row(Sql, Context) ->
q_row(Sql, [], Context).
@@ -288,11 +312,15 @@ q_row(Sql, Args, Context) ->
equery(Sql, Context) ->
equery(Sql, [], Context).
-equery(Sql, Parameters, Context) ->
+equery(Sql, Parameters, #context{} = Context) ->
+ equery(Sql, Parameters, Context, ?PGSQL_TIMEOUT);
+equery(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
+ equery(Sql, [], Context, Timeout).
+
+equery(Sql, Parameters, Context, Timeout) ->
F = fun(C) when C =:= none -> {error, noresult};
- (C) ->
- pgsql:equery(C, Sql, Parameters)
- end,
+ (C) -> pgsql:equery(C, Sql, Parameters, Timeout)
+ end,
with_connection(F, Context).
Please sign in to comment.
Something went wrong with that request. Please try again.