HIVE-26103: Port Iceberg fixes to the iceberg module (apache#3164)
* Source Iceberg PR - Core: Remove deprecated APIs up to 0.13.0

* Revert "HIVE-25563: Iceberg table operations hang a long time if metadata is missing/corrupted (Adam Szita, reviewed by Marton Bod)" - applying instead  Hive: Limit number of retries when metadata file is missing (apache#3379)

This reverts commit 7b600fe.

* Source Iceberg PR - Hive: Limit number of retries when metadata file is missing (apache#3379)

* Source Iceberg PR - Hive: Fix RetryingMetaStoreClient for Hive 2.1 (apache#3403)

* Source Iceberg PR - Switch from new HashMap to Maps.newHashMap (apache#3648)

* Source Iceberg PR - Hive: HiveCatalog should remove HMS stats for certain engines based on config (apache#3652) - Use the Iceberg config property

* Source Iceberg PR - Core: If status check fails, commit should be unknown (apache#3717)

* Source Iceberg PR - Build: Add checkstyle rule for instantiating HashMap, HashSet, ArrayList (apache#3689)

* Source Iceberg PR - Test: Make sure to delete temp folders (apache#3790)

* Source Iceberg PR - API: Register existing tables in Iceberg HiveCatalog (apache#3851)

* Source Iceberg PR - Hive: Make Iceberg table filter optional in HiveCatalog (apache#3908)

* Source Iceberg PR - Core: Add reserved UUID Table Property and Expose in HMS. (apache#3914)

* Source Iceberg PR - Hive: Known exception should not become CommitStateUnknownException (apache#4261)

* Source Iceberg PR - Build: Add missing @Override annotations (apache#3654)
pvary committed Apr 4, 2022
1 parent 8d0365f commit 6e25a01
Showing 55 changed files with 419 additions and 254 deletions.
4 changes: 0 additions & 4 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5693,10 +5693,6 @@ public static enum ConfVars {
HIVE_SERVER2_ICEBERG_METADATA_GENERATOR_THREADS("hive.server2.iceberg.metadata.generator.threads", 10,
"Number of threads used to scan partition directories for data files and update/generate iceberg metadata"),

HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES("hive.iceberg.metadata.refresh.max.retries", 2,
"Max retry count for trying to access the metadata location in order to refresh metadata during " +
" Iceberg table load."),

/* BLOBSTORE section */

HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",
15 changes: 15 additions & 0 deletions iceberg/checkstyle/checkstyle.xml
@@ -46,6 +46,21 @@
<property name="format" value="sparkContext\(\)\.hadoopConfiguration\(\)"/>
<property name="message" value="Are you sure that you want to use sparkContext().hadoopConfiguration()? In most cases, you should use sessionState().newHadoopConf() instead, so that the Hadoop configurations specified in the Spark session configuration will come into effect."/>
</module>
<module name="RegexpSingleline">
<property name="format" value="new HashMap&lt;&gt;\(.*\)"/>
<property name="message"
value="Prefer using Maps.newHashMap instead."/>
</module>
<module name="RegexpSingleline">
<property name="format" value="new ArrayList&lt;&gt;\(.*\)"/>
<property name="message"
value="Prefer using Lists.newArrayList() instead."/>
</module>
<module name="RegexpSingleline">
<property name="format" value="new HashSet&lt;&gt;\(.*\)"/>
<property name="message"
value="Prefer using Sets.newHashSet() instead."/>
</module>
<module name="SuppressionFilter"> <!-- baseline-gradle: README.md -->
<property name="file" value="${config_loc}/checkstyle-suppressions.xml"/>
</module>
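
As an aside (not part of the diff), a minimal sketch of the pattern these three new rules flag and the relocated Guava factories the Iceberg module uses elsewhere in this commit; the class and fields below are purely illustrative.

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

// Illustrative class, not part of this commit.
public class CollectionStyleExample {
  // Flagged by the new RegexpSingleline rules:
  //   Map<String, String> props = new HashMap<>();
  //   List<String> names = new ArrayList<>();
  //   Set<String> ids = new HashSet<>();

  // Preferred factory methods from the relocated Guava package:
  private final Map<String, String> props = Maps.newHashMap();
  private final List<String> names = Lists.newArrayList();
  private final Set<String> ids = Sets.newHashSet();
}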
@@ -36,10 +36,12 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -49,49 +51,31 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
public static final String LIST_ALL_TABLES = "list-all-tables";
public static final String LIST_ALL_TABLES_DEFAULT = "false";

private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);

private String name;
private Configuration conf;
private FileIO fileIO;
private ClientPool<IMetaStoreClient, TException> clients;
private boolean listAllTables = false;

public HiveCatalog() {
}

/**
* Hive Catalog constructor.
*
* @param conf Hadoop Configuration
* @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog. Will be removed in
* v0.13.0
*/
@Deprecated
public HiveCatalog(Configuration conf) {
this.name = "hive";
this.conf = conf;
this.fileIO = new HadoopFileIO(conf);
Map<String, String> properties = ImmutableMap.of(
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
conf.get(CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
String.valueOf(CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT)),
CatalogProperties.CLIENT_POOL_SIZE,
conf.get(CatalogProperties.CLIENT_POOL_SIZE,
String.valueOf(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT))
);
this.clients = new CachedClientPool(conf, properties);
}

@Override
public void initialize(String inputName, Map<String, String> properties) {
this.name = inputName;
@@ -108,6 +92,8 @@ public void initialize(String inputName, Map<String, String> properties) {
this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, properties.get(CatalogProperties.WAREHOUSE_LOCATION));
}

this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));

String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

@@ -122,12 +108,20 @@ public List<TableIdentifier> listTables(Namespace namespace) {

try {
List<String> tableNames = clients.run(client -> client.getAllTables(database));
List<Table> tableObjects = clients.run(client -> client.getTableObjectsByName(database, tableNames));
List<TableIdentifier> tableIdentifiers = tableObjects.stream()
.filter(table -> table.getParameters() == null ? false : BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
.equalsIgnoreCase(table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
.map(table -> TableIdentifier.of(namespace, table.getTableName()))
.collect(Collectors.toList());
List<TableIdentifier> tableIdentifiers;

if (listAllTables) {
tableIdentifiers = tableNames.stream()
.map(t -> TableIdentifier.of(namespace, t))
.collect(Collectors.toList());
} else {
List<Table> tableObjects = clients.run(client -> client.getTableObjectsByName(database, tableNames));
tableIdentifiers = tableObjects.stream()
.filter(table -> table.getParameters() != null && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
.equalsIgnoreCase(table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
.map(table -> TableIdentifier.of(namespace, table.getTableName()))
.collect(Collectors.toList());
}

LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, tableIdentifiers);
return tableIdentifiers;
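
A hedged usage sketch of the new flag (not part of the diff): LIST_ALL_TABLES ("list-all-tables") is read in initialize() and defaults to "false", which keeps the Iceberg-only filter above; setting it to "true" makes listTables return every table in the namespace. The metastore configuration and namespace below are placeholders.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// Illustrative sketch; assumes a reachable Hive Metastore configured through 'conf'.
public class ListAllTablesSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    HiveCatalog catalog = new HiveCatalog();
    catalog.setConf(conf);
    // With "list-all-tables" set to "true", the table-type filter is skipped and
    // non-Iceberg tables are returned as well.
    catalog.initialize("hive", ImmutableMap.of(HiveCatalog.LIST_ALL_TABLES, "true"));

    List<TableIdentifier> tables = catalog.listTables(Namespace.of("default"));
    tables.forEach(System.out::println);
  }
}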
@@ -235,6 +229,23 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
}
}

@Override
public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);

// Throw an exception if this table already exists in the catalog.
if (tableExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
}

TableOperations ops = newTableOps(identifier);
InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
ops.commit(null, metadata);

return new BaseTable(ops, identifier.toString());
}

@Override
public void createNamespace(Namespace namespace, Map<String, String> meta) {
Preconditions.checkArgument(
@@ -533,4 +544,9 @@ public void setConf(Configuration conf) {
public Configuration getConf() {
return conf;
}

@VisibleForTesting
void setListAllTables(boolean listAllTables) {
this.listAllTables = listAllTables;
}
}
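
And a similar sketch for the registerTable method added above, which attaches an existing Iceberg table to the catalog from its metadata file; the table identifier and metadata location are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// Illustrative sketch; the metadata location must point at an existing metadata JSON file.
public class RegisterTableSketch {
  public static void main(String[] args) {
    HiveCatalog catalog = new HiveCatalog();
    catalog.setConf(new Configuration());
    catalog.initialize("hive", ImmutableMap.of());

    TableIdentifier identifier = TableIdentifier.of("default", "existing_table");
    // Throws AlreadyExistsException if the table is already registered in this catalog.
    Table table = catalog.registerTable(identifier,
        "s3://bucket/warehouse/default/existing_table/metadata/00003-abc.metadata.json");
    System.out.println(table.location());
  }
}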

This file was deleted.

@@ -21,6 +21,8 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -35,9 +37,8 @@ public class HiveClientPool extends ClientPoolImpl<IMetaStoreClient, TException>
// use appropriate ctor depending on whether we're working with Hive1, Hive2, or Hive3 dependencies
// we need to do this because there is a breaking API change between Hive1, Hive2, and Hive3
private static final DynMethods.StaticMethod GET_CLIENT = DynMethods.builder("getProxy")
.impl(RetryingMetaStoreClient.class, HiveConf.class)
.impl(RetryingMetaStoreClient.class, HiveConf.class, Boolean.TYPE)
.impl(RetryingMetaStoreClient.class, Configuration.class, Boolean.TYPE)
.impl(RetryingMetaStoreClient.class, HiveConf.class, HiveMetaHookLoader.class, String.class) // Hive 1 and 2
.impl(RetryingMetaStoreClient.class, Configuration.class, HiveMetaHookLoader.class, String.class) // Hive 3
.buildStatic();

private final HiveConf hiveConf;
@@ -53,7 +54,7 @@ public HiveClientPool(int poolSize, Configuration conf) {
protected IMetaStoreClient newClient() {
try {
try {
return GET_CLIENT.invoke(hiveConf, true);
return GET_CLIENT.invoke(hiveConf, (HiveMetaHookLoader) tbl -> null, HiveMetaStoreClient.class.getName());
} catch (RuntimeException e) {
// any MetaException would be wrapped into RuntimeException during reflection, so let's double-check type here
if (e.getCause() instanceof MetaException) {
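
For reference (not part of the diff), roughly what the GET_CLIENT.invoke(...) call above resolves to when the Hive 3 overload is picked; the helper class is illustrative, and the hook loader simply returns no hooks, matching the lambda in the change.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;

// Illustrative sketch of the resolved call; DynMethods picks the matching overload at runtime.
public class MetaStoreClientSketch {
  static IMetaStoreClient newClient(HiveConf hiveConf) throws MetaException {
    HiveMetaHookLoader noHooks = tbl -> null; // no HiveMetaHook handling needed here
    return RetryingMetaStoreClient.getProxy(
        hiveConf, noHooks, HiveMetaStoreClient.class.getName());
  }
}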
@@ -19,7 +19,6 @@

package org.apache.iceberg.hive;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -30,6 +29,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
@@ -62,7 +62,7 @@ static Type convert(TypeInfo typeInfo, boolean autoConvert) {
}

List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> typeInfos, List<String> comments) {
List<Types.NestedField> result = new ArrayList<>(names.size());
List<Types.NestedField> result = Lists.newArrayListWithExpectedSize(names.size());
for (int i = 0; i < names.size(); ++i) {
result.add(Types.NestedField.optional(id++, names.get(i), convertType(typeInfos.get(i)),
comments.isEmpty() || i >= comments.size() ? null : comments.get(i)));
@@ -19,7 +19,6 @@

package org.apache.iceberg.hive;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -31,6 +30,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -72,9 +72,9 @@ public static Schema convert(List<FieldSchema> fieldSchemas) {
* @return An equivalent Iceberg Schema
*/
public static Schema convert(List<FieldSchema> fieldSchemas, boolean autoConvert) {
List<String> names = new ArrayList<>(fieldSchemas.size());
List<TypeInfo> typeInfos = new ArrayList<>(fieldSchemas.size());
List<String> comments = new ArrayList<>(fieldSchemas.size());
List<String> names = Lists.newArrayListWithExpectedSize(fieldSchemas.size());
List<TypeInfo> typeInfos = Lists.newArrayListWithExpectedSize(fieldSchemas.size());
List<String> comments = Lists.newArrayListWithExpectedSize(fieldSchemas.size());

for (FieldSchema col : fieldSchemas) {
names.add(col.getName());
@@ -237,10 +237,10 @@ public static Pair<String, Optional<String>> getReorderedColumn(List<FieldSchema
}

public static class SchemaDifference {
private final List<FieldSchema> missingFromFirst = new ArrayList<>();
private final List<FieldSchema> missingFromSecond = new ArrayList<>();
private final List<FieldSchema> typeChanged = new ArrayList<>();
private final List<FieldSchema> commentChanged = new ArrayList<>();
private final List<FieldSchema> missingFromFirst = Lists.newArrayList();
private final List<FieldSchema> missingFromSecond = Lists.newArrayList();
private final List<FieldSchema> typeChanged = Lists.newArrayList();
private final List<FieldSchema> commentChanged = Lists.newArrayList();

public List<FieldSchema> getMissingFromFirst() {
return missingFromFirst;
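
One nuance worth noting in the refactors above, sketched here for illustration only: where the old code passed a size to new ArrayList<>(size), the replacement uses Lists.newArrayListWithExpectedSize so the capacity hint is kept, while the plain Lists.newArrayList() form corresponds to the no-argument constructor.

import java.util.List;

import org.apache.iceberg.relocated.com.google.common.collect.Lists;

// Illustrative only: both lists start empty; the second carries an initial capacity hint.
public class CapacityHintSketch {
  static List<String> build(int expectedSize) {
    List<String> unsized = Lists.newArrayList();                            // replaces new ArrayList<>()
    List<String> sized = Lists.newArrayListWithExpectedSize(expectedSize);  // replaces new ArrayList<>(size)
    unsized.add("example");
    sized.add("example");
    return sized;
  }
}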
