From 122028f9e1e0b6750d400a93f104a5939702c20f Mon Sep 17 00:00:00 2001 From: Will Date: Mon, 7 Jun 2010 21:22:12 -0700 Subject: [PATCH] support asynchronous notices and notifications --- README | 40 ++++++++++++++++++++++++------ src/pgsql.erl | 4 --- src/pgsql_connection.erl | 22 ++++++++++++++--- src/pgsql_sock.erl | 8 ++++++ test_src/pgsql_tests.erl | 53 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 113 insertions(+), 14 deletions(-) diff --git a/README b/README index 24d8bca..725c30f 100644 --- a/README +++ b/README @@ -9,17 +9,18 @@ Erlang PostgreSQL Database Client Password - optional password to authenticate with. Opts - property list of extra options. Supported properties: - + database - + port - + ssl (true | false | required) - + ssl_opts (see ssl docs in OTP) - + timeout (milliseconds, defaults to 5000) + + {database, String} + + {port, Integer} + + {ssl, Atom} true | false | required + + {ssl_opts List} see ssl application docs in OTP + + {timeout, Integer} milliseconds, defaults to 5000 + + {async, Pid} see Asynchronous Messages section {ok, C} = pgsql:connect("localhost", "username", [{database, "test_db"}]). ok = pgsql:close(C). - The timeout parameter is applied to all operations. In the case of equery - this means that total execution time may exceed the timeout value. + The timeout parameter will trigger an {error, timeout} result when the + server fails to respond within Timeout milliseconds. * Simple Query @@ -109,3 +110,28 @@ Erlang PostgreSQL Database Client Errors originating from the PostgreSQL backend are returned as {error, #error{}}, see pgsql.hrl for the record definition. epgsql may also return {error, Atom} where Atom is 'timeout' or 'closed'. + +* Asynchronous Messages + + PostgreSQL may deliver two types of asynchronous message: "notices" in response + to notice and warning messages generated by the server, and "notifications" which + are generated by the LISTEN/NOTIFY mechanism. + + Passing the {async, Pid} option to pgsql:connect will result in these async + messages being sent to the specified process, otherwise they will be dropped. + + Message formats: + + {pgsql, Connection, {notification, Channel, Pid, Payload}} + + Connection - connection the notification occured on + + Channel - channel the notification occured on + Pid - database session pid that sent notification + Payload - optional payload, only available from PostgreSQL >= 9.0 + + {pgsql, Connection, {notice, Error}} + + Connection - connection the notice occured on + Error - an #error{} record, see pgsql.hrl + diff --git a/src/pgsql.erl b/src/pgsql.erl index 48b629e..aef2fcb 100644 --- a/src/pgsql.erl +++ b/src/pgsql.erl @@ -143,8 +143,6 @@ receive_result(C, Cols, Rows) -> end; {pgsql, C, {complete, _Type}} -> {ok, Cols, lists:reverse(Rows)}; - {pgsql, C, {notice, _N}} -> - receive_result(C, Cols, Rows); {pgsql, C, done} -> done; {pgsql, C, timeout} -> @@ -171,8 +169,6 @@ receive_extended_result(C, Rows) -> end; {pgsql, C, {complete, _Type}} -> {ok, lists:reverse(Rows)}; - {pgsql, C, {notice, _N}} -> - receive_extended_result(C, Rows); {pgsql, C, timeout} -> {error, timeout}; {'EXIT', C, _Reason} -> diff --git a/src/pgsql_connection.erl b/src/pgsql_connection.erl index 23be620..695932d 100644 --- a/src/pgsql_connection.erl +++ b/src/pgsql_connection.erl @@ -25,6 +25,7 @@ parameters = [], reply, reply_to, + async, backend, statement, txstatus}). @@ -76,8 +77,12 @@ init([]) -> process_flag(trap_exit, true), {ok, startup, #state{}}. -handle_event({notice, Notice}, State_Name, State) -> - notify(State, {notice, Notice}), +handle_event({notice, _Notice} = Msg, State_Name, State) -> + notify_async(State, Msg), + {next_state, State_Name, State}; + +handle_event({notification, _Channel, _Pid, _Payload} = Msg, State_Name, State) -> + notify_async(State, Msg), {next_state, State_Name, State}; handle_event({parameter_status, Name, Value}, State_Name, State) -> @@ -113,11 +118,16 @@ code_change(_Old_Vsn, State_Name, State, _Extra) -> startup({connect, Host, Username, Password, Opts}, From, State) -> Timeout = proplists:get_value(timeout, Opts, 5000), + Async = proplists:get_value(async, Opts, undefined), case pgsql_sock:start_link(self(), Host, Username, Opts) of {ok, Sock} -> put(username, Username), put(password, Password), - State2 = State#state{sock = Sock, timeout = Timeout, reply_to = From}, + State2 = State#state{ + sock = Sock, + timeout = Timeout, + reply_to = From, + async = Async}, {next_state, auth, State2, Timeout}; Error -> {stop, normal, Error, State} @@ -619,6 +629,12 @@ encode_list(L) -> notify(#state{reply_to = {Pid, _Tag}}, Msg) -> Pid ! {pgsql, self(), Msg}. +notify_async(#state{async = Pid}, Msg) -> + case is_pid(Pid) of + true -> Pid ! {pgsql, self(), Msg}; + false -> false + end. + to_binary(B) when is_binary(B) -> B; to_binary(L) when is_list(L) -> list_to_binary(L). diff --git a/src/pgsql_sock.erl b/src/pgsql_sock.erl index d29cb67..a750396 100644 --- a/src/pgsql_sock.erl +++ b/src/pgsql_sock.erl @@ -147,6 +147,14 @@ decode(<> = Bin, #state{c = C} = State) -> <> when Type == $E -> gen_fsm:send_event(C, {error, decode_error(Data)}), decode(Tail, State); + <> when Type == $A -> + <> = Data, + case decode_strings(Strings) of + [Channel, Payload] -> ok; + [Channel] -> Payload = <<>> + end, + gen_fsm:send_all_state_event(C, {notification, Channel, Pid, Payload}), + decode(Tail, State); <> -> gen_fsm:send_event(C, {Type, Data}), decode(Tail, State); diff --git a/test_src/pgsql_tests.erl b/test_src/pgsql_tests.erl index a9dd051..62d30c3 100644 --- a/test_src/pgsql_tests.erl +++ b/test_src/pgsql_tests.erl @@ -484,6 +484,47 @@ active_connection_closed_test() -> end, flush(). +warning_notice_test() -> + with_connection( + fun(C) -> + {ok, _, _} = pgsql:squery(C, "select 'test\\n'"), + receive + {pgsql, C, {notice, #error{code = <<"22P06">>}}} -> ok + after + 100 -> erlang:error(didnt_receive_notice) + end + end, + [{async, self()}]). + +listen_notify_test() -> + with_connection( + fun(C) -> + {ok, [], []} = pgsql:squery(C, "listen epgsql_test"), + {ok, _, [{Pid}]} = pgsql:equery(C, "select pg_backend_pid()"), + {ok, [], []} = pgsql:squery(C, "notify epgsql_test"), + receive + {pgsql, C, {notification, <<"epgsql_test">>, Pid, <<>>}} -> ok + after + 100 -> erlang:error(didnt_receive_notification) + end + end, + [{async, self()}]). + +listen_notify_payload_test() -> + with_min_version( + 9.0, + fun(C) -> + {ok, [], []} = pgsql:squery(C, "listen epgsql_test"), + {ok, _, [{Pid}]} = pgsql:equery(C, "select pg_backend_pid()"), + {ok, [], []} = pgsql:squery(C, "notify epgsql_test, 'test!'"), + receive + {pgsql, C, {notification, <<"epgsql_test">>, Pid, <<"test!">>}} -> ok + after + 100 -> erlang:error(didnt_receive_notification) + end + end, + [{async, self()}]). + %% -- run all tests -- run_tests() -> @@ -531,6 +572,18 @@ with_rollback(F) -> end end). +with_min_version(Min, F, Args) -> + with_connection( + fun(C) -> + {ok, Bin} = pgsql:get_parameter(C, <<"server_version">>), + {ok, [{float, 1, Ver} | _], _} = erl_scan:string(binary_to_list(Bin)), + case Ver >= Min of + true -> F(C); + false -> ?debugFmt("skipping test requiring PostgreSQL >= ~.2f~n", [Min]) + end + end, + Args). + check_type(Type, In, Out, Values) -> Column = "c_" ++ atom_to_list(Type), check_type(Type, In, Out, Values, Column).