Fix reading parquet column with unused dictionary #15942

Merged · 3 commits · Feb 2, 2023
@@ -15,8 +15,9 @@

import java.util.OptionalLong;

public abstract class DataPage
public abstract sealed class DataPage
extends Page
permits DataPageV1, DataPageV2
{
protected final int valueCount;
private final OptionalLong firstRowIndex;
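Sealing DataPage and making both subclasses final restricts the hierarchy to exactly DataPageV1 and DataPageV2, which is what lets PageReader.readPage branch on the concrete page type without a defensive fallback. A minimal sketch of the kind of exhaustive handling this enables (hypothetical helper, not part of the PR; getValueCount() and getRowCount() are accessors assumed from the fields shown above):

    // Hypothetical helper mirroring the instanceof dispatch used in PageReader.readPage.
    static String describe(DataPage page)
    {
        if (page instanceof DataPageV1 v1) {
            return "v1 page with " + v1.getValueCount() + " values";
        }
        if (page instanceof DataPageV2 v2) {
            return "v2 page with " + v2.getValueCount() + " values in " + v2.getRowCount() + " rows";
        }
        // unreachable: DataPage permits only DataPageV1 and DataPageV2
        throw new IllegalArgumentException("unexpected page type: " + page.getClass());
    }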
@@ -20,7 +20,7 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class DataPageV1
public final class DataPageV1
extends DataPage
{
private final Slice slice;
@@ -21,7 +21,7 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class DataPageV2
public final class DataPageV2
extends DataPage
{
private final int rowCount;

This file was deleted.

@@ -41,7 +41,6 @@
public final class PageReader
{
private final CompressionCodecName codec;
private final boolean hasDictionaryPage;
private final boolean hasOnlyDictionaryEncodedPages;
private final boolean hasNoNulls;
private final PeekingIterator<Page> compressedPages;
@@ -64,23 +63,22 @@ public static PageReader createPageReader(
boolean hasOnlyDictionaryEncodedPages = isOnlyDictionaryEncodingPages(metadata);
ParquetColumnChunkIterator compressedPages = new ParquetColumnChunkIterator(
fileCreatedBy,
new ColumnChunkDescriptor(columnDescriptor, metadata),
columnDescriptor,
metadata,
columnChunk,
offsetIndex);
return new PageReader(metadata.getCodec(), compressedPages, compressedPages.hasDictionaryPage(), hasOnlyDictionaryEncodedPages, hasNoNulls);
return new PageReader(metadata.getCodec(), compressedPages, hasOnlyDictionaryEncodedPages, hasNoNulls);
}

@VisibleForTesting
public PageReader(
CompressionCodecName codec,
Iterator<? extends Page> compressedPages,
boolean hasDictionaryPage,
boolean hasOnlyDictionaryEncodedPages,
boolean hasNoNulls)
{
this.codec = codec;
this.compressedPages = Iterators.peekingIterator(compressedPages);
this.hasDictionaryPage = hasDictionaryPage;
this.hasOnlyDictionaryEncodedPages = hasOnlyDictionaryEncodedPages;
this.hasNoNulls = hasNoNulls;
}
@@ -97,13 +95,11 @@ public boolean hasOnlyDictionaryEncodedPages()

public DataPage readPage()
{
if (hasDictionaryPage) {
checkState(dictionaryAlreadyRead, "Dictionary has to be read first");
}
if (!compressedPages.hasNext()) {
return null;
}
Page compressedPage = compressedPages.next();
checkState(compressedPage instanceof DataPage, "Found page %s instead of a DataPage", compressedPage);
dataPageReadCount++;
try {
if (compressedPage instanceof DataPageV1 dataPageV1) {
@@ -143,16 +139,14 @@ public DataPage readPage()

public DictionaryPage readDictionaryPage()
{
if (!hasDictionaryPage) {
checkState(!dictionaryAlreadyRead, "Dictionary was already read");
checkState(dataPageReadCount == 0, "Dictionary has to be read first but " + dataPageReadCount + " was read already");
dictionaryAlreadyRead = true;
if (!(compressedPages.peek() instanceof DictionaryPage)) {
return null;
}
try {
checkState(!dictionaryAlreadyRead, "Dictionary was already read");
checkState(dataPageReadCount == 0, "Dictionary has to be read first but " + dataPageReadCount + " was read already");
dictionaryAlreadyRead = true;
Page firstPage = compressedPages.next();
checkArgument(firstPage instanceof DictionaryPage, "DictionaryPage has to be the first page in the column chunk but got %s", firstPage);
DictionaryPage compressedDictionaryPage = (DictionaryPage) firstPage;
DictionaryPage compressedDictionaryPage = (DictionaryPage) compressedPages.next();
return new DictionaryPage(
decompress(codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()),
compressedDictionaryPage.getDictionarySize(),
@@ -188,8 +182,6 @@ public boolean arePagesCompressed()

private void verifyDictionaryPageRead()
{
if (hasDictionaryPage) {
checkArgument(dictionaryAlreadyRead, "Dictionary has to be read first");
}
checkArgument(dictionaryAlreadyRead, "Dictionary has to be read first");
}
}
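With the hasDictionaryPage flag gone, the reading protocol of PageReader becomes uniform: call readDictionaryPage() once before any readPage() call; it returns null when the column chunk does not start with a dictionary page, and readPage() returns null once the data pages are exhausted. A minimal caller sketch under those assumptions (pageReader and the decoding step are placeholders):

    DictionaryPage dictionary = pageReader.readDictionaryPage();  // null if the chunk has no leading dictionary page
    DataPage page;
    while ((page = pageReader.readPage()) != null) {
        // decode the page; the dictionary is consulted only when the page encoding refers to it
    }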
@@ -18,12 +18,14 @@
import io.trino.parquet.DictionaryPage;
import io.trino.parquet.Page;
import io.trino.parquet.ParquetCorruptionException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.DictionaryPageHeader;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

import javax.annotation.Nullable;
@@ -33,49 +35,46 @@
import java.util.Optional;
import java.util.OptionalLong;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding;
import static java.util.Objects.requireNonNull;

public final class ParquetColumnChunkIterator
implements Iterator<Page>
{
private final Optional<String> fileCreatedBy;
private final ColumnChunkDescriptor descriptor;
private final ColumnDescriptor descriptor;
private final ColumnChunkMetaData metadata;
private final ChunkedInputStream input;
private final OffsetIndex offsetIndex;

private long valueCount;
private int dataPageCount;
private boolean dictionaryWasRead;

public ParquetColumnChunkIterator(
Optional<String> fileCreatedBy,
ColumnChunkDescriptor descriptor,
ColumnDescriptor descriptor,
ColumnChunkMetaData metadata,
ChunkedInputStream input,
@Nullable OffsetIndex offsetIndex)
{
this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null");
this.descriptor = requireNonNull(descriptor, "descriptor is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.input = requireNonNull(input, "input is null");
this.offsetIndex = offsetIndex;
}

public boolean hasDictionaryPage()
{
return descriptor.getColumnChunkMetaData().hasDictionaryPage();
}

@Override
public boolean hasNext()
{
return hasMorePages(valueCount, dataPageCount) || (hasDictionaryPage() && !dictionaryWasRead);
return hasMorePages(valueCount, dataPageCount);
}

@Override
public Page next()
{
checkArgument(hasNext());
checkState(hasNext(), "No more data left to read in column (%s), metadata (%s), valueCount %s, dataPageCount %s", descriptor, metadata, valueCount, dataPageCount);

try {
PageHeader pageHeader = readPageHeader();
@@ -85,10 +84,9 @@ public Page next()
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dataPageCount != 0) {
throw new ParquetCorruptionException("%s has dictionary page at not first position in column chunk", descriptor.getColumnDescriptor());
throw new ParquetCorruptionException("Column (%s) has a dictionary page after the first position in column chunk", descriptor);
}
result = readDictionaryPage(pageHeader, pageHeader.getUncompressed_page_size(), pageHeader.getCompressed_page_size());
dictionaryWasRead = true;
break;
case DATA_PAGE:
result = readDataPageV1(pageHeader, uncompressedPageSize, compressedPageSize, getFirstRowIndex(dataPageCount, offsetIndex));
@@ -118,7 +116,7 @@ private PageHeader readPageHeader()
private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar)
{
if (offsetIndex == null) {
return valuesCountReadSoFar < descriptor.getColumnChunkMetaData().getValueCount();
return valuesCountReadSoFar < metadata.getValueCount();
}
return dataPageCountReadSoFar < offsetIndex.getPageCount();
}
@@ -176,7 +174,7 @@ private DataPageV2 readDataPageV2(
MetadataReader.readStats(
fileCreatedBy,
Optional.ofNullable(dataHeaderV2.getStatistics()),
descriptor.getColumnDescriptor().getPrimitiveType()),
descriptor.getPrimitiveType()),
dataHeaderV2.isIs_compressed());
}

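ColumnChunkDescriptor only bundled a ColumnDescriptor with its ColumnChunkMetaData, so the iterator now takes the two pieces directly, as the createPageReader change above shows. A minimal sketch of the wiring, with the surrounding variables assumed from that method:

    ParquetColumnChunkIterator pages = new ParquetColumnChunkIterator(
            fileCreatedBy,     // Optional<String> writer tag, used when decoding column statistics
            columnDescriptor,  // org.apache.parquet.column.ColumnDescriptor
            metadata,          // org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
            columnChunk,       // ChunkedInputStream over the raw column chunk bytes
            offsetIndex);      // OffsetIndex, or null when the file has none
    while (pages.hasNext()) {
        Page page = pages.next();  // a dictionary page, if present, is returned before any data page
    }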
@@ -102,7 +102,7 @@ public int read()
throws IOException
{
ColumnReader columnReader = ColumnReaderFactory.create(field, UTC, newSimpleAggregatedMemoryContext(), true);
columnReader.setPageReader(new PageReader(UNCOMPRESSED, dataPages.iterator(), false, false, false), Optional.empty());
columnReader.setPageReader(new PageReader(UNCOMPRESSED, dataPages.iterator(), false, false), Optional.empty());
int rowsRead = 0;
while (rowsRead < dataPositions) {
int remaining = dataPositions - rowsRead;
@@ -378,7 +378,6 @@ else if (dictionaryEncoding == DictionaryEncoding.MIXED) {
return new PageReader(
UNCOMPRESSED,
inputPages.iterator(),
dictionaryEncoding != DictionaryEncoding.NONE,
dictionaryEncoding == DictionaryEncoding.ALL || (dictionaryEncoding == DictionaryEncoding.MIXED && testingPages.size() == 1),
false);
}
@@ -108,7 +108,7 @@ public void testVariousTimestamps(TimestampType type, BiFunction<Block, Integer,
// Read and assert
ColumnReader reader = ColumnReaderFactory.create(field, DateTimeZone.UTC, newSimpleAggregatedMemoryContext(), true);
reader.setPageReader(
new PageReader(UNCOMPRESSED, List.of(dataPage).iterator(), false, false, false),
new PageReader(UNCOMPRESSED, List.of(dataPage).iterator(), false, false),
Optional.empty());
reader.prepareNextRead(valueCount);
Block block = reader.readPrimitive().getBlock();
@@ -23,7 +23,6 @@
import io.trino.parquet.DataPageV1;
import io.trino.parquet.DataPageV2;
import io.trino.parquet.DictionaryPage;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetEncoding;
import io.trino.parquet.ParquetTypeUtils;
import org.apache.parquet.column.ColumnDescriptor;
@@ -55,7 +54,6 @@
import static io.trino.parquet.reader.TestPageReader.DataPageType.V2;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.column.Encoding.PLAIN;
import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
import static org.apache.parquet.format.PageType.DATA_PAGE_V2;
import static org.apache.parquet.format.PageType.DICTIONARY_PAGE;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
@@ -199,8 +197,10 @@ public void dictionaryPage(CompressionCodecName compressionCodec, DataPageType d
assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, true, ImmutableList.of(Slices.wrappedBuffer(bytes)));

// only dictionary
assertPages(compressionCodec, 0, 0, pageHeader, compressedDataPage, true, ImmutableList.of(
Slices.wrappedBuffer(Arrays.copyOf(bytes, dictionaryPageSize))));
pageReader = createPageReader(0, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(Arrays.copyOf(bytes, dictionaryPageSize))));
assertThatThrownBy(pageReader::readDictionaryPage)
.isInstanceOf(IllegalStateException.class)
.hasMessageStartingWith("No more data left to read");

// multiple slices dictionary
assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, true, ImmutableList.of(
@@ -239,16 +239,45 @@ public void dictionaryPageNotFirst()

int totalValueCount = valueCount * 2;

// metadata says there is a dictionary but it's not the first page
assertThatThrownBy(() -> createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes))).readDictionaryPage())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("DictionaryPage has to be the first page in the column chunk");
// There is a dictionary, but it's there as the second page
PageReader pageReader = createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes)));
assertThat(pageReader.readDictionaryPage()).isNull();
assertThat(pageReader.readPage()).isNotNull();
assertThatThrownBy(pageReader::readPage)
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("has a dictionary page after the first position");
}

@Test
public void unusedDictionaryPage()
throws Exception
{
// A parquet file produced by Impala was found to have an empty dictionary
// which is not used in the encoding of data pages in the column
CompressionCodecName compressionCodec = UNCOMPRESSED;
byte[] compressedDictionaryPage = TestPageReader.compress(compressionCodec, new byte[0], 0, 0);
PageHeader dictionaryPageHeader = new PageHeader(DICTIONARY_PAGE, 0, compressedDictionaryPage.length);
dictionaryPageHeader.setDictionary_page_header(new DictionaryPageHeader(0, Encoding.PLAIN));
ByteArrayOutputStream out = new ByteArrayOutputStream(100);
Util.writePageHeader(dictionaryPageHeader, out);
out.write(compressedDictionaryPage);

DataPageType dataPageType = V2;
byte[] compressedDataPage = DATA_PAGE;

PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length);
int valueCount = 10;
dataPageType.setDataPageHeader(pageHeader, valueCount);

Util.writePageHeader(pageHeader, out);
out.write(compressedDataPage);
byte[] bytes = out.toByteArray();

// metadata says there is no dictionary, but it's there as second page
PageReader pageReader = createPageReader(totalValueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes)));
assertTrue(pageReader.hasNext());
pageReader.skipNextPage();
assertThatThrownBy(pageReader::readPage).isInstanceOf(RuntimeException.class).hasCauseInstanceOf(ParquetCorruptionException.class);
// There is a dictionary page, but the data page does not use it
PageReader pageReader = createPageReader(valueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes)));
assertThat(pageReader.readDictionaryPage()).isNotNull();
assertThat(pageReader.readPage()).isNotNull();
assertThat(pageReader.readPage()).isNull();
}

private static void assertSinglePage(CompressionCodecName compressionCodec, int valueCount, PageHeader pageHeader, byte[] compressedDataPage, List<Slice> slices)
@@ -363,7 +392,6 @@ private static PageReader createPageReader(int valueCount, CompressionCodecName
EncodingStats.Builder encodingStats = new EncodingStats.Builder();
if (hasDictionary) {
encodingStats.addDictEncoding(PLAIN);
encodingStats.addDataEncoding(RLE_DICTIONARY);
}
ColumnChunkMetaData columnChunkMetaData = ColumnChunkMetaData.get(
ColumnPath.get(""),
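Dropping addDataEncoding(RLE_DICTIONARY) appears to be what makes the test metadata model an unused dictionary: the encoding stats record a dictionary page while no data page is dictionary encoded. A minimal sketch of that shape of metadata, assuming the parquet-mr EncodingStats.Builder API already used above:

    // Encoding stats for a column chunk whose dictionary page exists but is never referenced:
    // a dictionary encoding is recorded, yet the data pages stay plain encoded.
    EncodingStats unusedDictionaryStats = new EncodingStats.Builder()
            .addDictEncoding(PLAIN)  // a dictionary page was written
            .addDataEncoding(PLAIN)  // but no data page uses RLE_DICTIONARY
            .build();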
@@ -728,7 +728,7 @@ private static PageReader getSimplePageReaderMock(ParquetEncoding encoding)
encoding,
encoding,
PLAIN));
return new PageReader(UNCOMPRESSED, pages.iterator(), false, false, false);
return new PageReader(UNCOMPRESSED, pages.iterator(), false, false);
}

private static PageReader getNullOnlyPageReaderMock()
Expand All @@ -745,7 +745,7 @@ private static PageReader getNullOnlyPageReaderMock()
RLE,
RLE,
PLAIN));
return new PageReader(UNCOMPRESSED, pages.iterator(), false, false, false);
return new PageReader(UNCOMPRESSED, pages.iterator(), false, false);
}

private static PageReader getPageReaderMock(List<DataPage> dataPages, @Nullable DictionaryPage dictionaryPage)
Expand All @@ -762,7 +762,6 @@ private static PageReader getPageReaderMock(List<DataPage> dataPages, @Nullable
return new PageReader(
UNCOMPRESSED,
pagesBuilder.addAll(dataPages).build().iterator(),
dictionaryPage != null,
dataPages.stream()
.map(page -> {
if (page instanceof DataPageV1 pageV1) {