Skip to content

Commit

Permalink
fix format
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjingwang committed Feb 15, 2017
1 parent ea5196e commit e34654b
Show file tree
Hide file tree
Showing 8 changed files with 467 additions and 177 deletions.
16 changes: 8 additions & 8 deletions src/edu/washington/escience/myria/operator/Apply.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ public class Apply extends UnaryOperator {
/**
* List (possibly empty) of expressions that will be used to create the output.
*/
@Nonnull
private ImmutableList<Expression> emitExpressions = ImmutableList.of();
@Nonnull private ImmutableList<Expression> emitExpressions = ImmutableList.of();

/**
* One evaluator for each expression in {@link #emitExpressions}.
*/
@Nonnull
private ImmutableList<GenericEvaluator> emitEvaluators = ImmutableList.of();
@Nonnull private ImmutableList<GenericEvaluator> emitEvaluators = ImmutableList.of();

/**
* Buffer to hold finished and in-progress TupleBatches.
Expand Down Expand Up @@ -107,7 +105,7 @@ private int numberOfMultiValuedExpressions() {

/**
* Should a counter be added?
*
*
* @return
*/
private boolean getAddCounter() {
Expand Down Expand Up @@ -218,7 +216,8 @@ protected TupleBatch fetchNextReady() throws DbException, InvocationTargetExcept
if (getAddCounter() && flatmapid < iteratorIndexes[iteratorIdx]) {
flatmapid = iteratorIndexes[iteratorIdx];
}
outputBuffer.appendFromColumn(iteratorIdx, resultColumns.get(iteratorIdx), resultRowIdx);
outputBuffer.appendFromColumn(
iteratorIdx, resultColumns.get(iteratorIdx), resultRowIdx);
}

if (getAddCounter()) {
Expand Down Expand Up @@ -281,8 +280,9 @@ protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbExc
Schema inputSchema = Objects.requireNonNull(getChild().getSchema());

List<GenericEvaluator> evals = new ArrayList<>();
final ExpressionOperatorParameter parameters = new ExpressionOperatorParameter(inputSchema, null, getNodeID(),
getPythonFunctionRegistrar());
final ExpressionOperatorParameter parameters =
new ExpressionOperatorParameter(
inputSchema, null, getNodeID(), getPythonFunctionRegistrar());

for (Expression expr : emitExpressions) {
GenericEvaluator evaluator;
Expand Down
73 changes: 55 additions & 18 deletions src/edu/washington/escience/myria/operator/SymmetricHashJoin.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,24 @@ public final class SymmetricHashJoin extends BinaryOperator {
* @throw IllegalArgumentException if there are duplicated column names in <tt>outputColumns</tt>, or if
* <tt>outputColumns</tt> does not have the correct number of columns and column types.
*/
public SymmetricHashJoin(final Operator left, final Operator right, final int[] compareIndx1,
final int[] compareIndx2, final int[] answerColumns1, final int[] answerColumns2) {
public SymmetricHashJoin(
final Operator left,
final Operator right,
final int[] compareIndx1,
final int[] compareIndx2,
final int[] answerColumns1,
final int[] answerColumns2) {
/* Only used by tests */
this(left, right, compareIndx1, compareIndx2, answerColumns1, answerColumns2, false, false, null,
this(
left,
right,
compareIndx1,
compareIndx2,
answerColumns1,
answerColumns2,
false,
false,
null,
JoinPullOrder.ALTERNATE);
}

Expand All @@ -94,15 +108,25 @@ public SymmetricHashJoin(final Operator left, final Operator right, final int[]
* @throw IllegalArgumentException if there are duplicated column names in <tt>outputColumns</tt>, or if
* <tt>outputColumns</tt> does not have the correct number of columns and column types.
*/
public SymmetricHashJoin(final Operator left, final Operator right, final int[] compareIndx1,
final int[] compareIndx2, final int[] answerColumns1, final int[] answerColumns2, final boolean setSemanticsLeft,
final boolean setSemanticsRight, final List<String> outputColumns, final JoinPullOrder pullOrder) {
public SymmetricHashJoin(
final Operator left,
final Operator right,
final int[] compareIndx1,
final int[] compareIndx2,
final int[] answerColumns1,
final int[] answerColumns2,
final boolean setSemanticsLeft,
final boolean setSemanticsRight,
final List<String> outputColumns,
final JoinPullOrder pullOrder) {
super(left, right);
Preconditions.checkArgument(compareIndx1.length == compareIndx2.length);
if (outputColumns != null) {
Preconditions.checkArgument(outputColumns.size() == answerColumns1.length + answerColumns2.length,
Preconditions.checkArgument(
outputColumns.size() == answerColumns1.length + answerColumns2.length,
"length mismatch between output column names and columns selected for output");
Preconditions.checkArgument(ImmutableSet.copyOf(outputColumns).size() == outputColumns.size(),
Preconditions.checkArgument(
ImmutableSet.copyOf(outputColumns).size() == outputColumns.size(),
"duplicate column names in outputColumns");
this.outputColumns = ImmutableList.copyOf(outputColumns);
} else {
Expand Down Expand Up @@ -136,9 +160,14 @@ protected Schema generateSchema() {
int rightIndex = rightCompareColumns[i];
Type leftType = leftSchema.getColumnType(leftIndex);
Type rightType = rightSchema.getColumnType(rightIndex);
Preconditions.checkState(leftType == rightType,
"column types do not match for join at index %s: left column type %s [%s] != right column type %s [%s]", i,
leftIndex, leftType, rightIndex, rightType);
Preconditions.checkState(
leftType == rightType,
"column types do not match for join at index %s: left column type %s [%s] != right column type %s [%s]",
i,
leftIndex,
leftType,
rightIndex,
rightType);
}

for (int i : leftAnswerColumns) {
Expand All @@ -164,7 +193,11 @@ protected Schema generateSchema() {
* @param index the index of hashTable, which the cntTuple is to join with
* @param fromLeft if the tuple is from child 1
*/
protected void addToAns(final TupleBatch cntTB, final int row, final MutableTupleBuffer hashTable, final int index,
protected void addToAns(
final TupleBatch cntTB,
final int row,
final MutableTupleBuffer hashTable,
final int index,
final boolean fromLeft) {
if (fromLeft) {
for (int leftAnswerColumn : leftAnswerColumns) {
Expand Down Expand Up @@ -382,8 +415,9 @@ public void init(final ImmutableMap<String, Object> execEnvVars) throws DbExcept
leftHashTable = new TupleHashTable(getLeft().getSchema(), leftCompareColumns);
rightHashTable = new TupleHashTable(getRight().getSchema(), rightCompareColumns);
ans = new TupleBatchBuffer(getSchema());
nonBlocking = (QueryExecutionMode) execEnvVars
.get(MyriaConstants.EXEC_ENV_VAR_EXECUTION_MODE) == QueryExecutionMode.NON_BLOCKING;
nonBlocking =
(QueryExecutionMode) execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_EXECUTION_MODE)
== QueryExecutionMode.NON_BLOCKING;
}

/**
Expand Down Expand Up @@ -436,8 +470,12 @@ protected void processChildTB(final TupleBatch tb, final boolean fromLeft) {
* @param hashCode the hashCode of the tb.
* @param replace if need to replace the hash table with new values.
*/
private void addToHashTable(final TupleBatch tb, final int[] compareIndx, final int row,
final TupleHashTable hashTable, final boolean replace) {
private void addToHashTable(
final TupleBatch tb,
final int[] compareIndx,
final int row,
final TupleHashTable hashTable,
final boolean replace) {
if (replace) {
if (hashTable.replace(tb, compareIndx, row)) {
return;
Expand Down Expand Up @@ -479,6 +517,5 @@ public enum JoinPullOrder {
*
* @param order the pull order.
*/
public void setPullOrder(final JoinPullOrder order) {
}
public void setPullOrder(final JoinPullOrder order) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public enum AggregationOp {
* @param fieldName the name of the field being aggregated, for naming output columns.
* @param aggOps the set of aggregate operations to be computed.
*/
protected PrimitiveAggregator(final String inputName, final int column, final AggregationOp aggOp) {
protected PrimitiveAggregator(
final String inputName, final int column, final AggregationOp aggOp) {
if (!isSupported(aggOp)) {
throw new IllegalArgumentException("Unsupported aggregation " + aggOp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ public class UserDefinedAggregator implements Aggregator {
/** Required for Java serialization. */
private static final long serialVersionUID = 1L;
/** logger for this class. */
private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(UserDefinedAggregator.class);
private static final org.slf4j.Logger LOGGER =
org.slf4j.LoggerFactory.getLogger(UserDefinedAggregator.class);

/** Evaluators that initialize the state. */
protected final ScriptEvalInterface initEvaluator;
Expand All @@ -30,8 +31,11 @@ public class UserDefinedAggregator implements Aggregator {
* @param resultSchema the schema of the tuples produced by this aggregator
* @param stateSchema the schema of the state
*/
public UserDefinedAggregator(final ScriptEvalInterface initEvaluator, final ScriptEvalInterface updateEvaluator,
final Schema resultSchema, final Schema stateSchema) {
public UserDefinedAggregator(
final ScriptEvalInterface initEvaluator,
final ScriptEvalInterface updateEvaluator,
final Schema resultSchema,
final Schema stateSchema) {
this.initEvaluator = initEvaluator;
this.updateEvaluator = updateEvaluator;
this.resultSchema = resultSchema;
Expand All @@ -44,7 +48,8 @@ public int getStateSize() {
}

@Override
public void addRow(TupleBatch input, int inputRow, MutableTupleBuffer state, int stateRow, final int offset) {
public void addRow(
TupleBatch input, int inputRow, MutableTupleBuffer state, int stateRow, final int offset) {
updateEvaluator.evaluate(input, inputRow, state, stateRow, offset);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@ public class UserDefinedAggregatorFactory implements AggregatorFactory {
/** Required for Java serialization. */
private static final long serialVersionUID = 1L;
/** logger for this class. */
private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(UserDefinedAggregatorFactory.class);
private static final org.slf4j.Logger LOGGER =
org.slf4j.LoggerFactory.getLogger(UserDefinedAggregatorFactory.class);

/** Expressions that initialize the state variables. */
@JsonProperty
private final List<Expression> initializers;
@JsonProperty private final List<Expression> initializers;
/** Expressions that update the state variables as a function of the input and the current tuple. */
@JsonProperty
private final List<Expression> updaters;
@JsonProperty private final List<Expression> updaters;
/** Expressions that emit the final aggregation result from the state. */
@JsonProperty
private final List<Expression> emitters;
@JsonProperty private final List<Expression> emitters;

/** Evaluators that initialize the {@link #state}. */
private transient ScriptEvalInterface initEvaluator;
Expand Down Expand Up @@ -97,7 +95,8 @@ public List<Aggregator> generateInternalAggs(final Schema inputSchema) throws Db
initEvaluator = getEvalScript(initializers, new ExpressionOperatorParameter(inputSchema));

/* Set up the updaters. */
updateEvaluator = getEvalScript(updaters, new ExpressionOperatorParameter(inputSchema, stateSchema));
updateEvaluator =
getEvalScript(updaters, new ExpressionOperatorParameter(inputSchema, stateSchema));

/* Compute the result schema. */
ExpressionOperatorParameter emitParams = new ExpressionOperatorParameter(null, stateSchema);
Expand All @@ -108,7 +107,8 @@ public List<Aggregator> generateInternalAggs(final Schema inputSchema) throws Db
names.add(e.getOutputName());
}
resultSchema = new Schema(types, names);
return ImmutableList.of(new UserDefinedAggregator(initEvaluator, updateEvaluator, resultSchema, stateSchema));
return ImmutableList.of(
new UserDefinedAggregator(initEvaluator, updateEvaluator, resultSchema, stateSchema));
}

/**
Expand All @@ -124,8 +124,9 @@ public List<Aggregator> generateInternalAggs(final Schema inputSchema) throws Db
* @return a compiled object that will run all the expressions and store them into the output.
* @throws DbException if there is an error compiling the expressions.
*/
private ScriptEvalInterface getEvalScript(@Nonnull final List<Expression> expressions,
@Nonnull final ExpressionOperatorParameter param) throws DbException {
private ScriptEvalInterface getEvalScript(
@Nonnull final List<Expression> expressions, @Nonnull final ExpressionOperatorParameter param)
throws DbException {

StringBuilder compute = new StringBuilder();
StringBuilder output = new StringBuilder();
Expand All @@ -138,19 +139,42 @@ private ScriptEvalInterface getEvalScript(@Nonnull final List<Expression> expres
Type type = expr.getOutputType(param);

// type valI = expression;
compute.append(type.toJavaType().getName()).append(" val").append(varCount).append(" = ")
.append(expr.getJavaExpression(param)).append(";\n");
compute
.append(type.toJavaType().getName())
.append(" val")
.append(varCount)
.append(" = ")
.append(expr.getJavaExpression(param))
.append(";\n");

if (param.getStateSchema() == null) {
// state.putType(I, valI);
output.append(Expression.STATE).append(".put").append(type.toJavaObjectType().getSimpleName()).append("(")
.append(varCount).append("+").append(Expression.STATECOLOFFSET).append(", val").append(varCount)
output
.append(Expression.STATE)
.append(".put")
.append(type.toJavaObjectType().getSimpleName())
.append("(")
.append(varCount)
.append("+")
.append(Expression.STATECOLOFFSET)
.append(", val")
.append(varCount)
.append(");\n");
} else {
// state.replaceType(I, stateRow, valI);
output.append(Expression.STATE).append(".replace").append(type.toJavaObjectType().getSimpleName()).append("(")
.append(varCount).append("+").append(Expression.STATECOLOFFSET).append(", ").append(Expression.STATEROW)
.append(", val").append(varCount).append(");\n");
output
.append(Expression.STATE)
.append(".replace")
.append(type.toJavaObjectType().getSimpleName())
.append("(")
.append(varCount)
.append("+")
.append(Expression.STATECOLOFFSET)
.append(", ")
.append(Expression.STATEROW)
.append(", val")
.append(varCount)
.append(");\n");
}
}

Expand All @@ -168,8 +192,17 @@ private ScriptEvalInterface getEvalScript(@Nonnull final List<Expression> expres

try {
if (script.length() > 1) {
return (ScriptEvalInterface) se.createFastEvaluator(script, ScriptEvalInterface.class, new String[] {
Expression.INPUT, Expression.INPUTROW, Expression.STATE, Expression.STATEROW, Expression.STATECOLOFFSET });
return (ScriptEvalInterface)
se.createFastEvaluator(
script,
ScriptEvalInterface.class,
new String[] {
Expression.INPUT,
Expression.INPUTROW,
Expression.STATE,
Expression.STATEROW,
Expression.STATECOLOFFSET
});
} else {
return null;
}
Expand All @@ -185,7 +218,8 @@ public Schema generateSchema(final Schema inputSchema) {
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
for (Expression expr : emitters) {
typesBuilder.add(expr.getOutputType(new ExpressionOperatorParameter(inputSchema, stateSchema)));
typesBuilder.add(
expr.getOutputType(new ExpressionOperatorParameter(inputSchema, stateSchema)));
namesBuilder.add(expr.getOutputName());
}
return Schema.of(typesBuilder.build(), namesBuilder.build());
Expand Down

0 comments on commit e34654b

Please sign in to comment.