From 7a0147a85be57680b0aae7406afe11ac72eec5e0 Mon Sep 17 00:00:00 2001 From: Rodrigo Meneses Date: Wed, 31 Jan 2024 10:11:48 -0800 Subject: [PATCH] Flink: Backport #9364 to 1.16 and 1.17 for Create CatalogTestBase for migration to JUnit5 --- .../apache/iceberg/flink/CatalogTestBase.java | 143 ++++++++++ .../flink/TestFlinkCatalogDatabase.java | 267 ++++++++---------- .../TestFlinkCatalogTablePartitions.java | 46 ++- .../TestMetadataTableReadableMetrics.java | 48 ++-- .../apache/iceberg/flink/CatalogTestBase.java | 143 ++++++++++ .../flink/TestFlinkCatalogDatabase.java | 267 ++++++++---------- .../TestFlinkCatalogTablePartitions.java | 46 ++- .../TestMetadataTableReadableMetrics.java | 48 ++-- 8 files changed, 604 insertions(+), 404 deletions(-) create mode 100644 flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java create mode 100644 flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java new file mode 100644 index 000000000000..91ed3c4adea3 --- /dev/null +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.flink.util.ArrayUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public abstract class CatalogTestBase extends TestBase { + + protected static final String DATABASE = "db"; + @TempDir protected File hiveWarehouse; + @TempDir protected File hadoopWarehouse; + + @Parameter(index = 0) + protected String catalogName; + + @Parameter(index = 1) + protected Namespace baseNamespace; + + protected Catalog validationCatalog; + protected SupportsNamespaces validationNamespaceCatalog; + protected Map config = Maps.newHashMap(); + + protected String flinkDatabase; + protected Namespace icebergNamespace; + protected boolean isHadoopCatalog; + + @Parameters(name = "catalogName={0}, baseNamespace={1}") + protected static List parameters() { + return Arrays.asList( + new Object[] {"testhive", Namespace.empty()}, + new Object[] {"testhadoop", Namespace.empty()}, + new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")}); + } + + @BeforeEach + public void before() { + this.isHadoopCatalog = catalogName.startsWith("testhadoop"); + this.validationCatalog = + isHadoopCatalog + ? new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getPath()) + : catalog; + this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog; + + config.put("type", "iceberg"); + if (!baseNamespace.isEmpty()) { + config.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString()); + } + if (isHadoopCatalog) { + config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"); + } else { + config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive"); + config.put(CatalogProperties.URI, getURI(hiveConf)); + } + config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot())); + + this.flinkDatabase = catalogName + "." + DATABASE; + this.icebergNamespace = + Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] {DATABASE})); + sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config)); + } + + @AfterEach + public void clean() { + dropCatalog(catalogName, true); + } + + protected String warehouseRoot() { + if (isHadoopCatalog) { + return hadoopWarehouse.getAbsolutePath(); + } else { + return hiveWarehouse.getAbsolutePath(); + } + } + + protected String getFullQualifiedTableName(String tableName) { + final List levels = Lists.newArrayList(icebergNamespace.levels()); + levels.add(tableName); + return Joiner.on('.').join(levels); + } + + static String getURI(HiveConf conf) { + return conf.get(HiveConf.ConfVars.METASTOREURIS.varname); + } + + static String toWithClause(Map props) { + StringBuilder builder = new StringBuilder(); + builder.append("("); + int propCount = 0; + for (Map.Entry entry : props.entrySet()) { + if (propCount > 0) { + builder.append(","); + } + builder + .append("'") + .append(entry.getKey()) + .append("'") + .append("=") + .append("'") + .append(entry.getValue()) + .append("'"); + propCount++; + } + builder.append(")"); + return builder.toString(); + } +} diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java index 47b47cb6262d..f46d50a5f0ab 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java @@ -18,7 +18,10 @@ */ package org.apache.iceberg.flink; -import java.io.File; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.nio.file.Path; import java.util.List; import java.util.Map; import java.util.Objects; @@ -29,18 +32,12 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.types.Types; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Test; - -public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase { +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; - public TestFlinkCatalogDatabase(String catalogName, Namespace baseNamespace) { - super(catalogName, baseNamespace); - } +public class TestFlinkCatalogDatabase extends CatalogTestBase { - @After + @AfterEach @Override public void clean() { sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase); @@ -48,240 +45,204 @@ public void clean() { super.clean(); } - @Test + @TestTemplate public void testCreateNamespace() { - Assert.assertFalse( - "Database should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Database should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); - - Assert.assertTrue( - "Database should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Database should exist") + .isTrue(); sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase); - Assert.assertTrue( - "Database should still exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Database should still exist") + .isTrue(); sql("DROP DATABASE IF EXISTS %s", flinkDatabase); - Assert.assertFalse( - "Database should be dropped", validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Database should be dropped") + .isFalse(); sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase); - Assert.assertTrue( - "Database should be created", validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Database should be created") + .isTrue(); } - @Test + @TestTemplate public void testDropEmptyDatabase() { - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); sql("DROP DATABASE %s", flinkDatabase); - - Assert.assertFalse( - "Namespace should have been dropped", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should have been dropped") + .isFalse(); } - @Test + @TestTemplate public void testDropNonEmptyNamespace() { - Assume.assumeFalse( - "Hadoop catalog throws IOException: Directory is not empty.", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assumeThat(isHadoopCatalog) + .as("Hadoop catalog throws IOException: Directory is not empty.") + .isFalse(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); - validationCatalog.createTable( TableIdentifier.of(icebergNamespace, "tl"), new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()))); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - Assert.assertTrue( - "Table should exist", - validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl"))); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); + assertThat(validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl"))) + .as("Table should exist") + .isTrue(); Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase)) .cause() .isInstanceOf(DatabaseNotEmptyException.class) .hasMessage( String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName)); - sql("DROP TABLE %s.tl", flinkDatabase); } - @Test + @TestTemplate public void testListTables() { - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); sql("USE CATALOG %s", catalogName); sql("USE %s", DATABASE); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - - Assert.assertEquals("Should not list any tables", 0, sql("SHOW TABLES").size()); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); + assertThat(sql("SHOW TABLES")).isEmpty(); validationCatalog.createTable( TableIdentifier.of(icebergNamespace, "tl"), new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()))); List tables = sql("SHOW TABLES"); - Assert.assertEquals("Only 1 table", 1, tables.size()); - Assert.assertEquals("Table name should match", "tl", tables.get(0).getField(0)); + assertThat(tables).hasSize(1); + assertThat("tl").as("Table name should match").isEqualTo(tables.get(0).getField(0)); } - @Test + @TestTemplate public void testListNamespace() { - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); sql("USE CATALOG %s", catalogName); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); List databases = sql("SHOW DATABASES"); if (isHadoopCatalog) { - Assert.assertEquals("Should have 1 database", 1, databases.size()); - Assert.assertEquals("Should have db database", "db", databases.get(0).getField(0)); - + assertThat(databases).hasSize(1); + assertThat(databases.get(0).getField(0)).as("Should have db database").isEqualTo("db"); if (!baseNamespace.isEmpty()) { // test namespace not belongs to this catalog validationNamespaceCatalog.createNamespace( Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE")); databases = sql("SHOW DATABASES"); - Assert.assertEquals("Should have 1 database", 1, databases.size()); - Assert.assertEquals( - "Should have db and default database", "db", databases.get(0).getField(0)); + assertThat(databases).hasSize(1); + assertThat(databases.get(0).getField(0)).as("Should have db database").isEqualTo("db"); } } else { // If there are multiple classes extends FlinkTestBase, TestHiveMetastore may loose the // creation for default // database. See HiveMetaStore.HMSHandler.init. - Assert.assertTrue( - "Should have db database", - databases.stream().anyMatch(d -> Objects.equals(d.getField(0), "db"))); + assertThat(databases) + .as("Should have db database") + .anyMatch(d -> Objects.equals(d.getField(0), "db")); } } - @Test + @TestTemplate public void testCreateNamespaceWithMetadata() { - Assume.assumeFalse("HadoopCatalog does not support namespace metadata", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); Map nsMetadata = validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace); - - Assert.assertEquals( - "Namespace should have expected prop value", "value", nsMetadata.get("prop")); + assertThat(nsMetadata).containsEntry("prop", "value"); } - @Test + @TestTemplate public void testCreateNamespaceWithComment() { - Assume.assumeFalse("HadoopCatalog does not support namespace metadata", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s COMMENT 'namespace doc'", flinkDatabase); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); Map nsMetadata = validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace); - - Assert.assertEquals( - "Namespace should have expected comment", "namespace doc", nsMetadata.get("comment")); + assertThat(nsMetadata).containsEntry("comment", "namespace doc"); } - @Test + @TestTemplate public void testCreateNamespaceWithLocation() throws Exception { - Assume.assumeFalse("HadoopCatalog does not support namespace metadata", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - - File location = TEMPORARY_FOLDER.newFile(); - Assert.assertTrue(location.delete()); + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); + Path location = temporaryDirectory.getRoot(); sql("CREATE DATABASE %s WITH ('location'='%s')", flinkDatabase, location); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); Map nsMetadata = validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace); - - Assert.assertEquals( - "Namespace should have expected location", - "file:" + location.getPath(), - nsMetadata.get("location")); + assertThat(nsMetadata).containsEntry("location", "file:" + location.getRoot()); } - @Test + @TestTemplate public void testSetProperties() { - Assume.assumeFalse("HadoopCatalog does not support namespace metadata", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); Map defaultMetadata = validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace); - Assert.assertFalse( - "Default metadata should not have custom property", defaultMetadata.containsKey("prop")); - + assertThat(defaultMetadata).doesNotContainKey("prop"); sql("ALTER DATABASE %s SET ('prop'='value')", flinkDatabase); - Map nsMetadata = validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace); - - Assert.assertEquals( - "Namespace should have expected prop value", "value", nsMetadata.get("prop")); + assertThat(nsMetadata).containsEntry("prop", "value"); } - @Test + @TestTemplate public void testHadoopNotSupportMeta() { - Assume.assumeTrue("HadoopCatalog does not support namespace metadata", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isTrue(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); Assertions.assertThatThrownBy( () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase)) .cause() diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java index fad65f4c63c8..5716abf2e10b 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.List; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.ObjectPath; @@ -25,25 +27,28 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; -public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase { +public class TestFlinkCatalogTablePartitions extends CatalogTestBase { private String tableName = "test_table"; - private final FileFormat format; + @Parameter(index = 2) + private FileFormat format; + + @Parameter(index = 3) + private Boolean cacheEnabled; - @Parameterized.Parameters( - name = "catalogName={0}, baseNamespace={1}, format={2}, cacheEnabled={3}") - public static Iterable parameters() { + @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, cacheEnabled={3}") + protected static List parameters() { List parameters = Lists.newArrayList(); for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) { @@ -58,30 +63,24 @@ public static Iterable parameters() { return parameters; } - public TestFlinkCatalogTablePartitions( - String catalogName, Namespace baseNamespace, FileFormat format, boolean cacheEnabled) { - super(catalogName, baseNamespace); - this.format = format; - config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled)); - } - @Override - @Before + @BeforeEach public void before() { super.before(); + config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled)); sql("CREATE DATABASE %s", flinkDatabase); sql("USE CATALOG %s", catalogName); sql("USE %s", DATABASE); } - @After + @AfterEach public void cleanNamespaces() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); sql("DROP DATABASE IF EXISTS %s", flinkDatabase); super.clean(); } - @Test + @TestTemplate public void testListPartitionsWithUnpartitionedTable() { sql( "CREATE TABLE %s (id INT, data VARCHAR) with ('write.format.default'='%s')", @@ -95,7 +94,7 @@ public void testListPartitionsWithUnpartitionedTable() { .hasMessage("Table " + objectPath + " in catalog " + catalogName + " is not partitioned."); } - @Test + @TestTemplate public void testListPartitionsWithPartitionedTable() throws TableNotExistException, TableNotPartitionedException { sql( @@ -108,13 +107,12 @@ public void testListPartitionsWithPartitionedTable() ObjectPath objectPath = new ObjectPath(DATABASE, tableName); FlinkCatalog flinkCatalog = (FlinkCatalog) getTableEnv().getCatalog(catalogName).get(); List list = flinkCatalog.listPartitions(objectPath); - Assert.assertEquals("Should have 2 partition", 2, list.size()); - + assertThat(list).hasSize(2); List expected = Lists.newArrayList(); CatalogPartitionSpec partitionSpec1 = new CatalogPartitionSpec(ImmutableMap.of("data", "a")); CatalogPartitionSpec partitionSpec2 = new CatalogPartitionSpec(ImmutableMap.of("data", "b")); expected.add(partitionSpec1); expected.add(partitionSpec2); - Assert.assertEquals("Should produce the expected catalog partition specs.", list, expected); + assertThat(list).as("Should produce the expected catalog partition specs.").isEqualTo(expected); } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java index f05bf2fcd9e5..40dfda723749 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java @@ -21,9 +21,11 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import java.io.File; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.file.Path; import java.util.Base64; import java.util.List; import org.apache.flink.configuration.Configuration; @@ -32,6 +34,7 @@ import org.apache.flink.types.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.Files; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -40,28 +43,22 @@ import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.flink.FlinkCatalogTestBase; +import org.apache.iceberg.flink.CatalogTestBase; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.io.TempDir; -public class TestMetadataTableReadableMetrics extends FlinkCatalogTestBase { +public class TestMetadataTableReadableMetrics extends CatalogTestBase { private static final String TABLE_NAME = "test_table"; - public TestMetadataTableReadableMetrics(String catalogName, Namespace baseNamespace) { - super(catalogName, baseNamespace); - } - - @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}") - public static Iterable parameters() { + @Parameters(name = "catalogName={0}, baseNamespace={1}") + protected static List parameters() { List parameters = Lists.newArrayList(); String catalogName = "testhive"; Namespace baseNamespace = Namespace.empty(); @@ -76,7 +73,7 @@ protected TableEnvironment getTableEnv() { return super.getTableEnv(); } - @Rule public TemporaryFolder temp = new TemporaryFolder(); + private @TempDir Path temp; private static final Types.StructType LEAF_STRUCT_TYPE = Types.StructType.of( @@ -134,8 +131,8 @@ private Table createPrimitiveTable() throws IOException { createPrimitiveRecord( false, 2, 2L, Float.NaN, 2.0D, new BigDecimal("2.00"), "2", null, null)); - DataFile dataFile = - FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records); + File testFile = File.createTempFile("junit", null, temp.toFile()); + DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records); table.newAppend().appendFile(dataFile).commit(); return table; } @@ -153,12 +150,13 @@ private void createNestedTable() throws IOException { createNestedRecord(0L, 0.0), createNestedRecord(1L, Double.NaN), createNestedRecord(null, null)); - DataFile dataFile = - FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records); + + File testFile = File.createTempFile("junit", null, temp.toFile()); + DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records); table.newAppend().appendFile(dataFile).commit(); } - @Before + @BeforeEach public void before() { super.before(); sql("USE CATALOG %s", catalogName); @@ -167,7 +165,7 @@ public void before() { } @Override - @After + @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME); sql("DROP DATABASE IF EXISTS %s", flinkDatabase); @@ -212,7 +210,7 @@ protected Object[] row(Object... values) { return values; } - @Test + @TestTemplate public void testPrimitiveColumns() throws Exception { createPrimitiveTable(); List result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME); @@ -257,7 +255,7 @@ public void testPrimitiveColumns() throws Exception { TestHelpers.assertRows(result, expected); } - @Test + @TestTemplate public void testSelectPrimitiveValues() throws Exception { createPrimitiveTable(); @@ -276,7 +274,7 @@ public void testSelectPrimitiveValues() throws Exception { ImmutableList.of(Row.of(4L, 0))); } - @Test + @TestTemplate public void testSelectNestedValues() throws Exception { createNestedTable(); TestHelpers.assertRows( @@ -287,7 +285,7 @@ public void testSelectNestedValues() throws Exception { ImmutableList.of(Row.of(0L, 3L))); } - @Test + @TestTemplate public void testNestedValues() throws Exception { createNestedTable(); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java new file mode 100644 index 000000000000..91ed3c4adea3 --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.flink.util.ArrayUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public abstract class CatalogTestBase extends TestBase { + + protected static final String DATABASE = "db"; + @TempDir protected File hiveWarehouse; + @TempDir protected File hadoopWarehouse; + + @Parameter(index = 0) + protected String catalogName; + + @Parameter(index = 1) + protected Namespace baseNamespace; + + protected Catalog validationCatalog; + protected SupportsNamespaces validationNamespaceCatalog; + protected Map config = Maps.newHashMap(); + + protected String flinkDatabase; + protected Namespace icebergNamespace; + protected boolean isHadoopCatalog; + + @Parameters(name = "catalogName={0}, baseNamespace={1}") + protected static List parameters() { + return Arrays.asList( + new Object[] {"testhive", Namespace.empty()}, + new Object[] {"testhadoop", Namespace.empty()}, + new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")}); + } + + @BeforeEach + public void before() { + this.isHadoopCatalog = catalogName.startsWith("testhadoop"); + this.validationCatalog = + isHadoopCatalog + ? new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getPath()) + : catalog; + this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog; + + config.put("type", "iceberg"); + if (!baseNamespace.isEmpty()) { + config.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString()); + } + if (isHadoopCatalog) { + config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"); + } else { + config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive"); + config.put(CatalogProperties.URI, getURI(hiveConf)); + } + config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot())); + + this.flinkDatabase = catalogName + "." + DATABASE; + this.icebergNamespace = + Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] {DATABASE})); + sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config)); + } + + @AfterEach + public void clean() { + dropCatalog(catalogName, true); + } + + protected String warehouseRoot() { + if (isHadoopCatalog) { + return hadoopWarehouse.getAbsolutePath(); + } else { + return hiveWarehouse.getAbsolutePath(); + } + } + + protected String getFullQualifiedTableName(String tableName) { + final List levels = Lists.newArrayList(icebergNamespace.levels()); + levels.add(tableName); + return Joiner.on('.').join(levels); + } + + static String getURI(HiveConf conf) { + return conf.get(HiveConf.ConfVars.METASTOREURIS.varname); + } + + static String toWithClause(Map props) { + StringBuilder builder = new StringBuilder(); + builder.append("("); + int propCount = 0; + for (Map.Entry entry : props.entrySet()) { + if (propCount > 0) { + builder.append(","); + } + builder + .append("'") + .append(entry.getKey()) + .append("'") + .append("=") + .append("'") + .append(entry.getValue()) + .append("'"); + propCount++; + } + builder.append(")"); + return builder.toString(); + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java index 47b47cb6262d..f46d50a5f0ab 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java @@ -18,7 +18,10 @@ */ package org.apache.iceberg.flink; -import java.io.File; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.nio.file.Path; import java.util.List; import java.util.Map; import java.util.Objects; @@ -29,18 +32,12 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.types.Types; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Test; - -public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase { +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; - public TestFlinkCatalogDatabase(String catalogName, Namespace baseNamespace) { - super(catalogName, baseNamespace); - } +public class TestFlinkCatalogDatabase extends CatalogTestBase { - @After + @AfterEach @Override public void clean() { sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase); @@ -48,240 +45,204 @@ public void clean() { super.clean(); } - @Test + @TestTemplate public void testCreateNamespace() { - Assert.assertFalse( - "Database should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Database should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); - - Assert.assertTrue( - "Database should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Database should exist") + .isTrue(); sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase); - Assert.assertTrue( - "Database should still exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Database should still exist") + .isTrue(); sql("DROP DATABASE IF EXISTS %s", flinkDatabase); - Assert.assertFalse( - "Database should be dropped", validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Database should be dropped") + .isFalse(); sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase); - Assert.assertTrue( - "Database should be created", validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Database should be created") + .isTrue(); } - @Test + @TestTemplate public void testDropEmptyDatabase() { - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); sql("DROP DATABASE %s", flinkDatabase); - - Assert.assertFalse( - "Namespace should have been dropped", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should have been dropped") + .isFalse(); } - @Test + @TestTemplate public void testDropNonEmptyNamespace() { - Assume.assumeFalse( - "Hadoop catalog throws IOException: Directory is not empty.", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assumeThat(isHadoopCatalog) + .as("Hadoop catalog throws IOException: Directory is not empty.") + .isFalse(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); - validationCatalog.createTable( TableIdentifier.of(icebergNamespace, "tl"), new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()))); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - Assert.assertTrue( - "Table should exist", - validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl"))); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); + assertThat(validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl"))) + .as("Table should exist") + .isTrue(); Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase)) .cause() .isInstanceOf(DatabaseNotEmptyException.class) .hasMessage( String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName)); - sql("DROP TABLE %s.tl", flinkDatabase); } - @Test + @TestTemplate public void testListTables() { - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); sql("USE CATALOG %s", catalogName); sql("USE %s", DATABASE); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - - Assert.assertEquals("Should not list any tables", 0, sql("SHOW TABLES").size()); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); + assertThat(sql("SHOW TABLES")).isEmpty(); validationCatalog.createTable( TableIdentifier.of(icebergNamespace, "tl"), new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()))); List tables = sql("SHOW TABLES"); - Assert.assertEquals("Only 1 table", 1, tables.size()); - Assert.assertEquals("Table name should match", "tl", tables.get(0).getField(0)); + assertThat(tables).hasSize(1); + assertThat("tl").as("Table name should match").isEqualTo(tables.get(0).getField(0)); } - @Test + @TestTemplate public void testListNamespace() { - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); sql("USE CATALOG %s", catalogName); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); List databases = sql("SHOW DATABASES"); if (isHadoopCatalog) { - Assert.assertEquals("Should have 1 database", 1, databases.size()); - Assert.assertEquals("Should have db database", "db", databases.get(0).getField(0)); - + assertThat(databases).hasSize(1); + assertThat(databases.get(0).getField(0)).as("Should have db database").isEqualTo("db"); if (!baseNamespace.isEmpty()) { // test namespace not belongs to this catalog validationNamespaceCatalog.createNamespace( Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE")); databases = sql("SHOW DATABASES"); - Assert.assertEquals("Should have 1 database", 1, databases.size()); - Assert.assertEquals( - "Should have db and default database", "db", databases.get(0).getField(0)); + assertThat(databases).hasSize(1); + assertThat(databases.get(0).getField(0)).as("Should have db database").isEqualTo("db"); } } else { // If there are multiple classes extends FlinkTestBase, TestHiveMetastore may loose the // creation for default // database. See HiveMetaStore.HMSHandler.init. - Assert.assertTrue( - "Should have db database", - databases.stream().anyMatch(d -> Objects.equals(d.getField(0), "db"))); + assertThat(databases) + .as("Should have db database") + .anyMatch(d -> Objects.equals(d.getField(0), "db")); } } - @Test + @TestTemplate public void testCreateNamespaceWithMetadata() { - Assume.assumeFalse("HadoopCatalog does not support namespace metadata", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); Map nsMetadata = validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace); - - Assert.assertEquals( - "Namespace should have expected prop value", "value", nsMetadata.get("prop")); + assertThat(nsMetadata).containsEntry("prop", "value"); } - @Test + @TestTemplate public void testCreateNamespaceWithComment() { - Assume.assumeFalse("HadoopCatalog does not support namespace metadata", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s COMMENT 'namespace doc'", flinkDatabase); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); Map nsMetadata = validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace); - - Assert.assertEquals( - "Namespace should have expected comment", "namespace doc", nsMetadata.get("comment")); + assertThat(nsMetadata).containsEntry("comment", "namespace doc"); } - @Test + @TestTemplate public void testCreateNamespaceWithLocation() throws Exception { - Assume.assumeFalse("HadoopCatalog does not support namespace metadata", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - - File location = TEMPORARY_FOLDER.newFile(); - Assert.assertTrue(location.delete()); + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); + Path location = temporaryDirectory.getRoot(); sql("CREATE DATABASE %s WITH ('location'='%s')", flinkDatabase, location); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); Map nsMetadata = validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace); - - Assert.assertEquals( - "Namespace should have expected location", - "file:" + location.getPath(), - nsMetadata.get("location")); + assertThat(nsMetadata).containsEntry("location", "file:" + location.getRoot()); } - @Test + @TestTemplate public void testSetProperties() { - Assume.assumeFalse("HadoopCatalog does not support namespace metadata", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); sql("CREATE DATABASE %s", flinkDatabase); - - Assert.assertTrue( - "Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should exist") + .isTrue(); Map defaultMetadata = validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace); - Assert.assertFalse( - "Default metadata should not have custom property", defaultMetadata.containsKey("prop")); - + assertThat(defaultMetadata).doesNotContainKey("prop"); sql("ALTER DATABASE %s SET ('prop'='value')", flinkDatabase); - Map nsMetadata = validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace); - - Assert.assertEquals( - "Namespace should have expected prop value", "value", nsMetadata.get("prop")); + assertThat(nsMetadata).containsEntry("prop", "value"); } - @Test + @TestTemplate public void testHadoopNotSupportMeta() { - Assume.assumeTrue("HadoopCatalog does not support namespace metadata", isHadoopCatalog); - - Assert.assertFalse( - "Namespace should not already exist", - validationNamespaceCatalog.namespaceExists(icebergNamespace)); - + assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isTrue(); + assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) + .as("Namespace should not already exist") + .isFalse(); Assertions.assertThatThrownBy( () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase)) .cause() diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java index 0008e4320c8a..05fd1bad1ddb 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.List; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.ObjectPath; @@ -25,25 +27,28 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; -public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase { +public class TestFlinkCatalogTablePartitions extends CatalogTestBase { private String tableName = "test_table"; - private final FileFormat format; + @Parameter(index = 2) + private FileFormat format; + + @Parameter(index = 3) + private Boolean cacheEnabled; - @Parameterized.Parameters( - name = "catalogName={0}, baseNamespace={1}, format={2}, cacheEnabled={3}") - public static Iterable parameters() { + @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, cacheEnabled={3}") + protected static List parameters() { List parameters = Lists.newArrayList(); for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) { @@ -58,30 +63,24 @@ public static Iterable parameters() { return parameters; } - public TestFlinkCatalogTablePartitions( - String catalogName, Namespace baseNamespace, FileFormat format, boolean cacheEnabled) { - super(catalogName, baseNamespace); - this.format = format; - config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled)); - } - @Override - @Before + @BeforeEach public void before() { super.before(); + config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled)); sql("CREATE DATABASE %s", flinkDatabase); sql("USE CATALOG %s", catalogName); sql("USE %s", DATABASE); } - @After + @AfterEach public void cleanNamespaces() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); sql("DROP DATABASE IF EXISTS %s", flinkDatabase); super.clean(); } - @Test + @TestTemplate public void testListPartitionsWithUnpartitionedTable() { sql( "CREATE TABLE %s (id INT, data VARCHAR) with ('write.format.default'='%s')", @@ -96,7 +95,7 @@ public void testListPartitionsWithUnpartitionedTable() { .hasMessageEndingWith("is not partitioned."); } - @Test + @TestTemplate public void testListPartitionsWithPartitionedTable() throws TableNotExistException, TableNotPartitionedException { sql( @@ -109,13 +108,12 @@ public void testListPartitionsWithPartitionedTable() ObjectPath objectPath = new ObjectPath(DATABASE, tableName); FlinkCatalog flinkCatalog = (FlinkCatalog) getTableEnv().getCatalog(catalogName).get(); List list = flinkCatalog.listPartitions(objectPath); - Assert.assertEquals("Should have 2 partition", 2, list.size()); - + assertThat(list).hasSize(2); List expected = Lists.newArrayList(); CatalogPartitionSpec partitionSpec1 = new CatalogPartitionSpec(ImmutableMap.of("data", "a")); CatalogPartitionSpec partitionSpec2 = new CatalogPartitionSpec(ImmutableMap.of("data", "b")); expected.add(partitionSpec1); expected.add(partitionSpec2); - Assert.assertEquals("Should produce the expected catalog partition specs.", list, expected); + assertThat(list).as("Should produce the expected catalog partition specs.").isEqualTo(expected); } } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java index f05bf2fcd9e5..40dfda723749 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java @@ -21,9 +21,11 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import java.io.File; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.file.Path; import java.util.Base64; import java.util.List; import org.apache.flink.configuration.Configuration; @@ -32,6 +34,7 @@ import org.apache.flink.types.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.Files; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -40,28 +43,22 @@ import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.flink.FlinkCatalogTestBase; +import org.apache.iceberg.flink.CatalogTestBase; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.io.TempDir; -public class TestMetadataTableReadableMetrics extends FlinkCatalogTestBase { +public class TestMetadataTableReadableMetrics extends CatalogTestBase { private static final String TABLE_NAME = "test_table"; - public TestMetadataTableReadableMetrics(String catalogName, Namespace baseNamespace) { - super(catalogName, baseNamespace); - } - - @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}") - public static Iterable parameters() { + @Parameters(name = "catalogName={0}, baseNamespace={1}") + protected static List parameters() { List parameters = Lists.newArrayList(); String catalogName = "testhive"; Namespace baseNamespace = Namespace.empty(); @@ -76,7 +73,7 @@ protected TableEnvironment getTableEnv() { return super.getTableEnv(); } - @Rule public TemporaryFolder temp = new TemporaryFolder(); + private @TempDir Path temp; private static final Types.StructType LEAF_STRUCT_TYPE = Types.StructType.of( @@ -134,8 +131,8 @@ private Table createPrimitiveTable() throws IOException { createPrimitiveRecord( false, 2, 2L, Float.NaN, 2.0D, new BigDecimal("2.00"), "2", null, null)); - DataFile dataFile = - FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records); + File testFile = File.createTempFile("junit", null, temp.toFile()); + DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records); table.newAppend().appendFile(dataFile).commit(); return table; } @@ -153,12 +150,13 @@ private void createNestedTable() throws IOException { createNestedRecord(0L, 0.0), createNestedRecord(1L, Double.NaN), createNestedRecord(null, null)); - DataFile dataFile = - FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records); + + File testFile = File.createTempFile("junit", null, temp.toFile()); + DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records); table.newAppend().appendFile(dataFile).commit(); } - @Before + @BeforeEach public void before() { super.before(); sql("USE CATALOG %s", catalogName); @@ -167,7 +165,7 @@ public void before() { } @Override - @After + @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME); sql("DROP DATABASE IF EXISTS %s", flinkDatabase); @@ -212,7 +210,7 @@ protected Object[] row(Object... values) { return values; } - @Test + @TestTemplate public void testPrimitiveColumns() throws Exception { createPrimitiveTable(); List result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME); @@ -257,7 +255,7 @@ public void testPrimitiveColumns() throws Exception { TestHelpers.assertRows(result, expected); } - @Test + @TestTemplate public void testSelectPrimitiveValues() throws Exception { createPrimitiveTable(); @@ -276,7 +274,7 @@ public void testSelectPrimitiveValues() throws Exception { ImmutableList.of(Row.of(4L, 0))); } - @Test + @TestTemplate public void testSelectNestedValues() throws Exception { createNestedTable(); TestHelpers.assertRows( @@ -287,7 +285,7 @@ public void testSelectNestedValues() throws Exception { ImmutableList.of(Row.of(0L, 3L))); } - @Test + @TestTemplate public void testNestedValues() throws Exception { createNestedTable();