Skip to content

Commit

Permalink
Glue metastore statistics integration
Browse files Browse the repository at this point in the history
  • Loading branch information
GaruGaru authored and losipiuk committed Feb 24, 2021
1 parent c440395 commit db91859
Show file tree
Hide file tree
Showing 15 changed files with 789 additions and 59 deletions.
Expand Up @@ -66,6 +66,7 @@ public enum HiveErrorCode
HIVE_UNKNOWN_COLUMN_STATISTIC_TYPE(39, INTERNAL_ERROR),
HIVE_TABLE_LOCK_NOT_ACQUIRED(40, EXTERNAL),
HIVE_VIEW_TRANSLATION_ERROR(41, EXTERNAL),
HIVE_PARTITION_NOT_FOUND(42, USER_ERROR),
/**/;

private final ErrorCode errorCode;
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -13,8 +13,6 @@
*/
package io.trino.plugin.hive.metastore.glue;

import com.amazonaws.services.glue.model.PartitionInput;
import com.amazonaws.services.glue.model.TableInput;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.hive.metastore.HiveColumnStatistics;
Expand Down Expand Up @@ -51,15 +49,15 @@ public Map<String, HiveColumnStatistics> getPartitionColumnStatistics(Partition
}

@Override
public void updateTableColumnStatistics(TableInput table, Map<String, HiveColumnStatistics> columnStatistics)
public void updateTableColumnStatistics(Table table, Map<String, HiveColumnStatistics> columnStatistics)
{
if (!columnStatistics.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Glue metastore column level statistics are disabled");
}
}

@Override
public void updatePartitionStatistics(PartitionInput partition, Map<String, HiveColumnStatistics> columnStatistics)
public void updatePartitionStatistics(Partition partition, Map<String, HiveColumnStatistics> columnStatistics)
{
if (!columnStatistics.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Glue metastore column level statistics are disabled");
Expand Down
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.metastore.glue;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForGlueColumnStatisticsRead {}
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.metastore.glue;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForGlueColumnStatisticsWrite {}
Expand Up @@ -13,8 +13,6 @@
*/
package io.trino.plugin.hive.metastore.glue;

import com.amazonaws.services.glue.model.PartitionInput;
import com.amazonaws.services.glue.model.TableInput;
import io.trino.plugin.hive.metastore.HiveColumnStatistics;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;
Expand All @@ -32,7 +30,7 @@ public interface GlueColumnStatisticsProvider

Map<String, HiveColumnStatistics> getPartitionColumnStatistics(Partition partition);

void updateTableColumnStatistics(TableInput table, Map<String, HiveColumnStatistics> columnStatistics);
void updateTableColumnStatistics(Table table, Map<String, HiveColumnStatistics> columnStatistics);

void updatePartitionStatistics(PartitionInput partition, Map<String, HiveColumnStatistics> columnStatistics);
void updatePartitionStatistics(Partition partition, Map<String, HiveColumnStatistics> columnStatistics);
}
Expand Up @@ -65,6 +65,7 @@
import io.airlift.log.Logger;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.PartitionNotFoundException;
import io.trino.plugin.hive.PartitionStatistics;
Expand Down Expand Up @@ -163,18 +164,21 @@ public class GlueHiveMetastore
private final Optional<String> defaultDir;
private final String catalogId;
private final int partitionSegments;
private final Executor executor;
private final Executor partitionsReadExecutor;
private final GlueMetastoreStats stats = new GlueMetastoreStats();
private final GlueColumnStatisticsProvider columnStatisticsProvider;
private final boolean assumeCanonicalPartitionKeys;
private final Predicate<com.amazonaws.services.glue.model.Table> tableFilter;
private final boolean enableColumnStatistics;

@Inject
public GlueHiveMetastore(
HdfsEnvironment hdfsEnvironment,
GlueHiveMetastoreConfig glueConfig,
GlueColumnStatisticsProvider columnStatisticsProvider,
@ForGlueHiveMetastore Executor executor,
HiveConfig hiveConfig,
@ForGlueHiveMetastore Executor partitionsReadExecutor,
@ForGlueColumnStatisticsRead Executor statisticsReadExecutor,
@ForGlueColumnStatisticsWrite Executor statisticsWriteExecutor,
@ForGlueHiveMetastore Optional<RequestHandler2> requestHandler,
@ForGlueHiveMetastore Predicate<com.amazonaws.services.glue.model.Table> tableFilter)
{
Expand All @@ -185,10 +189,16 @@ public GlueHiveMetastore(
this.defaultDir = glueConfig.getDefaultWarehouseDir();
this.catalogId = glueConfig.getCatalogId().orElse(null);
this.partitionSegments = glueConfig.getPartitionSegments();
this.executor = requireNonNull(executor, "executor is null");
this.columnStatisticsProvider = requireNonNull(columnStatisticsProvider, "columnStatisticsProvider is null");
this.partitionsReadExecutor = requireNonNull(partitionsReadExecutor, "executor is null");
this.assumeCanonicalPartitionKeys = glueConfig.isAssumeCanonicalPartitionKeys();
this.tableFilter = requireNonNull(tableFilter, "tableFilter is null");
this.enableColumnStatistics = hiveConfig.isTableStatisticsEnabled();
if (this.enableColumnStatistics) {
this.columnStatisticsProvider = new DefaultGlueColumnStatisticsProvider(glueClient, catalogId, statisticsReadExecutor, statisticsWriteExecutor);
}
else {
this.columnStatisticsProvider = new DisabledGlueColumnStatisticsProvider();
}
}

private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
Expand Down Expand Up @@ -358,12 +368,14 @@ public void updateTableStatistics(HiveIdentity identity, String databaseName, St

try {
TableInput tableInput = GlueInputConverter.convertTable(table);
tableInput.setParameters(updateStatisticsParameters(table.getParameters(), updatedStatistics.getBasicStatistics()));
columnStatisticsProvider.updateTableColumnStatistics(tableInput, updatedStatistics.getColumnStatistics());
final Map<String, String> statisticsParameters = updateStatisticsParameters(table.getParameters(), updatedStatistics.getBasicStatistics());
tableInput.setParameters(statisticsParameters);
table = Table.builder(table).setParameters(statisticsParameters).build();
glueClient.updateTable(new UpdateTableRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
.withTableInput(tableInput));
columnStatisticsProvider.updateTableColumnStatistics(table, updatedStatistics.getColumnStatistics());
}
catch (EntityNotFoundException e) {
throw new TableNotFoundException(new SchemaTableName(databaseName, tableName));
Expand All @@ -385,14 +397,16 @@ public void updatePartitionStatistics(HiveIdentity identity, Table table, String

try {
PartitionInput partitionInput = GlueInputConverter.convertPartition(partition);
partitionInput.setParameters(updateStatisticsParameters(partition.getParameters(), updatedStatistics.getBasicStatistics()));
columnStatisticsProvider.updatePartitionStatistics(partitionInput, updatedStatistics.getColumnStatistics());
final Map<String, String> updateStatisticsParameters = updateStatisticsParameters(partition.getParameters(), updatedStatistics.getBasicStatistics());
partitionInput.setParameters(updateStatisticsParameters);
partition = Partition.builder(partition).setParameters(updateStatisticsParameters).build();
glueClient.updatePartition(new UpdatePartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withPartitionValueList(partition.getValues())
.withPartitionInput(partitionInput));
columnStatisticsProvider.updatePartitionStatistics(partition, updatedStatistics.getColumnStatistics());
}
catch (EntityNotFoundException e) {
throw new PartitionNotFoundException(new SchemaTableName(table.getDatabaseName(), table.getTableName()), partitionValues);
Expand Down Expand Up @@ -767,7 +781,7 @@ private List<Partition> getPartitions(Table table, String expression)
}

// Do parallel partition fetch.
CompletionService<List<Partition>> completionService = new ExecutorCompletionService<>(executor);
CompletionService<List<Partition>> completionService = new ExecutorCompletionService<>(partitionsReadExecutor);
for (int i = 0; i < partitionSegments; i++) {
Segment segment = new Segment().withSegmentNumber(i).withTotalSegments(partitionSegments);
completionService.submit(() -> getPartitions(table, expression, segment));
Expand Down
Expand Up @@ -40,6 +40,8 @@ public class GlueHiveMetastoreConfig
private Optional<String> catalogId = Optional.empty();
private int partitionSegments = 5;
private int getPartitionThreads = 20;
private int readStatisticsThreads = 1;
private int writeStatisticsThreads = 1;
private boolean assumeCanonicalPartitionKeys;

public Optional<String> getGlueRegion()
Expand Down Expand Up @@ -242,4 +244,32 @@ public GlueHiveMetastoreConfig setAssumeCanonicalPartitionKeys(boolean assumeCan
this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
return this;
}

@Min(1)
public int getReadStatisticsThreads()
{
return readStatisticsThreads;
}

@Config("hive.metastore.glue.read-statistics-threads")
@ConfigDescription("Number of threads for parallel statistics reads from Glue")
public GlueHiveMetastoreConfig setReadStatisticsThreads(int getReadStatisticsThreads)
{
this.readStatisticsThreads = getReadStatisticsThreads;
return this;
}

@Min(1)
public int getWriteStatisticsThreads()
{
return writeStatisticsThreads;
}

@Config("hive.metastore.glue.write-statistics-threads")
@ConfigDescription("Number of threads for parallel statistics writes to Glue")
public GlueHiveMetastoreConfig setWriteStatisticsThreads(int writeStatisticsThreads)
{
this.writeStatisticsThreads = writeStatisticsThreads;
return this;
}
}
Expand Up @@ -25,6 +25,7 @@
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.ForRecordingHiveMetastore;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.RecordingHiveMetastoreModule;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastoreModule;
Expand All @@ -46,10 +47,7 @@ public class GlueMetastoreModule
protected void setup(Binder binder)
{
configBinder(binder).bindConfig(GlueHiveMetastoreConfig.class);

newOptionalBinder(binder, GlueColumnStatisticsProvider.class)
.setDefault().to(DisabledGlueColumnStatisticsProvider.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(HiveConfig.class);
newOptionalBinder(binder, Key.get(RequestHandler2.class, ForGlueHiveMetastore.class));

newOptionalBinder(binder, Key.get(new TypeLiteral<Predicate<Table>>() {}, ForGlueHiveMetastore.class))
Expand All @@ -59,6 +57,7 @@ protected void setup(Binder binder)
.annotatedWith(ForRecordingHiveMetastore.class)
.to(GlueHiveMetastore.class)
.in(Scopes.SINGLETON);

binder.bind(GlueHiveMetastore.class).in(Scopes.SINGLETON);
newExporter(binder).export(GlueHiveMetastore.class).withGeneratedName();

Expand All @@ -71,11 +70,32 @@ protected void setup(Binder binder)
@ForGlueHiveMetastore
public Executor createExecutor(CatalogName catalogName, GlueHiveMetastoreConfig hiveConfig)
{
if (hiveConfig.getGetPartitionThreads() == 1) {
return createExecutor("hive-glue-partitions-%s", hiveConfig.getWriteStatisticsThreads());
}

@Provides
@Singleton
@ForGlueColumnStatisticsRead
public Executor createStatisticsReadExecutor(CatalogName catalogName, GlueHiveMetastoreConfig hiveConfig)
{
return createExecutor("hive-glue-statistics-read-%s", hiveConfig.getReadStatisticsThreads());
}

@Provides
@Singleton
@ForGlueColumnStatisticsWrite
public Executor createStatisticsWriteExecutor(CatalogName catalogName, GlueHiveMetastoreConfig hiveConfig)
{
return createExecutor("hive-glue-statistics-write-%s", hiveConfig.getWriteStatisticsThreads());
}

private Executor createExecutor(String nameTemplate, int threads)
{
if (threads == 1) {
return directExecutor();
}
return new BoundedExecutor(
newCachedThreadPool(daemonThreadsNamed("hive-glue-%s")),
hiveConfig.getGetPartitionThreads());
newCachedThreadPool(daemonThreadsNamed(nameTemplate)),
threads);
}
}
Expand Up @@ -68,7 +68,7 @@ public static PartitionInput convertPartition(PartitionWithStatistics partitionW
{
PartitionInput input = convertPartition(partitionWithStatistics.getPartition());
PartitionStatistics statistics = partitionWithStatistics.getStatistics();
columnStatisticsProvider.updatePartitionStatistics(input, statistics.getColumnStatistics());
columnStatisticsProvider.updatePartitionStatistics(partitionWithStatistics.getPartition(), statistics.getColumnStatistics());
input.setParameters(updateStatisticsParameters(input.getParameters(), statistics.getBasicStatistics()));
return input;
}
Expand Down

0 comments on commit db91859

Please sign in to comment.