Skip to content

Commit

Permalink
Use builder for ParquetReader
Browse files Browse the repository at this point in the history
  • Loading branch information
shangxinli committed Nov 8, 2022
1 parent 4053ddc commit 0d5c126
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,19 +260,14 @@ private static ConnectorPageSource createParquetPageSource(
}
}
MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
ParquetReader parquetReader = new ParquetReader(
messageColumnIO,
blocks.build(),
Optional.empty(),
dataSource,
systemMemoryContext,
getParquetMaxReadBlockSize(session),
isParquetBatchReadsEnabled(session),
isParquetBatchReaderVerificationEnabled(session),
parquetPredicate,
blockIndexStores,
false,
fileDecryptor);
ParquetReader parquetReader = ParquetReader.builder(messageColumnIO, blocks.build(), dataSource, systemMemoryContext)
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
.withBatchReadEnabled(isParquetBatchReadsEnabled(session))
.withEnableVerification(isParquetBatchReaderVerificationEnabled(session))
.withPredicate(parquetPredicate)
.withBlockIndexStores(blockIndexStores)
.withFileDecryptor(fileDecryptor)
.build();

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,19 +268,16 @@ public static ConnectorPageSource createParquetPageSource(
nextStart += block.getRowCount();
}
MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
ParquetReader parquetReader = new ParquetReader(
messageColumnIO,
blocks.build(),
Optional.of(blockStarts.build()),
dataSource,
systemMemoryContext,
getParquetMaxReadBlockSize(session),
isParquetBatchReadsEnabled(session),
isParquetBatchReaderVerificationEnabled(session),
parquetPredicate,
blockIndexStores,
columnIndexFilterEnabled,
fileDecryptor);
ParquetReader parquetReader = ParquetReader.builder(messageColumnIO, blocks.build(), dataSource, systemMemoryContext)
.withFirstRowOfBlocks(Optional.of(blockStarts.build()))
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
.withBatchReadEnabled(isParquetBatchReadsEnabled(session))
.withEnableVerification(isParquetBatchReaderVerificationEnabled(session))
.withPredicate(parquetPredicate)
.withBlockIndexStores(blockIndexStores)
.withFileDecryptor(fileDecryptor)
.withColumnIndexEnabled(columnIndexFilterEnabled)
.build();

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,11 @@ ParquetPageSource createParquetPageSource()
for (int i = 0; i < channelCount; i++) {
fields.add(ColumnIOConverter.constructField(getTypeFromTypeSignature(), messageColumnIO.getChild(i)));
}

ParquetReader parquetReader = new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), Optional.empty(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), batchReadEnabled, enableVerification, null, null, false, Optional.empty());
ParquetReader parquetReader = ParquetReader.builder(messageColumnIO, parquetMetadata.getBlocks(), dataSource, newSimpleAggregatedMemoryContext())
.withMaxReadBlockSize(new DataSize(16, MEGABYTE))
.withBatchReadEnabled(batchReadEnabled)
.withEnableVerification(enableVerification)
.build();
return new ParquetPageSource(parquetReader, Collections.nCopies(channelCount, type), fields, columnNames, new RuntimeStats());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,14 @@ public static ConnectorPageSource createParquetPageSource(
}

MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
ParquetReader parquetReader = new ParquetReader(
messageColumnIO,
blocks,
Optional.empty(),
dataSource,
systemMemoryContext,
getParquetMaxReadBlockSize(session),
isParquetBatchReadsEnabled(session),
isParquetBatchReaderVerificationEnabled(session),
parquetPredicate,
blockIndexStores,
false,
fileDecryptor);
ParquetReader parquetReader = ParquetReader.builder(messageColumnIO, blocks, dataSource, systemMemoryContext)
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
.withBatchReadEnabled(isParquetBatchReadsEnabled(session))
.withEnableVerification(isParquetBatchReaderVerificationEnabled(session))
.withPredicate(parquetPredicate)
.withBlockIndexStores(blockIndexStores)
.withFileDecryptor(fileDecryptor)
.build();

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<com.facebook.presto.common.type.Type> prestoTypes = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,14 @@ private static ConnectorPageSource createParquetPageSource(
}

MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
ParquetReader parquetReader = new ParquetReader(
messageColumnIO,
blocks,
Optional.empty(),
dataSource,
systemMemoryContext,
getParquetMaxReadBlockSize(session),
isParquetBatchReadsEnabled(session),
isParquetBatchReaderVerificationEnabled(session),
parquetPredicate,
blockIndexStores,
false,
fileDecryptor);
ParquetReader parquetReader = ParquetReader.builder(messageColumnIO, blocks, dataSource, systemMemoryContext)
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
.withBatchReadEnabled(isParquetBatchReadsEnabled(session))
.withEnableVerification(isParquetBatchReaderVerificationEnabled(session))
.withPredicate(parquetPredicate)
.withBlockIndexStores(blockIndexStores)
.withFileDecryptor(fileDecryptor)
.build();

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> prestoTypes = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,4 +595,106 @@ private List<OffsetRange> concatRanges(List<OffsetRange> offsetRanges)
}
return pageRanges;
}

public static Builder builder(
MessageColumnIO messageColumnIO,
List<BlockMetaData> block,
ParquetDataSource dataSource,
AggregatedMemoryContext systemMemoryContext)
{
return new Builder(messageColumnIO, block, dataSource, systemMemoryContext);
}

public static class Builder
{
private final MessageColumnIO messageColumnIO;
private final List<BlockMetaData> blocks;
protected final ParquetDataSource dataSource;
private final AggregatedMemoryContext systemMemoryContext;
private Optional<List<Long>> firstRowsOfBlocks = Optional.empty();
private DataSize maxReadBlockSize;
private boolean batchReadEnabled;
private boolean enableVerification;
private Predicate parquetPredicate;
private List<ColumnIndexStore> blockIndexStores;
private boolean columnIndexFilterEnabled;
private Optional<InternalFileDecryptor> fileDecryptor = Optional.empty();

private Builder(
MessageColumnIO messageColumnIO,
List<BlockMetaData> block,
ParquetDataSource dataSource,
AggregatedMemoryContext systemMemoryContext)
{
this.messageColumnIO = messageColumnIO;
this.blocks = block;
this.dataSource = dataSource;
this.systemMemoryContext = systemMemoryContext;
}

public Builder withFirstRowOfBlocks(Optional<List<Long>> firstRowsOfBlocks)
{
this.firstRowsOfBlocks = firstRowsOfBlocks;
return this;
}

public Builder withMaxReadBlockSize(DataSize maxReadBlockSize)
{
this.maxReadBlockSize = maxReadBlockSize;
return this;
}

public Builder withBatchReadEnabled(boolean batchReadEnabled)
{
this.batchReadEnabled = batchReadEnabled;
return this;
}

public Builder withEnableVerification(boolean enableVerification)
{
this.enableVerification = enableVerification;
return this;
}

public Builder withPredicate(Predicate parquetPredicate)
{
this.parquetPredicate = parquetPredicate;
return this;
}

public Builder withBlockIndexStores(List<ColumnIndexStore> blockIndexStores)
{
this.blockIndexStores = blockIndexStores;
return this;
}

public Builder withColumnIndexEnabled(boolean columnIndexFilterEnabled)
{
this.columnIndexFilterEnabled = columnIndexFilterEnabled;
return this;
}

public Builder withFileDecryptor(Optional<InternalFileDecryptor> fileDecryptor)
{
this.fileDecryptor = fileDecryptor;
return this;
}

public ParquetReader build()
{
return new ParquetReader(
this.messageColumnIO,
this.blocks,
this.firstRowsOfBlocks,
this.dataSource,
this.systemMemoryContext,
this.maxReadBlockSize,
this.batchReadEnabled,
this.enableVerification,
this.parquetPredicate,
this.blockIndexStores,
this.columnIndexFilterEnabled,
this.fileDecryptor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,11 @@ ParquetReader createRecordReader()
MessageColumnIO messageColumnIO = getColumnIO(schema, schema);

this.field = ColumnIOConverter.constructField(getType(), messageColumnIO.getChild(0)).get();

return new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), Optional.empty(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), enableOptimizedReader, enableVerification, null, null, false, Optional.empty());
return ParquetReader.builder(messageColumnIO, parquetMetadata.getBlocks(), dataSource, newSimpleAggregatedMemoryContext())
.withMaxReadBlockSize(new DataSize(16, MEGABYTE))
.withBatchReadEnabled(enableOptimizedReader)
.withEnableVerification(enableVerification)
.build();
}

protected boolean getNullability()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static com.facebook.presto.parquet.ParquetTypeUtils.getArrayElementColumn;
import static com.facebook.presto.parquet.ParquetTypeUtils.getColumnIO;
import static com.facebook.presto.parquet.ParquetTypeUtils.getMapKeyValueColumn;
Expand Down Expand Up @@ -432,19 +433,10 @@ private ParquetReader createParquetReader(ParquetMetadata parquetMetadata,
nextStart += block.getRowCount();
}

return new ParquetReader(
messageColumn,
blocks.build(),
Optional.empty(),
dataSource,
com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext(),
new DataSize(100000, DataSize.Unit.BYTE),
false,
false,
null,
null,
false,
fileDecryptor);
return ParquetReader.builder(messageColumn, blocks.build(), dataSource, newSimpleAggregatedMemoryContext())
.withMaxReadBlockSize(new DataSize(100000, DataSize.Unit.BYTE))
.withFileDecryptor(fileDecryptor)
.build();
}

private void validateFile(ParquetReader parquetReader, MessageColumnIO messageColumn, EncryptionTestFile inputFile)
Expand Down

0 comments on commit 0d5c126

Please sign in to comment.