Skip to content

Commit

Permalink
support asynchronous notices and notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
wg committed Jun 8, 2010
1 parent 27762ea commit 122028f
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 14 deletions.
40 changes: 33 additions & 7 deletions README
Expand Up @@ -9,17 +9,18 @@ Erlang PostgreSQL Database Client
Password - optional password to authenticate with. Password - optional password to authenticate with.
Opts - property list of extra options. Supported properties: Opts - property list of extra options. Supported properties:


+ database + {database, String}
+ port + {port, Integer}
+ ssl (true | false | required) + {ssl, Atom} true | false | required
+ ssl_opts (see ssl docs in OTP) + {ssl_opts List} see ssl application docs in OTP
+ timeout (milliseconds, defaults to 5000) + {timeout, Integer} milliseconds, defaults to 5000
+ {async, Pid} see Asynchronous Messages section


{ok, C} = pgsql:connect("localhost", "username", [{database, "test_db"}]). {ok, C} = pgsql:connect("localhost", "username", [{database, "test_db"}]).
ok = pgsql:close(C). ok = pgsql:close(C).


The timeout parameter is applied to all operations. In the case of equery The timeout parameter will trigger an {error, timeout} result when the
this means that total execution time may exceed the timeout value. server fails to respond within Timeout milliseconds.


* Simple Query * Simple Query


Expand Down Expand Up @@ -109,3 +110,28 @@ Erlang PostgreSQL Database Client
Errors originating from the PostgreSQL backend are returned as {error, #error{}}, Errors originating from the PostgreSQL backend are returned as {error, #error{}},
see pgsql.hrl for the record definition. epgsql may also return {error, Atom} see pgsql.hrl for the record definition. epgsql may also return {error, Atom}
where Atom is 'timeout' or 'closed'. where Atom is 'timeout' or 'closed'.

* Asynchronous Messages

PostgreSQL may deliver two types of asynchronous message: "notices" in response
to notice and warning messages generated by the server, and "notifications" which
are generated by the LISTEN/NOTIFY mechanism.

Passing the {async, Pid} option to pgsql:connect will result in these async
messages being sent to the specified process, otherwise they will be dropped.

Message formats:

{pgsql, Connection, {notification, Channel, Pid, Payload}}

Connection - connection the notification occured on

Channel - channel the notification occured on
Pid - database session pid that sent notification
Payload - optional payload, only available from PostgreSQL >= 9.0

{pgsql, Connection, {notice, Error}}

Connection - connection the notice occured on
Error - an #error{} record, see pgsql.hrl

4 changes: 0 additions & 4 deletions src/pgsql.erl
Expand Up @@ -143,8 +143,6 @@ receive_result(C, Cols, Rows) ->
end; end;
{pgsql, C, {complete, _Type}} -> {pgsql, C, {complete, _Type}} ->
{ok, Cols, lists:reverse(Rows)}; {ok, Cols, lists:reverse(Rows)};
{pgsql, C, {notice, _N}} ->
receive_result(C, Cols, Rows);
{pgsql, C, done} -> {pgsql, C, done} ->
done; done;
{pgsql, C, timeout} -> {pgsql, C, timeout} ->
Expand All @@ -171,8 +169,6 @@ receive_extended_result(C, Rows) ->
end; end;
{pgsql, C, {complete, _Type}} -> {pgsql, C, {complete, _Type}} ->
{ok, lists:reverse(Rows)}; {ok, lists:reverse(Rows)};
{pgsql, C, {notice, _N}} ->
receive_extended_result(C, Rows);
{pgsql, C, timeout} -> {pgsql, C, timeout} ->
{error, timeout}; {error, timeout};
{'EXIT', C, _Reason} -> {'EXIT', C, _Reason} ->
Expand Down
22 changes: 19 additions & 3 deletions src/pgsql_connection.erl
Expand Up @@ -25,6 +25,7 @@
parameters = [], parameters = [],
reply, reply,
reply_to, reply_to,
async,
backend, backend,
statement, statement,
txstatus}). txstatus}).
Expand Down Expand Up @@ -76,8 +77,12 @@ init([]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, startup, #state{}}. {ok, startup, #state{}}.


handle_event({notice, Notice}, State_Name, State) -> handle_event({notice, _Notice} = Msg, State_Name, State) ->
notify(State, {notice, Notice}), notify_async(State, Msg),
{next_state, State_Name, State};

handle_event({notification, _Channel, _Pid, _Payload} = Msg, State_Name, State) ->
notify_async(State, Msg),
{next_state, State_Name, State}; {next_state, State_Name, State};


handle_event({parameter_status, Name, Value}, State_Name, State) -> handle_event({parameter_status, Name, Value}, State_Name, State) ->
Expand Down Expand Up @@ -113,11 +118,16 @@ code_change(_Old_Vsn, State_Name, State, _Extra) ->


startup({connect, Host, Username, Password, Opts}, From, State) -> startup({connect, Host, Username, Password, Opts}, From, State) ->
Timeout = proplists:get_value(timeout, Opts, 5000), Timeout = proplists:get_value(timeout, Opts, 5000),
Async = proplists:get_value(async, Opts, undefined),
case pgsql_sock:start_link(self(), Host, Username, Opts) of case pgsql_sock:start_link(self(), Host, Username, Opts) of
{ok, Sock} -> {ok, Sock} ->
put(username, Username), put(username, Username),
put(password, Password), put(password, Password),
State2 = State#state{sock = Sock, timeout = Timeout, reply_to = From}, State2 = State#state{
sock = Sock,
timeout = Timeout,
reply_to = From,
async = Async},
{next_state, auth, State2, Timeout}; {next_state, auth, State2, Timeout};
Error -> Error ->
{stop, normal, Error, State} {stop, normal, Error, State}
Expand Down Expand Up @@ -619,6 +629,12 @@ encode_list(L) ->
notify(#state{reply_to = {Pid, _Tag}}, Msg) -> notify(#state{reply_to = {Pid, _Tag}}, Msg) ->
Pid ! {pgsql, self(), Msg}. Pid ! {pgsql, self(), Msg}.


notify_async(#state{async = Pid}, Msg) ->
case is_pid(Pid) of
true -> Pid ! {pgsql, self(), Msg};
false -> false
end.

to_binary(B) when is_binary(B) -> B; to_binary(B) when is_binary(B) -> B;
to_binary(L) when is_list(L) -> list_to_binary(L). to_binary(L) when is_list(L) -> list_to_binary(L).


Expand Down
8 changes: 8 additions & 0 deletions src/pgsql_sock.erl
Expand Up @@ -147,6 +147,14 @@ decode(<<Type:8, Len:?int32, Rest/binary>> = Bin, #state{c = C} = State) ->
<<Data:Len2/binary, Tail/binary>> when Type == $E -> <<Data:Len2/binary, Tail/binary>> when Type == $E ->
gen_fsm:send_event(C, {error, decode_error(Data)}), gen_fsm:send_event(C, {error, decode_error(Data)}),
decode(Tail, State); decode(Tail, State);
<<Data:Len2/binary, Tail/binary>> when Type == $A ->
<<Pid:?int32, Strings/binary>> = Data,
case decode_strings(Strings) of
[Channel, Payload] -> ok;
[Channel] -> Payload = <<>>
end,
gen_fsm:send_all_state_event(C, {notification, Channel, Pid, Payload}),
decode(Tail, State);
<<Data:Len2/binary, Tail/binary>> -> <<Data:Len2/binary, Tail/binary>> ->
gen_fsm:send_event(C, {Type, Data}), gen_fsm:send_event(C, {Type, Data}),
decode(Tail, State); decode(Tail, State);
Expand Down
53 changes: 53 additions & 0 deletions test_src/pgsql_tests.erl
Expand Up @@ -484,6 +484,47 @@ active_connection_closed_test() ->
end, end,
flush(). flush().


warning_notice_test() ->
with_connection(
fun(C) ->
{ok, _, _} = pgsql:squery(C, "select 'test\\n'"),
receive
{pgsql, C, {notice, #error{code = <<"22P06">>}}} -> ok
after
100 -> erlang:error(didnt_receive_notice)
end
end,
[{async, self()}]).

listen_notify_test() ->
with_connection(
fun(C) ->
{ok, [], []} = pgsql:squery(C, "listen epgsql_test"),
{ok, _, [{Pid}]} = pgsql:equery(C, "select pg_backend_pid()"),
{ok, [], []} = pgsql:squery(C, "notify epgsql_test"),
receive
{pgsql, C, {notification, <<"epgsql_test">>, Pid, <<>>}} -> ok
after
100 -> erlang:error(didnt_receive_notification)
end
end,
[{async, self()}]).

listen_notify_payload_test() ->
with_min_version(
9.0,
fun(C) ->
{ok, [], []} = pgsql:squery(C, "listen epgsql_test"),
{ok, _, [{Pid}]} = pgsql:equery(C, "select pg_backend_pid()"),
{ok, [], []} = pgsql:squery(C, "notify epgsql_test, 'test!'"),
receive
{pgsql, C, {notification, <<"epgsql_test">>, Pid, <<"test!">>}} -> ok
after
100 -> erlang:error(didnt_receive_notification)
end
end,
[{async, self()}]).

%% -- run all tests -- %% -- run all tests --


run_tests() -> run_tests() ->
Expand Down Expand Up @@ -531,6 +572,18 @@ with_rollback(F) ->
end end
end). end).


with_min_version(Min, F, Args) ->
with_connection(
fun(C) ->
{ok, Bin} = pgsql:get_parameter(C, <<"server_version">>),
{ok, [{float, 1, Ver} | _], _} = erl_scan:string(binary_to_list(Bin)),
case Ver >= Min of
true -> F(C);
false -> ?debugFmt("skipping test requiring PostgreSQL >= ~.2f~n", [Min])
end
end,
Args).

check_type(Type, In, Out, Values) -> check_type(Type, In, Out, Values) ->
Column = "c_" ++ atom_to_list(Type), Column = "c_" ++ atom_to_list(Type),
check_type(Type, In, Out, Values, Column). check_type(Type, In, Out, Values, Column).
Expand Down

0 comments on commit 122028f

Please sign in to comment.