verify primary keys and partition keys during create table
Signed-off-by: chenxu <chenxu@dmetasoul.com>
dmetasoul01 committed Apr 30, 2024
1 parent e5cb9d6 commit 2f632d8
Showing 2 changed files with 76 additions and 13 deletions.
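This commit makes the LakeSoul catalog reject a CREATE TABLE whose primary key columns overlap with its partition keys. As an illustrative sketch (not part of the commit), the affected DDL looks like the following, written in the same style as the tests added below; the table names are hypothetical and the option set mirrors the new test case.

// Illustrative sketch, not from the commit: DDL the new validation targets.
// Assumes a StreamTableEnvironment bound to the LakeSoul catalog, as in the tests below.
StreamTableEnvironment streamTableEnv = TestUtils.createStreamTableEnv(STREAMING_TYPE);

// Rejected after this change: "id" is both the primary key and a partition key.
streamTableEnv.executeSql(
        "create table bad_table (id int, primary key(id) not enforced) partitioned by (id) " +
                "with ('connector'='lakesoul', 'hashBucketNum'='2')");

// Still accepted (assuming the usual table options are configured): primary key and
// partition columns are disjoint.
streamTableEnv.executeSql(
        "create table ok_table (id int, dt string, primary key(id) not enforced) partitioned by (dt) " +
                "with ('connector'='lakesoul', 'hashBucketNum'='2')");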
@@ -18,20 +18,51 @@
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.*;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;

import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_HASH_PARTITION_SPLITTER;
import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_PARTITION_SPLITTER_OF_RANGE_AND_HASH;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.*;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.CDC_CHANGE_COLUMN;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.CDC_CHANGE_COLUMN_DEFAULT;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.FLINK_WAREHOUSE_DIR;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.HASH_BUCKET_NUM;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.HASH_PARTITIONS;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.LAKESOUL_VIEW;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.LAKESOUL_VIEW_TYPE;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.USE_CDC;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.VIEW_EXPANDED_QUERY;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.VIEW_ORIGINAL_QUERY;
import static org.apache.flink.util.Preconditions.checkNotNull;

public class LakeSoulCatalog implements Catalog {
@@ -102,7 +133,8 @@ public void createDatabase(String databaseName, CatalogDatabase catalogDatabase,
throw new CatalogException(String.format("database %s already exists", databaseName));
}
try {
dbManager.createNewNamespace(databaseName, DBUtil.stringMapToJson(catalogDatabase.getProperties()).toJSONString(),
dbManager.createNewNamespace(databaseName,
DBUtil.stringMapToJson(catalogDatabase.getProperties()).toJSONString(),
catalogDatabase.getComment());
} catch (RuntimeException e) {
e.printStackTrace();
@@ -148,7 +180,8 @@ public void alterDatabase(String databaseName, CatalogDatabase catalogDatabase,
return;
}
}
dbManager.updateNamespaceProperties(databaseName, DBUtil.stringMapToJson(catalogDatabase.getProperties()).toJSONString());
dbManager.updateNamespaceProperties(databaseName,
DBUtil.stringMapToJson(catalogDatabase.getProperties()).toJSONString());
}

@Override
@@ -278,6 +311,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
List<String> partitionKeys = Collections.emptyList();
if (table instanceof ResolvedCatalogTable) {
partitionKeys = ((ResolvedCatalogTable) table).getPartitionKeys();
validatePrimaryAndPartitionKeys(primaryKeyColumns, partitionKeys);
String path = null;
if (tableOptions.containsKey(TABLE_PATH)) {
path = tableOptions.get(TABLE_PATH);
@@ -535,4 +569,20 @@ public String getName() {
public void cleanForTest() {
dbManager.cleanMeta();
}

private void validatePrimaryAndPartitionKeys(Optional<UniqueConstraint> primaryKeyColumns,
List<String> partitionKeys) {
primaryKeyColumns.map(uniqueConstraint -> {
Set<String> result = uniqueConstraint.getColumns().stream()
.distinct()
.filter(partitionKeys::contains)
.collect(Collectors.toSet());
if (!result.isEmpty()) {
throw new RuntimeException(
String.format("Primray columns (%s) and partition columns (%s) cannot overlap",
uniqueConstraint.getColumns(), partitionKeys));
}
return 0;
});
}
}
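The validatePrimaryAndPartitionKeys method above boils down to intersecting the primary key columns with the partition keys and failing fast when the intersection is non-empty. A minimal, self-contained sketch of the same check, using illustrative names rather than the Flink catalog types, could look like this:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class KeyOverlapCheck {
    // Fails when any primary key column is also declared as a partition column.
    static void requireNoOverlap(List<String> primaryKeys, List<String> partitionKeys) {
        Set<String> overlap = new HashSet<>(primaryKeys);
        overlap.retainAll(partitionKeys); // keep only the columns that appear in both lists
        if (!overlap.isEmpty()) {
            throw new IllegalArgumentException(String.format(
                    "Primary columns (%s) and partition columns (%s) cannot overlap", overlap, partitionKeys));
        }
    }

    public static void main(String[] args) {
        requireNoOverlap(Arrays.asList("id"), Arrays.asList("dt")); // passes: disjoint
        requireNoOverlap(Arrays.asList("id"), Arrays.asList("id")); // throws: "id" is in both
    }
}

As a design note, the committed version uses Optional.map with a dummy return value; Optional.ifPresent would express the side-effect-only intent more directly. The second changed file below adds a test exercising this check.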
@@ -9,6 +9,7 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.ExecutionException;
@@ -55,14 +56,13 @@ public void dropView() throws ExecutionException, InterruptedException {
// tEnv.executeSql("select * from user_info").print();
// tEnv.executeSql("alter table user_info drop partition `date`='1995-10-01'");
// }

@Test
public void alterTableNotSupported() throws ExecutionException, InterruptedException {
TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
createLakeSoulSourceTableUser(tEnv);
try {
tEnv.executeSql("ALTER TABLE user_info RENAME TO NewUsers");
}catch (TableException e) {
} catch (TableException e) {
System.out.println("Rename lakesoul table not supported now");
}
}
@@ -84,26 +84,38 @@ public void explainTable() throws ExecutionException, InterruptedException {
}

@Test
public void loadLakeSoulModuleNotSupported(){
public void loadLakeSoulModuleNotSupported() {
StreamTableEnvironment streamTableEnv = TestUtils.createStreamTableEnv(STREAMING_TYPE);
try {
streamTableEnv.executeSql("LOAD MODULE lakesoul WITH ('format'='lakesoul')");
}catch (ValidationException e) {
} catch (ValidationException e) {
System.out.println("LOAD lakesoul module not supported now");
}
}

@Test
public void unloadModuleTest(){
public void unloadModuleTest() {
StreamTableEnvironment streamTableEnv = TestUtils.createStreamTableEnv(STREAMING_TYPE);
try {
streamTableEnv.executeSql("UNLOAD MODULE core");
streamTableEnv.executeSql("SHOW MODULES");
}catch (ValidationException e) {
} catch (ValidationException e) {
System.out.println("UNLOAD lakesoul module not supported now");
}
}

@Test
public void primaryKeyAndPartitionKeyOverlapTest() {
StreamTableEnvironment streamTableEnv = TestUtils.createStreamTableEnv(STREAMING_TYPE);
Assert.assertThrows("cannot overlap", TableException.class, () -> {
streamTableEnv.executeSql("drop table if exists test_table");
streamTableEnv.executeSql(
"create table test_table (id int, primary key(id) not enforced) partitioned by (id) with " +
"('connector'='lakesoul'," +
"'hashBucketNum'='2')");
});
}
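A note on the new test: the catalog's check throws a RuntimeException while the test asserts a TableException, which relies on Flink's table environment wrapping failures raised by the catalog during CREATE TABLE (assumed here, consistent with how the other negative tests in this file expect TableException and ValidationException from executeSql). Also, in JUnit's Assert.assertThrows(String, Class, ThrowingRunnable) the first argument is the failure message for the assertion itself, not a pattern matched against the thrown exception's message.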

private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
String createUserSql = "create table user_info (" +
" order_id INT," +
@@ -118,7 +130,8 @@ private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws Execut
tEnvs.executeSql(createUserSql);
}

private void createLakeSoulSourceTableViewUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
private void createLakeSoulSourceTableViewUser(TableEnvironment tEnvs)
throws ExecutionException, InterruptedException {
String createUserSql = "create view if not exists user_info_view as select * from user_info";
tEnvs.executeSql("DROP view if exists user_info_view");
tEnvs.executeSql(createUserSql);
