Skip to content

Commit

Permalink
Revert "Defer the creation of dictionary in SliceDictionarySelectiveR…
Browse files Browse the repository at this point in the history
…eader"

This reverts commit 7994754.
  • Loading branch information
kewang1024 authored and caithagoras0 committed Oct 8, 2020
1 parent 460a4d0 commit 8ab725f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.common.block.DictionaryBlock;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.block.VariableWidthBlock;
import com.facebook.presto.common.type.Chars;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.OrcCorruptionException;
import com.facebook.presto.orc.OrcLocalMemoryContext;
Expand All @@ -36,15 +37,12 @@
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

import static com.facebook.presto.orc.array.Arrays.ExpansionFactor.MEDIUM;
import static com.facebook.presto.orc.array.Arrays.ExpansionOption.NONE;
import static com.facebook.presto.orc.array.Arrays.ensureCapacity;
import static com.facebook.presto.orc.metadata.OrcType.OrcTypeKind.CHAR;
import static com.facebook.presto.orc.metadata.Stream.StreamKind.DATA;
Expand Down Expand Up @@ -91,14 +89,10 @@ public class SliceDictionarySelectiveReader

private byte[] stripeDictionaryData = EMPTY_DICTIONARY_DATA;
private int[] stripeDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
private byte[] rowGroupDictionaryData = EMPTY_DICTIONARY_DATA;
private int[] rowGroupDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
private byte[] currentDictionaryData = EMPTY_DICTIONARY_DATA;
private int[] currentDictionaryOffsetVector;
private int[] stripeDictionaryLength = new int[0];
private int[] rowGroupDictionaryLength = new int[0];
private byte[] evaluationStatus;
private byte[] valueWithPadding;

private int readOffset;

Expand All @@ -112,13 +106,7 @@ public class SliceDictionarySelectiveReader
private InputStreamSource<ByteArrayInputStream> stripeDictionaryDataStreamSource = missingStreamSource(ByteArrayInputStream.class);
private InputStreamSource<LongInputStream> stripeDictionaryLengthStreamSource = missingStreamSource(LongInputStream.class);
private boolean stripeDictionaryOpen;
// The dictionaries will be wrapped in getBlock(). It's set to false when opening a new dictionary (be it stripe dictionary or rowgroup dictionary). When there is only stripe
// dictionary but no rowgroup dictionaries, we shall set it to false only when opening the stripe dictionary while not for every rowgroup. It is set to true when the dictionary
// is wrapped up in wrapDictionaryIfNecessary().
private boolean dictionaryWrapped;

private int stripeDictionarySize;
private int currentDictionarySize;

private InputStreamSource<ByteArrayInputStream> rowGroupDictionaryDataStreamSource = missingStreamSource(ByteArrayInputStream.class);
private InputStreamSource<BooleanInputStream> inDictionaryStreamSource = missingStreamSource(BooleanInputStream.class);
Expand Down Expand Up @@ -146,7 +134,6 @@ public SliceDictionarySelectiveReader(StreamDescriptor streamDescriptor, Optiona
this.outputType = requireNonNull(outputType, "outputType is null").orElse(null);
OrcType orcType = streamDescriptor.getOrcType();
this.maxCodePointCount = orcType == null ? 0 : orcType.getLength().orElse(-1);
this.valueWithPadding = maxCodePointCount < 0 ? null : new byte[maxCodePointCount];
this.isCharType = orcType.getOrcTypeKind() == CHAR;
this.outputRequired = outputType.isPresent();
checkArgument(filter.isPresent() || outputRequired, "filter must be present if outputRequired is false");
Expand Down Expand Up @@ -206,7 +193,7 @@ private int readNoFilter(int[] positions, int positionCount)
}

if (presentStream != null && !presentStream.nextBit()) {
values[i] = currentDictionarySize - 1;
values[i] = dictionary.getPositionCount() - 1;
}
else {
boolean isInRowDictionary = inDictionaryStream != null && !inDictionaryStream.nextBit();
Expand All @@ -233,7 +220,7 @@ private int readWithFilter(int[] positions, int positionCount)
if (presentStream != null && !presentStream.nextBit()) {
if ((nonDeterministicFilter && filter.testNull()) || nullsAllowed) {
if (outputRequired) {
values[outputPositionCount] = currentDictionarySize - 1;
values[outputPositionCount] = dictionary.getPositionCount() - 1;
}
outputPositions[outputPositionCount] = position;
outputPositionCount++;
Expand Down Expand Up @@ -298,28 +285,22 @@ private int readWithFilter(int[] positions, int positionCount)

private byte evaluateFilter(int position, int index, int length)
{
if (!filter.testLength(length)) {
return FILTER_FAILED;
}

int currentLength = currentDictionaryOffsetVector[index + 1] - currentDictionaryOffsetVector[index];
if (isCharType && length != currentLength) {
System.arraycopy(currentDictionaryData, currentDictionaryOffsetVector[index], valueWithPadding, 0, currentLength);
Arrays.fill(valueWithPadding, currentLength, length, (byte) ' ');
if (!filter.testBytes(valueWithPadding, 0, length)) {
return FILTER_FAILED;
if (filter.testLength(length)) {
int currentLength = dictionary.getSliceLength(index);
Slice data = dictionary.getSlice(index, 0, currentLength);
if (isCharType && length != currentLength) {
data = Chars.padSpaces(data, maxCodePointCount);
}
if (filter.testBytes(data.getBytes(), 0, length)) {
if (outputRequired) {
values[outputPositionCount] = index;
}
outputPositions[outputPositionCount] = position;
outputPositionCount++;
return FILTER_PASSED;
}
}
else if (!filter.testBytes(currentDictionaryData, currentDictionaryOffsetVector[index], length)) {
return FILTER_FAILED;
}

if (outputRequired) {
values[outputPositionCount] = index;
}
outputPositions[outputPositionCount] = position;
outputPositionCount++;
return FILTER_PASSED;
return FILTER_FAILED;
}

private int readAllNulls(int[] positions, int positionCount)
Expand Down Expand Up @@ -388,11 +369,8 @@ public Block getBlock(int[] positions, int positionCount)
return new RunLengthEncodedBlock(outputType.createBlockBuilder(null, 1).appendNull().build(), positionCount);
}

wrapDictionaryIfNecessary();

if (positionCount == outputPositionCount) {
DictionaryBlock block = new DictionaryBlock(positionCount, dictionary, values);

values = null;
return block;
}
Expand Down Expand Up @@ -431,26 +409,9 @@ public BlockLease getBlockView(int[] positions, int positionCount)
if (positionCount < outputPositionCount) {
compactValues(positions, positionCount);
}
wrapDictionaryIfNecessary();
return newLease(new DictionaryBlock(positionCount, dictionary, values));
}

private void wrapDictionaryIfNecessary()
{
if (dictionaryWrapped) {
return;
}

boolean[] isNullVector = new boolean[currentDictionarySize];
isNullVector[currentDictionarySize - 1] = true;

byte[] dictionaryDataCopy = Arrays.copyOf(currentDictionaryData, currentDictionaryOffsetVector[currentDictionarySize]);
int[] dictionaryOffsetVectorCopy = Arrays.copyOf(currentDictionaryOffsetVector, currentDictionarySize + 1);
dictionary = new VariableWidthBlock(currentDictionarySize, wrappedBuffer(dictionaryDataCopy), dictionaryOffsetVectorCopy, Optional.of(isNullVector));

dictionaryWrapped = true;
}

private void compactValues(int[] positions, int positionCount)
{
int positionIndex = 0;
Expand Down Expand Up @@ -503,8 +464,10 @@ private void openRowGroup()
dataLength += stripeDictionaryLength[i];
}

stripeDictionaryData = ensureCapacity(stripeDictionaryData, toIntExact(dataLength));
stripeDictionaryOffsetVector = ensureCapacity(stripeDictionaryOffsetVector, stripeDictionarySize + 2);
// we must always create a new dictionary array because the previous dictionary may still be referenced
stripeDictionaryData = new byte[toIntExact(dataLength)];
// add one extra entry for null
stripeDictionaryOffsetVector = new int[stripeDictionarySize + 2];

// read dictionary values
ByteArrayInputStream dictionaryDataStream = stripeDictionaryDataStreamSource.openStream();
Expand All @@ -514,10 +477,8 @@ private void openRowGroup()
stripeDictionaryData = EMPTY_DICTIONARY_DATA;
stripeDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
}

// If there is no rowgroup dictionary, we only need to wrap the stripe dictionary once per stripe because wrapping dictionary is very expensive.
dictionaryWrapped = false;
}
stripeDictionaryOpen = true;

// read row group dictionary
RowGroupDictionaryLengthInputStream dictionaryLengthStream = rowGroupDictionaryLengthStreamSource.openStream();
Expand All @@ -536,22 +497,10 @@ private void openRowGroup()
dataLength += rowGroupDictionaryLength[i];
}

rowGroupDictionaryData = ensureCapacity(
rowGroupDictionaryData,
stripeDictionaryOffsetVector[stripeDictionarySize] + toIntExact(dataLength),
MEDIUM,
NONE);

rowGroupDictionaryOffsetVector = ensureCapacity(rowGroupDictionaryOffsetVector,
stripeDictionarySize + rowGroupDictionarySize + 2,
MEDIUM,
NONE);

if (!stripeDictionaryOpen) {
System.arraycopy(stripeDictionaryData, 0, rowGroupDictionaryData, 0, stripeDictionaryOffsetVector[stripeDictionarySize]);
System.arraycopy(stripeDictionaryOffsetVector, 0, rowGroupDictionaryOffsetVector, 0, stripeDictionarySize + 2);
}
dictionaryWrapped = false;
// We must always create a new dictionary array because the previous dictionary may still be referenced
// The first elements of the dictionary are from the stripe dictionary, then the row group dictionary elements, and then a null
byte[] rowGroupDictionaryData = java.util.Arrays.copyOf(stripeDictionaryData, stripeDictionaryOffsetVector[stripeDictionarySize] + toIntExact(dataLength));
int[] rowGroupDictionaryOffsetVector = Arrays.copyOf(stripeDictionaryOffsetVector, stripeDictionarySize + rowGroupDictionarySize + 2);

// read dictionary values
ByteArrayInputStream dictionaryDataStream = rowGroupDictionaryDataStreamSource.openStream();
Expand All @@ -563,7 +512,6 @@ private void openRowGroup()
setDictionaryBlockData(stripeDictionaryData, stripeDictionaryOffsetVector, stripeDictionarySize + 1);
}

stripeDictionaryOpen = true;
presentStream = presentStreamSource.openStream();
inDictionaryStream = inDictionaryStreamSource.openStream();
dataStream = dataStreamSource.openStream();
Expand Down Expand Up @@ -663,7 +611,6 @@ public void close()
{
dictionary = null;
currentDictionaryData = null;
rowGroupDictionaryData = null;
rowGroupDictionaryLength = null;
stripeDictionaryData = null;
stripeDictionaryLength = null;
Expand All @@ -677,15 +624,15 @@ public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE +
sizeOf(values) +
sizeOf(stripeDictionaryData) +
sizeOf(outputPositions) +
// dictionary could be built on stripeDictionaryData or the rowGroupDictionaryData created locally in openRowGroup(). For the first case, currentDictionaryData
// points to stripeDictionaryData and we just need to count the dictionary's retained size. For the second case, we need to count both stripeDictionaryData and dictionary.
dictionary.getRetainedSizeInBytes() +
(stripeDictionaryData == currentDictionaryData ? 0 : sizeOf(stripeDictionaryData)) +
sizeOf(stripeDictionaryOffsetVector) +
sizeOf(stripeDictionaryLength) +
sizeOf(rowGroupDictionaryData) +
sizeOf(rowGroupDictionaryOffsetVector) +
sizeOf(rowGroupDictionaryLength) +
sizeOf(evaluationStatus) +
sizeOf(valueWithPadding) +
dictionary.getRetainedSizeInBytes();
sizeOf(evaluationStatus);
}

private void setDictionaryBlockData(byte[] dictionaryData, int[] dictionaryOffsets, int positionCount)
Expand All @@ -694,15 +641,14 @@ private void setDictionaryBlockData(byte[] dictionaryData, int[] dictionaryOffse
// only update the block if the array changed to prevent creation of new Block objects, since
// the engine currently uses identity equality to test if dictionaries are the same
if (currentDictionaryData != dictionaryData) {
boolean[] isNullVector = new boolean[positionCount];
isNullVector[positionCount - 1] = true;
dictionaryOffsets[positionCount] = dictionaryOffsets[positionCount - 1];
dictionary = new VariableWidthBlock(positionCount, wrappedBuffer(dictionaryData), dictionaryOffsets, Optional.of(isNullVector));
currentDictionaryData = dictionaryData;
evaluationStatus = ensureCapacity(evaluationStatus, positionCount - 1);
fill(evaluationStatus, 0, evaluationStatus.length, FILTER_NOT_EVALUATED);
}
currentDictionaryOffsetVector = dictionaryOffsets;
currentDictionarySize = positionCount;
// The last element in the dictionary is null.
currentDictionaryOffsetVector[currentDictionarySize] = currentDictionaryOffsetVector[currentDictionarySize - 1];

evaluationStatus = ensureCapacity(evaluationStatus, positionCount - 1);
fill(evaluationStatus, 0, evaluationStatus.length, FILTER_NOT_EVALUATED);
}

private BlockLease newLease(Block block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ public void testVarBinaries()
public void testMemoryTracking()
throws Exception
{
List<Type> types = ImmutableList.of(INTEGER, VARCHAR, VARCHAR);
List<Type> types = ImmutableList.of(INTEGER, VARCHAR);
TempFile tempFile = new TempFile();
List<Integer> intValues = newArrayList(limit(
cycle(concat(
Expand All @@ -916,13 +916,11 @@ public void testMemoryTracking()
ImmutableList.of(3), nCopies(9999, 123),
nCopies(1_000_000, null))),
NUM_ROWS));
List<String> varcharDirectValues = newArrayList(limit(cycle(ImmutableList.of("A", "B", "C")), NUM_ROWS));
List<String> varcharDictionaryValues = newArrayList(limit(cycle(ImmutableList.of("apple", "apple pie", "apple\uD835\uDC03", "apple\uFFFD")), NUM_ROWS));
List<List<?>> values = ImmutableList.of(intValues, varcharDirectValues, varcharDictionaryValues);
List<String> varcharValues = newArrayList(limit(cycle(ImmutableList.of("A", "B", "C")), NUM_ROWS));

writeOrcColumnsPresto(tempFile.getFile(), DWRF, CompressionKind.NONE, Optional.empty(), types, values, new OrcWriterStats());
writeOrcColumnsPresto(tempFile.getFile(), DWRF, CompressionKind.NONE, Optional.empty(), types, ImmutableList.of(intValues, varcharValues), new OrcWriterStats());

OrcPredicate orcPredicate = createOrcPredicate(types, values, DWRF, false);
OrcPredicate orcPredicate = createOrcPredicate(types, ImmutableList.of(intValues, varcharValues), DWRF, false);
Map<Integer, Type> includedColumns = IntStream.range(0, types.size())
.boxed()
.collect(toImmutableMap(Function.identity(), types::get));
Expand Down Expand Up @@ -962,7 +960,7 @@ public void testMemoryTracking()

page.getLoadedPage();

assertBetweenInclusive(systemMemoryUsage.getBytes(), 150000L, 160000L);
assertBetweenInclusive(systemMemoryUsage.getBytes(), 110000L, 130000L);

rowsProcessed += positionCount;
}
Expand Down

0 comments on commit 8ab725f

Please sign in to comment.