Skip to content

Commit

Permalink
Improve ORC byte reader
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Apr 11, 2019
1 parent 879fadf commit 62079a8
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.prestosql.orc.stream.InputStreamSource;
import io.prestosql.orc.stream.InputStreamSources;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.block.ByteArrayBlock;
import io.prestosql.spi.block.RunLengthEncodedBlock;
import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
Expand All @@ -33,11 +33,14 @@
import java.io.IOException;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.prestosql.orc.metadata.Stream.StreamKind.DATA;
import static io.prestosql.orc.metadata.Stream.StreamKind.PRESENT;
import static io.prestosql.orc.reader.ReaderUtils.minNonNullValueSize;
import static io.prestosql.orc.reader.ReaderUtils.verifyStreamType;
import static io.prestosql.orc.stream.MissingInputStreamSource.missingStreamSource;
import static io.prestosql.spi.type.TinyintType.TINYINT;
Expand All @@ -64,6 +67,8 @@ public class ByteStreamReader

private boolean rowGroupOpen;

private byte[] nonNullValueTemp = new byte[0];

private final LocalMemoryContext systemMemoryContext;

public ByteStreamReader(Type type, StreamDescriptor streamDescriptor, LocalMemoryContext systemMemoryContext)
Expand Down Expand Up @@ -99,42 +104,66 @@ public Block readBlock()
}
if (readOffset > 0) {
if (dataStream == null) {
throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Value is not null but data stream is not present");
throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Value is not null but data stream is missing");
}
dataStream.skip(readOffset);
}
}

if (dataStream == null && presentStream != null) {
Block block;
if (dataStream == null) {
if (presentStream == null) {
throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Value is null but present stream is missing");
}
presentStream.skip(nextBatchSize);
Block nullValueBlock = RunLengthEncodedBlock.create(TINYINT, null, nextBatchSize);
readOffset = 0;
nextBatchSize = 0;
return nullValueBlock;
block = RunLengthEncodedBlock.create(TINYINT, null, nextBatchSize);
}

BlockBuilder builder = TINYINT.createBlockBuilder(null, nextBatchSize);
if (presentStream == null) {
if (dataStream == null) {
throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Value is not null but data stream is not present");
}
dataStream.nextVector(TINYINT, nextBatchSize, builder);
else if (presentStream == null) {
block = readNonNullBlock();
}
else {
for (int i = 0; i < nextBatchSize; i++) {
if (presentStream.nextBit()) {
TINYINT.writeLong(builder, dataStream.next());
}
else {
builder.appendNull();
}
boolean[] isNull = new boolean[nextBatchSize];
int nullCount = presentStream.getUnsetBits(nextBatchSize, isNull);
if (nullCount == 0) {
block = readNonNullBlock();
}
else if (nullCount != nextBatchSize) {
block = readNullBlock(isNull, nextBatchSize - nullCount);
}
else {
block = RunLengthEncodedBlock.create(TINYINT, null, nextBatchSize);
}
}

readOffset = 0;
nextBatchSize = 0;

return builder.build();
return block;
}

private Block readNonNullBlock()
throws IOException
{
verify(dataStream != null);
byte[] values = dataStream.next(nextBatchSize);
return new ByteArrayBlock(nextBatchSize, Optional.empty(), values);
}

private Block readNullBlock(boolean[] isNull, int nonNullCount)
throws IOException
{
verify(dataStream != null);
int minNonNullValueSize = minNonNullValueSize(nonNullCount);
if (nonNullValueTemp.length < minNonNullValueSize) {
nonNullValueTemp = new byte[minNonNullValueSize];
systemMemoryContext.setBytes(sizeOf(nonNullValueTemp));
}

dataStream.next(nonNullValueTemp, nonNullCount);

byte[] result = ReaderUtils.unpackByteNulls(nonNullValueTemp, isNull);

return new ByteArrayBlock(nextBatchSize, Optional.of(isNull), result);
}

private void openRowGroup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@

import io.prestosql.orc.OrcCorruptionException;
import io.prestosql.orc.checkpoint.ByteStreamCheckpoint;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.type.Type;

import java.io.IOException;
import java.util.Arrays;

import static java.lang.Math.min;

public class ByteInputStream
implements ValueInputStream<ByteStreamCheckpoint>
{
Expand Down Expand Up @@ -104,7 +104,7 @@ public void skip(long items)
if (offset == length) {
readNextBlock();
}
long consume = Math.min(items, length - offset);
long consume = min(items, length - offset);
offset += consume;
items -= consume;
}
Expand All @@ -119,11 +119,31 @@ public byte next()
return buffer[offset++];
}

public void nextVector(Type type, long items, BlockBuilder builder)
public byte[] next(int items)
throws IOException
{
byte[] values = new byte[items];
next(values, items);
return values;
}

public void next(byte[] values, int items)
throws IOException
{
for (int i = 0; i < items; i++) {
type.writeLong(builder, next());
int outputOffset = 0;
while (outputOffset < items) {
if (offset == length) {
readNextBlock();
}
if (length == 0) {
throw new OrcCorruptionException(input.getOrcDataSourceId(), "Unexpected end of stream");
}

int chunkSize = min(items - outputOffset, length - offset);
System.arraycopy(buffer, offset, values, outputOffset, chunkSize);

outputOffset += chunkSize;
offset += chunkSize;
}
}
}

0 comments on commit 62079a8

Please sign in to comment.