Skip to content

Commit

Permalink
Refactor ParquetPageSource
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenxiao committed Oct 15, 2020
1 parent 2e0c44f commit 84ade36
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,27 @@
package com.facebook.presto.hive.parquet;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.LazyBlock;
import com.facebook.presto.common.block.LazyBlockLoader;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.parquet.Field;
import com.facebook.presto.parquet.ParquetCorruptionException;
import com.facebook.presto.parquet.reader.ParquetReader;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Optional;

import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveColumnHandle.getPushedDownSubfield;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CURSOR_ERROR;
import static com.facebook.presto.hive.parquet.ParquetPageSourceFactory.getParquetType;
import static com.facebook.presto.parquet.ParquetTypeUtils.lookupColumnByName;
import static com.facebook.presto.parquet.ParquetTypeUtils.nestedColumnPath;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.io.ColumnIOConverter.constructField;
import static org.apache.parquet.io.ColumnIOConverter.findNestedColumnIO;

public class ParquetPageSource
implements ConnectorPageSource
Expand All @@ -68,52 +51,14 @@ public class ParquetPageSource

public ParquetPageSource(
ParquetReader parquetReader,
MessageType fileSchema,
MessageColumnIO messageColumnIO,
TypeManager typeManager,
List<HiveColumnHandle> columns,
boolean useParquetColumnNames,
SchemaTableName tableName,
Path path)
List<Type> types,
List<Optional<Field>> fields,
List<String> columnNames)
{
requireNonNull(columns, "columns is null");
requireNonNull(fileSchema, "fileSchema is null");
this.parquetReader = requireNonNull(parquetReader, "parquetReader is null");

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
ImmutableList.Builder<Optional<Field>> fieldsBuilder = ImmutableList.builder();
for (HiveColumnHandle column : columns) {
checkArgument(column.getColumnType() == REGULAR || column.getColumnType() == SYNTHESIZED, "column type must be regular or synthesized column");

String name = column.getName();
Type type = typeManager.getType(column.getTypeSignature());

namesBuilder.add(name);
typesBuilder.add(type);

if (column.getColumnType() == SYNTHESIZED) {
Subfield pushedDownSubfield = getPushedDownSubfield(column);
List<String> nestedColumnPath = nestedColumnPath(pushedDownSubfield);
Optional<ColumnIO> columnIO = findNestedColumnIO(lookupColumnByName(messageColumnIO, pushedDownSubfield.getRootName()), nestedColumnPath);
if (columnIO.isPresent()) {
fieldsBuilder.add(constructField(type, columnIO.get()));
}
else {
fieldsBuilder.add(Optional.empty());
}
}
else if (getParquetType(type, fileSchema, useParquetColumnNames, column, tableName, path).isPresent()) {
String columnName = useParquetColumnNames ? name : fileSchema.getFields().get(column.getHiveColumnIndex()).getName();
fieldsBuilder.add(constructField(type, lookupColumnByName(messageColumnIO, columnName)));
}
else {
fieldsBuilder.add(Optional.empty());
}
}
types = typesBuilder.build();
fields = fieldsBuilder.build();
columnNames = namesBuilder.build();
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
this.fields = ImmutableList.copyOf(requireNonNull(fields, "fields is null"));
this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.hive.HiveFileContext;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
import com.facebook.presto.parquet.Field;
import com.facebook.presto.parquet.ParquetCorruptionException;
import com.facebook.presto.parquet.ParquetDataSource;
import com.facebook.presto.parquet.RichColumnDescriptor;
Expand All @@ -51,6 +52,7 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
Expand Down Expand Up @@ -85,6 +87,7 @@
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.AGGREGATED;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveColumnHandle.getPushedDownSubfield;
import static com.facebook.presto.hive.HiveColumnHandle.isPushedDownSubfield;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
Expand All @@ -104,6 +107,7 @@
import static com.facebook.presto.parquet.ParquetTypeUtils.getDescriptors;
import static com.facebook.presto.parquet.ParquetTypeUtils.getParquetTypeByName;
import static com.facebook.presto.parquet.ParquetTypeUtils.getSubfieldType;
import static com.facebook.presto.parquet.ParquetTypeUtils.lookupColumnByName;
import static com.facebook.presto.parquet.ParquetTypeUtils.nestedColumnPath;
import static com.facebook.presto.parquet.predicate.PredicateUtils.buildPredicate;
import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches;
Expand All @@ -113,6 +117,8 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE;
import static org.apache.parquet.io.ColumnIOConverter.constructField;
import static org.apache.parquet.io.ColumnIOConverter.findNestedColumnIO;

public class ParquetPageSourceFactory
implements HiveBatchPageSourceFactory
Expand Down Expand Up @@ -261,15 +267,38 @@ public static ConnectorPageSource createParquetPageSource(
batchReaderEnabled,
verificationEnabled);

return new ParquetPageSource(
parquetReader,
fileSchema,
messageColumnIO,
typeManager,
columns,
useParquetColumnNames,
tableName,
path);
// Build three parallel lists (column names, types, and optional Parquet fields) with exactly one
// entry per requested column; all three are passed to the ParquetPageSource constructor below.
ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
ImmutableList.Builder<Optional<Field>> fieldsBuilder = ImmutableList.builder();
for (HiveColumnHandle column : columns) {
// Only regular and synthesized columns are accepted; any other column type fails this argument check.
checkArgument(column.getColumnType() == REGULAR || column.getColumnType() == SYNTHESIZED, "column type must be regular or synthesized column");

String name = column.getName();
Type type = typeManager.getType(column.getTypeSignature());

namesBuilder.add(name);
typesBuilder.add(type);

// Synthesized column: take its pushed-down subfield, look up the subfield's root column by name,
// then follow the nested path from that root to find the column IO to read.
if (column.getColumnType() == SYNTHESIZED) {
Subfield pushedDownSubfield = getPushedDownSubfield(column);
List<String> nestedColumnPath = nestedColumnPath(pushedDownSubfield);
Optional<ColumnIO> columnIO = findNestedColumnIO(lookupColumnByName(messageColumnIO, pushedDownSubfield.getRootName()), nestedColumnPath);
if (columnIO.isPresent()) {
fieldsBuilder.add(constructField(type, columnIO.get()));
}
else {
// No column IO was found for the nested path; record an empty field so the lists stay aligned.
fieldsBuilder.add(Optional.empty());
}
}
// Non-synthesized column: a field is built only when getParquetType returns a present value.
else if (getParquetType(type, fileSchema, useParquetColumnNames, column, tableName, path).isPresent()) {
// Lookup name: the Hive column name when useParquetColumnNames is set, otherwise the name of the
// file-schema field at the column's Hive column index.
String columnName = useParquetColumnNames ? name : fileSchema.getFields().get(column.getHiveColumnIndex()).getName();
fieldsBuilder.add(constructField(type, lookupColumnByName(messageColumnIO, columnName)));
}
else {
// getParquetType returned empty for this column; record an empty field so the lists stay aligned.
fieldsBuilder.add(Optional.empty());
}
}
// Hand the reader and the three per-column lists to the page source.
return new ParquetPageSource(parquetReader, typesBuilder.build(), fieldsBuilder.build(), namesBuilder.build());
}
catch (Exception e) {
try {
Expand Down

0 comments on commit 84ade36

Please sign in to comment.