Skip to content

Commit

Permalink
Add partition key support for ClickHouseCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
itinycheng committed Jan 4, 2022
1 parent b15a499 commit c6db397
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -48,9 +46,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
ReadableConfig config = helper.getOptions();
helper.validate();
this.validateConfigOptions(config);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
return new ClickHouseDynamicTableSink(getOptions(config), physicalSchema);
return new ClickHouseDynamicTableSink(getOptions(config), context.getCatalogTable());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

Expand All @@ -20,13 +22,16 @@
*/
public class ClickHouseDynamicTableSink implements DynamicTableSink {

private final CatalogTable catalogTable;

private final TableSchema tableSchema;

private final ClickHouseOptions options;

public ClickHouseDynamicTableSink(ClickHouseOptions options, TableSchema tableSchema) {
public ClickHouseDynamicTableSink(ClickHouseOptions options, CatalogTable table) {
this.options = options;
this.tableSchema = tableSchema;
this.catalogTable = table;
this.tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
}

@Override
Expand Down Expand Up @@ -54,13 +59,14 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.withFieldNames(tableSchema.getFieldNames())
.withFieldDataTypes(tableSchema.getFieldDataTypes())
.withPrimaryKey(tableSchema.getPrimaryKey().orElse(null))
.withPartitionKey(catalogTable.getPartitionKeys())
.build();
return OutputFormatProvider.of(outputFormat);
}

@Override
public DynamicTableSink copy() {
return new ClickHouseDynamicTableSink(options, tableSchema);
return new ClickHouseDynamicTableSink(options, catalogTable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.lang.reflect.Method;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -256,7 +255,6 @@ public List<String> listViews(String databaseName)
@Override
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
// TODO add partition key in the future?
if (!tableExists(tablePath)) {
throw new TableNotExistException(getName(), tablePath);
}
Expand All @@ -268,8 +266,27 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
configuration.put(USERNAME, username);
configuration.put(PASSWORD, password);

String databaseName = tablePath.getDatabaseName();
String tableName = tablePath.getObjectName();
try {
DistributedEngineFullSchema engineFullSchema =
ClickHouseUtil.getAndParseEngineFullSchema(
connection, tablePath.getDatabaseName(), tablePath.getObjectName());
if (engineFullSchema != null) {
databaseName = engineFullSchema.getDatabase();
tableName = engineFullSchema.getTable();
}
} catch (Exception e) {
throw new CatalogException(
String.format(
"Failed getting engine full of %s.%s.%s",
getName(), databaseName, tableName),
e);
}

return new CatalogTableImpl(
createTableSchema(tablePath.getDatabaseName(), tablePath.getObjectName()),
createTableSchema(databaseName, tableName),
getPartitionKeys(databaseName, tableName),
configuration,
"");
}
Expand Down Expand Up @@ -315,18 +332,11 @@ private synchronized TableSchema createTableSchema(String databaseName, String t
}
}

private List<String> getPrimaryKeys(String databaseName, String tableName) throws SQLException {
private List<String> getPrimaryKeys(String databaseName, String tableName) {
if (ignorePrimaryKey) {
return Collections.emptyList();
}

DistributedEngineFullSchema engineFullSchema =
ClickHouseUtil.getAndParseEngineFullSchema(connection, databaseName, tableName);
if (engineFullSchema != null) {
databaseName = engineFullSchema.getDatabase();
tableName = engineFullSchema.getTable();
}

try (PreparedStatement stmt =
connection.prepareStatement(
String.format(
Expand All @@ -348,6 +358,28 @@ private List<String> getPrimaryKeys(String databaseName, String tableName) throw
}
}

private List<String> getPartitionKeys(String databaseName, String tableName) {
try (PreparedStatement stmt =
connection.prepareStatement(
String.format(
"SELECT name from `system`.columns where `database` = '%s' and `table` = '%s' and is_in_partition_key = 1",
databaseName, tableName));
ResultSet rs = stmt.executeQuery()) {
List<String> partitionKeys = new ArrayList<>();
while (rs.next()) {
partitionKeys.add(rs.getString(1));
}

return partitionKeys;
} catch (Exception e) {
throw new CatalogException(
String.format(
"Failed getting partition keys of %s.%s.%s",
getName(), databaseName, tableName),
e);
}
}

@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public static class Builder {

private UniqueConstraint primaryKey;

private List<String> partitionKeys;

public Builder() {}

public AbstractClickHouseOutputFormat.Builder withOptions(ClickHouseOptions options) {
Expand All @@ -150,6 +152,11 @@ public AbstractClickHouseOutputFormat.Builder withPrimaryKey(UniqueConstraint pr
return this;
}

public AbstractClickHouseOutputFormat.Builder withPartitionKey(List<String> partitionKeys) {
this.partitionKeys = partitionKeys;
return this;
}

public AbstractClickHouseOutputFormat build() {
Preconditions.checkNotNull(options);
Preconditions.checkNotNull(fieldNames);
Expand All @@ -174,10 +181,12 @@ private ClickHouseBatchOutputFormat createBatchOutputFormat(
if (primaryKey != null) {
keyFields = listToStringArray(primaryKey.getColumns());
}

return new ClickHouseBatchOutputFormat(
new ClickHouseConnectionProvider(options),
fieldNames,
keyFields,
listToStringArray(partitionKeys),
converter,
options);
}
Expand Down Expand Up @@ -214,10 +223,12 @@ private ClickHouseShardOutputFormat createShardOutputFormat(
if (primaryKey != null) {
keyFields = listToStringArray(primaryKey.getColumns());
}

return new ClickHouseShardOutputFormat(
new ClickHouseConnectionProvider(options),
fieldNames,
keyFields,
listToStringArray(partitionKeys),
converter,
partitioner,
options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ClickHouseBatchOutputFormat extends AbstractClickHouseOutputFormat

private final String[] keyFields;

private final String[] partitionFields;

private final ClickHouseRowConverter converter;

private final ClickHouseOptions options;
Expand All @@ -45,11 +47,13 @@ protected ClickHouseBatchOutputFormat(
@Nonnull ClickHouseConnectionProvider connectionProvider,
@Nonnull String[] fieldNames,
@Nonnull String[] keyFields,
@Nonnull String[] partitionFields,
@Nonnull ClickHouseRowConverter converter,
@Nonnull ClickHouseOptions options) {
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
this.fieldNames = Preconditions.checkNotNull(fieldNames);
this.keyFields = Preconditions.checkNotNull(keyFields);
this.partitionFields = Preconditions.checkNotNull(partitionFields);
this.converter = Preconditions.checkNotNull(converter);
this.options = Preconditions.checkNotNull(options);
}
Expand All @@ -63,6 +67,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
null,
fieldNames,
keyFields,
partitionFields,
converter,
options);
executor.prepareStatement(connectionProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class ClickHouseShardOutputFormat extends AbstractClickHouseOutputFormat

private final String[] keyFields;

private final String[] partitionFields;

private transient int[] batchCounts;

private transient DistributedEngineFullSchema shardTableSchema;
Expand All @@ -60,17 +62,19 @@ protected ClickHouseShardOutputFormat(
@Nonnull ClickHouseConnectionProvider connectionProvider,
@Nonnull String[] fieldNames,
@Nonnull String[] keyFields,
@Nonnull String[] partitionFields,
@Nonnull ClickHouseRowConverter converter,
@Nonnull ClickHousePartitioner partitioner,
@Nonnull ClickHouseOptions options) {
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
this.fieldNames = Preconditions.checkNotNull(fieldNames);
this.keyFields = keyFields;
this.partitionFields = partitionFields;
this.converter = Preconditions.checkNotNull(converter);
this.partitioner = Preconditions.checkNotNull(partitioner);
this.options = Preconditions.checkNotNull(options);
this.shardExecutors = new ArrayList<>();
this.ignoreDelete = options.getIgnoreDelete();
this.keyFields = keyFields;
}

@Override
Expand Down Expand Up @@ -99,6 +103,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
shardTableSchema.getCluster(),
fieldNames,
keyFields,
partitionFields,
converter,
options);
executor.prepareStatement(shardConnection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

package org.apache.flink.connector.clickhouse.internal;

import org.apache.commons.lang3.ArrayUtils;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Create an insert/update/delete ClickHouse statement. */
Expand Down Expand Up @@ -35,15 +36,19 @@ public static String getInsertIntoStatement(String tableName, String[] fieldName
}

public static String getUpdateStatement(
String tableName, String[] fieldNames, String[] conditionFields, String clusterName) {
final List<String> conditionFieldList = Arrays.asList(conditionFields);
String tableName,
String[] fieldNames,
String[] keyFields,
String[] partitionFields,
String clusterName) {
String setClause =
Arrays.stream(fieldNames)
.filter(f -> !conditionFieldList.contains(f))
.filter(f -> !ArrayUtils.contains(keyFields, f))
.filter(f -> !ArrayUtils.contains(partitionFields, f))
.map((f) -> quoteIdentifier(f) + "=?")
.collect(Collectors.joining(", "));
String conditionClause =
Arrays.stream(conditionFields)
Arrays.stream(keyFields)
.map((f) -> quoteIdentifier(f) + "=?")
.collect(Collectors.joining(" AND "));
String onClusterClause = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,18 @@ static ClickHouseExecutor createClickHouseExecutor(
String clusterName,
String[] fieldNames,
String[] keyFields,
String[] partitionFields,
ClickHouseRowConverter converter,
ClickHouseOptions options) {
if (keyFields.length > 0) {
return createUpsertExecutor(
tableName, clusterName, fieldNames, keyFields, converter, options);
tableName,
clusterName,
fieldNames,
keyFields,
partitionFields,
converter,
options);
} else {
return createBatchExecutor(tableName, fieldNames, converter, options);
}
Expand All @@ -88,12 +95,13 @@ static ClickHouseUpsertExecutor createUpsertExecutor(
String clusterName,
String[] fieldNames,
String[] keyFields,
String[] partitionFields,
ClickHouseRowConverter converter,
ClickHouseOptions options) {
String insertSql = ClickHouseStatementFactory.getInsertIntoStatement(tableName, fieldNames);
String updateSql =
ClickHouseStatementFactory.getUpdateStatement(
tableName, fieldNames, keyFields, clusterName);
tableName, fieldNames, keyFields, partitionFields, clusterName);
String deleteSql =
ClickHouseStatementFactory.getDeleteStatement(tableName, keyFields, clusterName);
return new ClickHouseUpsertExecutor(insertSql, updateSql, deleteSql, converter, options);
Expand Down

0 comments on commit c6db397

Please sign in to comment.