Skip to content

Commit

Permalink
Return HiveColumnStatistics from the HiveMetastore interface
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Jul 10, 2018
1 parent 02a44d5 commit 68adfec
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 66 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.facebook.presto.hive.HiveType; import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.HiveUtil; import com.facebook.presto.hive.HiveUtil;
import com.facebook.presto.hive.PartitionNotFoundException; import com.facebook.presto.hive.PartitionNotFoundException;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Database; import com.facebook.presto.hive.metastore.Database;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HiveColumnStatistics; import com.facebook.presto.hive.metastore.HiveColumnStatistics;
Expand All @@ -29,7 +28,6 @@
import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableNotFoundException; import com.facebook.presto.spi.TableNotFoundException;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo; import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;


Expand All @@ -48,8 +46,6 @@
import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiPrivilegeGrantInfo; import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiPrivilegeGrantInfo;
import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiTable; import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiTable;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static java.util.function.UnaryOperator.identity; import static java.util.function.UnaryOperator.identity;


Expand Down Expand Up @@ -85,33 +81,13 @@ public Optional<Table> getTable(String databaseName, String tableName)
@Override @Override
public Map<String, HiveColumnStatistics> getTableColumnStatistics(String databaseName, String tableName) public Map<String, HiveColumnStatistics> getTableColumnStatistics(String databaseName, String tableName)
{ {
Table table = getTable(databaseName, tableName) return delegate.getTableColumnStatistics(databaseName, tableName);
.orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName)));
Set<String> dataColumns = table.getDataColumns().stream()
.map(Column::getName)
.collect(toImmutableSet());
return groupStatisticsByColumn(delegate.getTableColumnStatistics(databaseName, tableName, dataColumns));
} }


@Override @Override
public Map<String, Map<String, HiveColumnStatistics>> getPartitionColumnStatistics(String databaseName, String tableName, Set<String> partitionNames) public Map<String, Map<String, HiveColumnStatistics>> getPartitionColumnStatistics(String databaseName, String tableName, Set<String> partitionNames)
{ {
Table table = getTable(databaseName, tableName) return delegate.getPartitionColumnStatistics(databaseName, tableName, partitionNames);
.orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName)));
Set<String> dataColumns = table.getDataColumns().stream()
.map(Column::getName)
.collect(toImmutableSet());
Map<String, Set<ColumnStatisticsObj>> statistics = delegate.getPartitionColumnStatistics(databaseName, tableName, partitionNames, dataColumns);
return statistics.entrySet()
.stream()
.filter(entry -> !entry.getValue().isEmpty())
.collect(toImmutableMap(Map.Entry::getKey, entry -> groupStatisticsByColumn(entry.getValue())));
}

private Map<String, HiveColumnStatistics> groupStatisticsByColumn(Set<ColumnStatisticsObj> statistics)
{
return statistics.stream()
.collect(toImmutableMap(ColumnStatisticsObj::getColName, ThriftMetastoreUtil::fromMetastoreApiColumnStatistics));
} }


@Override @Override
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/ */
package com.facebook.presto.hive.metastore.thrift; package com.facebook.presto.hive.metastore.thrift;


import com.facebook.presto.hive.metastore.HiveColumnStatistics;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo; import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo; import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
Expand Down Expand Up @@ -72,9 +72,9 @@ public interface HiveMetastore


Optional<Table> getTable(String databaseName, String tableName); Optional<Table> getTable(String databaseName, String tableName);


Set<ColumnStatisticsObj> getTableColumnStatistics(String databaseName, String tableName, Set<String> columnNames); Map<String, HiveColumnStatistics> getTableColumnStatistics(String databaseName, String tableName);


Map<String, Set<ColumnStatisticsObj>> getPartitionColumnStatistics(String databaseName, String tableName, Set<String> partitionNames, Set<String> columnNames); Map<String, Map<String, HiveColumnStatistics>> getPartitionColumnStatistics(String databaseName, String tableName, Set<String> partitionNames);


Set<String> getRoles(String user); Set<String> getRoles(String user);


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.hive.RetryDriver; import com.facebook.presto.hive.RetryDriver;
import com.facebook.presto.hive.SchemaAlreadyExistsException; import com.facebook.presto.hive.SchemaAlreadyExistsException;
import com.facebook.presto.hive.TableAlreadyExistsException; import com.facebook.presto.hive.TableAlreadyExistsException;
import com.facebook.presto.hive.metastore.HiveColumnStatistics;
import com.facebook.presto.hive.metastore.HivePrincipal; import com.facebook.presto.hive.metastore.HivePrincipal;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo; import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.PrestoException;
Expand All @@ -32,6 +33,7 @@
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
Expand All @@ -55,7 +57,6 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
Expand All @@ -72,6 +73,7 @@
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Sets.newHashSet; import static com.google.common.collect.Sets.newHashSet;
Expand Down Expand Up @@ -231,20 +233,25 @@ private static boolean isPrestoView(Table table)
} }


@Override @Override
public Set<ColumnStatisticsObj> getTableColumnStatistics(String databaseName, String tableName, Set<String> columnNames) public Map<String, HiveColumnStatistics> getTableColumnStatistics(String databaseName, String tableName)
{ {
Table table = getTable(databaseName, tableName)
.orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName)));
List<String> dataColumns = table.getSd().getCols().stream()
.map(FieldSchema::getName)
.collect(toImmutableList());
try { try {
return retry() return retry()
.stopOn(NoSuchObjectException.class, HiveViewNotSupportedException.class) .stopOn(NoSuchObjectException.class, HiveViewNotSupportedException.class)
.stopOnIllegalExceptions() .stopOnIllegalExceptions()
.run("getTableColumnStatistics", stats.getGetTableColumnStatistics().wrap(() -> { .run("getTableColumnStatistics", stats.getGetTableColumnStatistics().wrap(() -> {
try (HiveMetastoreClient client = clientProvider.createMetastoreClient()) { try (HiveMetastoreClient client = clientProvider.createMetastoreClient()) {
return ImmutableSet.copyOf(client.getTableColumnStatistics(databaseName, tableName, ImmutableList.copyOf(columnNames))); return groupStatisticsByColumn(client.getTableColumnStatistics(databaseName, tableName, dataColumns));
} }
})); }));
} }
catch (NoSuchObjectException e) { catch (NoSuchObjectException e) {
return ImmutableSet.of(); throw new TableNotFoundException(new SchemaTableName(databaseName, tableName));
} }
catch (TException e) { catch (TException e) {
throw new PrestoException(HIVE_METASTORE_ERROR, e); throw new PrestoException(HIVE_METASTORE_ERROR, e);
Expand All @@ -255,23 +262,30 @@ public Set<ColumnStatisticsObj> getTableColumnStatistics(String databaseName, St
} }


@Override @Override
public Map<String, Set<ColumnStatisticsObj>> getPartitionColumnStatistics(String databaseName, String tableName, Set<String> partitionNames, Set<String> columnNames) public Map<String, Map<String, HiveColumnStatistics>> getPartitionColumnStatistics(String databaseName, String tableName, Set<String> partitionNames)
{ {
Table table = getTable(databaseName, tableName)
.orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName)));
List<String> dataColumns = table.getSd().getCols().stream()
.map(FieldSchema::getName)
.collect(toImmutableList());

try { try {
return retry() return retry()
.stopOn(NoSuchObjectException.class, HiveViewNotSupportedException.class) .stopOn(NoSuchObjectException.class, HiveViewNotSupportedException.class)
.stopOnIllegalExceptions() .stopOnIllegalExceptions()
.run("getPartitionColumnStatistics", stats.getGetPartitionColumnStatistics().wrap(() -> { .run("getPartitionColumnStatistics", stats.getGetPartitionColumnStatistics().wrap(() -> {
try (HiveMetastoreClient client = clientProvider.createMetastoreClient()) { try (HiveMetastoreClient client = clientProvider.createMetastoreClient()) {
Map<String, List<ColumnStatisticsObj>> partitionColumnStatistics = client.getPartitionColumnStatistics(databaseName, tableName, ImmutableList.copyOf(partitionNames), ImmutableList.copyOf(columnNames)); Map<String, List<ColumnStatisticsObj>> partitionColumnStatistics = client.getPartitionColumnStatistics(databaseName, tableName, ImmutableList.copyOf(partitionNames), dataColumns);
return partitionColumnStatistics.entrySet() return partitionColumnStatistics.entrySet()
.stream() .stream()
.collect(toImmutableMap(Entry::getKey, entry -> ImmutableSet.copyOf(entry.getValue()))); .filter(entry -> !entry.getValue().isEmpty())
.collect(toImmutableMap(Map.Entry::getKey, entry -> groupStatisticsByColumn(entry.getValue())));
} }
})); }));
} }
catch (NoSuchObjectException e) { catch (NoSuchObjectException e) {
return ImmutableMap.of(); throw new TableNotFoundException(new SchemaTableName(databaseName, tableName));
} }
catch (TException e) { catch (TException e) {
throw new PrestoException(HIVE_METASTORE_ERROR, e); throw new PrestoException(HIVE_METASTORE_ERROR, e);
Expand All @@ -281,6 +295,12 @@ public Map<String, Set<ColumnStatisticsObj>> getPartitionColumnStatistics(String
} }
} }


private Map<String, HiveColumnStatistics> groupStatisticsByColumn(List<ColumnStatisticsObj> statistics)
{
return statistics.stream()
.collect(toImmutableMap(ColumnStatisticsObj::getColName, ThriftMetastoreUtil::fromMetastoreApiColumnStatistics));
}

@Override @Override
public Optional<List<String>> getAllViews(String databaseName) public Optional<List<String>> getAllViews(String databaseName)
{ {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@


import com.facebook.presto.hive.SchemaAlreadyExistsException; import com.facebook.presto.hive.SchemaAlreadyExistsException;
import com.facebook.presto.hive.TableAlreadyExistsException; import com.facebook.presto.hive.TableAlreadyExistsException;
import com.facebook.presto.hive.metastore.HiveColumnStatistics;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo; import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaNotFoundException; import com.facebook.presto.spi.SchemaNotFoundException;
Expand All @@ -26,7 +27,6 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Partition;
Expand Down Expand Up @@ -84,9 +84,9 @@ public class InMemoryHiveMetastore
@GuardedBy("this") @GuardedBy("this")
private final Map<PartitionName, Partition> partitions = new HashMap<>(); private final Map<PartitionName, Partition> partitions = new HashMap<>();
@GuardedBy("this") @GuardedBy("this")
private final Map<SchemaTableName, Map<String, ColumnStatisticsObj>> columnStatistics = new HashMap<>(); private final Map<SchemaTableName, Map<String, HiveColumnStatistics>> columnStatistics = new HashMap<>();
@GuardedBy("this") @GuardedBy("this")
private final Map<PartitionName, Map<String, ColumnStatisticsObj>> partitionColumnStatistics = new HashMap<>(); private final Map<PartitionName, Map<String, HiveColumnStatistics>> partitionColumnStatistics = new HashMap<>();
@GuardedBy("this") @GuardedBy("this")
private final Map<String, Set<String>> roleGrants = new HashMap<>(); private final Map<String, Set<String>> roleGrants = new HashMap<>();
@GuardedBy("this") @GuardedBy("this")
Expand Down Expand Up @@ -441,55 +441,41 @@ public synchronized Optional<Table> getTable(String databaseName, String tableNa
} }


@Override @Override
public synchronized Set<ColumnStatisticsObj> getTableColumnStatistics(String databaseName, String tableName, Set<String> columnNames) public synchronized Map<String, HiveColumnStatistics> getTableColumnStatistics(String databaseName, String tableName)
{ {
SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName); SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName);
if (!columnStatistics.containsKey(schemaTableName)) { if (!columnStatistics.containsKey(schemaTableName)) {
return ImmutableSet.of(); return ImmutableMap.of();
} }

return columnStatistics.get(schemaTableName);
Map<String, ColumnStatisticsObj> columnStatisticsMap = columnStatistics.get(schemaTableName);
return columnNames.stream()
.filter(columnStatisticsMap::containsKey)
.map(columnStatisticsMap::get)
.collect(toImmutableSet());
} }


public synchronized void setColumnStatistics(String databaseName, String tableName, String columnName, ColumnStatisticsObj columnStatisticsObj) public synchronized void setColumnStatistics(String databaseName, String tableName, String columnName, HiveColumnStatistics statistics)
{ {
checkArgument(columnStatisticsObj.getColName().equals(columnName), "columnName argument and columnStatisticsObj.getColName() must be the same");
SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName); SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName);
columnStatistics.computeIfAbsent(schemaTableName, key -> new HashMap<>()).put(columnName, columnStatisticsObj); columnStatistics.computeIfAbsent(schemaTableName, key -> new HashMap<>()).put(columnName, statistics);
} }


@Override @Override
public synchronized Map<String, Set<ColumnStatisticsObj>> getPartitionColumnStatistics(String databaseName, String tableName, Set<String> partitionNames, Set<String> columnNames) public synchronized Map<String, Map<String, HiveColumnStatistics>> getPartitionColumnStatistics(String databaseName, String tableName, Set<String> partitionNames)
{ {
ImmutableMap.Builder<String, Set<ColumnStatisticsObj>> result = ImmutableMap.builder(); ImmutableMap.Builder<String, Map<String, HiveColumnStatistics>> result = ImmutableMap.builder();
for (String partitionName : partitionNames) { for (String partitionName : partitionNames) {
PartitionName partitionKey = PartitionName.partition(databaseName, tableName, partitionName); PartitionName partitionKey = PartitionName.partition(databaseName, tableName, partitionName);
if (!partitionColumnStatistics.containsKey(partitionKey)) { Map<String, HiveColumnStatistics> statistics = partitionColumnStatistics.get(partitionKey);
continue; if (statistics != null && !statistics.isEmpty()) {
result.put(partitionName, statistics);
} }

Map<String, ColumnStatisticsObj> columnStatistics = partitionColumnStatistics.get(partitionKey);
result.put(
partitionName,
columnNames.stream()
.filter(columnStatistics::containsKey)
.map(columnStatistics::get)
.collect(toImmutableSet()));
} }
return result.build(); return result.build();
} }


public synchronized void setPartitionColumnStatistics(String databaseName, String tableName, String partitionName, String columnName, ColumnStatisticsObj columnStatisticsObj) public synchronized void setPartitionColumnStatistics(String databaseName, String tableName, String partitionName, String columnName, HiveColumnStatistics statistics)
{ {
checkArgument(columnStatisticsObj.getColName().equals(columnName), "columnName argument and columnStatisticsObj.getColName() must be the same");
PartitionName partitionKey = PartitionName.partition(databaseName, tableName, partitionName); PartitionName partitionKey = PartitionName.partition(databaseName, tableName, partitionName);
partitionColumnStatistics partitionColumnStatistics
.computeIfAbsent(partitionKey, key -> new HashMap<>()) .computeIfAbsent(partitionKey, key -> new HashMap<>())
.put(columnName, columnStatisticsObj); .put(columnName, statistics);
} }


@Override @Override
Expand Down

0 comments on commit 68adfec

Please sign in to comment.