Use updateStatistics methods to store basic statistics
arhimondr committed Jul 10, 2018
1 parent ba34d33 commit 42c727f
Showing 8 changed files with 215 additions and 273 deletions.
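
The pattern running through this commit: basic statistics are no longer serialized into table and partition parameters (the old toStatisticsParameters path); instead, an explicit statistics value is handed to the metastore write methods. Below is a minimal sketch of the PartitionStatistics value object implied by the call sites in this diff — the shape is inferred from new PartitionStatistics(basicStatistics, ImmutableMap.of()), no import for it appears in the hunks (so it presumably lives in the same com.facebook.presto.hive package), and the column-statistics value type is an assumption, since every call site here passes an empty map:

package com.facebook.presto.hive;

import com.google.common.collect.ImmutableMap;

import java.util.Map;

import static java.util.Objects.requireNonNull;

// Sketch only: shape inferred from the call sites in this diff.
// The HiveColumnStatistics value type is an assumption.
public class PartitionStatistics
{
    private final HiveBasicStatistics basicStatistics;
    private final Map<String, HiveColumnStatistics> columnStatistics;

    public PartitionStatistics(HiveBasicStatistics basicStatistics, Map<String, HiveColumnStatistics> columnStatistics)
    {
        this.basicStatistics = requireNonNull(basicStatistics, "basicStatistics is null");
        this.columnStatistics = ImmutableMap.copyOf(requireNonNull(columnStatistics, "columnStatistics is null"));
    }

    public HiveBasicStatistics getBasicStatistics()
    {
        return basicStatistics;
    }

    public Map<String, HiveColumnStatistics> getColumnStatistics()
    {
        return columnStatistics;
    }
}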
@@ -26,7 +26,6 @@
 import com.facebook.presto.hive.metastore.StorageFormat;
 import com.facebook.presto.hive.metastore.Table;
 import com.facebook.presto.hive.statistics.HiveStatisticsProvider;
-import com.facebook.presto.hive.util.Statistics;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ColumnMetadata;
 import com.facebook.presto.spi.ConnectorInsertTableHandle;
@@ -99,6 +98,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static com.facebook.presto.hive.HiveBasicStatistics.createEmptyStatistics;
 import static com.facebook.presto.hive.HiveBasicStatistics.createZeroStatistics;
 import static com.facebook.presto.hive.HiveBucketing.getHiveBucketHandle;
 import static com.facebook.presto.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
@@ -159,10 +159,9 @@
 import static com.facebook.presto.hive.metastore.PrincipalType.USER;
 import static com.facebook.presto.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT;
 import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
-import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.toStatisticsParameters;
 import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
 import static com.facebook.presto.hive.util.Statistics.ReduceOperator.ADD;
-import static com.facebook.presto.hive.util.Statistics.updateStatistics;
+import static com.facebook.presto.hive.util.Statistics.reduce;
 import static com.facebook.presto.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
 import static com.facebook.presto.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
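
The import swap above — Statistics.updateStatistics out, Statistics.reduce in — matches the new flow: statistics are combined as plain values and passed to the metastore rather than merged back into the Table object. A hedged sketch of what reduce(first, second, ADD) presumably does, assuming HiveBasicStatistics carries four OptionalLong counters (file count, row count, in-memory and on-disk data size); the getter names, the four-argument constructor, and the SUBTRACT case are all assumptions:

package com.facebook.presto.hive.util;

import com.facebook.presto.hive.HiveBasicStatistics;

import java.util.OptionalLong;

public final class Statistics
{
    private Statistics() {}

    public enum ReduceOperator
    {
        ADD, SUBTRACT // SUBTRACT is an assumption; only ADD appears in this diff
    }

    // Field-wise combination of two basic statistics; field set and getter
    // names are assumptions based on the createZeroStatistics/createEmptyStatistics factories.
    public static HiveBasicStatistics reduce(HiveBasicStatistics first, HiveBasicStatistics second, ReduceOperator operator)
    {
        return new HiveBasicStatistics(
                reduce(first.getFileCount(), second.getFileCount(), operator),
                reduce(first.getRowCount(), second.getRowCount(), operator),
                reduce(first.getInMemoryDataSizeInBytes(), second.getInMemoryDataSizeInBytes(), operator),
                reduce(first.getOnDiskDataSizeInBytes(), second.getOnDiskDataSizeInBytes(), operator));
    }

    private static OptionalLong reduce(OptionalLong first, OptionalLong second, ReduceOperator operator)
    {
        // An unknown value on either side makes the combined value unknown.
        if (!first.isPresent() || !second.isPresent()) {
            return OptionalLong.empty();
        }
        switch (operator) {
            case ADD:
                return OptionalLong.of(first.getAsLong() + second.getAsLong());
            case SUBTRACT:
                return OptionalLong.of(first.getAsLong() - second.getAsLong());
        }
        throw new IllegalArgumentException("Unexpected operator: " + operator);
    }
}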
@@ -607,7 +606,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
         Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
         List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
         HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
-        Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, !partitionedBy.isEmpty());
+        Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata);
 
         hiveStorageFormat.validateColumns(columnHandles);
 
@@ -649,10 +648,17 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
                 external,
                 prestoVersion);
         PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner());
-        metastore.createTable(session, table, principalPrivileges, Optional.empty(), ignoreExisting);
+        HiveBasicStatistics basicStatistics = table.getPartitionColumns().isEmpty() ? createZeroStatistics() : createEmptyStatistics();
+        metastore.createTable(
+                session,
+                table,
+                principalPrivileges,
+                Optional.empty(),
+                ignoreExisting,
+                new PartitionStatistics(basicStatistics, ImmutableMap.of()));
     }
 
-    private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata tableMetadata, boolean partitioned)
+    private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata tableMetadata)
     {
         Builder<String, String> tableProperties = ImmutableMap.builder();
 
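
Two notes on the hunk above. First, the statistics choice: a just-created unpartitioned table is known to contain zero rows, so createZeroStatistics() is exact; for a partitioned table, statistics are tracked per partition, so the table-level entry starts out as createEmptyStatistics() (unknown). Second, the call sites imply the metastore write methods gained a trailing statistics parameter. A hypothetical interface capturing those call shapes — the metastore field's declared type is not shown in this diff, and the write-path parameter type is assumed to be org.apache.hadoop.fs.Path:

import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ConnectorSession;
import org.apache.hadoop.fs.Path;

import java.util.Optional;

// Hypothetical name, for illustration only; signatures are inferred from the
// call sites in this diff, not confirmed against the real metastore class.
public interface MetastoreWrites
{
    void createTable(
            ConnectorSession session,
            Table table,
            PrincipalPrivileges principalPrivileges,
            Optional<Path> currentPath,
            boolean ignoreExisting,
            PartitionStatistics statistics);

    void addPartition(
            ConnectorSession session,
            String databaseName,
            String tableName,
            Partition partition,
            Path currentPath,
            PartitionStatistics statistics);
}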
@@ -669,10 +675,6 @@ private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata table
         // Table comment property
         tableMetadata.getComment().ifPresent(value -> tableProperties.put(TABLE_COMMENT, value));
 
-        if (!partitioned) {
-            tableProperties.putAll(toStatisticsParameters(createZeroStatistics()));
-        }
-
         return tableProperties.build();
     }
 
@@ -830,7 +832,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
         List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
         Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
 
-        Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, !partitionedBy.isEmpty());
+        Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata);
 
         // get the root directory for the database
         SchemaTableName schemaTableName = tableMetadata.getTable();
@@ -910,22 +912,34 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
             }
         }
 
+        PartitionStatistics tableStatistics;
         if (table.getPartitionColumns().isEmpty()) {
-            HiveBasicStatistics tableStatistic = partitionUpdates.stream()
-                    .map(PartitionUpdate::getStatistics)
-                    .reduce(Statistics::add)
-                    .orElse(createZeroStatistics());
-            table = updateStatistics(table, tableStatistic, ADD);
+            tableStatistics = new PartitionStatistics(
+                    partitionUpdates.stream()
+                            .map(PartitionUpdate::getStatistics)
+                            .reduce((first, second) -> reduce(first, second, ADD))
+                            .orElse(createZeroStatistics()),
+                    ImmutableMap.of());
+        }
+        else {
+            tableStatistics = new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of());
         }
 
-        metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), false);
+        metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), false, tableStatistics);
 
         if (!handle.getPartitionedBy().isEmpty()) {
             if (isRespectTableFormat(session)) {
                 Verify.verify(handle.getPartitionStorageFormat() == handle.getTableStorageFormat());
             }
             for (PartitionUpdate update : partitionUpdates) {
-                metastore.addPartition(session, handle.getSchemaName(), handle.getTableName(), buildPartitionObject(session, table, update), update.getWritePath());
+                PartitionStatistics partitionStatistics = new PartitionStatistics(update.getStatistics(), ImmutableMap.of());
+                metastore.addPartition(
+                        session,
+                        handle.getSchemaName(),
+                        handle.getTableName(),
+                        buildPartitionObject(session, table, update),
+                        update.getWritePath(),
+                        partitionStatistics);
             }
         }
 
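
In finishCreateTable above, an unpartitioned table folds every writer's PartitionUpdate statistics into one table-level value via reduce(..., ADD), falling back to zero statistics when nothing was written, while a partitioned table keeps its table-level entry empty and attaches statistics to each addPartition call instead. A toy illustration of that fold, with hypothetical numbers, assuming a convenience HiveBasicStatistics constructor taking four longs alongside the OptionalLong one sketched earlier:

import com.facebook.presto.hive.HiveBasicStatistics;
import com.google.common.collect.ImmutableList;

import java.util.List;

import static com.facebook.presto.hive.HiveBasicStatistics.createZeroStatistics;
import static com.facebook.presto.hive.util.Statistics.ReduceOperator.ADD;
import static com.facebook.presto.hive.util.Statistics.reduce;

public class StatisticsFoldExample
{
    public static void main(String[] args)
    {
        // Two hypothetical writers' statistics: (fileCount, rowCount, inMemoryBytes, onDiskBytes)
        List<HiveBasicStatistics> updates = ImmutableList.of(
                new HiveBasicStatistics(2, 1_000, 64_000, 16_000),
                new HiveBasicStatistics(3, 2_500, 150_000, 40_000));

        // Same shape as the fold in finishCreateTable above
        HiveBasicStatistics total = updates.stream()
                .reduce((first, second) -> reduce(first, second, ADD))
                .orElse(createZeroStatistics());

        // Expected: 5 files, 3500 rows, 214000 bytes in memory, 56000 bytes on disk
        System.out.println(total);
    }
}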
@@ -1114,7 +1128,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
                         handle.getTableName(),
                         partitionUpdate.getWritePath(),
                         partitionUpdate.getFileNames(),
-                        partitionUpdate.getStatistics());
+                        new PartitionStatistics(partitionUpdate.getStatistics(), ImmutableMap.of()));
             }
             else if (partitionUpdate.getUpdateMode() == APPEND) {
                 // insert into existing partition
@@ -1125,19 +1139,24 @@ else if (partitionUpdate.getUpdateMode() == APPEND) {
                         toPartitionValues(partitionUpdate.getName()),
                         partitionUpdate.getWritePath(),
                         partitionUpdate.getFileNames(),
-                        partitionUpdate.getStatistics());
+                        new PartitionStatistics(partitionUpdate.getStatistics(), ImmutableMap.of()));
             }
             else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode() == OVERWRITE) {
                 // insert into new partition or overwrite existing partition
                 Partition partition = buildPartitionObject(session, table.get(), partitionUpdate);
                 if (!partition.getStorage().getStorageFormat().getInputFormat().equals(handle.getPartitionStorageFormat().getInputFormat()) && isRespectTableFormat(session)) {
                     throw new PrestoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Partition format changed during insert");
                 }
-
                 if (partitionUpdate.getUpdateMode() == OVERWRITE) {
                     metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), partition.getValues());
                 }
-                metastore.addPartition(session, handle.getSchemaName(), handle.getTableName(), partition, partitionUpdate.getWritePath());
+                metastore.addPartition(
+                        session,
+                        handle.getSchemaName(),
+                        handle.getTableName(),
+                        partition,
+                        partitionUpdate.getWritePath(),
+                        new PartitionStatistics(partitionUpdate.getStatistics(), ImmutableMap.of()));
             }
             else {
                 throw new IllegalArgumentException(format("Unsupported update mode: %s", partitionUpdate.getUpdateMode()));
@@ -1160,7 +1179,6 @@ private Partition buildPartitionObject(ConnectorSession session, Table table, Pa
                 .setParameters(ImmutableMap.<String, String>builder()
                         .put(PRESTO_VERSION_NAME, prestoVersion)
                         .put(PRESTO_QUERY_ID_NAME, session.getQueryId())
-                        .putAll(toStatisticsParameters(partitionUpdate.getStatistics()))
                         .build())
                 .withStorage(storage -> storage
                         .setStorageFormat(isRespectTableFormat(session) ?
@@ -1212,7 +1230,7 @@ public void createView(ConnectorSession session, SchemaTableName viewName, Strin
         }
 
         try {
-            metastore.createTable(session, table, principalPrivileges, Optional.empty(), false);
+            metastore.createTable(session, table, principalPrivileges, Optional.empty(), false, new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of()));
         }
         catch (TableAlreadyExistsException e) {
             throw new ViewAlreadyExistsException(e.getTableName());
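
One last note: a view holds no data of its own, which is presumably why createView registers it with empty (unknown) statistics rather than the zero statistics used for a freshly created empty table.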