Skip to content

Commit

Permalink
Use VariableWidthBlock in SliceDictionaryStreamReader
Browse files Browse the repository at this point in the history
SliceArrayBlock has Slice instance size for each element in the array.
This can cause huge overhead (56 byte per slice) if the base of a slice
is relatively small in size. Replace SliceArrayBlock with
VariableWidthBlock.
  • Loading branch information
highker committed Nov 9, 2017
1 parent 1959d76 commit 72fa55d
Showing 1 changed file with 81 additions and 40 deletions.
Expand Up @@ -24,7 +24,7 @@
import com.facebook.presto.orc.stream.RowGroupDictionaryLengthInputStream;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.DictionaryBlock;
import com.facebook.presto.spi.block.SliceArrayBlock;
import com.facebook.presto.spi.block.VariableWidthBlock;
import com.facebook.presto.spi.type.Type;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
Expand All @@ -43,17 +43,20 @@
import static com.facebook.presto.orc.metadata.Stream.StreamKind.PRESENT;
import static com.facebook.presto.orc.metadata.Stream.StreamKind.ROW_GROUP_DICTIONARY;
import static com.facebook.presto.orc.metadata.Stream.StreamKind.ROW_GROUP_DICTIONARY_LENGTH;
import static com.facebook.presto.orc.reader.SliceStreamReader.computeTruncatedLength;
import static com.facebook.presto.orc.stream.MissingInputStreamSource.missingStreamSource;
import static com.facebook.presto.spi.type.Chars.isCharType;
import static com.facebook.presto.spi.type.Chars.truncateToLengthAndTrimSpaces;
import static com.facebook.presto.spi.type.Varchars.isVarcharType;
import static com.facebook.presto.spi.type.Varchars.truncateToLength;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Verify.verify;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

public class SliceDictionaryStreamReader
implements StreamReader
{
private static final byte[] EMPTY_DICTIONARY_DATA = new byte[0];
// add one extra entry for null after strip/rowGroup dictionary
private static final int[] EMPTY_DICTIONARY_OFFSETS = new int[2];

private final StreamDescriptor streamDescriptor;

private int readOffset;
Expand All @@ -70,9 +73,14 @@ public class SliceDictionaryStreamReader
private boolean stripeDictionaryOpen;
private int stripeDictionarySize;
@Nonnull
private Slice[] stripeDictionary = new Slice[1];
private int[] stripeDictionaryLength = new int[0];
@Nonnull
private byte[] stripeDictionaryData = EMPTY_DICTIONARY_DATA;
@Nonnull
private int[] stripeDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;

private SliceArrayBlock dictionaryBlock = new SliceArrayBlock(stripeDictionary.length, stripeDictionary, true);
private VariableWidthBlock dictionaryBlock = new VariableWidthBlock(1, Slices.wrappedBuffer(EMPTY_DICTIONARY_DATA), EMPTY_DICTIONARY_OFFSETS, new boolean[]{true});
private byte[] currentDictionaryData = EMPTY_DICTIONARY_DATA;

@Nonnull
private InputStreamSource<LongInputStream> stripeDictionaryLengthStreamSource = missingStreamSource(LongInputStream.class);
Expand All @@ -85,8 +93,6 @@ public class SliceDictionaryStreamReader

@Nonnull
private InputStreamSource<ByteArrayInputStream> rowGroupDictionaryDataStreamSource = missingStreamSource(ByteArrayInputStream.class);
@Nonnull
private Slice[] rowGroupDictionary = new Slice[0];

@Nonnull
private InputStreamSource<RowGroupDictionaryLengthInputStream> rowGroupDictionaryLengthStreamSource = missingStreamSource(RowGroupDictionaryLengthInputStream.class);
Expand Down Expand Up @@ -191,12 +197,16 @@ else if (inDictionary[i]) {
return block;
}

private void setDictionaryBlockData(Slice[] dictionary)
private void setDictionaryBlockData(byte[] dictionaryData, int[] dictionaryOffsets, int positionCount)
{
verify(positionCount > 0);
// only update the block if the array changed to prevent creation of new Block objects, since
// the engine currently uses identity equality to test if dictionaries are the same
if (dictionaryBlock.getValues() != dictionary) {
dictionaryBlock = new SliceArrayBlock(dictionary.length, dictionary, true);
if (currentDictionaryData != dictionaryData) {
boolean[] isNullVector = new boolean[positionCount];
isNullVector[isNullVector.length - 1] = true;
dictionaryBlock = new VariableWidthBlock(positionCount, Slices.wrappedBuffer(dictionaryData), dictionaryOffsets, isNullVector);
currentDictionaryData = dictionaryData;
}
}

Expand All @@ -205,22 +215,36 @@ private void openRowGroup(Type type)
{
// read the dictionary
if (!stripeDictionaryOpen) {
// We must always create a new dictionary array because the previous dictionary may still be referenced
// add one extra entry for null
stripeDictionary = new Slice[stripeDictionarySize + 1];
if (stripeDictionarySize > 0) {
int[] dictionaryLength = new int[stripeDictionarySize];
// resize the dictionary lengths array if necessary
if (stripeDictionaryLength.length < stripeDictionarySize) {
stripeDictionaryLength = new int[stripeDictionarySize];
}

// read the lengths
LongInputStream lengthStream = stripeDictionaryLengthStreamSource.openStream();
if (lengthStream == null) {
throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Dictionary is not empty but dictionary length stream is not present");
}
lengthStream.nextIntVector(stripeDictionarySize, dictionaryLength);
lengthStream.nextIntVector(stripeDictionarySize, stripeDictionaryLength);

long length = 0;
for (int i = 0; i < stripeDictionarySize; i++) {
length += stripeDictionaryLength[i];
}

// we must always create a new dictionary array because the previous dictionary may still be referenced
stripeDictionaryData = new byte[toIntExact(length)];
// add one extra entry for null
stripeDictionaryOffsetVector = new int[stripeDictionarySize + 2];

// read dictionary values
ByteArrayInputStream dictionaryDataStream = stripeDictionaryDataStreamSource.openStream();
readDictionary(dictionaryDataStream, stripeDictionarySize, dictionaryLength, 0, stripeDictionary, type);
readDictionary(dictionaryDataStream, stripeDictionarySize, stripeDictionaryLength, 0, stripeDictionaryData, stripeDictionaryOffsetVector, type);
}
else {
stripeDictionaryData = EMPTY_DICTIONARY_DATA;
stripeDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
}
}
stripeDictionaryOpen = true;
Expand All @@ -230,26 +254,31 @@ private void openRowGroup(Type type)
if (dictionaryLengthStream != null) {
int rowGroupDictionarySize = dictionaryLengthStream.getEntryCount();

// We must always create a new dictionary array because the previous dictionary may still be referenced
// The first elements of the dictionary are from the stripe dictionary, then the row group dictionary elements, and then a null
rowGroupDictionary = Arrays.copyOf(stripeDictionary, stripeDictionarySize + rowGroupDictionarySize + 1);
setDictionaryBlockData(rowGroupDictionary);

// resize the dictionary lengths array if necessary
if (rowGroupDictionaryLength.length < rowGroupDictionarySize) {
rowGroupDictionaryLength = new int[rowGroupDictionarySize];
}

// read the lengths
dictionaryLengthStream.nextIntVector(rowGroupDictionarySize, rowGroupDictionaryLength);
long length = 0;
for (int i = 0; i < rowGroupDictionarySize; i++) {
length += rowGroupDictionaryLength[i];
}

// We must always create a new dictionary array because the previous dictionary may still be referenced
// The first elements of the dictionary are from the stripe dictionary, then the row group dictionary elements, and then a null
byte[] rowGroupDictionaryData = Arrays.copyOf(stripeDictionaryData, stripeDictionaryOffsetVector[stripeDictionarySize] + toIntExact(length));
int[] rowGroupDictionaryOffsetVector = Arrays.copyOf(stripeDictionaryOffsetVector, stripeDictionarySize + rowGroupDictionarySize + 2);

// read dictionary values
ByteArrayInputStream dictionaryDataStream = rowGroupDictionaryDataStreamSource.openStream();
readDictionary(dictionaryDataStream, rowGroupDictionarySize, rowGroupDictionaryLength, stripeDictionarySize, rowGroupDictionary, type);
readDictionary(dictionaryDataStream, rowGroupDictionarySize, rowGroupDictionaryLength, stripeDictionarySize, rowGroupDictionaryData, rowGroupDictionaryOffsetVector, type);
setDictionaryBlockData(rowGroupDictionaryData, rowGroupDictionaryOffsetVector, stripeDictionarySize + rowGroupDictionarySize + 1);
}
else {
// there is no row group dictionary so use the stripe dictionary
setDictionaryBlockData(stripeDictionary);
setDictionaryBlockData(stripeDictionaryData, stripeDictionaryOffsetVector, stripeDictionarySize + 1);
}

presentStream = presentStreamSource.openStream();
Expand All @@ -262,28 +291,40 @@ private void openRowGroup(Type type)
private static void readDictionary(
@Nullable ByteArrayInputStream dictionaryDataStream,
int dictionarySize,
int[] dictionaryLength,
int dictionaryOutputOffset,
Slice[] dictionary,
int[] dictionaryLengthVector,
int offsetVectorOffset,
byte[] data,
int[] offsetVector,
Type type)
throws IOException
{
// build dictionary slices
Slice slice = Slices.wrappedBuffer(data);

// initialize the offset if necessary;
// otherwise, use the previous offset
if (offsetVectorOffset == 0) {
offsetVector[0] = 0;
}

// truncate string and update offsets
for (int i = 0; i < dictionarySize; i++) {
int length = dictionaryLength[i];
if (length == 0) {
dictionary[dictionaryOutputOffset + i] = Slices.EMPTY_SLICE;
int offsetIndex = offsetVectorOffset + i;
int offset = offsetVector[offsetIndex];
int length = dictionaryLengthVector[i];

int truncatedLength;
if (length > 0) {
// read data without truncation
dictionaryDataStream.next(data, offset, offset + length);

// adjust offsets with truncated length
truncatedLength = computeTruncatedLength(slice, offset, length, type);
verify(truncatedLength >= 0);
}
else {
Slice value = Slices.wrappedBuffer(dictionaryDataStream.next(length));
if (isVarcharType(type)) {
value = truncateToLength(value, type);
}
if (isCharType(type)) {
value = truncateToLengthAndTrimSpaces(value, type);
}
dictionary[dictionaryOutputOffset + i] = value;
truncatedLength = 0;
}
offsetVector[offsetIndex + 1] = offsetVector[offsetIndex] + truncatedLength;
}
}

Expand Down

0 comments on commit 72fa55d

Please sign in to comment.