Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions core/src/main/java/tech/ydb/core/impl/call/ProxyReadStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* @param <BaseR> type of origin stream message
* @param <DestR> new stream message type
*/
@Deprecated
public class ProxyReadStream<BaseR, DestR> implements GrpcReadStream<DestR> {
public interface MessageFunctor<BaseR, DestR> {
void apply(BaseR message, CompletableFuture<Status> promise, Observer<DestR> observer);
Expand All @@ -25,18 +26,19 @@ public ProxyReadStream(GrpcReadStream<BaseR> origin, MessageFunctor<BaseR, DestR
this.functor = functor;
}

protected void onClose(Status status, Throwable th) {
// promise may be completed by functor and in that case this code will be ignored
if (th != null) {
future.completeExceptionally(th);
}
if (status != null) {
future.complete(status);
}
}

@Override
public CompletableFuture<Status> start(Observer<DestR> observer) {
origin.start(response -> functor.apply(response, future, observer)).whenComplete((status, th) -> {
// promise may be completed by functor and in that case this code will be ignored
if (th != null) {
future.completeExceptionally(th);
}
if (status != null) {
future.complete(status);
}
});

origin.start(response -> functor.apply(response, future, observer)).whenComplete(this::onClose);
return future;
}

Expand Down
28 changes: 21 additions & 7 deletions query/src/main/java/tech/ydb/query/impl/SessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.impl.call.ProxyReadStream;
import tech.ydb.core.operation.StatusExtractor;
import tech.ydb.core.settings.BaseRequestSettings;
import tech.ydb.core.utils.URITools;
Expand Down Expand Up @@ -137,12 +136,27 @@ GrpcReadStream<Status> attach(AttachSessionSettings settings) {
.setSessionId(sessionId)
.build();
GrpcRequestSettings grpcSettings = makeOptions(settings).build();
return new ProxyReadStream<>(rpc.attachSession(request, grpcSettings), (message, promise, observer) -> {
logger.trace("session '{}' got attach stream message {}", sessionId, TextFormat.shortDebugString(message));
Status status = Status.of(StatusCode.fromProto(message.getStatus()), Issue.fromPb(message.getIssuesList()));
updateSessionState(status);
observer.onNext(status);
});
GrpcReadStream<YdbQuery.SessionState> origin = rpc.attachSession(request, grpcSettings);
return new GrpcReadStream<Status>() {
@Override
public CompletableFuture<Status> start(GrpcReadStream.Observer<Status> observer) {
return origin.start(message -> {
if (logger.isTraceEnabled()) {
String msg = TextFormat.shortDebugString(message);
logger.trace("session '{}' got attach stream message {}", sessionId, msg);
}
StatusCode code = StatusCode.fromProto(message.getStatus());
Status status = Status.of(code, Issue.fromPb(message.getIssuesList()));
updateSessionState(status);
observer.onNext(status);
});
}

@Override
public void cancel() {
origin.cancel();
}
};
}

private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings) {
Expand Down
123 changes: 123 additions & 0 deletions query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package tech.ydb.query.impl;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

/**
*
* @author Aleksandr Gorshenin
*/
public class GrpcTestInterceptor implements Consumer<ManagedChannelBuilder<?>>, ClientInterceptor {
private final Queue<Status> nextStatus = new ConcurrentLinkedQueue<>();

public void reset() {
nextStatus.clear();
}

public void addNextStatus(Status status) {
nextStatus.add(status);
}

@Override
public void accept(ManagedChannelBuilder<?> t) {
t.intercept(this);
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ProxyClientCall<>(next, method, callOptions);
}

private class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private final ClientCall<ReqT, RespT> delegate;

private ProxyClientCall(Channel channel, MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
this.delegate = channel.newCall(method, callOptions);
}

@Override
public void request(int numMessages) {
delegate.request(numMessages);
}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
delegate.cancel(message, cause);
}

@Override
public void halfClose() {
delegate.halfClose();
}

@Override
public void setMessageCompression(boolean enabled) {
delegate.setMessageCompression(enabled);
}

@Override
public boolean isReady() {
return delegate.isReady();
}

@Override
public Attributes getAttributes() {
return delegate.getAttributes();
}

@Override
public void start(ClientCall.Listener<RespT> listener, Metadata headers) {
delegate.start(new ProxyListener(listener), headers);
}

@Override
public void sendMessage(ReqT message) {
delegate.sendMessage(message);
}

private class ProxyListener extends ClientCall.Listener<RespT> {
private final ClientCall.Listener<RespT> delegate;

public ProxyListener(ClientCall.Listener<RespT> delegate) {
this.delegate = delegate;
}


@Override
public void onHeaders(Metadata headers) {
delegate.onHeaders(headers);
}

@Override
public void onMessage(RespT message) {
delegate.onMessage(message);
}

@Override
public void onClose(Status status, Metadata trailers) {
Status next = nextStatus.poll();
delegate.onClose(next != null ? next : status, trailers);
}

@Override
public void onReady() {
delegate.onReady();
}
}
}
}
120 changes: 120 additions & 0 deletions query/src/test/java/tech/ydb/query/impl/QueryClientTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package tech.ydb.query.impl;

import java.time.Duration;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

import tech.ydb.auth.TokenAuthProvider;
import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.query.QueryClient;
import tech.ydb.query.QuerySession;
import tech.ydb.query.result.QueryInfo;
import tech.ydb.test.junit4.YdbHelperRule;

/**
*
* @author Aleksandr Gorshenin
*/
public class QueryClientTest {

@ClassRule
public static final YdbHelperRule YDB = new YdbHelperRule();

private static final GrpcTestInterceptor grpcInterceptor = new GrpcTestInterceptor();

private static GrpcTransport transport;

private static QueryClient queryClient;

@BeforeClass
public static void initTransport() {
transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database())
.withAuthProvider(new TokenAuthProvider(YDB.authToken()))
.addChannelInitializer(grpcInterceptor)
.build();
}

@AfterClass
public static void closeTransport() {
transport.close();
}

@Before
public void initTableClient() {
grpcInterceptor.reset();
queryClient = QueryClient.newClient(transport).build();
}

@After
public void closeTableClient() {
queryClient.close();
}

private QuerySession getSession() {
return queryClient.createSession(Duration.ofSeconds(5)).join().getValue();
}

@Test
public void sessionReuseTest() {
QuerySession s1 = getSession();
String id1 = s1.getId();
s1.close();

QuerySession s2 = getSession();
Assert.assertEquals(id1, s2.getId());

QuerySession s3 = getSession();
Assert.assertNotEquals(id1, s3.getId());
String id2 = s3.getId();

s2.close();
s3.close();

QuerySession s4 = getSession();
QuerySession s5 = getSession();

Assert.assertEquals(id2, s4.getId());
Assert.assertEquals(id1, s5.getId());

s4.close();
s5.close();
}

@Test
public void sessionExecuteQueryTest() {
QuerySession s1 = getSession();
String id1 = s1.getId();

grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);

Result<QueryInfo> res = s1.createQuery("SELECT 1 + 2", TxMode.NONE).execute().join();
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode());

res = s1.createQuery("SELECT 1 + 2", TxMode.NONE).execute().join();
Assert.assertEquals(StatusCode.SUCCESS, res.getStatus().getCode());

s1.close();

QuerySession s2 = getSession();
Assert.assertNotEquals(id1, s2.getId());
String id2 = s2.getId();

res = s2.createQuery("SELECT * FROM wrongTable", TxMode.NONE).execute().join();
Assert.assertEquals(StatusCode.SCHEME_ERROR, res.getStatus().getCode());

s2.close();

try (QuerySession s3 = getSession()) {
Assert.assertEquals(id2, s3.getId());
}
}
}
16 changes: 12 additions & 4 deletions table/src/main/java/tech/ydb/table/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.impl.call.ProxyReadStream;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.description.TableOptionDescription;
import tech.ydb.table.query.DataQuery;
Expand Down Expand Up @@ -151,9 +150,18 @@ default CompletableFuture<Result<TableTransaction>> beginTransaction(TxMode txMo

@Deprecated
default GrpcReadStream<ResultSetReader> readTable(String tablePath, ReadTableSettings settings) {
return new ProxyReadStream<>(executeReadTable(tablePath, settings), (part, promise, observer) -> {
observer.onNext(part.getResultSetReader());
});
GrpcReadStream<ReadTablePart> stream = executeReadTable(tablePath, settings);
return new GrpcReadStream<ResultSetReader>() {
@Override
public CompletableFuture<Status> start(GrpcReadStream.Observer<ResultSetReader> observer) {
return stream.start(part -> observer.onNext(part.getResultSetReader()));
}

@Override
public void cancel() {
stream.cancel();
}
};
}

@Deprecated
Expand Down
Loading