diff --git a/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java b/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java index a6f677e30..1961b79ef 100644 --- a/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java +++ b/core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java @@ -7,6 +7,8 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -52,7 +54,8 @@ public interface Handler { private final ScheduledExecutorService scheduler; private final String discoveryDatabase; private final Duration discoveryTimeout; - private final Object readyObj = new Object(); + private final ReentrantLock readyLock = new ReentrantLock(); + private final Condition readyCondition = readyLock.newCondition(); private volatile Instant lastUpdateTime; private volatile Future currentSchedule = null; @@ -90,18 +93,20 @@ public void waitReady(long millis) throws IllegalStateException { return; } - synchronized (readyObj) { - try { - if (isStarted) { - return; - } - - long timeout = millis > 0 ? millis : discoveryTimeout.toMillis(); - readyObj.wait(timeout); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - lastException = new IllegalStateException("Discovery waiting interrupted", ex); + readyLock.lock(); + + try { + if (isStarted) { + return; } + + long timeout = millis > 0 ? millis : discoveryTimeout.toMillis(); + readyCondition.await(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + lastException = new IllegalStateException("Discovery waiting interrupted", ex); + } finally { + readyLock.unlock(); } if (!isStarted) { @@ -170,19 +175,27 @@ private void runDiscovery() { } private void handleThrowable(Throwable th) { - synchronized (readyObj) { + readyLock.lock(); + + try { lastException = th; scheduleNextTick(); - readyObj.notifyAll(); + readyCondition.signalAll(); + } finally { + readyLock.unlock(); } } private void handleOk(String selfLocation, List endpoints) { - synchronized (readyObj) { + readyLock.lock(); + + try { isStarted = true; lastException = null; handler.handleEndpoints(endpoints, selfLocation).whenComplete((res, th) -> scheduleNextTick()); - readyObj.notifyAll(); + readyCondition.signalAll(); + } finally { + readyLock.unlock(); } } diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java index 1b57338d7..609e3eb79 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java @@ -28,6 +28,7 @@ * @param type of message to be sent to the server */ public class ReadWriteStreamCall extends ClientCall.Listener implements GrpcReadWriteStream { + // GrpcTransport's logger is used intentionally private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class); private final String traceId; @@ -159,7 +160,7 @@ public void close() { @Override public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { if (logger.isTraceEnabled()) { - logger.trace("ReadWriteStreamCall[{}] closed with status {}", status); + logger.trace("ReadWriteStreamCall[{}] closed with status {}", traceId, status); } statusConsumer.accept(status, trailers);