Add $bucket_number hidden column to Hive
Igor Demura committed Nov 16, 2016
1 parent 48f5d23 commit f8648a1
Showing 13 changed files with 205 additions and 56 deletions.
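For orientation, a minimal usage sketch (not part of this commit): on a bucketed Hive table the new hidden column can be selected and filtered like the existing $path column, and a filter on it prunes the scan down to the matching bucket files. The coordinator URL and the orders table below are hypothetical placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public final class BucketNumberQueryExample
    {
        public static void main(String[] args)
                throws Exception
        {
            // Hidden columns are not returned by SELECT *, so $bucket_number must be named explicitly.
            String sql = "SELECT \"$bucket_number\", count(*) "
                    + "FROM orders "
                    + "WHERE \"$bucket_number\" IN (0, 7) "
                    + "GROUP BY 1";
            try (Connection connection = DriverManager.getConnection("jdbc:presto://localhost:8080/hive/default", "test", null);
                    Statement statement = connection.createStatement();
                    ResultSet resultSet = statement.executeQuery(sql)) {
                while (resultSet.next()) {
                    System.out.println(resultSet.getInt(1) + " -> " + resultSet.getLong(2));
                }
            }
        }
    }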
@@ -85,7 +85,7 @@ public class BackgroundHiveSplitLoader
     private final String connectorId;
     private final Table table;
     private final Optional<HiveBucketHandle> bucketHandle;
-    private final Optional<HiveBucket> bucket;
+    private final List<HiveBucket> buckets;
     private final HdfsEnvironment hdfsEnvironment;
     private final NamenodeStats namenodeStats;
     private final DirectoryLister directoryLister;
@@ -120,7 +120,7 @@ public BackgroundHiveSplitLoader(
             Table table,
             Iterable<HivePartitionMetadata> partitions,
             Optional<HiveBucketHandle> bucketHandle,
-            Optional<HiveBucket> bucket,
+            List<HiveBucket> buckets,
             ConnectorSession session,
             HdfsEnvironment hdfsEnvironment,
             NamenodeStats namenodeStats,
@@ -133,7 +133,7 @@ public BackgroundHiveSplitLoader(
         this.connectorId = connectorId;
         this.table = table;
         this.bucketHandle = bucketHandle;
-        this.bucket = bucket;
+        this.buckets = buckets;
         this.maxSplitSize = getMaxSplitSize(session);
         this.maxPartitionBatchSize = maxPartitionBatchSize;
         this.session = session;
@@ -326,25 +326,30 @@ private void loadPartition(HivePartitionMetadata partition)

         // If only one bucket could match: load that one file
         HiveFileIterator iterator = new HiveFileIterator(path, fs, directoryLister, namenodeStats, partitionName, inputFormat, schema, partitionKeys, effectivePredicate, partition.getColumnCoercions());
-        if (bucket.isPresent()) {
-            List<LocatedFileStatus> locatedFileStatuses = listAndSortBucketFiles(iterator, bucket.get().getBucketCount());
-            FileStatus file = locatedFileStatuses.get(bucket.get().getBucketNumber());
-            BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, file.getLen());
-            boolean splittable = isSplittable(inputFormat, fs, file.getPath());
-
-            hiveSplitSource.addToQueue(createHiveSplits(
-                    partitionName,
-                    file.getPath().toString(),
-                    blockLocations,
-                    0,
-                    file.getLen(),
-                    schema,
-                    partitionKeys,
-                    splittable,
-                    session,
-                    OptionalInt.of(bucket.get().getBucketNumber()),
-                    effectivePredicate,
-                    partition.getColumnCoercions()));
+        if (!buckets.isEmpty()) {
+            int bucketCount = buckets.get(0).getBucketCount();
+            List<LocatedFileStatus> list = listAndSortBucketFiles(iterator, bucketCount);
+
+            for (HiveBucket bucket : buckets) {
+                int bucketNumber = bucket.getBucketNumber();
+                LocatedFileStatus file = list.get(bucketNumber);
+                boolean splittable = isSplittable(iterator.getInputFormat(), hdfsEnvironment.getFileSystem(session.getUser(), file.getPath()), file.getPath());
+
+                hiveSplitSource.addToQueue(createHiveSplits(
+                        iterator.getPartitionName(),
+                        file.getPath().toString(),
+                        file.getBlockLocations(),
+                        0,
+                        file.getLen(),
+                        iterator.getSchema(),
+                        iterator.getPartitionKeys(),
+                        splittable,
+                        session,
+                        OptionalInt.of(bucketNumber),
+                        effectivePredicate,
+                        partition.getColumnCoercions()));
+            }
+
             return;
         }
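The BackgroundHiveSplitLoader change above is the heart of the feature: instead of loading at most one bucket file, the loader lists and sorts the partition's files once and then emits one split per requested bucket, tagging each split with its bucket number. A stand-alone sketch of that selection step, using plain Java collections and hypothetical names rather than the connector types:

    import java.util.ArrayList;
    import java.util.List;

    final class BucketFileSelection
    {
        private BucketFileSelection() {}

        // Hive writes one file per bucket, and the listing has already been sorted into
        // bucket order, so the bucket number doubles as an index into the file list.
        static List<String> filesForBuckets(List<String> sortedBucketFiles, List<Integer> requestedBuckets)
        {
            List<String> selected = new ArrayList<>();
            for (int bucketNumber : requestedBuckets) {
                selected.add(sortedBucketFiles.get(bucketNumber));
            }
            return selected;
        }
    }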
@@ -18,7 +18,10 @@
 import com.facebook.presto.spi.Page;
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.predicate.Domain;
 import com.facebook.presto.spi.predicate.NullableValue;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.predicate.ValueSet;
 import com.facebook.presto.spi.type.Type;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Ints;
@@ -46,6 +49,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;

+import static com.facebook.presto.hive.HiveColumnHandle.BUCKET_NUMBER_COLUMN_NAME;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
 import static com.facebook.presto.hive.HiveUtil.getRegularColumnHandles;
 import static com.facebook.presto.hive.HiveUtil.getTableStructFields;
@@ -209,9 +213,45 @@ public static Optional<HiveBucketHandle> getHiveBucketHandle(String connectorId,
         return Optional.of(new HiveBucketHandle(bucketColumns.build(), hiveBucketProperty.get().getBucketCount()));
     }

-    public static Optional<HiveBucket> getHiveBucket(Table table, Map<ColumnHandle, NullableValue> bindings)
+    public static List<HiveBucket> getHiveBucketNumbers(Table table, TupleDomain<ColumnHandle> effectivePredicate)
     {
-        if (!table.getStorage().getBucketProperty().isPresent() || bindings.isEmpty()) {
+        if (!table.getStorage().getBucketProperty().isPresent()) {
+            return ImmutableList.of();
+        }
+
+        Optional<Map<ColumnHandle, NullableValue>> bindings = TupleDomain.extractFixedValues(effectivePredicate);
+        if (!bindings.isPresent()) {
+            return ImmutableList.of();
+        }
+        Optional<HiveBucket> singleBucket = getHiveBucket(table, bindings.get());
+        if (singleBucket.isPresent()) {
+            return ImmutableList.of(singleBucket.get());
+        }
+
+        if (!effectivePredicate.getDomains().isPresent()) {
+            return ImmutableList.of();
+        }
+        Optional<Domain> domain = effectivePredicate.getDomains().get().entrySet().stream()
+                .filter(entry -> ((HiveColumnHandle) entry.getKey()).getName().equals(BUCKET_NUMBER_COLUMN_NAME))
+                .findFirst()
+                .map(Entry::getValue);
+        if (!domain.isPresent()) {
+            return ImmutableList.of();
+        }
+        ValueSet values = domain.get().getValues();
+        ImmutableList.Builder<HiveBucket> builder = ImmutableList.builder();
+        int bucketCount = table.getStorage().getBucketProperty().get().getBucketCount();
+        for (int i = 0; i < bucketCount; i++) {
+            if (values.containsValue((long) i)) {
+                builder.add(new HiveBucket(i, bucketCount));
+            }
+        }
+        return builder.build();
+    }
+
+    private static Optional<HiveBucket> getHiveBucket(Table table, Map<ColumnHandle, NullableValue> bindings)
+    {
+        if (bindings.isEmpty()) {
             return Optional.empty();
         }
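When the predicate does not pin every bucketing column to a single value, getHiveBucketNumbers above falls back to the discrete domain on $bucket_number: it walks every possible bucket number and keeps the ones the domain accepts. A plain-Java sketch of that enumeration, with a Set standing in for the SPI ValueSet (values.containsValue((long) i) in the real code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    final class BucketEnumeration
    {
        private BucketEnumeration() {}

        // Returns the bucket numbers in [0, bucketCount) that the predicate on
        // $bucket_number allows; an empty allowed set yields an empty list.
        static List<Integer> matchingBuckets(int bucketCount, Set<Long> allowedValues)
        {
            List<Integer> buckets = new ArrayList<>();
            for (int i = 0; i < bucketCount; i++) {
                if (allowedValues.contains((long) i)) {
                    buckets.add(i);
                }
            }
            return buckets;
        }
    }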
@@ -24,6 +24,7 @@

 import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.HIDDEN;
 import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
+import static com.facebook.presto.hive.HiveType.HIVE_INT;
 import static com.facebook.presto.hive.HiveType.HIVE_LONG;
 import static com.facebook.presto.hive.HiveType.HIVE_STRING;
 import static com.facebook.presto.hive.util.Types.checkType;
@@ -40,6 +41,11 @@ public class HiveColumnHandle
     public static final HiveType PATH_HIVE_TYPE = HIVE_STRING;
     public static final TypeSignature PATH_TYPE_SIGNATURE = PATH_HIVE_TYPE.getTypeSignature();

+    public static final int BUCKET_NUMBER_COLUMN_INDEX = -12;
+    public static final String BUCKET_NUMBER_COLUMN_NAME = "$bucket_number";
+    public static final HiveType BUCKET_NUMBER_HIVE_TYPE = HIVE_INT;
+    public static final TypeSignature BUCKET_NUMBER_TYPE_SIGNATURE = BUCKET_NUMBER_HIVE_TYPE.getTypeSignature();
+
     private static final String UPDATE_ROW_ID_COLUMN_NAME = "$shard_row_id";

     public enum ColumnType
@@ -181,8 +187,18 @@ public static HiveColumnHandle pathColumnHandle(String connectorId)
         return new HiveColumnHandle(connectorId, PATH_COLUMN_NAME, PATH_HIVE_TYPE, PATH_TYPE_SIGNATURE, PATH_COLUMN_INDEX, HIDDEN);
     }

+    public static HiveColumnHandle bucketNumberColumnHandle(String connectorId)
+    {
+        return new HiveColumnHandle(connectorId, BUCKET_NUMBER_COLUMN_NAME, BUCKET_NUMBER_HIVE_TYPE, BUCKET_NUMBER_TYPE_SIGNATURE, BUCKET_NUMBER_COLUMN_INDEX, HIDDEN);
+    }
+
     public static boolean isPathColumnHandle(HiveColumnHandle column)
     {
         return column.getHiveColumnIndex() == PATH_COLUMN_INDEX;
     }
+
+    public static boolean isBucketNumberColumnHandle(HiveColumnHandle column)
+    {
+        return column.getHiveColumnIndex() == BUCKET_NUMBER_COLUMN_INDEX;
+    }
 }
@@ -76,6 +76,7 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;

+import static com.facebook.presto.hive.HiveColumnHandle.BUCKET_NUMBER_COLUMN_NAME;
 import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.HIDDEN;
 import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
 import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
@@ -1290,8 +1291,11 @@ private static Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(T
                 builder.put(field.getName(), Optional.empty());
             }
         }
-        // add hidden column
+        // add hidden columns
         builder.put(PATH_COLUMN_NAME, Optional.empty());
+        if (table.getStorage().getBucketProperty().isPresent()) {
+            builder.put(BUCKET_NUMBER_COLUMN_NAME, Optional.empty());
+        }

         Map<String, Optional<String>> columnComment = builder.build();
@@ -35,6 +35,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.Set;
@@ -91,6 +92,7 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
                 hdfsEnvironment.getConfiguration(path),
                 session,
                 path,
+                hiveSplit.getBucketNumber(),
                 hiveSplit.getStart(),
                 hiveSplit.getLength(),
                 hiveSplit.getSchema(),
@@ -113,6 +115,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
             Configuration configuration,
             ConnectorSession session,
             Path path,
+            OptionalInt bucketNumber,
             long start,
             long length,
             Properties schema,
@@ -123,7 +126,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
             TypeManager typeManager,
             Map<Integer, HiveType> columnCoercions)
     {
-        List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(partitionKeys, hiveColumns, columnCoercions, path);
+        List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(partitionKeys, hiveColumns, columnCoercions, path, bucketNumber);
         List<ColumnMapping> regularColumnMappings = ColumnMapping.extractRegularColumnMappings(columnMappings);

         for (HivePageSourceFactory pageSourceFactory : pageSourceFactories) {
@@ -248,7 +251,8 @@ public static List<ColumnMapping> buildColumnMappings(
             List<HivePartitionKey> partitionKeys,
             List<HiveColumnHandle> columns,
             Map<Integer, HiveType> columnCoercions,
-            Path path)
+            Path path,
+            OptionalInt bucketNumber)
     {
         Map<String, HivePartitionKey> partitionKeysByName = uniqueIndex(partitionKeys, HivePartitionKey::getName);
         int regularIndex = 0;
@@ -266,7 +270,7 @@

                 // prepare the prefilled value
                 HivePartitionKey partitionKey = partitionKeysByName.get(column.getName());
-                prefilledValue = getPrefilledColumnValue(column, partitionKey, path);
+                prefilledValue = getPrefilledColumnValue(column, partitionKey, path, bucketNumber);
             }

             Optional<HiveType> coercionFrom = Optional.ofNullable(columnCoercions.get(column.getHiveColumnIndex()));
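As the page-source changes above show, the bucket number is never read from the data files: like $path, it is a prefilled per-split constant, which is why buildColumnMappings and getPrefilledColumnValue only need the split's OptionalInt. A hypothetical helper illustrating that idea (not the connector code):

    import java.util.OptionalInt;

    final class PrefilledHiddenColumns
    {
        private PrefilledHiddenColumns() {}

        // Per-split constants for the hidden columns: $path comes from the file location,
        // $bucket_number from the split (SQL NULL when the table is not bucketed).
        static Object prefilledValue(String columnName, String filePath, OptionalInt bucketNumber)
        {
            switch (columnName) {
                case "$path":
                    return filePath;
                case "$bucket_number":
                    return bucketNumber.isPresent() ? Integer.valueOf(bucketNumber.getAsInt()) : null;
                default:
                    throw new IllegalArgumentException("not a hidden column: " + columnName);
            }
        }
    }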
@@ -17,11 +17,12 @@
 import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.predicate.NullableValue;
 import com.facebook.presto.spi.predicate.TupleDomain;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;

+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;

 import static com.facebook.presto.hive.HiveBucketing.HiveBucket;
 import static java.util.Objects.requireNonNull;
@@ -34,33 +35,33 @@ public class HivePartition
     private final TupleDomain<HiveColumnHandle> effectivePredicate;
     private final String partitionId;
     private final Map<ColumnHandle, NullableValue> keys;
-    private final Optional<HiveBucket> bucket;
+    private final List<HiveBucket> buckets;

     public HivePartition(SchemaTableName tableName, TupleDomain<HiveColumnHandle> effectivePredicate)
     {
         this.tableName = requireNonNull(tableName, "tableName is null");
         this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
         this.partitionId = UNPARTITIONED_ID;
         this.keys = ImmutableMap.of();
-        this.bucket = Optional.empty();
+        this.buckets = ImmutableList.of();
     }

-    public HivePartition(SchemaTableName tableName, TupleDomain<HiveColumnHandle> effectivePredicate, Optional<HiveBucket> bucket)
+    public HivePartition(SchemaTableName tableName, TupleDomain<HiveColumnHandle> effectivePredicate, List<HiveBucket> buckets)
     {
-        this(tableName, effectivePredicate, UNPARTITIONED_ID, ImmutableMap.of(), bucket);
+        this(tableName, effectivePredicate, UNPARTITIONED_ID, ImmutableMap.of(), buckets);
     }

     public HivePartition(SchemaTableName tableName,
             TupleDomain<HiveColumnHandle> effectivePredicate,
             String partitionId,
             Map<ColumnHandle, NullableValue> keys,
-            Optional<HiveBucket> bucket)
+            List<HiveBucket> buckets)
     {
         this.tableName = requireNonNull(tableName, "tableName is null");
         this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
         this.partitionId = requireNonNull(partitionId, "partitionId is null");
         this.keys = ImmutableMap.copyOf(requireNonNull(keys, "keys is null"));
-        this.bucket = requireNonNull(bucket, "bucket number is null");
+        this.buckets = requireNonNull(buckets, "bucket number is null");
     }

     public SchemaTableName getTableName()
@@ -88,9 +89,9 @@ public Map<ColumnHandle, NullableValue> getKeys()
         return keys;
     }

-    public Optional<HiveBucket> getBucket()
+    public List<HiveBucket> getBuckets()
     {
-        return bucket;
+        return buckets;
     }

     @Override
@@ -42,8 +42,9 @@
 import java.util.Map;
 import java.util.Optional;

-import static com.facebook.presto.hive.HiveBucketing.getHiveBucket;
+import static com.facebook.presto.hive.HiveBucketing.HiveBucket;
 import static com.facebook.presto.hive.HiveBucketing.getHiveBucketHandle;
+import static com.facebook.presto.hive.HiveBucketing.getHiveBucketNumbers;
 import static com.facebook.presto.hive.HiveUtil.getPartitionKeyColumnHandles;
 import static com.facebook.presto.hive.HiveUtil.parsePartitionValue;
 import static com.facebook.presto.hive.util.Types.checkType;
@@ -106,8 +107,7 @@ public HivePartitionResult getPartitions(ConnectorSession session, SemiTransacti
         Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(connectorId, table);

         List<HiveColumnHandle> partitionColumns = getPartitionKeyColumnHandles(connectorId, table);
-        Optional<HiveBucketing.HiveBucket> bucket = getHiveBucket(table, TupleDomain.extractFixedValues(effectivePredicate).get());
-
+        List<HiveBucket> buckets = getHiveBucketNumbers(table, effectivePredicate);
         TupleDomain<HiveColumnHandle> compactEffectivePredicate = toCompactTupleDomain(effectivePredicate, domainCompactionThreshold);

         if (effectivePredicate.isNone()) {
@@ -117,7 +117,7 @@
         if (partitionColumns.isEmpty()) {
             return new HivePartitionResult(
                     partitionColumns,
-                    ImmutableList.of(new HivePartition(tableName, compactEffectivePredicate, bucket)),
+                    ImmutableList.of(new HivePartition(tableName, compactEffectivePredicate, buckets)),
                     effectivePredicate,
                     TupleDomain.none(),
                     hiveBucketHandle);
@@ -131,7 +131,7 @@
             Optional<Map<ColumnHandle, NullableValue>> values = parseValuesAndFilterPartition(partitionName, partitionColumns, effectivePredicate);

             if (values.isPresent()) {
-                partitions.add(new HivePartition(tableName, compactEffectivePredicate, partitionName, values.get(), bucket));
+                partitions.add(new HivePartition(tableName, compactEffectivePredicate, partitionName, values.get(), buckets));
             }
         }
@@ -151,7 +151,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
             return new FixedSplitSource(ImmutableList.of());
         }
         SchemaTableName tableName = partition.getTableName();
-        Optional<HiveBucketing.HiveBucket> bucket = partition.getBucket();
+        List<HiveBucketing.HiveBucket> buckets = partition.getBuckets();
         Optional<HiveBucketHandle> bucketHandle = layout.getBucketHandle();

         // sort partitions
@@ -169,7 +169,7 @@
                 table.get(),
                 hivePartitions,
                 bucketHandle,
-                bucket,
+                buckets,
                 session,
                 hdfsEnvironment,
                 namenodeStats,