Skip to content

Commit

Permalink
Merge branch 'whitaker_uda_combiner' of github.com:uwescience/raco in…
Browse files Browse the repository at this point in the history
…to whitaker_uda_combiner
  • Loading branch information
Andrew Whitaker committed Sep 13, 2014
2 parents df1f260 + 7e1c0a3 commit 15cb168
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 84 deletions.
1 change: 0 additions & 1 deletion raco/algebra.py
Expand Up @@ -6,7 +6,6 @@
import copy
import operator
import math
import collections
from raco.expression import StateVar


Expand Down
2 changes: 1 addition & 1 deletion raco/fakedb.py
Expand Up @@ -272,7 +272,7 @@ def process_grouping_columns(_tuple):
agg_fields.append(
expr.evaluate_aggregate(tuples, input_scheme))
else:
# UDA-style aggregate: evaluate a nornal expression that
# UDA-style aggregate: evaluate a normal expression that
# can reference only the state tuple
agg_fields.append(expr.evaluate(None, None, state))
yield(key + tuple(agg_fields))
Expand Down
1 change: 0 additions & 1 deletion raco/language/clang.py
Expand Up @@ -3,7 +3,6 @@

from raco import algebra
from raco import expression
from raco.expression import aggregate
from raco.language import Language, clangcommon, Algebra
from raco import rules
from raco.pipelines import Pipelined
Expand Down
1 change: 0 additions & 1 deletion raco/language/clangcommon.py
Expand Up @@ -7,7 +7,6 @@
from raco import catalog
from raco.algebra import gensym
from raco.expression import UnnamedAttributeRef
from raco.expression import util

import logging
LOG = logging.getLogger(__name__)
Expand Down
110 changes: 53 additions & 57 deletions raco/language/grappalang.py
Expand Up @@ -4,12 +4,12 @@

from raco import algebra
from raco import expression
from raco.expression import aggregate
from raco.language import Language, Algebra
from raco import rules
from raco.pipelines import Pipelined
from raco.language.clangcommon import StagedTupleRef, ct
from raco.language import clangcommon
from raco.utility import emitlist

from raco.algebra import gensym

Expand Down Expand Up @@ -114,23 +114,26 @@ def pipeline_wrap(ident, plcode, attrs):
""")
code = timing_template % locals()

dependences = attrs.get('dependences', [])
_LOG.debug("pipeline %s dependences %s", ident, dependences)
dependence_code = emitlist([wait_statement(d) for d in dependences])

code = """{dependence_code}
{inner_code}
""".format(dependence_code=dependence_code,
inner_code=code)

syncname = attrs.get('sync')
if syncname:
inner_code = code
sync_template = ct("""spawn(&%(syncname)s, [=] {
sync_template = ct("""
CompletionEvent %(syncname)s;
spawn(&%(syncname)s, [=] {
%(inner_code)s
});
""")
code = sync_template % locals()

syncname = attrs.get('syncdef')
if syncname:
inner_code = code
sync_def_template = ct("""CompletionEvent %(syncname)s;
%(inner_code)s
""")
code = sync_def_template % locals()

return code

@staticmethod
Expand Down Expand Up @@ -263,6 +266,7 @@ def consume(self, inputsym, src, state):
# """

global_syncname = create_pipeline_synchronization(state)
get_pipeline_task_name(state)

memory_scan_template = ct("""
forall<&%(global_syncname)s>( %(inputsym)s.data, %(inputsym)s.numtuples, \
Expand Down Expand Up @@ -305,8 +309,6 @@ def __eq__(self, other):

class GrappaSymmetricHashJoin(algebra.Join, GrappaOperator):
_i = 0
wait_template = ct("""%(syncname)s.wait();
""")

@classmethod
def __genBaseName__(cls):
Expand All @@ -318,16 +320,7 @@ def __getHashName__(self):
name = "dhash_%s" % self.symBase
return name

def __getSyncName__(self, side):
base = "dh_sync_%s" % self.symBase
if side == "left":
return base + "_L"
if side == "right":
return base + "_R"
assert False, "type error {left,right}"

def produce(self, state):
self.syncnames = []
self.symBase = self.__genBaseName__()

if not isinstance(self.condition, expression.EQ):
Expand Down Expand Up @@ -378,10 +371,6 @@ def produce(self, state):
self.left.childtag = "left"
self.left.produce(state)

for sn in self.syncnames:
syncname = sn
state.addCode(self.wait_template % locals())

def consume(self, t, src, state):
access_template = ct("""
%(hashname)s.insert_lookup_iter_%(side)s<&%(global_syncname)s>(\
Expand All @@ -405,13 +394,6 @@ def consume(self, t, src, state):
out_tuple_type = self.outTuple.getTupleTypename()
out_tuple_name = self.outTuple.name

syncname = self.__getSyncName__(src.childtag)
# only add such a sync if one doesn't exist yet
if not state.checkPipelineProperty('sync'):
state.setPipelineProperty('sync', syncname)
state.setPipelineProperty('syncdef', syncname)
self.syncnames.append(syncname)

global_syncname = state.getPipelineProperty('global_syncname')

if src.childtag == "right":
Expand Down Expand Up @@ -468,8 +450,6 @@ def consume(self, t, src, state):

class GrappaShuffleHashJoin(algebra.Join, GrappaOperator):
_i = 0
wait_template = ct("""%(syncname)s.wait();
""")

@classmethod
def __genBaseName__(cls):
Expand All @@ -481,14 +461,6 @@ def __getHashName__(self):
name = "hashjoin_reducer_%s" % self.symBase
return name

def __getSyncName__(self, side):
base = "shj_sync_%s" % self.symBase
if side == "left":
return base + "_L"
if side == "right":
return base + "_R"
assert False, "type error {left,right}"

def produce(self, state):
left_sch = self.left.scheme()

Expand Down Expand Up @@ -562,15 +534,14 @@ def produce(self, state):
self.left.produce(state)

state.saveExpr((self.right, self.right_keypos),
(self._hashname, right_type, left_type))
(self._hashname, right_type, left_type,
self.right_syncname, self.left_syncname))

for sn in self.syncnames:
syncname = sn
state.addCode(self.wait_template % locals())
else:
# if found a common subexpression on right child then
# use the same hashtable
self._hashname, right_type, left_type = hashtableInfo
self._hashname, right_type, left_type,\
self.right_syncname, self.left_syncname = hashtableInfo
_LOG.debug("reuse hash %s for %s", self._hashname, self)

# now that Relation is produced, produce its contents by iterating over
Expand All @@ -589,6 +560,11 @@ def produce(self, state):
state.addDeclarations([out_tuple_type_def])

pipeline_sync = create_pipeline_synchronization(state)
get_pipeline_task_name(state)

# add dependences on left and right inputs
state.appendPipelineProperty('dependences', self.right_syncname)
state.appendPipelineProperty('dependences', self.left_syncname)

# reduce is a single self contained pipeline.
# future hashjoin implementations may pipeline out of it
Expand All @@ -613,6 +589,7 @@ def produce(self, state):
def consume(self, inputTuple, fromOp, state):
if fromOp.childtag == "right":
side = "Right"
self.right_syncname = get_pipeline_task_name(state)

keypos = self.right_keypos

Expand All @@ -622,6 +599,7 @@ def consume(self, inputTuple, fromOp, state):
self.rightTupleTypename)
elif fromOp.childtag == "left":
side = "Left"
self.left_syncname = get_pipeline_task_name(state)

keypos = self.left_keypos

Expand All @@ -639,14 +617,6 @@ def consume(self, inputTuple, fromOp, state):
# intra-pipeline sync
global_syncname = state.getPipelineProperty('global_syncname')

# inter-pipeline sync
syncname = self.__getSyncName__(fromOp.childtag)
# only add such a sync if one doesn't exist yet
if not state.checkPipelineProperty('sync'):
state.setPipelineProperty('sync', syncname)
state.setPipelineProperty('syncdef', syncname)
self.syncnames.append(syncname)

mat_template = ct("""%(hashname)s_ctx.emitIntermediate%(side)s\
<&%(global_syncname)s>(\
%(keyname)s.get(%(keypos)s), %(keyname)s);""")
Expand Down Expand Up @@ -773,6 +743,10 @@ def produce(self, state):
""")

pipeline_sync = create_pipeline_synchronization(state)
get_pipeline_task_name(state)

# add a dependence on the input aggregation pipeline
state.appendPipelineProperty('dependences', self.input_syncname)

output_tuple = GrappaStagedTupleRef(gensym(), self.scheme())
output_tuple_name = output_tuple.name
Expand All @@ -785,6 +759,9 @@ def produce(self, state):
state.addPipeline(code)

def consume(self, inputTuple, fromOp, state):
# save the inter-pipeline task name
self.input_syncname = get_pipeline_task_name(state)

inp_sch = self.input.scheme()

if self.useKey:
Expand Down Expand Up @@ -838,6 +815,18 @@ def consume(self, inputTuple, fromOp, state):
return code


def wait_statement(name):
    """Return a C++ statement that blocks on the named completion event."""
    return "{0}.wait();".format(name)


def get_pipeline_task_name(state):
    """Register and return the inter-pipeline task name for the current pipeline.

    The generated name is stored as the pipeline's 'sync' property, and a
    wait on it is appended to the main task so the program blocks until
    this pipeline has completed.
    """
    task_name = "p_task_{0}".format(state.getCurrentPipelineId())
    state.setPipelineProperty('sync', task_name)
    state.addMainWaitStatement(wait_statement(task_name))
    return task_name


class GrappaHashJoin(algebra.Join, GrappaOperator):
_i = 0

Expand Down Expand Up @@ -909,14 +898,16 @@ def produce(self, state):
state.addInitializers([init_template % locals()])
self.right.produce(state)
state.saveExpr((self.right, self.right_keypos),
(self._hashname, self.rightTupleTypename))
(self._hashname, self.rightTupleTypename,
self.right_syncname))
# TODO always safe here? I really want to call
# TODO saveExpr before self.right.produce(),
# TODO but I need to get the self.rightTupleTypename cleanly
else:
# if found a common subexpression on right child then
# use the same hashtable
self._hashname, self.rightTupleTypename = hashtableInfo
self._hashname, self.rightTupleTypename, self.right_syncname\
= hashtableInfo
_LOG.debug("reuse hash %s for %s", self._hashname, self)

self.left.childtag = "left"
Expand All @@ -935,6 +926,8 @@ def consume(self, t, src, state):

keypos = self.right_keypos

self.right_syncname = get_pipeline_task_name(state)

self.rightTupleTypename = t.getTupleTypename()
if self.rightTupleTypeRef is not None:
state.resolveSymbol(self.rightTupleTypeRef,
Expand Down Expand Up @@ -962,6 +955,9 @@ def consume(self, t, src, state):
});
""")

# add a dependence on the right pipeline
state.appendPipelineProperty('dependences', self.right_syncname)

hashname = self._hashname
keyname = t.name
keytype = t.getTupleTypename()
Expand Down
19 changes: 5 additions & 14 deletions raco/language/myrialang.py
Expand Up @@ -6,8 +6,8 @@
from raco.catalog import Catalog
from raco.language import Language, Algebra
from raco.expression import UnnamedAttributeRef
from raco.expression.aggregate import (
UdaAggregateExpression, rebase_local_aggregate_output, rebase_finalizer)
from raco.expression.aggregate import (rebase_local_aggregate_output,
rebase_finalizer)
from raco.expression.statevar import *
from raco.datastructure.UnionFind import UnionFind
from raco import types
Expand Down Expand Up @@ -373,6 +373,7 @@ def compile_builtin_agg(agg, child_scheme):
if isinstance(agg, expression.COUNTALL):
return {"type": "CountAll"}

assert isinstance(agg, expression.UnaryOperator)
column = expression.toUnnamed(agg.input, child_scheme).position
return {"type": "SingleColumn",
"aggOps": [MyriaGroupBy.agg_mapping(agg)],
Expand Down Expand Up @@ -1080,7 +1081,7 @@ def fire(self, op):
requires_finalizer = False

for agg in op.aggregate_list:
# Multiple emit arguments can be associted with a single
# Multiple emit arguments can be associated with a single
# decomposition rule; coalesce them all together.
next_state = agg.get_decomposable_state()
assert next_state
Expand Down Expand Up @@ -1141,16 +1142,6 @@ def fire(self, op):
local_gb = MyriaGroupBy(op.grouping_list, local_emitters, op.input,
local_statemods)

shuffle_fields = [UnnamedAttributeRef(i)
for i in range(len(op.grouping_list))]

if len(shuffle_fields) == 0:
# Need to Collect all tuples at once place
shuffle = algebra.Collect(local_gb)
else:
# Need to Shuffle
shuffle = algebra.Shuffle(local_gb, shuffle_fields)

grouping_fields = [UnnamedAttributeRef(i)
for i in range(num_grouping_terms)]

Expand Down Expand Up @@ -1225,7 +1216,7 @@ def fire(self, op):
children = []
MergeToNaryJoin.collect_join_groups(
op, join_groups, children)
# 2. extract join groups from the union find datastructure
# 2. extract join groups from the union-find data structure
join_conds = defaultdict(list)
for field, key in join_groups.parents.items():
join_conds[key].append(field)
Expand Down
4 changes: 2 additions & 2 deletions raco/myrial/interpreter.py
Expand Up @@ -42,7 +42,7 @@ def get_unnamed_ref(column_ref, scheme, offset=0):

def check_binop_compatability(op_name, left, right):
"""Check whether the arguments to an operation are compatible."""
# Todo: check for type compatibilty here?
# Todo: check for type compatibility here?
# https://github.com/uwescience/raco/issues/213
if len(left.scheme()) != len(right.scheme()):
raise SchemaMismatchException(op_name)
Expand All @@ -62,7 +62,7 @@ def __init__(self, symbols, catalog, use_dummy_schema=False):
self.catalog = catalog
self.use_dummy_schema = use_dummy_schema

# Variables accesed by the current operation
# Variables accessed by the current operation
self.uses_set = set()

def get_and_clear_uses_set(self):
Expand Down
2 changes: 1 addition & 1 deletion raco/myrial/optimizer_tests.py
@@ -1,4 +1,4 @@

import collections
import random

from raco.algebra import *
Expand Down

0 comments on commit 15cb168

Please sign in to comment.