Skip to content

Commit

Permalink
merge master, fix several bugs, address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjingwang committed Feb 21, 2017
1 parent e34654b commit bf05ea2
Show file tree
Hide file tree
Showing 42 changed files with 552 additions and 708 deletions.
2 changes: 1 addition & 1 deletion jsonQueries/globalJoin_jwang/ingest_smallTable.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"dataType" : "Bytes",
"bytes" : "MSA0NAoyIDUxCjQ2IDE3CjYzIDM0CjU0IDYzCjIwIDk0CjEyIDY2Cjc5IDQyCjEgMTAKODggMjAKMTAgNDIKNTYgNDQKMTAgMTIKNzkgMzcKMzAgNjYKODMgMTMKMzEgMQozMSA5OQo4MSAzNQo3MCAyNgo0IDUxCjE1IDY2Cjg4IDY2CjI3IDE3CjMxIDgyCjc2IDc0Cjk2IDY1CjYyIDIyCjkwIDU5CjEzIDI5CjQ0IDQyCjM1IDYyCjk5IDE1Cjk1IDc3CjEwIDcwCjI0IDMwCjgyIDY0CjQ0IDQ4CjY1IDc0CjE4IDg1CjQ5IDE0Cjc1IDk5CjU3IDk1CjQyIDk2CjQxIDY5CjE0IDY1CjE2IDExCjcyIDIyCjc2IDgyCjY2IDY4Cjc0IDg4CjQ3IDYKNTYgMAo2IDkKNTAgODAKNiAzMQo3NiA0NAo0OSAzMAo0NyAxNgo4MiA3NwoxIDgxCjIwIDQwCjE4IDU2CjI4IDkyCjU4IDE2CjgyIDEzCjcxIDc1CjYwIDQxCjIzIDkKMiA1MQo4NiA5NQo4IDgxCjk3IDc5CjE4IDQxCjg5IDQ4CjU5IDUxCjIxIDg2CjYzIDc2CjQyIDIyCjczIDM4CjI0IDE3CjggMzQKNzggMTUKOTMgMTUKMzEgMjIKNzMgMjkKOTMgMTYKODcgOTUKNSA1Nwo0MiA4OAoxNSA4NwozOCA5NwowIDc2CjU3IDUxCjMwIDE5CjUyIDI4CjQyIDE0CjczIDI4CjM3IDY5CjQzIDQ3Cg=="
},
"partitionFunction": {
"distributeFunction": {
"type": "Hash",
"indexes": [0]
},
Expand Down
9 changes: 4 additions & 5 deletions jsonQueries/pythonUDF/ingest_blob.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
"columnTypes" : ["LONG_TYPE", "LONG_TYPE","LONG_TYPE","STRING_TYPE"],
"columnNames" : ["id", "subjid","imgid" ,"image"]
},
"s3Source" : {
"dataType" : "S3",
"s3Uri" : "s3://imagedb-data/dmridatasample.csv"
"source" : {
"dataType" : "URI",
"uri" : "https://s3-us-west-2.amazonaws.com/imagedb-data/dmridatasample.csv"
},
"delimiter": ",",
"workers": [1,2]
"delimiter": ","
}
2 changes: 1 addition & 1 deletion jsonQueries/pythonUDF/udfAgg.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"opId":3
},
{
"opType":"MultiGroupByAggregate",
"opType":"Aggregate",
"argGroupFields":[1,2],
"aggregators":[
{
Expand Down
4 changes: 2 additions & 2 deletions jsonQueries/pythonUDF/udfAggSingleColumn.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@
}
],
"argChild":2,
"argGroupField":1,
"opType":"SingleGroupByAggregate",
"argGroupFields":[0],
"opType":"Aggregate",
"opId":3
},
{
Expand Down
16 changes: 8 additions & 8 deletions python/MyriaPythonWorker/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ def write_with_length(self, obj, stream):

def read_item(self, stream, itemType, length):
obj = None
if(itemType == DataType.INT):
if itemType == DataType.INT:
obj = read_int(stream)
elif(itemType == DataType.LONG):
elif itemType == DataType.LONG:
obj = read_long(stream)
elif(itemType == DataType.FLOAT):
elif itemType == DataType.FLOAT:
obj = read_float(stream)
elif(itemType == DataType.DOUBLE):
elif itemType == DataType.DOUBLE:
obj = read_double(stream)
elif(itemType == DataType.BLOB):
elif itemType == DataType.BLOB:
obj = self.loads(stream.read(length))
return obj

Expand All @@ -122,10 +122,10 @@ def read_tuple(self, stream, tuplesize):
# Second read the length
length = read_int(stream)

if (length == SpecialLengths.NULL):
if length == SpecialLengths.NULL or length == 0:
datalist.append(0)
# length is >0, read the item now
elif (length > 0):
# length is > 0, read the item now
elif length > 0:
obj = self.read_item(stream, elementType, length)
datalist.append(obj)

Expand Down
2 changes: 1 addition & 1 deletion python/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Myria Python Worker

Online documentation for [Myria](http://myria.cs.washington.edu/)
Myria Python worker is used for executing python UDFs.
Myria Python worker is used for executing python UDFs.
6 changes: 2 additions & 4 deletions src/edu/washington/escience/myria/api/FunctionResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
*/
/**
* This is the class that handles API calls to create or fetch functions.
*
*/
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MyriaApiConstants.JSON_UTF_8)
Expand All @@ -51,7 +50,6 @@ public class FunctionResource {
protected static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(FunctionResource.class);

/**
*
* @return a list of functions, names only.
* @throws DbException if there is an error accessing the Catalog.
*/
Expand All @@ -74,15 +72,15 @@ public Response createFunction(final CreateFunctionEncoding encoding) throws DbE
encoding.binary,
encoding.workers);
} catch (Exception e) {
throw new DbException();
throw new DbException(e);
}
/* Build the response to return the queryId */
ResponseBuilder response = Response.ok();
return response.entity(functionCreationResponse).build();
}

/**
* @param name function name
* @param name function name
* @return details of a registered function.
* @throws DbException if there is an error accessing the Catalog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@
import edu.washington.escience.myria.proto.DataProto.ColumnMessage;
import edu.washington.escience.myria.storage.TupleUtils;
import edu.washington.escience.myria.util.MyriaUtils;

/**
* A column of Blob values.
*
*/
public final class BlobColumnBuilder extends ColumnBuilder<ByteBuffer> {

/**
* The internal representation of the data.
* */
*/
private final ByteBuffer[] data;

/** Number of elements in this column. */
private int numBB;

/**
* If the builder has built the column.
* */
*/
private boolean built = false;

/** Constructs an empty column that can hold up to TupleBatch.BATCH_SIZE elements. */
Expand All @@ -48,21 +48,16 @@ public BlobColumnBuilder() {
*
* @param numDates the actual num strings in the data
* @param data the underlying data
* */
*/
private BlobColumnBuilder(final ByteBuffer[] data, final int numBB) {
this.numBB = numBB;
this.data = data;
}

/*
* Constructs a BlobColumn by deserializing the given ColumnMessage.
*
/* Constructs a BlobColumn by deserializing the given ColumnMessage.
* @param message a ColumnMessage containing the contents of this column.
*
* @param numTuples num tuples in the column message
*
* @return the built column
*/
* @return the built column */
public static BlobColumn buildFromProtobuf(final ColumnMessage message, final int numTuples) {
Preconditions.checkArgument(
message.getType().ordinal() == ColumnMessage.Type.BLOB_VALUE,
Expand All @@ -86,9 +81,12 @@ public static BlobColumn buildFromProtobuf(final ColumnMessage message, final in
}

@Override
public BlobColumnBuilder appendBlob(final ByteBuffer value) throws BufferOverflowException {
public BlobColumnBuilder appendBlob(ByteBuffer value) throws BufferOverflowException {
Preconditions.checkState(
!built, "No further changes are allowed after the builder has built the column.");
if (value == null) {
value = ByteBuffer.allocate(0);
}
Objects.requireNonNull(value, "value");
if (numBB >= TupleUtils.getBatchSize(Type.BLOB_TYPE)) {
throw new BufferOverflowException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,24 @@
import edu.washington.escience.myria.column.mutable.StringMutableColumn;
import edu.washington.escience.myria.proto.DataProto.ColumnMessage;
import edu.washington.escience.myria.proto.DataProto.StringColumnMessage;
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleUtils;
import edu.washington.escience.myria.util.MyriaUtils;

/**
* A column of String values.
*
*/
public final class StringColumnBuilder extends ColumnBuilder<String> {

/**
* The internal representation of the data.
* */
*/
private final String[] data;
/** Number of elements in this column. */
private int numStrings;

/**
* If the builder has built the column.
* */
*/
private boolean built = false;

/** Constructs an empty column that can hold up to TupleBatch.BATCH_SIZE elements. */
Expand All @@ -49,7 +47,7 @@ public StringColumnBuilder() {
*
* @param numStrings the actual num strings in the data
* @param data the underlying data
* */
*/
private StringColumnBuilder(final String[] data, final int numStrings) {
this.numStrings = numStrings;
this.data = data;
Expand Down Expand Up @@ -84,7 +82,6 @@ public static StringColumn buildFromProtobuf(final ColumnMessage message, final
public StringColumnBuilder appendString(final String value) throws BufferOverflowException {
Preconditions.checkState(
!built, "No further changes are allowed after the builder has built the column.");
Objects.requireNonNull(value, "value");
if (numStrings >= TupleUtils.getBatchSize(Type.STRING_TYPE)) {
throw new BufferOverflowException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public ConstantExpression(final boolean value) {
public ConstantExpression(final String value) {
this(Type.STRING_TYPE, value);
}

/**
* Construct Blob constant.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,6 @@ public ConstantEvaluator(
*/
private final ExpressionEvaluator evaluator;

/**
* Creates an {@link ExpressionEvaluator} from the {@link #javaExpression}. This does not really compile the
* expression and is thus faster.
*/
@Override
public void compile() {
/* Do nothing! */
}

/**
* Evaluates the {@link #getJavaExpressionWithAppend()} using the {@link #evaluator}.
*
Expand All @@ -103,18 +94,18 @@ public void eval(
final int stateRow,
final WritableColumn result,
final WritableColumn count) {
result.appendObject(count);
result.appendObject(value);
count.appendInt(1);
}

@Override
public EvaluatorResult evaluateColumn(final TupleBatch tb, final Schema outputSchema)
public EvaluatorResult evalTupleBatch(final TupleBatch tb, final Schema outputSchema)
throws DbException {
if (TupleUtils.getBatchSize(outputSchema) == tb.getBatchSize()) {
return new EvaluatorResult(
new ConstantValueColumn((Comparable<?>) value, type, tb.numTuples()),
new ConstantValueColumn(1, Type.INT_TYPE, tb.numTuples()));
}
return super.evaluateColumn(tb, outputSchema);
return super.evalTupleBatch(tb, outputSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,6 @@ public boolean isCopyFromInput() {
return rootOp instanceof VariableExpression;
}

/**
* An expression does not have to be compiled when it only renames or copies a column. This is an optimization to
* avoid evaluating the expression and avoid autoboxing values.
*
* @return true if the expression does not have to be compiled.
*/
public boolean needsCompiling() {
return !(isCopyFromInput() || isConstant() || isRegisteredUDF());
}

/**
* @return true if the expression evaluates to a constant
*/
Expand All @@ -134,6 +124,7 @@ public boolean isConstant() {
public boolean needsState() {
return needsState;
}

/**
* @return true if the expression contains a python UDF expression.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package edu.washington.escience.myria.expression.evaluate;

import edu.washington.escience.myria.column.builder.WritableColumn;
import edu.washington.escience.myria.storage.ReadableTable;

/**
* Interface for evaluating a single {@link edu.washington.escience.myria.expression.Expression} and appending the
* results to a column, along with a count of results.
*/
public interface ExpressionEvalAppendInterface extends ExpressionEvalInterface {
/**
* Evaluates a single {@link edu.washington.escience.myria.expression.Expression} for one input row and appends the
* results and (optional) counts to the given columns.
*
* @param input the input tuple batch
* @param inputRow row index of the input tuple batch
* @param state optional state that is passed during evaluation
* @param stateRow row index of the state
* @param result a column to which the evaluation results are appended
* @param count a column storing the number of results returned from this row
*/
void evaluate(
final ReadableTable input,
final int inputRow,
final ReadableTable state,
final int stateRow,
final WritableColumn result,
final WritableColumn count);
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,7 @@
package edu.washington.escience.myria.expression.evaluate;

import edu.washington.escience.myria.column.builder.WritableColumn;
import edu.washington.escience.myria.storage.ReadableTable;

/**
* Interface for evaluating a single {@link edu.washington.escience.myria.expression.Expression} and appending the
* results to a column, along with a count of results.
*/
public interface ExpressionEvalInterface {
/**
* The interface evaluating a single {@link edu.washington.escience.myria.expression.Expression} and appending it to a
* column. We only need a reference to the tuple batch and a row id, plus the optional state of e.g. an
* {@link edu.washington.escience.myria.operator.agg.Aggregate} or a
* {@link edu.washington.escience.myria.operator.StatefulApply}. The variables will be fetched from the tuple buffer
* using the rowId provided in {@link edu.washington.escience.myria.expression.VariableExpression}.
*
* @param input the input tuple batch
* @param inputRow row index of the input tuple batch
* @param state optional state that is passed during evaluation
* @param stateRow row index of the state
* @param result a table storing evaluation results
* @param count a column storing the number of results returned from this row
*/
void evaluate(
final ReadableTable input,
final int inputRow,
final ReadableTable state,
final int stateRow,
final WritableColumn result,
final WritableColumn count);
}
public interface ExpressionEvalInterface {}
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package edu.washington.escience.myria.operator.agg;
package edu.washington.escience.myria.expression.evaluate;

import edu.washington.escience.myria.storage.MutableTupleBuffer;
import edu.washington.escience.myria.storage.ReadableTable;

/**
* Interface for evaluators that take multiple expressions and may write multiple columns.
* Interface for evaluating a single {@link edu.washington.escience.myria.expression.Expression} and replacing old
* values in a state column with the results.
*/
public interface ScriptEvalInterface {
public interface ExpressionEvalReplaceInterface extends ExpressionEvalInterface {
/**
* The interface for applying expressions. The variables will be fetched from the tuple buffer using the rowId
* provided in {@link edu.washington.escience.myria.expression.VariableExpression} or
* {@link edu.washington.escience.myria.expression.StateExpression}.
* Evaluates a single {@link edu.washington.escience.myria.expression.Expression} and replaces old
* values in a state column with the results.
*
* @param input the input tuple batch
* @param inputRow row index of the input tuple batch
Expand Down

0 comments on commit bf05ea2

Please sign in to comment.