Skip to content

Commit

Permalink
make KeepAndSortOnMinValue accept multiple value columns
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjingwang committed Mar 10, 2015
1 parent 8f08707 commit c39be11
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
public class KeepAndSortOnMinValueStateEncoding extends StreamingStateEncoding<KeepAndSortOnMinValue> {

public int[] keyColIndices;
public int valueColIndex;
public int[] valueColIndices;

@Override
public KeepAndSortOnMinValue construct() {
return new KeepAndSortOnMinValue(keyColIndices, valueColIndex);
return new KeepAndSortOnMinValue(keyColIndices, valueColIndices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public GenericShuffleProducer construct(final ConstructArgs args) {
} else if (argBufferStateType instanceof KeepAndSortOnMinValueStateEncoding) {
producer.setBackupBufferAsPrioritizedMin(
((KeepAndSortOnMinValueStateEncoding) argBufferStateType).keyColIndices,
((KeepAndSortOnMinValueStateEncoding) argBufferStateType).valueColIndex);
((KeepAndSortOnMinValueStateEncoding) argBufferStateType).valueColIndices);
} else if (argBufferStateType instanceof DupElimStateEncoding) {
producer.setBackupBufferAsDupElim();
} else if (argBufferStateType instanceof SimpleAppenderStateEncoding) {
Expand Down
169 changes: 129 additions & 40 deletions src/edu/washington/escience/myria/operator/KeepAndSortOnMinValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

This comment has been minimized.

Copy link
@senderista

senderista May 19, 2015

Contributor

Most of the comments I made on your changes to KeepMinValue.java apply here as well (which points to massively duplicated code that should be refactored at some point).

import java.util.BitSet;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import com.gs.collections.api.block.procedure.primitive.IntProcedure;
import com.gs.collections.impl.list.mutable.primitive.IntArrayList;
import com.gs.collections.impl.map.mutable.primitive.IntObjectHashMap;
Expand Down Expand Up @@ -45,17 +48,20 @@ public final class KeepAndSortOnMinValue extends StreamingState {

/** column indices of the key. */
private final int[] keyColIndices;
/** column indices of the key as a set. */
private final Set<Integer> keyColIndicesSet;
/** column indices of the value. */
private final int valueColIndex;
private final int[] valueColIndices;

/**
*
* @param keyColIndices column indices of the key
* @param valueColIndex column index of the value
* @param valueColIndices column indices of the value
*/
public KeepAndSortOnMinValue(final int[] keyColIndices, final int valueColIndex) {
public KeepAndSortOnMinValue(final int[] keyColIndices, final int[] valueColIndices) {
this.keyColIndices = keyColIndices;
this.valueColIndex = valueColIndex;
keyColIndicesSet = ImmutableSet.copyOf(Ints.asList(keyColIndices));
this.valueColIndices = valueColIndices;
}

@Override
Expand All @@ -68,24 +74,63 @@ public void cleanup() {
* Check if a tuple in uniqueTuples equals to the comparing tuple (cntTuple).
*
* @param index the index in uniqueTuples
* @param column the source column
* @param columns the source columns
* @param row the index of the source row
* @return true if equals.
* */
private boolean shouldReplace(final int index, final Column<?> column, final int row) {
Type t = column.getType();
switch (t) {
case INT_TYPE:
return column.getInt(row) < uniqueTuples.getInt(valueColIndex, index);
case FLOAT_TYPE:
return column.getFloat(row) < uniqueTuples.getFloat(valueColIndex, index);
case DOUBLE_TYPE:
return column.getDouble(row) < uniqueTuples.getDouble(valueColIndex, index);
case LONG_TYPE:
return column.getLong(row) < uniqueTuples.getLong(valueColIndex, index);
default:
throw new IllegalStateException("type " + t + " is not supported in KeepMinValue.replace()");
private boolean shouldReplace(final int index, final List<? extends Column<?>> columns, final int row) {
for (int valueColIndice : valueColIndices) {
Column<?> column = columns.get(valueColIndice);
switch (column.getType()) {
case INT_TYPE: {
int t1 = column.getInt(row);
int t2 = uniqueTuples.getInt(valueColIndice, index);
if (t1 < t2) {
return true;
}
if (t1 > t2) {
return false;
}
break;
}
case LONG_TYPE: {
long t1 = column.getLong(row);
long t2 = uniqueTuples.getLong(valueColIndice, index);
if (t1 < t2) {
return true;
}
if (t1 > t2) {
return false;
}
break;
}
case FLOAT_TYPE: {
float t1 = column.getFloat(row);
float t2 = uniqueTuples.getFloat(valueColIndice, index);
if (t1 < t2) {
return true;
}
if (t1 > t2) {
return false;
}
break;
}
case DOUBLE_TYPE: {
double t1 = column.getDouble(row);
double t2 = uniqueTuples.getDouble(valueColIndice, index);
if (t1 < t2) {
return true;
}
if (t1 > t2) {
return false;
}
break;
}
default:
throw new IllegalStateException("type " + column.getType() + " is not supported in KeepMinValue.replace()");
}
}
return false;
}

/**
Expand Down Expand Up @@ -136,7 +181,7 @@ public Schema getSchema() {

@Override
public void init(final ImmutableMap<String, Object> execEnvVars) {
uniqueTupleIndices = new IntObjectHashMap<>();
uniqueTupleIndices = new IntObjectHashMap<IntArrayList>();
uniqueTuples = new MutableTupleBuffer(getSchema());
doReplace = new ReplaceProcedure();
}
Expand All @@ -153,7 +198,7 @@ public TupleBatch update(final TupleBatch tb) {
@Override
public List<TupleBatch> exportState() {
MutableTupleBuffer tmp = uniqueTuples.clone();
sortOn(tmp, valueColIndex);
sortOn(tmp, valueColIndices);
return tmp.getAll();
}

Expand Down Expand Up @@ -186,9 +231,14 @@ private final class ReplaceProcedure implements IntProcedure {
public void value(final int index) {
if (TupleUtils.tupleEquals(inputTB, keyColIndices, row, uniqueTuples, keyColIndices, index)) {
unique = false;
Column<?> valueColumn = inputTB.getDataColumns().get(valueColIndex);
if (shouldReplace(index, valueColumn, row)) {
uniqueTuples.replace(valueColIndex, index, valueColumn, row);
if (shouldReplace(index, inputTB.getDataColumns(), row)) {
for (int i = 0; i < uniqueTuples.numColumns(); ++i) {
if (!keyColIndicesSet.contains(i)) {
// replace the whole tuple except key columns.
uniqueTuples.replace(i, index, inputTB.getDataColumns().get(i), row);
}
}
replaced = true;
}
}
}
Expand All @@ -200,7 +250,7 @@ public void value(final int index) {
* @param tuples tuples
* @param col column index
*/
private void sortOn(final MutableTupleBuffer tuples, final int col) {
private void sortOn(final MutableTupleBuffer tuples, final int[] col) {
quicksort(tuples, col, 0, tuples.numTuples() - 1);
}

Expand All @@ -212,7 +262,7 @@ private void sortOn(final MutableTupleBuffer tuples, final int col) {
* @param low lower bound
* @param high upper bound
*/
private void quicksort(final MutableTupleBuffer tuples, final int col, final int low, final int high) {
private void quicksort(final MutableTupleBuffer tuples, final int[] col, final int low, final int high) {
int i = low, j = high;
int pivot = low + (high - low) / 2;

Expand Down Expand Up @@ -250,25 +300,64 @@ private void quicksort(final MutableTupleBuffer tuples, final int col, final int
* compare a value in a column with pivot.
*
* @param tuples tuples
* @param column the column index
* @param columns the column indices
* @param row row index to compare with
* @param pivot the index of the pivot value
* @return if the value is smaller than (-1), equal to (0) or bigger than (1) pivot
*/
public int compare(final MutableTupleBuffer tuples, final int column, final int row, final int pivot) {
Type t = getSchema().getColumnType(column);
switch (t) {
case LONG_TYPE:
return Type.compareRaw(tuples.getLong(column, row), tuples.getLong(column, pivot));
case INT_TYPE:
return Type.compareRaw(tuples.getInt(column, row), tuples.getInt(column, pivot));
case DOUBLE_TYPE:
return Type.compareRaw(tuples.getDouble(column, row), tuples.getDouble(column, pivot));
case FLOAT_TYPE:
return Type.compareRaw(tuples.getFloat(column, row), tuples.getFloat(column, pivot));
default:
throw new RuntimeException("compare() doesn't support type " + t);
public int compare(final MutableTupleBuffer tuples, final int[] columns, final int row, final int pivot) {
for (int column : columns) {
Type t = getSchema().getColumnType(column);
switch (t) {
case LONG_TYPE: {
long t1 = tuples.getLong(column, row);
long t2 = tuples.getLong(column, pivot);
if (t1 < t2) {
return -1;
}
if (t1 > t2) {
return 1;
}
break;
}
case INT_TYPE: {
int t1 = tuples.getInt(column, row);
int t2 = tuples.getInt(column, pivot);
if (t1 < t2) {
return -1;
}
if (t1 > t2) {
return 1;
}
break;
}
case FLOAT_TYPE: {
float t1 = tuples.getFloat(column, row);
float t2 = tuples.getFloat(column, pivot);
if (t1 < t2) {
return -1;
}
if (t1 > t2) {
return 1;
}
break;
}
case DOUBLE_TYPE: {
double t1 = tuples.getDouble(column, row);
double t2 = tuples.getDouble(column, pivot);
if (t1 < t2) {
return -1;
}
if (t1 > t2) {
return 1;
}
break;
}
default:
throw new IllegalStateException("type " + t + " is not supported");
}
}
return 0;
}

@Override
Expand All @@ -278,6 +367,6 @@ public int numTuples() {

@Override
public StreamingState newInstanceFromMyself() {
return new KeepAndSortOnMinValue(keyColIndices, valueColIndex);
return new KeepAndSortOnMinValue(keyColIndices, valueColIndices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,13 @@ public void setBackupBufferAsMin(final int[] keyColIndices, final int[] valueCol
/**
* set backup buffers as KeepAndSortOnMinValue.
*
* @param keyColIndices the same as the one in KeepMinValue
* @param valueCol the same as the one in KeepMinValue
* @param keyColIndices the same as the one in KeepAndSortOnMinValue
* @param valueColindices the same as the one in KeepAndSortOnMinValue
*/
public void setBackupBufferAsPrioritizedMin(final int[] keyColIndices, final int valueCol) {
public void setBackupBufferAsPrioritizedMin(final int[] keyColIndices, final int[] valueColIndices) {
triedToSendTuples = new ArrayList<StreamingState>();
for (int i = 0; i < outputIDs.length; i++) {
triedToSendTuples.add(i, new KeepAndSortOnMinValue(keyColIndices, valueCol));
triedToSendTuples.add(i, new KeepAndSortOnMinValue(keyColIndices, valueColIndices));
triedToSendTuples.get(i).setAttachedOperator(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public List<RootOperator> generatePlan(final Schema table1Schema, final Schema t
final CollectProducer cp = new CollectProducer(agg, serverOpId, MASTER_ID);
final GenericShuffleProducer sp3 = new GenericShuffleProducer(join, joinArrayId3, workerIDs, pf0);
if (prioritized) {
sp3.setBackupBufferAsPrioritizedMin(new int[] { 0 }, 1);
sp3.setBackupBufferAsPrioritizedMin(new int[] { 0 }, new int[] { 1 });
} else {
sp3.setBackupBufferAsMin(new int[] { 0 }, new int[] { 1 });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void testKeepAndSortedOnMinValue() throws DbException {
}

TupleSource scan = new TupleSource(input);
StreamingStateWrapper keepmin = new StreamingStateWrapper(scan, new KeepAndSortOnMinValue(new int[] { 0 }, 1));
StreamingStateWrapper keepmin =
new StreamingStateWrapper(scan, new KeepAndSortOnMinValue(new int[] { 0 }, new int[] { 1 }));
keepmin.open(null);
while (!keepmin.eos()) {
keepmin.nextReady();
Expand Down
7 changes: 4 additions & 3 deletions test/edu/washington/escience/myria/operator/OperatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class OperatorTest {
public class EntryComparator implements Comparator<Entry<Long, String>> {

@Override
public int compare(Entry<Long, String> o1, Entry<Long, String> o2) {
public int compare(final Entry<Long, String> o1, final Entry<Long, String> o2) {
int res = o1.getKey().compareTo(o2.getKey());
if (res != 0) {
return res;
Expand All @@ -48,7 +48,7 @@ public int compare(Entry<Long, String> o1, Entry<Long, String> o2) {
* @param sorted Generate sorted tuples, sorted by id
* @return
*/
public TupleBatchBuffer generateRandomTuples(final int numTuples, final int sampleSize, boolean sorted) {
public TupleBatchBuffer generateRandomTuples(final int numTuples, final int sampleSize, final boolean sorted) {
final ArrayList<Entry<Long, String>> entries = new ArrayList<Entry<Long, String>>();

final long[] ids = TestUtils.randomLong(0, sampleSize, numTuples);
Expand Down Expand Up @@ -276,7 +276,8 @@ public void testKeepAndSortedOnMinValue() throws DbException {
}

TupleSource scan = new TupleSource(input);
StreamingStateWrapper keepmin = new StreamingStateWrapper(scan, new KeepAndSortOnMinValue(new int[] { 0 }, 1));
StreamingStateWrapper keepmin =
new StreamingStateWrapper(scan, new KeepAndSortOnMinValue(new int[] { 0 }, new int[] { 1 }));
keepmin.open(null);
while (!keepmin.eos()) {
keepmin.nextReady();
Expand Down

0 comments on commit c39be11

Please sign in to comment.