Skip to content

Commit

Permalink
Reverting formatting changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobin Baker committed Mar 24, 2015
1 parent c5dcfef commit a12a54d
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 163 deletions.
Expand Up @@ -13,31 +13,27 @@
import edu.washington.escience.myria.operator.Operator;

/**
* A JSON-able wrapper for the expected wire message for an operator. To add a new operator, two
* things need to be done.
* A JSON-able wrapper for the expected wire message for an operator. To add a new operator, two things need to be done.
*
* 1. Create an Encoding class that extends OperatorEncoding.
*
* 2. Add the operator to the list of (alphabetically sorted) JsonSubTypes below.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "opType")
@JsonSubTypes({@Type(name = "Aggregate", value = AggregateEncoding.class),
@Type(name = "Apply", value = ApplyEncoding.class),
@JsonSubTypes({
@Type(name = "Aggregate", value = AggregateEncoding.class), @Type(name = "Apply", value = ApplyEncoding.class),
@Type(name = "BinaryFileScan", value = BinaryFileScanEncoding.class),
@Type(name = "BroadcastConsumer", value = BroadcastConsumerEncoding.class),
@Type(name = "BroadcastProducer", value = BroadcastProducerEncoding.class),
@Type(name = "CollectConsumer", value = CollectConsumerEncoding.class),
@Type(name = "CollectProducer", value = CollectProducerEncoding.class),
@Type(name = "Consumer", value = ConsumerEncoding.class),
@Type(name = "Counter", value = CounterEncoding.class),
@Type(name = "Consumer", value = ConsumerEncoding.class), @Type(name = "Counter", value = CounterEncoding.class),
@Type(name = "DbInsert", value = DbInsertEncoding.class),
@Type(name = "DbQueryScan", value = QueryScanEncoding.class),
@Type(name = "Difference", value = DifferenceEncoding.class),
@Type(name = "DupElim", value = DupElimEncoding.class),
@Type(name = "Empty", value = EmptyRelationEncoding.class),
@Type(name = "DupElim", value = DupElimEncoding.class), @Type(name = "Empty", value = EmptyRelationEncoding.class),
@Type(name = "EOSController", value = EOSControllerEncoding.class),
@Type(name = "FileScan", value = FileScanEncoding.class),
@Type(name = "Filter", value = FilterEncoding.class),
@Type(name = "FileScan", value = FileScanEncoding.class), @Type(name = "Filter", value = FilterEncoding.class),
@Type(name = "HyperShuffleProducer", value = HyperShuffleProducerEncoding.class),
@Type(name = "HyperShuffleConsumer", value = HyperShuffleConsumerEncoding.class),
@Type(name = "IDBController", value = IDBControllerEncoding.class),
Expand All @@ -46,8 +42,7 @@
@Type(name = "Limit", value = LimitEncoding.class),
@Type(name = "LocalMultiwayConsumer", value = LocalMultiwayConsumerEncoding.class),
@Type(name = "LocalMultiwayProducer", value = LocalMultiwayProducerEncoding.class),
@Type(name = "Merge", value = MergeEncoding.class),
@Type(name = "MergeJoin", value = MergeJoinEncoding.class),
@Type(name = "Merge", value = MergeEncoding.class), @Type(name = "MergeJoin", value = MergeJoinEncoding.class),
@Type(name = "MultiGroupByAggregate", value = MultiGroupByAggregateEncoding.class),
@Type(name = "NChiladaFileScan", value = NChiladaFileScanEncoding.class),
@Type(name = "RightHashCountingJoin", value = RightHashCountingJoinEncoding.class),
Expand All @@ -59,16 +54,15 @@
@Type(name = "SingleGroupByAggregate", value = SingleGroupByAggregateEncoding.class),
@Type(name = "Singleton", value = SingletonEncoding.class),
@Type(name = "CrossWithSingleton", value = CrossWithSingletonEncoding.class),
@Type(name = "SinkRoot", value = SinkRootEncoding.class),
@Type(name = "Split", value = SplitEncoding.class),
@Type(name = "SinkRoot", value = SinkRootEncoding.class), @Type(name = "Split", value = SplitEncoding.class),
@Type(name = "StatefulApply", value = StatefulApplyEncoding.class),
@Type(name = "SymmetricHashJoin", value = SymmetricHashJoinEncoding.class),
@Type(name = "SymmetricHashCountingJoin", value = SymmetricHashCountingJoinEncoding.class),
@Type(name = "TableScan", value = TableScanEncoding.class),
@Type(name = "TempInsert", value = TempInsertEncoding.class),
@Type(name = "TempTableScan", value = TempTableScanEncoding.class),
@Type(name = "TipsyFileScan", value = TipsyFileScanEncoding.class),
@Type(name = "UnionAll", value = UnionAllEncoding.class)})
@Type(name = "UnionAll", value = UnionAllEncoding.class) })
public abstract class OperatorEncoding<T extends Operator> extends MyriaApiEncoding {

@Required
Expand Down
50 changes: 23 additions & 27 deletions src/edu/washington/escience/myria/operator/Operator.java
Expand Up @@ -30,8 +30,8 @@
*
* @author slxu
*
* Currently, the operator api design requires that each single operator instance should be
* executed within a single thread.
* Currently, the operator api design requires that each single operator instance should be executed within a
* single thread.
*
* No multi-thread synchronization is considered.
*
Expand Down Expand Up @@ -110,9 +110,8 @@ public ProfilingLogger getProfilingLogger() {
* @return return subquery id.
*/
public SubQueryId getSubQueryId() {
return ((LocalFragmentResourceManager) execEnvVars
.get(MyriaConstants.EXEC_ENV_VAR_FRAGMENT_RESOURCE_MANAGER)).getFragment()
.getLocalSubQuery().getSubQueryId();
return ((LocalFragmentResourceManager) execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_FRAGMENT_RESOURCE_MANAGER))
.getFragment().getLocalSubQuery().getSubQueryId();
}

/**
Expand All @@ -134,8 +133,8 @@ public LocalFragment getFragment() {
if (execEnvVars == null) {
return null;
} else {
return ((LocalFragmentResourceManager) execEnvVars
.get(MyriaConstants.EXEC_ENV_VAR_FRAGMENT_RESOURCE_MANAGER)).getFragment();
return ((LocalFragmentResourceManager) execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_FRAGMENT_RESOURCE_MANAGER))
.getFragment();
}
}

Expand Down Expand Up @@ -170,8 +169,7 @@ protected Set<ProfilingMode> getProfilingMode() {
}
if (profilingMode == null) {
LocalFragmentResourceManager lfrm =
(LocalFragmentResourceManager) execEnvVars
.get(MyriaConstants.EXEC_ENV_VAR_FRAGMENT_RESOURCE_MANAGER);
(LocalFragmentResourceManager) execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_FRAGMENT_RESOURCE_MANAGER);
if (lfrm == null) {
return ImmutableSet.of();
}
Expand All @@ -193,8 +191,7 @@ public final void close() throws DbException {
// Ensures that a future call to next() or nextReady() will fail
// outputBuffer = null;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Operator {} closed, #output TBs: {}, # output tuples: {}", this, numOutputTBs,
numOutputTuples);
LOGGER.debug("Operator {} closed, #output TBs: {}, # output tuples: {}", this, numOutputTBs, numOutputTuples);
}
open = false;
eos = true;
Expand Down Expand Up @@ -256,9 +253,9 @@ public final boolean eoi() {
}

/**
* @return return the children Operators of this operator. If there is only one child, return an
* array of only one element. For join operators, the order of the children is not
* important. But they should be consistent among multiple calls.
* @return return the children Operators of this operator. If there is only one child, return an array of only one
* element. For join operators, the order of the children is not important. But they should be consistent
* among multiple calls.
*/
public abstract Operator[] getChildren();

Expand Down Expand Up @@ -308,8 +305,7 @@ protected void checkEOSAndEOI() {
*
* This method is non-blocking.
*
* If the thread is interrupted during the processing of nextReady, the interrupt status will be
* kept.
* If the thread is interrupted during the processing of nextReady, the interrupt status will be kept.
*
* @throws DbException if any problem
*
Expand Down Expand Up @@ -441,25 +437,25 @@ public final boolean isOpen() {
* @param execEnvVars execution environment variables
* @throws Exception if any error occurs
*/
protected void init(final ImmutableMap<String, Object> execEnvVars) throws Exception {};
protected void init(final ImmutableMap<String, Object> execEnvVars) throws Exception {
};

/**
* Do the clean up, release resources.
*
* @throws Exception if any error occurs
*/
protected void cleanup() throws Exception {};
protected void cleanup() throws Exception {
};

/**
* Generate next output TupleBatch if possible. Return null immediately if currently no output can
* be generated.
* Generate next output TupleBatch if possible. Return null immediately if currently no output can be generated.
*
* Do not block the execution thread in this method, including sleep, wait on locks, etc.
*
* @throws Exception if any error occurs
*
* @return next ready output TupleBatch. null if either EOS or no output TupleBatch can be
* generated currently.
* @return next ready output TupleBatch. null if either EOS or no output TupleBatch can be generated currently.
*/
protected abstract TupleBatch fetchNextReady() throws Exception;

Expand All @@ -476,12 +472,12 @@ protected final void setEOS() {
}

/**
* Attempt to produce the {@link Schema} of the tuples generated by this operator. This function
* must handle cases like <code>null</code> children or arguments, and return <code>null</code> if
* there is not enough information to produce the schema.
* Attempt to produce the {@link Schema} of the tuples generated by this operator. This function must handle cases
* like <code>null</code> children or arguments, and return <code>null</code> if there is not enough information to
* produce the schema.
*
* @return the {@link Schema} of the tuples generated by this operator, or <code>null</code> if
* the operator does not yet have enough information to generate the schema.
* @return the {@link Schema} of the tuples generated by this operator, or <code>null</code> if the operator does not
* yet have enough information to generate the schema.
*/
protected abstract Schema generateSchema();

Expand Down
7 changes: 3 additions & 4 deletions src/edu/washington/escience/myria/operator/Split.java
Expand Up @@ -16,8 +16,8 @@
import edu.washington.escience.myria.storage.TupleUtils;

/**
* Operator which splits a string-valued column on a Java regular expression and duplicates the
* input row with each segment of the split result.
* Operator which splits a string-valued column on a Java regular expression and duplicates the input row with each
* segment of the split result.
*
* E.g., (1, 2, "foo:bar:baz") -> (1, 2, "foo"), (1, 2, "bar"), (1, 2, "baz")
*/
Expand Down Expand Up @@ -54,8 +54,7 @@ public Split(final Operator child, final int splitColumnIndex, @Nonnull final St
}

/**
* Instantiate a Split operator with null child. (Must be set later by setChild() or
* setChildren().)
* Instantiate a Split operator with null child. (Must be set later by setChild() or setChildren().)
*
* @param splitColumnIndex index of string column to split using {@link #regex}
* @param regex regular expression to split value of column at {@link #splitColumnIndex}
Expand Down
Expand Up @@ -32,8 +32,8 @@
import edu.washington.escience.myria.util.JsonAPIUtils;

/**
* System tests of operators using plans submitted via JSON. Tests both the API encoding of the
* operator AND the serializability of the operator.
* System tests of operators using plans submitted via JSON. Tests both the API encoding of the operator AND the
* serializability of the operator.
*/
public class JsonOperatorTests extends SystemTestBase {

Expand Down Expand Up @@ -79,8 +79,8 @@ public void crossWithSingletonTest() throws Exception {
public void splitTest() throws Exception {
File currentDir = new File(".");
DataSource source =
new FileSource(Paths.get(currentDir.getAbsolutePath(), "testdata", "filescan",
"one_col_string_array.txt").toString());
new FileSource(Paths.get(currentDir.getAbsolutePath(), "testdata", "filescan", "one_col_string_array.txt")
.toString());

Schema schema = Schema.of(ImmutableList.of(Type.STRING_TYPE), ImmutableList.of("string_array"));
FileScanEncoding fs = new FileScanEncoding();
Expand Down Expand Up @@ -120,8 +120,8 @@ public void splitTest() throws Exception {
assertEquals(status.message, Status.SUCCESS, status.status);

String data =
JsonAPIUtils.download("localhost", masterDaemonPort, outputRelation.getUserName(),
outputRelation.getProgramName(), outputRelation.getRelationName(), "json");
JsonAPIUtils.download("localhost", masterDaemonPort, outputRelation.getUserName(), outputRelation
.getProgramName(), outputRelation.getRelationName(), "json");
String expectedData =
"[{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"a\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"b\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"c\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"d\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"e\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"f\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"a\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"b\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"c\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"d\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"e\"},{\"string_array\":\"a:b:c:d:e:f\",\"string_array_splits\":\"f\"}]";
assertEquals(data, expectedData);
Expand Down
Expand Up @@ -57,9 +57,7 @@ public class JsonQuerySubmitTest extends SystemTestBase {
public static String emptyIngest() throws JsonProcessingException {
/* Construct the JSON for an Empty Ingest request. */
RelationKey key = RelationKey.of("public", "adhoc", "smallTable");
Schema schema =
Schema.of(ImmutableList.of(Type.STRING_TYPE, Type.LONG_TYPE),
ImmutableList.of("foo", "bar"));
Schema schema = Schema.of(ImmutableList.of(Type.STRING_TYPE, Type.LONG_TYPE), ImmutableList.of("foo", "bar"));
return ingest(key, schema, new EmptySource(), null);
}

Expand Down Expand Up @@ -92,38 +90,30 @@ public void datasetPutTest() throws Exception {
conn.disconnect();

String dataset = "Hello world,3242\n" + "goodbye world,321\n" + "pizza pizza,104";
JsonAPIUtils.replace("localhost", masterDaemonPort, "public", "adhoc", "smallTable", dataset,
"csv");
JsonAPIUtils.replace("localhost", masterDaemonPort, "public", "adhoc", "smallTable", dataset, "csv");

String fetchedDataset =
JsonAPIUtils
.download("localhost", masterDaemonPort, "public", "adhoc", "smallTable", "csv");
JsonAPIUtils.download("localhost", masterDaemonPort, "public", "adhoc", "smallTable", "csv");
assertTrue(fetchedDataset.contains("pizza pizza"));

// Replace the dataset with all new contents
dataset = "mexico\t42\n" + "sri lanka\t12342\n" + "belize\t802304";
JsonAPIUtils.replace("localhost", masterDaemonPort, "public", "adhoc", "smallTable", dataset,
"tsv");
JsonAPIUtils.replace("localhost", masterDaemonPort, "public", "adhoc", "smallTable", dataset, "tsv");

fetchedDataset =
JsonAPIUtils
.download("localhost", masterDaemonPort, "public", "adhoc", "smallTable", "csv");
fetchedDataset = JsonAPIUtils.download("localhost", masterDaemonPort, "public", "adhoc", "smallTable", "csv");
assertFalse(fetchedDataset.contains("pizza pizza"));
assertTrue(fetchedDataset.contains("sri lanka"));
}

@Test
public void ingestTest() throws Exception {
/* good ingestion. */
DataSource source =
new FileSource(Paths.get("testdata", "filescan", "simple_two_col_int.txt").toString());
DataSource source = new FileSource(Paths.get("testdata", "filescan", "simple_two_col_int.txt").toString());
RelationKey key = RelationKey.of("public", "adhoc", "testIngest");
Schema schema =
Schema.of(ImmutableList.of(Type.INT_TYPE, Type.INT_TYPE), ImmutableList.of("x", "y"));
Schema schema = Schema.of(ImmutableList.of(Type.INT_TYPE, Type.INT_TYPE), ImmutableList.of("x", "y"));
Character delimiter = ' ';
HttpURLConnection conn =
JsonAPIUtils.ingestData("localhost", masterDaemonPort,
ingest(key, schema, source, delimiter));
JsonAPIUtils.ingestData("localhost", masterDaemonPort, ingest(key, schema, source, delimiter));
if (null != conn.getErrorStream()) {
throw new IllegalStateException(getContents(conn));
}
Expand All @@ -133,9 +123,7 @@ public void ingestTest() throws Exception {
/* bad ingestion. */
delimiter = ',';
RelationKey newkey = RelationKey.of("public", "adhoc", "testbadIngest");
conn =
JsonAPIUtils.ingestData("localhost", masterDaemonPort,
ingest(newkey, schema, source, delimiter));
conn = JsonAPIUtils.ingestData("localhost", masterDaemonPort, ingest(newkey, schema, source, delimiter));
assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_INTERNAL_ERROR);
conn.disconnect();
}
Expand Down Expand Up @@ -164,9 +152,7 @@ public void jsonQuerySubmitTest() throws Exception {
assertEquals(HttpURLConnection.HTTP_CREATED, conn.getResponseCode());
conn.disconnect();

String data =
JsonAPIUtils.download("localhost", masterDaemonPort, "jwang", "global_join", "smallTable",
"json");
String data = JsonAPIUtils.download("localhost", masterDaemonPort, "jwang", "global_join", "smallTable", "json");
String subStr = "{\"follower\":46,\"followee\":17}";
assertTrue(data.contains(subStr));

Expand Down Expand Up @@ -225,8 +211,8 @@ public void joinChainResultTest() throws Exception {
throw new IllegalStateException(getContents(conn));
}
assertEquals(HttpURLConnection.HTTP_CREATED, conn.getResponseCode());
assertEquals(QueryStatusEncoding.Status.SUCCESS,
server.getQueryManager().getQueryStatus(getDatasetStatus(conn).getQueryId()).status);
assertEquals(QueryStatusEncoding.Status.SUCCESS, server.getQueryManager().getQueryStatus(
getDatasetStatus(conn).getQueryId()).status);
conn.disconnect();

File queryJson = new File("./jsonQueries/multiIDB_jwang/joinChain.json");
Expand All @@ -240,8 +226,7 @@ public void joinChainResultTest() throws Exception {
while (!server.getQueryManager().queryCompleted(queryId)) {
Thread.sleep(100);
}
assertEquals(QueryStatusEncoding.Status.SUCCESS,
server.getQueryManager().getQueryStatus(queryId).status);
assertEquals(QueryStatusEncoding.Status.SUCCESS, server.getQueryManager().getQueryStatus(queryId).status);
}

@Test
Expand All @@ -252,8 +237,8 @@ public void abortedDownloadTest() throws Exception {
final int BYTES_TO_READ = 1024; // read 1 kb

URL url =
new URL(String.format("http://%s:%d/dataset/download_test?num_tb=%d&format=%s",
"localhost", masterDaemonPort, NUM_DUPLICATES, "json"));
new URL(String.format("http://%s:%d/dataset/download_test?num_tb=%d&format=%s", "localhost", masterDaemonPort,
NUM_DUPLICATES, "json"));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setDoOutput(true);
conn.setRequestMethod("GET");
Expand All @@ -277,10 +262,8 @@ public void abortedDownloadTest() throws Exception {
}
long nanoElapse = System.nanoTime() - start;
System.out.println("Download size: " + (numBytesRead * 1.0 / 1024 / 1024 / 1024) + " GB");
System.out
.println("Speed is: "
+ (numBytesRead * 1.0 / 1024 / 1024 / TimeUnit.NANOSECONDS.toSeconds(nanoElapse))
+ " MB/s");
System.out.println("Speed is: " + (numBytesRead * 1.0 / 1024 / 1024 / TimeUnit.NANOSECONDS.toSeconds(nanoElapse))
+ " MB/s");
while (server.getQueryManager().getQueries(1L, null, null, null).get(0).finishTime == null) {
Thread.sleep(100);
}
Expand Down

0 comments on commit a12a54d

Please sign in to comment.