Skip to content

Commit

Permalink
Merge 718157d into 6d75be5
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjingwang authored Jan 28, 2017
2 parents 6d75be5 + 718157d commit b5794d1
Show file tree
Hide file tree
Showing 11 changed files with 877 additions and 86 deletions.
207 changes: 199 additions & 8 deletions raco/algebra.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def __init__(self):
self.cleanup = ""
self.alias = self
self._trace = []
self.stop_recursion = False

def set_stop_recursion(self):
self.stop_recursion = True

@abstractmethod
def apply(self, f):
Expand All @@ -66,6 +70,8 @@ def scheme(self):
def walk(self):
"""Return an iterator over the tree of operators."""
yield self
if self.stop_recursion:
return
for c in self.children():
for x in c.walk():
yield x
Expand Down Expand Up @@ -98,13 +104,13 @@ def preorder(self, f):
for x in c.preorder(f):
yield x

def collectParents(self, parent_map=None):
def collectParents(self, parent_map={}):
"""Construct a dict mapping children to parents. Used in
optimization"""
if parent_map is None:
parent_map = {}
if self.stop_recursion:
return
for c in self.children():
parent_map.setdefault(c, []).append(self)
parent_map.setdefault(id(c), []).append(self)
c.collectParents(parent_map)

def __copy__(self):
Expand All @@ -114,6 +120,8 @@ def __eq__(self, other):
return self.__class__ == other.__class__

def __str__(self):
if self.stop_recursion:
return self.shortStr()
if len(self.children()) > 0:
return "%s%s" % (self.shortStr(), real_str(self.children()))
return self.shortStr()
Expand Down Expand Up @@ -206,7 +214,7 @@ def copy(self, other):
"""Deep copy"""
Operator.copy(self, other)

def compileme(self, resultsym):
def compileme(self):
"""Compile this operator, storing its result in resultsym"""
raise NotImplementedError("{op}.compileme".format(op=type(self)))

Expand Down Expand Up @@ -382,6 +390,163 @@ def shortStr(self):
"""Logical Relational Algebra"""


class ScanIDB(ZeroaryOperator):

def __init__(self, name, _scheme=None, idbcontroller=None):
self.name = name
self._scheme = _scheme
self.idbcontroller = idbcontroller
ZeroaryOperator.__init__(self)

def partitioning(self):
return RepresentationProperties()

def num_tuples(self):
raise NotImplementedError("{op}.num_tuples".format(op=type(self)))

def shortStr(self):
return "%s(%s)" % (self.opname(), self.name)

def scheme(self):
if self._scheme is not None:
return self._scheme
if self.idbcontroller is not None:
return self.idbcontroller.scheme()
return None

def __repr__(self):
return "{op}({name!r},{sch!r},{idb!r})".format(
op=self.opname(), name=self.name, sch=self._scheme,
idb=self.idbcontroller)


class IDBController(NaryOperator):

def __init__(self, name=None, idb_id=None, children=None, emits=None,
relation_key=None, recursion_mode=None):
self.name = name
self.idb_id = idb_id
self.emits = emits
self.relation_key = relation_key
self.recursion_mode = recursion_mode
NaryOperator.__init__(self, children)

def partitioning(self):
return RepresentationProperties()

def get_agg(self):
group_list, expr = self.get_group_agg()
if expr is None:
return {"type": "DupElim"}
if isinstance(expr,
(expression.aggregate.MIN, expression.aggregate.LEXMIN)):
if isinstance(expr, expression.aggregate.MIN):
valueCols = [expr.input.get_position(self.scheme())]
else:
valueCols = [operand.get_position(self.scheme())
for operand in expr.operands]
return {
"type": "KeepMinValue",
"keyColIndices": group_list,
"valueColIndices": valueCols
}
if isinstance(expr, expression.aggregate.COUNTALL):
return {
"type": "CountFilter",
"keyColIndices": group_list,
"threshold": expr.threshold
}

def get_group_agg(self):
agg_list = []
group_list = []
for emit in self.emits:
expr = emit.sexprs[0]
if isinstance(expr, expression.AggregateExpression):
if not isinstance(expr, (expression.aggregate.MIN,
expression.aggregate.LEXMIN,
expression.aggregate.COUNTALL)):
raise NotImplementedError(
"IDBController does not support agg type {}".format(
type(expr)))
agg_list.append(expr)
else:
group_list.append(expr.get_position(self.scheme()))
if len(agg_list) > 1:
raise NotImplementedError("IDBController only can have one agg")
if len(agg_list) == 0:
agg = None
else:
agg = agg_list[0]
return (group_list, agg)

def num_tuples(self):
# TODO
return DEFAULT_CARDINALITY

def scheme(self):
if ((self.children()[0] is not None) and
(not isinstance(self.children()[0], EmptyRelation))):
in_scheme = self.children()[0].scheme()
elif ((self.children()[1] is not None) and
(not isinstance(self.children()[1], EmptyRelation))):
in_scheme = self.children()[1].scheme()
else:
return None

schema = scheme.Scheme()
for index, emit in enumerate(self.emits):
sexpr = emit.sexprs[0]
if isinstance(sexpr, expression.aggregate.LEXMIN):
for col in sexpr.operands:
_name, _type = in_scheme.resolve(col)
schema.addAttribute(_name, _type)
else:
name = (None if emit.column_names is None
else emit.column_names[0])
_name = resolve_attribute_name(name, in_scheme, sexpr, index)
_type = sexpr.typeof(in_scheme, None)
schema.addAttribute(_name, _type)
return schema

def shortStr(self):
return "%s(%s)" % (self.opname(), real_str(self.name,
skip_out=True))

def copy(self, other):
"""deep copy"""
self.name = other.name
self.idb_id = other.idb_id
self.emits = other.emits
self.recursion_mode = other.recursion_mode
NaryOperator.copy(self, other)

def __repr__(self):
return "{op}({name!r},{id!r},{ch!r},{em!r},{key!r},{recur!r})".format(
op=self.opname(), name=self.name, id=self.idb_id, ch=self.args,
em=self.emits, key=self.relation_key, recur=self.recursion_mode)


class EOSController(UnaryOperator):

"""EOSController"""

def __init__(self, input=None):
UnaryOperator.__init__(self, input)

def partitioning(self):
return RepresentationProperties()

def num_tuples(self):
return 1

def scheme(self):
return scheme.Scheme([])

def shortStr(self):
return self.opname()


class IdenticalSchemeBinaryOperator(BinaryOperator):

"""BinaryOperator where both sides have the same schema"""
Expand Down Expand Up @@ -1087,8 +1252,9 @@ class ProjectingJoin(Join):
"""Logical Projecting Join operator"""

def __init__(self, condition=None, left=None, right=None,
output_columns=None):
output_columns=None, pull_order=None):
self.output_columns = output_columns
self.pull_order = pull_order
Join.__init__(self, condition, left, right)

def __eq__(self, other):
Expand All @@ -1102,9 +1268,10 @@ def shortStr(self):
real_str(self.output_columns, skip_out=True))

def __repr__(self):
return "{op}({cond!r}, {l!r}, {r!r}, {oc!r})"\
return "{op}({cond!r}, {l!r}, {r!r}, {oc!r}, {pr!r})"\
.format(op=self.opname(), cond=self.condition,
l=self.left, r=self.right, oc=self.output_columns)
l=self.left, r=self.right, oc=self.output_columns,
pr=self.pull_order)

def copy(self, other):
"""deep copy"""
Expand Down Expand Up @@ -1411,6 +1578,7 @@ class EmptyRelation(ZeroaryOperator):
"""Relation with no tuples."""

def __init__(self, _scheme=None):
ZeroaryOperator.__init__(self)
self._scheme = _scheme

def num_tuples(self):
Expand Down Expand Up @@ -1830,6 +1998,29 @@ def scheme(self):
return None


class UntilConvergence(NaryOperator):

def __init__(self, ops=None, pull_order=None):
"""Repeatedly execute a sequence of plans until convergence.
:params ops: A list of operations to execute in parallel.
"""
NaryOperator.__init__(self, ops)
self.pull_order = pull_order

def partitioning(self):
return RepresentationProperties()

def num_tuples(self):
raise NotImplementedError("{op}.num_tuples".format(op=type(self)))

def shortStr(self):
return self.opname()

def scheme(self):
"""UntilConvergence does not return any tuples."""
return None


def inline_operator(dest_op, var, target_op):
"""Convert two operator trees into one by inlining.
Expand Down
Loading

0 comments on commit b5794d1

Please sign in to comment.