Skip to content

Commit

Permalink
bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
parmitam committed Feb 2, 2017
1 parent 31a68cb commit 96ed435
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 27 deletions.
38 changes: 15 additions & 23 deletions src/edu/washington/escience/myria/operator/Apply.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;
import edu.washington.escience.myria.storage.TupleBuffer;
import edu.washington.escience.myria.storage.TupleUtils;

/**
* Generic apply operator for single- or multivalued expressions.
Expand Down Expand Up @@ -56,6 +57,7 @@ public class Apply extends UnaryOperator {
* AddCounter to the returning tuplebatch.
*/
private Boolean addCounter = false;

/**
* @return the {@link #emitExpressions}
*/
Expand Down Expand Up @@ -137,37 +139,26 @@ protected TupleBatch fetchNextReady() throws DbException, InvocationTargetExcept
if (inputTuples != null) {
// Evaluate expressions on each column and store counts and results.
List<ReadableColumn> resultCountColumns = new ArrayList<>();

List<ReadableColumn> resultColumns = new ArrayList<>();
List<Column<?>> resultColumnsForTB = new ArrayList<>();
//List<Column<?>> resultColumnsForTB = new ArrayList<>();
for (final GenericEvaluator eval : emitEvaluators) {
EvaluatorResult evalResult = eval.evaluateColumn(inputTuples);
resultCountColumns.add(evalResult.getResultCounts());
resultColumns.add(evalResult.getResults());

// Preconditions.checkArgument(
// eval.getExpression().isMultiValued() || (evalResult.getResultColumns().size() == 1),
// "A single-valued expression cannot have more than one result column.");
// resultColumnsForTB.add(evalResult.getResultColumns().)
for (int j = 0; j < evalResult.getResultColumns().size(); j++) {
resultColumnsForTB.add(evalResult.getResultColumns().get(j));
}
}
// This is a zero-copy optimization that appends result columns directly to our output buffer
// if we don't need to take the Cartesian product. (For expressions which are pure column references,
// we elide 2 copies: one in `GenericEvaluator.evaluateColumn()` and one generating the Cartesian product.)

if (onlySingleValuedExpressions()) {
for (int j = 0; j < resultColumnsForTB.size(); ) {
List<Column<?>> tmpListForSingleColumn = new ArrayList<>();
for (int k = 0; k < emitEvaluators.size(); k++) {
tmpListForSingleColumn.add(resultColumnsForTB.get(j + k));
int[] iteratorIndexes = new int[emitEvaluators.size()];
Arrays.fill(iteratorIndexes, 0);
for (int rowIdx = 0; rowIdx < inputTuples.numTuples(); ++rowIdx) {
LOGGER.info("value of row: " + rowIdx);
for (int i = 0; i < iteratorIndexes.length; ++i) {
LOGGER.info("value of column: " + i);
outputBuffer.appendFromColumn(i, resultColumns.get(i), rowIdx);
}
outputBuffer.absorb(
new TupleBatch(
getSchema(), tmpListForSingleColumn, resultColumnsForTB.get(j).size()));
j += emitEvaluators.size();
}
// outputBuffer.absorb(
// new TupleBatch(getSchema(), resultColumnsForTB, inputTuples.numTuples()));

} else {
// Generate the Cartesian product and append to output buffer.
int[] resultCounts = new int[emitEvaluators.size()];
Expand Down Expand Up @@ -293,7 +284,8 @@ protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbExc
evals.add(evaluator);
}
setEmitEvaluators(evals);
outputBuffer = new TupleBatchBuffer(getSchema());

outputBuffer = new TupleBatchBuffer(generateSchema());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import edu.washington.escience.myria.expression.DownloadBlobExpression;
import edu.washington.escience.myria.expression.Expression;
import edu.washington.escience.myria.expression.ExpressionOperator;
import edu.washington.escience.myria.expression.SqrtExpression;
import edu.washington.escience.myria.expression.VariableExpression;
import edu.washington.escience.myria.operator.Apply;
import edu.washington.escience.myria.operator.BatchTupleSource;
Expand All @@ -25,20 +26,34 @@ public class ApplyDownloadBlobTest {

@Test
public void ApplyTest() throws DbException {
final Schema schema = Schema.ofFields("S3_URI", Type.STRING_TYPE);
final Schema expectedResultSchema = Schema.ofFields("blobs", Type.BLOB_TYPE);
final Schema schema =
new Schema(
ImmutableList.of(Type.STRING_TYPE, Type.LONG_TYPE), ImmutableList.of("blobs", "b"));
final Schema expectedResultSchema =
new Schema(
ImmutableList.of(Type.BLOB_TYPE, Type.DOUBLE_TYPE), ImmutableList.of("blobs", "sqrt"));

//Schema.ofFields("blobs", Type.BLOB_TYPE);
final TupleBatchBuffer input = new TupleBatchBuffer(schema);

input.putString(0, "s3://mribmarktmp/100307/1.p".toString());
input.putString(0, "s3://mribmarktmp/100408/1.p".toString());
input.putString(0, "s3://imagedb-data/dmridatasample.csv".toString());
input.putLong(1, 2);
input.putString(0, "s3://imagedb-data/dmridatasample.csv".toString());
input.putLong(1, 2);

ImmutableList.Builder<Expression> Expressions = ImmutableList.builder();
ExpressionOperator filename = new VariableExpression(0);
;

ExpressionOperator db = new DownloadBlobExpression(filename);
Expression expr = new Expression("blobs", db);

Expressions.add(expr);
ExpressionOperator varb = new VariableExpression(1);
ExpressionOperator squareRoot = new SqrtExpression(varb);

Expression expr2 = new Expression("sqrt", squareRoot);
Expressions.add(expr2);

Apply apply = new Apply(new BatchTupleSource(input), Expressions.build(), false);
apply.open(TestEnvVars.get());
Expand Down

0 comments on commit 96ed435

Please sign in to comment.