Skip to content

Commit

Permalink
Convert WindowIndex into an interface
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Jul 19, 2016
1 parent afae204 commit 791e99a
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 67 deletions.
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.window;

import com.facebook.presto.operator.PagesIndex;
import com.facebook.presto.spi.block.BlockBuilder;
import io.airlift.slice.Slice;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkElementIndex;
import static com.google.common.base.Preconditions.checkPositionIndex;
import static java.util.Objects.requireNonNull;

public class PagesWindowIndex
implements WindowIndex
{
private final PagesIndex pagesIndex;
private final int start;
private final int size;

public PagesWindowIndex(PagesIndex pagesIndex, int start, int end)
{
requireNonNull(pagesIndex, "pagesIndex is null");
checkPositionIndex(start, pagesIndex.getPositionCount(), "start");
checkPositionIndex(end, pagesIndex.getPositionCount(), "end");
checkArgument(start < end, "start must be before end");

this.pagesIndex = pagesIndex;
this.start = start;
this.size = end - start;
}

@Override
public int size()
{
return size;
}

@Override
public boolean isNull(int channel, int position)
{
return pagesIndex.isNull(channel, position(position));
}

@Override
public boolean getBoolean(int channel, int position)
{
return pagesIndex.getBoolean(channel, position(position));
}

@Override
public long getLong(int channel, int position)
{
return pagesIndex.getLong(channel, position(position));
}

@Override
public double getDouble(int channel, int position)
{
return pagesIndex.getDouble(channel, position(position));
}

@Override
public Slice getSlice(int channel, int position)
{
return pagesIndex.getSlice(channel, position(position));
}

@Override
public void appendTo(int channel, int position, BlockBuilder output)
{
pagesIndex.appendTo(channel, position(position), output);
}

private int position(int position)
{
checkElementIndex(position, size, "position");
return position + start;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("size", size)
.toString();
}
}
Expand Up @@ -13,80 +13,72 @@
*/ */
package com.facebook.presto.operator.window; package com.facebook.presto.operator.window;


import com.facebook.presto.operator.PagesIndex;
import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilder;
import io.airlift.slice.Slice; import io.airlift.slice.Slice;


import static com.google.common.base.MoreObjects.toStringHelper; /**
import static com.google.common.base.Preconditions.checkArgument; * A window index contains the sorted values for a window partition.
import static com.google.common.base.Preconditions.checkElementIndex; * Each window function argument is available as a separate channel.
import static com.google.common.base.Preconditions.checkPositionIndex; */
import static java.util.Objects.requireNonNull; public interface WindowIndex

public class WindowIndex
{ {
private final PagesIndex pagesIndex; /**
private final int start; * Gets the number of rows in the partition
private final int size; */

int size();
public WindowIndex(PagesIndex pagesIndex, int start, int end)
{
requireNonNull(pagesIndex, "pagesIndex is null");
checkPositionIndex(start, pagesIndex.getPositionCount(), "start");
checkPositionIndex(end, pagesIndex.getPositionCount(), "end");
checkArgument(start < end, "start must be before end");

this.pagesIndex = pagesIndex;
this.start = start;
this.size = end - start;
}

public int size()
{
return size;
}

public boolean isNull(int channel, int position)
{
return pagesIndex.isNull(channel, position(position));
}

public boolean getBoolean(int channel, int position)
{
return pagesIndex.getBoolean(channel, position(position));
}


public long getLong(int channel, int position) /**
{ * Check if a value is null.
return pagesIndex.getLong(channel, position(position)); *
} * @param channel argument number
* @param position row within the partition, starting at zero
* @return if the value is null
*/
boolean isNull(int channel, int position);


public double getDouble(int channel, int position) /**
{ * Gets a value as a {@code boolean}.
return pagesIndex.getDouble(channel, position(position)); *
} * @param channel argument number
* @param position row within the partition, starting at zero
* @return value at the specified channel and position
*/
boolean getBoolean(int channel, int position);


public Slice getSlice(int channel, int position) /**
{ * Gets a value as a {@code long}.
return pagesIndex.getSlice(channel, position(position)); *
} * @param channel argument number
* @param position row within the partition, starting at zero
* @return value at the specified channel and position
*/
long getLong(int channel, int position);


public void appendTo(int channel, int position, BlockBuilder output) /**
{ * Gets a value as a {@code double}.
pagesIndex.appendTo(channel, position(position), output); *
} * @param channel argument number
* @param position row within the partition, starting at zero
* @return value at the specified channel and position
*/
double getDouble(int channel, int position);


private int position(int position) /**
{ * Gets a value as a {@link Slice}.
checkElementIndex(position, size, "position"); *
return position + start; * @param channel argument number
} * @param position row within the partition, starting at zero
*/
Slice getSlice(int channel, int position);


@Override /**
public String toString() * Outputs a value from the index. This is useful for "value"
{ * window functions such as {@code lag} that operate on arbitrary
return toStringHelper(this) * types without caring about the specific contents.
.add("size", size) *
.toString(); * @param channel argument number
} * @param position row within the partition, starting at zero
* @param output the {@link BlockBuilder} to output to
*/
void appendTo(int channel, int position, BlockBuilder output);
} }
Expand Up @@ -65,7 +65,7 @@ public WindowPartition(PagesIndex pagesIndex,
this.peerGroupHashStrategy = peerGroupHashStrategy; this.peerGroupHashStrategy = peerGroupHashStrategy;


// reset functions for new partition // reset functions for new partition
WindowIndex windowIndex = new WindowIndex(pagesIndex, partitionStart, partitionEnd); WindowIndex windowIndex = new PagesWindowIndex(pagesIndex, partitionStart, partitionEnd);
for (WindowFunction windowFunction : windowFunctions) { for (WindowFunction windowFunction : windowFunctions) {
windowFunction.reset(windowIndex); windowFunction.reset(windowIndex);
} }
Expand Down

0 comments on commit 791e99a

Please sign in to comment.