Skip to content

Commit

Permalink
extract cast api to apgsql, use gen_server:call in pgsql
Browse files Browse the repository at this point in the history
  • Loading branch information
mabrek committed Nov 14, 2011
1 parent 7e47b86 commit 974a6a2
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 86 deletions.
69 changes: 69 additions & 0 deletions src/apgsql.erl
@@ -0,0 +1,69 @@
%%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.

-module(apgsql).

-export([start_link/0,
connect/5,
close/1,
get_parameter/2,
squery/2,
equery/3,
parse/4,
bind/4,
execute/4,
describe/3,
close/3,
sync/1,
cancel/1]).

%% -- client interface --

start_link() ->
pgqsq_sock:start_link().

connect(C, Host, Username, Password, Opts) ->
cast(C, {connect, Host, Username, Password, Opts}).

close(C) ->
pgsql_sock:close(C).

get_parameter(C, Name) ->
pgsql_sock:get_parameter(C, Name).

squery(C, Sql) ->
cast(C, {squery, Sql}).

equery(C, Statement, Parameters) ->
cast(C, {equery, Statement, Parameters}).

parse(C, Name, Sql, Types) ->
cast(C, {parse, Name, Sql, Types}).

bind(C, Statement, PortalName, Parameters) ->
cast(C, {bind, Statement, PortalName, Parameters}).

execute(C, Statement, PortalName, MaxRows) ->
cast(C, {execute, Statement, PortalName, MaxRows}).

describe(C, statement, Name) ->
cast(C, {describe_statement, Name});

describe(C, portal, Name) ->
cast(C, {describe_portal, Name}).

close(C, Type, Name) ->
cast(C, {close, Type, Name}).

sync(C) ->
cast(C, sync).

cancel(C) ->
pgsql_sock:cancel(C).


%% -- internal functions --

cast(C, Command) ->
Ref = make_ref(),
gen_server:cast(C, {{self(), Ref}, Command}),
Ref.
59 changes: 20 additions & 39 deletions src/pgsql.erl
Expand Up @@ -9,7 +9,7 @@
-export([bind/3, bind/4, execute/2, execute/3, execute/4]).
-export([close/2, close/3, sync/1]).
-export([with_transaction/2]).
-export([receive_result/2, sync_on_error/2]).
-export([sync_on_error/2]).

-include("pgsql.hrl").

Expand All @@ -23,15 +23,14 @@ connect(Host, Username, Opts) ->

connect(Host, Username, Password, Opts) ->
{ok, C} = pgsql_sock:start_link(),
Ref = pgsql_sock:connect(C, Host, Username, Password, Opts),
%% TODO connect timeout
receive
{Ref, connected} ->
case gen_server:call(C,
{connect, Host, Username, Password, Opts},
infinity) of
connected ->
{ok, C};
{Ref, Error = {error, _}} ->
Error;
{'EXIT', C, _Reason} ->
{error, closed}
Error = {error, _} ->
Error
end.

close(C) ->
Expand All @@ -41,8 +40,7 @@ get_parameter(C, Name) ->
pgsql_sock:get_parameter(C, Name).

squery(C, Sql) ->
Ref = pgsql_sock:squery(C, Sql),
receive_result(C, Ref).
gen_server:call(C, {squery, Sql}, infinity).

equery(C, Sql) ->
equery(C, Sql, []).
Expand All @@ -52,8 +50,7 @@ equery(C, Sql, Parameters) ->
case parse(C, Sql) of
{ok, #statement{types = Types} = S} ->
Typed_Parameters = lists:zip(Types, Parameters),
Ref = pgsql_sock:equery(C, S, Typed_Parameters),
receive_result(C, Ref);
gen_server:call(C, {equery, S, Typed_Parameters}, infinity);
Error ->
Error
end.
Expand All @@ -67,17 +64,17 @@ parse(C, Sql, Types) ->
parse(C, "", Sql, Types).

parse(C, Name, Sql, Types) ->
Ref = pgsql_sock:parse(C, Name, Sql, Types),
sync_on_error(C, receive_result(C, Ref)).
sync_on_error(C, gen_server:call(C, {parse, Name, Sql, Types}, infinity)).

%% bind

bind(C, Statement, Parameters) ->
bind(C, Statement, "", Parameters).

bind(C, Statement, PortalName, Parameters) ->
Ref = pgsql_sock:bind(C, Statement, PortalName, Parameters),
sync_on_error(C, receive_result(C, Ref)).
sync_on_error(
C,
gen_server:call(C, {bind, Statement, PortalName, Parameters}, infinity)).

%% execute

Expand All @@ -88,29 +85,25 @@ execute(C, S, N) ->
execute(C, S, "", N).

execute(C, S, PortalName, N) ->
Ref = pgsql_sock:execute(C, S, PortalName, N),
receive_result(C, Ref).
gen_server:call(C, {execute, S, PortalName, N}, infinity).

%% statement/portal functions

describe(C, #statement{name = Name}) ->
describe(C, statement, Name).

%% TODO unknown result format of Describe portal
describe(C, Type, Name) ->
Ref = pgsql_sock:describe(C, Type, Name),
%% TODO unknown result format of Describe portal
sync_on_error(C, receive_result(C, Ref)).
sync_on_error(C, gen_server:call(C, {describe, Type, Name}, infinity)).

close(C, #statement{name = Name}) ->
close(C, statement, Name).

close(C, Type, Name) ->
Ref = pgsql_sock:close(C, Type, Name),
receive_result(C, Ref).
gen_server:call(C, {close, Type, Name}).

sync(C) ->
Ref = pgsql_sock:sync(C),
receive_result(C, Ref).
gen_server:call(C, sync).

%% misc helper functions
with_transaction(C, F) ->
Expand All @@ -125,20 +118,8 @@ with_transaction(C, F) ->
{rollback, Why}
end.

receive_result(C, Ref) ->
%% TODO timeout
receive
{Ref, Result} ->
Result;
%% TODO no 'EXIT' for not linked processes
{'EXIT', C, _Reason} ->
{error, closed}
end.

sync_on_error(C, Error = {error, _}) ->
Ref = pgsql_sock:sync(C),
receive_result(C, Ref),
Error;
sync_on_error(C, {error, _}) ->
sync(C);

sync_on_error(_C, R) ->
R.
Expand Down
48 changes: 1 addition & 47 deletions src/pgsql_sock.erl
Expand Up @@ -6,17 +6,8 @@
-behavior(gen_server).

-export([start_link/0,
connect/5,
close/1,
get_parameter/2,
squery/2,
equery/3,
parse/4,
bind/4,
execute/4,
describe/3,
close/3,
sync/1,
cancel/1]).

-export([handle_call/3, handle_cast/2, handle_info/2]).
Expand Down Expand Up @@ -48,44 +39,13 @@
start_link() ->
gen_server:start_link(?MODULE, [], []).

connect(C, Host, Username, Password, Opts) ->
cast(C, {connect, Host, Username, Password, Opts}).

%% TODO extract API functions
close(C) when is_pid(C) ->
catch gen_server:cast(C, stop),
ok.

get_parameter(C, Name) ->
gen_server:call(C, {get_parameter, to_binary(Name)}, infinity).

squery(C, Sql) ->
cast(C, {squery, Sql}).

equery(C, Statement, Parameters) ->
cast(C, {equery, Statement, Parameters}).

parse(C, Name, Sql, Types) ->
cast(C, {parse, Name, Sql, Types}).

bind(C, Statement, PortalName, Parameters) ->
cast(C, {bind, Statement, PortalName, Parameters}).

execute(C, Statement, PortalName, MaxRows) ->
cast(C, {execute, Statement, PortalName, MaxRows}).

describe(C, statement, Name) ->
cast(C, {describe_statement, Name});

describe(C, portal, Name) ->
cast(C, {describe_portal, Name}).

close(C, Type, Name) ->
cast(C, {close, Type, Name}).

sync(C) ->
cast(C, sync).

cancel(S) ->
gen_server:cast(S, cancel).

Expand Down Expand Up @@ -145,11 +105,6 @@ code_change(_OldVsn, State, _Extra) ->

%% -- internal functions --

cast(C, Command) ->
Ref = make_ref(),
gen_server:cast(C, {{self(), Ref}, Command}),
Ref.

command(Command, State = #state{sync_required = true})
when Command /= sync ->
{noreply, finish(State, {error, sync_required})};
Expand Down Expand Up @@ -214,8 +169,7 @@ command({bind, Statement, PortalName, Parameters}, State) ->
send(State, $H, []),
{noreply, State};

%% TODO unused parameter?
command({execute, _, PortalName, MaxRows}, State) ->
command({execute, _Statement, PortalName, MaxRows}, State) ->
send(State, $E, [PortalName, 0, <<MaxRows:?int32>>]),
send(State, $H, []),
{noreply, State};
Expand Down

0 comments on commit 974a6a2

Please sign in to comment.