diff --git a/pom.xml b/pom.xml
index b55a9c0..f6480d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
17
3.2.12
- 2.3.5
+ 2.3.20
tech.ydb.app.Application
@@ -23,7 +23,7 @@
tech.ydb
- ydb-sdk-table
+ ydb-sdk-query
tech.ydb
diff --git a/src/main/java/tech/ydb/app/Application.java b/src/main/java/tech/ydb/app/Application.java
index 8a58b2c..305b79a 100644
--- a/src/main/java/tech/ydb/app/Application.java
+++ b/src/main/java/tech/ydb/app/Application.java
@@ -70,6 +70,12 @@ public void run(String... args) {
warnings.add("No reader configs found!!");
}
+ int sessionPoolSize = 0;
+ for (CdcReader reader: readers) {
+ sessionPoolSize += reader.getWriter().getThreadsCount();
+ }
+ ydb.updatePoolSize(Math.max(sessionPoolSize, 50));
+
for (CdcReader reader: readers) {
reader.start();
}
diff --git a/src/main/java/tech/ydb/app/CdcMsgParser.java b/src/main/java/tech/ydb/app/CdcMsgParser.java
index 01f8adf..02ab7c8 100644
--- a/src/main/java/tech/ydb/app/CdcMsgParser.java
+++ b/src/main/java/tech/ydb/app/CdcMsgParser.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
@@ -97,6 +98,7 @@ public Parser(YdbService ydb, XmlConfig.Cdc cdc, Map xm
this.xmlQueries = xmlQueries;
}
+ @SuppressWarnings("null")
public Result> parse() {
String changefeed = ydb.expandPath(cdc.getChangefeed());
@@ -129,13 +131,13 @@ public Result> parse() {
private Result> findUpdateQuery(TableDescription source) {
if (cdc.getQuery() != null && !cdc.getQuery().trim().isEmpty()) {
- return validate(source, cdc.getQuery().trim(), false);
+ return validate(source, new XmlConfig.Query(cdc.getQuery().trim()), false);
}
String queryId = cdc.getUpdateQueryId();
if (queryId != null && xmlQueries.containsKey(queryId)) {
XmlConfig.Query query = xmlQueries.get(queryId);
if (query.getText() != null && !query.getText().trim().isEmpty()) {
- return validate(source, query.getText().trim(), false);
+ return validate(source, query, false);
}
}
@@ -147,15 +149,17 @@ private Result> findDeleteQuery(TableDescription source) {
if (queryId != null && xmlQueries.containsKey(queryId)) {
XmlConfig.Query query = xmlQueries.get(queryId);
if (query.getText() != null && !query.getText().trim().isEmpty()) {
- return validate(source, query.getText().trim(), true);
+ return validate(source, query, true);
}
}
return Result.success(YqlQuery.skipMessages("erase", "deleteQueryId", source.getPrimaryKeys(), cdc));
}
- private Result> validate(TableDescription source, String query, boolean keysOnly) {
- Result parsed = ydb.parseQuery(query);
+ @SuppressWarnings("null")
+ private Result> validate(TableDescription source, XmlConfig.Query query, boolean keysOnly) {
+ String text = query.getText().trim();
+ Result parsed = ydb.parseQuery(text);
if (!parsed.isSuccess()) {
logger.error("Can't parse query for consumer {}, got status {}", cdc.getConsumer(), parsed.getStatus());
return parsed.map(null);
@@ -217,7 +221,34 @@ private Result> validate(TableDescription source, String quer
}
}
- return Result.success(YqlQuery.executeYql(query, source.getPrimaryKeys(), paramName, structType, cdc));
+ List keys = source.getPrimaryKeys();
+ if (query.getActionTable() != null && !query.getActionTable().trim().isEmpty()) {
+ String actionTable = query.getActionTable().trim();
+ String action = query.getActionMode();
+ if ("upsertInto".equalsIgnoreCase(action)) {
+ String execute = "UPSERT INTO `" + actionTable + "` ";
+ return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
+ }
+ if ("deleteFrom".equalsIgnoreCase(action)) {
+ String execute = "DELETE FROM `" + actionTable + "` ON ";
+ return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
+ }
+ if ("updateOn".equalsIgnoreCase(action)) {
+ String execute = "UPDATE `" + actionTable + "` ON ";
+ return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
+ }
+ if ("insertInto".equalsIgnoreCase(action)) {
+ String execute = "INSERT INTO `" + actionTable + "` ";
+ return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
+ }
+
+ return Result.fail(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, Issue.of(
+ "Uknown actionName " + action + ", expected upsertInto/deleteFrom/updateOn/insertInto",
+ Issue.Severity.ERROR
+ )));
+ }
+
+ return Result.success(YqlQuery.executeYql(text, keys, paramName, structType, cdc));
}
}
diff --git a/src/main/java/tech/ydb/app/XmlConfig.java b/src/main/java/tech/ydb/app/XmlConfig.java
index fad8d74..dc8b992 100644
--- a/src/main/java/tech/ydb/app/XmlConfig.java
+++ b/src/main/java/tech/ydb/app/XmlConfig.java
@@ -36,15 +36,26 @@ public static class Query {
@XmlAttribute(name = "id", required = true)
private String id;
-// @XmlAttribute(name = "upsertTo")
-// private String upsertTo;
-//
-// @XmlAttribute(name = "deleteFrom")
-// private String deleteFrom;
+ @XmlAttribute(name = "actionMode")
+ private String actionMode;
+
+ @XmlAttribute(name = "actionTable")
+ private String actionTable;
+
+// @XmlAttribute(name = "batchSize")
+// private Integer batchSize;
@XmlValue
private String text;
+ public Query() {
+ }
+
+ public Query(String query) {
+ this.id = "inplacement";
+ this.text = query;
+ }
+
public String getId() {
return this.id;
}
@@ -53,12 +64,19 @@ public String getText() {
return this.text;
}
-// public String getUpsertTo() {
-// return this.upsertTo;
-// }
-//
-// public String getDeleteFrom() {
-// return this.deleteFrom;
+ public String getActionMode() {
+ return this.actionMode;
+ }
+
+ public String getActionTable() {
+ return this.actionTable;
+ }
+
+// public int getBatchSize() {
+// if (batchSize == null) {
+// return DEFAULT_BATCH_SIZE;
+// }
+// return batchSize;
// }
}
@@ -80,6 +98,7 @@ public static class Cdc {
private String updateQueryId;
@XmlAttribute(name = "deleteQueryId")
private String deleteQueryId;
+
@XmlValue
private String query;
diff --git a/src/main/java/tech/ydb/app/YdbService.java b/src/main/java/tech/ydb/app/YdbService.java
index e22b817..ac8c78c 100644
--- a/src/main/java/tech/ydb/app/YdbService.java
+++ b/src/main/java/tech/ydb/app/YdbService.java
@@ -18,18 +18,22 @@
import tech.ydb.auth.TokenAuthProvider;
import tech.ydb.auth.iam.CloudAuthHelper;
+import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.auth.StaticCredentials;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.grpc.GrpcTransportBuilder;
+import tech.ydb.query.QuerySession;
+import tech.ydb.query.QueryStream;
+import tech.ydb.query.impl.QueryClientImpl;
+import tech.ydb.query.settings.ExecuteQuerySettings;
+import tech.ydb.query.tools.QueryReader;
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.query.DataQuery;
import tech.ydb.table.query.Params;
-import tech.ydb.table.settings.ExecuteDataQuerySettings;
-import tech.ydb.table.transaction.TxControl;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
@@ -54,6 +58,7 @@ public class YdbService {
private final GrpcTransport transport;
private final TableClient tableClient;
+ private final QueryClientImpl queryClient;
private final TopicClient topicClient;
public YdbService(Environment env) {
@@ -93,11 +98,17 @@ public YdbService(Environment env) {
this.transport = builder.build();
this.tableClient = TableClient.newClient(transport).build();
+ this.queryClient = QueryClientImpl.newClient(transport).build();
this.topicClient = TopicClient.newClient(transport)
- .setCompressionExecutor(Runnable::run) // Prevent OOM
+ .setCompressionExecutor(Runnable::run)
.build();
}
+ public void updatePoolSize(int maxSize) {
+ logger.error("set session pool max size {}", maxSize);
+ queryClient.updatePoolMaxSize(maxSize);
+ }
+
@PreDestroy
public void close() {
this.topicClient.close();
@@ -122,6 +133,7 @@ public String expandPath(String name) {
return sb.toString();
}
+ @SuppressWarnings("null")
public Result parseQuery(String query) {
Result session = tableClient.createSession(Duration.ofSeconds(5)).join();
if (!session.isSuccess()) {
@@ -133,6 +145,7 @@ public Result parseQuery(String query) {
}
}
+ @SuppressWarnings("null")
public Result describeTable(String tablePath) {
Result session = tableClient.createSession(Duration.ofSeconds(5)).join();
if (!session.isSuccess()) {
@@ -144,18 +157,35 @@ public Result describeTable(String tablePath) {
}
}
- public Status executeQuery(String query, Params params, int timeoutSeconds) {
- Result session = tableClient.createSession(Duration.ofSeconds(5)).join();
+ public Status executeYqlQuery(String query, Params params, int timeoutSeconds) {
+ Result session = queryClient.createSession(Duration.ofSeconds(5)).join();
if (!session.isSuccess()) {
return session.getStatus();
}
- try (Session s = session.getValue()) {
- ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings();
+ try (QuerySession s = session.getValue()) {
+ ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
+ if (timeoutSeconds > 0) {
+ settings.withRequestTimeout(Duration.ofSeconds(timeoutSeconds));
+ }
+ return s.createQuery(query, TxMode.NONE, params, settings.build()).execute().join().getStatus();
+ }
+ }
+
+ @SuppressWarnings("null")
+ public Result readYqlQuery(String query, Params params, int timeoutSeconds) {
+ Result session = queryClient.createSession(Duration.ofSeconds(5)).join();
+ if (!session.isSuccess()) {
+ return session.map(null);
+ }
+
+ try (QuerySession s = session.getValue()) {
+ ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
if (timeoutSeconds > 0) {
- settings.setTimeout(Duration.ofSeconds(timeoutSeconds));
+ settings.withRequestTimeout(Duration.ofSeconds(timeoutSeconds));
}
- return s.executeDataQuery(query, TxControl.serializableRw(), params, settings).join().getStatus();
+ QueryStream stream = s.createQuery(query, TxMode.SNAPSHOT_RO, params, settings.build());
+ return QueryReader.readFrom(stream).join();
}
}
diff --git a/src/main/java/tech/ydb/app/YqlQuery.java b/src/main/java/tech/ydb/app/YqlQuery.java
index 5ccf52b..a197f95 100644
--- a/src/main/java/tech/ydb/app/YqlQuery.java
+++ b/src/main/java/tech/ydb/app/YqlQuery.java
@@ -15,8 +15,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import tech.ydb.core.Result;
import tech.ydb.core.Status;
+import tech.ydb.query.tools.QueryReader;
import tech.ydb.table.query.Params;
+import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.values.DecimalType;
import tech.ydb.table.values.ListType;
import tech.ydb.table.values.NullValue;
@@ -143,7 +146,8 @@ private Value> readValue(JsonNode node, Type type) throws IOException {
case Date:
return PrimitiveValue.newDate(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC).toLocalDate());
case Datetime:
- return PrimitiveValue.newDatetime(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC).toLocalDateTime());
+ return PrimitiveValue.newDatetime(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC)
+ .toLocalDateTime());
case Timestamp:
return PrimitiveValue.newTimestamp(Instant.parse(node.asText()));
case Interval:
@@ -178,15 +182,67 @@ public Status execute(YdbService ydb) {
};
}
- public static Supplier executeYql(String query, List keys, String name, StructType type, XmlConfig.Cdc config) {
+ public static Supplier executeYql(String query, List keys, String name, StructType type,
+ XmlConfig.Cdc config) {
final int batchSize = config.getBatchSize();
final int timeout = config.getTimeoutSeconds();
return () -> new YqlQuery(type, keys, batchSize) {
@Override
public Status execute(YdbService ydb) {
Params prm = Params.of(name, ListType.of(type).newValue(batch));
- return ydb.executeQuery(query, prm, timeout);
+ return ydb.executeYqlQuery(query, prm, timeout);
}
};
}
+
+ public static Supplier readAndExecuteYql(String selectQuery, String query, List keys,
+ String name, StructType type, XmlConfig.Cdc config) {
+ final int batchSize = config.getBatchSize();
+ final int timeout = config.getTimeoutSeconds();
+ return () -> new YqlQuery(type, keys, batchSize) {
+ @Override
+ public Status execute(YdbService ydb) {
+ Params selectPrms = Params.of(name, ListType.of(type).newValue(batch));
+ Result res = ydb.readYqlQuery(selectQuery, selectPrms, timeout);
+ if (!res.isSuccess()) {
+ return res.getStatus();
+ }
+ QueryReader reader = res.getValue();
+ if (reader.getResultSetCount() < 1) {
+ return Status.SUCCESS;
+ }
+
+ ResultSetReader rs = reader.getResultSet(0);
+
+ StructType type = resultSetToType(rs);
+ Value> values = ListType.of(type).newValue(resultSetToValues(rs, type));
+ Params executePrms = Params.of("$b", values);
+ String executeQuery = "DECLARE $b AS List<" + type + ">; " + query + " SELECT * FROM AS_TABLE($b);";
+ return ydb.executeYqlQuery(executeQuery, executePrms, timeout);
+ }
+ };
+ }
+
+ private static StructType resultSetToType(ResultSetReader rs) {
+ String[] names = new String[rs.getColumnCount()];
+ Type[] types = new Type[rs.getColumnCount()];
+ for (int idx = 0; idx < rs.getColumnCount(); idx += 1) {
+ names[idx] = rs.getColumnName(idx);
+ types[idx] = rs.getColumnType(idx);
+ }
+
+ return StructType.ofOwn(names, types);
+ }
+
+ private static List> resultSetToValues(ResultSetReader rs, StructType type) {
+ List> values = new ArrayList<>();
+ while (rs.next()) {
+ Value>[] row = new Value[type.getMembersCount()];
+ for (int idx = 0; idx < type.getMembersCount(); idx += 1) {
+ row[idx] = rs.getColumn(type.getMemberName(idx)).getValue();
+ }
+ values.add(type.newValueUnsafe(row));
+ }
+ return values;
+ }
}
diff --git a/src/main/java/tech/ydb/app/YqlWriter.java b/src/main/java/tech/ydb/app/YqlWriter.java
index e34ae84..3e0b331 100644
--- a/src/main/java/tech/ydb/app/YqlWriter.java
+++ b/src/main/java/tech/ydb/app/YqlWriter.java
@@ -56,6 +56,10 @@ public YqlWriter(YdbService ydb, Supplier parser, XmlConfig.Cdc co
}
}
+ public int getThreadsCount() {
+ return writers.size();
+ }
+
public Status getLastStatus() {
for (int idx = 0; idx < writers.size(); idx++) {
Status last = writers.get(idx).lastStatus;