From d4458b1e5c460179ad55e7a396b2b219d67e7e18 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 28 Aug 2025 21:58:34 +0100 Subject: [PATCH 1/3] Depracate incorrect grpc stream proxy implementation --- .../ydb/core/impl/call/ProxyReadStream.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/call/ProxyReadStream.java b/core/src/main/java/tech/ydb/core/impl/call/ProxyReadStream.java index 888384935..908e5f4f7 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ProxyReadStream.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ProxyReadStream.java @@ -11,6 +11,7 @@ * @param type of origin stream message * @param new stream message type */ +@Deprecated public class ProxyReadStream implements GrpcReadStream { public interface MessageFunctor { void apply(BaseR message, CompletableFuture promise, Observer observer); @@ -25,18 +26,19 @@ public ProxyReadStream(GrpcReadStream origin, MessageFunctor start(Observer observer) { - origin.start(response -> functor.apply(response, future, observer)).whenComplete((status, th) -> { - // promise may be completed by functor and in that case this code will be ignored - if (th != null) { - future.completeExceptionally(th); - } - if (status != null) { - future.complete(status); - } - }); - + origin.start(response -> functor.apply(response, future, observer)).whenComplete(this::onClose); return future; } From 6fa3dfbe4b0aa6d0e9e1e38a256e7a8b012bb267 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 28 Aug 2025 21:59:50 +0100 Subject: [PATCH 2/3] Added new implementatin of grpc stream proxies --- .../java/tech/ydb/query/impl/SessionImpl.java | 28 +++-- .../src/main/java/tech/ydb/table/Session.java | 16 ++- .../java/tech/ydb/table/impl/BaseSession.java | 112 +++++++++++++----- 3 files changed, 114 insertions(+), 42 deletions(-) diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index ef4d4c6bb..3b0557d71 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -20,7 +20,6 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; -import tech.ydb.core.impl.call.ProxyReadStream; import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.URITools; @@ -137,12 +136,27 @@ GrpcReadStream attach(AttachSessionSettings settings) { .setSessionId(sessionId) .build(); GrpcRequestSettings grpcSettings = makeOptions(settings).build(); - return new ProxyReadStream<>(rpc.attachSession(request, grpcSettings), (message, promise, observer) -> { - logger.trace("session '{}' got attach stream message {}", sessionId, TextFormat.shortDebugString(message)); - Status status = Status.of(StatusCode.fromProto(message.getStatus()), Issue.fromPb(message.getIssuesList())); - updateSessionState(status); - observer.onNext(status); - }); + GrpcReadStream origin = rpc.attachSession(request, grpcSettings); + return new GrpcReadStream() { + @Override + public CompletableFuture start(GrpcReadStream.Observer observer) { + return origin.start(message -> { + if (logger.isTraceEnabled()) { + String msg = TextFormat.shortDebugString(message); + logger.trace("session '{}' got attach stream message {}", sessionId, msg); + } + StatusCode code = StatusCode.fromProto(message.getStatus()); + Status status = Status.of(code, Issue.fromPb(message.getIssuesList())); + updateSessionState(status); + observer.onNext(status); + }); + } + + @Override + public void cancel() { + origin.cancel(); + } + }; } private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings) { diff --git a/table/src/main/java/tech/ydb/table/Session.java b/table/src/main/java/tech/ydb/table/Session.java index 51929a1bb..819f2a659 100644 --- a/table/src/main/java/tech/ydb/table/Session.java +++ b/table/src/main/java/tech/ydb/table/Session.java @@ -9,7 +9,6 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; -import tech.ydb.core.impl.call.ProxyReadStream; import tech.ydb.table.description.TableDescription; import tech.ydb.table.description.TableOptionDescription; import tech.ydb.table.query.DataQuery; @@ -151,9 +150,18 @@ default CompletableFuture> beginTransaction(TxMode txMo @Deprecated default GrpcReadStream readTable(String tablePath, ReadTableSettings settings) { - return new ProxyReadStream<>(executeReadTable(tablePath, settings), (part, promise, observer) -> { - observer.onNext(part.getResultSetReader()); - }); + GrpcReadStream stream = executeReadTable(tablePath, settings); + return new GrpcReadStream() { + @Override + public CompletableFuture start(GrpcReadStream.Observer observer) { + return stream.start(part -> observer.onNext(part.getResultSetReader())); + } + + @Override + public void cancel() { + stream.cancel(); + } + }; } @Deprecated diff --git a/table/src/main/java/tech/ydb/table/impl/BaseSession.java b/table/src/main/java/tech/ydb/table/impl/BaseSession.java index 86c6e4982..08d33f4d3 100644 --- a/table/src/main/java/tech/ydb/table/impl/BaseSession.java +++ b/table/src/main/java/tech/ydb/table/impl/BaseSession.java @@ -31,7 +31,6 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.YdbHeaders; -import tech.ydb.core.impl.call.ProxyReadStream; import tech.ydb.core.operation.Operation; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.ProtobufUtils; @@ -39,6 +38,7 @@ import tech.ydb.proto.StatusCodesProtos.StatusIds; import tech.ydb.proto.ValueProtos; import tech.ydb.proto.ValueProtos.TypedValue; +import tech.ydb.proto.YdbIssueMessage; import tech.ydb.proto.common.CommonProtos; import tech.ydb.proto.scheme.SchemeOperationProtos; import tech.ydb.proto.table.YdbTable; @@ -1205,22 +1205,22 @@ public GrpcReadStream executeReadTable(String tablePath, ReadTabl } GrpcReadStream origin = rpc.streamReadTable(request.build(), options.build()); - return new ProxyReadStream<>(origin, (response, future, observer) -> { - StatusIds.StatusCode statusCode = response.getStatus(); - if (statusCode == StatusIds.StatusCode.SUCCESS) { - try { - observer.onNext(new ReadTablePart(response.getResult(), response.getSnapshot())); - } catch (Throwable t) { - future.completeExceptionally(t); - origin.cancel(); - } - } else { - Issue[] issues = Issue.fromPb(response.getIssuesList()); - StatusCode code = StatusCode.fromProto(statusCode); - future.complete(Status.of(code, issues)); - origin.cancel(); + return new ProxyStream(origin) { + @Override + StatusIds.StatusCode readStatusCode(YdbTable.ReadTableResponse message) { + return message.getStatus(); } - }); + + @Override + List readIssues(YdbTable.ReadTableResponse message) { + return message.getIssuesList(); + } + + @Override + ReadTablePart readValue(YdbTable.ReadTableResponse message) { + return new ReadTablePart(message.getResult(), message.getSnapshot()); + } + }; } @Override @@ -1239,22 +1239,22 @@ public GrpcReadStream executeScanQuery(String query, Params par } GrpcReadStream origin = rpc.streamExecuteScanQuery(req, opts.build()); - return new ProxyReadStream<>(origin, (response, future, observer) -> { - StatusIds.StatusCode statusCode = response.getStatus(); - if (statusCode == StatusIds.StatusCode.SUCCESS) { - try { - observer.onNext(ProtoValueReaders.forResultSet(response.getResult().getResultSet())); - } catch (Throwable t) { - future.completeExceptionally(t); - origin.cancel(); - } - } else { - Issue[] issues = Issue.fromPb(response.getIssuesList()); - StatusCode code = StatusCode.fromProto(statusCode); - future.complete(Status.of(code, issues)); - origin.cancel(); + return new ProxyStream(origin) { + @Override + StatusIds.StatusCode readStatusCode(YdbTable.ExecuteScanQueryPartialResponse message) { + return message.getStatus(); } - }); + + @Override + List readIssues(YdbTable.ExecuteScanQueryPartialResponse message) { + return message.getIssuesList(); + } + + @Override + ResultSetReader readValue(YdbTable.ExecuteScanQueryPartialResponse message) { + return ProtoValueReaders.forResultSet(message.getResult().getResultSet()); + } + }; } private CompletableFuture commitTransactionInternal(String txId, CommitTxSettings settings) { @@ -1378,6 +1378,56 @@ public String toString() { return "Session{" + id + "}"; } + private abstract class ProxyStream implements GrpcReadStream { + private final GrpcReadStream origin; + private final CompletableFuture result = new CompletableFuture<>(); + + ProxyStream(GrpcReadStream origin) { + this.origin = origin; + } + + abstract StatusIds.StatusCode readStatusCode(R message); + abstract List readIssues(R message); + abstract T readValue(R message); + + private void onClose(Status status, Throwable th) { + if (th != null) { + updateSessionState(th, null, false); + result.completeExceptionally(th); + } + if (status != null) { + updateSessionState(null, status.getCode(), false); + result.complete(status); + } + } + + @Override + public CompletableFuture start(Observer observer) { + origin.start(message -> { + StatusIds.StatusCode statusCode = readStatusCode(message); + if (statusCode == StatusIds.StatusCode.SUCCESS) { + try { + observer.onNext(readValue(message)); + } catch (Throwable t) { + result.completeExceptionally(t); + origin.cancel(); + } + } else { + Issue[] issues = Issue.fromPb(readIssues(message)); + StatusCode code = StatusCode.fromProto(statusCode); + result.complete(Status.of(code, issues)); + origin.cancel(); + } + }).whenComplete(this::onClose); + return result; + } + + @Override + public void cancel() { + origin.cancel(); + } + } + class TableTransactionImpl extends YdbTransactionImpl implements TableTransaction { TableTransactionImpl(TxMode txMode, String txId) { From 64c939386204d8c2f58aa98eff4afbac3f436b72 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 28 Aug 2025 22:00:20 +0100 Subject: [PATCH 3/3] Added integration tests on session pool behaviour --- .../ydb/query/impl/GrpcTestInterceptor.java | 123 ++++++++++++++ .../tech/ydb/query/impl/QueryClientTest.java | 120 ++++++++++++++ .../integration/GrpcTestInterceptor.java | 123 ++++++++++++++ .../table/integration/TableClientTest.java | 154 ++++++++++++++++++ 4 files changed, 520 insertions(+) create mode 100644 query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java create mode 100644 query/src/test/java/tech/ydb/query/impl/QueryClientTest.java create mode 100644 table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java create mode 100644 table/src/test/java/tech/ydb/table/integration/TableClientTest.java diff --git a/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java b/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java new file mode 100644 index 000000000..50c6a75e9 --- /dev/null +++ b/query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java @@ -0,0 +1,123 @@ +package tech.ydb.query.impl; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; + +import javax.annotation.Nullable; + +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; + +/** + * + * @author Aleksandr Gorshenin + */ +public class GrpcTestInterceptor implements Consumer>, ClientInterceptor { + private final Queue nextStatus = new ConcurrentLinkedQueue<>(); + + public void reset() { + nextStatus.clear(); + } + + public void addNextStatus(Status status) { + nextStatus.add(status); + } + + @Override + public void accept(ManagedChannelBuilder t) { + t.intercept(this); + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ProxyClientCall<>(next, method, callOptions); + } + + private class ProxyClientCall extends ClientCall { + private final ClientCall delegate; + + private ProxyClientCall(Channel channel, MethodDescriptor method, + CallOptions callOptions) { + this.delegate = channel.newCall(method, callOptions); + } + + @Override + public void request(int numMessages) { + delegate.request(numMessages); + } + + @Override + public void cancel(@Nullable String message, @Nullable Throwable cause) { + delegate.cancel(message, cause); + } + + @Override + public void halfClose() { + delegate.halfClose(); + } + + @Override + public void setMessageCompression(boolean enabled) { + delegate.setMessageCompression(enabled); + } + + @Override + public boolean isReady() { + return delegate.isReady(); + } + + @Override + public Attributes getAttributes() { + return delegate.getAttributes(); + } + + @Override + public void start(ClientCall.Listener listener, Metadata headers) { + delegate.start(new ProxyListener(listener), headers); + } + + @Override + public void sendMessage(ReqT message) { + delegate.sendMessage(message); + } + + private class ProxyListener extends ClientCall.Listener { + private final ClientCall.Listener delegate; + + public ProxyListener(ClientCall.Listener delegate) { + this.delegate = delegate; + } + + + @Override + public void onHeaders(Metadata headers) { + delegate.onHeaders(headers); + } + + @Override + public void onMessage(RespT message) { + delegate.onMessage(message); + } + + @Override + public void onClose(Status status, Metadata trailers) { + Status next = nextStatus.poll(); + delegate.onClose(next != null ? next : status, trailers); + } + + @Override + public void onReady() { + delegate.onReady(); + } + } + } +} diff --git a/query/src/test/java/tech/ydb/query/impl/QueryClientTest.java b/query/src/test/java/tech/ydb/query/impl/QueryClientTest.java new file mode 100644 index 000000000..b15793b3e --- /dev/null +++ b/query/src/test/java/tech/ydb/query/impl/QueryClientTest.java @@ -0,0 +1,120 @@ +package tech.ydb.query.impl; + +import java.time.Duration; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.auth.TokenAuthProvider; +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Result; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.query.QueryClient; +import tech.ydb.query.QuerySession; +import tech.ydb.query.result.QueryInfo; +import tech.ydb.test.junit4.YdbHelperRule; + +/** + * + * @author Aleksandr Gorshenin + */ +public class QueryClientTest { + + @ClassRule + public static final YdbHelperRule YDB = new YdbHelperRule(); + + private static final GrpcTestInterceptor grpcInterceptor = new GrpcTestInterceptor(); + + private static GrpcTransport transport; + + private static QueryClient queryClient; + + @BeforeClass + public static void initTransport() { + transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database()) + .withAuthProvider(new TokenAuthProvider(YDB.authToken())) + .addChannelInitializer(grpcInterceptor) + .build(); + } + + @AfterClass + public static void closeTransport() { + transport.close(); + } + + @Before + public void initTableClient() { + grpcInterceptor.reset(); + queryClient = QueryClient.newClient(transport).build(); + } + + @After + public void closeTableClient() { + queryClient.close(); + } + + private QuerySession getSession() { + return queryClient.createSession(Duration.ofSeconds(5)).join().getValue(); + } + + @Test + public void sessionReuseTest() { + QuerySession s1 = getSession(); + String id1 = s1.getId(); + s1.close(); + + QuerySession s2 = getSession(); + Assert.assertEquals(id1, s2.getId()); + + QuerySession s3 = getSession(); + Assert.assertNotEquals(id1, s3.getId()); + String id2 = s3.getId(); + + s2.close(); + s3.close(); + + QuerySession s4 = getSession(); + QuerySession s5 = getSession(); + + Assert.assertEquals(id2, s4.getId()); + Assert.assertEquals(id1, s5.getId()); + + s4.close(); + s5.close(); + } + + @Test + public void sessionExecuteQueryTest() { + QuerySession s1 = getSession(); + String id1 = s1.getId(); + + grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE); + + Result res = s1.createQuery("SELECT 1 + 2", TxMode.NONE).execute().join(); + Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode()); + + res = s1.createQuery("SELECT 1 + 2", TxMode.NONE).execute().join(); + Assert.assertEquals(StatusCode.SUCCESS, res.getStatus().getCode()); + + s1.close(); + + QuerySession s2 = getSession(); + Assert.assertNotEquals(id1, s2.getId()); + String id2 = s2.getId(); + + res = s2.createQuery("SELECT * FROM wrongTable", TxMode.NONE).execute().join(); + Assert.assertEquals(StatusCode.SCHEME_ERROR, res.getStatus().getCode()); + + s2.close(); + + try (QuerySession s3 = getSession()) { + Assert.assertEquals(id2, s3.getId()); + } + } +} diff --git a/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java b/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java new file mode 100644 index 000000000..468553fb9 --- /dev/null +++ b/table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java @@ -0,0 +1,123 @@ +package tech.ydb.table.integration; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; + +import javax.annotation.Nullable; + +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; + +/** + * + * @author Aleksandr Gorshenin + */ +public class GrpcTestInterceptor implements Consumer>, ClientInterceptor { + private final Queue nextStatus = new ConcurrentLinkedQueue<>(); + + public void reset() { + nextStatus.clear(); + } + + public void addNextStatus(Status status) { + nextStatus.add(status); + } + + @Override + public void accept(ManagedChannelBuilder t) { + t.intercept(this); + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ProxyClientCall<>(next, method, callOptions); + } + + private class ProxyClientCall extends ClientCall { + private final ClientCall delegate; + + private ProxyClientCall(Channel channel, MethodDescriptor method, + CallOptions callOptions) { + this.delegate = channel.newCall(method, callOptions); + } + + @Override + public void request(int numMessages) { + delegate.request(numMessages); + } + + @Override + public void cancel(@Nullable String message, @Nullable Throwable cause) { + delegate.cancel(message, cause); + } + + @Override + public void halfClose() { + delegate.halfClose(); + } + + @Override + public void setMessageCompression(boolean enabled) { + delegate.setMessageCompression(enabled); + } + + @Override + public boolean isReady() { + return delegate.isReady(); + } + + @Override + public Attributes getAttributes() { + return delegate.getAttributes(); + } + + @Override + public void start(ClientCall.Listener listener, Metadata headers) { + delegate.start(new ProxyListener(listener), headers); + } + + @Override + public void sendMessage(ReqT message) { + delegate.sendMessage(message); + } + + private class ProxyListener extends ClientCall.Listener { + private final ClientCall.Listener delegate; + + public ProxyListener(ClientCall.Listener delegate) { + this.delegate = delegate; + } + + + @Override + public void onHeaders(Metadata headers) { + delegate.onHeaders(headers); + } + + @Override + public void onMessage(RespT message) { + delegate.onMessage(message); + } + + @Override + public void onClose(Status status, Metadata trailers) { + Status next = nextStatus.poll(); + delegate.onClose(next != null ? next : status, trailers); + } + + @Override + public void onReady() { + delegate.onReady(); + } + } + } +} diff --git a/table/src/test/java/tech/ydb/table/integration/TableClientTest.java b/table/src/test/java/tech/ydb/table/integration/TableClientTest.java new file mode 100644 index 000000000..ee96039d0 --- /dev/null +++ b/table/src/test/java/tech/ydb/table/integration/TableClientTest.java @@ -0,0 +1,154 @@ +package tech.ydb.table.integration; + +import java.time.Duration; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.auth.TokenAuthProvider; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.table.Session; +import tech.ydb.table.TableClient; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.query.Params; +import tech.ydb.table.settings.ExecuteScanQuerySettings; +import tech.ydb.table.transaction.TxControl; +import tech.ydb.test.junit4.YdbHelperRule; + +/** + * + * @author Aleksandr Gorshenin + */ +public class TableClientTest { + + @ClassRule + public static final YdbHelperRule YDB = new YdbHelperRule(); + + private static final GrpcTestInterceptor grpcInterceptor = new GrpcTestInterceptor(); + + private static GrpcTransport transport; + + private static TableClient tableClient; + + @BeforeClass + public static void initTransport() { + transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database()) + .withAuthProvider(new TokenAuthProvider(YDB.authToken())) + .addChannelInitializer(grpcInterceptor) + .build(); + } + + @AfterClass + public static void closeTransport() { + transport.close(); + } + + @Before + public void initTableClient() { + grpcInterceptor.reset(); + tableClient = TableClient.newClient(transport).build(); + } + + @After + public void closeTableClient() { + tableClient.close(); + } + + private Session getSession() { + return tableClient.createSession(Duration.ofSeconds(5)).join().getValue(); + } + + @Test + public void sessionReuseTest() { + Session s1 = getSession(); + String id1 = s1.getId(); + s1.close(); + + Session s2 = getSession(); + Assert.assertEquals(id1, s2.getId()); + + Session s3 = getSession(); + Assert.assertNotEquals(id1, s3.getId()); + String id2 = s3.getId(); + + s2.close(); + s3.close(); + + Session s4 = getSession(); + Session s5 = getSession(); + + Assert.assertEquals(id2, s4.getId()); + Assert.assertEquals(id1, s5.getId()); + + s4.close(); + s5.close(); + } + + @Test + public void sessionExecuteDataQueryTest() { + Session s1 = getSession(); + String id1 = s1.getId(); + + grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE); + + Result res = s1.executeDataQuery("SELECT 1 + 2", TxControl.snapshotRo()).join(); + Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode()); + + res = s1.executeDataQuery("SELECT 1 + 2", TxControl.snapshotRo()).join(); + Assert.assertEquals(StatusCode.SUCCESS, res.getStatus().getCode()); + + s1.close(); + + Session s2 = getSession(); + Assert.assertNotEquals(id1, s2.getId()); + String id2 = s2.getId(); + + res = s2.executeDataQuery("SELECT * FROM wrongTable", TxControl.snapshotRo()).join(); + Assert.assertEquals(StatusCode.SCHEME_ERROR, res.getStatus().getCode()); + + s2.close(); + + try (Session s3 = getSession()) { + Assert.assertEquals(id2, s3.getId()); + } + } + + @Test + public void sessionExecuteScanQueryTest() { + ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder().build(); + + Session s1 = getSession(); + String id1 = s1.getId(); + + grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE); + + Status res = s1.executeScanQuery("SELECT 1 + 2", Params.empty(), settings).start(rsr -> {}).join(); + Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getCode()); + + res = s1.executeScanQuery("SELECT 1 + 2", Params.empty(), settings).start(rsr -> {}).join(); + Assert.assertEquals(StatusCode.SUCCESS, res.getCode()); + + s1.close(); + + Session s2 = getSession(); + Assert.assertNotEquals(id1, s2.getId()); + String id2 = s2.getId(); + + res = s2.executeScanQuery("SELECT * FROM wrongTable", Params.empty(), settings).start(rsr -> {}).join(); + Assert.assertEquals(StatusCode.SCHEME_ERROR, res.getCode()); + + s2.close(); + + try (Session s3 = getSession()) { + Assert.assertEquals(id2, s3.getId()); + } + } +}