From 0ed24608f994ebe24a85d271a1cd41973119690f Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Tue, 29 Oct 2024 16:42:46 +0300 Subject: [PATCH 1/6] synchronized blocks were replaced with ReentrantLock in ReadStreamCall --- .../ydb/core/impl/call/ReadStreamCall.java | 57 ++++++++++++------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java index 184ae810a..07b3d5a36 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java @@ -3,6 +3,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; @@ -29,6 +30,7 @@ public class ReadStreamCall extends ClientCall.Listener impl private final String traceId; private final ClientCall call; + private final ReentrantLock callLock = new ReentrantLock(); private final GrpcStatusHandler statusConsumer; private final ReqT request; private final Metadata headers; @@ -56,25 +58,27 @@ public CompletableFuture start(Observer observer) { throw new IllegalStateException("Read stream call is already started"); } - synchronized (call) { + callLock.lock(); + + try { + call.start(this, headers); + call.request(1); + if (logger.isTraceEnabled()) { + logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request)); + } + call.sendMessage(request); + // close stream by client side + call.halfClose(); + } catch (Throwable t) { try { - call.start(this, headers); - call.request(1); - if (logger.isTraceEnabled()) { - logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request)); - } - call.sendMessage(request); - // close stream by client side - call.halfClose(); - } catch (Throwable t) { - try { - call.cancel(null, t); - } catch (Throwable ex) { - logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex); - } - - statusFuture.completeExceptionally(t); + call.cancel(null, t); + } catch (Throwable ex) { + logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex); } + + statusFuture.completeExceptionally(t); + } finally { + callLock.unlock(); } return statusFuture; @@ -82,8 +86,12 @@ public CompletableFuture start(Observer observer) { @Override public void cancel() { - synchronized (call) { + callLock.lock(); + + try { call.cancel("Cancelled on user request", new CancellationException()); + } finally { + callLock.unlock(); } } @@ -95,18 +103,23 @@ public void onMessage(RespT message) { } observerReference.get().onNext(message); // request delivery of the next inbound message. - synchronized (call) { + callLock.lock(); + + try { call.request(1); + } finally { + callLock.unlock(); } } catch (Exception ex) { statusFuture.completeExceptionally(ex); + callLock.lock(); try { - synchronized (call) { - call.cancel("Canceled by exception from observer", ex); - } + call.cancel("Canceled by exception from observer", ex); } catch (Throwable th) { logger.error("ReadStreamCall[{}] got exception while canceling", traceId, th); + } finally { + callLock.unlock(); } } } From 7b78c306255b3d619868b641eb48e0f16d5159ff Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Tue, 29 Oct 2024 16:51:11 +0300 Subject: [PATCH 2/6] synchronized blocks were replaced with ReentrantLock in ReadWriteStreamCall --- .../core/impl/call/ReadWriteStreamCall.java | 69 ++++++++++++------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java index 1b57338d7..df8c0fa43 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java @@ -5,6 +5,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; @@ -18,7 +19,6 @@ import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadWriteStream; import tech.ydb.core.grpc.GrpcStatuses; -import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.impl.auth.AuthCallOptions; /** @@ -28,10 +28,11 @@ * @param type of message to be sent to the server */ public class ReadWriteStreamCall extends ClientCall.Listener implements GrpcReadWriteStream { - private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class); + private static final Logger logger = LoggerFactory.getLogger(ReadWriteStreamCall.class); private final String traceId; private final ClientCall call; + private final ReentrantLock callLock = new ReentrantLock(); private final GrpcStatusHandler statusConsumer; private final Metadata headers; private final AuthCallOptions callOptions; @@ -65,19 +66,21 @@ public CompletableFuture start(Observer observer) { throw new IllegalStateException("Read stream call is already started"); } - synchronized (call) { + callLock.lock(); + + try { + call.start(this, headers); + call.request(1); + } catch (Throwable t) { try { - call.start(this, headers); - call.request(1); - } catch (Throwable t) { - try { - call.cancel(null, t); - } catch (Throwable ex) { - logger.error("Exception encountered while closing the unary call", ex); - } - - statusFuture.completeExceptionally(t); + call.cancel(null, t); + } catch (Throwable ex) { + logger.error("Exception encountered while closing the unary call", ex); } + + statusFuture.completeExceptionally(t); + } finally { + callLock.unlock(); } return statusFuture; @@ -85,12 +88,16 @@ public CompletableFuture start(Observer observer) { @Override public void sendNext(W message) { - synchronized (call) { + callLock.lock(); + + try { if (flush()) { call.sendMessage(message); } else { messagesQueue.add(message); } + } finally { + callLock.unlock(); } } @@ -112,8 +119,12 @@ private boolean flush() { @Override public void cancel() { - synchronized (call) { + callLock.lock(); + + try { call.cancel("Cancelled on user request", new CancellationException()); + } finally { + callLock.unlock(); } } @@ -126,40 +137,53 @@ public void onMessage(R message) { observerReference.get().onNext(message); // request delivery of the next inbound message. - synchronized (call) { + callLock.lock(); + + try { call.request(1); + } finally { + callLock.unlock(); } } catch (Exception ex) { statusFuture.completeExceptionally(ex); + callLock.lock(); try { - synchronized (call) { - call.cancel("Canceled by exception from observer", ex); - } + call.cancel("Canceled by exception from observer", ex); } catch (Throwable th) { logger.error("Exception encountered while canceling the read write stream call", th); + } finally { + callLock.unlock(); } } } @Override public void onReady() { - synchronized (call) { + callLock.lock(); + + try { flush(); + } finally { + callLock.unlock(); } } @Override public void close() { - synchronized (call) { + callLock.lock(); + + try { call.halfClose(); + } finally { + callLock.unlock(); } } @Override public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { if (logger.isTraceEnabled()) { - logger.trace("ReadWriteStreamCall[{}] closed with status {}", status); + logger.trace("ReadWriteStreamCall[{}] closed with status {}", traceId, status); } statusConsumer.accept(status, trailers); @@ -170,4 +194,3 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { } } } - From 6c2eddb40abb11da91322940f8330b78aef08f65 Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Tue, 29 Oct 2024 16:56:39 +0300 Subject: [PATCH 3/6] synchronized blocks were replaced with ReentrantLock in YdbDiscovery --- .../java/tech/ydb/core/impl/YdbDiscovery.java | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java b/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java index a6f677e30..1961b79ef 100644 --- a/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java +++ b/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java @@ -7,6 +7,8 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -52,7 +54,8 @@ public interface Handler { private final ScheduledExecutorService scheduler; private final String discoveryDatabase; private final Duration discoveryTimeout; - private final Object readyObj = new Object(); + private final ReentrantLock readyLock = new ReentrantLock(); + private final Condition readyCondition = readyLock.newCondition(); private volatile Instant lastUpdateTime; private volatile Future currentSchedule = null; @@ -90,18 +93,20 @@ public void waitReady(long millis) throws IllegalStateException { return; } - synchronized (readyObj) { - try { - if (isStarted) { - return; - } - - long timeout = millis > 0 ? millis : discoveryTimeout.toMillis(); - readyObj.wait(timeout); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - lastException = new IllegalStateException("Discovery waiting interrupted", ex); + readyLock.lock(); + + try { + if (isStarted) { + return; } + + long timeout = millis > 0 ? millis : discoveryTimeout.toMillis(); + readyCondition.await(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + lastException = new IllegalStateException("Discovery waiting interrupted", ex); + } finally { + readyLock.unlock(); } if (!isStarted) { @@ -170,19 +175,27 @@ private void runDiscovery() { } private void handleThrowable(Throwable th) { - synchronized (readyObj) { + readyLock.lock(); + + try { lastException = th; scheduleNextTick(); - readyObj.notifyAll(); + readyCondition.signalAll(); + } finally { + readyLock.unlock(); } } private void handleOk(String selfLocation, List endpoints) { - synchronized (readyObj) { + readyLock.lock(); + + try { isStarted = true; lastException = null; handler.handleEndpoints(endpoints, selfLocation).whenComplete((res, th) -> scheduleNextTick()); - readyObj.notifyAll(); + readyCondition.signalAll(); + } finally { + readyLock.unlock(); } } From 396b5db0eff89f13a2f6a585b6a0ec0368eb254d Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Wed, 30 Oct 2024 15:19:30 +0300 Subject: [PATCH 4/6] Revert "synchronized blocks were replaced with ReentrantLock in ReadWriteStreamCall" This reverts commit 7b78c306255b3d619868b641eb48e0f16d5159ff. --- .../core/impl/call/ReadWriteStreamCall.java | 69 +++++++------------ 1 file changed, 23 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java index df8c0fa43..1b57338d7 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java @@ -5,7 +5,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; @@ -19,6 +18,7 @@ import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadWriteStream; import tech.ydb.core.grpc.GrpcStatuses; +import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.impl.auth.AuthCallOptions; /** @@ -28,11 +28,10 @@ * @param type of message to be sent to the server */ public class ReadWriteStreamCall extends ClientCall.Listener implements GrpcReadWriteStream { - private static final Logger logger = LoggerFactory.getLogger(ReadWriteStreamCall.class); + private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class); private final String traceId; private final ClientCall call; - private final ReentrantLock callLock = new ReentrantLock(); private final GrpcStatusHandler statusConsumer; private final Metadata headers; private final AuthCallOptions callOptions; @@ -66,21 +65,19 @@ public CompletableFuture start(Observer observer) { throw new IllegalStateException("Read stream call is already started"); } - callLock.lock(); - - try { - call.start(this, headers); - call.request(1); - } catch (Throwable t) { + synchronized (call) { try { - call.cancel(null, t); - } catch (Throwable ex) { - logger.error("Exception encountered while closing the unary call", ex); + call.start(this, headers); + call.request(1); + } catch (Throwable t) { + try { + call.cancel(null, t); + } catch (Throwable ex) { + logger.error("Exception encountered while closing the unary call", ex); + } + + statusFuture.completeExceptionally(t); } - - statusFuture.completeExceptionally(t); - } finally { - callLock.unlock(); } return statusFuture; @@ -88,16 +85,12 @@ public CompletableFuture start(Observer observer) { @Override public void sendNext(W message) { - callLock.lock(); - - try { + synchronized (call) { if (flush()) { call.sendMessage(message); } else { messagesQueue.add(message); } - } finally { - callLock.unlock(); } } @@ -119,12 +112,8 @@ private boolean flush() { @Override public void cancel() { - callLock.lock(); - - try { + synchronized (call) { call.cancel("Cancelled on user request", new CancellationException()); - } finally { - callLock.unlock(); } } @@ -137,53 +126,40 @@ public void onMessage(R message) { observerReference.get().onNext(message); // request delivery of the next inbound message. - callLock.lock(); - - try { + synchronized (call) { call.request(1); - } finally { - callLock.unlock(); } } catch (Exception ex) { statusFuture.completeExceptionally(ex); - callLock.lock(); try { - call.cancel("Canceled by exception from observer", ex); + synchronized (call) { + call.cancel("Canceled by exception from observer", ex); + } } catch (Throwable th) { logger.error("Exception encountered while canceling the read write stream call", th); - } finally { - callLock.unlock(); } } } @Override public void onReady() { - callLock.lock(); - - try { + synchronized (call) { flush(); - } finally { - callLock.unlock(); } } @Override public void close() { - callLock.lock(); - - try { + synchronized (call) { call.halfClose(); - } finally { - callLock.unlock(); } } @Override public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { if (logger.isTraceEnabled()) { - logger.trace("ReadWriteStreamCall[{}] closed with status {}", traceId, status); + logger.trace("ReadWriteStreamCall[{}] closed with status {}", status); } statusConsumer.accept(status, trailers); @@ -194,3 +170,4 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { } } } + From 2282b0913e5ddc9be4af66641c5c1f8f86be3a8b Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Wed, 30 Oct 2024 15:19:30 +0300 Subject: [PATCH 5/6] Revert "synchronized blocks were replaced with ReentrantLock in ReadStreamCall" This reverts commit 0ed24608f994ebe24a85d271a1cd41973119690f. --- .../ydb/core/impl/call/ReadStreamCall.java | 57 +++++++------------ 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java index 07b3d5a36..184ae810a 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java @@ -3,7 +3,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; @@ -30,7 +29,6 @@ public class ReadStreamCall extends ClientCall.Listener impl private final String traceId; private final ClientCall call; - private final ReentrantLock callLock = new ReentrantLock(); private final GrpcStatusHandler statusConsumer; private final ReqT request; private final Metadata headers; @@ -58,27 +56,25 @@ public CompletableFuture start(Observer observer) { throw new IllegalStateException("Read stream call is already started"); } - callLock.lock(); - - try { - call.start(this, headers); - call.request(1); - if (logger.isTraceEnabled()) { - logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request)); - } - call.sendMessage(request); - // close stream by client side - call.halfClose(); - } catch (Throwable t) { + synchronized (call) { try { - call.cancel(null, t); - } catch (Throwable ex) { - logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex); + call.start(this, headers); + call.request(1); + if (logger.isTraceEnabled()) { + logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request)); + } + call.sendMessage(request); + // close stream by client side + call.halfClose(); + } catch (Throwable t) { + try { + call.cancel(null, t); + } catch (Throwable ex) { + logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex); + } + + statusFuture.completeExceptionally(t); } - - statusFuture.completeExceptionally(t); - } finally { - callLock.unlock(); } return statusFuture; @@ -86,12 +82,8 @@ public CompletableFuture start(Observer observer) { @Override public void cancel() { - callLock.lock(); - - try { + synchronized (call) { call.cancel("Cancelled on user request", new CancellationException()); - } finally { - callLock.unlock(); } } @@ -103,23 +95,18 @@ public void onMessage(RespT message) { } observerReference.get().onNext(message); // request delivery of the next inbound message. - callLock.lock(); - - try { + synchronized (call) { call.request(1); - } finally { - callLock.unlock(); } } catch (Exception ex) { statusFuture.completeExceptionally(ex); - callLock.lock(); try { - call.cancel("Canceled by exception from observer", ex); + synchronized (call) { + call.cancel("Canceled by exception from observer", ex); + } } catch (Throwable th) { logger.error("ReadStreamCall[{}] got exception while canceling", traceId, th); - } finally { - callLock.unlock(); } } } From 863193bc3dc2030b501151ada8e3814172bf1efb Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Wed, 30 Oct 2024 15:20:23 +0300 Subject: [PATCH 6/6] ReadWriteStreamCall minor enhancements --- .../main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java index 1b57338d7..609e3eb79 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java @@ -28,6 +28,7 @@ * @param type of message to be sent to the server */ public class ReadWriteStreamCall extends ClientCall.Listener implements GrpcReadWriteStream { + // GrpcTransport's logger is used intentionally private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class); private final String traceId; @@ -159,7 +160,7 @@ public void close() { @Override public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { if (logger.isTraceEnabled()) { - logger.trace("ReadWriteStreamCall[{}] closed with status {}", status); + logger.trace("ReadWriteStreamCall[{}] closed with status {}", traceId, status); } statusConsumer.accept(status, trailers);