
Optimize pool warmup #171

Merged
merged 9 commits into reactor:main from 1.0.x-optimize-warmup on Jul 9, 2023

Conversation

@pderop (Member) commented Jun 22, 2023

Motivations:

As reported in reactor/reactor-netty#2781, there is a performance problem when
using R2DBC drivers configured with spring.r2dbc.pool.initial-size > 1.

For example, with spring.r2dbc.pool.initial-size=4 and spring.r2dbc.pool.max-size=10, the first four DB connections acquired during warmup will actually use the same TcpResource event loop thread.

The worst case is when initial-size == max-size: then all DB connections will actually use the same TcpResource event loop thread.

This is related to Reactor Netty TcpResources, which are by default colocated.

While it's possible to create custom LoopResources in Reactor Netty without colocation, the #2781 issue requests the ability to reuse the shared TcpResource EventLoopGroup without colocation, instead of creating a custom LoopResources without colocation for each driver (MariaDB, Postgres, ...).

While this change request makes sense, I tend to think that before doing this in reactor-netty, it is worth addressing the problem in reactor-pool. The primary root cause is that during pool warmup (either when calling pool.warmup(), or when pool.initial-size is > 1), reactor-pool uses operators like Flux.concat, which subscribes to the first source and waits for it to complete before subscribing to the next one. This leads to the following situation during warmup: a first DB connection is created from a random TcpResource event loop thread, and once that first connection completes (on the TcpResource event loop thread), the pool acquires the next DB connection from that same TcpResource thread; colocation then reuses the same TcpResource event loop thread for all remaining DB connections acquired during the warmup process.
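
To make the colocation mechanics concrete, here is a small standalone sketch (my own illustration, not the pool code) showing the difference in subscribing threads between Flux.concat and Flux.merge; delayElement stands in for an allocator that completes on another thread, the way a Netty event loop would:

    import java.time.Duration;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class ConcatVsMergeDemo {
        // Stand-in allocator: logs which thread performs the subscription, and
        // completes on a parallel scheduler thread (like a Netty event loop would).
        static Mono<String> allocate(int i) {
            return Mono.defer(() -> {
                System.out.println("allocator " + i + " subscribed on " + Thread.currentThread().getName());
                return Mono.just("conn-" + i).delayElement(Duration.ofMillis(10));
            });
        }

        public static void main(String[] args) {
            // concat: allocator 1 is subscribed from main, but allocators 2 and 3 are
            // subscribed from the thread that delivered the previous completion.
            Flux.concat(allocate(1), allocate(2), allocate(3)).blockLast();

            // merge: all three allocators are subscribed from the main thread.
            Flux.merge(allocate(1), allocate(2), allocate(3)).blockLast();
        }
    }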

Modifications:

  1. In SimpleDequePool.warmup(), at the end of the method, replace Flux.concat with Flux.merge: unlike Flux.concat, Flux.merge eagerly subscribes to all sources from the current thread. This ensures that we won't allocate a DB connection from one of the TcpResource event loops, so colocation cannot kick in.

  2. In SimpleDequePool.drainLoop(), at the end of the method, replace:

		Flux<Void> warmupFlux = Flux.range(1, toWarmup)
				.flatMap(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator));

		primary.onErrorResume(e -> Mono.empty())
				.thenMany(warmupFlux)
				.subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);

by:

		Flux<POOLABLE> warmupFlux = Flux.range(1, toWarmup)
				.flatMap(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator));

		Flux.mergeSequential(toWarmup + 1, primary, warmupFlux)
				.onErrorResume(e -> Mono.empty())
				.subscribe(poolable -> drain(), alreadyPropagatedOrLogged -> drain(), () -> drain());

Same thing here: Flux.mergeSequential is used in order to eagerly subscribe to the primary, as well as to the warmupFlux, from the current thread. This avoids any subscription being done from within a TcpResource event loop thread, and hence avoids colocation. Note that mergeSequential is used here because we want to ensure that it is the primary that completes first.

Note that poolable -> drain() has been added in the subscribe: this fixes a performance issue when running the sample from r2dbc/r2dbc-pool#190 (comment). Indeed, even when using a custom LoopResources without colocation, only two event loop threads were actually being used. Explanation: before, the primary first delivered a connection to one of the borrowers, and then only one drain was performed once the warmupFlux completed, so only two borrowers were served.
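
As a small standalone illustration of that ordering guarantee (not the pool code itself): mergeSequential subscribes to every source eagerly, but the downstream still sees the first source's values before the others.

    import java.time.Duration;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class MergeSequentialDemo {
        public static void main(String[] args) {
            // "primary" completes late, while the warmup values are ready immediately.
            Flux<String> primary = Mono.just("primary").delayElement(Duration.ofMillis(50)).flux();
            Flux<String> warmup = Flux.just("warmup-1", "warmup-2");

            // Both sources are subscribed to eagerly, but values are emitted in
            // subscription order, so "primary" is delivered first.
            Flux.mergeSequential(primary, warmup)
                    .doOnNext(v -> System.out.println("got " + v))
                    .blockLast();
            // prints: got primary, got warmup-1, got warmup-2
        }
    }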

  3. The borrower is now scheduled using the acquisition scheduler, if any (see here).

So, if the MariaDB driver is configured with an acquisition scheduler, this ensures that pool acquisition is never done from a TcpResource event loop thread, which enforces the avoidance of colocation.
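
For reference, here is a minimal sketch of what configuring an acquisition scheduler on a pool looks like; the allocator below is a placeholder, not the MariaDB driver's actual setup:

    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    import reactor.pool.InstrumentedPool;
    import reactor.pool.PoolBuilder;

    public class AcquisitionSchedulerSketch {
        public static void main(String[] args) {
            // Placeholder allocator; a real driver would open a DB connection here.
            Mono<String> allocator = Mono.fromCallable(() -> "connection");

            InstrumentedPool<String> pool = PoolBuilder.from(allocator)
                    .sizeBetween(4, 10)
                    // Borrowers are delivered on this scheduler rather than on the
                    // thread that produced the resource (e.g. a TcpResource event loop).
                    .acquisitionScheduler(Schedulers.single())
                    .buildPool();

            pool.acquire().block().release().block();
        }
    }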

  4. Added a PoolWarmupTest test, which reproduces the same issue described in reactor-netty #2781 (see "Use Schedulers.single() to avoid accidental thread co-location", r2dbc/r2dbc-pool#190 (comment)).

When running the test, you will see that all DB connection threads are used:

Jun 22 2023 18:40:48.449 [dbthread-5] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 6-findAll done
Jun 22 2023 18:40:48.449 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 1-findAll done
Jun 22 2023 18:40:48.449 [dbthread-2] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 2-findAll done
Jun 22 2023 18:40:48.449 [dbthread-0] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 4-findAll done
Jun 22 2023 18:40:48.449 [dbthread-4] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 9-findAll done
Jun 22 2023 18:40:48.449 [dbthread-7] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 5-findAll done
Jun 22 2023 18:40:48.449 [dbthread-6] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 7-findAll done
Jun 22 2023 18:40:48.449 [dbthread-8] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 8-findAll done
Jun 22 2023 18:40:48.449 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 3-findAll done
Jun 22 2023 18:40:48.449 [dbthread-3] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 10-findAll done
Jun 22 2023 18:40:48.451 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:40:48.450 [dbthread-5] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:40:48.450 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:40:48.451 [dbthread-0] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:40:48.451 [dbthread-2] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:40:48.451 [dbthread-4] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:40:48.451 [dbthread-7] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:40:48.451 [dbthread-6] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:40:48.451 [dbthread-8] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:40:48.452 [dbthread-3] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:40:48.453 [Test worker] INFO  r.p.PoolWarmupTest [Loggers.java:274] - Elapsed time: 1387

Notice that if you remove the `poolable -> drain()` first parameter of the subscribe at the end of the SimpleDequePool.drainLoop() method, like this:

		Flux<POOLABLE> warmupFlux = Flux.range(1, toWarmup)
				.flatMap(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator));

		Flux.merge(toWarmup + 1, primary, warmupFlux)
				.onErrorResume(e -> Mono.empty())
				.subscribe(poolable -> {}, alreadyPropagatedOrLogged -> drain(), () -> drain());

then only two threads will actually be used when you run the PoolWarmupTest:

Jun 22 2023 18:42:34.739 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 1-findAll done
Jun 22 2023 18:42:34.739 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 2-findAll done
Jun 22 2023 18:42:34.740 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:42:34.741 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:42:36.020 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 3-findAll done
Jun 22 2023 18:42:36.020 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 4-findAll done
Jun 22 2023 18:42:36.021 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:42:36.021 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:42:37.352 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 5-findAll done
Jun 22 2023 18:42:37.353 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:42:37.355 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 6-findAll done
Jun 22 2023 18:42:37.355 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:42:38.691 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 7-findAll done
Jun 22 2023 18:42:38.693 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:42:38.691 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 8-findAll done
Jun 22 2023 18:42:38.696 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:42:40.000 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 10-findAll done
Jun 22 2023 18:42:40.000 [dbthread-1] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:42:40.001 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - : 9-findAll done
Jun 22 2023 18:42:40.001 [dbthread-9] INFO  r.p.PoolWarmupTest [Loggers.java:274] - num:1000
Jun 22 2023 18:42:40.001 [Test worker] INFO  r.p.PoolWarmupTest [Loggers.java:274] - Elapsed time: 6689

  5. Adapted CommonPoolTest.recordsAllocationLatenciesInWarmup:

This test expected only two allocations in total, because the second allocation was failing. Now all resource allocations are attempted, so the test was slightly adapted to expect at least 6 allocations (5 successful acquisitions, and at least one failed acquisition).

  6. Added some synchronization to the TestUtils.InMemoryPoolMetrics methods.

While working on this PR, I ran into some concurrency issues related to TestUtils.InMemoryPoolMetrics, and I think its methods need to be synchronized.
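
To illustrate the kind of change meant here, a hypothetical recorder (not the actual TestUtils class) with synchronized methods, which makes it safe to record from several allocator threads at once during an eager warmup:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical in-memory metrics recorder, used only to illustrate the idea.
    class InMemoryMetricsSketch {
        private final List<Long> allocationLatencies = new ArrayList<>();

        synchronized void recordAllocationLatency(long latencyMs) {
            allocationLatencies.add(latencyMs);
        }

        synchronized int allocationCount() {
            return allocationLatencies.size();
        }
    }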

Results:

Using this patch, I could verify that all threads are used when running the scenario from r2dbc/r2dbc-pool#190 (comment).

I also ran some Gatling tests with a WebFlux / Reactor Netty application that retrieves 1000 entries from a locally installed MariaDB database:

    @GetMapping("/findAll")
    public Mono<Void> findAll() {
        return template.select(Task.class).all()
                .then();
    }

Here are the results with spring.r2dbc.pool.initial-size=10 and spring.r2dbc.pool.max-size=10:

  • without the reactor-pool patch and without any custom LOOP_RESOURCES created with colocation disabled:

Results are very bad (only the reactor-tcp-kqueue-2 thread is used):

================================================================================
---- Global Information --------------------------------------------------------
> request count                                      52578 (OK=52578  KO=0     )
> min response time                                      5 (OK=5      KO=-     )
> max response time                                    618 (OK=618    KO=-     )
> mean response time                                   328 (OK=328    KO=-     )
> std deviation                                        178 (OK=178    KO=-     )
> response time 50th percentile                        335 (OK=335    KO=-     )
> response time 75th percentile                        498 (OK=498    KO=-     )
> response time 95th percentile                        574 (OK=574    KO=-     )
> response time 99th percentile                        608 (OK=608    KO=-     )
> mean requests/sec                                  876.3 (OK=876.3  KO=-     )
---- Response Time Distribution ------------------------------------------------
> t < 800 ms                                         52578 (100%)
> 800 ms <= t < 1200 ms                                  0 (  0%)
> t >= 1200 ms                                           0 (  0%)
> failed                                                 0 (  0%)
================================================================================

And indeed, only reactor-tcp-kqueue-2 is doing any real work:

"reactor-tcp-kqueue-5"	cpu=1.09ms
"reactor-tcp-kqueue-10"	cpu=1.13ms
"reactor-tcp-kqueue-7"	cpu=1.18ms
"reactor-tcp-kqueue-3"	cpu=1.19ms
"reactor-tcp-kqueue-6"	cpu=1.25ms
"reactor-tcp-kqueue-4"	cpu=1.32ms
"reactor-tcp-kqueue-9"	cpu=1.38ms
"reactor-tcp-kqueue-1"	cpu=2.60ms
"reactor-tcp-kqueue-8"	cpu=2.76ms
"reactor-http-kqueue-10"	cpu=575.08ms
"reactor-http-kqueue-9"	cpu=587.08ms
"reactor-http-kqueue-8"	cpu=609.02ms
"reactor-http-kqueue-7"	cpu=621.89ms
"reactor-http-kqueue-1"	cpu=629.48ms
"reactor-http-kqueue-6"	cpu=642.70ms
"reactor-http-kqueue-5"	cpu=646.50ms
"reactor-http-kqueue-4"	cpu=649.29ms
"reactor-http-kqueue-3"	cpu=671.03ms
"reactor-http-kqueue-2"	cpu=763.32ms
"reactor-tcp-kqueue-2"	cpu=58786.43ms
  • without the reactor-pool patch and with a custom LOOP_RESOURCES created with colocation disabled:

Performance is good, and all "custom" threads are used:

@Component
public class R2DBCOptionsCustomizer implements ConnectionFactoryOptionsBuilderCustomizer {
    static final Logger log = LoggerFactory.getLogger(R2DBCOptionsCustomizer.class);

    @Override
    public void customize(ConnectionFactoryOptions.Builder builder) {
        LoopResources loopResources = LoopResources.create("custom", -1, 10, false, false);
        builder.option(MariadbConnectionFactoryProvider.LOOP_RESOURCES, loopResources);
    }
}

================================================================================
---- Global Information --------------------------------------------------------
> request count                                     128113 (OK=128113 KO=0     )
> min response time                                      4 (OK=4      KO=-     )
> max response time                                    319 (OK=319    KO=-     )
> mean response time                                   135 (OK=135    KO=-     )
> std deviation                                         71 (OK=71     KO=-     )
> response time 50th percentile                        138 (OK=138    KO=-     )
> response time 75th percentile                        202 (OK=202    KO=-     )
> response time 95th percentile                        226 (OK=226    KO=-     )
> response time 99th percentile                        285 (OK=285    KO=-     )
> mean requests/sec                                2135.217 (OK=2135.217 KO=-     )
---- Response Time Distribution ------------------------------------------------
> t < 800 ms                                        128113 (100%)
> 800 ms <= t < 1200 ms                                  0 (  0%)
> t >= 1200 ms                                           0 (  0%)
> failed                                                 0 (  0%)
================================================================================

All custom threads are used:

"reactor-http-kqueue-10"	cpu=1807.30ms
"reactor-http-kqueue-9"	cpu=1840.75ms
"reactor-http-kqueue-1"	cpu=1853.20ms
"reactor-http-kqueue-8"	cpu=1873.39ms
"reactor-http-kqueue-7"	cpu=1888.22ms
"reactor-http-kqueue-6"	cpu=1912.38ms
"reactor-http-kqueue-4"	cpu=1955.23ms
"reactor-http-kqueue-5"	cpu=1968.50ms
"reactor-http-kqueue-3"	cpu=1984.29ms
"reactor-http-kqueue-2"	cpu=2112.31ms
"custom-kqueue-4"	elapsed=62.61s
"custom-kqueue-3"	elapsed=62.61s
"custom-kqueue-2"	elapsed=62.66s
"custom-kqueue-9"	elapsed=62.61s
"custom-kqueue-8"	elapsed=62.61s
"custom-kqueue-10"	elapsed=62.61s
"custom-kqueue-7"	elapsed=62.61s
"custom-kqueue-5"	elapsed=62.61s
"custom-kqueue-1"	elapsed=62.67s
"custom-kqueue-6"	elapsed=62.61s
  • with the reactor-pool patch only (no custom LOOP_RESOURCES configured for the MariaDB driver):

================================================================================
---- Global Information --------------------------------------------------------
> request count                                     140029 (OK=140029 KO=0     )
> min response time                                      3 (OK=3      KO=-     )
> max response time                                    322 (OK=322    KO=-     )
> mean response time                                   123 (OK=123    KO=-     )
> std deviation                                         65 (OK=65     KO=-     )
> response time 50th percentile                        125 (OK=125    KO=-     )
> response time 75th percentile                        188 (OK=188    KO=-     )
> response time 95th percentile                        212 (OK=212    KO=-     )
> response time 99th percentile                        277 (OK=277    KO=-     )
> mean requests/sec                                2333.817 (OK=2333.817 KO=-     )
---- Response Time Distribution ------------------------------------------------
> t < 800 ms                                        140029 (100%)
> 800 ms <= t < 1200 ms                                  0 (  0%)
> t >= 1200 ms                                           0 (  0%)
> failed                                                 0 (  0%)
================================================================================

All reactor-tcp-kqueue threads are now used:

"reactor-http-kqueue-10"	cpu=1655.42ms
"reactor-http-kqueue-9"	cpu=1678.97ms
"reactor-http-kqueue-8"	cpu=1712.09ms
"reactor-http-kqueue-1"	cpu=1725.76ms
"reactor-http-kqueue-7"	cpu=1742.18ms
"reactor-http-kqueue-6"	cpu=1769.14ms
"reactor-http-kqueue-5"	cpu=1784.67ms
"reactor-http-kqueue-4"	cpu=1819.07ms
"reactor-http-kqueue-3"	cpu=1819.81ms
"reactor-http-kqueue-2"	cpu=1943.64ms
"reactor-tcp-kqueue-8"	cpu=27838.00ms
"reactor-tcp-kqueue-7"	cpu=27922.01ms
"reactor-tcp-kqueue-9"	cpu=27931.59ms
"reactor-tcp-kqueue-4"	cpu=27962.03ms
"reactor-tcp-kqueue-5"	cpu=27968.87ms
"reactor-tcp-kqueue-1"	cpu=28002.96ms
"reactor-tcp-kqueue-10"	cpu=28078.99ms
"reactor-tcp-kqueue-3"	cpu=28139.11ms
"reactor-tcp-kqueue-6"	cpu=28292.76ms
"reactor-tcp-kqueue-2"	cpu=28312.35ms

@pderop (Member, Author) commented Jun 22, 2023

@violetagg, can you take a look? Thanks.

@violetagg (Member) commented:
@OlegDokuka PTAL

@violetagg requested a review from a team on June 22, 2023 at 18:44
@pderop (Member, Author) commented Jun 23, 2023

There was still something wrong; I just pushed a commit, and I now think the PR is finalized and ready to be reviewed.
Thanks.

…is can be done in a separate PR.

- resources allocated during warmup are subscribed to eagerly, but now you can configure the level of concurrency using `PoolBuilder.warmupConcurrency(int concurrency)`
- improved the warmupTest
@pderop (Member, Author) commented Jul 5, 2023

Updated the PR with the following:

  • reverted delivery of the borrower through the acquisition scheduler: that's not needed, because the allocator is already configured to publish on the acquisition scheduler (see the allocatorWithScheduler() method).

  • resources allocated during warmup are subscribed to eagerly, but now you can configure the level of concurrency using PoolBuilder.warmupConcurrency(int concurrency)

  • improved the WarmupTest, which now exercises the new PoolBuilder.warmupConcurrency(int concurrency) method. The test also checks that the r2dbc issue is fixed when the allocator is configured with subscribeOn(Schedulers.single()); see the sketch below.
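
For reference, a standalone sketch of that last configuration (the allocator is a placeholder, not a real driver):

    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    import reactor.pool.InstrumentedPool;
    import reactor.pool.PoolBuilder;

    public class SingleSchedulerAllocatorSketch {
        public static void main(String[] args) {
            // Every allocation is subscribed on Schedulers.single(), so colocation
            // with a TcpResource event loop thread cannot happen during warmup.
            Mono<String> allocator = Mono.fromCallable(() -> "connection")
                    .subscribeOn(Schedulers.single());

            InstrumentedPool<String> pool = PoolBuilder.from(allocator)
                    .sizeBetween(10, 10)
                    .buildPool();

            System.out.println("warmed up: " + pool.warmup().block());
        }
    }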

…lBuilder.parallelizeWarmup(boolean).

Warmups are not parallel (as before).
Reverted CommonPoolTest.recordsAllocationLatenciesInWarmup test, because warmup is like before, by default: no concurrency.
Improved PoolWarmupTest.
@pderop (Member, Author) commented Jul 5, 2023

In the last commit:

  • Replaced PoolBuilder.warmupConcurrency(int) by PoolBuilder.parallelizeWarmup(boolean); false by default.
  • Reverted CommonPoolTest.recordsAllocationLatenciesInWarmup and added CommonPoolTest.recordsAllocationLatenciesInEagerWarmup
  • Improved WarmupTest

@pderop (Member, Author) commented Jul 5, 2023

Made a new attempt: this time, instead of introducing a new PoolBuilder.parallelizeWarmup(boolean) method, a new signature is added to the PoolBuilder.sizeBetween method:

	/**
	 * Replace the {@link AllocationStrategy} with one that lets the {@link Pool} allocate between {@code min} and {@code max} resources.
	 * When acquiring and there is no available resource, the pool should strive to warm up enough resources to reach
	 * {@code min} live resources before serving the acquire with (one of) the newly created resource(s).
	 * At the same time it MUST NOT allocate any resource if that would bring the number of live resources
	 * over the {@code max}, rejecting further allocations until some resources have been {@link PooledRef#release() released}.
	 *
	 * @param min the minimum number of live resources to keep in the pool (can be best effort)
	 * @param max the maximum number of live resources to keep in the pool. use {@link Integer#MAX_VALUE} when you only need a
	 * minimum and no upper bound
	 * @param warmupParallelism Specifies the concurrency level used when the allocator is subscribed to during the warmup phase, if any.
	 * During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code warmupParallelism} resources are
	 * subscribed to at the same time.
	 * A {@code warmupParallelism} of 1 means that pre-allocation of resources is achieved by sequentially subscribing to the allocator,
	 * waiting for a resource to be created before subscribing a next time to the allocator, and so on until the last allocation
	 * completes.
	 * @return this {@link Pool} builder
	 * @see #sizeUnbounded()
	 * @see #allocationStrategy(AllocationStrategy)
	 * @since 1.0.1
	 */
	public PoolBuilder<T, CONF> sizeBetween(int min, int max, int warmupParallelism) {
		return allocationStrategy(new AllocationStrategies.SizeBasedAllocationStrategy(min, max, warmupParallelism));
	}
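
A short usage sketch of this new signature (the allocator is a placeholder):

    import reactor.core.publisher.Mono;
    import reactor.pool.InstrumentedPool;
    import reactor.pool.PoolBuilder;

    public class WarmupParallelismSketch {
        public static void main(String[] args) {
            // Warm up to 10 resources, subscribing to at most 4 allocations at a time;
            // warmupParallelism == 1 keeps the previous, fully sequential behavior.
            InstrumentedPool<String> pool = PoolBuilder
                    .from(Mono.fromCallable(() -> "connection"))
                    .sizeBetween(10, 10, 4)
                    .buildPool();

            System.out.println("warmed up: " + pool.warmup().block());
        }
    }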

@OlegDokuka (Contributor) left a review comment:

LGTM with some comments

@pderop (Member, Author) commented Jul 9, 2023

@OlegDokuka , thanks for the review.

@pderop merged commit fc78b83 into reactor:main on Jul 9, 2023
2 checks passed
@pderop deleted the 1.0.x-optimize-warmup branch on July 9, 2023 at 19:35
pderop added a commit that referenced this pull request Jan 4, 2024
…le (#181)

This fixes a regression introduced by #171 in version 1.0.1: since the pool warmup method can now be called at any time, if no more permits are available from the allocation strategy, the Flux.merge method gets called with a mergeConcurrency parameter set to 0, causing the following exception:

maxConcurrency > 0 required but it was 0
java.lang.IllegalArgumentException: maxConcurrency > 0 required but it was 0
	at reactor.core.publisher.FluxFlatMap.<init>(FluxFlatMap.java:74)
	at reactor.core.publisher.Flux.merge(Flux.java:1406)
	at reactor.core.publisher.Flux.merge(Flux.java:1375)
Added a check in warmup to return immediately when no permits are available.
Also added a test case.

Fixes #180
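
For illustration, here is a minimal standalone sketch of that failure mode and the guard described above; the names (warmup, permits, warmupMono) are placeholders, not the actual pool code:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class WarmupGuardSketch {
        // Placeholder for the pool's allocator during warmup.
        static Mono<String> warmupMono(int i) {
            return Mono.just("resource-" + i);
        }

        static Mono<Integer> warmup(int permits) {
            if (permits <= 0) {
                // The guard: return immediately, otherwise Flux.merge below would be
                // called with a concurrency of 0 and throw IllegalArgumentException.
                return Mono.just(0);
            }
            return Flux.merge(Flux.range(1, permits).map(WarmupGuardSketch::warmupMono), permits)
                    .count()
                    .map(Long::intValue);
        }

        public static void main(String[] args) {
            System.out.println(warmup(0).block()); // 0, no exception
            System.out.println(warmup(3).block()); // 3
        }
    }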