Remove table layouts from Hive #689

Merged
12 commits merged on Jun 1, 2019
10 changes: 0 additions & 10 deletions presto-hive/pom.xml
@@ -196,16 +196,6 @@
<artifactId>javax.inject</artifactId>
</dependency>

-<dependency>
-<groupId>com.fasterxml.jackson.core</groupId>
-<artifactId>jackson-core</artifactId>
-</dependency>

-<dependency>
-<groupId>com.fasterxml.jackson.core</groupId>
-<artifactId>jackson-databind</artifactId>
-</dependency>

<!-- used by tests but also needed transitively -->
<dependency>
<groupId>io.airlift</groupId>
@@ -23,6 +23,8 @@
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.classloader.ThreadContextClassLoader;
import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.procedure.Procedure;
import io.prestosql.spi.procedure.Procedure.Argument;
import org.apache.hadoop.hive.common.FileUtils;
@@ -94,7 +96,8 @@ private void doCreateEmptyPartition(ConnectorSession session, String schema, Str
{
TransactionalMetadata hiveMetadata = hiveMetadataFactory.get();

-HiveInsertTableHandle hiveInsertTableHandle = (HiveInsertTableHandle) hiveMetadata.beginInsert(session, new HiveTableHandle(schema, table));
+ConnectorTableHandle tableHandle = hiveMetadata.getTableHandle(session, new SchemaTableName(schema, table));
+HiveInsertTableHandle hiveInsertTableHandle = (HiveInsertTableHandle) hiveMetadata.beginInsert(session, tableHandle);

List<String> actualPartitionColumnNames = hiveInsertTableHandle.getInputColumns().stream()
.filter(HiveColumnHandle::isPartitionKey)
@@ -20,7 +20,6 @@
import io.prestosql.spi.connector.ConnectorPartitioningHandle;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorTableHandle;
-import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;

public class HiveHandleResolver
@@ -32,12 +31,6 @@ public Class<? extends ConnectorTableHandle> getTableHandleClass()
return HiveTableHandle.class;
}

-@Override
-public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
-{
-return HiveTableLayoutHandle.class;
-}

@Override
public Class<? extends ColumnHandle> getColumnHandleClass()
{
226 changes: 98 additions & 128 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java

Large diffs are not rendered by default.
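The HiveMetadata diff is collapsed here, but it is the core of the change: the getTableLayouts/getTableLayout entry points and HiveTableLayoutHandle are replaced by pushdown directly on HiveTableHandle. Below is a rough sketch of the applyFilter shape this implies, pieced together from the HivePartitionManager helpers shown further down; the field names (metastore, partitionManager) and the exact no-progress check are assumptions, not the rendered code:

// Sketch only -- approximates the not-rendered HiveMetadata change.
@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint)
{
    HiveTableHandle handle = (HiveTableHandle) tableHandle;

    // Prune partitions under the new constraint, then fold the result back into the
    // table handle (see HivePartitionManager.applyPartitionResult further down).
    HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, handle, constraint);
    HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult);

    if (handle.getEnforcedConstraint().equals(newHandle.getEnforcedConstraint())) {
        // Nothing further was enforced; report no progress to the engine.
        return Optional.empty();
    }
    return Optional.of(new ConstraintApplicationResult<>(newHandle, partitionResult.getUnenforcedConstraint()));
}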

@@ -41,7 +41,6 @@ public class HiveMetadataFactory
private final boolean writesToNonManagedTablesEnabled;
private final boolean createsOfNonManagedTablesEnabled;
private final long perTransactionCacheMaximumSize;
-private final int maxPartitions;
private final HiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
private final HivePartitionManager partitionManager;
@@ -79,7 +78,6 @@ public HiveMetadataFactory(
hiveConfig.getWritesToNonManagedTablesEnabled(),
hiveConfig.getCreatesOfNonManagedTablesEnabled(),
hiveConfig.getPerTransactionMetastoreCacheMaximumSize(),
-hiveConfig.getMaxPartitionsPerScan(),
typeManager,
locationService,
partitionUpdateCodec,
@@ -100,7 +98,6 @@ public HiveMetadataFactory(
boolean writesToNonManagedTablesEnabled,
boolean createsOfNonManagedTablesEnabled,
long perTransactionCacheMaximumSize,
-int maxPartitions,
TypeManager typeManager,
LocationService locationService,
JsonCodec<PartitionUpdate> partitionUpdateCodec,
@@ -124,7 +121,6 @@ public HiveMetadataFactory(
this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
this.typeTranslator = requireNonNull(typeTranslator, "typeTranslator is null");
this.prestoVersion = requireNonNull(prestoVersion, "prestoVersion is null");
-this.maxPartitions = maxPartitions;

if (!allowCorruptWritesForTesting && !timeZone.equals(DateTimeZone.getDefault())) {
log.warn("Hive writes are disabled. " +
@@ -159,7 +155,6 @@ public HiveMetadata get()
partitionUpdateCodec,
typeTranslator,
prestoVersion,
-new MetastoreHiveStatisticsProvider(metastore),
-maxPartitions);
+new MetastoreHiveStatisticsProvider(metastore));
}
}
@@ -84,6 +84,8 @@ public HivePageSourceProvider(
@Override
public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<ColumnHandle> columns)
{
+HiveTableHandle hiveTable = (HiveTableHandle) table;

List<HiveColumnHandle> hiveColumns = columns.stream()
.map(HiveColumnHandle.class::cast)
.collect(toList());
@@ -104,7 +106,7 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
hiveSplit.getLength(),
hiveSplit.getFileSize(),
hiveSplit.getSchema(),
-hiveSplit.getEffectivePredicate(),
+hiveTable.getCompactEffectivePredicate(),
hiveColumns,
hiveSplit.getPartitionKeys(),
hiveStorageTimeZone,
@@ -55,6 +55,7 @@
import javax.inject.Inject;

import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -64,12 +65,9 @@
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.plugin.hive.HiveBucketing.getHiveBucketFilter;
-import static io.prestosql.plugin.hive.HiveBucketing.getHiveBucketHandle;
-import static io.prestosql.plugin.hive.HiveUtil.getPartitionKeyColumnHandles;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
import static io.prestosql.plugin.hive.HiveUtil.parsePartitionValue;
-import static io.prestosql.plugin.hive.metastore.MetastoreUtil.getProtectMode;
-import static io.prestosql.plugin.hive.metastore.MetastoreUtil.makePartName;
-import static io.prestosql.plugin.hive.metastore.MetastoreUtil.verifyOnline;
+import static io.prestosql.plugin.hive.metastore.MetastoreUtil.toPartitionName;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.connector.Constraint.alwaysTrue;
import static io.prestosql.spi.predicate.TupleDomain.all;
@@ -84,6 +82,7 @@ public class HivePartitionManager
private static final String PARTITION_VALUE_WILDCARD = "";

private final DateTimeZone timeZone;
+private final int maxPartitions;
private final boolean assumeCanonicalPartitionKeys;
private final int domainCompactionThreshold;
private final TypeManager typeManager;
@@ -96,17 +95,21 @@ public HivePartitionManager(
this(
typeManager,
hiveConfig.getDateTimeZone(),
+hiveConfig.getMaxPartitionsPerScan(),
hiveConfig.isAssumeCanonicalPartitionKeys(),
hiveConfig.getDomainCompactionThreshold());
}

public HivePartitionManager(
TypeManager typeManager,
DateTimeZone timeZone,
+int maxPartitions,
boolean assumeCanonicalPartitionKeys,
int domainCompactionThreshold)
{
this.timeZone = requireNonNull(timeZone, "timeZone is null");
+checkArgument(maxPartitions >= 1, "maxPartitions must be at least 1");
+this.maxPartitions = maxPartitions;
this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
checkArgument(domainCompactionThreshold >= 1, "domainCompactionThreshold must be at least 1");
this.domainCompactionThreshold = domainCompactionThreshold;
@@ -119,15 +122,16 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
TupleDomain<ColumnHandle> effectivePredicate = constraint.getSummary();

SchemaTableName tableName = hiveTableHandle.getSchemaTableName();
-Table table = getTable(metastore, tableName);
-Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table);

-List<HiveColumnHandle> partitionColumns = getPartitionKeyColumnHandles(table);
+Optional<HiveBucketHandle> hiveBucketHandle = hiveTableHandle.getBucketHandle();
+List<HiveColumnHandle> partitionColumns = hiveTableHandle.getPartitionColumns();

if (effectivePredicate.isNone()) {
return new HivePartitionResult(partitionColumns, ImmutableList.of(), none(), none(), none(), hiveBucketHandle, Optional.empty());
}

+Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
+.orElseThrow(() -> new TableNotFoundException(tableName));

Optional<HiveBucketFilter> bucketFilter = getHiveBucketFilter(table, effectivePredicate);
TupleDomain<HiveColumnHandle> compactEffectivePredicate = toCompactTupleDomain(effectivePredicate, domainCompactionThreshold);

@@ -161,25 +165,67 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
return new HivePartitionResult(partitionColumns, partitionsIterable, compactEffectivePredicate, remainingTupleDomain, enforcedTupleDomain, hiveBucketHandle, bucketFilter);
}

-public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastore, ConnectorTableHandle tableHandle, List<List<String>> partitionValuesList)
+public HivePartitionResult getPartitions(ConnectorTableHandle tableHandle, List<List<String>> partitionValuesList)
{
HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
SchemaTableName tableName = hiveTableHandle.getSchemaTableName();
+List<HiveColumnHandle> partitionColumns = hiveTableHandle.getPartitionColumns();
+Optional<HiveBucketHandle> bucketHandle = hiveTableHandle.getBucketHandle();

-Table table = getTable(metastore, tableName);
+List<String> partitionColumnNames = partitionColumns.stream()
+.map(HiveColumnHandle::getName)
+.collect(toImmutableList());

-List<HiveColumnHandle> partitionColumns = getPartitionKeyColumnHandles(table);
List<Type> partitionColumnTypes = partitionColumns.stream()
.map(column -> typeManager.getType(column.getTypeSignature()))
.collect(toImmutableList());

List<HivePartition> partitionList = partitionValuesList.stream()
-.map(partitionValues -> makePartName(table.getPartitionColumns(), partitionValues))
+.map(partitionValues -> toPartitionName(partitionColumnNames, partitionValues))
.map(partitionName -> parseValuesAndFilterPartition(tableName, partitionName, partitionColumns, partitionColumnTypes, alwaysTrue()))
.map(partition -> partition.orElseThrow(() -> new VerifyException("partition must exist")))
.collect(toImmutableList());

-return new HivePartitionResult(partitionColumns, partitionList, all(), all(), none(), getHiveBucketHandle(table), Optional.empty());
+return new HivePartitionResult(partitionColumns, partitionList, all(), all(), none(), bucketHandle, Optional.empty());
}

+public List<HivePartition> getPartitionsAsList(HivePartitionResult partitionResult)
+{
+ImmutableList.Builder<HivePartition> partitionList = ImmutableList.builder();
+int count = 0;
+Iterator<HivePartition> iterator = partitionResult.getPartitions();
+while (iterator.hasNext()) {
+HivePartition partition = iterator.next();
+if (count == maxPartitions) {
+throw new PrestoException(HIVE_EXCEEDED_PARTITION_LIMIT, format(
+"Query over table '%s' can potentially read more than %s partitions",
+partition.getTableName(),
+maxPartitions));
+}
+partitionList.add(partition);
+count++;
+}
+return partitionList.build();
+}

+public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions)
+{
+return new HiveTableHandle(
+handle.getSchemaName(),
+handle.getTableName(),
+ImmutableList.copyOf(partitions.getPartitionColumns()),
+Optional.of(getPartitionsAsList(partitions)),
+partitions.getCompactEffectivePredicate(),
+partitions.getEnforcedConstraint(),
+partitions.getBucketHandle(),
+partitions.getBucketFilter(),
+handle.getAnalyzePartitionValues());
+}

+public List<HivePartition> getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveTableHandle table)
+{
+return table.getPartitions().orElseGet(() ->
+getPartitionsAsList(getPartitions(metastore, table, new Constraint(table.getEnforcedConstraint()))));
+}

private static TupleDomain<HiveColumnHandle> toCompactTupleDomain(TupleDomain<ColumnHandle> effectivePredicate, int threshold)
@@ -226,17 +272,6 @@ private Optional<HivePartition> parseValuesAndFilterPartition(
return Optional.of(partition);
}

-private Table getTable(SemiTransactionalHiveMetastore metastore, SchemaTableName tableName)
-{
-Optional<Table> target = metastore.getTable(tableName.getSchemaName(), tableName.getTableName());
-if (!target.isPresent()) {
-throw new TableNotFoundException(tableName);
-}
-Table table = target.get();
-verifyOnline(tableName, Optional.empty(), getProtectMode(table), table.getParameters());
-return table;
-}

private List<String> getFilteredPartitionNames(SemiTransactionalHiveMetastore metastore, SchemaTableName tableName, List<HiveColumnHandle> partitionKeys, TupleDomain<ColumnHandle> effectivePredicate)
{
checkArgument(effectivePredicate.getDomains().isPresent());
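The three helpers added above (getPartitionsAsList, applyPartitionResult, getOrLoadPartitions) take over what the layout handle used to carry: the pruned partition list, the compacted predicate, and the bucket filter now live on HiveTableHandle itself. A hedged sketch of how a caller such as the split manager might consume them; the wrapper method and its parameters are illustrative, not part of this diff:

// Sketch only -- illustrates intended usage of the helpers above.
private static List<HivePartition> loadPartitions(HivePartitionManager partitionManager, SemiTransactionalHiveMetastore metastore, HiveTableHandle hiveTable)
{
    // Reuses partitions already pinned into the handle by applyFilter; if the engine never
    // called applyFilter, pruning re-runs against the handle's enforced constraint.
    // getPartitionsAsList() applies the maxPartitions limit (hive.max-partitions-per-scan)
    // and throws HIVE_EXCEEDED_PARTITION_LIMIT, so that check moves out of HiveMetadata.
    return partitionManager.getOrLoadPartitions(metastore, hiveTable);
}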
@@ -35,7 +35,7 @@ public class HivePartitionResult
{
private final List<HiveColumnHandle> partitionColumns;
private final Iterable<HivePartition> partitions;
-private final TupleDomain<? extends ColumnHandle> compactEffectivePredicate;
+private final TupleDomain<HiveColumnHandle> compactEffectivePredicate;
private final TupleDomain<ColumnHandle> unenforcedConstraint;
private final TupleDomain<ColumnHandle> enforcedConstraint;
private final Optional<HiveBucketHandle> bucketHandle;
@@ -44,7 +44,7 @@ public class HivePartitionResult
public HivePartitionResult(
List<HiveColumnHandle> partitionColumns,
Iterable<HivePartition> partitions,
-TupleDomain<? extends ColumnHandle> compactEffectivePredicate,
+TupleDomain<HiveColumnHandle> compactEffectivePredicate,
TupleDomain<ColumnHandle> unenforcedConstraint,
TupleDomain<ColumnHandle> enforcedConstraint,
Optional<HiveBucketHandle> bucketHandle,
@@ -69,7 +69,7 @@ public Iterator<HivePartition> getPartitions()
return partitions.iterator();
}

-public TupleDomain<? extends ColumnHandle> getCompactEffectivePredicate()
+public TupleDomain<HiveColumnHandle> getCompactEffectivePredicate()
{
return compactEffectivePredicate;
}
13 changes: 0 additions & 13 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSplit.java
@@ -19,7 +19,6 @@
import com.google.common.collect.ImmutableMap;
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.connector.ConnectorSplit;
-import io.prestosql.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Map;
@@ -45,7 +44,6 @@ public class HiveSplit
private final String database;
private final String table;
private final String partitionName;
-private final TupleDomain<HiveColumnHandle> effectivePredicate;
private final OptionalInt bucketNumber;
private final boolean forceLocalScheduling;
private final Map<Integer, HiveType> columnCoercions; // key: hiveColumnIndex
@@ -66,7 +64,6 @@ public HiveSplit(
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("bucketNumber") OptionalInt bucketNumber,
@JsonProperty("forceLocalScheduling") boolean forceLocalScheduling,
@JsonProperty("effectivePredicate") TupleDomain<HiveColumnHandle> effectivePredicate,
@JsonProperty("columnCoercions") Map<Integer, HiveType> columnCoercions,
@JsonProperty("bucketConversion") Optional<BucketConversion> bucketConversion,
@JsonProperty("s3SelectPushdownEnabled") boolean s3SelectPushdownEnabled)
@@ -82,7 +79,6 @@ public HiveSplit(
requireNonNull(partitionKeys, "partitionKeys is null");
requireNonNull(addresses, "addresses is null");
requireNonNull(bucketNumber, "bucketNumber is null");
-requireNonNull(effectivePredicate, "tupleDomain is null");
requireNonNull(columnCoercions, "columnCoercions is null");
requireNonNull(bucketConversion, "bucketConversion is null");

@@ -98,7 +94,6 @@ public HiveSplit(
this.addresses = ImmutableList.copyOf(addresses);
this.bucketNumber = bucketNumber;
this.forceLocalScheduling = forceLocalScheduling;
-this.effectivePredicate = effectivePredicate;
this.columnCoercions = columnCoercions;
this.bucketConversion = bucketConversion;
this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
@@ -171,12 +166,6 @@ public OptionalInt getBucketNumber()
return bucketNumber;
}

-@JsonProperty
-public TupleDomain<HiveColumnHandle> getEffectivePredicate()
-{
-return effectivePredicate;
-}

@JsonProperty
public boolean isForceLocalScheduling()
{
@@ -232,8 +221,6 @@ public String toString()
.addValue(start)
.addValue(length)
.addValue(fileSize)
-.addValue(effectivePredicate)
-.addValue(s3SelectPushdownEnabled)
.toString();
}
