Skip to content

Commit

Permalink
Add operator id, make operator name human readable
Browse files (browse the repository at this point in the history)
  • Loading branch information
domoritz committed Apr 14, 2014
1 parent c646eb8 commit 8a5df9e
Showing 1 changed file with 35 additions and 52 deletions.
87 changes: 35 additions & 52 deletions raco/myrialang.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -159,9 +159,8 @@ class MyriaOperator(object):


class MyriaScan(algebra.Scan, MyriaOperator):
def compileme(self, resultsym):
def compileme(self):
return {
"opName": resultsym,
"opType": "TableScan",
"relationKey": {
"userName": self.relation_key.user,
Expand All @@ -172,9 +171,8 @@ def compileme(self, resultsym):


class MyriaScanTemp(algebra.ScanTemp, MyriaOperator):
def compileme(self, resultsym):
def compileme(self):
return {
"opName": resultsym,
"opType": "TableScan",
"relationKey": {
"userName": 'public',
Expand All @@ -185,46 +183,41 @@ def compileme(self, resultsym):


class MyriaUnionAll(algebra.UnionAll, MyriaOperator):
def compileme(self, resultsym, leftsym, rightsym):
def compileme(self, leftsym, rightsym):
return {
"opName": resultsym,
"opType": "UnionAll",
"argChildren": [leftsym, rightsym]
}


class MyriaDifference(algebra.Difference, MyriaOperator):
def compileme(self, resultsym, leftsym, rightsym):
def compileme(self, leftsym, rightsym):
return {
"opName": resultsym,
"opType": "Difference",
"argChild1": leftsym,
"argChild2": rightsym,
}


class MyriaSingleton(algebra.SingletonRelation, MyriaOperator):
def compileme(self, resultsym):
def compileme(self):
return {
"opName": resultsym,
"opType": "Singleton",
}


class MyriaEmptyRelation(algebra.EmptyRelation, MyriaOperator):
def compileme(self, resultsym):
def compileme(self):
return {
"opName": resultsym,
"opType": "Empty",
'schema': scheme_to_schema(self.scheme())
}


class MyriaSelect(algebra.Select, MyriaOperator):
def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
pred = compile_expr(self.condition, self.scheme(), None)
return {
"opName": resultsym,
"opType": "Filter",
"argChild": inputsym,
"argPredicate": {
Expand All @@ -234,12 +227,11 @@ def compileme(self, resultsym, inputsym):


class MyriaCrossProduct(algebra.CrossProduct, MyriaOperator):
def compileme(self, resultsym, leftsym, rightsym):
def compileme(self, leftsym, rightsym):
column_names = [name for (name, _) in self.scheme()]
allleft = [i.position for i in self.left.scheme().ascolumnlist()]
allright = [i.position for i in self.right.scheme().ascolumnlist()]
return {
"opName": resultsym,
"opType": "SymmetricHashJoin",
"argColumnNames": column_names,
"argChild1": leftsym,
Expand All @@ -252,9 +244,8 @@ def compileme(self, resultsym, leftsym, rightsym):


class MyriaStore(algebra.Store, MyriaOperator):
def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
return {
"opName": resultsym,
"opType": "DbInsert",
"relationKey": {
"userName": self.relation_key.user,
Expand All @@ -267,9 +258,8 @@ def compileme(self, resultsym, inputsym):


class MyriaStoreTemp(algebra.StoreTemp, MyriaOperator):
def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
return {
"opName": resultsym,
"opType": "DbInsert",
"relationKey": {
"userName": 'public',
Expand Down Expand Up @@ -306,7 +296,7 @@ def convertcondition(condition, left_len, combined_scheme):

class MyriaSymmetricHashJoin(algebra.ProjectingJoin, MyriaOperator):

def compileme(self, resultsym, leftsym, rightsym):
def compileme(self, leftsym, rightsym):
"""Compile the operator to a sequence of json operators"""

left_len = len(self.left.scheme())
Expand All @@ -324,7 +314,6 @@ def compileme(self, resultsym, leftsym, rightsym):
allright = [i - left_len for i in pos if i >= left_len]

join = {
"opName": resultsym,
"opType": "SymmetricHashJoin",
"argColumnNames": column_names,
"argChild1": "%s" % leftsym,
Expand Down Expand Up @@ -354,7 +343,7 @@ def agg_mapping(agg_expr):
elif isinstance(agg_expr, expression.SUM):
return "AGG_OP_SUM"

def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
child_scheme = self.input.scheme()
group_fields = [expression.toUnnamed(ref, child_scheme)
for ref in self.grouping_list]
Expand All @@ -371,7 +360,6 @@ def compileme(self, resultsym, inputsym):
agg_types = [[MyriaGroupBy.agg_mapping(agg_expr)]
for agg_expr in self.aggregate_list]
ret = {
"opName": resultsym,
"argChild": inputsym,
"argAggFields": [agg_field.position for agg_field in agg_fields],
"argAggOperators": agg_types,
Expand All @@ -391,34 +379,32 @@ def compileme(self, resultsym, inputsym):

class MyriaShuffle(algebra.Shuffle, MyriaOperator):
"""Represents a simple shuffle operator"""
def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
raise NotImplementedError('shouldn''t ever get here, should be turned into SP-SC pair') # noqa


class MyriaCollect(algebra.Collect, MyriaOperator):
"""Represents a simple collect operator"""
def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
raise NotImplementedError('shouldn''t ever get here, should be turned into CP-CC pair') # noqa


class MyriaDupElim(algebra.Distinct, MyriaOperator):
"""Represents duplicate elimination"""
def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
return {
"opName": resultsym,
"opType": "DupElim",
"argChild": inputsym,
}


class MyriaApply(algebra.Apply, MyriaOperator):
"""Represents a simple apply operator"""
def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
child_scheme = self.input.scheme()
emitters = [compile_mapping(x, child_scheme, None)
for x in self.emitters]
return {
'opName': resultsym,
'opType': 'Apply',
'argChild': inputsym,
'emitExpressions': emitters
Expand All @@ -427,15 +413,14 @@ def compileme(self, resultsym, inputsym):

class MyriaStatefulApply(algebra.StatefulApply, MyriaOperator):
"""Represents a stateful apply operator"""
def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
child_scheme = self.input.scheme()
state_scheme = self.state_scheme
comp_map = lambda x: compile_mapping(x, child_scheme, state_scheme)
emitters = [comp_map(x) for x in self.emitters]
inits = [comp_map(x) for x in self.inits]
updaters = [comp_map(x) for x in self.updaters]
return {
'opName': resultsym,
'opType': 'StatefulApply',
'argChild': inputsym,
'emitExpressions': emitters,
Expand All @@ -452,9 +437,8 @@ def __init__(self, input):
def shortStr(self):
return "%s" % self.opname()

def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
return {
"opName": resultsym,
"opType": "BroadcastProducer",
"argChild": inputsym,
}
Expand All @@ -468,9 +452,8 @@ def __init__(self, input):
def shortStr(self):
return "%s" % self.opname()

def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
return {
'opName': resultsym,
'opType': 'BroadcastConsumer',
'argOperatorId': inputsym
}
Expand All @@ -486,7 +469,7 @@ def shortStr(self):
hash_string = ','.join([str(x) for x in self.hash_columns])
return "%s(h(%s))" % (self.opname(), hash_string)

def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
if len(self.hash_columns) == 1:
pf = {
"type": "SingleFieldHash",
Expand All @@ -499,7 +482,6 @@ def compileme(self, resultsym, inputsym):
}

return {
"opName": resultsym,
"opType": "ShuffleProducer",
"argChild": inputsym,
"argPf": pf
Expand All @@ -514,9 +496,8 @@ def __init__(self, input):
def shortStr(self):
return "%s" % self.opname()

def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
return {
'opName': resultsym,
'opType': 'ShuffleConsumer',
'argOperatorId': inputsym
}
Expand All @@ -541,9 +522,8 @@ def __init__(self, input, server):
def shortStr(self):
return "%s(@%s)" % (self.opname(), self.server)

def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
return {
"opName": resultsym,
"opType": "CollectProducer",
"argChild": inputsym,
}
Expand All @@ -557,9 +537,8 @@ def __init__(self, input):
def shortStr(self):
return "%s" % self.opname()

def compileme(self, resultsym, inputsym):
def compileme(self, inputsym):
return {
'opName': resultsym,
'opType': 'CollectConsumer',
'argOperatorId': inputsym
}
Expand Down Expand Up @@ -1131,14 +1110,18 @@ def call_compile_me(op):
opsym = syms[id(op)]
childsyms = [syms[id(child)] for child in op.children()]
if isinstance(op, algebra.ZeroaryOperator):
return op.compileme(opsym)
if isinstance(op, algebra.UnaryOperator):
return op.compileme(opsym, childsyms[0])
if isinstance(op, algebra.BinaryOperator):
return op.compileme(opsym, childsyms[0], childsyms[1])
if isinstance(op, algebra.NaryOperator):
return op.compileme(opsym, childsyms)
raise NotImplementedError("unable to handle operator of type " + type(op)) # noqa
op_dict = op.compileme()
elif isinstance(op, algebra.UnaryOperator):
op_dict = op.compileme(childsyms[0])
elif isinstance(op, algebra.BinaryOperator):
op_dict = op.compileme(childsyms[0], childsyms[1])
elif isinstance(op, algebra.NaryOperator):
op_dict = op.compileme(childsyms)
else:
raise NotImplementedError("unable to handle operator of type " + type(op)) # noqa
op_dict['opName'] = op.shortStr()
op_dict['opID'] = opsym
return op_dict

# The actual code. all_frags collects up the fragments.
all_frags = []
Expand Down

0 comments on commit 8a5df9e

Please sign in to comment.