Native Avro File Reader #17221
Conversation
Force-pushed from 1e57d63 to afde27b
plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java
plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveFormatsConfig.java
plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileUtils.java
plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHivePageSource.java
plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHivePageSourceFactory.java
Force-pushed from 5033fb9 to 487d3a5
I'm still reviewing but here are some initial comments
...rmats/src/test/java/io/trino/hive/formats/avro/TestAvroPageDataReaderWithoutTypeManager.java
}
case ENUM -> VarcharType.VARCHAR;
case ARRAY -> new ArrayType(typeFromAvro(schema.getElementType(), avroTypeManager, enclosingRecords));
case MAP -> new MapType(VarcharType.VARCHAR, typeFromAvro(schema.getValueType(), avroTypeManager, enclosingRecords), new TypeOperators());
You should use the TypeOperators from ConnectorContext.getTypeManager().getTypeOperators(), but if that is too difficult you could create a static final. The TypeOperators are effectively generating code with method handles, so if you are constantly creating new instances, the code will always be cold (a.k.a. slow).
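Since TypeOperators ultimately generates method-handle-based code, the cost model is the same as for raw method handles: build once, reuse forever. A JDK-only sketch of the static-final pattern being suggested (illustrative, not Trino code; `OperatorCache` and `eq` are made-up names):

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public class OperatorCache {
    // Built once and reused, like a static final TypeOperators; rebuilding
    // this per call would keep the generated code perpetually cold.
    private static final MethodHandle EQUALS = buildEquals();

    private static MethodHandle buildEquals() {
        try {
            return MethodHandles.lookup().findVirtual(
                    String.class, "equals", MethodType.methodType(boolean.class, Object.class));
        }
        catch (ReflectiveOperationException e) {
            throw new AssertionError(e);
        }
    }

    public static boolean eq(String left, String right) {
        try {
            // invokeExact requires the call site to match the handle's type exactly
            return (boolean) EQUALS.invokeExact(left, (Object) right);
        }
        catch (Throwable t) {
            throw new AssertionError(t);
        }
    }

    public static void main(String[] args) {
        System.out.println(eq("avro", "avro")); // true
    }
}
```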
I might need some help wiring this up the way you're envisioning
reminder we need to resolve this one
@@ -113,6 +123,13 @@
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.trino</groupId>
            <artifactId>trino-main</artifactId>
Do we really need this dependency? I would prefer if we did not need this in this module.
I'm using it for io.trino.block.BlockAssertions. Can I move that class to SPI?
Should I make my own version?
I think we should move it to the SPI. When you do that, switch the TestNG assertEquals
usage to AssertJ, since we're trying to move away from TestNG in new code.
I'm getting rolling IntelliJ errors on the refactor. I'll ping you offline on what you think we should do here.
Field ColorType.COLOR, referenced in method BlockAssertions.createColorRepeatBlock(int, int), will not be accessible in module trino-spi
Field TestingConnectorSession.SESSION, referenced in method BlockAssertions.getOnlyValue(Type, Block), will not be accessible in module trino-spi
Field IpAddressType.IPADDRESS, referenced in method BlockAssertions.createRandomBlockForType(Type, int, float), will not be accessible in module trino-spi
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFilePageIterator.java
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroTypeException.java
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroTypeUtils.java
Force-pushed from 5feca4b to d101b9e
I reviewed the "Add Native Avro to Page code with connector defined mappings" commit. My comments are mostly stylistic, and the code looks good.
I am concerned about the testing: most of the tests in this commit just verify the correct number of rows and that no exceptions were thrown. I think we need basic tests for every type and a few complex combinations of types.
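To make the ask concrete, a hedged sketch of the per-type matrix shape meant here; `roundTrip` is a hypothetical stand-in (identity in this sketch) for writing a value with Avro and reading it back through the new page reader:

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class TypeMatrixSketch {
    // Hypothetical stand-in for "write the value with Avro, read it back
    // through the new reader"; identity here, just to show the test shape.
    static Object roundTrip(Object value) {
        return value;
    }

    public static void main(String[] args) {
        // One sample per primitive type, plus a few nested combinations.
        List<Object> samples = List.of(
                true, 42, 42L, 3.5f, 3.5d, "text", new byte[] {1, 2},
                List.of(1, 2, 3),
                Map.of("k", List.of("v")),
                List.of(Map.of("nested", 1L)));
        for (Object sample : samples) {
            if (!Objects.deepEquals(sample, roundTrip(sample))) {
                throw new AssertionError("round trip mismatch for " + sample);
            }
        }
        System.out.println("verified " + samples.size() + " samples");
    }
}
```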
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFileReader.java
...rmats/src/test/java/io/trino/hive/formats/avro/TestAvroPageDataReaderWithoutTypeManager.java
protected static TrinoInputFile createWrittenFileWithSchema(int count, Schema schema)
        throws IOException
{
    Iterator<Object> randomData = new RandomData(schema, count).iterator();
this is really nice
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroTypeManager.java
...rmats/src/test/java/io/trino/hive/formats/avro/TestAvroPageDataReaderWithTypeManagement.java
Force-pushed from 45aa870 to 9752755
for (String columnName : maskedColumns) {
    Schema.Field field = tableSchema.getField(columnName);
    if (Objects.isNull(field)) {
        continue;
Check
}

private static BlockBuildingDecoder createBlockBuildingDecoderForAction(Resolver.Action action, AvroTypeManager typeManager)
        throws AvroTypeException
I probably wouldn't recommend using the resolver. The only thing that needs to be resolved is the order to read struct fields, skip, and then how to fill in defaults. That's actually pretty simple and can be done directly. Check out our implementation in Python.
Basically, that creates a list of (optional position, reader) pairs by doing the following:
- Loop over the file schema's record and create a pair for every field
- If there is a corresponding field in the read schema, use that field's position from the read schema
- If the field is not in the read schema, use empty
- Loop over the read schema and add any needed pairs
- If the field was in the file schema, skip it
- If the field was missing, create a pair from the field's position in the read schema and a constant reader with the field's default
Then to read, you just loop over those pairs. If the position is present, get the value from the reader and set that position in the record. If the position is not present, call skip to consume and discard the bytes.
I think this is overall way cleaner, although I like what you're doing here and using the resolver to create your reader tree, rather than resolving in each record.
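The pair-based plan described above can be sketched as a JDK-only miniature; field-name lists and a plain value iterator stand in for Avro schemas and decoders, and `plan`/`readRecord` are hypothetical names, not Avro or Trino API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

public class RecordResolver {
    // A reader either decodes the next value from the file or supplies a default.
    interface Reader {
        Object read(Iterator<Object> fileData);
    }

    // (optional target position in the read schema, reader) pair
    record Step(OptionalInt readPosition, Reader reader) {}

    static List<Step> plan(List<String> fileFields, List<String> readFields, Map<String, Object> defaults) {
        List<Step> steps = new ArrayList<>();
        // Pass 1: one pair per file-schema field, in file order.
        for (String field : fileFields) {
            int pos = readFields.indexOf(field);
            OptionalInt target = pos >= 0 ? OptionalInt.of(pos) : OptionalInt.empty();
            steps.add(new Step(target, data -> data.next())); // always consumes one file value
        }
        // Pass 2: read-schema fields missing from the file get a constant default reader.
        for (String field : readFields) {
            if (!fileFields.contains(field)) {
                Object defaultValue = defaults.get(field);
                steps.add(new Step(OptionalInt.of(readFields.indexOf(field)), data -> defaultValue));
            }
        }
        return steps;
    }

    static Object[] readRecord(List<Step> steps, Iterator<Object> fileData, int width) {
        Object[] row = new Object[width];
        for (Step step : steps) {
            Object value = step.reader().read(fileData);
            if (step.readPosition().isPresent()) {
                row[step.readPosition().getAsInt()] = value;
            }
            // empty position: the value was decoded only to advance past it (skip)
        }
        return row;
    }

    public static void main(String[] args) {
        // File has fields a, b, c with values 1, 2, 3; read schema wants c, a, d (d defaults to 0).
        List<Step> steps = plan(List.of("a", "b", "c"), List.of("c", "a", "d"), Map.of("d", 0));
        Object[] row = readRecord(steps, List.<Object>of(1, 2, 3).iterator(), 3);
        System.out.println(Arrays.toString(row)); // [3, 1, 0]
    }
}
```

A real implementation would replace the value iterator with a binary decoder and make the "skip" branch discard bytes without materializing the value.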
Oh, that's a good point. I definitely don't love the data model of the Actions returned for this and other use cases. When I was implementing, I was just peeling back layers of GenericDatumReader and FastDatumReader until I didn't need to anymore, and pretty much copied the usage from those classes. The reason I might keep using it is that it also provides reader/writer union resolution steps that I originally wasn't confident implementing myself but need (I imagine I could, but I'd need to think on it; I suspect it would involve load-bearing exceptions). From my understanding after looking at the pyiceberg code, Iceberg doesn't support non-trivial (optional) unions and doesn't need to do union resolution, correct?
Do you want to address this now or in a follow up?
I'll ping @rdblue offline and we'll get this sorted.
Force-pushed from e5dd970 to e5f4d4c
Capitalize the commit titles: https://cbea.ms/git-commit/
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFileReader.java
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java
}

private static BlockBuildingDecoder createBlockBuildingDecoderForAction(Resolver.Action action, AvroTypeManager typeManager)
        throws AvroTypeException
Do you want to address this now or in a follow up?
{
    BlockBuilder entryBuilder = builder.beginBlockEntry();
    long entriesInBlock = decoder.readMapStart();
    // TODO need to filter out all but last value for key?
What would happen if we encounter a file with duplicate keys?
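One defensible answer is last-wins semantics, which is what a HashMap-based consumer of the same entries would observe. A hedged, JDK-only sketch (not the PR's code; `lastWins` is a made-up name) of deduplicating decoded entries before building the map block:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapKeyDedup {
    // Collapse decoded map entries so the last value written for a key wins.
    static Map<String, Object> lastWins(List<Map.Entry<String, Object>> decodedEntries) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : decodedEntries) {
            result.put(entry.getKey(), entry.getValue()); // later duplicate overwrites earlier
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Object>> entries = List.of(
                Map.entry("k", (Object) 1), Map.entry("k", (Object) 2), Map.entry("x", (Object) 9));
        System.out.println(lastWins(entries)); // {k=2, x=9}
    }
}
```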
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroTypeException.java
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroTypeManager.java
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroTypeUtils.java
    logicalType = fromSchemaIgnoreInvalid(schema);
    break;
case LOCAL_TIMESTAMP_MICROS + LOCAL_TIMESTAMP_MILLIS:
    log.debug("Logical type " + typeName + " not currently supported by Trino");
Is this something we plan to support? I'm wondering how the log message might be useful.
I would actually like to talk with you about this. I want to make sure I understand our TimestampWithTimeZoneType well enough to know whether it maps directly to this logical type or can be coerced in a way that makes sense.
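If it helps the discussion: per the Avro spec, local-timestamp-micros stores microseconds from 1970-01-01 00:00:00 with no time zone attached, so it lines up with a zone-less timestamp type rather than a zoned one. A JDK-only illustration of the decode (not PR code):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class LocalTimestampDecode {
    // Avro local-timestamp-micros: microseconds since 1970-01-01T00:00:00
    // in an unspecified "local" zone, so decode to a zone-less LocalDateTime.
    static LocalDateTime fromMicros(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
        // ZoneOffset.UTC here only anchors the arithmetic; no zone is retained.
        return LocalDateTime.ofEpochSecond(seconds, (int) nanos, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(fromMicros(1_500_000L)); // 1970-01-01T00:00:01.500
    }
}
```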
plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java
Force-pushed from e5f4d4c to 13183c0
Force-pushed from 13183c0 to 35e959d
Description
Add classes and utilities to replace Hive library AvroSerde deserializing functionality by creating a Trino native Avro page source.
The changes are largely split into two modules:
- trino-hive-formats: a general-use Avro library that allows connectors to define custom page-building code or use the default behavior. Responsible for resolving read/write schemas and decoding file bytes.
- trino-hive: the Hive plugin's usage of the above library, ensuring backwards compatibility with the current implementation along with custom schema-sourcing code. Includes custom solutions for handling reader projection use cases.

Main features:
Main classes for review:
- io.trino.hive.formats.avro.AvroPageDataReader: an org.apache.avro.io.DatumReader implementation modeled on org.apache.avro.io.FastReaderBuilder and io.trino.hive.formats.line.json.JsonDeserializer
- io.trino.plugin.hive.avro.AvroHiveFileUtils: handles HiveType -> Avro Schema mapping in place of org.apache.hadoop.hive.serde2.avro.AvroSerDe
- io.trino.plugin.hive.avro.HiveAvroTypeManager: customizes the format library with special HiveType and Timestamp transformations in a backwards-compatible way. Flattens behavior currently located in a number of files into functions it supplies to the library.

Additional context and related issues
This is part of a broader effort to optionalize the Trino dependency on hive/hadoop.
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(X) Release notes are required, with the following suggested text: