Permalink
Browse files

Initial import of source

  • Loading branch information...
1 parent 9f7a187 commit 0038e8c9ef9d9c6c59ad8f96e96121377ccdd421 Joseph Wecker committed Apr 23, 2010
Showing with 263 additions and 0 deletions.
  1. +45 −0 Makefile
  2. +1 −0 ebin/epgsql_pool.app
  3. +9 −0 src/epgsql_pool.app
  4. +44 −0 src/epgsql_pool.erl
  5. +164 −0 src/pgsql_pool.erl
View
@@ -0,0 +1,45 @@
+NAME := epgsql_pool
+VERSION := 0.1
+
+ERL := erl
+ERLC := erlc
+
+EPGSQL_EBIN := ~/src/epgsql/ebin
+
+# ------------------------------------------------------------------------
+
+ERLC_FLAGS := -Wall
+
+SRC := $(wildcard src/*.erl)
+TESTS := $(wildcard test_src/*.erl)
+RELEASE := $(NAME)-$(VERSION).tar.gz
+
+APPDIR := $(NAME)-$(VERSION)
+BEAMS := $(SRC:src/%.erl=ebin/%.beam)
+
+compile: $(BEAMS)
+
+app: compile
+ @mkdir -p $(APPDIR)/ebin
+ @cp -r ebin/* $(APPDIR)/ebin/
+
+release: app
+ @tar czvf $(RELEASE) $(APPDIR)
+
+clean:
+ @rm -f ebin/*.beam
+ @rm -rf $(NAME)-$(VERSION) $(NAME)-*.tar.gz
+
+test: $(TESTS:test_src/%.erl=test_ebin/%.beam) $(BEAMS)
+ $(ERL) -pa $(EPGSQL_EBIN) -pa ebin/ -pa test_ebin/ -noshell -s pgsql_pool_tests test -s init stop
+
+# ------------------------------------------------------------------------
+
+.SUFFIXES: .erl .beam
+.PHONY: app compile clean test
+
+ebin/%.beam : src/%.erl
+ $(ERLC) $(ERLC_FLAGS) -o $(dir $@) $<
+
+test_ebin/%.beam : test_src/%.erl
+ $(ERLC) $(ERLC_FLAGS) -o $(dir $@) $<
View
@@ -0,0 +1,9 @@
+{application, epgsql_pool,
+ [{description, "PostgreSQL Connection Pool"},
+ {vsn, "0.1"},
+ {modules, [epgsql_pool, pgsql_pool]},
+ {registered, [epgsql_pool]},
+ {mod, {epgsql_pool, []}},
+ {applications, [kernel, stdlib, epgsql]},
+ {included_applications, []},
+ {env, [{pools, []}]}]}.
View
@@ -0,0 +1,44 @@
+-module(epgsql_pool).
+
+-behavior(application).
+-behavior(supervisor).
+
+-export([start_pool/3]).
+-export([start/2, stop/1, init/1]).
+
+%% -- client interface --
+
+start_pool(Name, Size, Opts) ->
+ supervisor:start_child(?MODULE, [Name, Size, Opts]).
+
+%% -- application implementation --
+
+start(_Type, _Args) ->
+ {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
+ {ok, Pools} = application:get_env(pools),
+ case catch lists:foreach(fun start_pool/1, Pools) of
+ {'EXIT', Why} -> {error, Why};
+ _Other -> {ok, Pid}
+ end.
+
+stop(_State) ->
+ ok.
+
+%% -- supervisor implementation --
+
+init([]) ->
+ {ok,
+ {{simple_one_for_one, 2, 60},
+ [{pool,
+ {pgsql_pool, start_link, []},
+ permanent, 2000, supervisor,
+ [pgsql_pool]}]}}.
+
+%% -- internal functions --
+
+start_pool(Name) ->
+ case application:get_env(Name) of
+ {ok, {Size, Opts}} -> start_pool(Name, Size, Opts);
+ {ok, Value} -> exit({invalid_pool_spec, Value});
+ undefined -> exit({missing_pool_spec, Name})
+ end.
View
@@ -0,0 +1,164 @@
+-module(pgsql_pool).
+
+-export([start_link/2, start_link/3, stop/1]).
+-export([get_connection/2, return_connection/2]).
+
+-export([init/1, code_change/3, terminate/2]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-record(state, {id, connections, monitors, waiting, opts}).
+
+ %% -- client interface --
+
+opts(Opts) ->
+ Defaults = [{host, "localhost"},
+ {password, ""},
+ {username, os:getenv("USER")}],
+ Opts2 = lists:ukeysort(1, proplists:unfold(Opts)),
+ proplists:normalize(lists:ukeymerge(1, Opts2, Defaults), []).
+
+start_link(Size, Opts) ->
+ gen_server:start_link(?MODULE, {undefined, Size, opts(Opts)}, []).
+
+start_link(Name, Size, Opts) ->
+ gen_server:start_link({local, Name}, ?MODULE, {Name, Size, opts(Opts)}, []).
+
+stop(P) ->
+ gen_server:cast(P, stop).
+
+get_connection(P, Timeout) ->
+ gen_server:cast(P, {get_connection, self()}),
+ receive
+ {pgsql_pool, P, {connection, C}} ->
+ P ! {pgsql_pool, P, {ack, self(), C}},
+ {ok, C}
+ after
+ Timeout ->
+ gen_server:cast(P, {cancel_wait, self()}),
+ {error, timeout}
+ end.
+
+return_connection(P, C) ->
+ gen_server:call(P, {return_connection, C}).
+
+%% -- gen_server implementation --
+
+init({Name, Size, Opts}) ->
+ process_flag(trap_exit, true),
+
+ case Name of
+ undefined -> Id = self();
+ _Name -> Id = Name
+ end,
+ Connections = connect(Size, Opts),
+
+ State = #state{
+ id = Id,
+ opts = Opts,
+ connections = Connections,
+ monitors = [],
+ waiting = queue:new()},
+ {ok, State}.
+
+handle_call({return_connection, C}, _From, State) ->
+ #state{monitors = Monitors} = State,
+ case lists:keytake(C, 1, Monitors) of
+ {value, {C, M}, Monitors2} ->
+ erlang:demonitor(M),
+ {reply, ok, return(C, State#state{monitors = Monitors2})};
+ false ->
+ {reply, ok, State}
+ end;
+
+handle_call(Request, _From, State) ->
+ {stop, {unsupported_call, Request}, State}.
+
+handle_cast({get_connection, Pid}, State) ->
+ #state{connections = Connections, waiting = Waiting} = State,
+ case Connections of
+ [C | T] -> {noreply, deliver(Pid, C, State#state{connections = T})};
+ [] -> {noreply, State#state{waiting = queue:in(Pid, Waiting)}}
+ end;
+
+handle_cast({cancel_wait, Pid}, State) ->
+ #state{waiting = Waiting} = State,
+ Waiting2 = queue:filter(fun(P) -> P =/= Pid end, Waiting),
+ {noreply, State#state{waiting = Waiting2}};
+
+handle_cast(stop, State) ->
+ {stop, normal, State};
+
+handle_cast(Request, State) ->
+ {stop, {unsupported_cast, Request}, State}.
+
+handle_info({'DOWN', M, process, _Pid, _Info}, State) ->
+ #state{monitors = Monitors} = State,
+ case lists:keytake(M, 2, Monitors) of
+ {value, {C, M}, Monitors2} ->
+ State2 = return(C, State#state{monitors = Monitors2}),
+ {noreply, State2};
+ false ->
+ {noreply, State}
+ end;
+
+handle_info({'EXIT', Pid, _Reason}, State) ->
+ #state{opts = Opts, connections = Connections, monitors = Monitors} = State,
+ Connections2 = lists:delete(Pid, Connections),
+ F = fun({C, M}) when C == Pid -> erlang:demonitor(M), false;
+ ({_, _}) -> true
+ end,
+ Monitors2 = lists:filter(F, Monitors),
+ [C] = connect(1, Opts),
+ State2 = return(C, State#state{connections = Connections2, monitors = Monitors2}),
+ {noreply, State2};
+
+handle_info({pgsql_pool, P, {ack, Pid, _C}}, #state{id = P} = State) ->
+ error_logger:error_msg("pgsql_pool ~p received late ack from ~p~n", [P, Pid]),
+ {noreply, State};
+
+handle_info(Info, State) ->
+ {stop, {unsupported_info, Info}, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ State.
+
+%% -- internal functions --
+
+connect(N, Opts) ->
+ connect(N, Opts, []).
+
+connect(0, _Opts, Acc) ->
+ Acc;
+connect(N, Opts, Acc) ->
+ Host = proplists:get_value(host, Opts),
+ Username = proplists:get_value(username, Opts),
+ Password = proplists:get_value(password, Opts),
+ {ok, C} = pgsql:connect(Host, Username, Password, Opts),
+ connect(N - 1, Opts, [C | Acc]).
+
+deliver(Pid, C, State) ->
+ #state{id = Id, connections = Connections, monitors = Monitors} = State,
+ Pid ! {pgsql_pool, Id, {connection, C}},
+ receive
+ {pgsql_pool, Id, {ack, Pid, C}} ->
+ M = erlang:monitor(process, Pid),
+ Monitors2 = [{C, M} | Monitors],
+ State#state{monitors = Monitors2}
+ after
+ 100 ->
+ State#state{connections = [C | Connections]}
+ end.
+
+return(C, State) ->
+ #state{connections = Connections, waiting = Waiting} = State,
+ case queue:out(Waiting) of
+ {{value, Pid}, Waiting2} ->
+ State2 = deliver(Pid, C, State),
+ State2#state{waiting = Waiting2};
+ {empty, _Waiting} ->
+ Connections2 = [C | Connections],
+ State#state{connections = Connections2}
+ end.

0 comments on commit 0038e8c

Please sign in to comment.