
Allow consuming arbitrary result segments (rows, out parameters, update count, messages) #27

Closed
lukaseder opened this issue Nov 27, 2018 · 27 comments · Fixed by #218
Labels: target: 0.9.x (Issues we want to address with 0.9.x.) · type: enhancement (A general enhancement)

Comments

@lukaseder (Contributor)

I'm assuming that if the statement's returning stream is finished, the Publisher returned by Statement#execute() will simply stop producing results.

But if there is a result, in order to distinguish whether it is an update count or a result set, should we simply try calling Result#getRowsUpdated(), and check if the resulting Publisher is empty (which would mean that there must be a result set)?

Maybe it would be a bit nicer if there were a third method Result#getResultType() returning an enum that describes the result. Or, at the very least, the behaviour should be specified in the Javadoc.

@mp911de (Member) commented Nov 27, 2018

Publishers are one-time use only (similar to Java 8 Stream). With calling a method returning a Publisher, you indicate that you want to materialize a query using a specific projection. Subscribing to that Publisher will start executing whatever is required to get you that result.

That said, calling Statement#execute prepares SQL execution. Calling a method on Result starts consuming the server response in a specific way and as soon as the response is consumed, the Publisher signals completion. If you'd call an additional method on Result, there's no server response left to be processed.

Right now, Result#getRowsUpdated() is implemented to consider server frames reporting a count. If the server does not send a count, we don't have a value to emit. I think it makes sense to define a standard behavior and make sure a count value is emitted in any case.
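
For illustration, a minimal sketch of that consumption model (Reactor-based; the connection, table, and column names are made up):

// Each Result is consumed exactly once, through exactly one of its methods.
Flux.from(connection.createStatement("SELECT id FROM person").execute())
    .flatMap(result -> result.map((row, metadata) -> row.get("id")))
    .subscribe(id -> System.out.println("id: " + id));
// Calling getRowsUpdated() on the same Result afterwards would find no
// server response left to process.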

I also think we should include bits of this answer in the Javadoc/spec doc.

@lukaseder (Contributor, Author)

So, we cannot probe Result#getRowsUpdated() to see which kind of result we currently have? Then we definitely need a Result#getResultType() method, or some other mechanism.

@mp911de (Member) commented Nov 27, 2018

Exactly, if you consume rows, you can't get the update count and vice versa. I'm not sure a result type would work, since reactive is all about deferring and streaming. We would need to consume the response to inspect it, but at the same time we do not want to buffer responses, so peeking on the stream would itself consume the response.

Can you shed a bit of light on the use case? In Spring Data R2DBC we basically have a similar situation. We have two cases where affected row count vs. rows plays a role:

  1. Usage of the client API executing arbitrary SQL: Client code decides whether it is interested in rows or the count (see the sketch after this list).
  2. Repository usage: In the repository support we consume affected row count on DML queries and rows on SELECT queries.
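
For case 1, a rough sketch of that client-side decision (the SQL, bindings, and names are made up):

// DML path: the caller opts into the affected row count.
Flux.from(connection.createStatement("UPDATE person SET name = $1 WHERE id = $2")
        .bind("$1", "Jane")
        .bind("$2", 42)
        .execute())
    .flatMap(Result::getRowsUpdated)
    .subscribe(count -> System.out.println("updated: " + count));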

@lukaseder (Contributor, Author)

Well, the use case is simple; see also this discussion: https://groups.google.com/forum/#!topic/r2dbc/QZpTpQtj1HA

Some databases (e.g. SQL Server) allow for arbitrary statement batches, e.g.

DECLARE @t TABLE(i INT);
INSERT INTO @t VALUES (1),(2),(3);
RAISERROR('message 1', 16, 2, 3);
RAISERROR('message 2', 16, 2, 3);
SELECT * FROM @t
RAISERROR('message 3', 16, 2, 3);

It should produce a result like this:

Update Count: 3
Exception   : message 1
Exception   : message 2
Result      :
  i: 1
  i: 2
  i: 3
Exception   : message 3

Now, the behaviour of these batches is not necessarily constant / known in advance. For instance, you could wrap some SELECT in an IF, and if that IF gets skipped, then all the results will shift by one (or more). These batches can also be contained in any stored procedure, or even in a trigger (!).

There definitely needs to be some way to discover, without knowing in advance, what kind of result we're getting at any given position. I know this is an edge case, even for SQL Server users. But I do think it's worth thinking about this before releasing 1.0.0.GA.

This is an SPI to be consumed by tools that are not necessarily in control of the SQL being executed, so those tools need to be able to discover the results entirely dynamically.

@mp911de (Member) commented Nov 27, 2018

I think we should be able to allow result consumption and then asking for affected row counts. The other way round would not work. So you could call Result#getRowsUpdated() after Result#map but not Result#map after Result#getRowsUpdated().

@lukaseder (Contributor, Author)

... in fact, if you do not want to peek on the stream, then the only solution I can see is to replace the current two methods on Result with a single one that does everything:

public interface Result {
    <T> Publisher<T> map(
        Function<Integer, T> rowsUpdated,
        BiFunction<Row, RowMetadata, ? extends T> rowsFetched
    );
}

This will make it a bit harder to be forwards compatible, e.g. if stored procedure out parameter support will be added only in a later release. In that case, maybe a wrapper type for these functions might be useful:

public interface Result {
    interface Mapping<T> {
        Function<Integer, T> rowsUpdated();
        BiFunction<Row, RowMetadata, ? extends T> rowsFetched();
        Function<Object, T> outParameterFetched();
    }

    <T> Publisher<T> map(Mapping<T> mapping);
}
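
A hypothetical usage of that proposal (the Mapping type and its methods are part of the suggestion above, not of the released SPI):

// Map every kind of result element to a String, for illustration.
Publisher<String> results = result.map(new Result.Mapping<String>() {
    public Function<Integer, String> rowsUpdated() {
        return count -> "Update count: " + count;
    }
    public BiFunction<Row, RowMetadata, String> rowsFetched() {
        return (row, metadata) -> "Row: " + row.get(0);
    }
    public Function<Object, String> outParameterFetched() {
        return parameter -> "OUT: " + parameter;
    }
});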

@lukaseder (Contributor, Author)

I think we should be able to allow result consumption and then asking for affected row counts

It would be possible, but complicated to get right. This is what makes JDBC so messy: the fact that it is such a stateful implementation of a stream. In order to get things right, the correct order of method calls has to be observed, and that's quite hard.

Even if you say that this SPI shouldn't be used by clients directly, it will be, and there will be tons of questions from beginners. :)

@nebhale (Member) commented Nov 27, 2018

Exactly, if you consume rows, you can't get the update count and vice versa.

Due to the poor documentation it's not clear, but this is not true. It may be PostgreSQL-specific (again, better specification) but you absolutely can peek, and we do maintain a small amount of state to facilitate this. We .cache() the first three frames, which include the updated row count if it exists. Using filters on all the pertinent outputs, we can replay those first three frames, and any that follow, to generate RowMetadatas, Rows, and update counts in parallel with each other. This would lead to behaviors like (and I'm freehanding, so forgive the likely non-compilation, you'll get the idea):

Flux.from(result.getRowsUpdated())
    .flatMap(count -> doSomethingWithCount(count))
    .switchIfEmpty(Flux.from(result.map((row, metadata) -> row))
        .flatMap(row -> doSomethingWithRow(row)))

@mp911de (Member) commented Nov 27, 2018

For SQL Server, we receive the count along with the Done message, which comes last (either with a direct response or after consumption of the entire cursor), so for SQL Server we can't easily cache and replay the entire stream.

@nebhale nebhale added the type: documentation A documentation update label Jan 18, 2019
@nebhale nebhale added this to the 1.0.0.M8 milestone Feb 1, 2019
@mp911de mp911de removed this from the 0.8.0.M8 milestone May 4, 2019
@elefeint (Contributor)

Have you considered having a different API for SELECT statements vs. DML statements? A Result containing two almost mutually exclusive types of data (number of rows affected vs. rows returned) seems to point at separate concerns being handled in one interface.

Having separate methods on Connection for createStatement() and createDmlStatement() would let the client application express intent, and then to receive a specific, expected kind of result back.

Also*, if R2DBC someday begins supporting streaming data into the database, it will need a separate method on Connection anyway, something like

StreamingStatement createStreamingStatement(Publisher<Query> queries);

*This is a complete hypothetical right now, as we have no way of supporting streaming into the database with Cloud Spanner.

@lukaseder (Contributor, Author)

Having separate methods on Connection for createStatement() and createDmlStatement() would let the client application express intent, and then to receive a specific, expected kind of result back.

That may sound reasonable for most queries (on an 80/20 basis), but not for edge cases, which an SPI like R2DBC should support, IMO. Have you seen my comment here: #27 (comment)?

RDBMS like SQL Server support mixed-result-type statement batches, where a mix of result sets, update counts, and exceptions/messages/signals can result from a single query execution.

On the SPI level, I'm not really a fan of distinguishing between "types" of statements. On a higher API level, such convenience API for the 80/20 case might definitely make sense.

@elefeint (Contributor)

@lukaseder I was kind of inspired by your comment :) Looking at the stored-proc-like syntax got me to wonder: what would/could a client do with such mixed output, other than print it? This makes a separate result type that knows what it is more useful: instead of getting back a dozen Results, each of which would have to be checked for the presence of getRowsUpdated(), there could be different result classes in the first place, similar to your original request for Result#getResultType().

createStatement() and createDmlStatement() sound like they would preclude such mixed SQL, but since stored procedures are being designed somewhere, I would assume the execution pathway for multiple mixed statements would be similar to theirs.

@lukaseder (Contributor, Author)

what would/could a client do with such mixed output, other than print it

The main point of batching is always to reduce network traffic between client and server. If it is possible to send more "messages" in a single "envelope", we can improve throughput. Printing is a trivial use case, of course, but composing messages for use cases other than printing them is surely desirable in general.

This was exceptionally difficult to get right with JDBC. I'm here to make sure it is less difficult with R2DBC - of course, without compromise on the much more popular use case of receiving only a single result set or a single update count.

createStatement() and createDmlStatement()

I understand the wish for a distinction (it would have to focus on the nature of the outcome, not the statement type, as DML statements, assuming you mean INSERT/UPDATE/DELETE/MERGE, can also produce result sets via RETURNING/OUTPUT/data change delta table syntax, or triggers), but I still doubt the SPI needs to do it. An API building on top of it: definitely. An example is this one: https://github.com/r2dbc/r2dbc-client

Perhaps, what you really wish for is to be able to provide a filter / predicate already when executing a statement.

This has been discussed elsewhere, among other places (I forgot?), here: https://groups.google.com/d/msg/r2dbc/12nq6d1l62Q/rPjStz5xBwAJ

That discussion on the group also revolves around OUT parameters (which can in turn be result sets, again). They're another way for a database to interleave a new type of result with the other ones.

@mp911de (Member) commented Jun 1, 2019

Like Lukas mentioned, this SPI is intended for driver/client library communication. Neither of these components is aware of which type of statement is going to be executed when passing statements through.
Postgres and SQL Server also report the number of affected rows on SELECT statements. I'm not sure this assumption holds for other SQL databases; in any case, we see it as a by-product of queries.

Having this kind of split requires the client side to be aware of what type of SQL is going to be executed.

An INSERT statement can return tabular data when e.g. the database returns generated keys. Not sure if INSERT triggers could also return tabular data.
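
For completeness, a minimal sketch of the generated-keys case (returnGeneratedValues is part of the Statement SPI; the table and column names are made up):

Flux.from(c.createStatement("INSERT INTO person (name) VALUES ($1)")
        .bind("$1", "Jane")
        .returnGeneratedValues("id")
        .execute())
    .flatMap(result -> result.map((row, metadata) -> row.get("id")))
    .subscribe(id -> System.out.println("generated id: " + id));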

Regarding streaming into the database:

We are entirely missing the necessary infrastructure, because most database protocols are command-oriented. If you had a stream source today, you would do the following:

Connection c = …;
Flux<String> stream = …;

stream.flatMap(it -> c
        .createStatement("INSERT INTO person VALUES($1)")
        .bind("$1", it)
        .execute())
    .flatMap(Result::getRowsUpdated);

@lukaseder (Contributor, Author)

Not sure if INSERT triggers could also return tabular data.

In SQL Server, and since 12c in Oracle, yes, they can.

@mp911de (Member) commented Jun 3, 2019

Thanks for confirming. This is yet another reason to keep the class structure as-is.

@elefeint (Contributor) commented Jun 3, 2019

Thanks! Having the client-friendly layer above the driver makes sense.

For Cloud Spanner, we'll have to do query type detection to handle DML, since there is no API that supports every type of query we'd have to handle.

I'll stop hijacking this thread for now.

@lukaseder (Contributor, Author)

I'm currently verifying what can be done with current versions of R2DBC. Looks like this is still an open issue?

  1. It seems the API is not yet ready to handle such cases.
  2. The interleaving of exceptions / warnings / signals between update counts and result sets hasn't been addressed at all yet.
  3. The SQL Server implementation doesn't even seem to get this case right:
System.out.println((
    Flux.from(connectionFactory.create())
        .flatMap(c -> c
            .createStatement("DECLARE @t TABLE(i INT);\n"
                + "INSERT INTO @t VALUES (1),(2),(3);\n"
                + "SELECT * FROM @t;\n")
            .execute())
        .flatMap(it -> {
            System.out.println("IT: " + it);
            return it.getRowsUpdated();
        })
        .collectList()
        .block()
));

The output being only a single update count:

IT: io.r2dbc.mssql.MssqlResult@3df5a869
[6]

I've created a new issue for this: r2dbc/r2dbc-mssql#196

@mp911de (Member) commented Mar 31, 2021

This is still an open issue.

A challenge, from what we've seen in drivers, is to actually know what type of result something is. To know whether it's an update-count-only result, drivers need to consume the entire response for Postgres and SQL Server (making sure there are no data rows). To determine whether something is an error/warning result, drivers need to consume the entire result to ensure there are no update counts or rows associated with the response.

In several cases, such an indicator would remove the streaming nature, and that isn't something we're looking for.
Probably the best way out would be something along the lines of your earlier proposal #27 (comment): to flatMap(…) incoming data segments (rows, output parameters, update counts, error signals), ideally in combination with some possibility to apply a filter to skip unwanted elements.
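
A rough sketch of such a segment-based flatMap(…) (the Segment subtype names and accessors here are assumptions for illustration, not existing API at this point):

Flux.from(result.flatMap(segment -> {
    if (segment instanceof Result.RowSegment)
        return Mono.just("Row: " + ((Result.RowSegment) segment).row().get(0));
    if (segment instanceof Result.UpdateCount)
        return Mono.just("Update count: " + ((Result.UpdateCount) segment).value());
    if (segment instanceof Result.Message)
        return Mono.just("Message: " + ((Result.Message) segment).message());
    return Mono.<String>empty(); // skip unwanted segments
})).subscribe(System.out::println);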

@lukaseder (Contributor, Author)

For context, I'm looking at the current implementation of MssqlResult. Simplified:

    public Mono<Integer> getRowsUpdated() {
        return this.messages
            .<Long>handle((message, sink) -> {
                if (message instanceof AbstractDoneToken) { ... }
                if (message instanceof ErrorToken) { ... }
    ...
    public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
        Assert.requireNonNull(f, "Mapping function must not be null");
        return this.messages
            .<T>handle((message, sink) -> {
                if (message.getClass() == ColumnMetadataToken.class) { ... }
                if (message.getClass() == RowToken.class || message.getClass() == NbcRowToken.class) { ... }
                if (message instanceof ErrorToken) { ... }
    ...

This looks exactly like what I'm thinking of here, except that these instanceof calls or Class comparisons are distributed across two methods. Yes, that earlier proposal (#27 (comment)) still sounds like the way to go here.

It's not much more user-friendly than the JDBC version, but then again, this isn't something people do every day. In fact, it's hardly done outside of SQL Server, Sybase, and very rarely MySQL, from what I've seen (I haven't noticed Oracle's approach, using DBMS_SQL.RETURN_RESULT, being widely used since its introduction).

In the cases where jOOQ needs to be able to distinguish between update counts and results, we already know the order of the generated statements.

@mp911de (Member) commented Mar 31, 2021

Do you want to come up with a pull request so we can discuss your proposal for R2DBC SPI 0.9?

@mp911de mp911de removed the type: documentation A documentation update label Mar 31, 2021
@lukaseder (Contributor, Author)

Sure, why not. Any guidelines on what a PR should include? Or is this for high-level discussion?

@mp911de (Member) commented Mar 31, 2021

The interface extensions and signal types would be a good starting point. I think we agree that there's a need for such an extension, and design-wise we have a rough idea as well.

lukaseder added a commit to lukaseder/r2dbc-spi that referenced this issue Mar 31, 2021
Clients may not know in advance whether a Result contains update counts or row data. The current SPI design does not allow for checking the Result for its contents, nor for trying out both possibilities and reverting to the other if one fails.

This suggestion offers a solution to this problem:

- A new Mapping<T> type is added containing functions for the individual mappings. This type is important for forwards compatibility, such that additional types of results (e.g. out parameters or signals, exceptions, warnings) can be added later
- The new Mapping<T> allows for mapping between 1 element (update count, row) to 0-N elements via a Publisher<T>. This is useful to allow for skipping values directly on the individual Result level
- A new map(Mapping<T>) method is added as a breaking change
- Default implementations delegating to the new methods are added for the existing methods
@lukaseder (Contributor, Author)

I've created a PR without signal types yet (assuming this means warnings/exceptions/etc.): #207.

Looking forward to the discussion.

@lukaseder (Contributor, Author)

While implementing a suggestion for #27 in #207, I couldn't help but wonder whether the existing Result is well designed for the most general purposes. Since R2DBC is still at an early stage, with a 0.x release number, I wonder if breaking changes are still possible, knowing that for some changes, the ship might have already sailed.

Let's look at the inconsistency of this design:

public interface Result {
    Publisher<Integer> getRowsUpdated();
    <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> mappingFunction);
}

Observations:

  • Bikeshedding first. One method is very clear: getRowsUpdated(). The other one is simply called map(), when getRows() or getRowsFetched() would be much nicer, in my opinion. Especially if other result types (OUT parameters or signals, warnings, exceptions) are to be streamed (see #27), map() is a bit too generic for my taste.
  • map() offers a mapping function from R2DBC types to T. Why doesn't getRowsUpdated() offer such a function?
  • Once we discuss functions: throughout R2DBC when using Reactor, I've grown used to everything that maps (or flat-maps) things to other things offering some sort of Function<T, Publisher<R>> mapping function, to allow for the not-so-unreasonable case of mapping 1 element to 0, 1, or N elements right on the spot. Why not here? Why do we have to map every row to T? Why not allow for discarding rows right here, before we even fetch their contents and metadata and allocate some custom wrapper record or POJO type for the data? In isolation, that might seem overengineered, but together with #27 and the potential of adding new result types (OUT parameters or signals, warnings, exceptions), it may be desirable to discard output right when it arises, not afterwards in the consuming stream.

Regarding the continued evolution, I don't have a clear picture regarding OUT parameters. They're not really a uniform stream. But a stream is required if collection out parameters (e.g. cursors, arrays, etc.) are to be supported, because those are streams nested in the OUT parameter stream. But then again, Row.get() doesn't offer streaming either, so I don't know how to bring consistency to this topic (I remember having discussed this on the mailing list, and on a phone call years ago, but I forgot what the conclusion was).

@mp911de (Member) commented Apr 1, 2021

I wonder if breaking changes are still possible, knowing that for some changes, the ship might have already sailed.

The chance to break the API was Ever Given since we introduced the 0.x versioning scheme. While we don't want to fully redesign R2DBC, we are free to change things.

The difference in design between the two methods is based on how they are supposed to work. We didn't want to impose more implementation/consumption complexity than necessary. The row update count is a pure value being emitted eventually; there's no streaming of multiple values.

Row consumption is subject to streaming, and typically each row is associated with some resources (at least buffers). Handing these out as Publisher<Row> would therefore raise the question of how to release resources after consumption.

From what we've seen in drivers, consuming the updated row count is the most lightweight consumption of results. Row consumption (or avoiding consumption of rows) doesn't add significant overhead, as drivers need to decode metadata/row frames anyway to be able to parse the wire protocol. Additional overhead only comes into play with Row.get(…), where raw data is fed into data decoders.

Why not allow for discarding rows right here, before we even fetch their contents

flatMap operators typically come with additional overhead.

The idea of Result filter(Predicate<IsItAUpdateCountRowOutParamOrError>) has already been around for a while, and something like that could serve the purpose of early filtering. Another approach stems from a handler method that accepts a BiConsumer<Signal, Sink<T>>. Handlers are still lightweight and give more flexibility than map by emitting 0 or 1 elements, emitting an error, or completing the stream. Emitting N isn't possible with a handler, though.
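
As a sketch, early filtering could then look like this (filter and the segment predicate are the idea under discussion here, not existing API):

// Drop message/warning segments before any mapping happens.
Result filtered = result.filter(segment -> !(segment instanceof Result.Message));
Flux.from(filtered.map((row, metadata) -> row.get(0)))
    .subscribe(System.out::println);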

Regarding the continued evolution, I don't have a clear picture regarding OUT parameters. They're not really a uniform stream. But a stream is required if collection out parameters (e.g. cursors, arrays, etc.) are to be supported

Having a mapping/flat-mapping operator that can handle out parameters seems to be a way out. Getting more insight into how one would like to consume OUT params would indeed be beneficial to come up with a proper design. In any case, collection out parameters such as arrays should work well with a map/flatMap design, as the mapping function gets invoked only when the out parameter is fully materialized. I can only speak for Postgres regarding arrays: in Postgres, the entire row/parameter must be received and decoded by a client before handing the value to the consumer. Not sure whether this applies to Oracle or other database systems. In any case, aggregation can be handled by the driver without blocking.

Regarding cursors, it seems that databases return a reference to a cursor that then needs to be fetched (see RefCursor). Cursors tend to remain valid until transaction cleanup. One should be able to let the cursor object escape the mapping function and fetch the result thereafter.

@lukaseder (Contributor, Author)

Alright, thanks for the explanations. I will try to think about this again next week to see what a client like jOOQ would expect from the SPI to be able to read all sorts of OUT parameters. For most cases, consuming them all in one go doesn't seem unreasonable, so a dedicated API like Row could do the trick. In a way, OUT parameters do form a "Row", and PostgreSQL even pretends they are one:

create or replace function f (out a int, out b int)
as 
$$
begin
  a := 1;
  b := 2;
end;
$$
language plpgsql;

select * from f();

Resulting in:

|a  |b  |
|---|---|
|1  |2  |

It's both weird and elegant 😀.

Other RDBMS are not like that, but they could be. So perhaps the existing map(BiFunction<Row, RowMetadata, T>) API could also be used to map OUT parameters to some suitable container? I have mixed feelings (because they're similar, but not exactly the same thing), but I think it could work for most RDBMS.

A REF CURSOR isn't so much different when it is returned as OUT parameter or as a nested collection in an ordinary result set.
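
Concretely, reusing the existing API for the PostgreSQL function above might look like this (a sketch, assuming the driver surfaces the OUT parameters as a single row):

Flux.from(result.map((row, metadata) -> "a=" + row.get("a") + ", b=" + row.get("b")))
    .subscribe(System.out::println);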

@mp911de mp911de linked a pull request Jun 23, 2021 that will close this issue
mp911de added a commit that referenced this issue Jun 23, 2021
R2DBC now specifies OutParameters as an interface for the consumption of out parameters. Additionally, Readable and ReadableMetadata are defined as top-level interfaces for Row and OutParameters and their respective metadata types.

Signed-off-by: Mark Paluch <mpaluch@vmware.com>

[resolves #27][#218]
mp911de added a commit that referenced this issue Jun 23, 2021
Allow consumption of rows and out parameters by accepting Function<Readable, T>.

Signed-off-by: Mark Paluch <mpaluch@vmware.com>

[#27][#218]
@mp911de mp911de added this to the 0.9.0.M2 milestone Jun 23, 2021
@mp911de mp911de added the type: enhancement A general enhancement label Jun 23, 2021
mp911de added a commit that referenced this issue Jul 1, 2021
We now allow Result filtering for individual response segments of a query that are represented within the Segment type hierarchy.

Also, refine behavior of getRowsUpdated() and map(…) regarding error signal emission.

Signed-off-by: Mark Paluch <mpaluch@vmware.com>

[#27][resolves #215]
@mp911de mp911de changed the title Better specification in io.r2dbc.spi.Result needed Allow consuming arbitrary result segments (rows, out parameters, update count, messages) Jul 9, 2021