Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support backwards compatible reads for unannotated repeated primitive fields in Parquet #20943

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,15 @@ public static byte[] paddingBigInteger(BigInteger bigInteger, int numBytes)
return result;
}

/**
 * Assumes the parent of columnIO is a MessageColumnIO, i.e. columnIO should be a top level column in the schema.
 * <p>
 * Top-level columns receive extra validation in the private overload: an unannotated
 * REPEATED primitive at the top level is only readable as an array type, and any other
 * Trino type mapping for it is rejected with a NOT_SUPPORTED error.
 *
 * @param type the Trino type the column is being projected as
 * @param columnIO the Parquet column structure to map; may be null when the column is absent from the file
 * @return the constructed field, or {@link Optional#empty()} when the column cannot be mapped
 */
public static Optional<Field> constructField(Type type, ColumnIO columnIO)
{
    return constructField(type, columnIO, true);
}

private static Optional<Field> constructField(Type type, ColumnIO columnIO, boolean isTopLevel)
{
if (columnIO == null) {
return Optional.empty();
Expand All @@ -334,7 +342,7 @@ public static Optional<Field> constructField(Type type, ColumnIO columnIO)
boolean structHasParameters = false;
for (RowType.Field rowField : fields) {
String name = rowField.getName().orElseThrow().toLowerCase(Locale.ENGLISH);
Optional<Field> field = constructField(rowField.getType(), lookupColumnByName(groupColumnIO, name));
Optional<Field> field = constructField(rowField.getType(), lookupColumnByName(groupColumnIO, name), false);
structHasParameters |= field.isPresent();
fieldsBuilder.add(field);
}
Expand All @@ -349,19 +357,41 @@ public static Optional<Field> constructField(Type type, ColumnIO columnIO)
if (keyValueColumnIO.getChildrenCount() != 2) {
return Optional.empty();
}
Optional<Field> keyField = constructField(mapType.getKeyType(), keyValueColumnIO.getChild(0));
Optional<Field> valueField = constructField(mapType.getValueType(), keyValueColumnIO.getChild(1));
Optional<Field> keyField = constructField(mapType.getKeyType(), keyValueColumnIO.getChild(0), false);
Optional<Field> valueField = constructField(mapType.getValueType(), keyValueColumnIO.getChild(1), false);
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(keyField, valueField)));
}
if (type instanceof ArrayType arrayType) {
// Per the parquet spec (https://github.com/apache/parquet-format/blob/master/LogicalTypes.md):
// `A repeated field that is neither contained by a LIST- or MAP-annotated group nor annotated by LIST or MAP should be interpreted as a required list of required elements
// where the element type is the type of the field.`
//
// A parquet encoding for a required list of strings can be expressed in two ways, however for backwards compatibility they should be handled the same, so here we need
// to adjust repetition and definition levels when converting ColumnIOs to Fields.
// 1. required group colors (LIST) {
// repeated group list {
// required string element;
// }
// }
// 2. repeated binary colors (STRING);
if (columnIO instanceof PrimitiveColumnIO primitiveColumnIO) {
if (columnIO.getType().getRepetition() != REPEATED || repetitionLevel == 0 || definitionLevel == 0) {
throw new TrinoException(NOT_SUPPORTED, format("Unsupported schema for Parquet column (%s)", primitiveColumnIO.getColumnDescriptor()));
}
PrimitiveField primitiveFieldElement = new PrimitiveField(arrayType.getElementType(), true, primitiveColumnIO.getColumnDescriptor(), primitiveColumnIO.getId());
return Optional.of(new GroupField(type, repetitionLevel - 1, definitionLevel - 1, true, ImmutableList.of(Optional.of(primitiveFieldElement))));
}
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
if (groupColumnIO.getChildrenCount() != 1) {
return Optional.empty();
}
Optional<Field> field = constructField(arrayType.getElementType(), getArrayElementColumn(groupColumnIO.getChild(0)));
Optional<Field> field = constructField(arrayType.getElementType(), getArrayElementColumn(groupColumnIO.getChild(0)), false);
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(field)));
}
PrimitiveColumnIO primitiveColumnIO = (PrimitiveColumnIO) columnIO;
if (primitiveColumnIO.getType().getRepetition() == REPEATED && isTopLevel) {
throw new TrinoException(NOT_SUPPORTED, format("Unsupported Trino column type (%s) for Parquet column (%s)", type, primitiveColumnIO.getColumnDescriptor()));
}
return Optional.of(new PrimitiveField(type, required, primitiveColumnIO.getColumnDescriptor(), primitiveColumnIO.getId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ private static <T> ColumnReader createColumnReader(

/**
 * Determines whether a column can be decoded with the simpler "flat" reader.
 * <p>
 * A column is flat only when it is a top-level primitive (path length 1) AND is not
 * repeated (repetition level 0). The repetition-level check matters because an
 * unannotated repeated primitive also has a path length of 1, yet carries repetition
 * level 1 and must be decoded by the nested reader.
 * <p>
 * NOTE(review): the scraped diff retained both the pre- and post-change return
 * statements back to back; only the updated condition is kept here.
 */
private static boolean isFlatColumn(PrimitiveField field)
{
    return field.getDescriptor().getPath().length == 1 && field.getRepetitionLevel() == 0;
}

private static boolean isLogicalUuid(LogicalTypeAnnotation annotation)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.parquet.reader;

import io.trino.parquet.PrimitiveField;
import io.trino.parquet.reader.flat.FlatColumnReader;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.PrimitiveType;
import org.testng.annotations.Test;

import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.spi.type.IntegerType.INTEGER;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.joda.time.DateTimeZone.UTC;

public class TestColumnReaderFactory
{
    /**
     * Verifies reader selection for top-level primitive columns: an unannotated
     * repeated primitive (repetition level 1) must use the nested reader, while
     * optional and required top-level primitives (repetition level 0) use the flat reader.
     */
    @Test
    public void testTopLevelPrimitiveFields()
    {
        ColumnReaderFactory columnReaderFactory = new ColumnReaderFactory(UTC);
        PrimitiveType primitiveType = new PrimitiveType(OPTIONAL, INT32, "test");

        // Unannotated repeated primitive: repetition level 1 forces the nested reader
        PrimitiveField topLevelRepeatedPrimitiveField = new PrimitiveField(
                INTEGER,
                true,
                new ColumnDescriptor(new String[] {"topLevelRepeatedPrimitiveField test"}, primitiveType, 1, 1),
                0);
        assertThat(columnReaderFactory.create(topLevelRepeatedPrimitiveField, newSimpleAggregatedMemoryContext())).isInstanceOf(NestedColumnReader.class);

        // Optional (nullable) top-level primitive: definition level 1, repetition level 0 -> flat reader.
        // Fixed copy-paste defect: the descriptor previously reused the "Required" field's name.
        PrimitiveField topLevelOptionalPrimitiveField = new PrimitiveField(
                INTEGER,
                false,
                new ColumnDescriptor(new String[] {"topLevelOptionalPrimitiveField test"}, primitiveType, 0, 1),
                0);
        assertThat(columnReaderFactory.create(topLevelOptionalPrimitiveField, newSimpleAggregatedMemoryContext())).isInstanceOf(FlatColumnReader.class);

        // Required top-level primitive: both levels 0 -> flat reader
        PrimitiveField topLevelRequiredPrimitiveField = new PrimitiveField(
                INTEGER,
                true,
                new ColumnDescriptor(new String[] {"topLevelRequiredPrimitiveField test"}, primitiveType, 0, 0),
                0);
        assertThat(columnReaderFactory.create(topLevelRequiredPrimitiveField, newSimpleAggregatedMemoryContext())).isInstanceOf(FlatColumnReader.class);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,28 @@
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.LazyBlock;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.metrics.Count;
import io.trino.spi.metrics.Metric;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.Type;
import io.trino.testing.TestingConnectorSession;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -49,7 +56,9 @@
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestParquetReader
{
Expand Down Expand Up @@ -147,4 +156,57 @@ public void testEmptyRowRangesWithColumnIndex()
.isGreaterThanOrEqualTo(parquetMetadata.getBlocks().get(0).getRowCount());
}
}

/**
 * Reads a legacy Parquet file whose {@code myString} column is an unannotated
 * repeated string primitive and verifies it is decoded as {@code array(varchar)}.
 */
@Test
public void testBackwardsCompatibleRepeatedStringField()
        throws Exception
{
    File file = new File(Resources.getResource("parquet_repeated_primitives/string/old-repeated-string.parquet").toURI());
    List<List<String>> expected = ImmutableList.of(
            Arrays.asList("hello", "world"),
            Arrays.asList("good", "bye"),
            Arrays.asList("one", "two", "three"));
    testReadingOldParquetFiles(file, ImmutableList.of("myString"), new ArrayType(VARCHAR), expected);
}

/**
 * Reads a legacy Parquet file whose {@code repeatedInt} column is an unannotated
 * repeated int32 primitive and verifies it is decoded as {@code array(integer)}.
 */
@Test
public void testBackwardsCompatibleRepeatedIntegerField()
        throws Exception
{
    File file = new File(Resources.getResource("parquet_repeated_primitives/int/old-repeated-int.parquet").toURI());
    List<List<Integer>> expected = ImmutableList.of(Arrays.asList(1, 2, 3));
    testReadingOldParquetFiles(file, ImmutableList.of("repeatedInt"), new ArrayType(INTEGER), expected);
}

/**
 * An unannotated repeated primitive cannot be projected as a plain (non-array)
 * Trino type: projecting the repeated int32 column as {@code integer} must fail
 * with a NOT_SUPPORTED TrinoException.
 */
@Test
public void testBackwardsCompatibleRepeatedPrimitiveFieldDefinedAsPrimitive()
{
    assertThatThrownBy(() -> {
        File file = new File(Resources.getResource("parquet_repeated_primitives/int/old-repeated-int.parquet").toURI());
        // Intentionally wrong projection: the column holds a list, not a scalar
        testReadingOldParquetFiles(file, ImmutableList.of("repeatedInt"), INTEGER, ImmutableList.of(Arrays.asList(1, 2, 3)));
    })
            .isInstanceOf(TrinoException.class)
            .hasMessage("Unsupported Trino column type (integer) for Parquet column ([repeatedint] repeated int32 repeatedint)");
}

/**
 * Reads every page of the single projected column from {@code file} and compares the
 * decoded values, in order, against {@code expectedValues}. Fails if the file yields
 * fewer values than expected.
 *
 * @param file the Parquet file to read
 * @param columnNames the single column to project
 * @param columnType the Trino type to project the column as
 * @param expectedValues the expected decoded values, one entry per row
 * @throws IOException if the footer or data pages cannot be read
 */
private void testReadingOldParquetFiles(File file, List<String> columnNames, Type columnType, List<?> expectedValues)
        throws IOException
{
    ParquetDataSource dataSource = new FileParquetDataSource(file, new ParquetReaderOptions());
    ConnectorSession session = TestingConnectorSession.builder().build();
    ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
    try (ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), ImmutableList.of(columnType), columnNames)) {
        Iterator<?> expected = expectedValues.iterator();
        for (Page page = reader.nextPage(); page != null; page = reader.nextPage()) {
            Block block = page.getBlock(0);
            int positionCount = block.getPositionCount();
            for (int position = 0; position < positionCount; position++) {
                assertThat(columnType.getObjectValue(session, block, position)).isEqualTo(expected.next());
            }
        }
        assertThat(expected.hasNext())
                .describedAs("Read fewer values than expected")
                .isFalse();
    }
}
}
Binary file not shown.
Binary file not shown.