Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
<maven.compiler.target>17</maven.compiler.target>

<spring.boot.version>3.2.12</spring.boot.version>
<ydb.sdk.version>2.3.5</ydb.sdk.version>
<ydb.sdk.version>2.3.20</ydb.sdk.version>

<exec.mainClass>tech.ydb.app.Application</exec.mainClass>
</properties>

<dependencies>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-table</artifactId>
<artifactId>ydb-sdk-query</artifactId>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/tech/ydb/app/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public void run(String... args) {
warnings.add("No reader configs found!!");
}

int sessionPoolSize = 0;
for (CdcReader reader: readers) {
sessionPoolSize += reader.getWriter().getThreadsCount();
}
ydb.updatePoolSize(Math.max(sessionPoolSize, 50));

for (CdcReader reader: readers) {
reader.start();
}
Expand Down
43 changes: 37 additions & 6 deletions src/main/java/tech/ydb/app/CdcMsgParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
Expand Down Expand Up @@ -97,6 +98,7 @@ public Parser(YdbService ydb, XmlConfig.Cdc cdc, Map<String, XmlConfig.Query> xm
this.xmlQueries = xmlQueries;
}

@SuppressWarnings("null")
public Result<Supplier<CdcMsgParser>> parse() {
String changefeed = ydb.expandPath(cdc.getChangefeed());

Expand Down Expand Up @@ -129,13 +131,13 @@ public Result<Supplier<CdcMsgParser>> parse() {

private Result<Supplier<YqlQuery>> findUpdateQuery(TableDescription source) {
if (cdc.getQuery() != null && !cdc.getQuery().trim().isEmpty()) {
return validate(source, cdc.getQuery().trim(), false);
return validate(source, new XmlConfig.Query(cdc.getQuery().trim()), false);
}
String queryId = cdc.getUpdateQueryId();
if (queryId != null && xmlQueries.containsKey(queryId)) {
XmlConfig.Query query = xmlQueries.get(queryId);
if (query.getText() != null && !query.getText().trim().isEmpty()) {
return validate(source, query.getText().trim(), false);
return validate(source, query, false);
}
}

Expand All @@ -147,15 +149,17 @@ private Result<Supplier<YqlQuery>> findDeleteQuery(TableDescription source) {
if (queryId != null && xmlQueries.containsKey(queryId)) {
XmlConfig.Query query = xmlQueries.get(queryId);
if (query.getText() != null && !query.getText().trim().isEmpty()) {
return validate(source, query.getText().trim(), true);
return validate(source, query, true);
}
}

return Result.success(YqlQuery.skipMessages("erase", "deleteQueryId", source.getPrimaryKeys(), cdc));
}

private Result<Supplier<YqlQuery>> validate(TableDescription source, String query, boolean keysOnly) {
Result<DataQuery> parsed = ydb.parseQuery(query);
@SuppressWarnings("null")
private Result<Supplier<YqlQuery>> validate(TableDescription source, XmlConfig.Query query, boolean keysOnly) {
String text = query.getText().trim();
Result<DataQuery> parsed = ydb.parseQuery(text);
if (!parsed.isSuccess()) {
logger.error("Can't parse query for consumer {}, got status {}", cdc.getConsumer(), parsed.getStatus());
return parsed.map(null);
Expand Down Expand Up @@ -217,7 +221,34 @@ private Result<Supplier<YqlQuery>> validate(TableDescription source, String quer
}
}

return Result.success(YqlQuery.executeYql(query, source.getPrimaryKeys(), paramName, structType, cdc));
List<String> keys = source.getPrimaryKeys();
if (query.getActionTable() != null && !query.getActionTable().trim().isEmpty()) {
String actionTable = query.getActionTable().trim();
String action = query.getActionMode();
if ("upsertInto".equalsIgnoreCase(action)) {
String execute = "UPSERT INTO `" + actionTable + "` ";
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
}
if ("deleteFrom".equalsIgnoreCase(action)) {
String execute = "DELETE FROM `" + actionTable + "` ON ";
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
}
if ("updateOn".equalsIgnoreCase(action)) {
String execute = "UPDATE `" + actionTable + "` ON ";
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
}
if ("insertInto".equalsIgnoreCase(action)) {
String execute = "INSERT INTO `" + actionTable + "` ";
return Result.success(YqlQuery.readAndExecuteYql(text, execute, keys, paramName, structType, cdc));
}

return Result.fail(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, Issue.of(
"Uknown actionName " + action + ", expected upsertInto/deleteFrom/updateOn/insertInto",
Issue.Severity.ERROR
)));
}

return Result.success(YqlQuery.executeYql(text, keys, paramName, structType, cdc));
}
}

Expand Down
41 changes: 30 additions & 11 deletions src/main/java/tech/ydb/app/XmlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,26 @@ public static class Query {
@XmlAttribute(name = "id", required = true)
private String id;

// @XmlAttribute(name = "upsertTo")
// private String upsertTo;
//
// @XmlAttribute(name = "deleteFrom")
// private String deleteFrom;
@XmlAttribute(name = "actionMode")
private String actionMode;

@XmlAttribute(name = "actionTable")
private String actionTable;

// @XmlAttribute(name = "batchSize")
// private Integer batchSize;

@XmlValue
private String text;

public Query() {
}

public Query(String query) {
this.id = "inplacement";
this.text = query;
}

public String getId() {
return this.id;
}
Expand All @@ -53,12 +64,19 @@ public String getText() {
return this.text;
}

// public String getUpsertTo() {
// return this.upsertTo;
// }
//
// public String getDeleteFrom() {
// return this.deleteFrom;
public String getActionMode() {
return this.actionMode;
}

public String getActionTable() {
return this.actionTable;
}

// public int getBatchSize() {
// if (batchSize == null) {
// return DEFAULT_BATCH_SIZE;
// }
// return batchSize;
// }
}

Expand All @@ -80,6 +98,7 @@ public static class Cdc {
private String updateQueryId;
@XmlAttribute(name = "deleteQueryId")
private String deleteQueryId;

@XmlValue
private String query;

Expand Down
48 changes: 39 additions & 9 deletions src/main/java/tech/ydb/app/YdbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@

import tech.ydb.auth.TokenAuthProvider;
import tech.ydb.auth.iam.CloudAuthHelper;
import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.auth.StaticCredentials;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.grpc.GrpcTransportBuilder;
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryStream;
import tech.ydb.query.impl.QueryClientImpl;
import tech.ydb.query.settings.ExecuteQuerySettings;
import tech.ydb.query.tools.QueryReader;
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.query.DataQuery;
import tech.ydb.table.query.Params;
import tech.ydb.table.settings.ExecuteDataQuerySettings;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
Expand All @@ -54,6 +58,7 @@ public class YdbService {
private final GrpcTransport transport;

private final TableClient tableClient;
private final QueryClientImpl queryClient;
private final TopicClient topicClient;

public YdbService(Environment env) {
Expand Down Expand Up @@ -93,11 +98,17 @@ public YdbService(Environment env) {

this.transport = builder.build();
this.tableClient = TableClient.newClient(transport).build();
this.queryClient = QueryClientImpl.newClient(transport).build();
this.topicClient = TopicClient.newClient(transport)
.setCompressionExecutor(Runnable::run) // Prevent OOM
.setCompressionExecutor(Runnable::run)
.build();
}

public void updatePoolSize(int maxSize) {
logger.error("set session pool max size {}", maxSize);
queryClient.updatePoolMaxSize(maxSize);
}

@PreDestroy
public void close() {
this.topicClient.close();
Expand All @@ -122,6 +133,7 @@ public String expandPath(String name) {
return sb.toString();
}

@SuppressWarnings("null")
public Result<DataQuery> parseQuery(String query) {
Result<Session> session = tableClient.createSession(Duration.ofSeconds(5)).join();
if (!session.isSuccess()) {
Expand All @@ -133,6 +145,7 @@ public Result<DataQuery> parseQuery(String query) {
}
}

@SuppressWarnings("null")
public Result<TableDescription> describeTable(String tablePath) {
Result<Session> session = tableClient.createSession(Duration.ofSeconds(5)).join();
if (!session.isSuccess()) {
Expand All @@ -144,18 +157,35 @@ public Result<TableDescription> describeTable(String tablePath) {
}
}

public Status executeQuery(String query, Params params, int timeoutSeconds) {
Result<Session> session = tableClient.createSession(Duration.ofSeconds(5)).join();
public Status executeYqlQuery(String query, Params params, int timeoutSeconds) {
Result<QuerySession> session = queryClient.createSession(Duration.ofSeconds(5)).join();
if (!session.isSuccess()) {
return session.getStatus();
}

try (Session s = session.getValue()) {
ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings();
try (QuerySession s = session.getValue()) {
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
if (timeoutSeconds > 0) {
settings.withRequestTimeout(Duration.ofSeconds(timeoutSeconds));
}
return s.createQuery(query, TxMode.NONE, params, settings.build()).execute().join().getStatus();
}
}

@SuppressWarnings("null")
public Result<QueryReader> readYqlQuery(String query, Params params, int timeoutSeconds) {
Result<QuerySession> session = queryClient.createSession(Duration.ofSeconds(5)).join();
if (!session.isSuccess()) {
return session.map(null);
}

try (QuerySession s = session.getValue()) {
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
if (timeoutSeconds > 0) {
settings.setTimeout(Duration.ofSeconds(timeoutSeconds));
settings.withRequestTimeout(Duration.ofSeconds(timeoutSeconds));
}
return s.executeDataQuery(query, TxControl.serializableRw(), params, settings).join().getStatus();
QueryStream stream = s.createQuery(query, TxMode.SNAPSHOT_RO, params, settings.build());
return QueryReader.readFrom(stream).join();
}
}

Expand Down
62 changes: 59 additions & 3 deletions src/main/java/tech/ydb/app/YqlQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.query.tools.QueryReader;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.values.DecimalType;
import tech.ydb.table.values.ListType;
import tech.ydb.table.values.NullValue;
Expand Down Expand Up @@ -143,7 +146,8 @@ private Value<?> readValue(JsonNode node, Type type) throws IOException {
case Date:
return PrimitiveValue.newDate(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC).toLocalDate());
case Datetime:
return PrimitiveValue.newDatetime(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC).toLocalDateTime());
return PrimitiveValue.newDatetime(Instant.parse(node.asText()).atOffset(ZoneOffset.UTC)
.toLocalDateTime());
case Timestamp:
return PrimitiveValue.newTimestamp(Instant.parse(node.asText()));
case Interval:
Expand Down Expand Up @@ -178,15 +182,67 @@ public Status execute(YdbService ydb) {
};
}

public static Supplier<YqlQuery> executeYql(String query, List<String> keys, String name, StructType type, XmlConfig.Cdc config) {
public static Supplier<YqlQuery> executeYql(String query, List<String> keys, String name, StructType type,
XmlConfig.Cdc config) {
final int batchSize = config.getBatchSize();
final int timeout = config.getTimeoutSeconds();
return () -> new YqlQuery(type, keys, batchSize) {
@Override
public Status execute(YdbService ydb) {
Params prm = Params.of(name, ListType.of(type).newValue(batch));
return ydb.executeQuery(query, prm, timeout);
return ydb.executeYqlQuery(query, prm, timeout);
}
};
}

public static Supplier<YqlQuery> readAndExecuteYql(String selectQuery, String query, List<String> keys,
String name, StructType type, XmlConfig.Cdc config) {
final int batchSize = config.getBatchSize();
final int timeout = config.getTimeoutSeconds();
return () -> new YqlQuery(type, keys, batchSize) {
@Override
public Status execute(YdbService ydb) {
Params selectPrms = Params.of(name, ListType.of(type).newValue(batch));
Result<QueryReader> res = ydb.readYqlQuery(selectQuery, selectPrms, timeout);
if (!res.isSuccess()) {
return res.getStatus();
}
QueryReader reader = res.getValue();
if (reader.getResultSetCount() < 1) {
return Status.SUCCESS;
}

ResultSetReader rs = reader.getResultSet(0);

StructType type = resultSetToType(rs);
Value<?> values = ListType.of(type).newValue(resultSetToValues(rs, type));
Params executePrms = Params.of("$b", values);
String executeQuery = "DECLARE $b AS List<" + type + ">; " + query + " SELECT * FROM AS_TABLE($b);";
return ydb.executeYqlQuery(executeQuery, executePrms, timeout);
}
};
}

private static StructType resultSetToType(ResultSetReader rs) {
String[] names = new String[rs.getColumnCount()];
Type[] types = new Type[rs.getColumnCount()];
for (int idx = 0; idx < rs.getColumnCount(); idx += 1) {
names[idx] = rs.getColumnName(idx);
types[idx] = rs.getColumnType(idx);
}

return StructType.ofOwn(names, types);
}

private static List<Value<?>> resultSetToValues(ResultSetReader rs, StructType type) {
List<Value<?>> values = new ArrayList<>();
while (rs.next()) {
Value<?>[] row = new Value[type.getMembersCount()];
for (int idx = 0; idx < type.getMembersCount(); idx += 1) {
row[idx] = rs.getColumn(type.getMemberName(idx)).getValue();
}
values.add(type.newValueUnsafe(row));
}
return values;
}
}
4 changes: 4 additions & 0 deletions src/main/java/tech/ydb/app/YqlWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public YqlWriter(YdbService ydb, Supplier<CdcMsgParser> parser, XmlConfig.Cdc co
}
}

public int getThreadsCount() {
return writers.size();
}

public Status getLastStatus() {
for (int idx = 0; idx < writers.size(); idx++) {
Status last = writers.get(idx).lastStatus;
Expand Down