Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 8 commits
  • 4 files changed
  • 0 commit comments
  • 3 contributors
View
3  .gitignore
@@ -1,4 +1,5 @@
-\.beam
+*.beam
+ebin/
\.boot
\.script
View
2  src/epgsql_pool.app
@@ -1,6 +1,6 @@
{application, epgsql_pool,
[{description, "PostgreSQL Connection Pool"},
- {vsn, "0.1"},
+ {vsn, "0.1.0"},
{modules, [epgsql_pool, pgsql_pool]},
{registered, [epgsql_pool]},
{mod, {epgsql_pool, []}},
View
48 src/epgsql_pool.erl
@@ -1,44 +1,30 @@
-module(epgsql_pool).
--behavior(application).
--behavior(supervisor).
+-behaviour(supervisor).
-export([start_pool/3]).
--export([start/2, stop/1, init/1]).
+-export([start_link/1, init/1]).
%% -- client interface --
start_pool(Name, Size, Opts) ->
- supervisor:start_child(?MODULE, [Name, Size, Opts]).
+ Pool = {Name, {pgsql_pool, start_link, [Name, Size, Opts]}, permanent, 2000, worker, dynamic},
+ supervisor:start_child(?MODULE, Pool).
-%% -- application implementation --
+%% -- supervisor implementation --
-start(_Type, _Args) ->
- {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
- {ok, Pools} = application:get_env(pools),
- case catch lists:foreach(fun start_pool/1, Pools) of
- {'EXIT', Why} -> {error, Why};
- _Other -> {ok, Pid}
- end.
+%% @spec start_link(Opts) -> ServerRet
+%% @doc API for starting the supervisor.
+start_link(Opts) ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, Opts).
-stop(_State) ->
- ok.
-%% -- supervisor implementation --
+%% @spec init([]) -> SupervisorTree
+%% @doc supervisor callback.
+init(Opts) ->
+ Pools = [
+ {Name, {pgsql_pool, start_link, [Name, Size, PoolOpts]}, permanent, 2000, worker, dynamic}
+ || {Name, Size, PoolOpts} <- Opts
+ ],
+ {ok, {{one_for_one, 2, 60}, Pools}}.
-init([]) ->
- {ok,
- {{simple_one_for_one, 2, 60},
- [{pool,
- {pgsql_pool, start_link, []},
- permanent, 2000, supervisor,
- [pgsql_pool]}]}}.
-
-%% -- internal functions --
-
-start_pool(Name) ->
- case application:get_env(Name) of
- {ok, {Size, Opts}} -> start_pool(Name, Size, Opts);
- {ok, Value} -> exit({invalid_pool_spec, Value});
- undefined -> exit({missing_pool_spec, Name})
- end.
View
138 src/pgsql_pool.erl
@@ -1,22 +1,24 @@
+% vim: ts=4 sts=4 sw=4 et:
-module(pgsql_pool).
-export([start_link/2, start_link/3, stop/1]).
-export([get_connection/1, get_connection/2, return_connection/2]).
--export([get_database/1]).
+-export([status/1]).
-export([init/1, code_change/3, terminate/2]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
-record(state, {id, size, connections, monitors, waiting, opts, timer}).
- %% -- client interface --
+%% -- client interface --
opts(Opts) ->
Defaults = [{host, "localhost"},
{port, 5432},
{password, ""},
- {username, os:getenv("USER")},
- {database, "not_given"}],
+ {username, "zotonic"},
+ {database, "zotonic"},
+ {schema, "public"}],
Opts2 = lists:ukeysort(1, proplists:unfold(Opts)),
proplists:normalize(lists:ukeymerge(1, Opts2, Defaults), []).
@@ -39,63 +41,87 @@ get_connection(P) ->
%% @doc Get a db connection, wait at most Timeout seconds before giving up.
get_connection(P, Timeout) ->
- try
- gen_server:call(P, get_connection, Timeout)
- catch
- _:_ ->
+ try
+ gen_server:call(P, get_connection, Timeout)
+ catch
+ _:_ ->
gen_server:cast(P, {cancel_wait, self()}),
{error, timeout}
- end.
+ end.
%% @doc Return a db connection back to the connection pool.
return_connection(P, C) ->
gen_server:cast(P, {return_connection, C}).
-%% @doc Return the name of the database used for the pool.
-get_database(P) ->
- {ok, C} = get_connection(P),
- {ok, Db} = pgsql_connection:database(C),
- return_connection(P, C),
- {ok, Db}.
+%% @doc Return the current status of the connection pool.
+status(P) ->
+ gen_server:call(P, status).
+
%% -- gen_server implementation --
init({Name, Size, Opts}) ->
process_flag(trap_exit, true),
Id = case Name of
- undefined -> self();
- _Name -> Name
- end,
- {ok, Connection} = connect(Opts),
- {ok, TRef} = timer:send_interval(60000, close_unused),
+ undefined -> self();
+ _Name -> Name
+ end,
+ Connections = case connect(Opts) of
+ {ok, Connection} ->
+ [{Connection, now_secs()}];
+ {error, _Error} ->
+ []
+ end,
+ {ok, TRef} = timer:send_interval(60000, close_unused),
State = #state{
id = Id,
size = Size,
opts = Opts,
- connections = [{Connection, now_secs()}],
+ connections = Connections,
monitors = [],
waiting = queue:new(),
timer = TRef},
{ok, State}.
%% Requestor wants a connection. When available then immediately return, otherwise add to the waiting queue.
-handle_call(get_connection, From, #state{connections = Connections, waiting = Waiting} = State) ->
+handle_call(get_connection, From, State) ->
+ handle_call({get_connection, wait}, From, State);
+
+%% Requestor wants a connection immediately.
+handle_call(get_connection_nowait, From, State) ->
+ handle_call({get_connection, nowait}, From, State);
+
+handle_call({get_connection, WaitMode}, From, #state{connections = Connections, waiting = Waiting} = State) ->
case Connections of
- [{C,_} | T] ->
- % Return existing unused connection
- {noreply, deliver(From, C, State#state{connections = T})};
+ [{C, _} | T] ->
+ {noreply, deliver(From, C, State#state{connections = T})};
[] ->
- case length(State#state.monitors) < State#state.size of
- true ->
- % Allocate a new connection and return it.
- {ok, C} = connect(State#state.opts),
- {noreply, deliver(From, C, State)};
- false ->
- % Reached max connections, let the requestor wait
- {noreply, State#state{waiting = queue:in(From, Waiting)}}
- end
+ case length(State#state.monitors) < State#state.size of
+ true ->
+ case connect(State#state.opts) of
+ {ok, C} ->
+ {noreply, deliver(From, C, State)};
+ {error, _Error} ->
+ {reply, {error, busy}, State}
+ end;
+ false ->
+ case WaitMode of
+ wait ->
+ {noreply, State#state{waiting = queue:in(From, Waiting)}};
+ nowait ->
+ {reply, {error, busy}, State}
+ end
+ end
end;
+%% Return the status of the connection pool
+handle_call(status, _From, State) ->
+ {reply, [{free, length(State#state.connections)}, {in_use, length(State#state.monitors)}, {size, State#state.size}, {waiting, queue:len(State#state.waiting)}], State};
+
+%% Return full status of the connection pool
+handle_call(full_status, _From, State) ->
+ {reply, State, State};
+
%% Trap unsupported calls
handle_call(Request, _From, State) ->
{stop, {unsupported_call, Request}, State}.
@@ -124,17 +150,21 @@ handle_cast(Request, State) ->
{stop, {unsupported_cast, Request}, State}.
%% Close all connections that are unused for longer than a minute.
+%% echo: disabled by siden (2011-07)
handle_info(close_unused, State) ->
- Old = now_secs() - 60,
- {Unused, Used} = lists:partition(fun({_C,Time}) -> Time < Old end, State#state.connections),
- [ pgsql:close(C) || {C,_} <- Unused ],
- {noreply, State#state{connections=Used}};
+ {noreply, State};
+%% echo: not used for now (2011-07)
+handle_info(never_close_unused, State) ->
+ Old = now_secs() - 60,
+ {Unused, Used} = lists:partition(fun({_C,Time}) -> Time < Old end, State#state.connections),
+ [ pgsql:close(C) || {C,_} <- Unused ],
+ {noreply, State#state{connections=Used}};
%% Requestor we are monitoring went down. Kill the associated connection, as it might be in an unknown state.
handle_info({'DOWN', M, process, _Pid, _Info}, #state{monitors = Monitors} = State) ->
case lists:keytake(M, 2, Monitors) of
{value, {C, M}, Monitors2} ->
- pgsql:close(C),
+ catch pgsql:close(C),
{noreply, State#state{monitors = Monitors2}};
false ->
{noreply, State}
@@ -144,9 +174,11 @@ handle_info({'DOWN', M, process, _Pid, _Info}, #state{monitors = Monitors} = Sta
handle_info({'EXIT', ConnectionPid, _Reason}, State) ->
#state{connections = Connections, monitors = Monitors} = State,
Connections2 = proplists:delete(ConnectionPid, Connections),
- F = fun({C, M}) when C == ConnectionPid -> erlang:demonitor(M), false;
- ({_, _}) -> true
- end,
+ F = fun({C, M}) when C == ConnectionPid ->
+ erlang:demonitor(M),
+ false;
+ ({_, _}) -> true
+ end,
Monitors2 = lists:filter(F, Monitors),
{noreply, State#state{connections = Connections2, monitors = Monitors2}};
@@ -155,7 +187,7 @@ handle_info(Info, State) ->
{stop, {unsupported_info, Info}, State}.
terminate(_Reason, State) ->
- timer:cancel(State#state.timer),
+ timer:cancel(State#state.timer),
ok.
code_change(_OldVsn, State, _Extra) ->
@@ -167,12 +199,21 @@ connect(Opts) ->
Host = proplists:get_value(host, Opts),
Username = proplists:get_value(username, Opts),
Password = proplists:get_value(password, Opts),
- pgsql:connect(Host, Username, Password, Opts).
+ try
+ connect_ll(Host, Username, Password, Opts)
+ catch C:R ->
+ {error, {C, R}}
+ end.
+
+connect_ll(Host, Username, Password, Opts) ->
+ {ok, Conn} = pgsql:connect(Host, Username, Password, Opts),
+ {ok, [], []} = pgsql:squery(Conn, "SET search_path TO " ++ proplists:get_value(schema, Opts)),
+ {ok, Conn}.
deliver({Pid,_Tag} = From, C, #state{monitors=Monitors} = State) ->
M = erlang:monitor(process, Pid),
- gen_server:reply(From, {ok, C}),
- State#state{ monitors=[{C, M} | Monitors] }.
+ gen_server:reply(From, {ok, C}),
+ State#state{ monitors=[{C, M} | Monitors] }.
return(C, #state{connections = Connections, waiting = Waiting} = State) ->
case queue:out(Waiting) of
@@ -180,12 +221,11 @@ return(C, #state{connections = Connections, waiting = Waiting} = State) ->
State2 = deliver(From, C, State),
State2#state{waiting = Waiting2};
{empty, _Waiting} ->
- Connections2 = [{C, now_secs()} | Connections],
+ Connections2 = Connections ++ [{C, now_secs()}],
State#state{connections = Connections2}
end.
-
%% Return the current time in seconds, used for timeouts.
now_secs() ->
{M,S,_M} = erlang:now(),
- M*1000 + S.
+ M*1000000 + S.

No commit comments for this range

Something went wrong with that request. Please try again.