[bug fix] ClickHouseUpsertExecutor supports update and delete statements
itinycheng committed Jan 5, 2022
1 parent c6db397 commit 4ef0612
Showing 12 changed files with 189 additions and 81 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -22,6 +22,10 @@ The original code comes from AliYun. On this basis, I have done some bug fixes,
| sink.ignore-delete | optional | true | Boolean | Whether to ignore delete statements. |
| catalog.ignore-primary-key | optional | true | Boolean | Whether to ignore primary keys when using ClickHouseCatalog to create tables. Defaults to true. |

+**Upsert mode notice:**
+1. Distributed tables don't support update/delete statements. To use them, either write records directly to the local tables or set `sink.write-local` to true (see the sketch below).
+2. Data is updated and deleted by the primary key; keep this in mind when writing to a partitioned table.
+
## Data Type Mapping

| Flink Type | ClickHouse Type (Sink) | ClickHouse Type (Source) |
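To illustrate the notice added above, here is a minimal sketch of an upsert-mode sink definition. The table, columns, URL, and option values are hypothetical; the option keys are the ones documented in this README.

```java
// A minimal sketch of an upsert-mode ClickHouse sink (names/values hypothetical).
// Declaring a PRIMARY KEY puts the connector in UPSERT mode; per the notice above,
// update/delete must target local tables, so 'sink.write-local' is enabled, and
// 'sink.ignore-delete' is disabled so DELETE statements are actually applied.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertModeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE clickhouse_sink ("
                        + " id BIGINT,"
                        + " name STRING,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'clickhouse',"
                        + " 'url' = 'clickhouse://127.0.0.1:8123',"
                        + " 'database-name' = 'default',"
                        + " 'table-name' = 'local_table',"
                        + " 'sink.write-local' = 'true',"
                        + " 'sink.ignore-delete' = 'false'"
                        + ")");
    }
}
```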
ClickHouseDynamicTableSink.java
@@ -12,22 +12,32 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

+import java.util.LinkedHashMap;
+import java.util.Map;

/**
* A {@link DynamicTableSink} that describes how to create a {@link ClickHouseDynamicTableSink} from
* a logical description.
+*
+* <p>TODO: Partitioning strategy isn't well implemented.
*/
-public class ClickHouseDynamicTableSink implements DynamicTableSink {
+public class ClickHouseDynamicTableSink implements DynamicTableSink, SupportsPartitioning {

private final CatalogTable catalogTable;

private final TableSchema tableSchema;

private final ClickHouseOptions options;

+private boolean dynamicGrouping = false;
+
+private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();
+
public ClickHouseDynamicTableSink(ClickHouseOptions options, CatalogTable table) {
this.options = options;
this.catalogTable = table;
@@ -64,9 +74,28 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return OutputFormatProvider.of(outputFormat);
}

+@Override
+public void applyStaticPartition(Map<String, String> partition) {
+staticPartitionSpec = new LinkedHashMap<>();
+for (String partitionCol : catalogTable.getPartitionKeys()) {
+if (partition.containsKey(partitionCol)) {
+staticPartitionSpec.put(partitionCol, partition.get(partitionCol));
+}
+}
+}
+
+@Override
+public boolean requiresPartitionGrouping(boolean supportsGrouping) {
+this.dynamicGrouping = supportsGrouping;
+return supportsGrouping;
+}
+
@Override
public DynamicTableSink copy() {
-return new ClickHouseDynamicTableSink(options, catalogTable);
+ClickHouseDynamicTableSink sink = new ClickHouseDynamicTableSink(options, catalogTable);
+sink.dynamicGrouping = dynamicGrouping;
+sink.staticPartitionSpec = staticPartitionSpec;
+return sink;
}

@Override
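Context for the new `SupportsPartitioning` methods above: the planner calls `applyStaticPartition` with the static partition spec of an `INSERT ... PARTITION` statement, and `requiresPartitionGrouping` tells it whether input should be grouped by partition before reaching the sink. A hedged sketch of the SQL that exercises this path, with hypothetical table and column names:

```java
// Sketch of how a static partition spec reaches applyStaticPartition() above.
// Assumes clickhouse_sink was declared with PARTITIONED BY (pt); all names are
// hypothetical. The PARTITION clause is standard Flink SQL syntax.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StaticPartitionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inBatchMode().build());
        // The planner passes {pt=2022-01-01} to applyStaticPartition(Map<String, String>);
        // the sink keeps only keys that are real partition columns of the catalog table.
        tEnv.executeSql(
                "INSERT INTO clickhouse_sink PARTITION (pt = '2022-01-01')"
                        + " SELECT id, name FROM source_table");
    }
}
```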
ClickHouseCatalog.java
@@ -1,7 +1,7 @@
package org.apache.flink.connector.clickhouse.catalog;

import org.apache.flink.connector.clickhouse.ClickHouseDynamicTableFactory;
-import org.apache.flink.connector.clickhouse.common.DistributedEngineFullSchema;
+import org.apache.flink.connector.clickhouse.internal.common.DistributedEngineFullSchema;
import org.apache.flink.connector.clickhouse.util.ClickHouseTypeUtil;
import org.apache.flink.connector.clickhouse.util.ClickHouseUtil;
import org.apache.flink.table.api.TableSchema;
AbstractClickHouseOutputFormat.java
@@ -8,7 +8,6 @@
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
-import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseOptions;
import org.apache.flink.connector.clickhouse.internal.partitioner.ClickHousePartitioner;
@@ -18,7 +17,6 @@
import org.apache.flink.table.data.RowData.FieldGetter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
@@ -165,18 +163,17 @@ public AbstractClickHouseOutputFormat build() {
Arrays.stream(fieldDataTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new);
-ClickHouseRowConverter converter = new ClickHouseRowConverter(RowType.of(logicalTypes));
if (primaryKey != null) {
LOG.warn("If primary key is specified, connector will be in UPSERT mode.");
-LOG.warn("You will have significant performance loss.");
+LOG.warn(
+"The data will be updated / deleted by the primary key, you will have significant performance loss.");
}
return options.getWriteLocal()
-? createShardOutputFormat(logicalTypes, converter)
-: createBatchOutputFormat(converter);
+? createShardOutputFormat(logicalTypes)
+: createBatchOutputFormat(logicalTypes);
}

-private ClickHouseBatchOutputFormat createBatchOutputFormat(
-ClickHouseRowConverter converter) {
+private ClickHouseBatchOutputFormat createBatchOutputFormat(LogicalType[] logicalTypes) {
String[] keyFields = new String[0];
if (primaryKey != null) {
keyFields = listToStringArray(primaryKey.getColumns());
@@ -187,12 +184,11 @@ private ClickHouseBatchOutputFormat createBatchOutputFormat(
fieldNames,
keyFields,
listToStringArray(partitionKeys),
-converter,
+logicalTypes,
options);
}

-private ClickHouseShardOutputFormat createShardOutputFormat(
-LogicalType[] logicalTypes, ClickHouseRowConverter converter) {
+private ClickHouseShardOutputFormat createShardOutputFormat(LogicalType[] logicalTypes) {
String partitionStrategy = options.getPartitionStrategy();
ClickHousePartitioner partitioner;
switch (partitionStrategy) {
@@ -229,7 +225,7 @@ private ClickHouseShardOutputFormat createShardOutputFormat(
fieldNames,
keyFields,
listToStringArray(partitionKeys),
-converter,
+logicalTypes,
partitioner,
options);
}
ClickHouseBatchOutputFormat.java
@@ -6,10 +6,10 @@
package org.apache.flink.connector.clickhouse.internal;

import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
-import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseOptions;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
@@ -35,7 +35,7 @@ public class ClickHouseBatchOutputFormat extends AbstractClickHouseOutputFormat

private final String[] partitionFields;

-private final ClickHouseRowConverter converter;
+private final LogicalType[] fieldTypes;

private final ClickHouseOptions options;

@@ -48,35 +48,35 @@ protected ClickHouseBatchOutputFormat(
@Nonnull String[] fieldNames,
@Nonnull String[] keyFields,
@Nonnull String[] partitionFields,
-@Nonnull ClickHouseRowConverter converter,
+@Nonnull LogicalType[] fieldTypes,
@Nonnull ClickHouseOptions options) {
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
this.fieldNames = Preconditions.checkNotNull(fieldNames);
this.keyFields = Preconditions.checkNotNull(keyFields);
this.partitionFields = Preconditions.checkNotNull(partitionFields);
-this.converter = Preconditions.checkNotNull(converter);
+this.fieldTypes = Preconditions.checkNotNull(fieldTypes);
this.options = Preconditions.checkNotNull(options);
}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
try {
+// TODO Distributed tables don't support update and delete statements.
executor =
ClickHouseExecutor.createClickHouseExecutor(
options.getTableName(),
+options.getDatabaseName(),
+null,
fieldNames,
keyFields,
partitionFields,
-converter,
+fieldTypes,
options);
executor.prepareStatement(connectionProvider);
executor.setRuntimeContext(getRuntimeContext());

long flushIntervalMillis = options.getFlushInterval().toMillis();
-if (flushIntervalMillis > 0) {
-scheduledFlush(flushIntervalMillis, "clickhouse-batch-output-format");
-}
+scheduledFlush(flushIntervalMillis, "clickhouse-batch-output-format");
} catch (Exception exception) {
throw new IOException("Unable to establish connection with ClickHouse.", exception);
}
ClickHouseShardOutputFormat.java
@@ -5,14 +5,14 @@

package org.apache.flink.connector.clickhouse.internal;

-import org.apache.flink.connector.clickhouse.common.DistributedEngineFullSchema;
+import org.apache.flink.connector.clickhouse.internal.common.DistributedEngineFullSchema;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
-import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseOptions;
import org.apache.flink.connector.clickhouse.internal.partitioner.ClickHousePartitioner;
import org.apache.flink.connector.clickhouse.util.ClickHouseUtil;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
@@ -40,7 +40,7 @@ public class ClickHouseShardOutputFormat extends AbstractClickHouseOutputFormat

private final String[] fieldNames;

-private final ClickHouseRowConverter converter;
+private final LogicalType[] logicalTypes;

private final ClickHousePartitioner partitioner;

@@ -63,14 +63,14 @@ protected ClickHouseShardOutputFormat(
@Nonnull String[] fieldNames,
@Nonnull String[] keyFields,
@Nonnull String[] partitionFields,
-@Nonnull ClickHouseRowConverter converter,
+@Nonnull LogicalType[] logicalTypes,
@Nonnull ClickHousePartitioner partitioner,
@Nonnull ClickHouseOptions options) {
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
this.fieldNames = Preconditions.checkNotNull(fieldNames);
this.keyFields = keyFields;
this.partitionFields = partitionFields;
-this.converter = Preconditions.checkNotNull(converter);
+this.logicalTypes = Preconditions.checkNotNull(logicalTypes);
this.partitioner = Preconditions.checkNotNull(partitioner);
this.options = Preconditions.checkNotNull(options);
this.shardExecutors = new ArrayList<>();
@@ -100,11 +100,12 @@ public void open(int taskNumber, int numTasks) throws IOException {
ClickHouseExecutor executor =
ClickHouseExecutor.createClickHouseExecutor(
shardTableSchema.getTable(),
+shardTableSchema.getDatabase(),
shardTableSchema.getCluster(),
fieldNames,
keyFields,
partitionFields,
-converter,
+logicalTypes,
options);
executor.prepareStatement(shardConnection);
shardExecutors.add(executor);
ClickHouseStatementFactory.java
@@ -8,7 +8,9 @@
import org.apache.commons.lang3.ArrayUtils;

import java.util.Arrays;
-import java.util.stream.Collectors;

+import static java.lang.String.format;
+import static java.util.stream.Collectors.joining;

/** Create an insert/update/delete ClickHouse statement. */
public class ClickHouseStatementFactory {
@@ -21,9 +23,8 @@ public static String getInsertIntoStatement(String tableName, String[] fieldName
String columns =
Arrays.stream(fieldNames)
.map(ClickHouseStatementFactory::quoteIdentifier)
-.collect(Collectors.joining(", "));
-String placeholders =
-Arrays.stream(fieldNames).map((f) -> "?").collect(Collectors.joining(", "));
+.collect(joining(", "));
+String placeholders = Arrays.stream(fieldNames).map((f) -> "?").collect(joining(", "));
return String.join(
EMPTY,
"INSERT INTO ",
@@ -37,20 +38,21 @@ public static String getInsertIntoStatement(String tableName, String[] fieldName

public static String getUpdateStatement(
String tableName,
+String databaseName,
+String clusterName,
String[] fieldNames,
String[] keyFields,
-String[] partitionFields,
-String clusterName) {
+String[] partitionFields) {
String setClause =
Arrays.stream(fieldNames)
.filter(f -> !ArrayUtils.contains(keyFields, f))
.filter(f -> !ArrayUtils.contains(partitionFields, f))
.map((f) -> quoteIdentifier(f) + "=?")
-.collect(Collectors.joining(", "));
+.collect(joining(", "));
String conditionClause =
Arrays.stream(keyFields)
.map((f) -> quoteIdentifier(f) + "=?")
-.collect(Collectors.joining(" AND "));
+.collect(joining(" AND "));
String onClusterClause = "";
if (clusterName != null) {
onClusterClause = " ON CLUSTER " + quoteIdentifier(clusterName);
@@ -59,7 +61,7 @@ public static String getUpdateStatement(
return String.join(
EMPTY,
"ALTER TABLE ",
-quoteIdentifier(tableName),
+fromTableClause(tableName, databaseName),
onClusterClause,
" UPDATE ",
setClause,
@@ -68,11 +70,11 @@ public static String getUpdateStatement(
}

public static String getDeleteStatement(
-String tableName, String[] conditionFields, String clusterName) {
+String tableName, String databaseName, String clusterName, String[] conditionFields) {
String conditionClause =
Arrays.stream(conditionFields)
.map((f) -> quoteIdentifier(f) + "=?")
-.collect(Collectors.joining(" AND "));
+.collect(joining(" AND "));
String onClusterClause = "";
if (clusterName != null) {
onClusterClause = " ON CLUSTER " + quoteIdentifier(clusterName);
Expand All @@ -81,13 +83,21 @@ public static String getDeleteStatement(
return String.join(
EMPTY,
"ALTER TABLE ",
-quoteIdentifier(tableName),
+fromTableClause(tableName, databaseName),
onClusterClause,
" DELETE WHERE ",
conditionClause);
}

-public static String quoteIdentifier(String identifier) {
+private static String fromTableClause(String tableName, String databaseName) {
+if (databaseName == null) {
+return quoteIdentifier(tableName);
+}
+
+return format("%s.%s", quoteIdentifier(databaseName), quoteIdentifier(tableName));
+}
+
+private static String quoteIdentifier(String identifier) {
return String.join(EMPTY, "`", identifier, "`");
}
}
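For reference, a usage sketch of the reworked factory methods. They emit `ALTER TABLE ... UPDATE` / `ALTER TABLE ... DELETE` statements, which ClickHouse executes as asynchronous mutations — the reason for the performance warning in the builder above. Argument values are hypothetical, and the import assumes the factory sits in the same `internal` package as the other classes touched by this commit.

```java
// Usage sketch for the new signatures (argument values hypothetical; package
// path assumed from sibling classes in this commit).
import org.apache.flink.connector.clickhouse.internal.ClickHouseStatementFactory;

public class StatementFactoryDemo {
    public static void main(String[] args) {
        // Partition field "pt" is excluded from SET; key fields form the WHERE clause:
        // ALTER TABLE `default`.`local_table` ON CLUSTER `test_cluster`
        //     UPDATE `name`=? WHERE `id`=?
        String update =
                ClickHouseStatementFactory.getUpdateStatement(
                        "local_table",
                        "default",
                        "test_cluster",
                        new String[] {"id", "name", "pt"}, // all fields
                        new String[] {"id"},               // key fields
                        new String[] {"pt"});              // partition fields

        // ALTER TABLE `default`.`local_table` ON CLUSTER `test_cluster` DELETE WHERE `id`=?
        String delete =
                ClickHouseStatementFactory.getDeleteStatement(
                        "local_table", "default", "test_cluster", new String[] {"id"});

        System.out.println(update);
        System.out.println(delete);
    }
}
```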
DistributedEngineFullSchema.java
@@ -1,4 +1,4 @@
-package org.apache.flink.connector.clickhouse.common;
+package org.apache.flink.connector.clickhouse.internal.common;

import org.apache.flink.util.StringUtils;
