Skip to content

Commit 7cd12bd

Browse files
committed
refactoring AsyncQueue
1 parent 973ff1f commit 7cd12bd

File tree

17 files changed

+95
-84
lines changed

17 files changed

+95
-84
lines changed

core/src/main/java/com/arangodb/internal/net/Communication.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ private CompletableFuture<InternalResponse> executeAsync(final InternalRequest r
5252
long reqId = reqCount.getAndIncrement();
5353
return host.connection().thenCompose(c ->
5454
doExecuteAsync(request, hostHandle, host, attemptCount, c, reqId)
55-
.whenComplete((r, t) -> host.release(c)));
55+
.whenComplete((r, t) -> c.release()));
5656
}
5757

5858
private CompletableFuture<InternalResponse> doExecuteAsync(

core/src/main/java/com/arangodb/internal/net/Connection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,6 @@ public interface Connection extends Closeable {
3535
void setJwt(String jwt);
3636

3737
CompletableFuture<InternalResponse> executeAsync(InternalRequest request);
38+
39+
void release();
3840
}

core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,5 @@
2929
*/
3030
@UsedInApi
3131
public interface ConnectionFactory {
32-
Connection create(ArangoConfig config, HostDescription host);
32+
Connection create(ArangoConfig config, HostDescription host, ConnectionPool pool);
3333
}

core/src/main/java/com/arangodb/internal/net/ConnectionPool.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,18 @@
2020

2121
package com.arangodb.internal.net;
2222

23-
import com.arangodb.config.HostDescription;
23+
import com.arangodb.arch.UsedInApi;
2424

2525
import java.io.Closeable;
2626
import java.util.concurrent.CompletableFuture;
2727

2828
/**
2929
* @author Mark Vollmary
3030
*/
31+
@UsedInApi
3132
public interface ConnectionPool extends Closeable {
3233

33-
Connection createConnection(final HostDescription host);
34+
Connection createConnection();
3435

3536
CompletableFuture<Connection> connection();
3637

core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@
3232

3333
public class ConnectionPoolImpl implements ConnectionPool {
3434

35-
public static final int HTTP1_PIPELINING_LIMIT = 10;
36-
public static final int HTTP2_STREAMS = 32; // hard-coded, see BTS-2049
35+
public static final int HTTP1_SLOTS = 1; // HTTP/1: max 1 pending request
36+
public static final int HTTP1_SLOTS_PIPELINING = 10; // HTTP/1: max pipelining
37+
public static final int HTTP2_SLOTS = 32; // HTTP/2: max streams, hard-coded see BTS-2049
3738

3839
private final AsyncQueue<Connection> slots = new AsyncQueue<>();
3940
private final HostDescription host;
@@ -55,16 +56,16 @@ public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config,
5556
switch (config.getProtocol()) {
5657
case HTTP_JSON:
5758
case HTTP_VPACK:
58-
maxSlots = config.getPipelining() ? HTTP1_PIPELINING_LIMIT : 1;
59+
maxSlots = config.getPipelining() ? HTTP1_SLOTS_PIPELINING : HTTP1_SLOTS;
5960
break;
6061
default:
61-
maxSlots = HTTP2_STREAMS;
62+
maxSlots = HTTP2_SLOTS;
6263
}
6364
}
6465

6566
@Override
66-
public Connection createConnection(final HostDescription host) {
67-
Connection c = factory.create(config, host);
67+
public Connection createConnection() {
68+
Connection c = factory.create(config, host, this);
6869
c.setJwt(jwt);
6970
return c;
7071
}
@@ -76,7 +77,7 @@ public CompletableFuture<Connection> connection() {
7677
}
7778

7879
if (connections.size() < maxConnections) {
79-
Connection connection = createConnection(host);
80+
Connection connection = createConnection();
8081
connections.add(connection);
8182
for (int i = 0; i < maxSlots; i++) {
8283
slots.offer((connection));

core/src/main/java/com/arangodb/internal/net/Host.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,6 @@ public interface Host {
3636

3737
CompletableFuture<Connection> connection();
3838

39-
void release(Connection c);
40-
41-
void closeOnError();
42-
4339
void close() throws IOException;
4440

4541
boolean isMarkforDeletion();

core/src/main/java/com/arangodb/internal/net/HostImpl.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
package com.arangodb.internal.net;
2222

23-
import com.arangodb.ArangoDBException;
2423
import com.arangodb.config.HostDescription;
2524

2625
import java.io.IOException;
@@ -56,20 +55,6 @@ public CompletableFuture<Connection> connection() {
5655
return connectionPool.connection();
5756
}
5857

59-
@Override
60-
public void release(Connection c) {
61-
connectionPool.release(c);
62-
}
63-
64-
@Override
65-
public void closeOnError() {
66-
try {
67-
connectionPool.close();
68-
} catch (final IOException e) {
69-
throw ArangoDBException.of(e);
70-
}
71-
}
72-
7358
@Override
7459
public String toString() {
7560
return "HostImpl [connectionPool=" + connectionPool + ", description=" + description + ", markforDeletion="
Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,45 @@
11
package com.arangodb.internal.util;
22

3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.util.ArrayDeque;
37
import java.util.Queue;
4-
import java.util.concurrent.CompletableFuture;
5-
import java.util.concurrent.ConcurrentLinkedQueue;
8+
import java.util.concurrent.*;
69

710
public class AsyncQueue<T> {
11+
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncQueue.class);
812
private final Queue<CompletableFuture<T>> requests = new ConcurrentLinkedQueue<>();
9-
private final Queue<T> offers = new ConcurrentLinkedQueue<>();
13+
private final Queue<T> offers = new ArrayDeque<>();
1014

1115
public synchronized CompletableFuture<T> poll() {
16+
LOGGER.trace("poll()");
1217
T o = offers.poll();
1318
if (o != null) {
19+
LOGGER.trace("poll(): short-circuit: {}", o);
1420
return CompletableFuture.completedFuture(o);
15-
} else {
16-
CompletableFuture<T> res = new CompletableFuture<>();
17-
requests.offer(res);
18-
update();
19-
return res;
2021
}
22+
CompletableFuture<T> r = new CompletableFuture<>();
23+
LOGGER.trace("poll(): enqueue request: {}", r);
24+
requests.add(r);
25+
return r;
2126
}
2227

2328
public void offer(T o) {
29+
LOGGER.trace("offer({})", o);
2430
CompletableFuture<T> r = requests.poll();
31+
if (r == null) {
32+
synchronized (this) {
33+
r = requests.poll();
34+
if (r == null) {
35+
LOGGER.trace("offer({}): enqueue", o);
36+
offers.add(o);
37+
}
38+
}
39+
}
2540
if (r != null) {
41+
LOGGER.trace("offer({}): short-circuit: {}", o, r);
2642
r.complete(o);
27-
} else {
28-
offers.offer(o);
29-
update();
30-
}
31-
}
32-
33-
private void update() {
34-
CompletableFuture<T> r;
35-
T o;
36-
synchronized (this) {
37-
if (offers.isEmpty()) return;
38-
r = requests.poll();
39-
if (r == null) return;
40-
o = offers.poll();
4143
}
42-
r.complete(o);
4344
}
4445
}

http-protocol/src/main/java/com/arangodb/http/HttpConnection.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.arangodb.internal.RequestType;
3030
import com.arangodb.internal.config.ArangoConfig;
3131
import com.arangodb.internal.net.Connection;
32+
import com.arangodb.internal.net.ConnectionPool;
3233
import com.arangodb.internal.serde.ContentTypeFactory;
3334
import com.arangodb.internal.util.EncodeUtils;
3435
import io.netty.handler.ssl.ApplicationProtocolConfig;
@@ -63,8 +64,8 @@
6364
import java.util.concurrent.TimeUnit;
6465
import java.util.concurrent.atomic.AtomicInteger;
6566

66-
import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP1_PIPELINING_LIMIT;
67-
import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP2_STREAMS;
67+
import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP1_SLOTS_PIPELINING;
68+
import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP2_SLOTS;
6869

6970

7071
/**
@@ -84,13 +85,16 @@ public class HttpConnection implements Connection {
8485
private final WebClient client;
8586
private final Integer timeout;
8687
private final MultiMap commonHeaders = MultiMap.caseInsensitiveMultiMap();
88+
private final Vertx vertx;
8789
private final Vertx vertxToClose;
90+
private final ConnectionPool pool;
8891

8992
private static String getUserAgent() {
9093
return "JavaDriver/" + PackageVersion.VERSION + " (JVM/" + System.getProperty("java.specification.version") + ")";
9194
}
9295

93-
HttpConnection(final ArangoConfig config, final HostDescription host, final HttpProtocolConfig protocolConfig) {
96+
HttpConnection(final ArangoConfig config, final HttpProtocolConfig protocolConfig, final HostDescription host, final ConnectionPool pool) {
97+
this.pool = pool;
9498
Protocol protocol = config.getProtocol();
9599
ContentType contentType = ContentTypeFactory.of(protocol);
96100
if (contentType == ContentType.VPACK) {
@@ -114,20 +118,19 @@ private static String getUserAgent() {
114118
config.getUser(), Optional.ofNullable(config.getPassword()).orElse("")
115119
).toHttpAuthorization();
116120

117-
Vertx vertxToUse;
118121
if (protocolConfig.getVertx() != null) {
119122
// reuse existing Vert.x
120-
vertxToUse = protocolConfig.getVertx();
123+
vertx = protocolConfig.getVertx();
121124
// Vert.x will not be closed when connection is closed
122125
vertxToClose = null;
123126
LOGGER.debug("Reusing existing Vert.x instance");
124127
} else {
125128
// create a new Vert.x instance
126129
LOGGER.debug("Creating new Vert.x instance");
127-
vertxToUse = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1));
128-
vertxToUse.runOnContext(e -> Thread.currentThread().setName("adb-http-" + THREAD_COUNT.getAndIncrement()));
130+
vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1));
131+
vertx.runOnContext(e -> Thread.currentThread().setName("adb-http-" + THREAD_COUNT.getAndIncrement()));
129132
// Vert.x be closed when connection is closed
130-
vertxToClose = vertxToUse;
133+
vertxToClose = vertx;
131134
}
132135

133136
int intTtl = Optional.ofNullable(config.getConnectionTtl())
@@ -151,8 +154,8 @@ private static String getUserAgent() {
151154
.setKeepAlive(true)
152155
.setTcpKeepAlive(true)
153156
.setPipelining(config.getPipelining())
154-
.setPipeliningLimit(HTTP1_PIPELINING_LIMIT)
155-
.setHttp2MultiplexingLimit(HTTP2_STREAMS)
157+
.setPipeliningLimit(HTTP1_SLOTS_PIPELINING)
158+
.setHttp2MultiplexingLimit(HTTP2_SLOTS)
156159
.setReuseAddress(true)
157160
.setReusePort(true)
158161
.setHttp2ClearTextUpgrade(false)
@@ -209,7 +212,7 @@ public SslContextFactory sslContextFactory() {
209212
});
210213
}
211214

212-
client = WebClient.create(vertxToUse, webClientOptions);
215+
client = WebClient.create(vertx, webClientOptions);
213216
}
214217

215218
private static String buildUrl(final InternalRequest request) {
@@ -269,6 +272,11 @@ private HttpMethod requestTypeToHttpMethod(RequestType requestType) {
269272
}
270273
}
271274

275+
@Override
276+
public void release() {
277+
vertx.runOnContext(__ -> pool.release(this));
278+
}
279+
272280
@Override
273281
@UnstableApi
274282
public CompletableFuture<InternalResponse> executeAsync(@UnstableApi final InternalRequest request) {

http-protocol/src/main/java/com/arangodb/http/HttpConnectionFactory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.arangodb.internal.config.ArangoConfig;
2727
import com.arangodb.internal.net.Connection;
2828
import com.arangodb.internal.net.ConnectionFactory;
29+
import com.arangodb.internal.net.ConnectionPool;
2930
import io.vertx.core.Vertx;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -49,7 +50,9 @@ public HttpConnectionFactory(@UnstableApi final HttpProtocolConfig cfg) {
4950

5051
@Override
5152
@UnstableApi
52-
public Connection create(@UnstableApi final ArangoConfig config, final HostDescription host) {
53-
return new HttpConnection(config, host, protocolConfig);
53+
public Connection create(@UnstableApi final ArangoConfig config,
54+
final HostDescription host,
55+
@UnstableApi final ConnectionPool pool) {
56+
return new HttpConnection(config, protocolConfig, host, pool);
5457
}
5558
}

test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class HostHandlerTest {
4040

4141
private static final ConnectionPool mockCP = new ConnectionPool() {
4242
@Override
43-
public Connection createConnection(HostDescription host) {
43+
public Connection createConnection() {
4444
return null;
4545
}
4646

test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ public class ConnectionLoadBalanceTest {
2323

2424
public static Stream<Arguments> configs() {
2525
return Stream.of(
26-
// FIXME: DE-1017
27-
// new Config(Protocol.VST, 1),
28-
// new Config(Protocol.VST, 2),
26+
// FIXME: DE-1017
27+
// new Config(Protocol.VST, 1),
28+
// new Config(Protocol.VST, 2),
2929
new Config(Protocol.HTTP_JSON, 10),
3030
new Config(Protocol.HTTP_JSON, 20),
3131
new Config(Protocol.HTTP2_JSON, 1),
@@ -59,6 +59,8 @@ void doTestLoadBalance(Config cfg, int sleepCycles) throws InterruptedException
5959
.maxConnections(cfg.maxConnections)
6060
.build().async().db();
6161

62+
LOGGER.debug("starting...");
63+
6264
CompletableFuture<Void> longRunningTasks = CompletableFuture.allOf(
6365
IntStream.range(0, longTasksCount)
6466
.mapToObj(__ ->
@@ -103,8 +105,8 @@ private record Config(
103105
) {
104106
int maxStreams() {
105107
return switch (protocol) {
106-
case HTTP_JSON, HTTP_VPACK -> 1;
107-
default -> ConnectionPoolImpl.HTTP2_STREAMS;
108+
case HTTP_JSON, HTTP_VPACK -> ConnectionPoolImpl.HTTP1_SLOTS;
109+
default -> ConnectionPoolImpl.HTTP2_SLOTS;
108110
};
109111
}
110112
}

test-non-functional/src/test/java/concurrency/ConnectionPoolConcurrencyTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
import com.arangodb.internal.InternalRequest;
55
import com.arangodb.internal.InternalResponse;
66
import com.arangodb.internal.config.ArangoConfig;
7-
import com.arangodb.internal.net.Connection;
8-
import com.arangodb.internal.net.ConnectionFactory;
9-
import com.arangodb.internal.net.ConnectionPool;
10-
import com.arangodb.internal.net.ConnectionPoolImpl;
7+
import com.arangodb.internal.net.*;
118
import org.junit.jupiter.api.Test;
129

1310
import java.io.IOException;
@@ -23,7 +20,7 @@ public class ConnectionPoolConcurrencyTest {
2320
cfg.setMaxConnections(10_000);
2421
}
2522

26-
private final ConnectionFactory cf = (config, host) -> new Connection() {
23+
private final ConnectionFactory cf = (config, host, pool) -> new Connection() {
2724
@Override
2825
public void setJwt(String jwt) {
2926
}
@@ -33,6 +30,10 @@ public CompletableFuture<InternalResponse> executeAsync(InternalRequest request)
3330
throw new UnsupportedOperationException();
3431
}
3532

33+
@Override
34+
public void release() {
35+
}
36+
3637
@Override
3738
public void close() {
3839
}
@@ -45,7 +46,7 @@ void foo() throws InterruptedException, ExecutionException, IOException {
4546

4647
List<? extends Future<?>> futures = es.invokeAll(Collections.nCopies(8, (Callable<?>) () -> {
4748
for (int i = 0; i < 10_000; i++) {
48-
cp.createConnection(HostDescription.parse("127.0.0.1:8529"));
49+
cp.createConnection();
4950
cp.connection();
5051
cp.setJwt("foo");
5152
}

test-non-functional/src/test/resources/simplelogger.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ org.slf4j.simpleLogger.defaultLogLevel=info
1111
#org.slf4j.simpleLogger.log.com.arangodb.internal.net.Communication=debug
1212
#org.slf4j.simpleLogger.log.io.netty.handler.logging.LoggingHandler=debug
1313
#org.slf4j.simpleLogger.log.io.netty.handler.codec.http2.Http2FrameLogger=debug
14+
#org.slf4j.simpleLogger.log.com.arangodb.internal.util.AsyncQueue=trace

0 commit comments

Comments
 (0)