Fix reading parquet column with unused dictionary
A parquet file produced by Impala was found to have an empty dictionary
page that is not used in the encoding of any data page in the column.
For such a case we cannot rely on ColumnChunkMetaData#hasDictionaryPage,
because it also checks whether the data pages are encoded using the
dictionary. This change removes the usage of hasDictionaryPage and
instead detects a dictionary page by looking at the first page of the
column chunk, fixing query failures on such files.
raunaqmorarka committed Feb 2, 2023
1 parent d94d45a commit 51dc650
Showing 7 changed files with 58 additions and 48 deletions.
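The root cause, in brief: parquet-mr derives ColumnChunkMetaData#hasDictionaryPage from the column's EncodingStats, which answers true only when the stats record both a dictionary page and dictionary-encoded data pages. The sketch below paraphrases that check (it is not the literal parquet-mr source); for the Impala files in question the dictionary page exists but no data page references it, so the check returns false and the reader previously ignored a dictionary page that was physically present.

import org.apache.parquet.hadoop.metadata.EncodingStats;

// Paraphrased sketch of the metadata-driven check this commit stops relying on.
static boolean hasDictionaryPage(EncodingStats stats)
{
    // True only if a dictionary page was written AND at least one data page
    // is dictionary-encoded; an empty, unused dictionary fails the second test.
    return stats.hasDictionaryPages() && stats.hasDictionaryEncodedPages();
}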
@@ -41,7 +41,6 @@
public final class PageReader
{
private final CompressionCodecName codec;
- private final boolean hasDictionaryPage;
private final boolean hasOnlyDictionaryEncodedPages;
private final boolean hasNoNulls;
private final PeekingIterator<Page> compressedPages;
@@ -68,20 +67,18 @@ public static PageReader createPageReader(
metadata,
columnChunk,
offsetIndex);
- return new PageReader(metadata.getCodec(), compressedPages, compressedPages.hasDictionaryPage(), hasOnlyDictionaryEncodedPages, hasNoNulls);
+ return new PageReader(metadata.getCodec(), compressedPages, hasOnlyDictionaryEncodedPages, hasNoNulls);
}

@VisibleForTesting
public PageReader(
CompressionCodecName codec,
Iterator<? extends Page> compressedPages,
- boolean hasDictionaryPage,
boolean hasOnlyDictionaryEncodedPages,
boolean hasNoNulls)
{
this.codec = codec;
this.compressedPages = Iterators.peekingIterator(compressedPages);
- this.hasDictionaryPage = hasDictionaryPage;
this.hasOnlyDictionaryEncodedPages = hasOnlyDictionaryEncodedPages;
this.hasNoNulls = hasNoNulls;
}
@@ -98,13 +95,11 @@ public boolean hasOnlyDictionaryEncodedPages()

public DataPage readPage()
{
- if (hasDictionaryPage) {
- checkState(dictionaryAlreadyRead, "Dictionary has to be read first");
- }
if (!compressedPages.hasNext()) {
return null;
}
Page compressedPage = compressedPages.next();
+ checkState(compressedPage instanceof DataPage, "Found page %s instead of a DataPage", compressedPage);
dataPageReadCount++;
try {
if (compressedPage instanceof DataPageV1 dataPageV1) {
@@ -144,16 +139,14 @@ public DataPage readPage()

public DictionaryPage readDictionaryPage()
{
- if (!hasDictionaryPage) {
+ checkState(!dictionaryAlreadyRead, "Dictionary was already read");
+ checkState(dataPageReadCount == 0, "Dictionary has to be read first but " + dataPageReadCount + " was read already");
+ dictionaryAlreadyRead = true;
+ if (!(compressedPages.peek() instanceof DictionaryPage)) {
return null;
}
try {
- checkState(!dictionaryAlreadyRead, "Dictionary was already read");
- checkState(dataPageReadCount == 0, "Dictionary has to be read first but " + dataPageReadCount + " was read already");
- dictionaryAlreadyRead = true;
- Page firstPage = compressedPages.next();
- checkArgument(firstPage instanceof DictionaryPage, "DictionaryPage has to be the first page in the column chunk but got %s", firstPage);
- DictionaryPage compressedDictionaryPage = (DictionaryPage) firstPage;
+ DictionaryPage compressedDictionaryPage = (DictionaryPage) compressedPages.next();
return new DictionaryPage(
decompress(codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()),
compressedDictionaryPage.getDictionarySize(),
@@ -189,8 +182,6 @@ public boolean arePagesCompressed()

private void verifyDictionaryPageRead()
{
- if (hasDictionaryPage) {
- checkArgument(dictionaryAlreadyRead, "Dictionary has to be read first");
- }
+ checkArgument(dictionaryAlreadyRead, "Dictionary has to be read first");
}
}
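With the hasDictionaryPage flag gone, the reading contract is: callers still request the dictionary before any data page, but PageReader now decides for itself, by peeking at the first page of the chunk, whether a dictionary is actually present. A minimal usage sketch under that assumption (pageReader stands for any instance built by createPageReader above):

// Hypothetical caller; DictionaryPage and DataPage are the io.trino.parquet types used above.
DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); // null when the chunk has no leading dictionary page
DataPage page;
while ((page = pageReader.readPage()) != null) {
    // decode the page, consulting dictionaryPage only for dictionary-encoded pages
}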
@@ -35,7 +35,7 @@
import java.util.Optional;
import java.util.OptionalLong;

- import static com.google.common.base.Preconditions.checkArgument;
+ import static com.google.common.base.Preconditions.checkState;
import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding;
import static java.util.Objects.requireNonNull;

@@ -50,7 +50,6 @@ public final class ParquetColumnChunkIterator

private long valueCount;
private int dataPageCount;
- private boolean dictionaryWasRead;

public ParquetColumnChunkIterator(
Optional<String> fileCreatedBy,
@@ -66,21 +65,16 @@ public ParquetColumnChunkIterator(
this.offsetIndex = offsetIndex;
}

- public boolean hasDictionaryPage()
- {
- return metadata.hasDictionaryPage();
- }

@Override
public boolean hasNext()
{
- return hasMorePages(valueCount, dataPageCount) || (hasDictionaryPage() && !dictionaryWasRead);
+ return hasMorePages(valueCount, dataPageCount);
}

@Override
public Page next()
{
- checkArgument(hasNext());
+ checkState(hasNext(), "No more data left to read in column (%s), metadata (%s), valueCount %s, dataPageCount %s", descriptor, metadata, valueCount, dataPageCount);

try {
PageHeader pageHeader = readPageHeader();
@@ -90,10 +84,9 @@ public Page next()
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dataPageCount != 0) {
throw new ParquetCorruptionException("%s has dictionary page at not first position in column chunk", descriptor);
throw new ParquetCorruptionException("Column (%s) has a dictionary page after the first position in column chunk", descriptor);
}
result = readDictionaryPage(pageHeader, pageHeader.getUncompressed_page_size(), pageHeader.getCompressed_page_size());
- dictionaryWasRead = true;
break;
case DATA_PAGE:
result = readDataPageV1(pageHeader, uncompressedPageSize, compressedPageSize, getFirstRowIndex(dataPageCount, offsetIndex));
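The iterator no longer tracks whether a dictionary was read; it streams pages in file order and leaves dictionary detection to the consumer. A minimal sketch of that peeking pattern, assuming Guava's PeekingIterator and the io.trino.parquet page types (readLeadingDictionary is a hypothetical helper, not part of this commit):

import com.google.common.collect.PeekingIterator;

// Consume a dictionary page only if it is physically the first page in the
// chunk, regardless of what the column metadata claims.
static DictionaryPage readLeadingDictionary(PeekingIterator<Page> pages)
{
    if (pages.hasNext() && pages.peek() instanceof DictionaryPage) {
        return (DictionaryPage) pages.next();
    }
    return null;
}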
@@ -102,7 +102,7 @@ public int read()
throws IOException
{
ColumnReader columnReader = ColumnReaderFactory.create(field, UTC, newSimpleAggregatedMemoryContext(), true);
- columnReader.setPageReader(new PageReader(UNCOMPRESSED, dataPages.iterator(), false, false, false), Optional.empty());
+ columnReader.setPageReader(new PageReader(UNCOMPRESSED, dataPages.iterator(), false, false), Optional.empty());
int rowsRead = 0;
while (rowsRead < dataPositions) {
int remaining = dataPositions - rowsRead;
@@ -378,7 +378,6 @@ else if (dictionaryEncoding == DictionaryEncoding.MIXED) {
return new PageReader(
UNCOMPRESSED,
inputPages.iterator(),
- dictionaryEncoding != DictionaryEncoding.NONE,
dictionaryEncoding == DictionaryEncoding.ALL || (dictionaryEncoding == DictionaryEncoding.MIXED && testingPages.size() == 1),
false);
}
@@ -108,7 +108,7 @@ public void testVariousTimestamps(TimestampType type, BiFunction<Block, Integer,
// Read and assert
ColumnReader reader = ColumnReaderFactory.create(field, DateTimeZone.UTC, newSimpleAggregatedMemoryContext(), true);
reader.setPageReader(
- new PageReader(UNCOMPRESSED, List.of(dataPage).iterator(), false, false, false),
+ new PageReader(UNCOMPRESSED, List.of(dataPage).iterator(), false, false),
Optional.empty());
reader.prepareNextRead(valueCount);
Block block = reader.readPrimitive().getBlock();
@@ -23,7 +23,6 @@
import io.trino.parquet.DataPageV1;
import io.trino.parquet.DataPageV2;
import io.trino.parquet.DictionaryPage;
- import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetEncoding;
import io.trino.parquet.ParquetTypeUtils;
import org.apache.parquet.column.ColumnDescriptor;
@@ -55,7 +54,6 @@
import static io.trino.parquet.reader.TestPageReader.DataPageType.V2;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.column.Encoding.PLAIN;
- import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
import static org.apache.parquet.format.PageType.DATA_PAGE_V2;
import static org.apache.parquet.format.PageType.DICTIONARY_PAGE;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
@@ -199,8 +197,10 @@ public void dictionaryPage(CompressionCodecName compressionCodec, DataPageType d
assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, true, ImmutableList.of(Slices.wrappedBuffer(bytes)));

// only dictionary
- assertPages(compressionCodec, 0, 0, pageHeader, compressedDataPage, true, ImmutableList.of(
- Slices.wrappedBuffer(Arrays.copyOf(bytes, dictionaryPageSize))));
+ pageReader = createPageReader(0, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(Arrays.copyOf(bytes, dictionaryPageSize))));
+ assertThatThrownBy(pageReader::readDictionaryPage)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageStartingWith("No more data left to read");

// multiple slices dictionary
assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, true, ImmutableList.of(
@@ -239,16 +239,45 @@ public void dictionaryPageNotFirst()

int totalValueCount = valueCount * 2;

- // metadata says there is a dictionary but it's not the first page
- assertThatThrownBy(() -> createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes))).readDictionaryPage())
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageStartingWith("DictionaryPage has to be the first page in the column chunk");
+ // There is a dictionary, but it's there as the second page
+ PageReader pageReader = createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes)));
+ assertThat(pageReader.readDictionaryPage()).isNull();
+ assertThat(pageReader.readPage()).isNotNull();
+ assertThatThrownBy(pageReader::readPage)
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("has a dictionary page after the first position");
}

+ @Test
+ public void unusedDictionaryPage()
+ throws Exception
+ {
+ // A parquet file produced by Impala was found to have an empty dictionary
+ // which is not used in the encoding of data pages in the column
+ CompressionCodecName compressionCodec = UNCOMPRESSED;
+ byte[] compressedDictionaryPage = TestPageReader.compress(compressionCodec, new byte[0], 0, 0);
+ PageHeader dictionaryPageHeader = new PageHeader(DICTIONARY_PAGE, 0, compressedDictionaryPage.length);
+ dictionaryPageHeader.setDictionary_page_header(new DictionaryPageHeader(0, Encoding.PLAIN));
+ ByteArrayOutputStream out = new ByteArrayOutputStream(100);
+ Util.writePageHeader(dictionaryPageHeader, out);
+ out.write(compressedDictionaryPage);
+
+ DataPageType dataPageType = V2;
+ byte[] compressedDataPage = DATA_PAGE;
+
+ PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length);
+ int valueCount = 10;
+ dataPageType.setDataPageHeader(pageHeader, valueCount);
+
+ Util.writePageHeader(pageHeader, out);
+ out.write(compressedDataPage);
+ byte[] bytes = out.toByteArray();

- // metadata says there is no dictionary, but it's there as second page
- PageReader pageReader = createPageReader(totalValueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes)));
- assertTrue(pageReader.hasNext());
- pageReader.skipNextPage();
- assertThatThrownBy(pageReader::readPage).isInstanceOf(RuntimeException.class).hasCauseInstanceOf(ParquetCorruptionException.class);
+ // metadata says there is a dictionary, and the dictionary page is read even though no data page uses it
+ PageReader pageReader = createPageReader(valueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes)));
+ assertThat(pageReader.readDictionaryPage()).isNotNull();
+ assertThat(pageReader.readPage()).isNotNull();
+ assertThat(pageReader.readPage()).isNull();
}

private static void assertSinglePage(CompressionCodecName compressionCodec, int valueCount, PageHeader pageHeader, byte[] compressedDataPage, List<Slice> slices)
@@ -363,7 +392,6 @@ private static PageReader createPageReader(int valueCount, CompressionCodecName
EncodingStats.Builder encodingStats = new EncodingStats.Builder();
if (hasDictionary) {
encodingStats.addDictEncoding(PLAIN);
- encodingStats.addDataEncoding(RLE_DICTIONARY);
}
ColumnChunkMetaData columnChunkMetaData = ColumnChunkMetaData.get(
ColumnPath.get(""),
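Note how the test helper above now reproduces the Impala-style metadata: with the RLE_DICTIONARY data encoding no longer registered, the EncodingStats record a dictionary page but no dictionary-encoded data pages, exactly the shape for which parquet-mr's hasDictionaryPage reports false. A hedged sketch of the equivalent stats, using the builder methods visible in the diff:

import org.apache.parquet.column.Encoding;
import org.apache.parquet.hadoop.metadata.EncodingStats;

// Dictionary page present, but every data page is PLAIN: the unused-dictionary shape.
EncodingStats stats = new EncodingStats.Builder()
        .addDictEncoding(Encoding.PLAIN)
        .addDataEncoding(Encoding.PLAIN)
        .build();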
@@ -728,7 +728,7 @@ private static PageReader getSimplePageReaderMock(ParquetEncoding encoding)
encoding,
encoding,
PLAIN));
- return new PageReader(UNCOMPRESSED, pages.iterator(), false, false, false);
+ return new PageReader(UNCOMPRESSED, pages.iterator(), false, false);
}

private static PageReader getNullOnlyPageReaderMock()
@@ -745,7 +745,7 @@ private static PageReader getNullOnlyPageReaderMock()
RLE,
RLE,
PLAIN));
- return new PageReader(UNCOMPRESSED, pages.iterator(), false, false, false);
+ return new PageReader(UNCOMPRESSED, pages.iterator(), false, false);
}

private static PageReader getPageReaderMock(List<DataPage> dataPages, @Nullable DictionaryPage dictionaryPage)
@@ -762,7 +762,6 @@ private static PageReader getPageReaderMock(List<DataPage> dataPages, @Nullable
return new PageReader(
UNCOMPRESSED,
pagesBuilder.addAll(dataPages).build().iterator(),
- dictionaryPage != null,
dataPages.stream()
.map(page -> {
if (page instanceof DataPageV1 pageV1) {
