Skip to content

Commit

Permalink
Merge fa68c37 into 48d79ca
Browse files Browse the repository at this point in the history
  • Loading branch information
parmitam committed Jan 30, 2017
2 parents 48d79ca + fa68c37 commit 9bb1b6f
Show file tree
Hide file tree
Showing 168 changed files with 6,270 additions and 1,074 deletions.
18 changes: 18 additions & 0 deletions jsonQueries/pythonUDF/ingest_blob.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"relationKey" : {
"userName" : "public",
"programName" : "adhoc",
"relationName" : "raw"
},
"schema" : {
"columnTypes" : ["LONG_TYPE", "LONG_TYPE","LONG_TYPE","BLOB_TYPE"],
"columnNames" : ["id", "subjid","imgid" ,"image"]
},
"s3Source" : {
"dataType" : "S3",
"s3Uri" : "s3://imagedb-data/dmridatasample.csv"
},
"delimiter": ",",
"workers": [1,2]
}

84 changes: 84 additions & 0 deletions jsonQueries/pythonUDF/registerPythonFunctions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from myria import *
import numpy
import json
# Create a connection to the Myria REST API. The URL below targets a local
# deployment (localhost:8753), not a remote production cluster.
connection = MyriaConnection(rest_url='http://localhost:8753')

# NOTE(review): this line needs the name `myria` to be bound by the star
# import at the top of the script -- confirm, or add an explicit `import myria`.
py = myria.udf.functionTypes.PYTHON
# Output type handed to every create_function call below until it is reassigned.
outType= "BLOB_TYPE"

def simpleApplyTest(dt):
    """Return the first field of the first tuple in ``dt``, unchanged.

    ``dt`` is indexed as a sequence of tuples; only ``dt[0][0]`` is read.
    """
    first_tuple = dt[0]
    return first_tuple[0]

# Create the "simpleApplyTest" Python function through the connection, with
# outType ("BLOB_TYPE" at this point) as its output type.
# The fifth argument is False here and True only for the flatmap-style
# functions in this script -- presumably a multi-valued-output flag; confirm
# against the create_function signature.
connection.create_function("simpleApplyTest","function text",outType, py,False, simpleApplyTest)

def flatmapApplyTest(dt):
    """Split the image in the first tuple into a 4 x 4 x 4 grid of sub-blocks.

    Only ``dt[0][0]`` is read. It must expose ``.shape`` with at least three
    entries and accept a tuple of slices as an index (assumed to be a NumPy
    array -- confirm against the caller).

    Returns:
        A list of 64 sub-blocks ordered with the first axis varying slowest
        and the third axis fastest. Along each axis the block size is
        ``shape[axis] // 4``; the last block on an axis also takes whatever
        remainder is left, so the blocks together cover the whole image.
    """
    import itertools

    image = dt[0][0]  # first item of first tuple
    parts = (4, 4, 4)  # number of pieces along each of the first three axes

    # Floor division keeps the slice bounds integral on Python 3 as well;
    # on Python 2 it is identical to the integer "/" used previously.
    steps = [image.shape[axis] // parts[axis] for axis in range(3)]

    blocks = []
    for cell in itertools.product(*(range(count) for count in parts)):
        bounds = []
        for axis, idx in enumerate(cell):
            start = idx * steps[axis]
            if idx == parts[axis] - 1:
                # The final piece on an axis absorbs the remainder.
                stop = image.shape[axis]
            else:
                stop = (idx + 1) * steps[axis]
            bounds.append(slice(start, stop))
        blocks.append(image[tuple(bounds)])
    return blocks

# Create the "flatmapApplyTest" Python function through the connection, with
# outType ("BLOB_TYPE" at this point) as its output type.
# The fifth argument is True here; flatmapApplyTest returns a list of blocks
# -- presumably this flag marks a multi-valued (flatmap) function; confirm
# against the create_function signature.
connection.create_function("flatmapApplyTest","function text",outType, py,True, flatmapApplyTest)


def udfAgg(dt):
    """Stack the images in ``dt`` into one array, one slot per image id.

    Each tuple in ``dt`` is read positionally: field 2 is the integer slot
    index on the trailing axis and field 3 is the image stored in that slot.
    Fields 0 and 1 are never read.
    NOTE(review): field 0 is presumably the running aggregate state when this
    is used as an aggregate updater; it is ignored here and a fresh array is
    built on every call -- confirm that is intended.

    Returns:
        A float array of shape ``image.shape + (5,)`` sized from the first
        image seen, or ``None`` when ``dt`` is empty. Slots that no tuple
        wrote to are zero.
    """
    import numpy as np

    slots = 5  # fixed number of image slots on the trailing axis
    state = None
    for row in dt:
        imgid = row[2]
        img = np.asarray(row[3])
        if state is None:
            # zeros rather than empty: slots that never receive an image
            # must not expose uninitialised memory in the result.
            state = np.zeros(img.shape + (slots,))
        # Ellipsis addresses the trailing axis for any image rank; for the
        # 3-D images this was written for it equals state[:, :, :, imgid].
        state[..., imgid] = img
    return state

# Create the "udfAgg" Python function through the connection, with outType
# ("BLOB_TYPE" at this point) as its output type and False as the fifth
# argument (the value used for every non-flatmap function in this script).
connection.create_function("udfAgg","function text",outType, py,False, udfAgg)


def pyAdd(dt):
    """Add up field 1 of every tuple in ``dt`` with ``+``.

    Returns ``None`` when ``dt`` is empty. Field 0 of each tuple is not read.
    """
    total = None
    for row in dt:
        value = row[1]
        # The first value seeds the running total; later ones are added to it.
        total = value if total is None else total + value
    return total

# Create the "pyAdd" Python function through the connection, with outType
# ("BLOB_TYPE" at this point) as its output type and False as the fifth
# argument (the value used for every non-flatmap function in this script).
connection.create_function("pyAdd","function text",outType, py,False, pyAdd)

def pyMean(dt):
    """Divide field 0 of the first tuple in ``dt`` by its field 1.

    Only ``dt[0]`` is read; ``dt`` itself is echoed to stdout first.
    NOTE(review): the two fields are presumably (sum, count) -- confirm
    against the query plan that calls this function.

    Returns:
        ``dt[0][0] / dt[0][1]`` using the operands' own ``/`` semantics.
    """
    # The parenthesised form prints the same text on Python 2 and is valid
    # syntax on Python 3, unlike the bare print statement.
    print(dt)
    numerator, denominator = dt[0][0], dt[0][1]
    return numerator / denominator

# Create the "pyMean" Python function through the connection; this is the last
# function created with outType "BLOB_TYPE".
connection.create_function("pyMean","function text",outType, py,False, pyMean)


# Every create_function call after this point passes "LONG_TYPE" as the output type.
outType= "LONG_TYPE"
def pyAggInt(dt):
    """Return the constant 5; ``dt`` is accepted but never read."""
    fixed_result = 5
    return fixed_result



# Create the "pyAggInt" Python function through the connection, with outType
# ("LONG_TYPE" at this point) as its output type and False as the fifth
# argument (the value used for every non-flatmap function in this script).
connection.create_function("pyAggInt","function text",outType, py,False, pyAggInt)


def pyFlatmapInt(dt):
    """Return a new list ``[2, 3]`` on every call; ``dt`` is never read."""
    fixed_values = (2, 3)
    return list(fixed_values)


# Create the "pyFlatmapInt" Python function through the connection, with
# outType ("LONG_TYPE" at this point) as its output type.
# The fifth argument is True here; pyFlatmapInt returns a list -- presumably
# this flag marks a multi-valued (flatmap) function; confirm against the
# create_function signature.
connection.create_function("pyFlatmapInt","function text",outType, py,True, pyFlatmapInt)
120 changes: 120 additions & 0 deletions jsonQueries/pythonUDF/udfAgg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
{
"rawQuery": "Stateful agg",
"fragments": [
{
"operators":[
{
"opName":"MyriaShuffleConsumer",
"opType":"ShuffleConsumer",
"argOperatorId":2,
"opId":3
},
{
"opType":"MultiGroupByAggregate",
"argGroupFields":[1,2],
"aggregators":[
{
"initializers":[

{
"rootExpressionOperator":{
"value":"null",
"valueType":"BLOB_TYPE",
"type":"CONSTANT"
},
"outputName":"tm"
}
],
"updaters":[
{
"rootExpressionOperator":{
"children": [
{
"columnIdx":0,
"type":"STATE"
},
{
"type": "VARIABLE",
"columnIdx": 0
},
{
"type": "VARIABLE",
"columnIdx": 2
},
{
"columnIdx":3,
"type":"VARIABLE"
}
],
"type": "PYUDF",
"outputType":"BLOB_TYPE",
"name": "udfAgg"

},
"outputName":"tm"
}
],
"emitters":[
{
"rootExpressionOperator":{

"columnIdx": 0,
"type": "STATE"
},
"outputName":"voxel"

}
],
"type":"UserDefined"
}
],
"argChild":3,
"opId":4,
"opName":"MyriaGroupBy(a; UDA(_count__147))"
},

{
"opId":6,
"relationKey":{
"relationName":"agg",
"userName":"public",
"programName":"adhoc"
},
"argChild":4,
"argOverwriteTable":true,
"opType":"DbInsert",
"opName":"MyriaStore(public:adhoc:agg)"
}
]
},
{
"operators": [
{

"opType": "TableScan",
"opId": 0,
"relationKey": {
"userName": "public",
"relationName": "raw",
"programName": "adhoc"
}

},
{
"distributeFunction" : {
"type" : "Hash",
"indexes" : [1,2]
},
"opName":"MyriaShuffleProducer(h($1,$2))",
"opType":"ShuffleProducer",
"argChild":0,
"opId":2
}
]
}

],

"logicalRa": ""

}
138 changes: 138 additions & 0 deletions jsonQueries/pythonUDF/udfAggSingleColumn.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
{
"logicalRa":"Store(public:adhoc:results)[Apply(_COLUMN0_=LOG($0))[GroupBy(; UDA((_sum__128 / _count__127)))[Apply(link=$1)[Select(((((($1 = 1) or ($1 = 2)) or ($1 = 3)) or ($1 = 4)) or ($1 = 5)))[Scan(public:adhoc:TwitterK)]]]]]",
"fragments":[
{
"operators":[
{
"opType":"ShuffleConsumer",
"opName":"MyriaShuffleConsumer",
"argOperatorId":1,
"opId":2
},
{
"opName":"MyriaGroupBy(a; UDA((_sum__262 / _count__261)))",
"aggregators":[
{
"updaters":[
{
"rootExpressionOperator":{
"left":{
"columnIdx":0,
"type":"STATE"
},
"right":{
"value":"1",
"valueType":"LONG_TYPE",
"type":"CONSTANT"
},
"type":"PLUS"
},
"outputName":"_count__261"
},
{
"rootExpressionOperator":{
"children":[
{
"columnIdx":1,
"type":"STATE"
},
{
"columnIdx":3,
"type":"VARIABLE"
}
],
"type":"PYUDF",
"outputType":"BLOB_TYPE",
"name":"pyAdd"
},
"outputName":"_sum__262"
}
],
"emitters":[
{
"rootExpressionOperator":{
"children":[
{
"columnIdx":1,
"type":"STATE"
},
{
"columnIdx":0,
"type":"STATE"
}
],
"type":"PYUDF",
"outputType":"BLOB_TYPE",
"name":"pyMean"
},
"outputName":"mask"
}
],
"initializers":[
{
"rootExpressionOperator":{
"value":"0",
"valueType":"LONG_TYPE",
"type":"CONSTANT"
},
"outputName":"_count__261"
},
{
"rootExpressionOperator":{
"value":"null",
"valueType":"BLOB_TYPE",
"type":"CONSTANT"
},
"outputName":"_sum__262"
}
],
"type":"UserDefined"
}
],
"argChild":2,
"argGroupField":1,
"opType":"SingleGroupByAggregate",
"opId":3
},
{
"argChild":3,
"argOverwriteTable":true,
"opName":"Insert",
"opId":9,
"opType":"DbInsert",
"relationKey":{
"programName":"adhoc",
"relationName":"bmask",
"userName":"public"
}
}
]
},
{
"operators":[
{
"opType": "TableScan",
"opId": 0,
"relationKey": {
"userName": "public",
"relationName": "raw",
"programName": "adhoc"
}

},
{
"opType":"ShuffleProducer",
"argChild":0,
"distributeFunction" : {
"type" : "Hash",
"indexes" : [1]
},
"opId":1,
"opName":"MyriaShuffleProducer(h($0))"
}
]
}
],
"rawQuery":"uda Foo(x){\n[0 as _count,0 as _sum];\n[_count+1, _sum+x];\n[_sum/_count ];\n};\n\n\nT1 = scan(public:adhoc:raw);\nT2 = [from inter emit Foo(link) as res];\nT3 = [from T2 emit Log(res)];\nstore(T3, results);",
"language":"myrial"
}
Loading

0 comments on commit 9bb1b6f

Please sign in to comment.