Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

result filter/map capability #104

Merged
merged 5 commits into from
Feb 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 190 additions & 45 deletions src/mysql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
%% gen_server is locally registered.
-module(mysql).

-export([start_link/1, query/2, query/3, query/4, execute/3, execute/4,
-export([start_link/1, query/2, query/3, query/4, query/5,
execute/3, execute/4, execute/5,
prepare/2, prepare/3, unprepare/2,
warning_count/1, affected_rows/1, autocommit/1, insert_id/1,
encode/2, in_transaction/1,
Expand All @@ -45,7 +46,13 @@
Message :: binary()}.

-type column_names() :: [binary()].
-type rows() :: [[term()]].
-type row() :: [term()].
-type rows() :: [row()].

-type query_filtermap_fun() :: fun((row()) -> query_filtermap_res())
| fun((column_names(), row()) -> query_filtermap_res()).
-type query_filtermap_res() :: boolean()
| {true, term()}.

-type query_result() :: ok
| {ok, column_names(), rows()}
Expand Down Expand Up @@ -160,47 +167,75 @@ start_link(Options) ->
end,
Ret.

%% @doc Executes a query with the query timeout as given to start_link/1.
%%
%% It is possible to execute multiple semicolon-separated queries.
%%
%% Results are returned in the form `{ok, ColumnNames, Rows}' if there is one
%% result set. If there are more than one result sets, they are returned in the
%% form `{ok, [{ColumnNames, Rows}, ...]}'.
%%
%% For queries that don't return any rows (INSERT, UPDATE, etc.) only the atom
%% `ok' is returned.
%% @see query/5.
-spec query(Conn, Query) -> Result
when Conn :: connection(),
Query :: iodata(),
Result :: query_result().
query(Conn, Query) ->
query_call(Conn, {query, Query}).
query(Conn, Query, no_params, no_filtermap_fun, default_timeout).

%% @doc Depending on the 3rd argument this function does different things.
%%
%% If the 3rd argument is a list, it executes a parameterized query. This is
%% equivallent to query/4 with the query timeout as given to start_link/1.
%%
%% If the 3rd argument is a timeout, it executes a plain query with this
%% timeout.
%%
%% The return value is the same as for query/2.
%%
%% @see query/2.
%% @see query/4.
-spec query(Conn, Query, Params | Timeout) -> Result
%% @see query/5.
-spec query(Conn, Query, Params | FilterMap | Timeout) -> Result
when Conn :: connection(),
Query :: iodata(),
Timeout :: timeout(),
Params :: [term()],
Timeout :: default_timeout | timeout(),
Params :: no_params | [term()],
FilterMap :: no_filtermap_fun | query_filtermap_fun(),
Result :: query_result().
query(Conn, Query, Params) when is_list(Params) ->
query_call(Conn, {param_query, Query, Params});
query(Conn, Query, Timeout) when is_integer(Timeout); Timeout == infinity ->
query_call(Conn, {query, Query, Timeout}).

%% @doc Executes a parameterized query with a timeout.
query(Conn, Query, Params) when Params == no_params;
is_list(Params) ->
query(Conn, Query, Params, no_filtermap_fun, default_timeout);
query(Conn, Query, FilterMap) when FilterMap == no_filtermap_fun;
is_function(FilterMap, 1);
is_function(FilterMap, 2) ->
query(Conn, Query, no_params, FilterMap, default_timeout);
query(Conn, Query, Timeout) when Timeout == default_timeout;
is_integer(Timeout);
Timeout == infinity ->
query(Conn, Query, no_params, no_filtermap_fun, Timeout).

%% @see query/5.
-spec query(Conn, Query, Params, Timeout) -> Result
when Conn :: connection(),
Query :: iodata(),
Timeout :: default_timeout | timeout(),
Params :: no_params | [term()],
Result :: query_result();
(Conn, Query, FilterMap, Timeout) -> Result
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this multi-clause spec! Good job!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed that this does not work with edoc... edoc will only generate documentation for the first clause :\

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bummer... the guys on #erlang say that's somewhat normal as edoc just doesn't seem to support the full extend of what -specs offer :( I opened an issue at bugs.erlang.org for this, but I don't think much will come of it anytime soon as it's not a pressing issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it stands, I see no alternative other than weakening the spec to:

-spec query(Conn, Query, Params | FilterMap, Timeout | FilterMap) -> Result
when Conn :: connection(),
Query :: iodata(),
Timeout :: default_timeout \| timeout(),
Params :: no_params \| [term()],
FilterMap :: no_filtermap_fun \| query_filtermap_fun(),
Result :: query_result().

which I am reluctant to do, since it is in fact incorrect as it suggests you can pass in two filtermap functions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep the good spec. It's more important than EDoc. I would rather drop EDoc if it's not good enough, but that can be done later. (What I do like about EDoc though is that you get the functions and types from the code for free.)

Can you post the link to the bug you opened for EDoc? I don't think it can be that hard to solve... (or is it?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sure, here you go: ERL-849. Not sure about how complicated it might be, have only glimpsed into the edoc code. edoc_specs seems to be the module to go for, I guess.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, ERL-849 has just been resolved as "won't fix"...

when Conn :: connection(),
Query :: iodata(),
Timeout :: default_timeout | timeout(),
FilterMap :: no_filtermap_fun | query_filtermap_fun(),
Result :: query_result();
(Conn, Query, Params, FilterMap) -> Result
when Conn :: connection(),
Query :: iodata(),
Params :: no_params | [term()],
FilterMap :: no_filtermap_fun | query_filtermap_fun(),
Result :: query_result().
query(Conn, Query, Params, Timeout) when (Params == no_params orelse
is_list(Params)) andalso
(Timeout == default_timeout orelse
is_integer(Timeout) orelse
Timeout == infinity) ->
query(Conn, Query, Params, no_filtermap_fun, Timeout);
query(Conn, Query, FilterMap, Timeout) when (FilterMap == no_filtermap_fun orelse
is_function(FilterMap, 1) orelse
is_function(FilterMap, 2)) andalso
(Timeout == default_timeout orelse
is_integer(Timeout) orelse
Timeout=:=infinity) ->
query(Conn, Query, no_params, FilterMap, Timeout);
query(Conn, Query, Params, FilterMap) when (Params == no_params orelse
is_list(Params)) andalso
(FilterMap == no_filtermap_fun orelse
is_function(FilterMap, 1) orelse
is_function(FilterMap, 2)) ->
query(Conn, Query, Params, FilterMap, default_timeout).

%% @doc Executes a parameterized query with a timeout and applies a filter/map
%% function to the result rows.
%%
%% A prepared statement is created, executed and then cached for a certain
%% time. If the same query is executed again when it is already cached, it does
Expand All @@ -209,40 +244,150 @@ query(Conn, Query, Timeout) when is_integer(Timeout); Timeout == infinity ->
%% The minimum time the prepared statement is cached can be specified using the
%% option `{query_cache_time, Milliseconds}' to start_link/1.
%%
%% The return value is the same as for query/2.
-spec query(Conn, Query, Params, Timeout) -> Result
%% Results are returned in the form `{ok, ColumnNames, Rows}' if there is one
%% result set. If there are more than one result sets, they are returned in the
%% form `{ok, [{ColumnNames, Rows}, ...]}'.
%%
%% For queries that don't return any rows (INSERT, UPDATE, etc.) only the atom
%% `ok' is returned.
%%
%% The `Params', `FilterMap' and `Timeout' arguments are optional.
%% <ul>
%% <li>If the `Params' argument is the atom `no_params' or is omitted, a plain
%% query will be executed instead of a parameterized one.</li>
%% <li>If the `FilterMap' argument is the atom `no_filtermap_fun' or is
%% omitted, no row filtering/mapping will be applied and all result rows
%% will be returned unchanged.</li>
%% <li>If the `Timeout' argument is the atom `default_timeout' or is omitted,
%% the timeout given in `start_link/1' is used.</li>
%% </ul>
%%
%% If the `FilterMap' argument is used, it must be a function of arity 1 or 2
%% that returns either `true', `false', or `{true, Value}'.
%%
%% Each result row is handed to the given function as soon as it is received
%% from the server, and only when the function has returned, the next row is
%% fetched. This provides the ability to prevent memory exhaustion; on the
%% other hand, it can cause the server to time out on sending if your function
%% is doing something slow (see the MySQL documentation on `NET_WRITE_TIMEOUT').
%%
%% If the function is of arity 1, only the row is passed to it as the single
%% argument, while if the function is of arity 2, the column names are passed
%% in as the first argument and the row as the second.
%%
%% The value returned is then used to decide if the row is to be included in
%% the result(s) returned from the `query' call (filtering), or if something
%% else is to be included in the result instead (mapping). You may also use
%% this function for side effects, like writing rows to disk or sending them
%% to another process etc.
%%
%% Here is an example showing some of the things that are possible:
%% ```
%% Query = "SELECT a, b, c FROM foo",
%% FilterMap = fun
%% %% Include all rows where the first column is < 10.
%% ([A|_]) when A < 10 ->
%% true;
%% %% Exclude all rows where the first column is >= 10 and < 20.
%% ([A|_]) when A < 20 ->
%% false;
%% %% For rows where the first column is >= 20 and < 30, include
%% %% the atom 'foo' in place of the row instead.
%% ([A|_]) when A < 30 ->
%% {true, foo}};
%% %% For rows where the first row is >= 30 and < 40, send the
%% %% row to a gen_server via call (ie, wait for a response),
%% %% and do not include the row in the result.
%% (R=[A|_]) when A < 40 ->
%% gen_server:call(Pid, R),
%% false;
%% %% For rows where the first column is >= 40 and < 50, send the
%% %% row to a gen_server via cast (ie, do not wait for a reply),
%% %% and include the row in the result, also.
%% (R=[A|_]) when A < 50 ->
%% gen_server:cast(Pid, R),
%% true;
%% %% Exclude all other rows from the result.
%% (_) ->
%% false
%% end,
%% query(Conn, Query, no_params, FilterMap, default_timeout).
%% '''
-spec query(Conn, Query, Params, FilterMap, Timeout) -> Result
when Conn :: connection(),
Query :: iodata(),
Timeout :: timeout(),
Params :: [term()],
Timeout :: default_timeout | timeout(),
Params :: no_params | [term()],
FilterMap :: no_filtermap_fun | query_filtermap_fun(),
Result :: query_result().
query(Conn, Query, Params, Timeout) ->
query_call(Conn, {param_query, Query, Params, Timeout}).
query(Conn, Query, no_params, FilterMap, Timeout) ->
query_call(Conn, {query, Query, FilterMap, Timeout});
query(Conn, Query, Params, FilterMap, Timeout) ->
query_call(Conn, {param_query, Query, Params, FilterMap, Timeout}).

%% @doc Executes a prepared statement with the default query timeout as given
%% to start_link/1.
%% @see prepare/2
%% @see prepare/3
%% @see prepare/4
%% @see execute/5
-spec execute(Conn, StatementRef, Params) -> Result | {error, not_prepared}
when Conn :: connection(),
StatementRef :: atom() | integer(),
Params :: [term()],
Result :: query_result().
execute(Conn, StatementRef, Params) ->
query_call(Conn, {execute, StatementRef, Params}).
execute(Conn, StatementRef, Params, no_filtermap_fun, default_timeout).

%% @doc Executes a prepared statement.
%% @see prepare/2
%% @see prepare/3
-spec execute(Conn, StatementRef, Params, Timeout) ->
%% @see prepare/4
%% @see execute/5
-spec execute(Conn, StatementRef, Params, FilterMap | Timeout) ->
Result | {error, not_prepared}
when Conn :: connection(),
StatementRef :: atom() | integer(),
Params :: [term()],
FilterMap :: no_filtermap_fun | query_filtermap_fun(),
Timeout :: default_timeout | timeout(),
Result :: query_result().
execute(Conn, StatementRef, Params, Timeout) when Timeout == default_timeout;
is_integer(Timeout);
Timeout=:=infinity ->
execute(Conn, StatementRef, Params, no_filtermap_fun, Timeout);
execute(Conn, StatementRef, Params, FilterMap) when FilterMap == no_filtermap_fun;
is_function(FilterMap, 1);
is_function(FilterMap, 2) ->
execute(Conn, StatementRef, Params, FilterMap, default_timeout).

%% @doc Executes a prepared statement.
%%
%% The `FilterMap' and `Timeout' arguments are optional.
%% <ul>
%% <li>If the `FilterMap' argument is the atom `no_filtermap_fun' or is
%% omitted, no row filtering/mapping will be applied and all result rows
%% will be returned unchanged.</li>
%% <li>If the `Timeout' argument is the atom `default_timeout' or is omitted,
%% the timeout given in `start_link/1' is used.</li>
%% </ul>
%%
%% See `query/5' for an explanation of the `FilterMap' argument.
%%
%% @see prepare/2
%% @see prepare/3
%% @see prepare/4
%% @see query/5
-spec execute(Conn, StatementRef, Params, FilterMap, Timeout) ->
Result | {error, not_prepared}
when Conn :: connection(),
StatementRef :: atom() | integer(),
Params :: [term()],
Timeout :: timeout(),
FilterMap :: no_filtermap_fun | query_filtermap_fun(),
Timeout :: default_timeout | timeout(),
Result :: query_result().
execute(Conn, StatementRef, Params, Timeout) ->
query_call(Conn, {execute, StatementRef, Params, Timeout}).
execute(Conn, StatementRef, Params, FilterMap, Timeout) ->
query_call(Conn, {execute, StatementRef, Params, FilterMap, Timeout}).

%% @doc Creates a prepared statement from the passed query.
%% @see prepare/3
Expand Down
49 changes: 26 additions & 23 deletions src/mysql_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,9 @@ init(Opts) ->
%% Query and execute calls:
%%
%% <ul>
%% <li>{query, Query}</li>
%% <li>{query, Query, Timeout}</li>
%% <li>{param_query, Query, Params}</li>
%% <li>{param_query, Query, Params, Timeout}</li>
%% <li>{execute, Stmt, Args}</li>
%% <li>{execute, Stmt, Args, Timeout}</li>
%% <li>{query, Query, FilterMap, Timeout}</li>
%% <li>{param_query, Query, Params, FilterMap, Timeout}</li>
%% <li>{execute, Stmt, Args, FilterMap, Timeout}</li>
%% </ul>
%%
%% For the calls listed above, we return these values:
Expand Down Expand Up @@ -154,15 +151,18 @@ init(Opts) ->
%% able to handle this in the caller's process, we also return the
%% nesting level.</dd>
%% </dl>
handle_call({query, Query}, From, State) ->
handle_call({query, Query, State#state.query_timeout}, From, State);
handle_call({query, Query, Timeout}, _From,
handle_call({query, Query, FilterMap, default_timeout}, From, State) ->
handle_call({query, Query, FilterMap, State#state.query_timeout}, From,
State);
handle_call({query, Query, FilterMap, Timeout}, _From,
#state{sockmod = SockMod, socket = Socket} = State) ->
setopts(SockMod, Socket, [{active, false}]),
{ok, Recs} = case mysql_protocol:query(Query, SockMod, Socket, Timeout) of
Result = mysql_protocol:query(Query, SockMod, Socket, FilterMap, Timeout),
{ok, Recs} = case Result of
{error, timeout} when State#state.server_version >= [5, 0, 0] ->
kill_query(State),
mysql_protocol:fetch_query_response(SockMod, Socket, ?cmd_timeout);
mysql_protocol:fetch_query_response(SockMod, Socket, FilterMap,
?cmd_timeout);
{error, timeout} ->
%% For MySQL 4.x.x there is no way to recover from timeout except
%% killing the connection itself.
Expand All @@ -175,10 +175,11 @@ handle_call({query, Query, Timeout}, _From,
State1#state.warning_count > 0 andalso State1#state.log_warnings
andalso log_warnings(State1, Query),
handle_query_call_reply(Recs, Query, State1, []);
handle_call({param_query, Query, Params}, From, State) ->
handle_call({param_query, Query, Params, State#state.query_timeout}, From,
State);
handle_call({param_query, Query, Params, Timeout}, _From,
handle_call({param_query, Query, Params, FilterMap, default_timeout}, From,
State) ->
handle_call({param_query, Query, Params, FilterMap,
State#state.query_timeout}, From, State);
handle_call({param_query, Query, Params, FilterMap, Timeout}, _From,
#state{socket = Socket, sockmod = SockMod} = State) ->
%% Parametrized query: Prepared statement cached with the query as the key
QueryBin = iolist_to_binary(Query),
Expand Down Expand Up @@ -207,16 +208,17 @@ handle_call({param_query, Query, Params, Timeout}, _From,
case StmtResult of
{ok, StmtRec} ->
State1 = State#state{query_cache = Cache1},
execute_stmt(StmtRec, Params, Timeout, State1);
execute_stmt(StmtRec, Params, FilterMap, Timeout, State1);
PrepareError ->
{reply, PrepareError, State}
end;
handle_call({execute, Stmt, Args}, From, State) ->
handle_call({execute, Stmt, Args, State#state.query_timeout}, From, State);
handle_call({execute, Stmt, Args, Timeout}, _From, State) ->
handle_call({execute, Stmt, Args, FilterMap, default_timeout}, From, State) ->
handle_call({execute, Stmt, Args, FilterMap, State#state.query_timeout},
From, State);
handle_call({execute, Stmt, Args, FilterMap, Timeout}, _From, State) ->
case dict:find(Stmt, State#state.stmts) of
{ok, StmtRec} ->
execute_stmt(StmtRec, Args, Timeout, State);
execute_stmt(StmtRec, Args, FilterMap, Timeout, State);
error ->
{reply, {error, not_prepared}, State}
end;
Expand Down Expand Up @@ -382,14 +384,15 @@ code_change(_OldVsn, _State, _Extra) ->
%% --- Helpers ---

%% @doc Executes a prepared statement and returns {Reply, NextState}.
execute_stmt(Stmt, Args, Timeout, State = #state{socket = Socket, sockmod = SockMod}) ->
execute_stmt(Stmt, Args, FilterMap, Timeout,
State = #state{socket = Socket, sockmod = SockMod}) ->
setopts(SockMod, Socket, [{active, false}]),
{ok, Recs} = case mysql_protocol:execute(Stmt, Args, SockMod, Socket,
Timeout) of
FilterMap, Timeout) of
{error, timeout} when State#state.server_version >= [5, 0, 0] ->
kill_query(State),
mysql_protocol:fetch_execute_response(SockMod, Socket,
?cmd_timeout);
FilterMap, ?cmd_timeout);
{error, timeout} ->
%% For MySQL 4.x.x there is no way to recover from timeout except
%% killing the connection itself.
Expand Down
Loading