Commit 3a7db82

Fixed apache#12 - Support selected vector with ORC reader on the row and batch reader

pavibhai committed Mar 23, 2023
1 parent 82528f2

Showing 5 changed files with 389 additions and 28 deletions.
6 changes: 4 additions & 2 deletions orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -171,8 +171,10 @@ public T next() {
nextRow = 0;
this.reader.setBatchContext(nextBatch.second());
}

- return this.reader.read(current, nextRow++);
+ // If the selected vector is in use, the row id must be resolved through it
+ int rowId = current.isSelectedInUse() ? current.selected[nextRow] : nextRow;
+ nextRow++;
+ return this.reader.read(current, rowId);
}

@Override
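For context on the change above: when a row-level filter is active, ORC marks the surviving rows in the batch's selected vector instead of compacting the batch, and batch.size counts only the selected rows. A minimal consumer sketch of that contract (process is a hypothetical stand-in for per-row handling; VectorizedRowBatch is the relocated org.apache.orc.storage type used throughout this diff):

// Iterate only the rows that survived the reader-side filter.
for (int i = 0; i < batch.size; i++) {
  // Logical position i maps to physical row selected[i] under selection;
  // otherwise the mapping is the identity, exactly as in next() above.
  int rowId = batch.selectedInUse ? batch.selected[i] : i;
  process(batch, rowId); // hypothetical per-row consumer
}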
156 changes: 156 additions & 0 deletions orc/src/test/java/org/apache/iceberg/orc/TestOrcDataReader.java
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.orc;

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.orc.OrcConf;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestOrcDataReader {
@ClassRule public static TemporaryFolder temp = new TemporaryFolder();

private static final Schema SCHEMA =
new Schema(
Types.NestedField.required(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "binary", Types.BinaryType.get()));
private static DataFile dataFile;
private static OutputFile outputFile;

@BeforeClass
public static void createDataFile() throws IOException {
GenericRecord bufferRecord = GenericRecord.create(SCHEMA);

ImmutableList.Builder<Record> builder = ImmutableList.builder();
builder.add(bufferRecord.copy(ImmutableMap.of("id", 1L, "data", "a")));
builder.add(bufferRecord.copy(ImmutableMap.of("id", 2L, "data", "b")));
builder.add(bufferRecord.copy(ImmutableMap.of("id", 3L, "data", "c")));
builder.add(bufferRecord.copy(ImmutableMap.of("id", 4L, "data", "d")));
builder.add(bufferRecord.copy(ImmutableMap.of("id", 5L, "data", "e")));

outputFile = Files.localOutput(File.createTempFile("test", ".orc", temp.getRoot()));

DataWriter<Record> dataWriter =
ORC.writeData(outputFile)
.schema(SCHEMA)
.createWriterFunc(GenericOrcWriter::buildWriter)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.build();

try {
for (Record record : builder.build()) {
dataWriter.write(record);
}
} finally {
dataWriter.close();
}

dataFile = dataWriter.toDataFile();
}

@Test
public void testWrite() {
Assert.assertEquals("Format should be ORC", FileFormat.ORC, dataFile.format());
Assert.assertEquals("Should be data file", FileContent.DATA, dataFile.content());
Assert.assertEquals("Record count should match", 5, dataFile.recordCount());
Assert.assertEquals("Partition should be empty", 0, dataFile.partition().size());
Assert.assertNull("Key metadata should be null", dataFile.keyMetadata());
}

private void validateAllRecords(List<Record> records) {
Assert.assertEquals(5, records.size());
long id = 1;
char data = 'a';
for (Record record : records) {
Assert.assertEquals(id, record.getField("id"));
id++;
Assert.assertEquals(data, ((String) record.getField("data")).charAt(0));
data++;
}
}

@Test
public void testRowReader() throws IOException {
try (CloseableIterable<Record> reader =
ORC.read(outputFile.toInputFile())
.project(SCHEMA)
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(SCHEMA, fileSchema))
.filter(Expressions.and(Expressions.notNull("data"), Expressions.equal("id", 3L)))
.build()) {
// The filter only prunes row groups here (SARG-to-filter is off by default),
// so all five rows are returned
validateAllRecords(Lists.newArrayList(reader));
}
}

@Test
public void testRowReaderWithFilter() throws IOException {
try (CloseableIterable<Record> reader =
ORC.read(outputFile.toInputFile())
.project(SCHEMA)
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(SCHEMA, fileSchema))
.filter(Expressions.and(Expressions.notNull("data"), Expressions.equal("id", 3L)))
.config(OrcConf.ALLOW_SARG_TO_FILTER.getAttribute(), String.valueOf(true))
.build()) {
// SARG-to-filter is on, but without selected-vector support the batch still
// exposes every row, so all five records are returned
validateAllRecords(Lists.newArrayList(reader));
}
}

@Test
public void testRowReaderWithFilterWithSelected() throws IOException {
List<Record> writtenRecords;
try (CloseableIterable<Record> reader =
ORC.read(outputFile.toInputFile())
.project(SCHEMA)
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(SCHEMA, fileSchema))
.filter(Expressions.and(Expressions.notNull("data"), Expressions.equal("id", 3L)))
.config(OrcConf.ALLOW_SARG_TO_FILTER.getAttribute(), String.valueOf(true))
.config(OrcConf.READER_USE_SELECTED.getAttribute(), String.valueOf(true))
.build()) {
writtenRecords = Lists.newArrayList(reader);
}

// Only the row with id = 3 passes the filter when the selected vector is honored
Assert.assertEquals(1, writtenRecords.size());
Assert.assertEquals(3L, writtenRecords.get(0).get(0));
Assert.assertEquals("c", writtenRecords.get(0).get(1));
}
}
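Taken together, the three read tests demonstrate the configuration matrix: a filter alone only prunes row groups, so all five rows come back; enabling orc.sarg.to.filter lets ORC evaluate the filter row by row, but without selected-vector support the batch still exposes every row; enabling orc.filter.use.selected as well surfaces only the matching rows. A minimal sketch of the fully filtered reader (mirroring the last test; for this file it yields the single row with id = 3):

CloseableIterable<Record> filtered =
    ORC.read(outputFile.toInputFile())
        .project(SCHEMA)
        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(SCHEMA, fileSchema))
        .filter(Expressions.equal("id", 3L))
        // Both flags are needed: the first allows ORC to turn the search argument
        // into a row-level filter, the second declares that this reader honors
        // the selected vector on returned batches.
        .config(OrcConf.ALLOW_SARG_TO_FILTER.getAttribute(), "true")
        .config(OrcConf.READER_USE_SELECTED.getAttribute(), "true")
        .build();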
2 changes: 2 additions & 0 deletions spark/v3.3/build.gradle
@@ -71,6 +71,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'io.netty', module: 'netty-common'
exclude group: 'org.roaringbitmap'
+ exclude group: 'org.apache.orc'
}

implementation("org.apache.parquet:parquet-column")
@@ -146,6 +147,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'io.netty', module: 'netty-common'
exclude group: 'org.roaringbitmap'
+ exclude group: 'org.apache.orc'
}

testImplementation project(path: ':iceberg-data')
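(Presumably the new exclusion keeps Spark's transitive org.apache.orc artifacts off the classpath so that the ORC version Iceberg builds against, one recent enough for SARG-to-filter and the selected vector, is the one actually loaded; the commit does not state the motivation.)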
@@ -61,7 +61,11 @@ public ColumnarBatch read(VectorizedRowBatch batch) {
BaseOrcColumnVector cv =
(BaseOrcColumnVector)
converter.convert(
- new StructColumnVector(batch.size, batch.cols), batch.size, batchOffsetInFile);
+ new StructColumnVector(batch.size, batch.cols),
+ batch.size,
+ batchOffsetInFile,
+ batch.selectedInUse,
+ batch.selected);
ColumnarBatch columnarBatch =
new ColumnarBatch(
IntStream.range(0, expectedSchema.columns().size())
@@ -82,7 +86,9 @@ private interface Converter {
ColumnVector convert(
org.apache.orc.storage.ql.exec.vector.ColumnVector columnVector,
int batchSize,
- long batchOffsetInFile);
+ long batchOffsetInFile,
+ boolean isSelectedInUse,
+ int[] selected);
}

private static class ReadBuilder extends OrcSchemaWithTypeVisitor<Converter> {
@@ -154,22 +160,36 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive
default:
throw new IllegalArgumentException("Unhandled type " + primitive);
}
- return (columnVector, batchSize, batchOffsetInFile) ->
+ return (columnVector, batchSize, batchOffsetInFile, isSelectedInUse, selected) ->
new PrimitiveOrcColumnVector(
- iPrimitive, batchSize, columnVector, primitiveValueReader, batchOffsetInFile);
+ iPrimitive,
+ batchSize,
+ columnVector,
+ primitiveValueReader,
+ batchOffsetInFile,
+ isSelectedInUse,
+ selected);
}
}

private abstract static class BaseOrcColumnVector extends ColumnVector {
private final org.apache.orc.storage.ql.exec.vector.ColumnVector vector;
private final int batchSize;
+ private final boolean isSelectedInUse;
+ private final int[] selected;
private Integer numNulls;

BaseOrcColumnVector(
- Type type, int batchSize, org.apache.orc.storage.ql.exec.vector.ColumnVector vector) {
+ Type type,
+ int batchSize,
+ org.apache.orc.storage.ql.exec.vector.ColumnVector vector,
+ boolean isSelectedInUse,
+ int[] selected) {
super(SparkSchemaUtil.convert(type));
this.vector = vector;
this.batchSize = batchSize;
+ this.isSelectedInUse = isSelectedInUse;
+ this.selected = selected;
}

@Override
@@ -209,7 +229,8 @@ private int numNullsHelper() {
}

protected int getRowIndex(int rowId) {
- return vector.isRepeating ? 0 : rowId;
+ int row = isSelectedInUse ? selected[rowId] : rowId;
+ return vector.isRepeating ? 0 : row;
}

@Override
@@ -293,54 +314,56 @@ private static class PrimitiveOrcColumnVector extends BaseOrcColumnVector {
int batchSize,
org.apache.orc.storage.ql.exec.vector.ColumnVector vector,
OrcValueReader<?> primitiveValueReader,
- long batchOffsetInFile) {
- super(type, batchSize, vector);
+ long batchOffsetInFile,
+ boolean isSelectedInUse,
+ int[] selected) {
+ super(type, batchSize, vector, isSelectedInUse, selected);
this.vector = vector;
this.primitiveValueReader = primitiveValueReader;
this.batchOffsetInFile = batchOffsetInFile;
}

@Override
public boolean getBoolean(int rowId) {
- return (Boolean) primitiveValueReader.read(vector, rowId);
+ return (Boolean) primitiveValueReader.read(vector, getRowIndex(rowId));
}

@Override
public int getInt(int rowId) {
- return (Integer) primitiveValueReader.read(vector, rowId);
+ return (Integer) primitiveValueReader.read(vector, getRowIndex(rowId));
}

@Override
public long getLong(int rowId) {
- return (Long) primitiveValueReader.read(vector, rowId);
+ return (Long) primitiveValueReader.read(vector, getRowIndex(rowId));
}

@Override
public float getFloat(int rowId) {
- return (Float) primitiveValueReader.read(vector, rowId);
+ return (Float) primitiveValueReader.read(vector, getRowIndex(rowId));
}

@Override
public double getDouble(int rowId) {
- return (Double) primitiveValueReader.read(vector, rowId);
+ return (Double) primitiveValueReader.read(vector, getRowIndex(rowId));
}

@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
// TODO: Is it okay to assume that (precision,scale) parameters == (precision,scale) of the
// decimal type
// and return a Decimal with (precision,scale) of the decimal type?
- return (Decimal) primitiveValueReader.read(vector, rowId);
+ return (Decimal) primitiveValueReader.read(vector, getRowIndex(rowId));
}

@Override
public UTF8String getUTF8String(int rowId) {
- return (UTF8String) primitiveValueReader.read(vector, rowId);
+ return (UTF8String) primitiveValueReader.read(vector, getRowIndex(rowId));
}

@Override
public byte[] getBinary(int rowId) {
- return (byte[]) primitiveValueReader.read(vector, rowId);
+ return (byte[]) primitiveValueReader.read(vector, getRowIndex(rowId));
}
}

@@ -357,12 +380,14 @@ private ArrayConverter(Types.ListType listType, Converter elementConverter) {
public ColumnVector convert(
org.apache.orc.storage.ql.exec.vector.ColumnVector vector,
int batchSize,
- long batchOffsetInFile) {
+ long batchOffsetInFile,
+ boolean isSelectedInUse,
+ int[] selected) {
ListColumnVector listVector = (ListColumnVector) vector;
ColumnVector elementVector =
- elementConverter.convert(listVector.child, batchSize, batchOffsetInFile);
+ elementConverter.convert(listVector.child, batchSize, batchOffsetInFile, false, null);

- return new BaseOrcColumnVector(listType, batchSize, vector) {
+ return new BaseOrcColumnVector(listType, batchSize, vector, isSelectedInUse, selected) {
@Override
public ColumnarArray getArray(int rowId) {
int index = getRowIndex(rowId);
@@ -388,13 +413,16 @@ private MapConverter(Types.MapType mapType, Converter keyConverter, Converter valueConverter
public ColumnVector convert(
org.apache.orc.storage.ql.exec.vector.ColumnVector vector,
int batchSize,
- long batchOffsetInFile) {
+ long batchOffsetInFile,
+ boolean isSelectedInUse,
+ int[] selected) {
MapColumnVector mapVector = (MapColumnVector) vector;
- ColumnVector keyVector = keyConverter.convert(mapVector.keys, batchSize, batchOffsetInFile);
+ ColumnVector keyVector =
+ keyConverter.convert(mapVector.keys, batchSize, batchOffsetInFile, false, null);
ColumnVector valueVector =
- valueConverter.convert(mapVector.values, batchSize, batchOffsetInFile);
+ valueConverter.convert(mapVector.values, batchSize, batchOffsetInFile, false, null);

- return new BaseOrcColumnVector(mapType, batchSize, vector) {
+ return new BaseOrcColumnVector(mapType, batchSize, vector, isSelectedInUse, selected) {
@Override
public ColumnarMap getMap(int rowId) {
int index = getRowIndex(rowId);
@@ -426,7 +454,9 @@ private StructConverter(
public ColumnVector convert(
org.apache.orc.storage.ql.exec.vector.ColumnVector vector,
int batchSize,
- long batchOffsetInFile) {
+ long batchOffsetInFile,
+ boolean isSelectedInUse,
+ int[] selected) {
StructColumnVector structVector = (StructColumnVector) vector;
List<Types.NestedField> fields = structType.fields();
List<ColumnVector> fieldVectors = Lists.newArrayListWithExpectedSize(fields.size());
@@ -443,12 +473,17 @@ public ColumnVector convert(
fieldVectors.add(
fieldConverters
.get(vectorIndex)
- .convert(structVector.fields[vectorIndex], batchSize, batchOffsetInFile));
+ .convert(
+ structVector.fields[vectorIndex],
+ batchSize,
+ batchOffsetInFile,
+ isSelectedInUse,
+ selected));
vectorIndex++;
}
}

- return new BaseOrcColumnVector(structType, batchSize, vector) {
+ return new BaseOrcColumnVector(structType, batchSize, vector, isSelectedInUse, selected) {
@Override
public ColumnVector getChild(int ordinal) {
return fieldVectors.get(ordinal);
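To make the new index arithmetic concrete, a small worked sketch of the two-level mapping that getRowIndex now performs (standalone method with illustrative values, not part of the change):

// Resolving a Spark-facing rowId to a physical ORC vector index.
// Example: 1024 physical rows of which 3 passed the filter, so
// selectedInUse = true, selected = {7, 130, 1001}, batch.size = 3.
static int resolve(int rowId, boolean isSelectedInUse, int[] selected, boolean isRepeating) {
  // First hop: logical position to physical row via the selected vector.
  int row = isSelectedInUse ? selected[rowId] : rowId;
  // Second hop: a repeating vector stores its single value at index 0.
  return isRepeating ? 0 : row;
}
// resolve(1, true, new int[] {7, 130, 1001}, false) -> 130
// resolve(1, true, new int[] {7, 130, 1001}, true)  -> 0

Note the asymmetry in the converters above: struct children receive the parent's (isSelectedInUse, selected) because they share the parent's row positions, while list and map children are converted with (false, null) since they are addressed through the parent's offset and length arrays, where selection does not apply.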
