Skip to content

Commit

Permalink
fixes after manual testing
Browse files Browse the repository at this point in the history
  • Loading branch information
parmitam committed Jan 19, 2017
1 parent 6745d55 commit 92796f7
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 183 deletions.
244 changes: 131 additions & 113 deletions jsonQueries/pythonUDF/udfAggSingleColumn.json
Original file line number Diff line number Diff line change
@@ -1,120 +1,138 @@
{
"rawQuery": "Stateful agg",
"fragments": [
{
"operators":[
{
"opName":"MyriaShuffleConsumer",
"opType":"ShuffleConsumer",
"argOperatorId":2,
"opId":3
},
{
"opType":"MultiGroupByAggregate",
"argGroupFields":[1,2],
"aggregators":[
{
"initializers":[

{
"rootExpressionOperator":{
"value":"null",
"valueType":"BLOB_TYPE",
"type":"CONSTANT"
},
"outputName":"tm"
}
],
"updaters":[
{
"rootExpressionOperator":{
"children": [
{
"columnIdx":0,
"type":"STATE"
},
{
"type": "VARIABLE",
"columnIdx": 0
},
{
"type": "VARIABLE",
"columnIdx": 3
},
{
"columnIdx":2,
"type":"VARIABLE"
}
],
"type": "PYUDF",
"outputType":"BLOB_TYPE",
"name": "udfAgg"

},
"outputName":"tm"
"logicalRa":"Store(public:adhoc:results)[Apply(_COLUMN0_=LOG($0))[GroupBy(; UDA((_sum__128 / _count__127)))[Apply(link=$1)[Select(((((($1 = 1) or ($1 = 2)) or ($1 = 3)) or ($1 = 4)) or ($1 = 5)))[Scan(public:adhoc:TwitterK)]]]]]",
"fragments":[
{
"operators":[
{
"opType":"ShuffleConsumer",
"opName":"MyriaShuffleConsumer",
"argOperatorId":1,
"opId":2
},
{
"opName":"MyriaGroupBy(a; UDA((_sum__262 / _count__261)))",
"aggregators":[
{
"updaters":[
{
"rootExpressionOperator":{
"left":{
"columnIdx":0,
"type":"STATE"
},
"right":{
"value":"1",
"valueType":"LONG_TYPE",
"type":"CONSTANT"
},
"type":"PLUS"
},
"outputName":"_count__261"
},
{
"rootExpressionOperator":{
"children":[
{
"columnIdx":1,
"type":"STATE"
},
{
"columnIdx":3,
"type":"VARIABLE"
}
],
"type":"PYUDF",
"outputType":"BLOB_TYPE",
"name":"pyAdd"
},
"outputName":"_sum__262"
}
],
"emitters":[
{
"rootExpressionOperator":{
"children":[
{
"columnIdx":1,
"type":"STATE"
},
{
"columnIdx":0,
"type":"STATE"
}
],
"type":"PYUDF",
"outputType":"BLOB_TYPE",
"name":"pyMean"
},
"outputName":"mask"
}
],
"initializers":[
{
"rootExpressionOperator":{
"value":"0",
"valueType":"LONG_TYPE",
"type":"CONSTANT"
},
"outputName":"_count__261"
},
{
"rootExpressionOperator":{
"value":"null",
"valueType":"BLOB_TYPE",
"type":"CONSTANT"
},
"outputName":"_sum__262"
}
],
"type":"UserDefined"
}
],
"emitters":[
{
"rootExpressionOperator":{

"columnIdx": 0,
"type": "STATE"
},
"outputName":"voxel"

}
],
"type":"UserDefined"
],
"argChild":2,
"argGroupField":1,
"opType":"SingleGroupByAggregate",
"opId":3
},
{
"argChild":3,
"argOverwriteTable":true,
"opName":"Insert",
"opId":9,
"opType":"DbInsert",
"relationKey":{
"programName":"adhoc",
"relationName":"bmask",
"userName":"public"
}
],
"argChild":3,
"opId":4,
"opName":"MyriaGroupBy(a; UDA(_count__147))"
},

{
"opId":6,
"relationKey":{
"relationName":"agg",
"userName":"public",
"programName":"adhoc"
},
"argChild":4,
"argOverwriteTable":true,
"opType":"DbInsert",
"opName":"MyriaStore(public:adhoc:agg)"
}
]
}
]
},
{
"operators": [
{

"opType": "TableScan",
"opId": 0,
"relationKey": {
"userName": "public",
"relationName": "raw",
"programName": "adhoc"
}

},
{
"distributeFunction" : {
"type" : "Hash",
"indexes" : [1,2]
},
"opName":"MyriaShuffleProducer(h($1,$2))",
"opType":"ShuffleProducer",
"argChild":0,
"opId":2
}
]
}

],

"logicalRa": ""
"operators":[
{
"opType": "TableScan",
"opId": 0,
"relationKey": {
"userName": "public",
"relationName": "raw",
"programName": "adhoc"
}

},
{
"opType":"ShuffleProducer",
"argChild":0,
"distributeFunction" : {
"type" : "Hash",
"indexes" : [1]
},
"opId":1,
"opName":"MyriaShuffleProducer(h($0))"
}
]
}
],
"rawQuery":"uda Foo(x){\n[0 as _count,0 as _sum];\n[_count+1, _sum+x];\n[_sum/_count ];\n};\n\n\nT1 = scan(public:adhoc:raw);\nT2 = [from inter emit Foo(link) as res];\nT3 = [from T2 emit Log(res)];\nstore(T3, results);",
"language":"myrial"
}
68 changes: 68 additions & 0 deletions jsonQueries/pythonUDF/udfApplyInt.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{
"fragments":[
{
"operators": [
{
"opType": "TableScan",
"opId": 0,
"relationKey": {
"userName": "public",
"relationName": "raw",
"programName": "adhoc"
}
},
{

"opType": "Apply",
"opId": 1,
"emitExpressions": [
{
"outputName": "udfOutput",
"rootExpressionOperator": {
"type": "PYUDF",
"name": "flatmapApplyTest",
"outputType":"BLOB_TYPE",
"addCounter":"False",
"children": [
{
"type": "VARIABLE",
"columnIdx": 3
}
]
}
},
{
"outputName":"subjectid",
"rootExpressionOperator":{
"type":"VARIABLE",
"columnIdx":2}
},
{"outputName":"imageid",
"rootExpressionOperator":{
"type":"VARIABLE",
"columnIdx":1}
}
],
"argChild": 0
},
{
"opType": "DbInsert",
"opId": 2,
"relationKey": {
"userName": "public",
"relationName": "results",
"programName": "adhoc"
},
"argChild": 1,
"argOverwriteTable": true
}

]
}
],

"language":"myrial",
"logicalRa":"Store(public:adhoc:results)[Apply(PyUDF(flatmapApplyTest,))[Scan(public:adhoc:raw)]]",

"rawQuery":"Simple Apply test for PythonUDFs"
}
7 changes: 3 additions & 4 deletions python/MyriaPythonWorker/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ def write_with_length(obj, stream, outputType, serialiser):
write_int(obj, stream)
elif(outputType == DataType.LONG):
write_int(DataType.LONG, stream)
write_long(stream.write(obj))
write_long(obj,stream)
elif(outputType == DataType.FLOAT):
write_int(DataType.FLOAT, stream)
write_float(stream.write(obj))
write_float(obj, stream)
elif(outputType == DataType.DOUBLE):
write_int(DataType.DOUBLE, stream)
write_double(stream.write(obj))
write_double(obj,stream)
elif(outputType == DataType.BLOB):
write_int(DataType.BLOB, stream)
serialiser.write_with_length(obj, stream)
Expand Down Expand Up @@ -149,4 +149,3 @@ def dumps(self, obj):

def loads(self, obj):
return cPickle.loads(obj)

2 changes: 1 addition & 1 deletion python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


setup(name='MyriaPythonWorker',
version='0.2',
version='0.31',
description='Myria Python worker.',
author='Parmita Mehta',
packages=find_packages(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
public class ApplyEncoding extends UnaryOperatorEncoding<Apply> {

@Required public List<Expression> emitExpressions;
@Required public Boolean addCounter;

@Override
public Apply construct(final ConstructArgs args) {
return new Apply(null, emitExpressions);
return new Apply(null, emitExpressions, addCounter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public FunctionStatus(
this.description = description;
this.outputType = outputType;
this.isMultivalued = isMultivalued;
LOGGER.info("ismultivalued: " + isMultivalued);
this.lang = lang;
this.binary = binary;
}
Expand Down
7 changes: 0 additions & 7 deletions src/edu/washington/escience/myria/expression/Expression.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,4 @@ public boolean isRegisteredUDF() {
public boolean isMultivalued() {
return rootExpressionOperator.hasArrayOutputType();
}
/**
*
* @return true, if counter column is to be added
*/
public boolean addCounter() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,6 @@ public String getJavaExpressionWithAppend(final ExpressionOperatorParameter para
public boolean hasArrayOutputType() {
return false;
}
/**
* @return whether an additional counter column should be added.
*/
public boolean addCounter() {
return false;
}

/**
* @return all children
Expand Down
Loading

0 comments on commit 92796f7

Please sign in to comment.