Skip to content

Fetch rows never completes #401

@Leuteris

Description

@Leuteris

Bug Report

Versions

  • Driver: 0.8.6.RELEASE, 0.8.7.RELEASE
  • Database: PostgreSQL 13.2 (Debian 13.2-1.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
  • Java: Java(TM) SE Runtime Environment (build 16+36-2231)
  • OS: Ubuntu 18.04.5 LTS

Current Behavior

I have created the following method that fetches some rows from database and returns a Flux of objects. When the number of rows returned by the query are e.g. 1200 or 15.000.000, Flux never emits the onComplete signal. Here's a sample code:

public Flux<UserProfile> fetchProfiles(RelationalDBJobConfiguration config) {
	ConnectionFactory connectionFactory = getConnectionFactory(config.getDatabaseDetails());
	return Flux.usingWhen(connectionFactory.create(),
			connection -> {
				return Flux.from(connection.createStatement("select userId, attr1, attr2, attr3, " +
					"attr4, attr5 from test_r2dbc.users limit 1200")
						.execute()).flatMap(result -> result.map(this::toProfile));
			}, Connection::close)
                        .log();//never emits the onComplete signal
}

When adding a .log() call to the chain I see 256 request(1) before the stream stops. I would expect to see the onComplete signal:

Logs
2021-05-04 12:34:58,015  INFO data-extractor-quartz_Worker-1 r.F.UsingWhen.1 - onSubscribe(FluxUsingWhen.ResourceSubscriber)
2021-05-04 12:34:58,015  INFO data-extractor-quartz_Worker-1 r.F.UsingWhen.1 - request(256)
2021-05-04 12:34:58,091  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - onNext(UserProfile(identifier=1953401825, type=MSISDN, attributes={world_records=5, relationship_advice=20000, bs_min=1, bs_avg=32, sports_facts=4}))
 .
 .
 .
2021-05-04 12:34:58,753  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - onNext(UserProfile(identifier=9095453157, type=MSISDN, attributes={world_records=5, relationship_advice=20000, bs_min=1, bs_avg=32, sports_facts=4}))
2021-05-04 12:34:58,754  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,755  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,755  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,755  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,755  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,756  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,757  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,757  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,757  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,758  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,758  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,759  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,760  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,760  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,760  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,761  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,761  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,761  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,763  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,763  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,764  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,764  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,765  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,765  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,765  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,766  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,766  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,766  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,766  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,767  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,767  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,767  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,768  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,768  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,768  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,769  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,770  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,770  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,770  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,770  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,770  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,771  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,771  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,771  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,771  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,771  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,772  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,772  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,772  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,773  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,773  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,773  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,774  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,774  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,775  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,775  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,777  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,780  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,780  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,781  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,782  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,784  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,784  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,785  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,786  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,786  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,786  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,789  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,789  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,790  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,791  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,791  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,793  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,793  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,794  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,796  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,796  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,797  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,797  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,797  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,798  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,798  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,798  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,799  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,800  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,801  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,801  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,801  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,801  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,805  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,807  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,807  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,808  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,808  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,809  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,810  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,812  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,813  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,813  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,813  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,814  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,816  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,816  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,816  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,816  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,817  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,817  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,820  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,820  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,821  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,822  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,822  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,823  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,823  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,824  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,824  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,824  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,825  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,825  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,825  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,826  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,826  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,827  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,830  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,832  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,833  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,833  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,833  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,833  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,834  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,834  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,835  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,835  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,835  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,835  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,837  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,837  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,837  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,837  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,837  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,838  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,838  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,839  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,840  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,840  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,841  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,841  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,841  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,841  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,841  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,843  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,843  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,843  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,843  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,843  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,844  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,848  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,849  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,849  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,849  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,849  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,850  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,850  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,850  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,852  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,852  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,852  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,852  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,852  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,853  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,853  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,853  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,854  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,854  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,854  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,854  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,854  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,855  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,855  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,855  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,857  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,857  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,857  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,859  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,859  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,860  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,860  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,862  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,863  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,863  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,864  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,864  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,864  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,866  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,866  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,867  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,867  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,867  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,868  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,868  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,868  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,869  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,870  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,872  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,873  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,873  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,873  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,874  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,876  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)

If I change the limit to 1201 instead of 1200, Flux emits the onComplete signal as expected:

public Flux<UserProfile> fetchProfiles(RelationalDBJobConfiguration config) {
	ConnectionFactory connectionFactory = getConnectionFactory(config.getDatabaseDetails());
	return Flux.usingWhen(connectionFactory.create(),
			connection -> {
				return Flux.from(connection.createStatement("select userId, attr1, attr2, attr3, " +
					"attr4, attr5 from test_r2dbc.users limit 1201")
						.execute()).flatMap(result -> result.map(this::toProfile));
			}, Connection::close)
                        .log();//emits the onComplete signal
}
Logs
2021-05-04 12:41:35,307  INFO data-extractor-quartz_Worker-1 r.F.UsingWhen.1 - onSubscribe(FluxUsingWhen.ResourceSubscriber)
2021-05-04 12:41:35,307  INFO data-extractor-quartz_Worker-1 r.F.UsingWhen.1 - request(256)
2021-05-04 12:41:35,362  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - onNext(UserProfile(identifier=3389648496, type=MSISDN, attributes={world_records=5, relationship_advice=20000, bs_min=1, bs_avg=32, sports_facts=4}))
.
.
.
2021-05-04 12:41:35,982  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - onNext(UserProfile(identifier=9637753866, type=MSISDN, attributes={world_records=5, relationship_advice=20000, bs_min=1, bs_avg=32, sports_facts=4}))
2021-05-04 12:41:35,996  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - onComplete()

Table schema

Input Code
create table if not exists test_r2dbc.users
(
	"userId" integer,
	attr1 integer,
	attr2 integer,
	attr3 integer,
	attr4 integer,
	attr5 integer,
	attr6 integer,
	attr7 integer,
	attr8 integer,
	attr9 integer,
	attr10 integer,
	attr11 text,
	attr12 text,
	attr13 text,
	attr14 text,
	attr15 text,
	attr16 text,
	attr17 text,
	attr18 text,
	attr19 text,
	attr20 text
);

Steps to reproduce

Input Code
	public Flux<UserProfile> fetchProfiles(RelationalDBJobConfiguration config) {
		ConnectionFactory connectionFactory = getConnectionFactory(config.getDatabaseDetails());
		return Flux.usingWhen(connectionFactory.create(),
				connection -> {
					return Flux.from(connection.createStatement("select userId, attr1, attr2, attr3, " +
					"attr4, attr5 from test_r2dbc.users limit 1200")
							.execute()).flatMap(result -> result.map(this::toProfile));
				}, Connection::close)
				.log();
	}

	private UserProfile toProfile(Row row, RowMetadata rowMetadata) {
		Map<String, Object> attributes = mapAttributes(row, rowMetadata);
		return UserProfile.builder()
				.identifier(row.get("userId", String.class))
				.attributes(attributes)
				.build();
	}

	private Map<String, Object> mapAttributes(Row row, RowMetadata rowMetadata) {
		Map<String, Object> columns = new HashMap<>();
		rowMetadata.getColumnNames()
				.forEach(columnName -> {
					Object columnValue = row.get(columnName);
					if (columnValue != null) {
						columns.put(columnName, columnValue);
					}
				});
		columns.remove("userId");
		return columns;
	}

	private ConnectionFactory getConnectionFactory(DatabaseDetails details) {
		return ConnectionFactories.get(ConnectionFactoryOptions.builder()
				.option(ConnectionFactoryOptions.DRIVER, details.getDriver())
				.option(ConnectionFactoryOptions.HOST, details.getHost())
				.option(ConnectionFactoryOptions.PORT, details.getPort())
				.option(ConnectionFactoryOptions.USER, details.getUser())
				.option(ConnectionFactoryOptions.PASSWORD, details.getPassword())
				.option(ConnectionFactoryOptions.DATABASE, details.getDatabase())
				.build());
	}

	public Mono<Void> synchronizeProfiles(RelationalDBJobConfiguration jobConfig) {
		Flux<UserProfile> profileFlux = fetchProfiles(jobConfig);
		return submitProfiles(profileFlux) //submit items to external system using RSocket
				.then();
	}

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions