Skip to content

Commit

Permalink
extend DatasetStatus to return howPartitioned, add a test to ingest data
Browse files Browse the repository at this point in the history
using SingleFieldHashPartitionFunction.
  • Loading branch information
jingjingwang committed Mar 10, 2015
1 parent 1ec89c7 commit b820339
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import edu.washington.escience.myria.RelationKey;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.operator.network.partition.PartitionFunction;

/**
* Metadata about a dataset that has been loaded into the system.
Expand All @@ -21,16 +22,19 @@ public class DatasetStatus {
* @param numTuples The number of tuples in the dataset.
* @param queryId The query that created this dataset.
* @param created When this dataset was created, in ISO8601 format.
* @param howPartitioned How this dataset was partitioned.
*/
@JsonCreator
public DatasetStatus(@JsonProperty("relationKey") final RelationKey relationKey,
@JsonProperty("schema") final Schema schema, @JsonProperty("numTuples") final long numTuples,
@JsonProperty("queryId") final long queryId, @JsonProperty("created") final String created) {
@JsonProperty("queryId") final long queryId, @JsonProperty("created") final String created,
@JsonProperty("howPartitioned") final PartitionFunction howPartitioned) {
this.relationKey = relationKey;
this.schema = schema;
this.numTuples = numTuples;
this.queryId = queryId;
this.created = created;
this.howPartitioned = howPartitioned;
}

/** The {@link RelationKey} identifying the dataset. */
Expand All @@ -48,6 +52,9 @@ public DatasetStatus(@JsonProperty("relationKey") final RelationKey relationKey,
/** When this dataset was created, in ISO8601 format. */
@JsonProperty
private final String created;
/** How this dataset was partitioned. */
@JsonProperty
private final PartitionFunction howPartitioned;
/** The URI of this resource. */
@JsonProperty
public URI uri;
Expand Down Expand Up @@ -87,6 +94,13 @@ public String getCreated() {
return created;
}

/**
* @return the howPartitioned.
*/
public PartitionFunction getHowPartitioned() {
return howPartitioned;
}

/**
* Set the URI of this dataset.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1079,8 +1079,16 @@ private static List<DatasetStatus> datasetStatusListHelper(final SQLiteStatement
long numTuples = statement.columnLong(3);
long queryId = statement.columnLong(4);
String created = statement.columnString(5);
PartitionFunction howPartitioned;
try {
howPartitioned =
MyriaJsonMapperProvider.getMapper().readValue(statement.columnString(6), PartitionFunction.class);
} catch (final IOException e) {
LOGGER.debug("Error deserializing howPartitioned for dataset #{}", relationKey.toString(), e);
howPartitioned = new UnknownPartitionFunction(null);
}
result.add(new DatasetStatus(relationKey, getDatasetSchema(connection, relationKey), numTuples, queryId,
created));
created, howPartitioned));
}
statement.dispose();
return result.build();
Expand Down Expand Up @@ -1572,7 +1580,7 @@ protected Object job(final SQLiteConnection sqliteConnection) throws CatalogExce
* @param pf the partition function.
* @throws CatalogException if there is an error in the catalog.
*/
public void updatePartitionFunction(final RelationKey key, final PartitionFunction pf) throws CatalogException {
public void updateHowPartitioned(final RelationKey key, final PartitionFunction pf) throws CatalogException {
if (isClosed) {
throw new CatalogException("Catalog is closed.");
}
Expand Down Expand Up @@ -1627,7 +1635,7 @@ protected DatasetStatus job(final SQLiteConnection sqliteConnection) throws Cata
try {
SQLiteStatement statement =
sqliteConnection
.prepare("SELECT num_tuples, query_id, finish_time FROM relations JOIN queries USING (query_id) WHERE user_name=? AND program_name=? AND relation_name=?");
.prepare("SELECT num_tuples, query_id, finish_time, how_partitioned FROM relations JOIN queries USING (query_id) JOIN stored_relations USING (user_name,program_name,relation_name) WHERE user_name=? AND program_name=? AND relation_name=?");
statement.bind(1, relationKey.getUserName());
statement.bind(2, relationKey.getProgramName());
statement.bind(3, relationKey.getRelationName());
Expand All @@ -1638,8 +1646,16 @@ protected DatasetStatus job(final SQLiteConnection sqliteConnection) throws Cata
long numTuples = statement.columnLong(0);
long queryId = statement.columnLong(1);
String created = statement.columnString(2);
PartitionFunction howPartitioned;
try {
howPartitioned =
MyriaJsonMapperProvider.getMapper().readValue(statement.columnString(3), PartitionFunction.class);
} catch (final IOException e) {
LOGGER.debug("Error deserializing howPartitioned for dataset #{}", relationKey.toString(), e);
howPartitioned = new UnknownPartitionFunction(null);
}
statement.dispose();
return new DatasetStatus(relationKey, schema, numTuples, queryId, created);
return new DatasetStatus(relationKey, schema, numTuples, queryId, created, howPartitioned);
} catch (final SQLiteException e) {
throw new CatalogException(e);
}
Expand Down
22 changes: 22 additions & 0 deletions src/edu/washington/escience/myria/util/JsonAPIUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;

import edu.washington.escience.myria.RelationKey;

/**
* Util methods for handling of Json API stuff.
* */
Expand Down Expand Up @@ -179,4 +181,24 @@ public static HttpURLConnection ingestData(final String masterHostname, final in
conn.getResponseCode();
return conn;
}

/**
* @param masterHostname master hostname
* @param apiPort rest api port
* @return a HTTPURLConnection instance of retrieving responses.
* @throws IOException if IO errors
* */
public HttpURLConnection getDatasetStatus(final String masterHostname, final int apiPort, final RelationKey key)
throws IOException {
URL url =
new URL(String.format("http://%s:%d/dataset/user-%s/program-%s/relation-%s", masterHostname, apiPort, key
.getUserName(), key.getProgramName(), key.getRelationName()));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setDoOutput(true);
conn.setRequestMethod("GET");
conn.connect();
conn.getResponseCode();
return conn;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.api.MyriaJsonMapperProvider;
import edu.washington.escience.myria.api.encoding.DatasetEncoding;
import edu.washington.escience.myria.api.encoding.DatasetStatus;
import edu.washington.escience.myria.api.encoding.EmptyRelationEncoding;
import edu.washington.escience.myria.api.encoding.LocalMultiwayConsumerEncoding;
import edu.washington.escience.myria.api.encoding.LocalMultiwayProducerEncoding;
Expand All @@ -45,6 +46,9 @@
import edu.washington.escience.myria.io.DataSource;
import edu.washington.escience.myria.io.EmptySource;
import edu.washington.escience.myria.io.FileSource;
import edu.washington.escience.myria.operator.network.partition.PartitionFunction;
import edu.washington.escience.myria.operator.network.partition.RoundRobinPartitionFunction;
import edu.washington.escience.myria.operator.network.partition.SingleFieldHashPartitionFunction;
import edu.washington.escience.myria.parallel.SocketInfo;
import edu.washington.escience.myria.util.JsonAPIUtils;
import edu.washington.escience.myria.util.TestUtils;
Expand Down Expand Up @@ -73,18 +77,21 @@ public static String emptyIngest() throws JsonProcessingException {
/* Construct the JSON for an Empty Ingest request. */
RelationKey key = RelationKey.of("public", "adhoc", "smallTable");
Schema schema = Schema.of(ImmutableList.of(Type.STRING_TYPE, Type.LONG_TYPE), ImmutableList.of("foo", "bar"));
return ingest(key, schema, new EmptySource(), null);
return ingest(key, schema, new EmptySource(), null, null);
}

public static String ingest(final RelationKey key, final Schema schema, final DataSource source,
@Nullable final Character delimiter) throws JsonProcessingException {
@Nullable final Character delimiter, @Nullable final PartitionFunction pf) throws JsonProcessingException {
DatasetEncoding ingest = new DatasetEncoding();
ingest.relationKey = key;
ingest.schema = schema;
ingest.source = source;
if (delimiter != null) {
ingest.delimiter = delimiter;
}
if (pf != null) {
ingest.howPartitioned = pf;
}
return MyriaJsonMapperProvider.getWriter().writeValueAsString(ingest);
}

Expand Down Expand Up @@ -140,19 +147,38 @@ public void ingestTest() throws Exception {
Schema schema = Schema.of(ImmutableList.of(Type.INT_TYPE, Type.INT_TYPE), ImmutableList.of("x", "y"));
Character delimiter = ' ';
HttpURLConnection conn =
JsonAPIUtils.ingestData("localhost", masterDaemonPort, ingest(key, schema, source, delimiter));
JsonAPIUtils.ingestData("localhost", masterDaemonPort, ingest(key, schema, source, delimiter, null));
if (null != conn.getErrorStream()) {
throw new IllegalStateException(getContents(conn));
}
assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_CREATED);
assertEquals(getDatasetStatus(conn).getNumTuples(), 7);
assertEquals(HttpURLConnection.HTTP_CREATED, conn.getResponseCode());
DatasetStatus status = getDatasetStatus(conn);
assertEquals(7, status.getNumTuples());
/* not specified, should be RoundRobin. */
assertTrue(status.getHowPartitioned() instanceof RoundRobinPartitionFunction);
assertEquals(2, status.getHowPartitioned().numPartition());
conn.disconnect();
/* bad ingestion. */
delimiter = ',';
RelationKey newkey = RelationKey.of("public", "adhoc", "testbadIngest");
conn = JsonAPIUtils.ingestData("localhost", masterDaemonPort, ingest(newkey, schema, source, delimiter));
conn = JsonAPIUtils.ingestData("localhost", masterDaemonPort, ingest(newkey, schema, source, delimiter, null));
assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_INTERNAL_ERROR);
conn.disconnect();

/* hash-partitioned ingest. */
delimiter = ' ';
RelationKey keyP = RelationKey.of("public", "adhoc", "testIngestHashPartitioned");
conn =
JsonAPIUtils.ingestData("localhost", masterDaemonPort, ingest(keyP, schema, source, delimiter,
new SingleFieldHashPartitionFunction(null, 1)));
assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_CREATED);
status = getDatasetStatus(conn);
PartitionFunction pf = status.getHowPartitioned();
/* specified, should be SingleField with index = 1. */
assertEquals(2, pf.numPartition());
assertTrue(pf instanceof SingleFieldHashPartitionFunction);
assertEquals(1, ((SingleFieldHashPartitionFunction) pf).getIndex());
conn.disconnect();
}

@Test
Expand Down

0 comments on commit b820339

Please sign in to comment.