Skip to content

Commit

Permalink
Expose table properties in ClickHouse
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Feb 14, 2022
1 parent 4c81c20 commit 40bbbde
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 14 deletions.
Expand Up @@ -15,7 +15,10 @@

import com.clickhouse.client.ClickHouseColumn;
import com.clickhouse.client.ClickHouseDataType;
import com.google.common.base.Enums;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.InetAddresses;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -64,6 +67,7 @@
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.time.LocalDate;
Expand All @@ -83,6 +87,10 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.plugin.clickhouse.ClickHouseSessionProperties.isMapStringAsVarchar;
import static io.trino.plugin.clickhouse.ClickHouseTableProperties.ENGINE_PROPERTY;
import static io.trino.plugin.clickhouse.ClickHouseTableProperties.ORDER_BY_PROPERTY;
import static io.trino.plugin.clickhouse.ClickHouseTableProperties.PARTITION_BY_PROPERTY;
import static io.trino.plugin.clickhouse.ClickHouseTableProperties.PRIMARY_KEY_PROPERTY;
import static io.trino.plugin.clickhouse.ClickHouseTableProperties.SAMPLE_BY_PROPERTY;
import static io.trino.plugin.jdbc.DecimalConfig.DecimalMapping.ALLOW_OVERFLOW;
import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalDefaultScale;
Expand Down Expand Up @@ -140,10 +148,13 @@
import static java.lang.String.join;
import static java.lang.System.arraycopy;
import static java.time.ZoneOffset.UTC;
import static java.util.Locale.ENGLISH;

public class ClickHouseClient
extends BaseJdbcClient
{
private static final Splitter TABLE_PROPERTY_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();

private static final long MIN_SUPPORTED_DATE_EPOCH = LocalDate.parse("1970-01-01").toEpochDay();
private static final long MAX_SUPPORTED_DATE_EPOCH = LocalDate.parse("2106-02-07").toEpochDay(); // The max date is '2148-12-31' in new ClickHouse version

Expand Down Expand Up @@ -243,6 +254,51 @@ protected String createTableSql(RemoteTableName remoteTableName, List<String> co
return format("CREATE TABLE %s (%s) %s", quoted(remoteTableName), join(", ", columns), join(" ", tableOptions.build()));
}

@Override
public Map<String, Object> getTableProperties(ConnectorSession session, JdbcTableHandle tableHandle)
{
try (Connection connection = connectionFactory.openConnection(session);
PreparedStatement statement = connection.prepareStatement("" +
"SELECT engine, sorting_key, partition_key, primary_key, sampling_key " +
"FROM system.tables " +
"WHERE database = ? AND name = ?")) {
statement.setString(1, tableHandle.asPlainTable().getRemoteTableName().getSchemaName().orElse(null));
statement.setString(2, tableHandle.asPlainTable().getRemoteTableName().getTableName());

try (ResultSet resultSet = statement.executeQuery()) {
ImmutableMap.Builder<String, Object> properties = new ImmutableMap.Builder<>();
while (resultSet.next()) {
String engine = resultSet.getString("engine");
if (!isNullOrEmpty(engine)) {
// Don't throw an exception because many table engines aren't supported in ClickHouseEngineType
Optional<ClickHouseEngineType> engineType = Enums.getIfPresent(ClickHouseEngineType.class, engine.toUpperCase(ENGLISH)).toJavaUtil();
engineType.ifPresent(type -> properties.put(ENGINE_PROPERTY, type));
}
String sortingKey = resultSet.getString("sorting_key");
if (!isNullOrEmpty(sortingKey)) {
properties.put(ORDER_BY_PROPERTY, TABLE_PROPERTY_SPLITTER.splitToList(sortingKey));
}
String partitionKey = resultSet.getString("partition_key");
if (!isNullOrEmpty(partitionKey)) {
properties.put(PARTITION_BY_PROPERTY, TABLE_PROPERTY_SPLITTER.splitToList(partitionKey));
}
String primaryKey = resultSet.getString("primary_key");
if (!isNullOrEmpty(primaryKey)) {
properties.put(PRIMARY_KEY_PROPERTY, TABLE_PROPERTY_SPLITTER.splitToList(primaryKey));
}
String samplingKey = resultSet.getString("sampling_key");
if (!isNullOrEmpty(samplingKey)) {
properties.put(SAMPLE_BY_PROPERTY, samplingKey);
}
}
return properties.build();
}
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

@Override
public void setTableProperties(ConnectorSession session, JdbcTableHandle handle, Map<String, Optional<Object>> nullableProperties)
{
Expand Down
Expand Up @@ -15,6 +15,9 @@

import io.trino.plugin.jdbc.BaseJdbcConnectorSmokeTest;
import io.trino.testing.TestingConnectorBehavior;
import org.testng.annotations.Test;

import static org.assertj.core.api.Assertions.assertThat;

public abstract class BaseClickHouseConnectorSmokeTest
extends BaseJdbcConnectorSmokeTest
Expand All @@ -29,4 +32,21 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
return super.hasBehavior(connectorBehavior);
}
}

@Test
@Override
public void testShowCreateTable()
{
// Override to add table properties
assertThat((String) computeScalar("SHOW CREATE TABLE region"))
.isEqualTo("" +
"CREATE TABLE clickhouse.tpch.region (\n" +
" regionkey bigint,\n" +
" name varchar,\n" +
" comment varchar\n" +
")\n" +
"WITH (\n" +
" engine = 'LOG'\n" +
")");
}
}
Expand Up @@ -171,6 +171,7 @@ public void testAddColumn()
assertFalse(getQueryRunner().tableExists(getSession(), tableName));
}

@Test
@Override
public void testShowCreateTable()
{
Expand All @@ -185,6 +186,9 @@ public void testShowCreateTable()
" clerk varchar,\n" +
" shippriority integer,\n" +
" comment varchar\n" +
")\n" +
"WITH (\n" +
" engine = 'LOG'\n" +
")");
}

Expand Down Expand Up @@ -264,13 +268,6 @@ public void testDifferentEngine()
"Unable to set catalog 'clickhouse' table property 'engine' to.*");
}

/**
* test clickhouse table properties
* <p>
* Because the current connector does not support the `show create table` statement to display all the table properties,
* so we cannot use this statement to test whether the properties of the created table meet our expectations,
* and we will modify the test case after the `show create table` is full supported
*/
@Test
public void testTableProperty()
{
Expand All @@ -282,15 +279,39 @@ public void testTableProperty()

// one required property
assertUpdate("CREATE TABLE " + tableName + " (id int NOT NULL, x VARCHAR) WITH (engine = 'Log')");
assertTrue(getQueryRunner().tableExists(getSession(), tableName));
assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.isEqualTo(format("" +
"CREATE TABLE clickhouse.tpch.%s (\n" +
" id integer NOT NULL,\n" +
" x varchar\n" +
")\n" +
"WITH (\n" +
" engine = 'LOG'\n" +
")", tableName));
assertUpdate("DROP TABLE " + tableName);

assertUpdate("CREATE TABLE " + tableName + " (id int NOT NULL, x VARCHAR) WITH (engine = 'StripeLog')");
assertTrue(getQueryRunner().tableExists(getSession(), tableName));
assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.isEqualTo(format("" +
"CREATE TABLE clickhouse.tpch.%s (\n" +
" id integer NOT NULL,\n" +
" x varchar\n" +
")\n" +
"WITH (\n" +
" engine = 'STRIPELOG'\n" +
")", tableName));
assertUpdate("DROP TABLE " + tableName);

assertUpdate("CREATE TABLE " + tableName + " (id int NOT NULL, x VARCHAR) WITH (engine = 'TinyLog')");
assertTrue(getQueryRunner().tableExists(getSession(), tableName));
assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.isEqualTo(format("" +
"CREATE TABLE clickhouse.tpch.%s (\n" +
" id integer NOT NULL,\n" +
" x varchar\n" +
")\n" +
"WITH (\n" +
" engine = 'TINYLOG'\n" +
")", tableName));
assertUpdate("DROP TABLE " + tableName);

// Log engine DOES NOT any property
Expand All @@ -300,22 +321,82 @@ public void testTableProperty()

// optional properties
assertUpdate("CREATE TABLE " + tableName + " (id int NOT NULL, x VARCHAR) WITH (engine = 'MergeTree', order_by = ARRAY['id'])");
assertTrue(getQueryRunner().tableExists(getSession(), tableName));
assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.isEqualTo(format("" +
"CREATE TABLE clickhouse.tpch.%s (\n" +
" id integer NOT NULL,\n" +
" x varchar\n" +
")\n" +
"WITH (\n" +
" engine = 'MERGETREE',\n" +
" order_by = ARRAY['id'],\n" +
" primary_key = ARRAY['id']\n" + // order_by become primary_key automatically in ClickHouse
")", tableName));
assertUpdate("DROP TABLE " + tableName);

// the column refers by order by must be not null
assertQueryFails("CREATE TABLE " + tableName + " (id int NOT NULL, x VARCHAR) WITH (engine = 'MergeTree', order_by = ARRAY['id', 'x'])", ".* Sorting key cannot contain nullable columns.*\\n.*");

assertUpdate("CREATE TABLE " + tableName + " (id int NOT NULL, x VARCHAR) WITH (engine = 'MergeTree', order_by = ARRAY['id'], primary_key = ARRAY['id'])");
assertTrue(getQueryRunner().tableExists(getSession(), tableName));
assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.isEqualTo(format("" +
"CREATE TABLE clickhouse.tpch.%s (\n" +
" id integer NOT NULL,\n" +
" x varchar\n" +
")\n" +
"WITH (\n" +
" engine = 'MERGETREE',\n" +
" order_by = ARRAY['id'],\n" +
" primary_key = ARRAY['id']\n" +
")", tableName));
assertUpdate("DROP TABLE " + tableName);

assertUpdate("CREATE TABLE " + tableName + " (id int NOT NULL, x VARCHAR NOT NULL, y VARCHAR NOT NULL) WITH (engine = 'MergeTree', order_by = ARRAY['id', 'x', 'y'], primary_key = ARRAY['id', 'x'])");
assertTrue(getQueryRunner().tableExists(getSession(), tableName));
assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.isEqualTo(format("" +
"CREATE TABLE clickhouse.tpch.%s (\n" +
" id integer NOT NULL,\n" +
" x varchar NOT NULL,\n" +
" y varchar NOT NULL\n" +
")\n" +
"WITH (\n" +
" engine = 'MERGETREE',\n" +
" order_by = ARRAY['id','x','y'],\n" +
" primary_key = ARRAY['id','x']\n" +
")", tableName));
assertUpdate("DROP TABLE " + tableName);

assertUpdate("CREATE TABLE " + tableName + " (id int NOT NULL, x VARCHAR NOT NULL, y VARCHAR NOT NULL) WITH (engine = 'MergeTree', order_by = ARRAY['id', 'x'], primary_key = ARRAY['id','x'], sample_by = 'x' )");
assertTrue(getQueryRunner().tableExists(getSession(), tableName));
assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.isEqualTo(format("" +
"CREATE TABLE clickhouse.tpch.%s (\n" +
" id integer NOT NULL,\n" +
" x varchar NOT NULL,\n" +
" y varchar NOT NULL\n" +
")\n" +
"WITH (\n" +
" engine = 'MERGETREE',\n" +
" order_by = ARRAY['id','x'],\n" +
" primary_key = ARRAY['id','x'],\n" +
" sample_by = 'x'\n" +
")", tableName));
assertUpdate("DROP TABLE " + tableName);

// Partition column
assertUpdate("CREATE TABLE " + tableName + "(id int NOT NULL, part int NOT NULL) WITH " +
"(engine = 'MergeTree', order_by = ARRAY['id'], partition_by = ARRAY['part'])");
assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.isEqualTo(format("" +
"CREATE TABLE clickhouse.tpch.%s (\n" +
" id integer NOT NULL,\n" +
" part integer NOT NULL\n" +
")\n" +
"WITH (\n" +
" engine = 'MERGETREE',\n" +
" order_by = ARRAY['id'],\n" +
" partition_by = ARRAY['part'],\n" +
" primary_key = ARRAY['id']\n" +
")", tableName));
assertUpdate("DROP TABLE " + tableName);

// Primary key must be a prefix of the sorting key,
Expand Down

0 comments on commit 40bbbde

Please sign in to comment.