Skip to content

Commit

Permalink
Update window function API to supply partitionStartPosition on reset
Browse files Browse the repository at this point in the history
This reduces the amount of internal state that each window function needs to track and simplifies their logic
  • Loading branch information
erichwang committed Oct 24, 2014
1 parent e04249a commit 1145ae9
Show file tree
Hide file tree
Showing 13 changed files with 34 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public Page getOutput()

// reset functions for new partition
for (WindowFunction function : windowFunctions) {
function.reset(partitionEnd - currentPosition, pagesIndex);
function.reset(currentPosition, partitionEnd - currentPosition, pagesIndex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
totalCount = partitionRowCount;
count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
rank = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public VarcharFirstValueFunction(List<Integer> argumentChannels)

private final Type type;
private final int argumentChannel;
private int rowCount;
private PagesIndex pagesIndex;
private int valuePosition;

Expand All @@ -80,11 +79,10 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
this.pagesIndex = pagesIndex;
valuePosition = rowCount;
rowCount += partitionRowCount;
this.valuePosition = partitionStartPosition;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public VarcharLagFunction(List<Integer> argumentChannels)
private final int defaultChannel;

private int partitionStartPosition;
private int partitionRowCount;
private int currentPosition = -1;
private int currentPosition;
private PagesIndex pagesIndex;

protected LagFunction(Type type, List<Integer> argumentChannels)
Expand Down Expand Up @@ -101,21 +100,16 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
this.partitionStartPosition = partitionStartPosition;
this.currentPosition = partitionStartPosition;
this.pagesIndex = pagesIndex;
this.partitionStartPosition += this.partitionRowCount;
// start before the first row of the partition
this.currentPosition = partitionStartPosition - 1;

this.partitionRowCount = partitionRowCount;
}

@Override
public void processRow(BlockBuilder output, boolean newPeerGroup, int peerGroupCount)
{
currentPosition++;

int offset = offsetChannel < 0 ? 1 : Ints.checkedCast(pagesIndex.getLong(offsetChannel, currentPosition));
checkCondition(offset >= 0, INVALID_FUNCTION_ARGUMENT, "Offset must be at least 0");

Expand All @@ -132,5 +126,7 @@ public void processRow(BlockBuilder output, boolean newPeerGroup, int peerGroupC
pagesIndex.appendTo(defaultChannel, currentPosition, output);
}
}

currentPosition++;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public VarcharLastValueFunction(List<Integer> argumentChannels)

private final Type type;
private final int argumentChannel;
private int rowCount;
private PagesIndex pagesIndex;
private int valuePosition;

Expand All @@ -80,11 +79,10 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
this.pagesIndex = pagesIndex;
rowCount += partitionRowCount;
valuePosition = rowCount - 1;
this.valuePosition = partitionStartPosition + partitionRowCount - 1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public VarcharLeadFunction(List<Integer> argumentChannels)
private final int defaultChannel;

private int partitionStartPosition;
private int currentPosition;
private int partitionRowCount;
private int currentPosition = -1;
private PagesIndex pagesIndex;

protected LeadFunction(Type type, List<Integer> argumentChannels)
Expand Down Expand Up @@ -101,21 +101,17 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
this.pagesIndex = pagesIndex;
this.partitionStartPosition += this.partitionRowCount;
// start before the first row of the partition
this.currentPosition = partitionStartPosition - 1;

this.partitionStartPosition = partitionStartPosition;
this.currentPosition = partitionStartPosition;
this.partitionRowCount = partitionRowCount;
this.pagesIndex = pagesIndex;
}

@Override
public void processRow(BlockBuilder output, boolean newPeerGroup, int peerGroupCount)
{
currentPosition++;

int offset = offsetChannel < 0 ? 1 : Ints.checkedCast(pagesIndex.getLong(offsetChannel, currentPosition));
checkCondition(offset >= 0, INVALID_FUNCTION_ARGUMENT, "Offset must be at least 0");

Expand All @@ -132,5 +128,7 @@ public void processRow(BlockBuilder output, boolean newPeerGroup, int peerGroupC
pagesIndex.appendTo(defaultChannel, currentPosition, output);
}
}

currentPosition++;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public VarcharNthValueFunction(List<Integer> argumentChannels)
private final int offsetChannel;

private int partitionStartPosition;
private int currentPosition;
private int partitionRowCount;
private int currentPosition = -1;
private PagesIndex pagesIndex;

protected NthValueFunction(Type type, List<Integer> argumentChannels)
Expand All @@ -89,21 +89,17 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
this.partitionStartPosition += this.partitionRowCount;
// start before the first row of the partition
this.currentPosition = partitionStartPosition - 1;

this.partitionStartPosition = partitionStartPosition;
this.currentPosition = partitionStartPosition;
this.partitionRowCount = partitionRowCount;
this.pagesIndex = pagesIndex;
}

@Override
public void processRow(BlockBuilder output, boolean newPeerGroup, int peerGroupCount)
{
currentPosition++;

if (pagesIndex.isNull(offsetChannel, currentPosition)) {
output.appendNull();
return;
Expand All @@ -122,5 +118,7 @@ public void processRow(BlockBuilder output, boolean newPeerGroup, int peerGroupC
}

pagesIndex.appendTo(valueChannel, valuePosition, output);

currentPosition++;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
totalCount = partitionRowCount;
rank = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
rank = 0;
count = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
rowNumber = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ public interface WindowFunction
/**
* Reset state for a new partition (including the first one).
*
* @param partitionStartPosition position of the first row of the partition in the pagesIndex
* @param partitionRowCount the total number of rows in the new partition
* @param pageIndex the pages index which contains sorted values
* @param pagesIndex the pages index which contains sorted values
*/
void reset(int partitionRowCount, PagesIndex pageIndex);
void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex);

/**
* Process a row by outputting the result of the window function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/
package com.facebook.presto.tests;

import com.facebook.presto.operator.window.WindowFunction;
import com.facebook.presto.operator.PagesIndex;
import com.facebook.presto.operator.window.WindowFunction;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.Type;
Expand All @@ -34,7 +34,7 @@ public Type getType()
}

@Override
public void reset(int partitionRowCount, PagesIndex pagesIndex)
public void reset(int partitionStartPosition, int partitionRowCount, PagesIndex pagesIndex)
{
rank = 0;
count = 1;
Expand Down

0 comments on commit 1145ae9

Please sign in to comment.