
Use Schedulers.single() to avoid accidental thread co-location #190

Closed
dstepanov opened this issue Apr 21, 2023 · 33 comments
Labels
type: enhancement A general enhancement
Milestone

Comments

@dstepanov

Bug Report

The issue was originally reported here micronaut-projects/micronaut-data#2136
I did debug a bit. It looks like some calculations aren't correct when min and max are the same; the pool only allows one connection. The example project has a reproducible example: setting maxSize bigger than initialSize makes the connections run in parallel. This also explains some of the poor performance in TechEmpower/FrameworkBenchmarks, because I configured both settings with the same value.

@mp911de
Member

mp911de commented Apr 24, 2023

I wasn't able to reproduce any computation issue regarding the pool size. The pool itself properly creates the configured number of connections:

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock)
		.initialSize(5)
		.maxSize(5)
		.build();
ConnectionPool pool = new ConnectionPool(configuration);

List<Connection> connections = new ArrayList<>();

for (int i = 0; i < 5; i++) {
	pool.create().as(StepVerifier::create).consumeNextWith(connections::add).verifyComplete();
}
assertThat(connections).hasSize(5);

The event loop co-location is indeed an issue. It is the default setting in Reactor Netty; it likely originates from channel allocation for HTTP requests.

Let me know whether I missed something regarding the sizing.

@mp911de mp911de added the status: waiting-for-feedback We need additional information before we can continue label Apr 24, 2023
@dstepanov
Author

It might have the correct size, but it only provides one connection.

You can run the example attached to the Micronaut Data issue and see that the queries are run sequentially.
I changed the code a bit to reproduce it:

     List<Flux<Item>> fluxes = List.of(
                Flux.from(itemsRepository.findAll()).doOnComplete(() -> log.info("all loaded1")),
                Flux.from(itemsRepository.findAll()).doOnComplete(() -> log.info("all loaded2")),
                Flux.from(itemsRepository.findAll()).doOnComplete(() -> log.info("all loaded3")),
                Flux.from(itemsRepository.findAll()).doOnComplete(() -> log.info("all loaded4")),
                Flux.from(itemsRepository.findAll()).doOnComplete(() -> log.info("all loaded5"))
        );
        List<Mono<Long>> next = new ArrayList<>();
        for (Flux<Item> flux : fluxes) {
            next.add(
                    flux.count().doOnNext(number -> log.info("num:" + number))
            );
        }

        Flux.fromIterable(next).flatMap(x -> x).collectList().block();

@mp911de
Member

mp911de commented Apr 24, 2023

And this makes total sense: if connections are co-located on the same event-loop thread, then a single thread can only run things sequentially. If you warm up the pool (ConnectionPool.warmup().block()), you see more parallel behavior:

15:45:53.640 [Test worker] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:45:53.645 [Test worker] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:45:53.645 [Test worker] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:45:53.645 [Test worker] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:45:53.645 [Test worker] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:45:53.653 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.PooledConnection - Releasing connection
15:45:53.655 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - all loaded3
15:45:53.655 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - num:0
15:45:53.656 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.PooledConnection - Releasing connection
15:45:53.656 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - all loaded2

Without warmup:

15:46:49.414 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:46:49.431 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.PooledConnection - Releasing connection
15:46:49.432 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - all loaded1
15:46:49.433 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - num:0
15:46:49.433 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:46:49.435 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.PooledConnection - Releasing connection
15:46:49.435 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - all loaded2
15:46:49.435 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - num:0

@dstepanov
Author

I didn't know that the pool needs to be initialized somehow; there is no mention of warmup in the README.
Are you suggesting that warmup should be called when the pool is built by the container?
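For reference, a minimal sketch of eager warmup at pool-construction time, based on the ConnectionPool API already shown earlier in this thread (connectionFactory is assumed to be an existing ConnectionFactory; the sizes are illustrative):

```java
// Hypothetical wiring sketch: warm up the pool once during startup so the
// configured initialSize connections exist before the first query runs.
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration
        .builder(connectionFactory)
        .initialSize(5)
        .maxSize(5)
        .build();
ConnectionPool pool = new ConnectionPool(configuration);

// warmup() emits the number of connections created; blocking is
// acceptable here because this runs during application startup.
pool.warmup().block();
```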

@PiotrDuz

Let me just add a thought here:
Even if warmup helps here, I do not see it as a solution. We will still have more sockets than available threads, so some queries might get congested while other cores are idle.
What do you think of creating a slightly modified DefaultLoopResources class where, instead of applying .colocate in the onClient() method, we return a new NIO/Epoll EventLoopGroup with maxThreads equal to the connection pool's maxSize?
Regards

@mp911de
Member

mp911de commented Apr 25, 2023

Warmup can be a slight workaround but, as @PiotrDuz mentioned, it's by no means a fix. In an ideal world, the pool would be event-loop-aware and hand out a connection that runs on the same thread.

In reality, the event-loop information is not exposed from the connections. Additionally, pools of that size would negatively affect database servers.

Disabling colocation is the only viable solution for long-lived connections. I already reached out to the Reactor Netty team for guidance and will share it once I have more details.

@mp911de
Member

mp911de commented Apr 25, 2023

Since Reactor Netty 1.0.28, LoopResources can be created without co-location (see reactor/reactor-netty#2655). The Postgres driver already allows configuring LoopResources, which helps disable co-location, while other drivers (such as SQL Server) do not yet have that configuration option.
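A hedged sketch of what that could look like for Postgres, assuming the LoopResources.create overload with a colocate flag introduced in Reactor Netty 1.0.28 (verify the exact argument list against your Reactor Netty version; host and credentials are placeholders):

```java
// Sketch: create LoopResources without co-location and hand them to the
// Postgres driver. The trade-off is an additional EventLoopGroup.
LoopResources loopResources =
        LoopResources.create("r2dbc", 1, 4, true, false); // prefix, selectCount, workerCount, daemon, colocate=false

PostgresqlConnectionConfiguration configuration =
        PostgresqlConnectionConfiguration.builder()
                .host("localhost")
                .database("demo")
                .username("user")
                .password("secret")
                .loopResources(loopResources)
                .build();
```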

Setting LoopResources creates additional EventLoopGroups, however, which increases system resource usage.

@mp911de
Member

mp911de commented Apr 25, 2023

During investigation with a customized LoopResources instance, I found that the empty pool still behaves sequentially. I have no idea why; allocating a connection or warming up the pool leads to the desired concurrency profile.

@PiotrDuz

PiotrDuz commented Apr 25, 2023

What does your custom LoopResources implementation look like?
Here is the one that works for us:

@Override
public EventLoopGroup onClient(boolean useNative) {
    return cacheLoops();
}

EventLoopGroup cacheLoops() {
    EventLoopGroup eventLoopGroup = loops.get();
    if (eventLoopGroup == null) {
        EventLoopGroup newEventLoopGroup = createNewEventLoopGroup();
        if (!loops.compareAndSet(null, newEventLoopGroup)) {
            // "FutureReturnValueIgnored": this is deliberate
            newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
        }
        eventLoopGroup = cacheLoops();
    }
    return eventLoopGroup;
}

private NioEventLoopGroup createNewEventLoopGroup() {
    return new NioEventLoopGroup(threads, new ThreadPerTaskExecutor(new DefaultThreadFactory(THREAD_PREFIX)));
}

where "threads" = pool max size

@1528110566

1528110566 commented Apr 25, 2023

During investigation with a customized LoopResources instance, I found that the empty pool still behaves sequentially. I have no idea why; allocating a connection or warming up the pool leads to the desired concurrency profile.

@mp911de I found a problem when calling warmup().subscribe() and findAll().block() in parallel: reactor/reactor-pool#170
Could this be the cause of your problem?

@mp911de
Member

mp911de commented Apr 25, 2023

@PiotrDuz there's a LoopResources.create(…) method taking a colocate boolean flag, introduced with Reactor Netty 1.0.28. It removes the need to implement LoopResources yourself.

I additionally filed a ticket to reuse the default event loop groups with a LoopResources wrapper that doesn't apply colocation.

@dstepanov
Author

What is the best way to inject that LoopResources instance?

@mp911de
Member

mp911de commented Apr 25, 2023

For the Postgres driver, either via PostgresqlConnectionConfiguration.Builder#loopResources(…) or via the R2DBC config SPI:

ConnectionFactoryOptions.builder()
    .option(Option.valueOf("loopResources"), loopResources)
    …
    .build();

@samueldlightfoot

@mp911de Is there an agreed fix that R2DBC Postgres users can apply to avoid this? I've read through this thread but found it difficult to piece together what exactly is needed.

@artemik

artemik commented May 29, 2023

@dstepanov Big thanks for spotting the area of the cause! I struggled a lot (digging down to the PG TCP packets) to find it until I noticed that R2DBC really is using only a single thread, like you said.

@mp911de I can confirm I experience the same. This makes the R2DBC Postgres driver 3 times slower than the blocking JDBC driver. Setting different maxSize and initialSize values makes it faster (a very weird effect), although still ~1.6 times slower than JDBC. @mp911de This is a serious major issue that makes R2DBC drivers basically unusable for any production usage. It needs serious attention. I'm honestly surprised how the r2dbc-pool library made it to 1.0.0.RELEASE over 3 years without this being noticed by maintainers or users. The benchmarks were even already there on TechEmpower, as @dstepanov mentioned, but it's also very easy to reproduce anyway. I assume it affects not only the Postgres driver (which I was testing), since the issue is in r2dbc-pool/Netty.

My Reproduction

I have a 4-core (8-thread) i7-7700K CPU. I make a single request to a WebFlux endpoint. Inside, it executes a very simple SELECT (returning 1 record) 40 000 times concurrently, i.e. I create a list of 40 000 Monos of the below:

databaseClient.sql("SELECT * FROM companies WHERE company_id = $1")
    .bind(0, currentId)
    .fetch()
    .first()
    .map(stringObjectMap -> ((Number) stringObjectMap.get("company_id")).longValue());

and execute them concurrently, calculating time spent once all Monos finish:

long timeStartNs = System.nanoTime();
return Mono.zip(dbCallMonos, objects -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - timeStartNs));

maxSize of 100 for the R2DBC pool.

For JDBC, I do the above in a fixed thread pool, with the same size of 100 (matches the connection pool size in Hikari), with blocking Callable calls, waiting for all Futures to finish at the end.

Results:

  • Equal maxSize and initialSize: 4000 ms. Only a single reactor-tcp-epoll-1 thread does all the DB work (see the 90% value for the thread on the left). (screenshot: equal-sizes-(slow)-4000ms)
  • Different maxSize and initialSize: 2500 ms. Better. Now there are 8 TCP threads, but reactor-tcp-epoll-1 still does 31% of the work while the rest idle at 10%. (screenshot: different-sizes-(faster)-2500ms)
  • JDBC: 1600 ms. The fastest. All 100 threads work evenly, each taking 1% of the work. (screenshot: jdbc-(fastest)-1600ms)

The above correlates with TPS I see in Postgres dashboard. JVM warmup was done prior. Everything on the latest version (Ubuntu, Spring, drivers, Postgres). Postgres is on a separate machine. The absolute latency values don't matter, since it will be different on each machine, only the relative difference is important.

The 40 000 calls are made inside the endpoint (as opposed to calling the endpoint 40 000 times) to eliminate any framework/HTTP latencies. However, I have another test with Spring MVC (JDBC) and WebFlux (R2DBC) doing the same SELECT a single time per endpoint call, with the endpoints bombarded by a benchmarking tool from another machine on the local network at a concurrency of 1024. In this setup, there are 8 Spring threads and 8 reactor-tcp-epoll threads. I observe the same speeds as above (even a bit worse). As for the threads, in the WebFlux R2DBC graph, reactor-tcp-epoll-1 does 15% of the work, with the rest doing only ~3.5%; the Spring HTTP threads are evenly spread. (screenshot: different-sizes-1024-concurrency-(faster))

In Spring MVC JDBC there is Hibernate on top, but it makes no difference - I've done tests without it as well, same results. All records returned from DB are unique, so Hibernate has no advantage (instead, even makes things a bit slower, probably).

Setting colocate to false in LoopResources.create(…) didn't have any effect. The warmup() didn't help either.

I am able to provide the full reproducer with .jfr files, but it's very easy to reproduce.

@1528110566

@artemik Thanks for your test. In my opinion, a single thread cannot support high TPS if a single job (one DB query/update) is heavy. For example, when we use R2dbcRepository, the framework does serialization/deserialization and reflection to inject values for us, and that is heavy work. Setting colocate to false is a necessary way to improve performance in some circumstances.
Sorry, I can't provide any evidence for now.

@PiotrDuz

PiotrDuz commented May 29, 2023

Hello,
I am the original author of the issue posted on the micronaut-data project. It was escalated from there by the library maintainers.
The problem explanation is here:
micronaut-projects/micronaut-data#2136

So what is the fix for this problem? Having a separate thread per connection socket.
Does colocate = false achieve that? Looking at the source, it is not easy to tell. There is no clear, separate EventLoopGroup created for the client. Also, the created loop has a somewhat complicated thread-factory model.

I have proposed custom, simple solution. During our tests it seemed to fix the issue. A picture is worth more than words, so here is the code:

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ThreadPerTaskExecutor;
import reactor.core.publisher.Mono;
import reactor.netty.FutureMono;
import reactor.netty.resources.LoopResources;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class NioClientEventLoopResources implements LoopResources {
    public static final String THREAD_PREFIX = "prefix-";
    final int threads;
    final AtomicReference<EventLoopGroup> loops = new AtomicReference<>();
    final AtomicBoolean running;

    NioClientEventLoopResources(int threads) {
        this.running = new AtomicBoolean(true);
        this.threads = threads;
    }


    @Override
    @SuppressWarnings("unchecked")
    public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
        return Mono.defer(() -> {
            long quietPeriodMillis = quietPeriod.toMillis();
            long timeoutMillis = timeout.toMillis();
            EventLoopGroup serverLoopsGroup = loops.get();
            Mono<?> slMono = Mono.empty();
            if (running.compareAndSet(true, false)) {
                if (serverLoopsGroup != null) {
                    slMono = FutureMono.from((Future) serverLoopsGroup.shutdownGracefully(
                            quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
                }
            }
            return Mono.when(slMono);
        });
    }

    @Override
    public boolean isDisposed() {
        return !running.get();
    }

    @Override
    public EventLoopGroup onClient(boolean useNative) {
        return cacheLoops();
    }

    @Override
    public EventLoopGroup onServer(boolean useNative) {
        throw new UnsupportedOperationException("This event loop is designed only for client DB calls.");
    }

    @Override
    public EventLoopGroup onServerSelect(boolean useNative) {
        throw new UnsupportedOperationException("This event loop is designed only for client DB calls.");
    }

    @Override
    public <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoopGroup group) {
        if (channelType.equals(SocketChannel.class)) {
            return (CHANNEL) new NioSocketChannel();
        }
        if (channelType.equals(ServerSocketChannel.class)) {
            return (CHANNEL) new NioServerSocketChannel();
        }
        if (channelType.equals(DatagramChannel.class)) {
            return (CHANNEL) new NioDatagramChannel();
        }
        throw new IllegalArgumentException("Unsupported channel type: " + channelType.getSimpleName());
    }

    @Override
    public <CHANNEL extends Channel> Class<? extends CHANNEL> onChannelClass(Class<CHANNEL> channelType,
            EventLoopGroup group) {
        if (channelType.equals(SocketChannel.class)) {
            return (Class<? extends CHANNEL>) NioSocketChannel.class;
        }
        if (channelType.equals(ServerSocketChannel.class)) {
            return (Class<? extends CHANNEL>) NioServerSocketChannel.class;
        }
        if (channelType.equals(DatagramChannel.class)) {
            return (Class<? extends CHANNEL>) NioDatagramChannel.class;
        }
        throw new IllegalArgumentException("Unsupported channel type: " + channelType.getSimpleName());
    }

    @SuppressWarnings("FutureReturnValueIgnored")
    EventLoopGroup cacheLoops() {
        EventLoopGroup eventLoopGroup = loops.get();
        if (null == eventLoopGroup) {
            EventLoopGroup newEventLoopGroup = createNewEventLoopGroup();
            if (!loops.compareAndSet(null, newEventLoopGroup)) {
                //"FutureReturnValueIgnored" this is deliberate
                newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
            }
            eventLoopGroup = cacheLoops();
        }
        return eventLoopGroup;
    }

    private NioEventLoopGroup createNewEventLoopGroup() {
        return new NioEventLoopGroup(threads, new ThreadPerTaskExecutor(new DefaultThreadFactory(THREAD_PREFIX)));
    }
}

This class should be passed via
ConnectionFactoryOptions.option(PostgresqlConnectionFactoryProvider.LOOP_RESOURCES, new <our_class>)
to replace the default EventLoopGroup.
Number of threads = max connections in the r2dbc-pool config.
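A sketch of that wiring (the URL and pool size are illustrative placeholders):

```java
// Hypothetical usage sketch for the class above: register it under the
// Postgres driver's LOOP_RESOURCES option so it replaces the default
// colocated event loop group.
int maxSize = 10; // should match the r2dbc-pool maxSize

ConnectionFactory connectionFactory = ConnectionFactories.get(
        ConnectionFactoryOptions.parse("r2dbc:postgresql://localhost/demo")
                .mutate()
                .option(PostgresqlConnectionFactoryProvider.LOOP_RESOURCES,
                        new NioClientEventLoopResources(maxSize))
                .build());
```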

@artemik can you run your tests once again, using above class?
Your tests seem more detailed so I am curious whether this example will work fine here too.

Of course, this is theoretically slower than a single thread handling many sockets, since we induce CPU context switching for each thread. But load-balancing the EventLoopGroup seems too complicated to implement, and the performance cost is negligible compared to the random slowness introduced by an overloaded thread.

Regards

@artemik

artemik commented Jun 12, 2023

@PiotrDuz Thanks, I've included it in my tests. I didn't check your class in detail, but AFAIK it forces R2DBC to use the specified number of threads?

@mp911de I've cleaned up and prepared the reproducible project: https://github.com/artemik/r2dbc-jdbc-vertx-issue-190

It tests more than just initialSize vs maxSize - it tests JDBC vs R2DBC vs Vertx (as another reactive driver example based on Netty) in standalone and WebFlux environments. Different environments produce different interesting results. Please check readme, it has all the details. I'll duplicate the key parts here.


Benchmark

There are 6 applications doing the same thing: run 500 000 SELECTs and measure the total time spent. Each SELECT returns the same (for maximum stability) single record by ID. All SELECTs are executed with concurrency equal to the max DB connection pool size:

  • 3 standalone apps (one per R2DBC, JDBC and Vertx DB clients): command line app, prints the duration on each run and repeats it 50 times;
  • 3 web apps (one per R2DBC, JDBC and Vertx DB clients): endpoint /benchmark does a single run and returns duration in response. No automatic repeats - you call it and repeat manually.

Results

Note: In all R2DBC tests below, all 8 setup permutations were tested: with/without custom LoopResources; with/without ConnectionPool.warmup(); equal/non-equal initialSize and maxSize - they're displayed as one shared measurement, only those that differ are displayed separately.

Standalone

| App | Duration |
| --- | --- |
| JDBC | 18 sec (baseline) |
| R2DBC Connection | 18.3 sec (+1.5%) |
| R2DBC DatabaseClient | 19.5 sec (+8.3%) |
| Vertx | 18.1 sec (+0.5%) |

Web App

| App | Duration |
| --- | --- |
| Spring MVC, JDBC | 18 sec (baseline) |
| Spring WebFlux, R2DBC Connection | 25.5 sec (+42% = 1.42 times)* |
| Spring WebFlux, R2DBC DatabaseClient | 31.5 sec (+75% = 1.75 times)* |
| Spring WebFlux, Vertx | 19 sec (+5.5%) |

\* R2DBC Connection: 41.5 sec (+130% = 2.3 times) both without custom LoopResources with (!) ConnectionPool.warmup(), and without custom LoopResources, without warmup(), with equal initialSize and maxSize.
\* R2DBC DatabaseClient: 51 sec (+183% = 2.83 times) in the same two setups.

Conclusions

  1. First of all, in WebFlux, all R2DBC setups are slower than its standalone counterpart by at least ~40% (25.5 sec vs 18.3 sec).
  2. In WebFlux, without custom LoopResources, you MUST NOT run warmup(), otherwise things are slow.
  3. In WebFlux, the choice of equal vs non-equal initialSize and maxSize matters only when without custom LoopResources and without warmup - must be non-equal, otherwise things are slow. In all other cases, this choice doesn't matter.
  4. In standalone, all R2DBC settings perform the same.
  5. R2DBC is slower than JDBC by at least 42% in WebFlux, and by 1.5% in standalone.
  6. Vertx driver performs great in both WebFlux and standalone environments, and close to JDBC (especially in standalone). It doesn't seem to be affected by WebFlux like R2DBC.

Since Vertx also uses Netty but doesn't suffer from issues like R2DBC does under WebFlux (and doesn't require any tricky settings), there should be a way to fix R2DBC.

@PiotrDuz

PiotrDuz commented Jun 13, 2023

@artemik in this post: #190 (comment)
you mentioned that in one test example only one thread was heavily utilised and colocate didn't help.
Can you confirm that the custom LoopResources I posted results in even thread utilisation?

Answering your question:
Yes, it forces R2DBC to use exactly one thread per open socket. As we would have maxSize connection-pool sockets open, we need maxSize threads.

Regards

@artemik

artemik commented Jun 14, 2023

@PiotrDuz I used number_of_cores threads for your LoopResources, because making it equal to the max connection pool size otherwise defeats the reactive approach and in theory shouldn't give much more performance. I tried it anyway: same performance as with number_of_cores.

Anyway, to your question - I confirm your LoopResources class forces the number of threads specified, and the utilization is quite even.

However:

  1. Based on the results, as you can see, the presence of LoopResources doesn't make things faster.
  2. In standalone version, R2DBC works fine even with a single thread. Even more so - just all R2DBC cases perform equally good in standalone.
  3. Vertx performs great in both standalone and Webflux, and it used a single thread. (I mean, I think I saw it using multiple threads sometimes, it's probably smart to auto increase them when needed, but even with a single thread it performed fast as in the benchmarks.)

It means that how many threads R2DBC uses doesn't seem to matter; R2DBC is just slow with WebFlux for some unclear reason. And maybe co-location, which was originally assumed to be the cause in this ticket, is not the issue.


As for initialSize vs maxSize equality, which was also the original issue raised in this ticket, it seems to hold: the default setup (without custom LoopResources, without ConnectionPool.warmup(), with equal initialSize and maxSize) is exactly the case where R2DBC shows the worst performance, as per the benchmarks. The workaround is to use any other setup, for example calling warmup() or using LoopResources, but avoid the combination "without custom LoopResources; with (!) ConnectionPool.warmup()", because surprisingly it performs the worst as well. Let me highlight, though, that equality of initialSize and maxSize by itself doesn't make things slow; for example, the case "without LoopResources, with warmup(), with equal sizes" works fine. And lastly, all these workarounds give you better performance, but still much slower than JDBC or Vertx.


So two things to be fixed here:

  • fix that big drop in performance in special cases with WebFlux
  • fix the overall slow base performance with WebFlux, because based on standalone cases, it seems R2DBC can do better.

@PiotrDuz

I think your benchmarks do not address the original problem of the issue.
You have many small queries that have plenty of time in between to overlap with others. But when you start a heavy query, another co-located query might be heavily impacted. This shows up as a huge drop in performance while other cores are idling.
This is why I found the thread-per-socket approach to be a solution. When using colocation, users simply must be aware of the drawbacks.

Your tests are also helpful, however, as they show other problems with r2dbc. But at this point it may be worth splitting the topic in two; here we could focus on colocation forcing queries onto the same thread while other cores could be idle.
Your findings seem to be connected to other causes.

Am I getting it right?
Regards

@artemik

artemik commented Jun 17, 2023

@PiotrDuz you're right that many small queries don't clearly show how concurrently connections are used. I've added additional tests.

Results (SELECT 100 records)

Standalone

Not tested. WebFlux results show it's not needed.

Web App

Both MVC JDBC and WebFlux Vertx: 51 sec (baseline).

R2DBC results are more diverse, so I provide a table specifically for it with all setups. I only tested DatabaseClient.
Note: for readability, + means with, - means without.

| WebFlux R2DBC DatabaseClient Setup | Duration |
| --- | --- |
| + LoopResources, + warmup(), initialSize != maxSize | 51 sec |
| + LoopResources, + warmup(), initialSize == maxSize | 51 sec |
| + LoopResources, - warmup(), initialSize != maxSize | 51 sec |
| + LoopResources, - warmup(), initialSize == maxSize | 51 sec |
| - LoopResources, + warmup(), initialSize != maxSize | 111 sec (+117% = 2.17 times) |
| - LoopResources, - warmup(), initialSize != maxSize | 111 sec (+117% = 2.17 times) |
| - LoopResources, + warmup(), initialSize == maxSize | 120 sec (+135% = 2.35 times) |
| - LoopResources, - warmup(), initialSize == maxSize | 120 sec (+135% = 2.35 times) |

How to Interpret R2DBC Results

  1. Unlike with fast single-record selects, longer DB processing time of multi-record SELECTs probably makes R2DBC slowness itself unnoticeable, compared to the total processing time. So R2DBC has 4 setups where it performs normally (similarly to JDBC and Vertx) with 51 secs.
  2. Lack of custom LoopResources drops performance ~2.17 times:
    • and additionally having initialSize == maxSize drops it a bit more: ~2.35 times.
  3. R2DBC performs the worst in the default setup (-LoopResources, -warmup(), initialSize==maxSize).

Results (Connections concurrency / Threading)

In all the tests above, it was visible on DB side monitoring that R2DBC established all maxSize connections just fine, but it wasn't really clear how many of them were concurrently active, because the queries were too fast to build up concurrent processing on DB side.

You saw 4 cases with multi-record SELECTs where R2DBC performed as fast as JDBC and Vertx, but from single-record SELECT tests we know R2DBC is slower, so longer DB processing time of multi-record SELECTs just helps to hide R2DBC slowness itself. Moreover, those multi-record SELECTs were still too fast to simulate some heavy long query.

It all means that we need a test specifically for concurrent connection usage. To do that, I modified the single-record SELECT to also select pg_sleep(2), simulating 2 seconds of processing time. In the ideal case, for a pool size of 100 connections, all 100 sessions should be seen as active, proving that all connections are used. Let's see.

I also included observations on threading usage. It corresponds to what I saw in single/multi record SELECT tests as well.

Standalone and Web App

Vertx - all 100 connections were active in parallel. As for threading, it used only 1 thread, like in all previous benchmarks above as well, but it doesn't seem to cause performance issues.

R2DBC

  • Threads
    • with LoopResources: 8 threads (as many as specified, in my case it's CPU_CORES);
    • without LoopResources:
      • initialSize != maxSize: 2 threads;
      • initialSize == maxSize: 1 thread.
  • Connections
    • all 100 connections were active in parallel;
    • but without warmup(), R2DBC suffered on the first run: before all SELECT Monos completed, it used only ~40 out of 100 connections (the exact number differed on each app launch but remained unchanged during the first run), and TPS dropped accordingly. Only on the next run were all 100 connections in use.

How to Interpret R2DBC Results

  1. Because R2DBC underutilizes connections on first runs, it's a problem, for example, for specific cases where a cron job starts your app to run a batch of Mono queries in one go and then shuts down; you must use warmup() in this case.
  2. Otherwise, connections usage is fine and I didn't see, for example, R2DBC being stuck with a single connection (regardless of how many threads were used).
  3. Threading: it's not clear whether it's an issue, because even with LoopResources R2DBC performed slowly in single-record SELECTs, for example. Also, Vertx was able to perform fine even with 1 thread in all tests.
  4. It's not in the benchmarks, but in addition to triggering SELECTs from the single http request, I also separately tested triggering them from multiple http requests bombarded from a load testing tool, to simulate query calls triggered from different WebFlux threads - R2DBC showed the same threading usage like mentioned above.
    • Vertx, however, used multiple threads (~ CPU cores) just fine.

So to summarize:

  • WebFlux usage issue - yes;
  • threading issue - yes
  • connections concurrency issue - only first runs without warmup, otherwise no.

@dstepanov
Author

@artemik Would you also mind including Micronaut V4 (latest milestone or RC) in the benchmark to see how it behaves vs. WebFlux? We may need some optimization too.

mp911de added a commit that referenced this issue Jul 11, 2023
Avoid co-location of event loops within the pool by default for drivers that use Reactor Netty with activated co-location.

[resolves #190]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
@mp911de mp911de added type: enhancement A general enhancement and removed status: waiting-for-feedback We need additional information before we can continue labels Jul 11, 2023
@mp911de mp911de changed the title Setting maxSize equal to initialSize makes pool use only single connection Use Schedulers.single() to avoid accidental thread co-location Jul 11, 2023
@mp911de mp911de added this to the 1.0.1.RELEASE milestone Jul 11, 2023
@mp911de
Member

mp911de commented Jul 11, 2023

After a few iterations, the best we can do here is using a dedicated thread pool for subscribing (Schedulers.single) to the connection factory to prevent accidental co-location. In combination with the optimized pool warmup, the single-threaded issue should be gone.

There is no good place to solve this issue, as Reactor Netty wants to keep its colocated default. R2DBC Pool isn't opinionated about the underlying driver or its threading model, yet it doesn't make sense to have connections with colocated threads in a pool, so I guess we're the ones now compensating for a default that only makes sense in certain client-request scenarios.
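To illustrate the idea behind the fix (not the actual r2dbc-pool change - a toy JDK-only sketch, with hypothetical names like "connection-allocator"): what Schedulers.single() buys us is that connection creation is subscribed on a dedicated thread, so the allocation never starts on the caller's (potentially colocated) event-loop thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleSchedulerSketch {

    // Creates a "connection" on a dedicated allocator thread, never on the
    // calling (potentially co-located event-loop) thread. Returns the name of
    // the thread the allocation actually ran on.
    static String createConnectionOffCallerThread() throws Exception {
        ExecutorService single = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "connection-allocator"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), single)
                    .get();
        } finally {
            single.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Regardless of which thread calls this, the work lands on the
        // dedicated allocator thread.
        System.out.println("connection created on: " + createConnectionOffCallerThread());
        // → connection created on: connection-allocator
    }
}
```

In the real fix the dedicated thread is Reactor's Schedulers.single(), and the hand-off happens via subscribeOn, but the effect is the same: the transport channel can no longer be pinned to the requesting event-loop thread.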

@artemik

artemik commented Jul 13, 2023

@mp911de I think performance results (before/after) need to be provided by you here before officially closing this issue.

P.S. @dstepanov I'll try checking Micronaut, if I have time.

@samueldlightfoot

@mp911de Once we pick up the pool parallelism changes from reactor-pool, does it make sense to add this parallelism as a config property to ConnectionPoolConfiguration?

@mp911de
Member

mp911de commented Jul 18, 2023

It makes sense to do something. Can you file a new ticket?

@samueldlightfoot

#195

@agorbachenko

agorbachenko commented Jul 25, 2023

It is probably time to replace my custom solution with an out-of-the-box one!

My custom fix was:

// Imports added for readability; DelegatingConnectionFactory is assumed to be
// Spring's org.springframework.r2dbc.connection.DelegatingConnectionFactory.
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactoryProvider;
import org.springframework.r2dbc.connection.DelegatingConnectionFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class PostgresqlConnectionFactoryProvider implements ConnectionFactoryProvider {

    private final io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider delegate =
        new io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider();

    @Override
    public ConnectionFactory create(ConnectionFactoryOptions connectionFactoryOptions) {
        return new DelegatingConnectionFactory(delegate.create(connectionFactoryOptions)) {
            @Override
            public Mono<? extends Connection> create() {
                /* IMPORTANT: We have to create connection on dedicated thread, to ensure that connection
                wouldn't be created on eventLoop's reactor-tcp-<TRANSPORT>-X thread and bound to this
                thread because of this. Without this fix, database connections will be bound to the same
                eventLoop's thread causing performance bottleneck. See details at:
                https://gitter.im/reactor/reactor-netty?at=6145cdac99b7d9752817ad74.
                 */
                return super.create().publishOn(Schedulers.single());
            }
        };
    }

    @Override
    public boolean supports(ConnectionFactoryOptions connectionFactoryOptions) {
        return delegate.supports(connectionFactoryOptions);
    }

    @Override
    public String getDriver() {
        return delegate.getDriver();
    }
}

@artemik

artemik commented Aug 16, 2023

Hi @PiotrDuz , @mp911de , while I understand the low performance cause (or one of them) is threads colocation, any idea why I didn't see any performance degradation in the standalone driver tests (exactly the same tests, but just without WebFlux)? Why does it become slow only when the series of queries is launched from a WebFlux thread? What kind of relation between r2dbc-pool and WebFlux might be causing that?

@mp911de
Member

mp911de commented Aug 17, 2023

Colocation takes effect if a connection is obtained from a process that runs on a thread which is part of the colocated event loop.

Since WebFlux and most drivers use the same default event loop, any WebFlux thread requesting a database connection will create a connection that uses the request thread.

Does that make sense?
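A toy JDK-only model of that explanation (not Netty's actual API; the thread name "reactor-tcp-colocated" is just illustrative): under co-location, every connection requested from the same caller is pinned to one event-loop thread, so no matter how many pooled connections you open, the I/O work collapses onto a single thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ColocationDemo {

    // Runs `tasks` "connection" jobs on a single co-located loop thread and
    // returns how many distinct threads actually did the work.
    static int countThreadsUsed(int tasks) throws InterruptedException {
        ExecutorService colocatedLoop = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "reactor-tcp-colocated"));
        Set<String> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            colocatedLoop.execute(() -> {
                threads.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await();
        colocatedLoop.shutdown();
        return threads.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // 100 pooled "connections" requested from one caller all end up
        // pinned to the same event-loop thread under co-location.
        System.out.println("distinct threads used: " + countThreadsUsed(100));
        // → distinct threads used: 1
    }
}
```

This is why the numbers above show 1-2 threads without LoopResources: the pool's connections were all created from (and therefore pinned to) the same colocated loop.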

@artemik

artemik commented Aug 17, 2023

@mp911de, "any WebFlux thread requesting a database connection will create a connection that uses the request thread" - From what I saw, new threads (reactor-tcp-*) were created in addition to the WebFlux thread that started the database queries (reactor-http-*). But if you mean the cause is that all of them were on the same event loop, causing the colocation, then it probably makes sense.

Thanks for the clarification.

@artemik

artemik commented Aug 17, 2023

Though @mp911de, if colocation is the default behaviour, I'm still confused why it doesn't happen in a standalone environment - wouldn't we similarly have 8 threads handling all 100 connections (with them being colocated as well)?
