Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment Concurrent Pools in HTTP/2 Connection Pool #2986

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions reactor-netty-core/src/main/java/reactor/netty/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ public class Metrics {
*/
public static final String PENDING_STREAMS = ".pending.streams";

/**
* The number of HTTP/2 stream acquisitions steal count.
*/
public static final String STEAL_STREAMS = ".steal.streams";

// ByteBufAllocator Metrics
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,20 @@ public InstrumentedPool<T> newPool(
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory);
}

public InstrumentedPool<T> newPool(
PoolBuilder<T, PoolConfig<T>> poolBuilder,
int maxConnections,
@Nullable AllocationStrategy<?> allocationStrategy,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
if (disposeTimeout != null) {
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null)
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
}
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null).build(poolFactory);
}

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
Function<T, Publisher<Void>> destroyHandler,
Expand All @@ -540,6 +554,21 @@ public InstrumentedPool<T> newPool(
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
}

public InstrumentedPool<T> newPool(
PoolBuilder<T, PoolConfig<T>> poolBuilder,
int maxConnections,
@Nullable AllocationStrategy<?> allocationStrategy,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
PoolMetricsRecorder poolMetricsRecorder,
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
if (disposeTimeout != null) {
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder)
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
}
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
}

PoolBuilder<T, PoolConfig<T>> newPoolInternal(
Publisher<T> allocator,
Function<T, Publisher<Void>> destroyHandler,
Expand All @@ -552,11 +581,22 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
@Nullable PoolMetricsRecorder poolMetricsRecorder) {
PoolBuilder<T, PoolConfig<T>> poolBuilder =
PoolBuilder.from(allocator)
.destroyHandler(destroyHandler)
.maxPendingAcquire(pendingAcquireMaxCount)
.evictInBackground(evictionInterval);
return newPoolInternal(PoolBuilder.from(allocator), -1, null, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder);
}

PoolBuilder<T, PoolConfig<T>> newPoolInternal(
PoolBuilder<T, PoolConfig<T>> poolBuilder,
int maxConnections,
@Nullable AllocationStrategy<?> allocationStrategy,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
@Nullable PoolMetricsRecorder poolMetricsRecorder) {
maxConnections = (maxConnections == -1) ? this.maxConnections : maxConnections;
allocationStrategy = (allocationStrategy == null) ? this.allocationStrategy : allocationStrategy;
poolBuilder = poolBuilder
.destroyHandler(destroyHandler)
.maxPendingAcquire(pendingAcquireMaxCount)
.evictInBackground(evictionInterval);

if (this.evictionPredicate != null) {
poolBuilder = poolBuilder.evictionPredicate(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,6 +66,23 @@ public interface Builder {
* @return {@code this}
*/
Builder minConnections(int minConnections);

/**
* Enables or disables work stealing mode for managing HTTP2 Connection Pools.
* <p>
* By default, a single Connection Pool is used by multiple Netty event loop threads.
* When work stealing is enabled, each Netty event loop will maintain its own
* HTTP2 Connection Pool, and HTTP2 streams allocation will be distributed over all available
* pools using a work stealing strategy. This approach maximizes throughput and
* resource utilization in a multithreaded environment.
*
* @param progressive true if the HTTP2 Connection pools should be enabled gradually (when the nth pool becomes
* is starting to get some pendingg acquisitions request, then enable one more
* pool until all available pools are enabled).
*
* @return {@code this}
*/
Builder enableWorkStealing(boolean progressive);
}

/**
Expand All @@ -77,6 +94,18 @@ public static Http2AllocationStrategy.Builder builder() {
return new Http2AllocationStrategy.Build();
}

/**
* Creates a builder for {@link Http2AllocationStrategy} and initialize it
* with an existing strategy. This method can be used to create a mutated version
* of an existing strategy.
*
* @return a new {@link Http2AllocationStrategy.Builder} initialized with an existing http2
* allocation strategy.
*/
public static Http2AllocationStrategy.Builder builder(Http2AllocationStrategy existing) {
return new Http2AllocationStrategy.Build(existing);
}

@Override
public Http2AllocationStrategy copy() {
return new Http2AllocationStrategy(this);
Expand Down Expand Up @@ -141,9 +170,14 @@ public void returnPermits(int returned) {
}
}

public boolean enableWorkStealing() {
return enableWorkStealing;
}

final long maxConcurrentStreams;
final int maxConnections;
final int minConnections;
final boolean enableWorkStealing;

volatile int permits;
static final AtomicIntegerFieldUpdater<Http2AllocationStrategy> PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits");
Expand All @@ -152,13 +186,15 @@ public void returnPermits(int returned) {
this.maxConcurrentStreams = build.maxConcurrentStreams;
this.maxConnections = build.maxConnections;
this.minConnections = build.minConnections;
this.enableWorkStealing = build.enableWorkStealing;
PERMITS.lazySet(this, this.maxConnections);
}

Http2AllocationStrategy(Http2AllocationStrategy copy) {
this.maxConcurrentStreams = copy.maxConcurrentStreams;
this.maxConnections = copy.maxConnections;
this.minConnections = copy.minConnections;
this.enableWorkStealing = copy.enableWorkStealing;
PERMITS.lazySet(this, this.maxConnections);
}

Expand All @@ -170,6 +206,17 @@ static final class Build implements Builder {
long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
int maxConnections = DEFAULT_MAX_CONNECTIONS;
int minConnections = DEFAULT_MIN_CONNECTIONS;
boolean enableWorkStealing = Boolean.getBoolean("reactor.netty.pool.h2.enableworkstealing");

Build() {
}

Build(Http2AllocationStrategy existing) {
this.maxConcurrentStreams = existing.maxConcurrentStreams;
this.minConnections = existing.minConnections;
this.maxConnections = existing.maxConnections;
this.enableWorkStealing = existing.enableWorkStealing;
}

@Override
public Http2AllocationStrategy build() {
Expand Down Expand Up @@ -206,5 +253,11 @@ public Builder minConnections(int minConnections) {
this.minConnections = minConnections;
return this;
}

@Override
public Builder enableWorkStealing(boolean progressive) {
this.enableWorkStealing = true;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import reactor.netty.ConnectionObserver;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
import reactor.netty.internal.shaded.reactor.pool.PoolConfig;
import reactor.netty.internal.shaded.reactor.pool.decorators.InstrumentedPoolDecorators;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.PooledConnectionProvider;
import reactor.netty.transport.ClientTransportConfig;
Expand All @@ -51,13 +54,20 @@
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.function.Tuples;

import java.io.IOException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static reactor.netty.ReactorNetty.format;
import static reactor.netty.ReactorNetty.getChannelContext;
Expand Down Expand Up @@ -565,12 +575,56 @@ static final class PooledConnectionAllocator {
this.config = (HttpClientConfig) config;
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = id == null ?
poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));

Http2AllocationStrategy http2Strategy = poolFactory.allocationStrategy() instanceof Http2AllocationStrategy ?
(Http2AllocationStrategy) poolFactory.allocationStrategy() : null;

if (http2Strategy == null || !http2Strategy.enableWorkStealing) {
this.pool = id == null ?
poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
}
else {
// Create one connection allocator (it will be shared by all Http2Pool instances)
Publisher<Connection> allocator = connectChannel();

List<Executor> execs = StreamSupport.stream(config.loopResources().onClient(true).spliterator(), false)
.limit(http2Strategy.maxConnections)
.collect(Collectors.toList());
Iterator<Executor> execsIter = execs.iterator();

MicrometerPoolMetricsRecorder micrometerRecorder = id == null ? null : new MicrometerPoolMetricsRecorder(id, name, remoteAddress);
AtomicInteger subPoolIndex = new AtomicInteger();

this.pool = InstrumentedPoolDecorators.concurrentPools(execs.size(), allocator,
(PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder) -> {
int index = subPoolIndex.getAndIncrement();
int minDiv = http2Strategy.minConnections / execs.size();
int minMod = http2Strategy.minConnections % execs.size();
int maxDiv = http2Strategy.maxConnections / execs.size();
int maxMod = http2Strategy.maxConnections % execs.size();

int minConn = index < minMod ? minDiv + 1 : minDiv;
int maxConn = index < maxMod ? maxDiv + 1 : maxDiv;

Http2AllocationStrategy adaptedH2Strategy = Http2AllocationStrategy.builder(http2Strategy)
.minConnections(minConn)
.maxConnections(maxConn)
.build();

InstrumentedPool<Connection> pool =
id == null ?
poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)) :
poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
micrometerRecorder,
poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy));
return Tuples.of(pool, execsIter.next());
});
}
}

Publisher<Connection> connectChannel() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,6 +67,26 @@ public Meter.Type getType() {
}
},

/**
* The number of HTTP/2 stream acquisition steal count.
*/
STEAL_STREAMS {
@Override
public String getName() {
return "reactor.netty.connection.provider.steal.streams";
}

@Override
public KeyName[] getKeyNames() {
return Http2ConnectionProviderMetersTags.values();
}

@Override
public Meter.Type getType() {
return Meter.Type.COUNTER;
}
},

/**
* The number of the idle connections in the connection pool.
*/
Expand Down
Loading
Loading