Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 33 additions & 20 deletions core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.Nullable;

Expand All @@ -29,6 +31,7 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl

private final String traceId;
private final ClientCall<ReqT, RespT> call;
private final Lock callLock = new ReentrantLock();
private final GrpcStatusHandler statusConsumer;
private final ReqT request;
private final Metadata headers;
Expand Down Expand Up @@ -56,34 +59,38 @@ public CompletableFuture<Status> start(Observer<RespT> observer) {
throw new IllegalStateException("Read stream call is already started");
}

synchronized (call) {
callLock.lock();
try {
call.start(this, headers);
if (logger.isTraceEnabled()) {
logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
}
call.sendMessage(request);
// close stream by client side
call.halfClose();
call.request(1);
} catch (Throwable t) {
try {
call.start(this, headers);
if (logger.isTraceEnabled()) {
logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
}
call.sendMessage(request);
// close stream by client side
call.halfClose();
call.request(1);
} catch (Throwable t) {
try {
call.cancel(null, t);
} catch (Throwable ex) {
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex);
}

statusFuture.completeExceptionally(t);
call.cancel(null, t);
} catch (Throwable ex) {
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex);
}

statusFuture.completeExceptionally(t);
} finally {
callLock.unlock();
}

return statusFuture;
}

@Override
public void cancel() {
synchronized (call) {
callLock.lock();
try {
call.cancel("Cancelled on user request", new CancellationException());
} finally {
callLock.unlock();
}
}

Expand All @@ -95,15 +102,21 @@ public void onMessage(RespT message) {
}
observerReference.get().onNext(message);
// request delivery of the next inbound message.
synchronized (call) {
callLock.lock();
try {
call.request(1);
} finally {
callLock.unlock();
}
} catch (Exception ex) {
statusFuture.completeExceptionally(ex);

try {
synchronized (call) {
callLock.lock();
try {
call.cancel("Canceled by exception from observer", ex);
} finally {
callLock.unlock();
}
} catch (Throwable th) {
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, th);
Expand Down
57 changes: 39 additions & 18 deletions core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -33,6 +35,7 @@ public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements

private final String traceId;
private final ClientCall<W, R> call;
private final Lock callLock = new ReentrantLock();
private final GrpcStatusHandler statusConsumer;
private final Metadata headers;
private final AuthCallOptions callOptions;
Expand Down Expand Up @@ -66,27 +69,29 @@ public CompletableFuture<Status> start(Observer<R> observer) {
throw new IllegalStateException("Read stream call is already started");
}

synchronized (call) {
callLock.lock();
try {
call.start(this, headers);
call.request(1);
} catch (Throwable t) {
try {
call.start(this, headers);
call.request(1);
} catch (Throwable t) {
try {
call.cancel(null, t);
} catch (Throwable ex) {
logger.error("Exception encountered while closing the unary call", ex);
}

statusFuture.completeExceptionally(t);
call.cancel(null, t);
} catch (Throwable ex) {
logger.error("Exception encountered while closing the unary call", ex);
}

statusFuture.completeExceptionally(t);
} finally {
callLock.unlock();
}

return statusFuture;
}

@Override
public void sendNext(W message) {
synchronized (call) {
callLock.lock();
try {
if (flush()) {
if (logger.isTraceEnabled()) {
String msg = TextFormat.shortDebugString((Message) message);
Expand All @@ -96,6 +101,8 @@ public void sendNext(W message) {
} else {
messagesQueue.add(message);
}
} finally {
callLock.unlock();
}
}

Expand All @@ -118,8 +125,11 @@ private boolean flush() {

@Override
public void cancel() {
synchronized (call) {
callLock.lock();
try {
call.cancel("Cancelled on user request", new CancellationException());
} finally {
callLock.unlock();
}
}

Expand All @@ -132,15 +142,21 @@ public void onMessage(R message) {

observerReference.get().onNext(message);
// request delivery of the next inbound message.
synchronized (call) {
callLock.lock();
try {
call.request(1);
} finally {
callLock.unlock();
}
} catch (Exception ex) {
statusFuture.completeExceptionally(ex);

try {
synchronized (call) {
callLock.lock();
try {
call.cancel("Canceled by exception from observer", ex);
} finally {
callLock.unlock();
}
} catch (Throwable th) {
logger.error("Exception encountered while canceling the read write stream call", th);
Expand All @@ -150,15 +166,21 @@ public void onMessage(R message) {

@Override
public void onReady() {
synchronized (call) {
callLock.lock();
try {
flush();
} finally {
callLock.unlock();
}
}

@Override
public void close() {
synchronized (call) {
callLock.lock();
try {
call.halfClose();
} finally {
callLock.unlock();
}
}

Expand All @@ -176,4 +198,3 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
}
}
}

30 changes: 19 additions & 11 deletions core/src/test/java/tech/ydb/core/impl/MockedScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.Assert;

Expand All @@ -27,6 +29,7 @@
*/
public class MockedScheduler implements ScheduledExecutorService {
private final MockedClock clock;
private final Lock nextTaskLock = new ReentrantLock();
private final Queue<MockedTask<?>> tasks = new ConcurrentLinkedQueue<>();

private volatile boolean queueIsBlocked = false;
Expand All @@ -46,18 +49,23 @@ public MockedScheduler hasTasksCount(int count) {
return this;
}

public synchronized MockedScheduler runNextTask() {
queueIsBlocked = true;
MockedTask<?> next = tasks.poll();
Assert.assertNotNull("Scheduler's queue is empty", next);
clock.goToFuture(next.time);
next.run();
if (next.time != null) {
tasks.add(next);
}
public MockedScheduler runNextTask() {
nextTaskLock.lock();
try {
queueIsBlocked = true;
MockedTask<?> next = tasks.poll();
Assert.assertNotNull("Scheduler's queue is empty", next);
clock.goToFuture(next.time);
next.run();
if (next.time != null) {
tasks.add(next);
}

queueIsBlocked = false;
return this;
queueIsBlocked = false;
return this;
} finally {
nextTaskLock.unlock();
}
}

@Override
Expand Down
19 changes: 15 additions & 4 deletions core/src/test/java/tech/ydb/core/impl/pool/ManagedChannelMock.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.grpc.CallOptions;
import io.grpc.ClientCall;
Expand All @@ -26,7 +28,7 @@ public class ManagedChannelMock extends ManagedChannel {

private final ExecutorService executor = Executors.newFixedThreadPool(1);
private final BlockingQueue<ConnectivityState> nextStates = new LinkedBlockingDeque<>();
private final Object sync = new Object();
private final Lock sync = new ReentrantLock();

private volatile ConnectivityState state;
private Runnable listener = null;
Expand All @@ -49,14 +51,17 @@ private void requestUpdate() {

logger.trace("next mock state {}", next);

synchronized (sync) {
sync.lock();
try {
this.state = next;
if (this.listener != null) {
Runnable callback = this.listener;
logger.trace("call listener {}", callback.hashCode());
this.listener = null;
callback.run();
}
} finally {
sync.unlock();
}
});
}
Expand Down Expand Up @@ -90,25 +95,31 @@ public String authority() {

@Override
public ConnectivityState getState(boolean requestConnection) {
synchronized (sync) {
sync.lock();
try {
logger.trace("get state {} with request {}", state, requestConnection);
if (requestConnection) {
requestUpdate();
}
return state;
} finally {
sync.unlock();
}
}

@Override
public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
synchronized (sync) {
sync.lock();
try {
logger.trace("notify of changes for state {} with current {} and callback {}",
source, state, callback.hashCode());
if (source != state) {
callback.run();
} else {
this.listener = callback;
}
} finally {
sync.unlock();
}
requestUpdate();
}
Expand Down
Loading