Skip to content

Commit

Permalink
Simplify position arithmetics in AbstractSingleRowBlock
Browse files Browse the repository at this point in the history
Previously, `AbstractSingleRowBlock` would use cell-based offset, this
required converting `position` parameters to cell-positions (by
multiplying by number of ROW fields) and then to get the right block (by
computing modulus by number of ROW fields), resulting in no-op
arithmetics.
  • Loading branch information
findepi authored and wenleix committed Dec 11, 2017
1 parent e39e446 commit d005cac
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 67 deletions.
Expand Up @@ -151,7 +151,7 @@ public <T> T getObject(int position, Class<T> clazz)
}
checkReadablePosition(position);

return clazz.cast(new SingleRowBlock(getFieldBlockOffset(position) * numFields, getFieldBlocks()));
return clazz.cast(new SingleRowBlock(getFieldBlockOffset(position), getFieldBlocks()));
}

@Override
Expand Down
Expand Up @@ -19,137 +19,132 @@
public abstract class AbstractSingleRowBlock
implements Block
{
// in AbstractSingleRowBlock, offset is position-based (consider as cell-based), not entry-based.
protected final int startOffset;
protected final int rowIndex;

protected final int numFields;

protected abstract Block getFieldBlock(int fieldIndex);

protected AbstractSingleRowBlock(int startOffset, int numFields)
protected AbstractSingleRowBlock(int rowIndex)
{
this.startOffset = startOffset;
this.numFields = numFields;
this.rowIndex = rowIndex;
}

private int getAbsolutePosition(int position)
protected abstract Block getFieldBlock(int fieldIndex);

private void checkFieldIndex(int position)
{
if (position < 0 || position >= getPositionCount()) {
throw new IllegalArgumentException("position is not valid");
throw new IllegalArgumentException("position is not valid: " + position);
}
return position + startOffset;
}

@Override
public boolean isNull(int position)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).isNull(position / numFields);
checkFieldIndex(position);
return getFieldBlock(position).isNull(rowIndex);
}

@Override
public byte getByte(int position, int offset)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).getByte(position / numFields, offset);
checkFieldIndex(position);
return getFieldBlock(position).getByte(rowIndex, offset);
}

@Override
public short getShort(int position, int offset)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).getShort(position / numFields, offset);
checkFieldIndex(position);
return getFieldBlock(position).getShort(rowIndex, offset);
}

@Override
public int getInt(int position, int offset)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).getInt(position / numFields, offset);
checkFieldIndex(position);
return getFieldBlock(position).getInt(rowIndex, offset);
}

@Override
public long getLong(int position, int offset)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).getLong(position / numFields, offset);
checkFieldIndex(position);
return getFieldBlock(position).getLong(rowIndex, offset);
}

@Override
public Slice getSlice(int position, int offset, int length)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).getSlice(position / numFields, offset, length);
checkFieldIndex(position);
return getFieldBlock(position).getSlice(rowIndex, offset, length);
}

@Override
public int getSliceLength(int position)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).getSliceLength(position / numFields);
checkFieldIndex(position);
return getFieldBlock(position).getSliceLength(rowIndex);
}

@Override
public int compareTo(int position, int offset, int length, Block otherBlock, int otherPosition, int otherOffset, int otherLength)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).compareTo(position / numFields, offset, length, otherBlock, otherPosition, otherOffset, otherLength);
checkFieldIndex(position);
return getFieldBlock(position).compareTo(rowIndex, offset, length, otherBlock, otherPosition, otherOffset, otherLength);
}

@Override
public boolean bytesEqual(int position, int offset, Slice otherSlice, int otherOffset, int length)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).bytesEqual(position / numFields, offset, otherSlice, otherOffset, length);
checkFieldIndex(position);
return getFieldBlock(position).bytesEqual(rowIndex, offset, otherSlice, otherOffset, length);
}

@Override
public int bytesCompare(int position, int offset, int length, Slice otherSlice, int otherOffset, int otherLength)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).bytesCompare(position / numFields, offset, length, otherSlice, otherOffset, otherLength);
checkFieldIndex(position);
return getFieldBlock(position).bytesCompare(rowIndex, offset, length, otherSlice, otherOffset, otherLength);
}

@Override
public void writeBytesTo(int position, int offset, int length, BlockBuilder blockBuilder)
{
position = getAbsolutePosition(position);
getFieldBlock(position % numFields).writeBytesTo(position / numFields, offset, length, blockBuilder);
checkFieldIndex(position);
getFieldBlock(position).writeBytesTo(rowIndex, offset, length, blockBuilder);
}

@Override
public boolean equals(int position, int offset, Block otherBlock, int otherPosition, int otherOffset, int length)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).equals(position / numFields, offset, otherBlock, otherPosition, otherOffset, length);
checkFieldIndex(position);
return getFieldBlock(position).equals(rowIndex, offset, otherBlock, otherPosition, otherOffset, length);
}

@Override
public long hash(int position, int offset, int length)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).hash(position / numFields, offset, length);
checkFieldIndex(position);
return getFieldBlock(position).hash(rowIndex, offset, length);
}

@Override
public <T> T getObject(int position, Class<T> clazz)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).getObject(position / numFields, clazz);
checkFieldIndex(position);
return getFieldBlock(position).getObject(rowIndex, clazz);
}

@Override
public void writePositionTo(int position, BlockBuilder blockBuilder)
{
position = getAbsolutePosition(position);
getFieldBlock(position % numFields).writePositionTo(position / numFields, blockBuilder);
checkFieldIndex(position);
getFieldBlock(position).writePositionTo(rowIndex, blockBuilder);
}

@Override
public Block getSingleValueBlock(int position)
{
position = getAbsolutePosition(position);
return getFieldBlock(position % numFields).getSingleValueBlock(position / numFields);
checkFieldIndex(position);
return getFieldBlock(position).getSingleValueBlock(rowIndex);
}

@Override
Expand Down
Expand Up @@ -137,7 +137,7 @@ public SingleRowBlockWriter beginBlockEntry()
throw new IllegalStateException("Expected current entry to be closed but was opened");
}
currentEntryOpened = true;
return new SingleRowBlockWriter(fieldBlockBuilders[0].getPositionCount() * numFields, fieldBlockBuilders);
return new SingleRowBlockWriter(fieldBlockBuilders[0].getPositionCount(), fieldBlockBuilders);
}

@Override
Expand Down
Expand Up @@ -27,9 +27,9 @@ public class SingleRowBlock

private final Block[] fieldBlocks;

SingleRowBlock(int cellOffset, Block[] fieldBlocks)
SingleRowBlock(int rowIndex, Block[] fieldBlocks)
{
super(cellOffset, fieldBlocks.length);
super(rowIndex);
this.fieldBlocks = fieldBlocks;
}

Expand All @@ -42,14 +42,14 @@ protected Block getFieldBlock(int fieldIndex)
@Override
public int getPositionCount()
{
return numFields;
return fieldBlocks.length;
}

@Override
public long getSizeInBytes()
{
long sizeInBytes = 0;
for (int i = 0; i < numFields; i++) {
for (int i = 0; i < fieldBlocks.length; i++) {
sizeInBytes += getFieldBlock(i).getSizeInBytes();
}
return sizeInBytes;
Expand All @@ -59,7 +59,7 @@ public long getSizeInBytes()
public long getRetainedSizeInBytes()
{
long retainedSizeInBytes = INSTANCE_SIZE;
for (int i = 0; i < numFields; i++) {
for (int i = 0; i < fieldBlocks.length; i++) {
retainedSizeInBytes += getFieldBlock(i).getRetainedSizeInBytes();
}
return retainedSizeInBytes;
Expand All @@ -68,7 +68,7 @@ public long getRetainedSizeInBytes()
@Override
public void retainedBytesForEachPart(BiConsumer<Object, Long> consumer)
{
for (int i = 0; i < numFields; i++) {
for (int i = 0; i < fieldBlocks.length; i++) {
consumer.accept(fieldBlocks[i], fieldBlocks[i].getRetainedSizeInBytes());
}
consumer.accept(this, (long) INSTANCE_SIZE);
Expand All @@ -77,21 +77,21 @@ public void retainedBytesForEachPart(BiConsumer<Object, Long> consumer)
@Override
public BlockEncoding getEncoding()
{
BlockEncoding[] fieldBlockEncodings = new BlockEncoding[numFields];
for (int i = 0; i < numFields; i++) {
BlockEncoding[] fieldBlockEncodings = new BlockEncoding[fieldBlocks.length];
for (int i = 0; i < fieldBlocks.length; i++) {
fieldBlockEncodings[i] = fieldBlocks[i].getEncoding();
}
return new SingleRowBlockEncoding(fieldBlockEncodings);
}

public int getOffset()
public int getRowIndex()
{
return startOffset;
return rowIndex;
}

@Override
public String toString()
{
return format("SingleRowBlock{numFields=%d}", numFields);
return format("SingleRowBlock{numFields=%d}", fieldBlocks.length);
}
}
Expand Up @@ -43,9 +43,9 @@ public String getName()
public void writeBlock(SliceOutput sliceOutput, Block block)
{
SingleRowBlock singleRowBlock = (SingleRowBlock) block;
int fieldOffset = singleRowBlock.getOffset() / fieldBlockEncodings.length;
int rowIndex = singleRowBlock.getRowIndex();
for (int i = 0; i < fieldBlockEncodings.length; i++) {
fieldBlockEncodings[i].writeBlock(sliceOutput, singleRowBlock.getFieldBlock(i).getRegion(fieldOffset, 1));
fieldBlockEncodings[i].writeBlock(sliceOutput, singleRowBlock.getFieldBlock(i).getRegion(rowIndex, 1));
}
}

Expand Down
Expand Up @@ -33,9 +33,9 @@ public class SingleRowBlockWriter
private int currentFieldIndexToWrite;
private boolean fieldBlockBuilderReturned;

SingleRowBlockWriter(int startOffset, BlockBuilder[] fieldBlockBuilders)
SingleRowBlockWriter(int rowIndex, BlockBuilder[] fieldBlockBuilders)
{
super(startOffset, fieldBlockBuilders.length);
super(rowIndex);
this.fieldBlockBuilders = fieldBlockBuilders;
long initialBlockBuilderSize = 0;
for (int i = 0; i < fieldBlockBuilders.length; i++) {
Expand Down Expand Up @@ -72,7 +72,7 @@ protected Block getFieldBlock(int fieldIndex)
public long getSizeInBytes()
{
long currentBlockBuilderSize = 0;
for (int i = 0; i < numFields; i++) {
for (int i = 0; i < fieldBlockBuilders.length; i++) {
currentBlockBuilderSize += fieldBlockBuilders[i].getSizeInBytes();
}
return currentBlockBuilderSize - initialBlockBuilderSize;
Expand All @@ -82,7 +82,7 @@ public long getSizeInBytes()
public long getRetainedSizeInBytes()
{
long size = INSTANCE_SIZE;
for (int i = 0; i < numFields; i++) {
for (int i = 0; i < fieldBlockBuilders.length; i++) {
size += fieldBlockBuilders[i].getRetainedSizeInBytes();
}
return size;
Expand All @@ -91,7 +91,7 @@ public long getRetainedSizeInBytes()
@Override
public void retainedBytesForEachPart(BiConsumer<Object, Long> consumer)
{
for (int i = 0; i < numFields; i++) {
for (int i = 0; i < fieldBlockBuilders.length; i++) {
consumer.accept(fieldBlockBuilders[i], fieldBlockBuilders[i].getRetainedSizeInBytes());
}
consumer.accept(this, (long) INSTANCE_SIZE);
Expand Down Expand Up @@ -207,10 +207,10 @@ public BlockBuilder newBlockBuilderLike(BlockBuilderStatus blockBuilderStatus)
public String toString()
{
if (!fieldBlockBuilderReturned) {
return format("RowBlock{SingleRowBlockWriter=%d, fieldBlockBuilderReturned=false, positionCount=%d}", numFields, getPositionCount());
return format("RowBlock{SingleRowBlockWriter=%d, fieldBlockBuilderReturned=false, positionCount=%d}", fieldBlockBuilders.length, getPositionCount());
}
else {
return format("RowBlock{SingleRowBlockWriter=%d, fieldBlockBuilderReturned=true}", numFields);
return format("RowBlock{SingleRowBlockWriter=%d, fieldBlockBuilderReturned=true}", fieldBlockBuilders.length);
}
}

Expand All @@ -219,7 +219,7 @@ private void checkFieldIndexToWrite()
if (fieldBlockBuilderReturned) {
throw new IllegalStateException("cannot do sequential write after getFieldBlockBuilder is called");
}
if (currentFieldIndexToWrite >= numFields) {
if (currentFieldIndexToWrite >= fieldBlockBuilders.length) {
throw new IllegalStateException("currentFieldIndexToWrite is not valid");
}
}
Expand Down

0 comments on commit d005cac

Please sign in to comment.