diff --git a/README.md b/README.md
index 3112e69..9bb065f 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,10 @@ The original code comes from AliYun. On this basis, I have done some bug fixes,
| sink.ignore-delete | optional | true | Boolean | Whether to ignore delete statements. |
| catalog.ignore-primary-key | optional | true | Boolean | Whether to ignore primary keys when using ClickHouseCatalog to create a table. Defaults to true. |
+**Upsert mode notice:**
+1. Distributed tables don't support update/delete statements. If you need them, be sure to write records to the local tables, or set `sink.write-local` to `true`.
+2. Data is updated and deleted by the primary key; be aware of this when writing to partitioned tables.
+
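+In upsert mode the connector translates changelog records into ClickHouse mutations. A sketch of the generated SQL (the database, table, and column names below are hypothetical):
+
+```sql
+-- for UPDATE_AFTER records
+ALTER TABLE `default`.`t_user` UPDATE `name`=? WHERE `id`=?;
+-- for DELETE records
+ALTER TABLE `default`.`t_user` DELETE WHERE `id`=?;
+```
+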
## Data Type Mapping
| Flink Type | ClickHouse Type (Sink) | ClickHouse Type (Source) |
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableSink.java b/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableSink.java
index 64de7fd..cec0b24 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableSink.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableSink.java
@@ -12,15 +12,21 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
/**
* A {@link DynamicTableSink} that describes how to create a {@link ClickHouseDynamicTableSink} from
* a logical description.
+ *
+ * <p>TODO: Partitioning strategy isn't well implemented.
*/
-public class ClickHouseDynamicTableSink implements DynamicTableSink {
+public class ClickHouseDynamicTableSink implements DynamicTableSink, SupportsPartitioning {
private final CatalogTable catalogTable;
@@ -28,6 +34,10 @@ public class ClickHouseDynamicTableSink implements DynamicTableSink {
private final ClickHouseOptions options;
+ private boolean dynamicGrouping = false;
+
+    private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();
+
public ClickHouseDynamicTableSink(ClickHouseOptions options, CatalogTable table) {
this.options = options;
this.catalogTable = table;
@@ -64,9 +74,28 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return OutputFormatProvider.of(outputFormat);
}
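+    // A sketch of how the planner drives applyStaticPartition below (table and
+    // column names are hypothetical): a statement like
+    //   INSERT INTO ck_table PARTITION (dt = '2021-01-01') SELECT f1, f2 FROM src
+    // passes the static part of the partition spec, {dt=2021-01-01}, into it.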
+ @Override
+    public void applyStaticPartition(Map<String, String> partition) {
+ staticPartitionSpec = new LinkedHashMap<>();
+ for (String partitionCol : catalogTable.getPartitionKeys()) {
+ if (partition.containsKey(partitionCol)) {
+ staticPartitionSpec.put(partitionCol, partition.get(partitionCol));
+ }
+ }
+ }
+
+ @Override
+ public boolean requiresPartitionGrouping(boolean supportsGrouping) {
+ this.dynamicGrouping = supportsGrouping;
+ return supportsGrouping;
+ }
+
@Override
public DynamicTableSink copy() {
- return new ClickHouseDynamicTableSink(options, catalogTable);
+ ClickHouseDynamicTableSink sink = new ClickHouseDynamicTableSink(options, catalogTable);
+ sink.dynamicGrouping = dynamicGrouping;
+ sink.staticPartitionSpec = staticPartitionSpec;
+ return sink;
}
@Override
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java b/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java
index c4d84a8..82281ec 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java
@@ -1,7 +1,7 @@
package org.apache.flink.connector.clickhouse.catalog;
import org.apache.flink.connector.clickhouse.ClickHouseDynamicTableFactory;
-import org.apache.flink.connector.clickhouse.common.DistributedEngineFullSchema;
+import org.apache.flink.connector.clickhouse.internal.common.DistributedEngineFullSchema;
import org.apache.flink.connector.clickhouse.util.ClickHouseTypeUtil;
import org.apache.flink.connector.clickhouse.util.ClickHouseUtil;
import org.apache.flink.table.api.TableSchema;
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/AbstractClickHouseOutputFormat.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/AbstractClickHouseOutputFormat.java
index 0019c07..7b807dc 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/AbstractClickHouseOutputFormat.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/AbstractClickHouseOutputFormat.java
@@ -8,7 +8,6 @@
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
-import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseOptions;
import org.apache.flink.connector.clickhouse.internal.partitioner.ClickHousePartitioner;
@@ -18,7 +17,6 @@
import org.apache.flink.table.data.RowData.FieldGetter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -165,18 +163,17 @@ public AbstractClickHouseOutputFormat build() {
Arrays.stream(fieldDataTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new);
- ClickHouseRowConverter converter = new ClickHouseRowConverter(RowType.of(logicalTypes));
if (primaryKey != null) {
LOG.warn("If primary key is specified, connector will be in UPSERT mode.");
- LOG.warn("You will have significant performance loss.");
+ LOG.warn(
+ "The data will be updated / deleted by the primary key, you will have significant performance loss.");
}
return options.getWriteLocal()
- ? createShardOutputFormat(logicalTypes, converter)
- : createBatchOutputFormat(converter);
+ ? createShardOutputFormat(logicalTypes)
+ : createBatchOutputFormat(logicalTypes);
}
- private ClickHouseBatchOutputFormat createBatchOutputFormat(
- ClickHouseRowConverter converter) {
+ private ClickHouseBatchOutputFormat createBatchOutputFormat(LogicalType[] logicalTypes) {
String[] keyFields = new String[0];
if (primaryKey != null) {
keyFields = listToStringArray(primaryKey.getColumns());
@@ -187,12 +184,11 @@ private ClickHouseBatchOutputFormat createBatchOutputFormat(
fieldNames,
keyFields,
listToStringArray(partitionKeys),
- converter,
+ logicalTypes,
options);
}
- private ClickHouseShardOutputFormat createShardOutputFormat(
- LogicalType[] logicalTypes, ClickHouseRowConverter converter) {
+ private ClickHouseShardOutputFormat createShardOutputFormat(LogicalType[] logicalTypes) {
String partitionStrategy = options.getPartitionStrategy();
ClickHousePartitioner partitioner;
switch (partitionStrategy) {
@@ -229,7 +225,7 @@ private ClickHouseShardOutputFormat createShardOutputFormat(
fieldNames,
keyFields,
listToStringArray(partitionKeys),
- converter,
+ logicalTypes,
partitioner,
options);
}
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseBatchOutputFormat.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseBatchOutputFormat.java
index ad026bb..a1b6fea 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseBatchOutputFormat.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseBatchOutputFormat.java
@@ -6,10 +6,10 @@
package org.apache.flink.connector.clickhouse.internal;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
-import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseOptions;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -35,7 +35,7 @@ public class ClickHouseBatchOutputFormat extends AbstractClickHouseOutputFormat
private final String[] partitionFields;
- private final ClickHouseRowConverter converter;
+ private final LogicalType[] fieldTypes;
private final ClickHouseOptions options;
@@ -48,35 +48,35 @@ protected ClickHouseBatchOutputFormat(
@Nonnull String[] fieldNames,
@Nonnull String[] keyFields,
@Nonnull String[] partitionFields,
- @Nonnull ClickHouseRowConverter converter,
+ @Nonnull LogicalType[] fieldTypes,
@Nonnull ClickHouseOptions options) {
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
this.fieldNames = Preconditions.checkNotNull(fieldNames);
this.keyFields = Preconditions.checkNotNull(keyFields);
this.partitionFields = Preconditions.checkNotNull(partitionFields);
- this.converter = Preconditions.checkNotNull(converter);
+ this.fieldTypes = Preconditions.checkNotNull(fieldTypes);
this.options = Preconditions.checkNotNull(options);
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
try {
+ // TODO Distributed tables don't support update and delete statements.
executor =
ClickHouseExecutor.createClickHouseExecutor(
options.getTableName(),
+ options.getDatabaseName(),
null,
fieldNames,
keyFields,
partitionFields,
- converter,
+ fieldTypes,
options);
executor.prepareStatement(connectionProvider);
executor.setRuntimeContext(getRuntimeContext());
long flushIntervalMillis = options.getFlushInterval().toMillis();
- if (flushIntervalMillis > 0) {
- scheduledFlush(flushIntervalMillis, "clickhouse-batch-output-format");
- }
+ scheduledFlush(flushIntervalMillis, "clickhouse-batch-output-format");
} catch (Exception exception) {
throw new IOException("Unable to establish connection with ClickHouse.", exception);
}
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseShardOutputFormat.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseShardOutputFormat.java
index bbe7893..6286842 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseShardOutputFormat.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseShardOutputFormat.java
@@ -5,14 +5,14 @@
package org.apache.flink.connector.clickhouse.internal;
-import org.apache.flink.connector.clickhouse.common.DistributedEngineFullSchema;
+import org.apache.flink.connector.clickhouse.internal.common.DistributedEngineFullSchema;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
-import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseOptions;
import org.apache.flink.connector.clickhouse.internal.partitioner.ClickHousePartitioner;
import org.apache.flink.connector.clickhouse.util.ClickHouseUtil;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -40,7 +40,7 @@ public class ClickHouseShardOutputFormat extends AbstractClickHouseOutputFormat
private final String[] fieldNames;
- private final ClickHouseRowConverter converter;
+ private final LogicalType[] logicalTypes;
private final ClickHousePartitioner partitioner;
@@ -63,14 +63,14 @@ protected ClickHouseShardOutputFormat(
@Nonnull String[] fieldNames,
@Nonnull String[] keyFields,
@Nonnull String[] partitionFields,
- @Nonnull ClickHouseRowConverter converter,
+ @Nonnull LogicalType[] logicalTypes,
@Nonnull ClickHousePartitioner partitioner,
@Nonnull ClickHouseOptions options) {
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
this.fieldNames = Preconditions.checkNotNull(fieldNames);
this.keyFields = keyFields;
this.partitionFields = partitionFields;
- this.converter = Preconditions.checkNotNull(converter);
+ this.logicalTypes = Preconditions.checkNotNull(logicalTypes);
this.partitioner = Preconditions.checkNotNull(partitioner);
this.options = Preconditions.checkNotNull(options);
this.shardExecutors = new ArrayList<>();
@@ -100,11 +100,12 @@ public void open(int taskNumber, int numTasks) throws IOException {
ClickHouseExecutor executor =
ClickHouseExecutor.createClickHouseExecutor(
shardTableSchema.getTable(),
+ shardTableSchema.getDatabase(),
shardTableSchema.getCluster(),
fieldNames,
keyFields,
partitionFields,
- converter,
+ logicalTypes,
options);
executor.prepareStatement(shardConnection);
shardExecutors.add(executor);
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java
index 81a0567..d2a0ab2 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java
@@ -8,7 +8,9 @@
import org.apache.commons.lang3.ArrayUtils;
import java.util.Arrays;
-import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.joining;
/** Create an insert/update/delete ClickHouse statement. */
public class ClickHouseStatementFactory {
@@ -21,9 +23,8 @@ public static String getInsertIntoStatement(String tableName, String[] fieldName
String columns =
Arrays.stream(fieldNames)
.map(ClickHouseStatementFactory::quoteIdentifier)
- .collect(Collectors.joining(", "));
- String placeholders =
- Arrays.stream(fieldNames).map((f) -> "?").collect(Collectors.joining(", "));
+ .collect(joining(", "));
+ String placeholders = Arrays.stream(fieldNames).map((f) -> "?").collect(joining(", "));
return String.join(
EMPTY,
"INSERT INTO ",
@@ -37,20 +38,21 @@ public static String getInsertIntoStatement(String tableName, String[] fieldName
public static String getUpdateStatement(
String tableName,
+ String databaseName,
+ String clusterName,
String[] fieldNames,
String[] keyFields,
- String[] partitionFields,
- String clusterName) {
+ String[] partitionFields) {
String setClause =
Arrays.stream(fieldNames)
.filter(f -> !ArrayUtils.contains(keyFields, f))
.filter(f -> !ArrayUtils.contains(partitionFields, f))
.map((f) -> quoteIdentifier(f) + "=?")
- .collect(Collectors.joining(", "));
+ .collect(joining(", "));
String conditionClause =
Arrays.stream(keyFields)
.map((f) -> quoteIdentifier(f) + "=?")
- .collect(Collectors.joining(" AND "));
+ .collect(joining(" AND "));
String onClusterClause = "";
if (clusterName != null) {
onClusterClause = " ON CLUSTER " + quoteIdentifier(clusterName);
@@ -59,7 +61,7 @@ public static String getUpdateStatement(
return String.join(
EMPTY,
"ALTER TABLE ",
- quoteIdentifier(tableName),
+ fromTableClause(tableName, databaseName),
onClusterClause,
" UPDATE ",
setClause,
@@ -68,11 +70,11 @@ public static String getUpdateStatement(
}
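+    // Sketch of a statement produced by getUpdateStatement above, with hypothetical
+    // identifiers: ALTER TABLE `db`.`t` ON CLUSTER `c` UPDATE `v`=? WHERE `id`=?
+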
public static String getDeleteStatement(
- String tableName, String[] conditionFields, String clusterName) {
+ String tableName, String databaseName, String clusterName, String[] conditionFields) {
String conditionClause =
Arrays.stream(conditionFields)
.map((f) -> quoteIdentifier(f) + "=?")
- .collect(Collectors.joining(" AND "));
+ .collect(joining(" AND "));
String onClusterClause = "";
if (clusterName != null) {
onClusterClause = " ON CLUSTER " + quoteIdentifier(clusterName);
@@ -81,13 +83,21 @@ public static String getDeleteStatement(
return String.join(
EMPTY,
"ALTER TABLE ",
- quoteIdentifier(tableName),
+ fromTableClause(tableName, databaseName),
onClusterClause,
" DELETE WHERE ",
conditionClause);
}
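+    // Sketch of a statement produced by getDeleteStatement above, with hypothetical
+    // identifiers: ALTER TABLE `db`.`t` ON CLUSTER `c` DELETE WHERE `id`=?
+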
- public static String quoteIdentifier(String identifier) {
+ private static String fromTableClause(String tableName, String databaseName) {
+ if (databaseName == null) {
+ return quoteIdentifier(tableName);
+ }
+
+ return format("%s.%s", quoteIdentifier(databaseName), quoteIdentifier(tableName));
+ }
+
+ private static String quoteIdentifier(String identifier) {
return String.join(EMPTY, "`", identifier, "`");
}
}
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/common/DistributedEngineFullSchema.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/common/DistributedEngineFullSchema.java
similarity index 96%
rename from src/main/java/org/apache/flink/connector/clickhouse/common/DistributedEngineFullSchema.java
rename to src/main/java/org/apache/flink/connector/clickhouse/internal/common/DistributedEngineFullSchema.java
index f713377..7b9578e 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/common/DistributedEngineFullSchema.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/common/DistributedEngineFullSchema.java
@@ -1,4 +1,4 @@
-package org.apache.flink.connector.clickhouse.common;
+package org.apache.flink.connector.clickhouse.internal.common;
import org.apache.flink.util.StringUtils;
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java
index 5362107..b4f55fe 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java
@@ -21,7 +21,7 @@ public class ClickHouseBatchExecutor implements ClickHouseExecutor {
private static final long serialVersionUID = 1L;
- private final String sql;
+ private final String insertSql;
private final ClickHouseRowConverter converter;
@@ -32,15 +32,15 @@ public class ClickHouseBatchExecutor implements ClickHouseExecutor {
private transient ClickHouseConnectionProvider connectionProvider;
public ClickHouseBatchExecutor(
- String sql, ClickHouseRowConverter converter, ClickHouseOptions options) {
- this.sql = sql;
+ String insertSql, ClickHouseRowConverter converter, ClickHouseOptions options) {
+ this.insertSql = insertSql;
this.converter = converter;
this.maxRetries = options.getMaxRetries();
}
@Override
public void prepareStatement(ClickHouseConnection connection) throws SQLException {
- statement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
+ statement = (ClickHousePreparedStatement) connection.prepareStatement(insertSql);
}
@Override
@@ -88,13 +88,11 @@ public void closeStatement() throws SQLException {
@Override
public String toString() {
return "ClickHouseBatchExecutor{"
- + "sql='"
- + sql
+ + "insertSql='"
+ + insertSql
+ '\''
- + ", converter="
- + converter
- + ", statement="
- + statement
+ + ", maxRetries="
+ + maxRetries
+ ", connectionProvider="
+ connectionProvider
+ '}';
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java
index 18fb96d..ce7931b 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java
@@ -10,8 +10,12 @@
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseOptions;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
@@ -19,6 +23,11 @@
import java.io.Serializable;
import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
/** Executor interface for submitting data to ClickHouse. */
public interface ClickHouseExecutor extends Serializable {
@@ -61,49 +70,101 @@ default void attemptExecuteBatch(ClickHousePreparedStatement stmt, int maxRetrie
static ClickHouseExecutor createClickHouseExecutor(
String tableName,
+ String databaseName,
String clusterName,
String[] fieldNames,
String[] keyFields,
String[] partitionFields,
- ClickHouseRowConverter converter,
+ LogicalType[] fieldTypes,
ClickHouseOptions options) {
if (keyFields.length > 0) {
return createUpsertExecutor(
tableName,
+ databaseName,
clusterName,
fieldNames,
keyFields,
partitionFields,
- converter,
+ fieldTypes,
options);
} else {
- return createBatchExecutor(tableName, fieldNames, converter, options);
+ return createBatchExecutor(tableName, fieldNames, fieldTypes, options);
}
}
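+    // Note: keyFields is non-empty only when the Flink DDL declares a PRIMARY KEY,
+    // in which case the connector runs in upsert mode (see the warning logged in
+    // AbstractClickHouseOutputFormat's builder).
+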
static ClickHouseBatchExecutor createBatchExecutor(
String tableName,
String[] fieldNames,
- ClickHouseRowConverter converter,
+ LogicalType[] fieldTypes,
ClickHouseOptions options) {
- String sql = ClickHouseStatementFactory.getInsertIntoStatement(tableName, fieldNames);
- return new ClickHouseBatchExecutor(sql, converter, options);
+ String insertSql = ClickHouseStatementFactory.getInsertIntoStatement(tableName, fieldNames);
+ ClickHouseRowConverter converter = new ClickHouseRowConverter(RowType.of(fieldTypes));
+ return new ClickHouseBatchExecutor(insertSql, converter, options);
}
static ClickHouseUpsertExecutor createUpsertExecutor(
String tableName,
+ String databaseName,
String clusterName,
String[] fieldNames,
String[] keyFields,
String[] partitionFields,
- ClickHouseRowConverter converter,
+ LogicalType[] fieldTypes,
ClickHouseOptions options) {
String insertSql = ClickHouseStatementFactory.getInsertIntoStatement(tableName, fieldNames);
String updateSql =
ClickHouseStatementFactory.getUpdateStatement(
- tableName, fieldNames, keyFields, partitionFields, clusterName);
+ tableName,
+ databaseName,
+ clusterName,
+ fieldNames,
+ keyFields,
+ partitionFields);
String deleteSql =
- ClickHouseStatementFactory.getDeleteStatement(tableName, keyFields, clusterName);
- return new ClickHouseUpsertExecutor(insertSql, updateSql, deleteSql, converter, options);
+ ClickHouseStatementFactory.getDeleteStatement(
+ tableName, databaseName, clusterName, keyFields);
+
+            // Re-order the fields to match the parameter order of the generated SQL statements.
+ int[] delFields =
+ Arrays.stream(keyFields)
+ .mapToInt(pk -> ArrayUtils.indexOf(fieldNames, pk))
+ .toArray();
+ int[] updatableFields =
+ IntStream.range(0, fieldNames.length)
+ .filter(idx -> !ArrayUtils.contains(keyFields, fieldNames[idx]))
+ .filter(idx -> !ArrayUtils.contains(partitionFields, fieldNames[idx]))
+ .toArray();
+ int[] updFields = ArrayUtils.addAll(updatableFields, delFields);
+
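+            // For example, with a hypothetical schema where fieldNames = [id, dt, v],
+            // keyFields = [id] and partitionFields = [dt]: delFields = [0] and
+            // updFields = [2, 0], matching the "UPDATE `v`=? WHERE `id`=?" parameter order.
+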
+ LogicalType[] delTypes =
+ Arrays.stream(delFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+ LogicalType[] updTypes =
+ Arrays.stream(updFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+
+ return new ClickHouseUpsertExecutor(
+ insertSql,
+ updateSql,
+ deleteSql,
+ new ClickHouseRowConverter(RowType.of(fieldTypes)),
+ new ClickHouseRowConverter(RowType.of(updTypes)),
+ new ClickHouseRowConverter(RowType.of(delTypes)),
+ createExtractor(fieldTypes, updFields),
+ createExtractor(fieldTypes, delFields),
+ options);
+ }
+
+    static Function<RowData, RowData> createExtractor(LogicalType[] logicalTypes, int[] fields) {
+ final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fields.length];
+ for (int i = 0; i < fields.length; i++) {
+ fieldGetters[i] = createFieldGetter(logicalTypes[fields[i]], fields[i]);
+ }
+
+ return row -> {
+ GenericRowData rowData = new GenericRowData(row.getRowKind(), fieldGetters.length);
+ for (int i = 0; i < fieldGetters.length; i++) {
+ rowData.setField(i, fieldGetters[i].getFieldOrNull(row));
+ }
+ return rowData;
+ };
}
}
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java
index ff8af70..4809b7c 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java
@@ -16,6 +16,7 @@
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.function.Function;
/** ClickHouse's upsert executor. */
public class ClickHouseUpsertExecutor implements ClickHouseExecutor {
@@ -28,7 +29,15 @@ public class ClickHouseUpsertExecutor implements ClickHouseExecutor {
private final String deleteSql;
- private final ClickHouseRowConverter converter;
+ private final ClickHouseRowConverter insertConverter;
+
+ private final ClickHouseRowConverter updateConverter;
+
+ private final ClickHouseRowConverter deleteConverter;
+
+    private final Function<RowData, RowData> updateExtractor;
+
+    private final Function<RowData, RowData> deleteExtractor;
private final int maxRetries;
@@ -44,12 +53,20 @@ public ClickHouseUpsertExecutor(
String insertSql,
String updateSql,
String deleteSql,
- ClickHouseRowConverter converter,
+ ClickHouseRowConverter insertConverter,
+ ClickHouseRowConverter updateConverter,
+ ClickHouseRowConverter deleteConverter,
+            Function<RowData, RowData> updateExtractor,
+            Function<RowData, RowData> deleteExtractor,
ClickHouseOptions options) {
this.insertSql = insertSql;
this.updateSql = updateSql;
this.deleteSql = deleteSql;
- this.converter = converter;
+ this.insertConverter = insertConverter;
+ this.updateConverter = updateConverter;
+ this.deleteConverter = deleteConverter;
+ this.updateExtractor = updateExtractor;
+ this.deleteExtractor = deleteExtractor;
this.maxRetries = options.getMaxRetries();
}
@@ -74,15 +91,15 @@ public void setRuntimeContext(RuntimeContext context) {}
public void addToBatch(RowData record) throws SQLException {
switch (record.getRowKind()) {
case INSERT:
- converter.toExternal(record, insertStmt);
+ insertConverter.toExternal(record, insertStmt);
insertStmt.addBatch();
break;
case UPDATE_AFTER:
- converter.toExternal(record, updateStmt);
+ updateConverter.toExternal(updateExtractor.apply(record), updateStmt);
updateStmt.addBatch();
break;
case DELETE:
- converter.toExternal(record, deleteStmt);
+ deleteConverter.toExternal(deleteExtractor.apply(record), deleteStmt);
deleteStmt.addBatch();
break;
case UPDATE_BEFORE:
@@ -118,15 +135,7 @@ public void closeStatement() throws SQLException {
@Override
public String toString() {
return "ClickHouseUpsertExecutor{"
- + "insertStmt="
- + insertStmt
- + ", updateStmt="
- + updateStmt
- + ", deleteStmt="
- + deleteStmt
- + ", connectionProvider="
- + connectionProvider
- + ", insertSql='"
+ + "insertSql='"
+ insertSql
+ '\''
+ ", updateSql='"
@@ -135,10 +144,10 @@ public String toString() {
+ ", deleteSql='"
+ deleteSql
+ '\''
- + ", converter="
- + converter
+ ", maxRetries="
+ maxRetries
+ + ", connectionProvider="
+ + connectionProvider
+ '}';
}
}
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java b/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java
index 663b403..d4940df 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java
@@ -1,6 +1,6 @@
package org.apache.flink.connector.clickhouse.util;
-import org.apache.flink.connector.clickhouse.common.DistributedEngineFullSchema;
+import org.apache.flink.connector.clickhouse.internal.common.DistributedEngineFullSchema;
import org.apache.http.client.utils.URIBuilder;
import ru.yandex.clickhouse.ClickHouseConnection;