Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -52,7 +54,8 @@ public interface Handler {
private final ScheduledExecutorService scheduler;
private final String discoveryDatabase;
private final Duration discoveryTimeout;
private final Object readyObj = new Object();
private final ReentrantLock readyLock = new ReentrantLock();
private final Condition readyCondition = readyLock.newCondition();

private volatile Instant lastUpdateTime;
private volatile Future<?> currentSchedule = null;
Expand Down Expand Up @@ -90,18 +93,20 @@ public void waitReady(long millis) throws IllegalStateException {
return;
}

synchronized (readyObj) {
try {
if (isStarted) {
return;
}

long timeout = millis > 0 ? millis : discoveryTimeout.toMillis();
readyObj.wait(timeout);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
lastException = new IllegalStateException("Discovery waiting interrupted", ex);
readyLock.lock();

try {
if (isStarted) {
return;
}

long timeout = millis > 0 ? millis : discoveryTimeout.toMillis();
readyCondition.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
lastException = new IllegalStateException("Discovery waiting interrupted", ex);
} finally {
readyLock.unlock();
}

if (!isStarted) {
Expand Down Expand Up @@ -170,19 +175,27 @@ private void runDiscovery() {
}

private void handleThrowable(Throwable th) {
synchronized (readyObj) {
readyLock.lock();

try {
lastException = th;
scheduleNextTick();
readyObj.notifyAll();
readyCondition.signalAll();
} finally {
readyLock.unlock();
}
}

private void handleOk(String selfLocation, List<EndpointRecord> endpoints) {
synchronized (readyObj) {
readyLock.lock();

try {
isStarted = true;
lastException = null;
handler.handleEndpoints(endpoints, selfLocation).whenComplete((res, th) -> scheduleNextTick());
readyObj.notifyAll();
readyCondition.signalAll();
} finally {
readyLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* @param <W> type of message to be sent to the server
*/
public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements GrpcReadWriteStream<R, W> {
// GrpcTransport's logger is used intentionally
private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class);

private final String traceId;
Expand Down Expand Up @@ -159,7 +160,7 @@ public void close() {
@Override
public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
if (logger.isTraceEnabled()) {
logger.trace("ReadWriteStreamCall[{}] closed with status {}", status);
logger.trace("ReadWriteStreamCall[{}] closed with status {}", traceId, status);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First log argument was absent

}
statusConsumer.accept(status, trailers);

Expand Down