postgres stream support #116

Closed
Globegitter opened this issue Sep 6, 2019 · 12 comments

@Globegitter

Globegitter commented Sep 6, 2019

With the reactive Vert.x SQL client, it is possible to stream results straight out of the database, like so:

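import io.vertx.pgclient.PgPool
import io.vertx.sqlclient.Tuple

// PgPool.pool expects a PostgreSQL connection URI, not a JDBC URL
val client = PgPool.pool("postgresql://$username:$password@$host:$port/$database")
client.getConnection {
    val conn = it.result()
    val tx = conn.begin() // streaming uses a cursor, which needs a transaction
    conn.prepare("SELECT * FROM some_table") { rs ->
        // fetch one row per round trip; the query has no bind parameters
        val stream = rs.result().createStream(1, Tuple.tuple())
        stream.handler { row ->
            row.getString("column1") // read a column from each row as it arrives
        }
        stream.endHandler {
            tx.commit()
            conn.close() // return the connection to the pool
        }
    }
}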

I guess, API-wise, the simplest way would be to extend the DAO with functions like findManyByIdsStream(int fetch, Collection<T> ids) that return some kind of stream.

Being able to get a real stream straight out of the database has the potential to reduce memory usage and make things faster.
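The proposed DAO method could look roughly like the following. This is a minimal sketch, not vertx-jooq API: StreamingDao, PagedStreamingDao and PageFetcher are hypothetical names, and simple offset-based paging stands in for the server-side cursor that createStream uses. It only illustrates the memory argument: rows are pulled lazily, so at most fetch of them are held in memory at once.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical DAO extension: stream POJOs by id instead of materialising a List. */
interface StreamingDao<P, T> {
    Stream<P> findManyByIdsStream(int fetch, Collection<T> ids);
}

/** Hypothetical single round trip: returns up to `size` rows starting at `offset`. */
@FunctionalInterface
interface PageFetcher<P, T> {
    List<P> fetch(Collection<T> ids, int offset, int size);
}

final class PagedStreamingDao<P, T> implements StreamingDao<P, T> {
    private final PageFetcher<P, T> fetcher;

    PagedStreamingDao(PageFetcher<P, T> fetcher) {
        this.fetcher = fetcher;
    }

    @Override
    public Stream<P> findManyByIdsStream(int fetch, Collection<T> ids) {
        if (fetch <= 0) {
            throw new IllegalArgumentException("fetch must be positive");
        }
        Iterator<P> rows = new Iterator<P>() {
            private List<P> page = new ArrayList<>();
            private int index = 0;            // position inside the current page
            private int offset = 0;           // rows loaded so far
            private boolean lastPage = false;

            @Override
            public boolean hasNext() {
                if (index < page.size()) {
                    return true;
                }
                if (lastPage) {
                    return false;
                }
                // Load the next page only when the current one is used up,
                // so at most `fetch` rows are held in memory at any time.
                page = fetcher.fetch(ids, offset, fetch);
                offset += page.size();
                index = 0;
                lastPage = page.size() < fetch; // a short page means no more rows
                return !page.isEmpty();
            }

            @Override
            public P next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return page.get(index++);
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(rows, Spliterator.ORDERED), false);
    }
}
```

For example, with fetch = 2 a five-row result is loaded in three round trips (offsets 0, 2 and 4), while a caller that only needs the first element (findFirst()) triggers just the first one.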

What do you think about that?

@Globegitter
Author

I have now implemented one of these methods myself (in Kotlin, returning a Flow). The problem I ran into is that, given a query like DSL.using(dao.queryExecutor().configuration()).selectFrom(someTable).where(someCondition), there is no easy way to get the prepared SQL and the bind values out of it, because there are no public methods for this. So I had to reimplement them myself to arrive at something like:

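// PgPool.pool expects a PostgreSQL connection URI, not a JDBC URL
val client = PgPool.pool("postgresql://$username:$password@$host:$port/$database")
val query = DSL.using(dao.queryExecutor().configuration()).selectFrom(someTable).where(someCondition)
client.getConnection {
    val conn = it.result()
    val tx = conn.begin() // streaming uses a cursor, which needs a transaction
    // toPreparedQuery and getBindValues are my own reimplementations
    conn.prepare(toPreparedQuery(query)) { rs ->
        val stream = rs.result().createStream(1, getBindValues(query))
        stream.handler { row ->
            val pojo = RowMappers.getMyMapper().apply(row) // map the row, not the connection result
        }
        stream.endHandler {
            tx.commit()
            conn.close()
        }
    }
}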

It would be nice if there were an easy way to turn a query into its prepared SQL and bind values. That would make it easier to implement streaming on our end for now, and it would make the library friendlier to extension in general. It would be great to get this in as a quick measure, if you agree.
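The Postgres reactive client expects numbered placeholders ($1, $2 and so on, as in the Vert.x example later in this thread), while jOOQ's Query.getSQL() renders JDBC-style ? placeholders by default, and Query.getBindValues() returns the matching values in order. A helper along the following lines could bridge the two. toPgPlaceholders is a hypothetical name, not the vertx-jooq implementation, and it is deliberately naive: it skips ? inside single-quoted literals and double-quoted identifiers, but not inside comments or dollar-quoted strings, and it would also rewrite PostgreSQL's ?-based jsonb operators.

```java
final class PgPlaceholders {
    private PgPlaceholders() {}

    /** Rewrites JDBC-style '?' placeholders into PostgreSQL's numbered '$1', '$2', ... form. */
    static String toPgPlaceholders(String sql) {
        StringBuilder out = new StringBuilder(sql.length() + 8);
        int param = 0;
        boolean inString = false;     // inside a '...' literal
        boolean inIdentifier = false; // inside a "..." identifier
        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            if (c == '\'' && !inIdentifier) {
                inString = !inString; // an escaped '' toggles twice, so it stays balanced
            } else if (c == '"' && !inString) {
                inIdentifier = !inIdentifier;
            } else if (c == '?' && !inString && !inIdentifier) {
                out.append('$').append(++param); // real placeholder: number it
                continue;
            }
            out.append(c);
        }
        return out.toString();
    }
}
```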

@Globegitter
Author

I just looked into the unreleased 5.1.0, and it seems that, at least for some of the cases, I could now build a different solution that does not need the protected methods. I will investigate all of our uses as soon as I have time for this.

@badgerwithagun

For the RX/async client, I ended up writing methods like this one, and I'm trying to decide how to integrate them into the generated DAOs as a PR:

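import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.vertx.core.json.JsonArray;
import org.jooq.SelectFinalStep;

  // Streams the rows of a jOOQ select through the RX SQL client (sqlClient is a field)
  Flowable<JsonArray> queryStream(SelectFinalStep<?> sql, JsonArray params) {
    return Flowable.create(emitter -> {
      // getSQL() keeps the '?' placeholders for params; toString() would inline the bind values
      sqlClient.queryStreamWithParams(sql.getSQL(), params, result -> {
        if (result.failed()) {
          emitter.onError(result.cause());
          return;
        }
        // rxClose() is lazy, so it must be subscribed for the stream to actually close
        emitter.setCancellable(() -> result.result().rxClose().subscribe());
        result.result()
            .exceptionHandler(emitter::onError)
            .endHandler(x -> emitter.onComplete())
            .handler(emitter::onNext);
      });
    }, BackpressureStrategy.BUFFER);
  }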

I'd like to map it to the appropriate POJO on the way out.
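Mapping to a POJO on the way out can be done inside the adapter itself, so subscribers only ever see POJOs. The sketch below swaps RxJava's Flowable for the JDK's java.util.concurrent.SubmissionPublisher to stay dependency-free; CallbackStream, PojoStreams and ListStream are hypothetical stand-ins modelled loosely on the handler/endHandler/exceptionHandler calls in the snippet above, not library types.

```java
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical callback-style row stream (handler / endHandler / exceptionHandler). */
interface CallbackStream<R> {
    CallbackStream<R> handler(Consumer<R> onRow);
    CallbackStream<R> endHandler(Runnable onEnd);
    CallbackStream<R> exceptionHandler(Consumer<Throwable> onError);
}

final class PojoStreams {
    private PojoStreams() {}

    /**
     * Maps every row to a POJO before publishing it; end and error signals are forwarded.
     * Subscribers must be attached before rows flow: SubmissionPublisher does not replay.
     */
    static <R, P> SubmissionPublisher<P> mapToPublisher(CallbackStream<R> rows, Function<R, P> toPojo) {
        SubmissionPublisher<P> publisher = new SubmissionPublisher<>();
        rows.exceptionHandler(publisher::closeExceptionally)
            .endHandler(publisher::close)
            .handler(row -> publisher.submit(toPojo.apply(row)));
        return publisher;
    }
}

/** In-memory CallbackStream, used only to exercise the adapter. */
final class ListStream<R> implements CallbackStream<R> {
    private final List<R> rows;
    private Consumer<R> onRow = r -> {};
    private Runnable onEnd = () -> {};
    private Consumer<Throwable> onError = t -> {};

    ListStream(List<R> rows) { this.rows = rows; }

    public ListStream<R> handler(Consumer<R> onRow) { this.onRow = onRow; return this; }
    public ListStream<R> endHandler(Runnable onEnd) { this.onEnd = onEnd; return this; }
    public ListStream<R> exceptionHandler(Consumer<Throwable> onError) { this.onError = onError; return this; }

    /** Emits all rows, then signals the end of the stream (or an error if one is thrown). */
    void start() {
        try {
            rows.forEach(onRow);
            onEnd.run();
        } catch (RuntimeException e) {
            onError.accept(e);
        }
    }
}
```

One design difference: SubmissionPublisher.submit blocks the producing thread once a subscriber's buffer is full, which would be harmful on a Vert.x event loop, whereas BackpressureStrategy.BUFFER in the original buffers without limit.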

@jklingsporn
Owner

I am currently evaluating the implementation details. Because I had some questions about how to release all related resources correctly, I've started a discussion in the Vert.x Google group. Once this is sorted out, I will start adding it to vertx-jooq (with a lag of one year 🕐).

@jklingsporn
Owner

I added support for cursors and streams to the classic API. Unfortunately, I wasn't able to finish the RX API because of some errors I do not understand :D

This is what I have for the classic API and here is how you would use it.
Here is what I got for the RX-API so far (plus the failing tests).

Maybe @Globegitter or @Jotschi can help?

Any comments on the actual API are also appreciated (whether it is useful like this, etc.).

@jklingsporn
Owner

This is btw the error I get for the RX implementation:

java.util.NoSuchElementException
	at io.reactivex.internal.operators.maybe.MaybeFlatMapSingle$FlatMapMaybeObserver.onComplete(MaybeFlatMapSingle.java:106)
	at io.reactivex.internal.operators.maybe.MaybeFromCompletable$FromCompletableObserver.onComplete(MaybeFromCompletable.java:76)
	at io.reactivex.internal.operators.single.SingleFlatMapCompletable$FlatMapCompletableObserver.onComplete(SingleFlatMapCompletable.java:102)
	at io.reactivex.internal.operators.maybe.MaybeIgnoreElementCompletable$IgnoreMaybeObserver.onComplete(MaybeIgnoreElementCompletable.java:79)
	at io.vertx.reactivex.impl.AsyncResultMaybe.lambda$subscribeActual$0(AsyncResultMaybe.java:52)
	at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:124)
	at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)
	at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)
	at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
	at io.vertx.core.impl.future.FutureImpl$ListenerArray.onSuccess(FutureImpl.java:230)
	at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)
	at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)
	at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
	at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)
	at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)
	at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
	at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)
	at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)
	at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
	at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)
	at io.vertx.core.impl.future.SucceededFuture.addListener(SucceededFuture.java:82)
	at io.vertx.core.impl.future.Composition.onSuccess(Composition.java:43)
	at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)
	at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)
	at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
	at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
	at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
	at io.vertx.sqlclient.impl.TransactionImpl.lambda$txCommand$2(TransactionImpl.java:169)
	at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:124)
	at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)
	at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)
	at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
	at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
	at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
	at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
	at io.vertx.pgclient.impl.PgSocketConnection.lambda$doSchedule$2(PgSocketConnection.java:149)
	at io.vertx.sqlclient.impl.command.CommandResponse.fire(CommandResponse.java:46)
	at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:258)
	at io.vertx.pgclient.impl.PgSocketConnection.handleMessage(PgSocketConnection.java:94)
	at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:96)
	at io.vertx.core.net.impl.NetSocketImpl.lambda$new$1(NetSocketImpl.java:97)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:240)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:130)
	at io.vertx.core.net.impl.NetSocketImpl.lambda$handleMessage$9(NetSocketImpl.java:390)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:52)
	at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:294)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24)
	at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:389)
	at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:153)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:154)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:88)
	at io.vertx.pgclient.impl.codec.PgCommandCodec.handleReadyForQuery(PgCommandCodec.java:139)
	at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:235)
	at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:95)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

@jklingsporn
Owner

I just found out that the Vert.x docs contain an example of how to properly use the streaming API together with RxJava:

Flowable<Row> flowable = pool.rxGetConnection().flatMapPublisher(conn -> conn
  .rxBegin()
  .flatMapPublisher(tx ->
    conn
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapPublisher(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toFlowable();
      })
      .doAfterTerminate(tx::commit)));

@lorylo

lorylo commented May 11, 2021

@jklingsporn I am wondering whether you ran into any problems with this. Using the example above, the connection is never released back to the pool. Once all pooled connections are in use (in my case the pool size is 5), everything just hangs.

@jklingsporn
Owner

You are right, of course. I have now ended up with the following function, and it seems to work (I'm not sure about the onError rollback part):

public Flowable<io.vertx.reactivex.sqlclient.Row> rowStream(Function<DSLContext, ? extends Query> queryFunction, int fetchSize){
    Query query = createQuery(queryFunction);
    return ((Pool) delegate).rxGetConnection()
            .flatMapPublisher(conn -> conn
                    .rxBegin()
                    .flatMapPublisher(tx ->
                            conn
                                    .rxPrepare(toPreparedQuery(query))
                                    .flatMapPublisher(preparedQuery -> preparedQuery.createStream(fetchSize).toFlowable())
                                    .doOnComplete(tx::commit)
                                    .doOnError(x -> tx.rollback())
                                    .doAfterTerminate(conn::close) // result ignored
                    )
            );
}

@lorylo

lorylo commented May 11, 2021

@jklingsporn thanks for your reply.

I was just wondering, shouldn't the tx::commit part take care to release the connection back to the pool?

I got very confused by the lack of information about connection close/transaction commit in the Vert.x documentation, so I am not sure whether it is indeed intended to close the connection explicitly when using streams. Do you think it might have any side effects? I tested it in my environment and it seems to work (I still have to test the rollback).

@jklingsporn
Owner

jklingsporn commented May 11, 2021

I was just wondering, shouldn't the tx::commit part take care to release the connection back to the pool?

Maybe, but it doesn't. For example, there is a withTransaction method in Pool that looks like this:

default <T> Future<@Nullable T> withTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
    return getConnection()
      .flatMap(conn -> conn
        .begin()
        .flatMap(tx -> function
          .apply(conn)
          .compose(
            res -> tx
              .commit()
              .flatMap(v -> Future.succeededFuture(res)),
            err -> {
              if (err instanceof TransactionRollbackException) {
                return Future.failedFuture(err);
              } else {
                return tx
                  .rollback()
                  .compose(v -> Future.failedFuture(err), failure -> Future.failedFuture(err));
              }
            }))
        .onComplete(ar -> conn.close()));
  }

It closes the connection manually at the end.

I got very confused by the lack of information about connection close/transaction commit in the Vert.x documentation, so I am not sure whether it is indeed intended to close the connection explicitly when using streams.

I agree, especially when they post incomplete examples (e.g. leaving out connection handling).

Do you think it might have any side effects? I tested it in my environment and it seems to work (I still have to test the rollback).

I don't think it has side effects, because doOnComplete and doAfterTerminate are only called once the stream is "over", so it should be safe to close the connection there.
What worries me more is that the results of tx::commit, tx.rollback() and conn::close are more or less ignored: the doXYZ callbacks (an Action, or a Consumer for doOnError) return nothing, so whatever those calls return is simply discarded.
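Those ignored results can be avoided by chaining the cleanup into the returned future, which is what withTransaction above does with Vert.x futures. The sketch below reproduces that structure with java.util.concurrent.CompletableFuture instead of RxJava, so it runs without dependencies; Tx, Conn and Transactions are hypothetical stand-ins, not Vert.x types. The connection is closed in every case, a failed commit or close fails the returned future, and when the work itself fails, its original error is reported even if the rollback or close also fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

/** Hypothetical async transaction handle (not a Vert.x type). */
interface Tx {
    CompletableFuture<Void> commit();
    CompletableFuture<Void> rollback();
}

/** Hypothetical async pooled connection (not a Vert.x type). */
interface Conn {
    CompletableFuture<Tx> begin();
    CompletableFuture<Void> close(); // returns the connection to the pool
}

final class Transactions {
    private Transactions() {}

    /** Runs work in a transaction; commit/rollback and close are chained, never fired and forgotten. */
    static <T> CompletableFuture<T> inTransaction(Conn conn, Function<Conn, CompletableFuture<T>> work) {
        CompletableFuture<T> txResult = conn.begin().thenCompose(tx ->
                work.apply(conn)
                        .handle((res, err) -> commitOrRollback(tx, res, err))
                        .thenCompose(f -> f));
        // Close the connection whatever happened above, then re-surface the outcome.
        return txResult
                .handle((res, err) -> closeAfter(conn, res, err))
                .thenCompose(f -> f);
    }

    private static <T> CompletableFuture<T> commitOrRollback(Tx tx, T res, Throwable err) {
        if (err == null) {
            return tx.commit().thenApply(v -> res); // a failed commit fails the result
        }
        // Roll back, then report the original error even if the rollback itself failed.
        return tx.rollback()
                .handle((v, rollbackErr) -> (Void) null)
                .thenCompose(ignored -> CompletableFuture.<T>failedFuture(err));
    }

    private static <T> CompletableFuture<T> closeAfter(Conn conn, T res, Throwable err) {
        return conn.close().handle((v, closeErr) -> {
            if (err != null) {
                throw wrap(err);      // the original failure takes precedence
            }
            if (closeErr != null) {
                throw wrap(closeErr); // otherwise surface a failed close
            }
            return res;
        });
    }

    private static CompletionException wrap(Throwable t) {
        return t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
    }
}
```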

jklingsporn added a commit that referenced this issue May 12, 2021
@jklingsporn
Owner

jklingsporn commented May 12, 2021

I decided not to add cursor support for the RxJava API, only a Flowable<Row> method, as that feels more natural. Here is how you would use it:

@Test
public void queryFlowableShouldSucceed(){
    Something pojo1 = createWithId();
    Something pojo2 = createWithId();
    /*
     * Latch has to count down four times
     * - one for each processed item (2)
     * - when the items have been deleted (1)
     * - upon successful processing (1)
     */
    CountDownLatch completionLatch = new CountDownLatch(4);
    dao
            .insert(Arrays.asList(pojo1,pojo2))
            .map(res -> dao.queryExecutor()
                    .queryFlowable(
                            dslContext -> dslContext.selectFrom(generated.classic.reactive.regular.Tables.SOMETHING),
                            2
                    )
                    .subscribe(
                            //on next
                            row -> {
                                //conveniently map it to a pojo
                                Something mapped = RowMappers.getSomethingMapper().apply(row.getDelegate());
                                Assert.assertNotNull(mapped);
                                completionLatch.countDown();
                            },
                            //on error
                            Functions.ON_ERROR_MISSING,
                            //on complete
                            () -> dao.deleteByIds(Arrays.asList(pojo1.getSomeid(),pojo2.getSomeid()))
                                    .subscribe(i->completionLatch.countDown())
                    )
            )
            .subscribe(countdownLatchHandler(completionLatch));
    await(completionLatch);
}

@jklingsporn jklingsporn added this to the 6.3.0 milestone May 12, 2021