Skip to content

Commit

Permalink
Optimize Http2Pool by reducing calls to CLDQ size method (#2090)
Browse files Browse the repository at this point in the history
To improve performance, let's reduce calls to CLDQ.size() method, which is not a constant time operation.
Co-authored-by: Violeta Georgieva <violetag@vmware.com>
  • Loading branch information
pderop authored Mar 11, 2022
1 parent 5ddb240 commit 66e8a53
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,24 @@ final class Http2Pool implements InstrumentedPool<Connection>, InstrumentedPool.
static final AtomicReferenceFieldUpdater<Http2Pool, ConcurrentLinkedQueue> CONNECTIONS =
AtomicReferenceFieldUpdater.newUpdater(Http2Pool.class, ConcurrentLinkedQueue.class, "connections");

/**
* Pending borrowers queue. Never invoke directly the poll/add/remove methods and instead of that,
* use addPending/pollPending/removePending methods which take care of maintaining the pending queue size.
* @see #removePending(ConcurrentLinkedDeque, Borrower)
* @see #addPending(ConcurrentLinkedDeque, Borrower, boolean)
* @see #pollPending(ConcurrentLinkedDeque, boolean)
* @see #PENDING_SIZE
* @see #pendingSize
*/
volatile ConcurrentLinkedDeque<Borrower> pending;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<Http2Pool, ConcurrentLinkedDeque> PENDING =
AtomicReferenceFieldUpdater.newUpdater(Http2Pool.class, ConcurrentLinkedDeque.class, "pending");

volatile int pendingSize;
private static final AtomicIntegerFieldUpdater<Http2Pool> PENDING_SIZE =
AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "pendingSize");

@SuppressWarnings("rawtypes")
static final ConcurrentLinkedDeque TERMINATED = new ConcurrentLinkedDeque();

Expand Down Expand Up @@ -180,7 +193,7 @@ public Mono<Void> disposeLater() {
ConcurrentLinkedDeque<Borrower> q = PENDING.getAndSet(this, TERMINATED);
if (q != TERMINATED) {
Borrower p;
while ((p = q.pollFirst()) != null) {
while ((p = pollPending(q, true)) != null) {
p.fail(new PoolShutdownException());
}

Expand Down Expand Up @@ -226,7 +239,7 @@ public PoolMetrics metrics() {

@Override
public int pendingAcquireSize() {
return PENDING.get(this).size();
return pendingSize;
}

@Override
Expand All @@ -243,7 +256,7 @@ public Mono<Integer> warmup() {
void cancelAcquire(Borrower borrower) {
if (!isDisposed()) {
ConcurrentLinkedDeque<Borrower> q = pending;
q.remove(borrower);
removePending(q, borrower);
}
}

Expand Down Expand Up @@ -294,13 +307,13 @@ void drainLoop() {
return;
}

int borrowersCount = borrowers.size();
int borrowersCount = pendingSize;

if (borrowersCount != 0) {
// find a connection that can be used for opening a new stream
Slot slot = findConnection(resources);
if (slot != null) {
Borrower borrower = borrowers.pollFirst();
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
resources.offer(slot);
continue;
Expand All @@ -320,26 +333,26 @@ void drainLoop() {
poolConfig.acquisitionScheduler().schedule(() -> borrower.deliver(new Http2PooledRef(slot)));
}
else {
borrowers.offerFirst(borrower);
addPending(borrowers, borrower, true);
continue;
}
}
else {
int permits = poolConfig.allocationStrategy().getPermits(1);
if (permits <= 0) {
if (maxPending >= 0) {
borrowersCount = borrowers.size();
borrowersCount = pendingSize;
int toCull = borrowersCount - maxPending;
for (int i = 0; i < toCull; i++) {
Borrower extraneous = borrowers.pollFirst();
Borrower extraneous = pollPending(borrowers, true);
if (extraneous != null) {
pendingAcquireLimitReached(extraneous, maxPending);
}
}
}
}
else {
Borrower borrower = borrowers.pollFirst();
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
continue;
}
Expand Down Expand Up @@ -468,13 +481,12 @@ void pendingOffer(Borrower borrower) {
if (pendingQueue == TERMINATED) {
return;
}
pendingQueue.offerLast(borrower);
int postOffer = pendingQueue.size();
int postOffer = addPending(pendingQueue, borrower, false);

if (WIP.getAndIncrement(this) == 0) {
ConcurrentLinkedQueue<Slot> ir = connections;
if (maxPending >= 0 && postOffer > maxPending && ir.isEmpty() && poolConfig.allocationStrategy().estimatePermitCount() == 0) {
Borrower toCull = pendingQueue.pollLast();
Borrower toCull = pollPending(pendingQueue, false);
if (toCull != null) {
pendingAcquireLimitReached(toCull, maxPending);
}
Expand All @@ -493,6 +505,31 @@ void recordInteractionTimestamp() {
this.lastInteractionTimestamp = clock.millis();
}

@Nullable
Borrower pollPending(ConcurrentLinkedDeque<Borrower> borrowers, boolean pollFirst) {
Borrower borrower = pollFirst ? borrowers.pollFirst() : borrowers.pollLast();
if (borrower != null) {
PENDING_SIZE.decrementAndGet(this);
}
return borrower;
}

void removePending(ConcurrentLinkedDeque<Borrower> borrowers, Borrower borrower) {
if (borrowers.remove(borrower)) {
PENDING_SIZE.decrementAndGet(this);
}
}

int addPending(ConcurrentLinkedDeque<Borrower> borrowers, Borrower borrower, boolean first) {
if (first) {
borrowers.offerFirst(borrower);
}
else {
borrowers.offerLast(borrower);
}
return PENDING_SIZE.incrementAndGet(this);
}

static boolean offerSlot(Slot slot) {
@SuppressWarnings("unchecked")
ConcurrentLinkedQueue<Slot> q = CONNECTIONS.get(slot.pool);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.BaseHttpTest;
import reactor.netty.http.Http2SslContextSpec;
Expand All @@ -35,6 +36,7 @@
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
Expand Down Expand Up @@ -217,6 +219,57 @@ private void doTest(HttpServer server, HttpClient client, String poolName, boole
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_PENDING_CONNECTIONS, poolName)).isEqualTo(expectedMaxPendingAcquire);
}

@Test
void testConnectionPoolPendingAcquireSize() throws Exception {
Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
Http2SslContextSpec clientCtx =
Http2SslContextSpec.forClient()
.configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE));

disposableServer =
HttpServer.create()
.port(0)
.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(serverCtx))
.handle((req, res) ->
res.sendString(Flux.range(0, 10)
.map(i -> "test")
.delayElements(Duration.ofMillis(4))))
.bindNow();

ConnectionProvider provider = ConnectionProvider
.builder("testConnectionPoolPendingAcquireSize")
.pendingAcquireMaxCount(1000)
.maxConnections(500)
.metrics(true)
.build();

try {
HttpClient client =
HttpClient.create(provider)
.port(disposableServer.port())
.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(clientCtx));

Flux.range(0, 1000)
.flatMap(i ->
client.get()
.uri("/")
.responseContent()
.aggregate()
.asString()
.timeout(Duration.ofMillis(ThreadLocalRandom.current().nextInt(1, 35)), Mono.just("timeout")))
.blockLast(Duration.ofSeconds(30));

Thread.sleep(1000);
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2.testConnectionPoolPendingAcquireSize")).isEqualTo(0);
}
finally {
provider.disposeLater()
.block(Duration.ofSeconds(30));
}
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2.testConnectionPoolPendingAcquireSize")).isEqualTo(0);
}

private double getGaugeValue(String gaugeName, String poolName) {
Gauge gauge = registry.find(gaugeName).tag(NAME, poolName).gauge();
Expand All @@ -227,4 +280,4 @@ private double getGaugeValue(String gaugeName, String poolName) {
return result;
}

}
}

0 comments on commit 66e8a53

Please sign in to comment.