Skip to content

Commit

Permalink
Fix arguments to variadic (Python) functions
Browse files Browse the repository at this point in the history
  • Loading branch information
BrandonHaynes committed May 9, 2017
1 parent 004f76b commit 6b719fb
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 12 deletions.
25 changes: 17 additions & 8 deletions raco/expression/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from .expression import (ZeroaryOperator, UnaryOperator, BinaryOperator,
NaryOperator, types, check_is_numeric, check_type,
TypeSafetyViolation, UnnamedAttributeRef)
TypeSafetyViolation, NamedAttributeRef)


class UnaryFunction(UnaryOperator):
Expand Down Expand Up @@ -176,10 +176,11 @@ def __init__(self, ftype, name, typ, **kwargs):

def bind(self, *args):
return Function(
['arg_{}'.format(i) for i in xrange(len(args))],
['arg{}'.format(i) for i in xrange(len(args))],
self.ftype(self.name,
self.typ,
*[UnnamedAttributeRef(i) for i in xrange(len(args))],
*[NamedAttributeRef('arg{}'.format(i))
for i in xrange(len(args))],
**self.kwargs))


Expand Down Expand Up @@ -322,12 +323,11 @@ class PYUDF(NaryFunction):
literals = []

def __init__(self, name, typ, *args, **kwargs):
super(PYUDF, self).__init__(args)
super(PYUDF, self).__init__(tuple(args))
self.name = name
self.source = kwargs.get('source', None)
self.func = eval(self.source) if self.source else None
self.typ = typ
self.arguments = tuple(args)

def __str__(self):
return "%s(%s, %s, %s)" % (self.__class__.__name__,
Expand All @@ -343,15 +343,24 @@ def __repr__(self):
t=self.typ,
s=self.source)

@property
def arguments(self):
return self.operands

@arguments.setter
def arguments(self, value):
self.operands = value

def apply(self, f):
super(PYUDF, self).apply(f)
self.arguments = tuple(self.arguments)

def typeof(self, scheme, state_scheme):
return self.typ

def set_typ(self, typ):
self.typ = typ

def apply(self, f):
map(lambda a: f(a), self.arguments)

def evaluate(self, _tuple, scheme, state=None):
if self.func:
return self.func(*map(lambda a: a.evaluate(_tuple, scheme, state),
Expand Down
24 changes: 20 additions & 4 deletions raco/myrial/query_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import raco.scheme as scheme
import raco.myrial.groupby
import raco.myrial.myrial_test as myrial_test
from raco.algebra import Apply
from raco import types

from raco.myrial.exceptions import *
Expand Down Expand Up @@ -2883,19 +2884,34 @@ def test_simple_do_while(self):
expected = collections.Counter([(32, 5)])
self.check_result(query, expected, output="powersOfTwo")

def test_pyUDF(self):
def test_pyUDF_dotted_arguments(self):
query = """
T1=scan(%s);
out = [from T1 emit test(T1.id, T1.dept_id) As ratio];
out = [from T1 emit test(T1.id, T1.dept_id) As output];
store(out, OUTPUT);
""" % self.emp_key

self.get_physical_plan(query, udas=[('test', LONG_TYPE)])
plan = self.get_physical_plan(query, udas=[('test', LONG_TYPE)])
apply = [op for op in plan.walk() if isinstance(op, Apply)][0]
ref = apply.emitters[0][1]
assert str(ref) == "PYUDF(test, ['id', 'dept_id'], LONG_TYPE)"

def test_pyUDF_with_positional_arguments(self):
query = """
T1=scan(%s);
out = [from T1 emit test($0, $1) As output];
store(out, OUTPUT);
""" % self.emp_key

plan = self.get_physical_plan(query, udas=[('test', LONG_TYPE)])
apply = [op for op in plan.walk() if isinstance(op, Apply)][0]
ref = apply.emitters[0][1]
assert str(ref) == "PYUDF(test, ['$0', '$1'], LONG_TYPE)"

def test_pyUDF_uda(self):
query = """
uda Foo(x){
[0 as _count,0 as _sum];
[0 as _count, 0 as _sum];
[ _count+1, test_uda(_sum, x)];
[ test_uda(_sum,_count) ];
};
Expand Down

0 comments on commit 6b719fb

Please sign in to comment.