From 72fa55d75ee2bea76ed203d9687618ca417c5928 Mon Sep 17 00:00:00 2001 From: James Sun Date: Sun, 5 Nov 2017 00:56:44 -0700 Subject: [PATCH] Use VariableWidthBlock in SliceDictionaryStreamReader SliceArrayBlock incurs the Slice instance size for each element in the array. This can cause huge overhead (56 bytes per slice) if the base of a slice is relatively small in size. Replace SliceArrayBlock with VariableWidthBlock. --- .../reader/SliceDictionaryStreamReader.java | 121 ++++++++++++------ 1 file changed, 81 insertions(+), 40 deletions(-) diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/reader/SliceDictionaryStreamReader.java b/presto-orc/src/main/java/com/facebook/presto/orc/reader/SliceDictionaryStreamReader.java index d93cc826ce8e..be1696b9e67f 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/reader/SliceDictionaryStreamReader.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/reader/SliceDictionaryStreamReader.java @@ -24,7 +24,7 @@ import com.facebook.presto.orc.stream.RowGroupDictionaryLengthInputStream; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.DictionaryBlock; -import com.facebook.presto.spi.block.SliceArrayBlock; +import com.facebook.presto.spi.block.VariableWidthBlock; import com.facebook.presto.spi.type.Type; import io.airlift.slice.Slice; import io.airlift.slice.Slices; @@ -43,17 +43,20 @@ import static com.facebook.presto.orc.metadata.Stream.StreamKind.PRESENT; import static com.facebook.presto.orc.metadata.Stream.StreamKind.ROW_GROUP_DICTIONARY; import static com.facebook.presto.orc.metadata.Stream.StreamKind.ROW_GROUP_DICTIONARY_LENGTH; +import static com.facebook.presto.orc.reader.SliceStreamReader.computeTruncatedLength; import static com.facebook.presto.orc.stream.MissingInputStreamSource.missingStreamSource; -import static com.facebook.presto.spi.type.Chars.isCharType; -import static com.facebook.presto.spi.type.Chars.truncateToLengthAndTrimSpaces; -import static 
com.facebook.presto.spi.type.Varchars.isVarcharType; -import static com.facebook.presto.spi.type.Varchars.truncateToLength; import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Verify.verify; +import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; public class SliceDictionaryStreamReader implements StreamReader { + private static final byte[] EMPTY_DICTIONARY_DATA = new byte[0]; + // add one extra entry for null after stripe/rowGroup dictionary + private static final int[] EMPTY_DICTIONARY_OFFSETS = new int[2]; + private final StreamDescriptor streamDescriptor; private int readOffset; @@ -70,9 +73,14 @@ public class SliceDictionaryStreamReader private boolean stripeDictionaryOpen; private int stripeDictionarySize; @Nonnull - private Slice[] stripeDictionary = new Slice[1]; + private int[] stripeDictionaryLength = new int[0]; + @Nonnull + private byte[] stripeDictionaryData = EMPTY_DICTIONARY_DATA; + @Nonnull + private int[] stripeDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS; - private SliceArrayBlock dictionaryBlock = new SliceArrayBlock(stripeDictionary.length, stripeDictionary, true); + private VariableWidthBlock dictionaryBlock = new VariableWidthBlock(1, Slices.wrappedBuffer(EMPTY_DICTIONARY_DATA), EMPTY_DICTIONARY_OFFSETS, new boolean[]{true}); + private byte[] currentDictionaryData = EMPTY_DICTIONARY_DATA; @Nonnull private InputStreamSource stripeDictionaryLengthStreamSource = missingStreamSource(LongInputStream.class); @@ -85,8 +93,6 @@ public class SliceDictionaryStreamReader @Nonnull private InputStreamSource rowGroupDictionaryDataStreamSource = missingStreamSource(ByteArrayInputStream.class); - @Nonnull - private Slice[] rowGroupDictionary = new Slice[0]; @Nonnull private InputStreamSource rowGroupDictionaryLengthStreamSource = missingStreamSource(RowGroupDictionaryLengthInputStream.class); @@ -191,12 +197,16 @@ else if (inDictionary[i]) { return block; } - private 
void setDictionaryBlockData(Slice[] dictionary) + private void setDictionaryBlockData(byte[] dictionaryData, int[] dictionaryOffsets, int positionCount) { + verify(positionCount > 0); // only update the block if the array changed to prevent creation of new Block objects, since // the engine currently uses identity equality to test if dictionaries are the same - if (dictionaryBlock.getValues() != dictionary) { - dictionaryBlock = new SliceArrayBlock(dictionary.length, dictionary, true); + if (currentDictionaryData != dictionaryData) { + boolean[] isNullVector = new boolean[positionCount]; + isNullVector[isNullVector.length - 1] = true; + dictionaryBlock = new VariableWidthBlock(positionCount, Slices.wrappedBuffer(dictionaryData), dictionaryOffsets, isNullVector); + currentDictionaryData = dictionaryData; } } @@ -205,22 +215,36 @@ private void openRowGroup(Type type) { // read the dictionary if (!stripeDictionaryOpen) { - // We must always create a new dictionary array because the previous dictionary may still be referenced - // add one extra entry for null - stripeDictionary = new Slice[stripeDictionarySize + 1]; if (stripeDictionarySize > 0) { - int[] dictionaryLength = new int[stripeDictionarySize]; + // resize the dictionary lengths array if necessary + if (stripeDictionaryLength.length < stripeDictionarySize) { + stripeDictionaryLength = new int[stripeDictionarySize]; + } // read the lengths LongInputStream lengthStream = stripeDictionaryLengthStreamSource.openStream(); if (lengthStream == null) { throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Dictionary is not empty but dictionary length stream is not present"); } - lengthStream.nextIntVector(stripeDictionarySize, dictionaryLength); + lengthStream.nextIntVector(stripeDictionarySize, stripeDictionaryLength); + + long length = 0; + for (int i = 0; i < stripeDictionarySize; i++) { + length += stripeDictionaryLength[i]; + } + + // we must always create a new dictionary array because the 
previous dictionary may still be referenced + stripeDictionaryData = new byte[toIntExact(length)]; + // add one extra entry for null + stripeDictionaryOffsetVector = new int[stripeDictionarySize + 2]; // read dictionary values ByteArrayInputStream dictionaryDataStream = stripeDictionaryDataStreamSource.openStream(); - readDictionary(dictionaryDataStream, stripeDictionarySize, dictionaryLength, 0, stripeDictionary, type); + readDictionary(dictionaryDataStream, stripeDictionarySize, stripeDictionaryLength, 0, stripeDictionaryData, stripeDictionaryOffsetVector, type); + } + else { + stripeDictionaryData = EMPTY_DICTIONARY_DATA; + stripeDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS; } } stripeDictionaryOpen = true; @@ -230,11 +254,6 @@ private void openRowGroup(Type type) if (dictionaryLengthStream != null) { int rowGroupDictionarySize = dictionaryLengthStream.getEntryCount(); - // We must always create a new dictionary array because the previous dictionary may still be referenced - // The first elements of the dictionary are from the stripe dictionary, then the row group dictionary elements, and then a null - rowGroupDictionary = Arrays.copyOf(stripeDictionary, stripeDictionarySize + rowGroupDictionarySize + 1); - setDictionaryBlockData(rowGroupDictionary); - // resize the dictionary lengths array if necessary if (rowGroupDictionaryLength.length < rowGroupDictionarySize) { rowGroupDictionaryLength = new int[rowGroupDictionarySize]; @@ -242,14 +261,24 @@ private void openRowGroup(Type type) // read the lengths dictionaryLengthStream.nextIntVector(rowGroupDictionarySize, rowGroupDictionaryLength); + long length = 0; + for (int i = 0; i < rowGroupDictionarySize; i++) { + length += rowGroupDictionaryLength[i]; + } + + // We must always create a new dictionary array because the previous dictionary may still be referenced + // The first elements of the dictionary are from the stripe dictionary, then the row group dictionary elements, and then a null + byte[] 
rowGroupDictionaryData = Arrays.copyOf(stripeDictionaryData, stripeDictionaryOffsetVector[stripeDictionarySize] + toIntExact(length)); + int[] rowGroupDictionaryOffsetVector = Arrays.copyOf(stripeDictionaryOffsetVector, stripeDictionarySize + rowGroupDictionarySize + 2); // read dictionary values ByteArrayInputStream dictionaryDataStream = rowGroupDictionaryDataStreamSource.openStream(); - readDictionary(dictionaryDataStream, rowGroupDictionarySize, rowGroupDictionaryLength, stripeDictionarySize, rowGroupDictionary, type); + readDictionary(dictionaryDataStream, rowGroupDictionarySize, rowGroupDictionaryLength, stripeDictionarySize, rowGroupDictionaryData, rowGroupDictionaryOffsetVector, type); + setDictionaryBlockData(rowGroupDictionaryData, rowGroupDictionaryOffsetVector, stripeDictionarySize + rowGroupDictionarySize + 1); } else { // there is no row group dictionary so use the stripe dictionary - setDictionaryBlockData(stripeDictionary); + setDictionaryBlockData(stripeDictionaryData, stripeDictionaryOffsetVector, stripeDictionarySize + 1); } presentStream = presentStreamSource.openStream(); @@ -262,28 +291,40 @@ private void openRowGroup(Type type) private static void readDictionary( @Nullable ByteArrayInputStream dictionaryDataStream, int dictionarySize, - int[] dictionaryLength, - int dictionaryOutputOffset, - Slice[] dictionary, + int[] dictionaryLengthVector, + int offsetVectorOffset, + byte[] data, + int[] offsetVector, Type type) throws IOException { - // build dictionary slices + Slice slice = Slices.wrappedBuffer(data); + + // initialize the offset if necessary; + // otherwise, use the previous offset + if (offsetVectorOffset == 0) { + offsetVector[0] = 0; + } + + // truncate string and update offsets for (int i = 0; i < dictionarySize; i++) { - int length = dictionaryLength[i]; - if (length == 0) { - dictionary[dictionaryOutputOffset + i] = Slices.EMPTY_SLICE; + int offsetIndex = offsetVectorOffset + i; + int offset = offsetVector[offsetIndex]; + int 
length = dictionaryLengthVector[i]; + + int truncatedLength; + if (length > 0) { + // read data without truncation + dictionaryDataStream.next(data, offset, offset + length); + + // adjust offsets with truncated length + truncatedLength = computeTruncatedLength(slice, offset, length, type); + verify(truncatedLength >= 0); } else { - Slice value = Slices.wrappedBuffer(dictionaryDataStream.next(length)); - if (isVarcharType(type)) { - value = truncateToLength(value, type); - } - if (isCharType(type)) { - value = truncateToLengthAndTrimSpaces(value, type); - } - dictionary[dictionaryOutputOffset + i] = value; + truncatedLength = 0; } + offsetVector[offsetIndex + 1] = offsetVector[offsetIndex] + truncatedLength; } }