Skip to content

Commit

Permalink
Merge 716fbfd into 2df556e
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjingwang committed Feb 22, 2017
2 parents 2df556e + 716fbfd commit 850fd46
Show file tree
Hide file tree
Showing 76 changed files with 2,232 additions and 4,214 deletions.
2 changes: 1 addition & 1 deletion jsonQueries/globalJoin_jwang/ingest_smallTable.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"dataType" : "Bytes",
"bytes" : "MSA0NAoyIDUxCjQ2IDE3CjYzIDM0CjU0IDYzCjIwIDk0CjEyIDY2Cjc5IDQyCjEgMTAKODggMjAKMTAgNDIKNTYgNDQKMTAgMTIKNzkgMzcKMzAgNjYKODMgMTMKMzEgMQozMSA5OQo4MSAzNQo3MCAyNgo0IDUxCjE1IDY2Cjg4IDY2CjI3IDE3CjMxIDgyCjc2IDc0Cjk2IDY1CjYyIDIyCjkwIDU5CjEzIDI5CjQ0IDQyCjM1IDYyCjk5IDE1Cjk1IDc3CjEwIDcwCjI0IDMwCjgyIDY0CjQ0IDQ4CjY1IDc0CjE4IDg1CjQ5IDE0Cjc1IDk5CjU3IDk1CjQyIDk2CjQxIDY5CjE0IDY1CjE2IDExCjcyIDIyCjc2IDgyCjY2IDY4Cjc0IDg4CjQ3IDYKNTYgMAo2IDkKNTAgODAKNiAzMQo3NiA0NAo0OSAzMAo0NyAxNgo4MiA3NwoxIDgxCjIwIDQwCjE4IDU2CjI4IDkyCjU4IDE2CjgyIDEzCjcxIDc1CjYwIDQxCjIzIDkKMiA1MQo4NiA5NQo4IDgxCjk3IDc5CjE4IDQxCjg5IDQ4CjU5IDUxCjIxIDg2CjYzIDc2CjQyIDIyCjczIDM4CjI0IDE3CjggMzQKNzggMTUKOTMgMTUKMzEgMjIKNzMgMjkKOTMgMTYKODcgOTUKNSA1Nwo0MiA4OAoxNSA4NwozOCA5NwowIDc2CjU3IDUxCjMwIDE5CjUyIDI4CjQyIDE0CjczIDI4CjM3IDY5CjQzIDQ3Cg=="
},
"partitionFunction": {
"distributeFunction": {
"type": "Hash",
"indexes": [0]
},
Expand Down
9 changes: 4 additions & 5 deletions jsonQueries/pythonUDF/ingest_blob.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
"columnTypes" : ["LONG_TYPE", "LONG_TYPE","LONG_TYPE","STRING_TYPE"],
"columnNames" : ["id", "subjid","imgid" ,"image"]
},
"s3Source" : {
"dataType" : "S3",
"s3Uri" : "s3://imagedb-data/dmridatasample.csv"
"source" : {
"dataType" : "URI",
"uri" : "https://s3-us-west-2.amazonaws.com/imagedb-data/dmridatasample.csv"
},
"delimiter": ",",
"workers": [1,2]
"delimiter": ","
}
2 changes: 1 addition & 1 deletion jsonQueries/pythonUDF/udfAgg.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"opId":3
},
{
"opType":"MultiGroupByAggregate",
"opType":"Aggregate",
"argGroupFields":[1,2],
"aggregators":[
{
Expand Down
4 changes: 2 additions & 2 deletions jsonQueries/pythonUDF/udfAggSingleColumn.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@
}
],
"argChild":2,
"argGroupField":1,
"opType":"SingleGroupByAggregate",
"argGroupFields":[1],
"opType":"Aggregate",
"opId":3
},
{
Expand Down
16 changes: 8 additions & 8 deletions python/MyriaPythonWorker/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ def write_with_length(self, obj, stream):

def read_item(self, stream, itemType, length):
obj = None
if(itemType == DataType.INT):
if itemType == DataType.INT:
obj = read_int(stream)
elif(itemType == DataType.LONG):
elif itemType == DataType.LONG:
obj = read_long(stream)
elif(itemType == DataType.FLOAT):
elif itemType == DataType.FLOAT:
obj = read_float(stream)
elif(itemType == DataType.DOUBLE):
elif itemType == DataType.DOUBLE:
obj = read_double(stream)
elif(itemType == DataType.BLOB):
elif itemType == DataType.BLOB:
obj = self.loads(stream.read(length))
return obj

Expand All @@ -122,10 +122,10 @@ def read_tuple(self, stream, tuplesize):
# Second read the length
length = read_int(stream)

if (length == SpecialLengths.NULL):
if length == SpecialLengths.NULL or length == 0:
datalist.append(0)
# length is >0, read the item now
elif (length > 0):
# length is > 0, read the item now
elif length > 0:
obj = self.read_item(stream, elementType, length)
datalist.append(obj)

Expand Down
2 changes: 1 addition & 1 deletion python/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Myria Python Worker

Online documentation for [Myria](http://myria.cs.washington.edu/)
Myria Python worker is used for executing python UDFs.
Myria Python worker is used for executing python UDFs.
18 changes: 9 additions & 9 deletions src/edu/washington/escience/myria/api/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ public Response getDataset(
@PathParam("programName") final String programName,
@PathParam("relationName") final String relationName)
throws DbException {
DatasetStatus status =
server.getDatasetStatus(RelationKey.of(userName, programName, relationName));
RelationKey relationKey = RelationKey.of(userName, programName, relationName);
DatasetStatus status = server.getDatasetStatus(relationKey);
if (status == null) {
/* Not found, throw a 404 (Not Found) */
throw new MyriaApiException(Status.NOT_FOUND, "That dataset was not found");
throw new MyriaApiException(Status.NOT_FOUND, "Dataset " + relationKey + " was not found");
}
status.setUri(getCanonicalResourcePath(uriInfo, status.getRelationKey()));
/* Yay, worked! */
Expand Down Expand Up @@ -356,13 +356,12 @@ public Response deleteDataset(
@PathParam("programName") final String programName,
@PathParam("relationName") final String relationName)
throws DbException {
DatasetStatus status =
server.getDatasetStatus(RelationKey.of(userName, programName, relationName));
RelationKey relationKey = RelationKey.of(userName, programName, relationName);
DatasetStatus status = server.getDatasetStatus(relationKey);
if (status == null) {
/* Dataset not found, throw a 404 (Not Found) */
throw new MyriaApiException(Status.NOT_FOUND, "That dataset was not found");
throw new MyriaApiException(Status.NOT_FOUND, "Dataset " + relationKey + " was not found");
}
RelationKey relationKey = status.getRelationKey();
// delete command
try {
server.deleteDataset(relationKey);
Expand Down Expand Up @@ -566,7 +565,7 @@ private Response doIngest(
/* Check overwriting existing dataset. */
try {
if (!MoreObjects.firstNonNull(overwrite, false) && server.getSchema(relationKey) != null) {
throw new MyriaApiException(Status.CONFLICT, "That dataset already exists.");
throw new MyriaApiException(Status.CONFLICT, "Dataset " + relationKey + " already exists.");
}
} catch (CatalogException e) {
throw new DbException(e);
Expand Down Expand Up @@ -642,7 +641,8 @@ public Response addDatasetToCatalog(final DatasetEncoding dataset, @Context fina
if (!MoreObjects.firstNonNull(dataset.overwrite, Boolean.FALSE)
&& server.getSchema(dataset.relationKey) != null) {
/* Found, throw a 409 (Conflict) */
throw new MyriaApiException(Status.CONFLICT, "That dataset already exists.");
throw new MyriaApiException(
Status.CONFLICT, "Dataset " + dataset.relationKey + " already exists.");
}
} catch (CatalogException e) {
throw new DbException(e);
Expand Down
6 changes: 2 additions & 4 deletions src/edu/washington/escience/myria/api/FunctionResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
*/
/**
* This is the class that handles API calls to create or fetch functions.
*
*/
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MyriaApiConstants.JSON_UTF_8)
Expand All @@ -51,7 +50,6 @@ public class FunctionResource {
protected static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(FunctionResource.class);

/**
*
* @return a list of functions, names only.
* @throws DbException if there is an error accessing the Catalog.
*/
Expand All @@ -74,15 +72,15 @@ public Response createFunction(final CreateFunctionEncoding encoding) throws DbE
encoding.binary,
encoding.workers);
} catch (Exception e) {
throw new DbException();
throw new DbException(e);
}
/* Build the response to return the queryId */
ResponseBuilder response = Response.ok();
return response.entity(functionCreationResponse).build();
}

/**
* @param name function name
* @param name function name
* @return details of a registered function.
* @throws DbException if there is an error accessing the Catalog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@

/** JSON wrapper for Aggregate. */
public class AggregateEncoding extends UnaryOperatorEncoding<Aggregate> {
/** aggregators. */
@Required public int[] argGroupFields;
@Required public AggregatorFactory[] aggregators;

@Override
public Aggregate construct(ConstructArgs args) {
return new Aggregate(null, aggregators);
return new Aggregate(null, argGroupFields, aggregators);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
@Type(name = "LocalMultiwayProducer", value = LocalMultiwayProducerEncoding.class),
@Type(name = "Merge", value = MergeEncoding.class),
@Type(name = "MergeJoin", value = MergeJoinEncoding.class),
@Type(name = "MultiGroupByAggregate", value = MultiGroupByAggregateEncoding.class),
@Type(name = "NChiladaFileScan", value = NChiladaFileScanEncoding.class),
@Type(name = "RightHashCountingJoin", value = RightHashCountingJoinEncoding.class),
@Type(name = "RightHashJoin", value = RightHashJoinEncoding.class),
Expand All @@ -62,7 +61,6 @@
@Type(name = "SetGlobal", value = SetGlobalEncoding.class),
@Type(name = "ShuffleConsumer", value = GenericShuffleConsumerEncoding.class),
@Type(name = "ShuffleProducer", value = GenericShuffleProducerEncoding.class),
@Type(name = "SingleGroupByAggregate", value = SingleGroupByAggregateEncoding.class),
@Type(name = "Singleton", value = SingletonEncoding.class),
@Type(name = "StatefulApply", value = StatefulApplyEncoding.class),
@Type(name = "SymmetricHashJoin", value = SymmetricHashJoinEncoding.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import edu.washington.escience.myria.operator.Operator;
import edu.washington.escience.myria.operator.RootOperator;
import edu.washington.escience.myria.operator.UpdateCatalog;
import edu.washington.escience.myria.operator.agg.MultiGroupByAggregate;
import edu.washington.escience.myria.operator.agg.Aggregate;
import edu.washington.escience.myria.operator.agg.PrimitiveAggregator.AggregationOp;
import edu.washington.escience.myria.operator.agg.SingleColumnAggregatorFactory;
import edu.washington.escience.myria.operator.agg.PrimitiveAggregatorFactory;
import edu.washington.escience.myria.operator.network.CollectProducer;
import edu.washington.escience.myria.operator.network.Consumer;
import edu.washington.escience.myria.operator.network.EOSController;
Expand Down Expand Up @@ -663,9 +663,9 @@ public static SubQuery getRelationTupleUpdateSubQuery(

/* Master plan: collect, sum, insert the updates. */
Consumer consumer = new Consumer(schema, collectId, workerPlans.keySet());
MultiGroupByAggregate aggCounts =
new MultiGroupByAggregate(
consumer, new int[] {0, 1, 2}, new SingleColumnAggregatorFactory(3, AggregationOp.SUM));
Aggregate aggCounts =
new Aggregate(
consumer, new int[] {0, 1, 2}, new PrimitiveAggregatorFactory(3, AggregationOp.SUM));
UpdateCatalog catalog = new UpdateCatalog(aggCounts, server);
SubQueryPlan masterPlan = new SubQueryPlan(catalog);

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,27 @@

public class SymmetricHashJoinEncoding extends BinaryOperatorEncoding<SymmetricHashJoin> {

public List<String> argColumnNames;
@Required public int[] argColumns1;
@Required public int[] argColumns2;
@Required public int[] argSelect1;
@Required public int[] argSelect2;
public List<String> argColumnNames;
public boolean argSetSemanticsLeft = false;
public boolean argSetSemanticsRight = false;
public JoinPullOrder argOrder = JoinPullOrder.ALTERNATE;

@Override
public SymmetricHashJoin construct(final ConstructArgs args) {
SymmetricHashJoin join =
new SymmetricHashJoin(
argColumnNames,
null,
null,
argColumns1,
argColumns2,
argSelect1,
argSelect2,
argSetSemanticsLeft,
argSetSemanticsRight);
join.setPullOrder(argOrder);
return join;
return new SymmetricHashJoin(
null,
null,
argColumns1,
argColumns2,
argSelect1,
argSelect2,
argSetSemanticsLeft,
argSetSemanticsRight,
argColumnNames,
argOrder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@
import edu.washington.escience.myria.proto.DataProto.ColumnMessage;
import edu.washington.escience.myria.storage.TupleUtils;
import edu.washington.escience.myria.util.MyriaUtils;

/**
* A column of Blob values.
*
*/
public final class BlobColumnBuilder extends ColumnBuilder<ByteBuffer> {

/**
* The internal representation of the data.
* */
*/
private final ByteBuffer[] data;

/** Number of elements in this column. */
private int numBB;

/**
* If the builder has built the column.
* */
*/
private boolean built = false;

/** Constructs an empty column that can hold up to TupleBatch.BATCH_SIZE elements. */
Expand All @@ -48,21 +48,16 @@ public BlobColumnBuilder() {
*
* @param numBB the actual number of elements in the data
* @param data the underlying data
* */
*/
private BlobColumnBuilder(final ByteBuffer[] data, final int numBB) {
this.numBB = numBB;
this.data = data;
}

/*
* Constructs a BlobColumn by deserializing the given ColumnMessage.
*
/* Constructs a BlobColumn by deserializing the given ColumnMessage.
* @param message a ColumnMessage containing the contents of this column.
*
* @param numTuples num tuples in the column message
*
* @return the built column
*/
* @return the built column */
public static BlobColumn buildFromProtobuf(final ColumnMessage message, final int numTuples) {
Preconditions.checkArgument(
message.getType().ordinal() == ColumnMessage.Type.BLOB_VALUE,
Expand All @@ -86,9 +81,12 @@ public static BlobColumn buildFromProtobuf(final ColumnMessage message, final in
}

@Override
public BlobColumnBuilder appendBlob(final ByteBuffer value) throws BufferOverflowException {
public BlobColumnBuilder appendBlob(ByteBuffer value) throws BufferOverflowException {
Preconditions.checkState(
!built, "No further changes are allowed after the builder has built the column.");
if (value == null) {
value = ByteBuffer.allocate(0);
}
Objects.requireNonNull(value, "value");
if (numBB >= TupleUtils.getBatchSize(Type.BLOB_TYPE)) {
throw new BufferOverflowException();
Expand Down
Loading

0 comments on commit 850fd46

Please sign in to comment.