From fe3588bcd15d903cb3c7cd8c1b58e0d380996625 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 29 Sep 2025 09:54:06 +0100 Subject: [PATCH 1/4] Added option upsertTo/insertTo/deleteFrom --- pom.xml | 2 +- src/main/java/tech/ydb/app/CdcMsgParser.java | 15 +++--- src/main/java/tech/ydb/app/XmlConfig.java | 50 +++++++++++++++----- src/main/java/tech/ydb/app/YdbService.java | 2 + 4 files changed, 50 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index b55a9c0..c8c3c27 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ 17 3.2.12 - 2.3.5 + 2.3.20 tech.ydb.app.Application diff --git a/src/main/java/tech/ydb/app/CdcMsgParser.java b/src/main/java/tech/ydb/app/CdcMsgParser.java index 01f8adf..5a51e69 100644 --- a/src/main/java/tech/ydb/app/CdcMsgParser.java +++ b/src/main/java/tech/ydb/app/CdcMsgParser.java @@ -97,6 +97,7 @@ public Parser(YdbService ydb, XmlConfig.Cdc cdc, Map xm this.xmlQueries = xmlQueries; } + @SuppressWarnings("null") public Result> parse() { String changefeed = ydb.expandPath(cdc.getChangefeed()); @@ -129,13 +130,13 @@ public Result> parse() { private Result> findUpdateQuery(TableDescription source) { if (cdc.getQuery() != null && !cdc.getQuery().trim().isEmpty()) { - return validate(source, cdc.getQuery().trim(), false); + return validate(source, new XmlConfig.Query(cdc.getQuery().trim()), false); } String queryId = cdc.getUpdateQueryId(); if (queryId != null && xmlQueries.containsKey(queryId)) { XmlConfig.Query query = xmlQueries.get(queryId); if (query.getText() != null && !query.getText().trim().isEmpty()) { - return validate(source, query.getText().trim(), false); + return validate(source, query, false); } } @@ -147,15 +148,17 @@ private Result> findDeleteQuery(TableDescription source) { if (queryId != null && xmlQueries.containsKey(queryId)) { XmlConfig.Query query = xmlQueries.get(queryId); if (query.getText() != null && !query.getText().trim().isEmpty()) { - return validate(source, query.getText().trim(), true); + return validate(source, query, true); } } return Result.success(YqlQuery.skipMessages("erase", "deleteQueryId", source.getPrimaryKeys(), cdc)); } - private Result> validate(TableDescription source, String query, boolean keysOnly) { - Result parsed = ydb.parseQuery(query); + @SuppressWarnings("null") + private Result> validate(TableDescription source, XmlConfig.Query query, boolean keysOnly) { + String queryText = query.getText().trim(); + Result parsed = ydb.parseQuery(queryText); if (!parsed.isSuccess()) { logger.error("Can't parse query for consumer {}, got status {}", cdc.getConsumer(), parsed.getStatus()); return parsed.map(null); @@ -217,7 +220,7 @@ private Result> validate(TableDescription source, String quer } } - return Result.success(YqlQuery.executeYql(query, source.getPrimaryKeys(), paramName, structType, cdc)); + return Result.success(YqlQuery.executeYql(queryText, source.getPrimaryKeys(), paramName, structType, cdc)); } } diff --git a/src/main/java/tech/ydb/app/XmlConfig.java b/src/main/java/tech/ydb/app/XmlConfig.java index fad8d74..35f18fb 100644 --- a/src/main/java/tech/ydb/app/XmlConfig.java +++ b/src/main/java/tech/ydb/app/XmlConfig.java @@ -36,15 +36,29 @@ public static class Query { @XmlAttribute(name = "id", required = true) private String id; -// @XmlAttribute(name = "upsertTo") -// private String upsertTo; -// -// @XmlAttribute(name = "deleteFrom") -// private String deleteFrom; + @XmlAttribute(name = "upsertTo") + private String upsertTo; + + @XmlAttribute(name = "insertTo") + private String insertTo; + + @XmlAttribute(name = "deleteFrom") + private String deleteFrom; + + @XmlAttribute(name = "batchSize") + private Integer batchSize; @XmlValue private String text; + public Query() { + } + + public Query(String query) { + this.id = "inplacement"; + this.text = query; + } + public String getId() { return this.id; } @@ -53,13 +67,24 @@ public String getText() { return this.text; } -// public String getUpsertTo() { -// return this.upsertTo; -// } -// -// public String getDeleteFrom() { -// return this.deleteFrom; -// } + public String getUpsertTo() { + return this.upsertTo; + } + + public String getInsertTo() { + return this.insertTo; + } + + public String getDeleteFrom() { + return this.deleteFrom; + } + + public int getBatchSize() { + if (batchSize == null) { + return DEFAULT_BATCH_SIZE; + } + return batchSize; + } } public static class Cdc { @@ -80,6 +105,7 @@ public static class Cdc { private String updateQueryId; @XmlAttribute(name = "deleteQueryId") private String deleteQueryId; + @XmlValue private String query; diff --git a/src/main/java/tech/ydb/app/YdbService.java b/src/main/java/tech/ydb/app/YdbService.java index e22b817..6eef359 100644 --- a/src/main/java/tech/ydb/app/YdbService.java +++ b/src/main/java/tech/ydb/app/YdbService.java @@ -122,6 +122,7 @@ public String expandPath(String name) { return sb.toString(); } + @SuppressWarnings("null") public Result parseQuery(String query) { Result session = tableClient.createSession(Duration.ofSeconds(5)).join(); if (!session.isSuccess()) { @@ -133,6 +134,7 @@ public Result parseQuery(String query) { } } + @SuppressWarnings("null") public Result describeTable(String tablePath) { Result session = tableClient.createSession(Duration.ofSeconds(5)).join(); if (!session.isSuccess()) { From cc63832b5f52547c2f793fa58a87d8cdba9f67f0 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 29 Sep 2025 10:36:02 +0100 Subject: [PATCH 2/4] Switched to QueryClient --- pom.xml | 2 +- src/main/java/tech/ydb/app/Application.java | 6 +++++ src/main/java/tech/ydb/app/XmlConfig.java | 16 ++++++------ src/main/java/tech/ydb/app/YdbService.java | 27 ++++++++++++++------- src/main/java/tech/ydb/app/YqlQuery.java | 2 +- src/main/java/tech/ydb/app/YqlWriter.java | 4 +++ 6 files changed, 38 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index c8c3c27..f6480d7 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ tech.ydb - ydb-sdk-table + ydb-sdk-query tech.ydb diff --git a/src/main/java/tech/ydb/app/Application.java b/src/main/java/tech/ydb/app/Application.java index 8a58b2c..c25fcb1 100644 --- a/src/main/java/tech/ydb/app/Application.java +++ b/src/main/java/tech/ydb/app/Application.java @@ -70,6 +70,12 @@ public void run(String... args) { warnings.add("No reader configs found!!"); } + int sessionPoolSize = 0; + for (CdcReader reader: readers) { + sessionPoolSize += reader.getWriter().getThreadsCount(); + } + ydb.updatePoolSize(Math.min(sessionPoolSize, 50)); + for (CdcReader reader: readers) { reader.start(); } diff --git a/src/main/java/tech/ydb/app/XmlConfig.java b/src/main/java/tech/ydb/app/XmlConfig.java index 35f18fb..99d1ec7 100644 --- a/src/main/java/tech/ydb/app/XmlConfig.java +++ b/src/main/java/tech/ydb/app/XmlConfig.java @@ -45,8 +45,8 @@ public static class Query { @XmlAttribute(name = "deleteFrom") private String deleteFrom; - @XmlAttribute(name = "batchSize") - private Integer batchSize; +// @XmlAttribute(name = "batchSize") +// private Integer batchSize; @XmlValue private String text; @@ -79,12 +79,12 @@ public String getDeleteFrom() { return this.deleteFrom; } - public int getBatchSize() { - if (batchSize == null) { - return DEFAULT_BATCH_SIZE; - } - return batchSize; - } +// public int getBatchSize() { +// if (batchSize == null) { +// return DEFAULT_BATCH_SIZE; +// } +// return batchSize; +// } } public static class Cdc { diff --git a/src/main/java/tech/ydb/app/YdbService.java b/src/main/java/tech/ydb/app/YdbService.java index 6eef359..812fed6 100644 --- a/src/main/java/tech/ydb/app/YdbService.java +++ b/src/main/java/tech/ydb/app/YdbService.java @@ -18,18 +18,20 @@ import tech.ydb.auth.TokenAuthProvider; import tech.ydb.auth.iam.CloudAuthHelper; +import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.auth.StaticCredentials; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.grpc.GrpcTransportBuilder; +import tech.ydb.query.QuerySession; +import tech.ydb.query.impl.QueryClientImpl; +import tech.ydb.query.settings.ExecuteQuerySettings; import tech.ydb.table.Session; import tech.ydb.table.TableClient; import tech.ydb.table.description.TableDescription; import tech.ydb.table.query.DataQuery; import tech.ydb.table.query.Params; -import tech.ydb.table.settings.ExecuteDataQuerySettings; -import tech.ydb.table.transaction.TxControl; import tech.ydb.topic.TopicClient; import tech.ydb.topic.read.AsyncReader; import tech.ydb.topic.settings.ReadEventHandlersSettings; @@ -54,6 +56,7 @@ public class YdbService { private final GrpcTransport transport; private final TableClient tableClient; + private final QueryClientImpl queryClient; private final TopicClient topicClient; public YdbService(Environment env) { @@ -93,11 +96,17 @@ public YdbService(Environment env) { this.transport = builder.build(); this.tableClient = TableClient.newClient(transport).build(); + this.queryClient = QueryClientImpl.newClient(transport).build(); this.topicClient = TopicClient.newClient(transport) - .setCompressionExecutor(Runnable::run) // Prevent OOM + .setCompressionExecutor(Runnable::run) .build(); } + public void updatePoolSize(int maxSize) { + logger.error("set session pool max size {}", maxSize); + queryClient.updatePoolMaxSize(maxSize); + } + @PreDestroy public void close() { this.topicClient.close(); @@ -146,18 +155,18 @@ public Result describeTable(String tablePath) { } } - public Status executeQuery(String query, Params params, int timeoutSeconds) { - Result session = tableClient.createSession(Duration.ofSeconds(5)).join(); + public Status executeYqlQuery(String query, Params params, int timeoutSeconds) { + Result session = queryClient.createSession(Duration.ofSeconds(5)).join(); if (!session.isSuccess()) { return session.getStatus(); } - try (Session s = session.getValue()) { - ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings(); + try (QuerySession s = session.getValue()) { + ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder(); if (timeoutSeconds > 0) { - settings.setTimeout(Duration.ofSeconds(timeoutSeconds)); + settings.withRequestTimeout(Duration.ofSeconds(timeoutSeconds)); } - return s.executeDataQuery(query, TxControl.serializableRw(), params, settings).join().getStatus(); + return s.createQuery(query, TxMode.NONE, params, settings.build()).execute().join().getStatus(); } } diff --git a/src/main/java/tech/ydb/app/YqlQuery.java b/src/main/java/tech/ydb/app/YqlQuery.java index 5ccf52b..b056f60 100644 --- a/src/main/java/tech/ydb/app/YqlQuery.java +++ b/src/main/java/tech/ydb/app/YqlQuery.java @@ -185,7 +185,7 @@ public static Supplier executeYql(String query, List keys, Str @Override public Status execute(YdbService ydb) { Params prm = Params.of(name, ListType.of(type).newValue(batch)); - return ydb.executeQuery(query, prm, timeout); + return ydb.executeYqlQuery(query, prm, timeout); } }; } diff --git a/src/main/java/tech/ydb/app/YqlWriter.java b/src/main/java/tech/ydb/app/YqlWriter.java index e34ae84..3e0b331 100644 --- a/src/main/java/tech/ydb/app/YqlWriter.java +++ b/src/main/java/tech/ydb/app/YqlWriter.java @@ -56,6 +56,10 @@ public YqlWriter(YdbService ydb, Supplier parser, XmlConfig.Cdc co } } + public int getThreadsCount() { + return writers.size(); + } + public Status getLastStatus() { for (int idx = 0; idx < writers.size(); idx++) { Status last = writers.get(idx).lastStatus; From 1a27c308d7eb177bbd620587e14bd34e2204009e Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 29 Sep 2025 11:42:07 +0100 Subject: [PATCH 3/4] Added YqlQuery implementation of updateTo/insertTo/updateOn/deleteFrom --- src/main/java/tech/ydb/app/CdcMsgParser.java | 21 ++++++- src/main/java/tech/ydb/app/XmlConfig.java | 6 ++ src/main/java/tech/ydb/app/YdbService.java | 19 +++++++ src/main/java/tech/ydb/app/YqlQuery.java | 60 +++++++++++++++++++- 4 files changed, 103 insertions(+), 3 deletions(-) diff --git a/src/main/java/tech/ydb/app/CdcMsgParser.java b/src/main/java/tech/ydb/app/CdcMsgParser.java index 5a51e69..9014772 100644 --- a/src/main/java/tech/ydb/app/CdcMsgParser.java +++ b/src/main/java/tech/ydb/app/CdcMsgParser.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Supplier; @@ -220,7 +221,25 @@ private Result> validate(TableDescription source, XmlConfig.Q } } - return Result.success(YqlQuery.executeYql(queryText, source.getPrimaryKeys(), paramName, structType, cdc)); + List keys = source.getPrimaryKeys(); + if (query.getUpsertTo() != null && !query.getUpsertTo().trim().isEmpty()) { + String execute = "UPSERT INTO `" + query.getUpsertTo().trim() + "` "; + return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc)); + } + if (query.getDeleteFrom() != null && !query.getDeleteFrom().trim().isEmpty()) { + String execute = "DELETE FROM `" + query.getDeleteFrom().trim() + "` ON "; + return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc)); + } + if (query.getUpdateOn() != null && !query.getUpdateOn().trim().isEmpty()) { + String execute = "UPDATE `" + query.getUpdateOn().trim() + "` ON "; + return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc)); + } + if (query.getInsertTo() != null && !query.getInsertTo().trim().isEmpty()) { + String execute = "INSERT INTO `" + query.getInsertTo().trim() + "` "; + return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc)); + } + + return Result.success(YqlQuery.executeYql(queryText, keys, paramName, structType, cdc)); } } diff --git a/src/main/java/tech/ydb/app/XmlConfig.java b/src/main/java/tech/ydb/app/XmlConfig.java index 99d1ec7..286972a 100644 --- a/src/main/java/tech/ydb/app/XmlConfig.java +++ b/src/main/java/tech/ydb/app/XmlConfig.java @@ -42,6 +42,9 @@ public static class Query { @XmlAttribute(name = "insertTo") private String insertTo; + @XmlAttribute(name = "updateOn") + private String updateOn; + @XmlAttribute(name = "deleteFrom") private String deleteFrom; @@ -79,6 +82,9 @@ public String getDeleteFrom() { return this.deleteFrom; } + public String getUpdateOn() { + return this.updateOn; + } // public int getBatchSize() { // if (batchSize == null) { // return DEFAULT_BATCH_SIZE; diff --git a/src/main/java/tech/ydb/app/YdbService.java b/src/main/java/tech/ydb/app/YdbService.java index 812fed6..ac8c78c 100644 --- a/src/main/java/tech/ydb/app/YdbService.java +++ b/src/main/java/tech/ydb/app/YdbService.java @@ -25,8 +25,10 @@ import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.grpc.GrpcTransportBuilder; import tech.ydb.query.QuerySession; +import tech.ydb.query.QueryStream; import tech.ydb.query.impl.QueryClientImpl; import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.query.tools.QueryReader; import tech.ydb.table.Session; import tech.ydb.table.TableClient; import tech.ydb.table.description.TableDescription; @@ -170,6 +172,23 @@ public Status executeYqlQuery(String query, Params params, int timeoutSeconds) { } } + @SuppressWarnings("null") + public Result readYqlQuery(String query, Params params, int timeoutSeconds) { + Result session = queryClient.createSession(Duration.ofSeconds(5)).join(); + if (!session.isSuccess()) { + return session.map(null); + } + + try (QuerySession s = session.getValue()) { + ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder(); + if (timeoutSeconds > 0) { + settings.withRequestTimeout(Duration.ofSeconds(timeoutSeconds)); + } + QueryStream stream = s.createQuery(query, TxMode.SNAPSHOT_RO, params, settings.build()); + return QueryReader.readFrom(stream).join(); + } + } + public AsyncReader createReader(ReaderSettings rs, ReadEventHandlersSettings settings) { return topicClient.createAsyncReader(rs, settings); } diff --git a/src/main/java/tech/ydb/app/YqlQuery.java b/src/main/java/tech/ydb/app/YqlQuery.java index b056f60..b147311 100644 --- a/src/main/java/tech/ydb/app/YqlQuery.java +++ b/src/main/java/tech/ydb/app/YqlQuery.java @@ -15,8 +15,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import tech.ydb.core.Result; import tech.ydb.core.Status; +import tech.ydb.query.tools.QueryReader; import tech.ydb.table.query.Params; +import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.values.DecimalType; import tech.ydb.table.values.ListType; import tech.ydb.table.values.NullValue; @@ -143,7 +146,8 @@ private Value readValue(JsonNode node, Type type) throws IOException { case Date: return PrimitiveValue.newDate(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC).toLocalDate()); case Datetime: - return PrimitiveValue.newDatetime(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC).toLocalDateTime()); + return PrimitiveValue.newDatetime(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC) + .toLocalDateTime()); case Timestamp: return PrimitiveValue.newTimestamp(Instant.parse(node.asText())); case Interval: @@ -178,7 +182,8 @@ public Status execute(YdbService ydb) { }; } - public static Supplier executeYql(String query, List keys, String name, StructType type, XmlConfig.Cdc config) { + public static Supplier executeYql(String query, List keys, String name, StructType type, + XmlConfig.Cdc config) { final int batchSize = config.getBatchSize(); final int timeout = config.getTimeoutSeconds(); return () -> new YqlQuery(type, keys, batchSize) { @@ -189,4 +194,55 @@ public Status execute(YdbService ydb) { } }; } + + public static Supplier readAndExecuteYql(String selectQuery, String query, List keys, + String name, StructType type, XmlConfig.Cdc config) { + final int batchSize = config.getBatchSize(); + final int timeout = config.getTimeoutSeconds(); + return () -> new YqlQuery(type, keys, batchSize) { + @Override + public Status execute(YdbService ydb) { + Params selectPrms = Params.of(name, ListType.of(type).newValue(batch)); + Result res = ydb.readYqlQuery(selectQuery, selectPrms, timeout); + if (!res.isSuccess()) { + return res.getStatus(); + } + QueryReader reader = res.getValue(); + if (reader.getResultSetCount() < 1) { + return Status.SUCCESS; + } + + ResultSetReader rs = reader.getResultSet(0); + + StructType type = resultSetToType(rs); + Value values = ListType.of(type).newValue(resultSetToValues(rs, type)); + Params executePrms = Params.of("$input", values); + String executeQuery = "DECLARE $input AS " + type + "; " + query + " SELECT FROM AS_TABLE($input);"; + return ydb.executeYqlQuery(executeQuery, executePrms, timeout); + } + }; + } + + private static StructType resultSetToType(ResultSetReader rs) { + String[] names = new String[rs.getColumnCount()]; + Type[] types = new Type[rs.getColumnCount()]; + for (int idx = 0; idx < rs.getColumnCount(); idx += 1) { + names[idx] = rs.getColumnName(idx); + types[idx] = rs.getColumnType(idx); + } + + return StructType.ofOwn(names, types); + } + + private static List> resultSetToValues(ResultSetReader rs, StructType type) { + List> values = new ArrayList<>(); + while (rs.next()) { + Value[] row = new Value[type.getMembersCount()]; + for (int idx = 0; idx < type.getMembersCount(); idx += 1) { + row[idx] = rs.getColumn(type.getMemberIndex(type.getMemberName(idx))).getValue(); + } + values.add(type.newValueUnsafe(row)); + } + return values; + } } From b85a86d59e93d42b67277b827ac1efd0d4c10fed Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 29 Sep 2025 12:56:31 +0100 Subject: [PATCH 4/4] Fixed errors --- src/main/java/tech/ydb/app/Application.java | 2 +- src/main/java/tech/ydb/app/CdcMsgParser.java | 45 ++++++++++++-------- src/main/java/tech/ydb/app/XmlConfig.java | 29 ++++--------- src/main/java/tech/ydb/app/YqlQuery.java | 6 +-- 4 files changed, 39 insertions(+), 43 deletions(-) diff --git a/src/main/java/tech/ydb/app/Application.java b/src/main/java/tech/ydb/app/Application.java index c25fcb1..305b79a 100644 --- a/src/main/java/tech/ydb/app/Application.java +++ b/src/main/java/tech/ydb/app/Application.java @@ -74,7 +74,7 @@ public void run(String... args) { for (CdcReader reader: readers) { sessionPoolSize += reader.getWriter().getThreadsCount(); } - ydb.updatePoolSize(Math.min(sessionPoolSize, 50)); + ydb.updatePoolSize(Math.max(sessionPoolSize, 50)); for (CdcReader reader: readers) { reader.start(); diff --git a/src/main/java/tech/ydb/app/CdcMsgParser.java b/src/main/java/tech/ydb/app/CdcMsgParser.java index 9014772..02ab7c8 100644 --- a/src/main/java/tech/ydb/app/CdcMsgParser.java +++ b/src/main/java/tech/ydb/app/CdcMsgParser.java @@ -158,8 +158,8 @@ private Result> findDeleteQuery(TableDescription source) { @SuppressWarnings("null") private Result> validate(TableDescription source, XmlConfig.Query query, boolean keysOnly) { - String queryText = query.getText().trim(); - Result parsed = ydb.parseQuery(queryText); + String text = query.getText().trim(); + Result parsed = ydb.parseQuery(text); if (!parsed.isSuccess()) { logger.error("Can't parse query for consumer {}, got status {}", cdc.getConsumer(), parsed.getStatus()); return parsed.map(null); @@ -222,24 +222,33 @@ private Result> validate(TableDescription source, XmlConfig.Q } List keys = source.getPrimaryKeys(); - if (query.getUpsertTo() != null && !query.getUpsertTo().trim().isEmpty()) { - String execute = "UPSERT INTO `" + query.getUpsertTo().trim() + "` "; - return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc)); - } - if (query.getDeleteFrom() != null && !query.getDeleteFrom().trim().isEmpty()) { - String execute = "DELETE FROM `" + query.getDeleteFrom().trim() + "` ON "; - return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc)); - } - if (query.getUpdateOn() != null && !query.getUpdateOn().trim().isEmpty()) { - String execute = "UPDATE `" + query.getUpdateOn().trim() + "` ON "; - return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc)); - } - if (query.getInsertTo() != null && !query.getInsertTo().trim().isEmpty()) { - String execute = "INSERT INTO `" + query.getInsertTo().trim() + "` "; - return Result.success(YqlQuery.readAndExecuteYql(queryText, execute, keys, paramName, structType, cdc)); + if (query.getActionTable() != null && !query.getActionTable().trim().isEmpty()) { + String actionTable = query.getActionTable().trim(); + String action = query.getActionMode(); + if ("upsertInto".equalsIgnoreCase(action)) { + String execute = "UPSERT INTO `" + actionTable + "` "; + return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc)); + } + if ("deleteFrom".equalsIgnoreCase(action)) { + String execute = "DELETE FROM `" + actionTable + "` ON "; + return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc)); + } + if ("updateOn".equalsIgnoreCase(action)) { + String execute = "UPDATE `" + actionTable + "` ON "; + return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc)); + } + if ("insertInto".equalsIgnoreCase(action)) { + String execute = "INSERT INTO `" + actionTable + "` "; + return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc)); + } + + return Result.fail(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, Issue.of( + "Uknown actionName " + action + ", expected upsertInto/deleteFrom/updateOn/insertInto", + Issue.Severity.ERROR + ))); } - return Result.success(YqlQuery.executeYql(queryText, keys, paramName, structType, cdc)); + return Result.success(YqlQuery.executeYql(text, keys, paramName, structType, cdc)); } } diff --git a/src/main/java/tech/ydb/app/XmlConfig.java b/src/main/java/tech/ydb/app/XmlConfig.java index 286972a..dc8b992 100644 --- a/src/main/java/tech/ydb/app/XmlConfig.java +++ b/src/main/java/tech/ydb/app/XmlConfig.java @@ -36,17 +36,11 @@ public static class Query { @XmlAttribute(name = "id", required = true) private String id; - @XmlAttribute(name = "upsertTo") - private String upsertTo; + @XmlAttribute(name = "actionMode") + private String actionMode; - @XmlAttribute(name = "insertTo") - private String insertTo; - - @XmlAttribute(name = "updateOn") - private String updateOn; - - @XmlAttribute(name = "deleteFrom") - private String deleteFrom; + @XmlAttribute(name = "actionTable") + private String actionTable; // @XmlAttribute(name = "batchSize") // private Integer batchSize; @@ -70,21 +64,14 @@ public String getText() { return this.text; } - public String getUpsertTo() { - return this.upsertTo; + public String getActionMode() { + return this.actionMode; } - public String getInsertTo() { - return this.insertTo; + public String getActionTable() { + return this.actionTable; } - public String getDeleteFrom() { - return this.deleteFrom; - } - - public String getUpdateOn() { - return this.updateOn; - } // public int getBatchSize() { // if (batchSize == null) { // return DEFAULT_BATCH_SIZE; diff --git a/src/main/java/tech/ydb/app/YqlQuery.java b/src/main/java/tech/ydb/app/YqlQuery.java index b147311..a197f95 100644 --- a/src/main/java/tech/ydb/app/YqlQuery.java +++ b/src/main/java/tech/ydb/app/YqlQuery.java @@ -216,8 +216,8 @@ public Status execute(YdbService ydb) { StructType type = resultSetToType(rs); Value values = ListType.of(type).newValue(resultSetToValues(rs, type)); - Params executePrms = Params.of("$input", values); - String executeQuery = "DECLARE $input AS " + type + "; " + query + " SELECT FROM AS_TABLE($input);"; + Params executePrms = Params.of("$b", values); + String executeQuery = "DECLARE $b AS List<" + type + ">; " + query + " SELECT * FROM AS_TABLE($b);"; return ydb.executeYqlQuery(executeQuery, executePrms, timeout); } }; @@ -239,7 +239,7 @@ private static List> resultSetToValues(ResultSetReader rs, StructType t while (rs.next()) { Value[] row = new Value[type.getMembersCount()]; for (int idx = 0; idx < type.getMembersCount(); idx += 1) { - row[idx] = rs.getColumn(type.getMemberIndex(type.getMemberName(idx))).getValue(); + row[idx] = rs.getColumn(type.getMemberName(idx)).getValue(); } values.add(type.newValueUnsafe(row)); }