Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add suppport for creating table with MergeTree engine. #7135

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 37 additions & 0 deletions docs/src/main/sphinx/connector/clickhouse.rst
Expand Up @@ -65,6 +65,43 @@ Run ``SELECT`` to access the ``clicks`` table in the ``web`` database::
If you used a different name for your catalog properties file, use
that catalog name instead of ``myclickhouse`` in the above examples.

Table properties - ClickHouse
------------------------------

Table property usage example::

CREATE TABLE default.trino_ck (
id int NOT NULL,
birthday DATE,
name VARCHAR,
age BIGINT,
logdate DATE
)
WITH (
engine = 'MergeTree',
order_by = 'id, birthday',
partition_by = 'toYYYYMM(logdate)'
);

The following are supported ClickHouse table properties from `<https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/mergetree/>`_

=========================== ================ ==============================================================================================================
Property Name Default Value Description
=========================== ================ ==============================================================================================================
``engine`` ``Log`` Name and parameters of the engine.

``order_by`` (none) list of columns to be the sorting key. It's required if ``engine`` is ``MergeTree``

``partition_by`` (none) list of columns to be the partition key. It's optional.

``primary_key`` (none) list of columns to be the primary key. It's optional.

``sample_by`` (none) An expression for sampling. It's optional.
wgzhao marked this conversation as resolved.
Show resolved Hide resolved

=========================== ================ ==============================================================================================================

Currently the connector only supports ``Log`` and ``MergeTree`` table engines in create table.
wgzhao marked this conversation as resolved.
Show resolved Hide resolved

Limitations
-----------

Expand Down
Expand Up @@ -45,6 +45,7 @@
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -78,6 +79,7 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
Expand Down Expand Up @@ -138,9 +140,20 @@ protected void copyTableSchema(Connection connection, String catalogName, String
protected String createTableSql(RemoteTableName remoteTableName, List<String> columns, ConnectorTableMetadata tableMetadata)
{
ImmutableList.Builder<String> tableOptions = ImmutableList.builder();
ClickHouseTableProperties.getEngine(tableMetadata.getProperties())
.ifPresent(value -> tableOptions.add("ENGINE =" + quoted(value)));
return format("CREATE TABLE %s (%s) %s", quoted(remoteTableName), join(", ", columns), join(", ", tableOptions.build()));
Map<String, Object> tableProperties = tableMetadata.getProperties();
ClickHouseEngineType engine = ClickHouseTableProperties.getEngine(tableProperties);
tableOptions.add("ENGINE = " + engine.getEngineType());
if (engine == ClickHouseEngineType.MERGETREE && formatProperty(ClickHouseTableProperties.getOrderBy(tableProperties)).isEmpty()) {
// order_by property is required
throw new TrinoException(INVALID_TABLE_PROPERTY,
format("The property of %s is required for table engine %s", ClickHouseTableProperties.ORDER_BY_PROPERTY, engine.getEngineType()));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relating to #7135 (comment),

what happens if i issue

CREATE TABLE clickhouse.default.t(a int) 
WITH (partition_by = ARRAY['x'])

?

I would expect some kind of failure (either from the connector, or from ClickHouse), because there is no column x.

Then, what happens if i issue

CREATE TABLE clickhouse.default.t(a int, x int) 
WITH (engine = 'LOG', partition_by = ARRAY['x'])

?

I would expect some kind of failure (either from the connector, or from ClickHouse), because i asked for the table to be partitioned, but it cannot be.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CREATE TABLE clickhouse.default.t(a int) 
WITH (partition_by = ARRAY['x'])
CREATE TABLE clickhouse.default.t(a int, x int) 
WITH (engine = 'LOG', partition_by = ARRAY['x'])

Yes, the above sqls will both create successfully, because the default table engine Log will ignore all other properties.

we can pass all table property regardless table engine, it will be kind of failure from ClickHouse side。
So we take this approach?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execute the following sql in ClickHouse directly

create table default.t (a int)  engine = Log order by a;

It print the following errors:

ClickHouse exception, code: 36, host: 10.60.242.112, port: 18123; Code: 36, e.displayText() = DB::Exception: 
Engine Log doesn't support PARTITION_BY, PRIMARY_KEY, ORDER_BY or SAMPLE_BY clauses. 
Currently only the following engines have support for the feature: [MergeTree, ReplicatedVersionedCollapsingMergeTree, 
ReplacingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedCollapsingMergeTree, ReplicatedGraphiteMergeTree, ReplicatedMergeTree, 
ReplicatedReplacingMergeTree, VersionedCollapsingMergeTree, SummingMergeTree,
GraphiteMergeTree, CollapsingMergeTree, AggregatingMergeTree] (version 20.3.5.21 (official build))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modified codes:

    @Override
    protected String createTableSql(RemoteTableName remoteTableName, List<String> columns, ConnectorTableMetadata tableMetadata)
    {
        ImmutableList.Builder<String> tableOptions = ImmutableList.builder();
        Map<String, Object> tableProperties = tableMetadata.getProperties();
        ClickHouseEngineType engine = ClickHouseTableProperties.getEngine(tableProperties);
        tableOptions.add("ENGINE = " + engine.getEngineType());
        if (engine == ClickHouseEngineType.MERGETREE && formatProperty(ClickHouseTableProperties.getOrderBy(tableProperties)).isEmpty()) {
            // order_by property is required
            throw new TrinoException(INVALID_TABLE_PROPERTY, format("The property of %s is required for table engine %s", ClickHouseTableProperties.ORDER_BY_PROPERTY, engine.getEngineType()));
        }
        formatProperty(ClickHouseTableProperties.getOrderBy(tableProperties)).ifPresent(value -> tableOptions.add("ORDER BY " + value));
        formatProperty(ClickHouseTableProperties.getPrimaryKey(tableProperties)).ifPresent(value -> tableOptions.add("PRIMARY KEY " + value));
        formatProperty(ClickHouseTableProperties.getPartitionBy(tableProperties)).ifPresent(value -> tableOptions.add("PARTITION BY " + value));
        ClickHouseTableProperties.getSampleBy(tableProperties).ifPresent(value -> tableOptions.add("SAMPLE BY " + value));

        return format("CREATE TABLE %s (%s) %s", quoted(remoteTableName), join(", ", columns), join(" ", tableOptions.build()));
    }

formatProperty(ClickHouseTableProperties.getOrderBy(tableProperties)).ifPresent(value -> tableOptions.add("ORDER BY " + value));
formatProperty(ClickHouseTableProperties.getPrimaryKey(tableProperties)).ifPresent(value -> tableOptions.add("PRIMARY KEY " + value));
formatProperty(ClickHouseTableProperties.getPartitionBy(tableProperties)).ifPresent(value -> tableOptions.add("PARTITION BY " + value));
ClickHouseTableProperties.getSampleBy(tableProperties).ifPresent(value -> tableOptions.add("SAMPLE BY " + value));

return format("CREATE TABLE %s (%s) %s", quoted(remoteTableName), join(", ", columns), join(" ", tableOptions.build()));
}

@Override
Expand Down Expand Up @@ -172,6 +185,25 @@ public void dropSchema(ConnectorSession session, String schemaName)
execute(session, "DROP DATABASE " + quoted(schemaName));
}

@Override
public void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column)
{
try (Connection connection = connectionFactory.openConnection(session)) {
String columnName = column.getName();
if (connection.getMetaData().storesUpperCaseIdentifiers()) {
columnName = columnName.toUpperCase(ENGLISH);
}
String sql = format(
"ALTER TABLE %s ADD COLUMN %s",
quoted(handle.asPlainTable().getRemoteTableName()),
getColumnDefinitionSql(session, column, columnName));
execute(connection, sql);
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

@Override
public void renameColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle jdbcColumn, String newColumnName)
{
Expand Down Expand Up @@ -369,4 +401,25 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
}
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type);
}

/**
* format property to match ClickHouse create table statement
*
* @param prop property will be formatted
* @return formatted property
*/
private Optional<String> formatProperty(List<String> prop)
{
if (prop == null || prop.isEmpty()) {
return Optional.empty();
}
else if (prop.size() == 1) {
// only one column
return Optional.of(prop.get(0));
}
else {
// include more than one columns
return Optional.of("(" + String.join(",", prop) + ")");
}
}
}
@@ -0,0 +1,34 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.clickhouse;

public enum ClickHouseEngineType
{
STRIPELOG("StripeLog"),
wgzhao marked this conversation as resolved.
Show resolved Hide resolved
LOG("Log"),
TINYLOG("TinyLog"),
MERGETREE("MergeTree()");

private final String engineType;

ClickHouseEngineType(String engineType)
{
this.engineType = engineType;
}

public String getEngineType()
{
return this.engineType;
}
}
Expand Up @@ -16,14 +16,17 @@
import com.google.common.collect.ImmutableList;
import io.trino.plugin.jdbc.TablePropertiesProvider;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.ArrayType;

import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -36,26 +39,92 @@ public final class ClickHouseTableProperties
implements TablePropertiesProvider
{
public static final String ENGINE_PROPERTY = "engine";
public static final String DEFAULT_TABLE_ENGINE = "Log";
// MergeTree engine properties
public static final String ORDER_BY_PROPERTY = "order_by"; //required
public static final String PARTITION_BY_PROPERTY = "partition_by"; //optional
public static final String PRIMARY_KEY_PROPERTY = "primary_key"; //optional
public static final String SAMPLE_BY_PROPERTY = "sample_by"; //optional

public static final ClickHouseEngineType DEFAULT_TABLE_ENGINE = ClickHouseEngineType.LOG;

private final List<PropertyMetadata<?>> tableProperties;

@Inject
public ClickHouseTableProperties()
{
tableProperties = ImmutableList.of(
stringProperty(
enumProperty(
ENGINE_PROPERTY,
"ClickHouse Table Engine, defaults to Log",
ClickHouseEngineType.class,
DEFAULT_TABLE_ENGINE,
false),
new PropertyMetadata<>(
ORDER_BY_PROPERTY,
"columns to be the sorting key, it's required for table MergeTree engine family",
new ArrayType(VARCHAR),
List.class,
ImmutableList.of(),
false,
value -> (List<?>) value,
value -> value),
new PropertyMetadata<>(
PARTITION_BY_PROPERTY,
"columns to be the partition key. it's optional for table MergeTree engine family",
new ArrayType(VARCHAR),
List.class,
ImmutableList.of(),
false,
value -> (List<?>) value,
value -> value),
new PropertyMetadata<>(
PRIMARY_KEY_PROPERTY,
"columns to be the primary key. it's optional for table MergeTree engine family",
new ArrayType(VARCHAR),
List.class,
ImmutableList.of(),
false,
value -> (List<?>) value,
value -> value),
stringProperty(
SAMPLE_BY_PROPERTY,
"An expression for sampling. it's optional for table MergeTree engine family",
null,
false));
}

public static Optional<String> getEngine(Map<String, Object> tableProperties)
public static ClickHouseEngineType getEngine(Map<String, Object> tableProperties)
{
requireNonNull(tableProperties, "tableProperties is null");
return (ClickHouseEngineType) tableProperties.get(ENGINE_PROPERTY);
}

@SuppressWarnings("unchecked")
public static List<String> getOrderBy(Map<String, Object> tableProperties)
{
requireNonNull(tableProperties, "tableProperties is null");
return (List<String>) tableProperties.get(ORDER_BY_PROPERTY);
}

@SuppressWarnings("unchecked")
public static List<String> getPartitionBy(Map<String, Object> tableProperties)
{
requireNonNull(tableProperties, "tableProperties is null");
return (List<String>) tableProperties.get(PARTITION_BY_PROPERTY);
}

@SuppressWarnings("unchecked")
public static List<String> getPrimaryKey(Map<String, Object> tableProperties)
{
requireNonNull(tableProperties, "tableProperties is null");
return (List<String>) tableProperties.get(PRIMARY_KEY_PROPERTY);
}

public static Optional<String> getSampleBy(Map<String, Object> tableProperties)
{
requireNonNull(tableProperties);
requireNonNull(tableProperties, "tableProperties is null");

return Optional.ofNullable(tableProperties.get(ENGINE_PROPERTY)).map(String.class::cast);
return Optional.ofNullable(tableProperties.get(SAMPLE_BY_PROPERTY)).map(String.class::cast);
}

@Override
Expand Down