Skip to content

Commit

Permalink
Merge branch 'master' into slxu-kill-query-at-aborted-downloading
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalperi committed Sep 15, 2014
2 parents 7edcfa9 + c085de9 commit e612a10
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package edu.washington.escience.myria.api.encoding;

import edu.washington.escience.myria.api.MyriaApiException;
import edu.washington.escience.myria.api.encoding.QueryConstruct.ConstructArgs;
import edu.washington.escience.myria.operator.CrossWithSingleton;

public class CrossWithSingletonEncoding extends BinaryOperatorEncoding<CrossWithSingleton> {

@Override
public CrossWithSingleton construct(ConstructArgs args) throws MyriaApiException {
return new CrossWithSingleton(null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
@Type(name = "ShuffleProducer", value = ShuffleProducerEncoding.class),
@Type(name = "SingleGroupByAggregate", value = SingleGroupByAggregateEncoding.class),
@Type(name = "Singleton", value = SingletonEncoding.class),
@Type(name = "CrossWithSingleton", value = CrossWithSingletonEncoding.class),
@Type(name = "SinkRoot", value = SinkRootEncoding.class),
@Type(name = "StatefulApply", value = StatefulApplyEncoding.class),
@Type(name = "SymmetricHashJoin", value = SymmetricHashJoinEncoding.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package edu.washington.escience.myria.api.encoding;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -25,6 +26,18 @@ public final class PlanFragmentEncoding extends MyriaApiEncoding {
/** List of required fields. */
public static final List<String> requiredFields = ImmutableList.of("operators");

/**
* Construct a PlanFragmentEncoding wrapping the given operators
*
* @param operators
*/
@SafeVarargs
public static PlanFragmentEncoding of(final OperatorEncoding<? extends Operator>... operators) {
PlanFragmentEncoding ret = new PlanFragmentEncoding();
ret.operators = Arrays.asList(operators);
return ret;
}

public void setFragmentIndex(int fragmentIndex) {
this.fragmentIndex = fragmentIndex;
}
Expand Down
96 changes: 96 additions & 0 deletions src/edu/washington/escience/myria/operator/CrossWithSingleton.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package edu.washington.escience.myria.operator;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.column.Column;
import edu.washington.escience.myria.column.ConstantValueColumn;
import edu.washington.escience.myria.storage.TupleBatch;

/**
* Given a singleton right child, cross the left child with it in a way that minimizes state and does not construct new
* tuples.
*/
public class CrossWithSingleton extends BinaryOperator {

/** Required for Java serialization. */
private static final long serialVersionUID = 1L;
/** The singleton tuple from the right child. */
private TupleBatch rightTuple;

/**
* Instantiate a new operator to cross all tuples in the left child with the singleton tuple from the right child.
*
* @param left the left child, which may have any number of tuples.
* @param right the right child, which may only have one tuple.
*/
public CrossWithSingleton(final Operator left, final Operator right) {
super(left, right);
rightTuple = null;
}

@Override
protected TupleBatch fetchNextReady() throws Exception {

/* Before we can do anything, get the singleton tuple from the right child, and ensure that it is a singleton. */
Operator right = getRight();
while (!right.eos()) {
TupleBatch tb = right.nextReady();
if (tb == null) {
/* The right child may have realized it's EOS now. If so, we must move onto left child to avoid livelock. */
if (right.eos()) {
break;
}
return null;
}
Preconditions.checkState(rightTuple == null,
"Expecting a singleton right child, but received a batch with %s additional tuples", tb.numTuples());
Preconditions.checkState(tb.numTuples() == 1,
"Expecting a singleton right child, instead received a batch with %s tuples", tb.numTuples());
rightTuple = tb;
}

/* Verify that the right child did produce a tuple. */
Preconditions.checkState(rightTuple != null,
"Expecting a singleton right child, but right child is EOS and no tuples received.");

Operator left = getLeft();
Schema schema = getSchema();
while (!left.eos()) {
TupleBatch tb = left.nextReady();
if (tb == null) {
break;
}

ImmutableList.Builder<Column<?>> columns = ImmutableList.builder();
for (Column<?> c : tb.getDataColumns()) {
columns.add(c);
}
for (Column<?> c : rightTuple.getDataColumns()) {
columns.add(new ConstantValueColumn(c.getObject(0), c.getType(), tb.numTuples()));
}
return new TupleBatch(schema, columns.build());
}

return null;
}

@Override
protected Schema generateSchema() {
Operator left = getLeft();
Operator right = getRight();
if (left == null || right == null) {
return null;
}

Schema leftSchema = left.getSchema();
Schema rightSchema = right.getSchema();
if (leftSchema == null || rightSchema == null) {
return null;
}

return Schema.merge(leftSchema, rightSchema);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package edu.washington.escience.myria.systemtest;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.net.HttpURLConnection;

import org.apache.commons.httpclient.HttpStatus;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.ImmutableList;

import edu.washington.escience.myria.RelationKey;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.api.MyriaJsonMapperProvider;
import edu.washington.escience.myria.api.encoding.CrossWithSingletonEncoding;
import edu.washington.escience.myria.api.encoding.DbInsertEncoding;
import edu.washington.escience.myria.api.encoding.EmptyRelationEncoding;
import edu.washington.escience.myria.api.encoding.PlanFragmentEncoding;
import edu.washington.escience.myria.api.encoding.QueryEncoding;
import edu.washington.escience.myria.api.encoding.QueryStatusEncoding;
import edu.washington.escience.myria.api.encoding.QueryStatusEncoding.Status;
import edu.washington.escience.myria.api.encoding.SingletonEncoding;
import edu.washington.escience.myria.api.encoding.plan.SubQueryEncoding;
import edu.washington.escience.myria.util.JsonAPIUtils;

/**
* System tests of operators using plans submitted via JSON. Tests both the API encoding of the operator AND the
* serializability of the operator.
*/
public class JsonOperatorTests extends SystemTestBase {

static Logger LOGGER = LoggerFactory.getLogger(JsonOperatorTests.class);

private HttpURLConnection submitQuery(final QueryEncoding query) throws IOException {
ObjectWriter writer = MyriaJsonMapperProvider.getWriter();
String queryString = writer.writeValueAsString(query);
HttpURLConnection conn = JsonAPIUtils.submitQuery("localhost", masterDaemonPort, queryString);
if (null != conn.getErrorStream()) {
throw new IllegalStateException(getContents(conn));
}
return conn;
}

@Test
public void crossWithSingletonTest() throws Exception {
SingletonEncoding singleton = new SingletonEncoding();
EmptyRelationEncoding empty = new EmptyRelationEncoding();
CrossWithSingletonEncoding cross = new CrossWithSingletonEncoding();
DbInsertEncoding insert = new DbInsertEncoding();

RelationKey outputRelation = RelationKey.of("test", "crosswithsingleton", "empty");
singleton.opId = 0;
empty.opId = 1;
empty.schema = Schema.ofFields("x", Type.LONG_TYPE);
cross.opId = 2;
cross.argChild1 = empty.opId;
cross.argChild2 = singleton.opId;
insert.opId = 3;
insert.argChild = cross.opId;
insert.relationKey = outputRelation;
PlanFragmentEncoding frag = PlanFragmentEncoding.of(singleton, empty, cross, insert);

QueryEncoding query = new QueryEncoding();
query.plan = new SubQueryEncoding(ImmutableList.of(frag));
query.logicalRa = "CrossWithSingleton test";
query.rawQuery = query.logicalRa;

HttpURLConnection conn = submitQuery(query);
assertEquals(HttpStatus.SC_ACCEPTED, conn.getResponseCode());
long queryId = getQueryStatus(conn).queryId;
conn.disconnect();
while (!server.queryCompleted(queryId)) {
Thread.sleep(1);
}
QueryStatusEncoding status = server.getQueryStatus(queryId);
assertEquals(Status.SUCCESS, status.status);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
*
*/
package edu.washington.escience.myria.operator;

import static org.junit.Assert.assertEquals;

import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;

import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;
import edu.washington.escience.myria.util.TestEnvVars;

/**
* Tests that crossing with a singleton works and catches the major error cases.
*/
public class CrossWithSingletonTest {
private final int NUM_TUPLES = TupleBatch.BATCH_SIZE * 3 + 3;
private TupleSource dataSource = null;
private TupleBatch singleton = null;

@Before
public void setUp() {
Schema dataSchema = Schema.ofFields("val1", Type.LONG_TYPE, "val2", Type.STRING_TYPE);
TupleBatchBuffer tbb = new TupleBatchBuffer(dataSchema);
for (int i = 0; i < NUM_TUPLES; ++i) {
tbb.putLong(0, i);
tbb.putString(1, "" + i);
}
dataSource = new TupleSource(tbb.getAll());

tbb = new TupleBatchBuffer(Schema.ofFields(Type.DATETIME_TYPE, Type.BOOLEAN_TYPE));
tbb.putDateTime(0, DateTime.now());
tbb.putBoolean(1, false);
singleton = tbb.popAny();
}

@SuppressWarnings("deprecation")
private void verifyMatch(final CrossWithSingleton cross) throws DbException {
cross.open(TestEnvVars.get());

int numTuples = 0;
while (!cross.eos()) {
TupleBatch next = cross.nextReady();
if (next == null) {
continue;
}
for (int j = 0; j < next.numTuples(); ++j) {
assertEquals(j + numTuples, next.getLong(0, j));
assertEquals("" + (j + numTuples), next.getString(1, j));
for (int k = 0; k < singleton.numColumns(); ++k) {
assertEquals(next.getObject(2 + k, j), singleton.getObject(k, 0));
}
}
numTuples += next.numTuples();
}
}

@Test
public void testWithSingleton() throws DbException {
TupleSource singletonSource = new TupleSource(singleton);
CrossWithSingleton cross = new CrossWithSingleton(dataSource, singletonSource);
assertEquals(Schema.merge(dataSource.getSchema(), singleton.getSchema()), cross.getSchema());
verifyMatch(cross);
}

@Test(expected = IllegalStateException.class)
public void testWithSingletonWrongSide() throws DbException {
TupleSource singletonSource = new TupleSource(singleton);
CrossWithSingleton cross = new CrossWithSingleton(singletonSource, dataSource);
assertEquals(Schema.merge(singleton.getSchema(), dataSource.getSchema()), cross.getSchema());
verifyMatch(cross);
}

@Test(expected = IllegalStateException.class)
public void testWithTwoSingletons() throws DbException {
TupleBatchBuffer tbb = new TupleBatchBuffer(singleton.getSchema());
tbb.appendTB(singleton);
tbb.appendTB(singleton);
TupleSource source = new TupleSource(tbb.getAll());
CrossWithSingleton cross = new CrossWithSingleton(dataSource, source);
assertEquals(Schema.merge(dataSource.getSchema(), singleton.getSchema()), cross.getSchema());
verifyMatch(cross);
}

@Test(expected = IllegalStateException.class)
public void testWithEmptyRelation() throws DbException {
CrossWithSingleton cross = new CrossWithSingleton(dataSource, EmptyRelation.of(singleton.getSchema()));
verifyMatch(cross);
}
}

0 comments on commit e612a10

Please sign in to comment.