Support Iceberg UUIDs in ORC complex types
alexjo2144 authored and findepi committed Dec 2, 2021
1 parent 5f62269 commit db863b1
Showing 2 changed files with 63 additions and 14 deletions.
@@ -64,8 +64,12 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -144,18 +148,21 @@ public class IcebergPageSourceProvider
private final FileFormatDataSourceStats fileFormatDataSourceStats;
private final OrcReaderOptions orcReaderOptions;
private final ParquetReaderOptions parquetReaderOptions;
private final TypeManager typeManager;

@Inject
public IcebergPageSourceProvider(
HdfsEnvironment hdfsEnvironment,
FileFormatDataSourceStats fileFormatDataSourceStats,
OrcReaderConfig orcReaderConfig,
ParquetReaderConfig parquetReaderConfig)
ParquetReaderConfig parquetReaderConfig,
TypeManager typeManager)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
this.orcReaderOptions = requireNonNull(orcReaderConfig, "orcReaderConfig is null").toOrcReaderOptions();
this.parquetReaderOptions = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

@Override
@@ -248,7 +255,8 @@ private ReaderPageSource createDataPageSource(
.withLazyReadSmallRanges(getOrcLazyReadSmallRanges(session))
.withNestedLazy(isOrcNestedLazy(session))
.withBloomFiltersEnabled(isOrcBloomFiltersEnabled(session)),
fileFormatDataSourceStats);
fileFormatDataSourceStats,
typeManager);
case PARQUET:
return createParquetPageSource(
hdfsEnvironment,
@@ -279,7 +287,8 @@ private static ReaderPageSource createOrcPageSource(
List<IcebergColumnHandle> columns,
TupleDomain<IcebergColumnHandle> effectivePredicate,
OrcReaderOptions options,
FileFormatDataSourceStats stats)
FileFormatDataSourceStats stats,
TypeManager typeManager)
{
OrcDataSource orcDataSource = null;
try {
@@ -338,17 +347,10 @@ private static ReaderPageSource createOrcPageSource(
}

if (orcColumn != null) {
Type readType;
if (column.getType() == UUID) {
if (!"UUID".equals(orcColumn.getAttributes().get(ICEBERG_BINARY_TYPE))) {
throw new TrinoException(ICEBERG_BAD_DATA, format("Expected ORC column for UUID data to be annotated with %s=UUID: %s", ICEBERG_BINARY_TYPE, orcColumn));
}
// ORC spec doesn't have UUID
// TODO read into Int128ArrayBlock for better performance when operating on read values
readType = VARBINARY;
}
else {
readType = column.getType();
Type readType = getOrcReadType(column.getType(), typeManager);

if (column.getType() == UUID && !"UUID".equals(orcColumn.getAttributes().get(ICEBERG_BINARY_TYPE))) {
throw new TrinoException(ICEBERG_BAD_DATA, format("Expected ORC column for UUID data to be annotated with %s=UUID: %s", ICEBERG_BINARY_TYPE, orcColumn));
}

List<List<Integer>> fieldIdProjections = fileColumnsByIcebergId.isEmpty() ?
@@ -465,6 +467,33 @@ private static Integer getIcebergFieldId(OrcColumn column)
return Integer.valueOf(icebergId);
}

private static Type getOrcReadType(Type columnType, TypeManager typeManager)
{
if (columnType == UUID) {
// ORC spec doesn't have UUID
// TODO read into Int128ArrayBlock for better performance when operating on read values
// TODO: Validate that the OrcColumn attribute ICEBERG_BINARY_TYPE is equal to "UUID"
return VARBINARY;
}

if (columnType instanceof ArrayType) {
return new ArrayType(getOrcReadType(((ArrayType) columnType).getElementType(), typeManager));
}
if (columnType instanceof MapType) {
MapType mapType = (MapType) columnType;
Type keyType = getOrcReadType(mapType.getKeyType(), typeManager);
Type valueType = getOrcReadType(mapType.getValueType(), typeManager);
return new MapType(keyType, valueType, typeManager.getTypeOperators());
}
if (columnType instanceof RowType) {
return RowType.from(((RowType) columnType).getFields().stream()
.map(field -> new RowType.Field(field.getName(), getOrcReadType(field.getType(), typeManager)))
.collect(toImmutableList()));
}

return columnType;
}

private static class IdBasedFieldMapperFactory
implements OrcReader.FieldMapperFactory
{
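Aside, not part of this commit: because the ORC spec has no UUID type, UUID columns (including ones nested inside array, map, and row types) are read as 16-byte varbinary values. Purely as an illustration of how those raw bytes relate to a UUID value, here is a minimal standalone Java sketch; the class and method names are hypothetical, and the big-endian byte layout (most significant 8 bytes first) is an assumption rather than something taken from this change.

import java.nio.ByteBuffer;
import java.util.UUID;

public final class UuidBytesExample
{
    private UuidBytesExample() {}

    // Convert 16 raw bytes (as read from a varbinary column) to a java.util.UUID,
    // assuming big-endian order: most significant 8 bytes first
    static UUID uuidFromBytes(byte[] bytes)
    {
        if (bytes.length != 16) {
            throw new IllegalArgumentException("Expected 16 bytes, got " + bytes.length);
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        return new UUID(buffer.getLong(), buffer.getLong());
    }

    // Inverse direction, for illustration only
    static byte[] uuidToBytes(UUID uuid)
    {
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }

    public static void main(String[] args)
    {
        UUID uuid = UUID.fromString("406caec7-68b9-4778-81b2-a12ece70c8b1");
        // Round-trips the same UUID literal used in the new testNestedUuid test below
        System.out.println(uuidFromBytes(uuidToBytes(uuid)));
    }
}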
@@ -589,6 +589,26 @@ private void testSelectOrPartitionedByUuid(boolean partitioned)
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testNestedUuid()
{
assertUpdate("CREATE TABLE test_nested_uuid (int_t int, row_t row(uuid_t uuid, int_t int), map_t map(int, uuid), array_t array(uuid))");

String uuid = "UUID '406caec7-68b9-4778-81b2-a12ece70c8b1'";
String value = format("VALUES (2, row(%1$s, 1), map(array[1], array[%1$s]), array[%1$s, %1$s])", uuid);
assertUpdate("INSERT INTO test_nested_uuid " + value, 1);

assertThat(query("SELECT row_t.int_t, row_t.uuid_t FROM test_nested_uuid"))
.matches("VALUES (1, UUID '406caec7-68b9-4778-81b2-a12ece70c8b1')");
assertThat(query("SELECT map_t[1] FROM test_nested_uuid"))
.matches("VALUES UUID '406caec7-68b9-4778-81b2-a12ece70c8b1'");
assertThat(query("SELECT array_t FROM test_nested_uuid"))
.matches("VALUES ARRAY[UUID '406caec7-68b9-4778-81b2-a12ece70c8b1', UUID '406caec7-68b9-4778-81b2-a12ece70c8b1']");

assertQuery("SELECT row_t.int_t FROM test_nested_uuid WHERE row_t.uuid_t = UUID '406caec7-68b9-4778-81b2-a12ece70c8b1'", "VALUES 1");
assertQuery("SELECT int_t FROM test_nested_uuid WHERE row_t.uuid_t = UUID '406caec7-68b9-4778-81b2-a12ece70c8b1'", "VALUES 2");
}

@Test
public void testCreatePartitionedTable()
{
