diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java index c0a7895acbba..0fa2e795811b 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java @@ -30,13 +30,17 @@ import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.data.avro.DecoderResolver; -class GenericAvroReader implements DatumReader, SupportsRowPosition { +public class GenericAvroReader implements DatumReader, SupportsRowPosition { private final Schema readSchema; private ClassLoader loader = Thread.currentThread().getContextClassLoader(); private Schema fileSchema = null; private ValueReader reader = null; + public static GenericAvroReader create(Schema schema) { + return new GenericAvroReader<>(schema); + } + GenericAvroReader(Schema readSchema) { this.readSchema = readSchema; } diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java index d6a4574dab66..421bfc9dc462 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java @@ -28,9 +28,13 @@ import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -class GenericAvroWriter implements MetricsAwareDatumWriter { +public class GenericAvroWriter implements MetricsAwareDatumWriter { private ValueWriter writer = null; + public static GenericAvroWriter create(Schema schema) { + return new GenericAvroWriter<>(schema); + } + GenericAvroWriter(Schema schema) { setSchema(schema); } diff --git a/core/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java b/core/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java index 7eca98a1cfe7..94fedbcfce91 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java @@ -20,22 +20,18 @@ import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Map; import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.SchemaNormalization; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DecoderFactory; import org.apache.avro.message.BadHeaderException; import org.apache.avro.message.MessageDecoder; import org.apache.avro.message.MissingSchemaException; import org.apache.avro.message.SchemaStore; import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.avro.ProjectionDatumReader; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.MapMaker; public class IcebergDecoder extends MessageDecoder.BaseDecoder { @@ -106,7 +102,10 @@ public void addSchema(org.apache.iceberg.Schema writeSchema) { private void addSchema(Schema writeSchema) { long fp = SchemaNormalization.parsingFingerprint64(writeSchema); - decoders.put(fp, new RawDecoder<>(readSchema, writeSchema)); + RawDecoder decoder = + new RawDecoder<>( + readSchema, avroSchema -> DataReader.create(readSchema, avroSchema), writeSchema); + decoders.put(fp, decoder); } private RawDecoder getDecoder(long fp) { @@ -144,44 +143,10 @@ public D decode(InputStream stream, D reuse) throws IOException { RawDecoder decoder = getDecoder(FP_BUFFER.get().getLong(2)); - return decoder.decode(stream, reuse); - } - - private static class RawDecoder extends MessageDecoder.BaseDecoder { - private static final ThreadLocal DECODER = new ThreadLocal<>(); - - private final DatumReader reader; - - /** - * Creates a new {@link MessageDecoder} that constructs datum instances described by the {@link - * Schema readSchema}. - * - *

The {@code readSchema} is used for the expected schema and the {@code writeSchema} is the - * schema used to decode buffers. The {@code writeSchema} must be the schema that was used to - * encode all buffers decoded by this class. - * - * @param readSchema the schema used to construct datum instances - * @param writeSchema the schema used to decode buffers - */ - private RawDecoder(org.apache.iceberg.Schema readSchema, Schema writeSchema) { - this.reader = - new ProjectionDatumReader<>( - avroSchema -> DataReader.create(readSchema, avroSchema), - readSchema, - ImmutableMap.of(), - null); - this.reader.setSchema(writeSchema); - } - - @Override - public D decode(InputStream stream, D reuse) { - BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(stream, DECODER.get()); - DECODER.set(decoder); - try { - return reader.read(reuse, decoder); - } catch (IOException e) { - throw new AvroRuntimeException("Decoding datum failed", e); - } + try { + return decoder.decode(stream, reuse); + } catch (UncheckedIOException e) { + throw new AvroRuntimeException(e); } } diff --git a/core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java b/core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java new file mode 100644 index 000000000000..c32ea707bfab --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.avro; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.function.Function; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.message.MessageDecoder; +import org.apache.iceberg.avro.ProjectionDatumReader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +public class RawDecoder extends MessageDecoder.BaseDecoder { + private static final ThreadLocal DECODER = new ThreadLocal<>(); + + private final DatumReader reader; + + /** + * Creates a new {@link MessageDecoder} that constructs datum instances described by the {@link + * Schema readSchema}. + * + *

The {@code readSchema} is used for the expected schema and the {@code writeSchema} is the + * schema used to decode buffers. The {@code writeSchema} must be the schema that was used to + * encode all buffers decoded by this class. + */ + public RawDecoder( + org.apache.iceberg.Schema readSchema, + Function> readerFunction, + Schema writeSchema) { + this.reader = new ProjectionDatumReader<>(readerFunction, readSchema, ImmutableMap.of(), null); + this.reader.setSchema(writeSchema); + } + + @Override + public D decode(InputStream stream, D reuse) { + BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(stream, DECODER.get()); + DECODER.set(decoder); + try { + return reader.read(reuse, decoder); + } catch (IOException e) { + throw new UncheckedIOException("Decoding datum failed", e); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java new file mode 100644 index 000000000000..0d7ec43f6ebc --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; + +class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord { + private static final byte V1 = 1; + private static final Schema SCHEMA_V1 = + new Schema( + required(0, "encryption_key", Types.BinaryType.get()), + optional(1, "aad_prefix", Types.BinaryType.get())); + private static final org.apache.avro.Schema AVRO_SCHEMA_V1 = + AvroSchemaUtil.convert(SCHEMA_V1, KeyMetadata.class.getCanonicalName()); + + private static final Map schemaVersions = ImmutableMap.of(V1, SCHEMA_V1); + private static final Map avroSchemaVersions = + ImmutableMap.of(V1, AVRO_SCHEMA_V1); + + private static final KeyMetadataEncoder KEY_METADATA_ENCODER = new KeyMetadataEncoder(V1); + private static final KeyMetadataDecoder KEY_METADATA_DECODER = new KeyMetadataDecoder(V1); + + private ByteBuffer encryptionKey; + private ByteBuffer aadPrefix; + private org.apache.avro.Schema avroSchema; + + /** Used by Avro reflection to instantiate this class * */ + KeyMetadata() {} + + KeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { + this.encryptionKey = encryptionKey; + this.aadPrefix = aadPrefix; + this.avroSchema = AVRO_SCHEMA_V1; + } + + static Map supportedSchemaVersions() { + return schemaVersions; + } + + static Map supportedAvroSchemaVersions() { + return avroSchemaVersions; + } + + ByteBuffer encryptionKey() { + return encryptionKey; + } + + ByteBuffer aadPrefix() { + return aadPrefix; + } + + static KeyMetadata parse(ByteBuffer buffer) { + try { + return KEY_METADATA_DECODER.decode(buffer); + } catch (IOException e) { + throw new UncheckedIOException("Failed to parse envelope encryption metadata", e); + } + } + + @Override + public ByteBuffer buffer() { + try { + return KEY_METADATA_ENCODER.encode(this); + } catch (IOException e) { + throw new UncheckedIOException("Failed to serialize envelope key metadata", e); + } + } + + @Override + public EncryptionKeyMetadata copy() { + KeyMetadata metadata = new KeyMetadata(encryptionKey(), aadPrefix()); + return metadata; + } + + @Override + public void put(int i, Object v) { + switch (i) { + case 0: + this.encryptionKey = (ByteBuffer) v; + return; + case 1: + this.aadPrefix = (ByteBuffer) v; + return; + default: + // ignore the object, it must be from a newer version of the format + } + } + + @Override + public Object get(int i) { + switch (i) { + case 0: + return encryptionKey; + case 1: + return aadPrefix; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } + + @Override + public org.apache.avro.Schema getSchema() { + return avroSchema; + } +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java new file mode 100644 index 000000000000..674685c30164 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.message.MessageDecoder; +import org.apache.iceberg.avro.GenericAvroReader; +import org.apache.iceberg.data.avro.RawDecoder; +import org.apache.iceberg.relocated.com.google.common.collect.MapMaker; + +class KeyMetadataDecoder extends MessageDecoder.BaseDecoder { + private final org.apache.iceberg.Schema readSchema; + private final Map> decoders = new MapMaker().makeMap(); + + /** + * Creates a new decoder that constructs key metadata instances described by schema version. + * + *

The {@code readSchemaVersion} is as used the version of the expected (read) schema. Datum + * instances created by this class will are described by the expected schema. + */ + KeyMetadataDecoder(byte readSchemaVersion) { + this.readSchema = KeyMetadata.supportedSchemaVersions().get(readSchemaVersion); + } + + @Override + public KeyMetadata decode(InputStream stream, KeyMetadata reuse) { + byte writeSchemaVersion; + + try { + writeSchemaVersion = (byte) stream.read(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to read the version byte", e); + } + + if (writeSchemaVersion < 0) { + throw new RuntimeException("Version byte - end of stream reached"); + } + + Schema writeSchema = KeyMetadata.supportedAvroSchemaVersions().get(writeSchemaVersion); + + if (writeSchema == null) { + throw new UnsupportedOperationException( + "Cannot resolve schema for version: " + writeSchemaVersion); + } + + RawDecoder decoder = decoders.get(writeSchemaVersion); + + if (decoder == null) { + decoder = new RawDecoder<>(readSchema, GenericAvroReader::create, writeSchema); + + decoders.put(writeSchemaVersion, decoder); + } + + return decoder.decode(stream, reuse); + } +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java new file mode 100644 index 000000000000..faab6a47c814 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.message.MessageEncoder; +import org.apache.iceberg.avro.GenericAvroWriter; + +class KeyMetadataEncoder implements MessageEncoder { + private static final ThreadLocal TEMP = + ThreadLocal.withInitial(BufferOutputStream::new); + private static final ThreadLocal ENCODER = new ThreadLocal<>(); + + private final byte schemaVersion; + private final boolean copyOutputBytes; + private final DatumWriter writer; + + /** + * Creates a new {@link MessageEncoder} that will deconstruct {@link KeyMetadata} instances + * described by the schema version. + * + *

Buffers returned by {@code encode} are copied and will not be modified by future calls to + * {@code encode}. + */ + KeyMetadataEncoder(byte schemaVersion) { + this(schemaVersion, true); + } + + /** + * Creates a new {@link MessageEncoder} that will deconstruct {@link KeyMetadata} instances + * described by the schema version. + * + *

If {@code shouldCopy} is true, then buffers returned by {@code encode} are copied and will + * not be modified by future calls to {@code encode}. + * + *

If {@code shouldCopy} is false, then buffers returned by {@code encode} wrap a thread-local + * buffer that can be reused by future calls to {@code encode}, but may not be. Callers should + * only set {@code shouldCopy} to false if the buffer will be copied before the current thread's + * next call to {@code encode}. + */ + KeyMetadataEncoder(byte schemaVersion, boolean shouldCopy) { + Schema writeSchema = KeyMetadata.supportedAvroSchemaVersions().get(schemaVersion); + + if (writeSchema == null) { + throw new UnsupportedOperationException( + "Cannot resolve schema for version: " + schemaVersion); + } + + this.writer = GenericAvroWriter.create(writeSchema); + this.schemaVersion = schemaVersion; + this.copyOutputBytes = shouldCopy; + } + + @Override + public ByteBuffer encode(KeyMetadata datum) throws IOException { + BufferOutputStream temp = TEMP.get(); + temp.reset(); + temp.write(schemaVersion); + encode(datum, temp); + + if (copyOutputBytes) { + return temp.toBufferWithCopy(); + } else { + return temp.toBufferWithoutCopy(); + } + } + + @Override + public void encode(KeyMetadata datum, OutputStream stream) throws IOException { + BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(stream, ENCODER.get()); + ENCODER.set(encoder); + writer.write(datum, encoder); + encoder.flush(); + } + + private static class BufferOutputStream extends ByteArrayOutputStream { + BufferOutputStream() {} + + ByteBuffer toBufferWithoutCopy() { + return ByteBuffer.wrap(buf, 0, count); + } + + ByteBuffer toBufferWithCopy() { + return ByteBuffer.wrap(toByteArray()); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/encryption/TestKeyMetadataParser.java b/core/src/test/java/org/apache/iceberg/encryption/TestKeyMetadataParser.java new file mode 100644 index 000000000000..19836254d294 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/encryption/TestKeyMetadataParser.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import org.apache.iceberg.AssertHelpers; +import org.junit.Assert; +import org.junit.Test; + +public class TestKeyMetadataParser { + + @Test + public void testParser() { + ByteBuffer encryptionKey = ByteBuffer.wrap("0123456789012345".getBytes(StandardCharsets.UTF_8)); + ByteBuffer aadPrefix = ByteBuffer.wrap("1234567890123456".getBytes(StandardCharsets.UTF_8)); + KeyMetadata metadata = new KeyMetadata(encryptionKey, aadPrefix); + ByteBuffer serialized = metadata.buffer(); + + KeyMetadata parsedMetadata = KeyMetadata.parse(serialized); + Assert.assertEquals(parsedMetadata.encryptionKey(), encryptionKey); + Assert.assertEquals(parsedMetadata.aadPrefix(), aadPrefix); + } + + @Test + public void testUnsupportedVersion() { + ByteBuffer badBuffer = ByteBuffer.wrap(new byte[] {0x02}); + AssertHelpers.assertThrows( + "Should throw when attempting to parse a buffer with wrong key metadata version", + UnsupportedOperationException.class, + "Cannot resolve schema for version", + () -> KeyMetadata.parse(badBuffer)); + } +} diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestSingleMessageEncoding.java b/data/src/test/java/org/apache/iceberg/data/avro/TestSingleMessageEncoding.java index ce85e8a90519..1b8da1eafc67 100644 --- a/data/src/test/java/org/apache/iceberg/data/avro/TestSingleMessageEncoding.java +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestSingleMessageEncoding.java @@ -208,7 +208,7 @@ public void testByteBufferMissingPayload() throws Exception { Assertions.assertThatThrownBy(() -> decoder.decode(buffer)) .isInstanceOf(AvroRuntimeException.class) - .hasMessage("Decoding datum failed"); + .hasMessageContaining("Decoding datum failed"); } @Test