diff --git a/pytest.ini b/pytest.ini index 4cb808b..7c3dd5f 100644 --- a/pytest.ini +++ b/pytest.ini @@ -2,6 +2,5 @@ addopts = testpaths = tests src/RestrictedPython/tests norecursedirs = fixures - isort_ignore = bootstrap.py diff --git a/src/RestrictedPython/test_helper.py b/src/RestrictedPython/test_helper.py index 972374c..6f9c7e9 100644 --- a/src/RestrictedPython/test_helper.py +++ b/src/RestrictedPython/test_helper.py @@ -22,6 +22,7 @@ """ import dis +import sys import types from dis import findlinestarts @@ -135,7 +136,11 @@ def __init__(self, opcode, pos): def _disassemble(co, lasti=-1): + # in py2 code is str code = co.co_code + #in py3 code is bytes + #if sys.version_info.major > 2: + # code = code.decode() labels = dis.findlabels(code) linestarts = dict(findlinestarts(co)) n = len(code) @@ -143,7 +148,10 @@ def _disassemble(co, lasti=-1): extended_arg = 0 free = co.co_cellvars + co.co_freevars while i < n: - op = ord(code[i]) + if sys.version_info.major < 3: + op = ord(code[i]) + else: + op = code[i] o = Op(op, i) i += 1 if i in linestarts and i > 0: @@ -151,7 +159,11 @@ def _disassemble(co, lasti=-1): if i in labels: o.target = True if op > dis.HAVE_ARGUMENT: - arg = ord(code[i]) + ord(code[i + 1]) * 256 + extended_arg + if sys.version_info.major < 3: + arg = ord(code[i]) + ord(code[i + 1]) * 256 + extended_arg + else: + arg = code[i] + code[i + 1] * 256 + extended_arg + extended_arg = 0 i += 2 if op == dis.EXTENDED_ARG: diff --git a/src/RestrictedPython/tests/security_in_syntax.py b/src/RestrictedPython/tests/security_in_syntax.py index 7e7c703..c0ad803 100644 --- a/src/RestrictedPython/tests/security_in_syntax.py +++ b/src/RestrictedPython/tests/security_in_syntax.py @@ -87,3 +87,40 @@ def no_augmeneted_assignment_to_slice(): def no_augmeneted_assignment_to_slice2(): a[x:y:z] += c + +# These are all supposed to raise a SyntaxError when using +# compile_restricted() but not when using compile(). +# Each function in this module is compiled using compile_restricted(). + + +def with_as_bad_name(): + with x as _leading_underscore: + pass + + +def relative_import_as_bad_name(): + from .x import y as _leading_underscore + + +def except_as_bad_name(): + try: + 1 / 0 + except Exception as _leading_underscore: + pass + +# These are all supposed to raise a SyntaxError when using +# compile_restricted() but not when using compile(). +# Each function in this module is compiled using compile_restricted(). + + +def dict_comp_bad_name(): + {y: y for _restricted_name in x} + + +def set_comp_bad_name(): + {y for _restricted_name in x} + + +def compound_with_bad_name(): + with a as b, c as _restricted_name: + pass diff --git a/src/RestrictedPython/transformer.py b/src/RestrictedPython/transformer.py index b0cd219..e000aa5 100644 --- a/src/RestrictedPython/transformer.py +++ b/src/RestrictedPython/transformer.py @@ -123,7 +123,7 @@ #ast.Nonlocal, ast.ClassDef, ast.Module, - ast.Param + ast.Param, ] @@ -158,7 +158,9 @@ ast.Bytes, ast.Starred, ast.arg, - #ast.Try, # Try should not be supported + ast.Try, # Try should not be supported + ast.TryExcept, # TryExcept should not be supported + ast.NameConstant ]) if version >= (3, 4): @@ -322,8 +324,8 @@ def check_name(self, node, name): self.error(node, '"%s" is an invalid variable name because ' 'it ends with "__roles__".' % name) - elif name == "printed": - self.error(node, '"printed" is a reserved name.') + # elif name == "printed": + # self.error(node, '"printed" is a reserved name.') def transform_tuple_unpack(self, root, src, to_wrap=None): """Protects tuple unpacking with _getiter_ diff --git a/tests/fixtures/class.py b/tests/fixtures/class.py new file mode 100644 index 0000000..cc86e8e --- /dev/null +++ b/tests/fixtures/class.py @@ -0,0 +1,13 @@ +class MyClass: + + def set(self, val): + self.state = val + + def get(self): + return self.state + +x = MyClass() +x.set(12) +x.set(x.get() + 1) +if x.get() != 13: + raise AssertionError("expected 13, got %d" % x.get()) diff --git a/tests/fixtures/lambda.py b/tests/fixtures/lambda.py new file mode 100644 index 0000000..9a268b7 --- /dev/null +++ b/tests/fixtures/lambda.py @@ -0,0 +1,5 @@ +f = lambda x, y=1: x + y +if f(2) != 3: + raise ValueError +if f(2, 2) != 4: + raise ValueError diff --git a/tests/fixtures/restricted_module.py b/tests/fixtures/restricted_module.py new file mode 100644 index 0000000..e6e7cc4 --- /dev/null +++ b/tests/fixtures/restricted_module.py @@ -0,0 +1,210 @@ +import sys + + +def print0(): + print 'Hello, world!', + return printed + + +def print1(): + print 'Hello,', + print 'world!', + return printed + + +def printStuff(): + print 'a', 'b', 'c', + return printed + + +def printToNone(): + x = None + print >>x, 'Hello, world!', + return printed + + +def printLines(): + # This failed before Zope 2.4.0a2 + r = range(3) + for n in r: + for m in r: + print m + n * len(r), + print + return printed + + +def try_map(): + inc = lambda i: i + 1 + x = [1, 2, 3] + print map(inc, x), + return printed + + +def try_apply(): + def f(x, y, z): + return x + y + z + print f(*(300, 20), **{'z': 1}), + return printed + + +def try_inplace(): + x = 1 + x += 3 + + +def primes(): + # Somewhat obfuscated code on purpose + print filter(None,map(lambda y:y*reduce(lambda x,y:x*y!=0, + map(lambda x,y=y:y%x,range(2,int(pow(y,0.5)+1))),1),range(2,20))), + return printed + + +def allowed_read(ob): + print ob.allowed + print ob.s + print ob[0] + print ob[2] + print ob[3:-1] + print len(ob) + return printed + + +def allowed_default_args(ob): + def f(a=ob.allowed, s=ob.s): + return a, s + + +def allowed_simple(): + q = {'x': 'a'} + q['y'] = 'b' + q.update({'z': 'c'}) + r = ['a'] + r.append('b') + r[2:2] = ['c'] + s = 'a' + s = s[:100] + 'b' + s += 'c' + if sys.version_info >= (2, 3): + t = ['l', 'm', 'n', 'o', 'p', 'q'] + t[1:5:2] = ['n', 'p'] + _ = q + + return q['x'] + q['y'] + q['z'] + r[0] + r[1] + r[2] + s + + +def allowed_write(ob): + ob.writeable = 1 + # ob.writeable += 1 + [1 for ob.writeable in 1, 2] + ob['safe'] = 2 + # ob['safe'] += 2 + [1 for ob['safe'] in 1, 2] + + +def denied_print(ob): + print >> ob, 'Hello, world!', + + +def denied_getattr(ob): + # ob.disallowed += 1 + ob.disallowed = 1 + return ob.disallowed + + +def denied_default_args(ob): + def f(d=ob.disallowed): + return d + + +def denied_setattr(ob): + ob.allowed = -1 + + +def denied_setattr2(ob): + # ob.allowed += -1 + ob.allowed = -1 + + +def denied_setattr3(ob): + [1 for ob.allowed in 1, 2] + + +def denied_getitem(ob): + ob[1] + + +def denied_getitem2(ob): + # ob[1] += 1 + ob[1] + + +def denied_setitem(ob): + ob['x'] = 2 + + +def denied_setitem2(ob): + # ob[0] += 2 + ob['x'] = 2 + + +def denied_setitem3(ob): + [1 for ob['x'] in 1, 2] + + +def denied_setslice(ob): + ob[0:1] = 'a' + + +def denied_setslice2(ob): + # ob[0:1] += 'a' + ob[0:1] = 'a' + + +def denied_setslice3(ob): + [1 for ob[0:1] in 1, 2] + + +##def strange_attribute(): +## # If a guard has attributes with names that don't start with an +## # underscore, those attributes appear to be an attribute of +## # anything. +## return [].attribute_of_anything + +def order_of_operations(): + return 3 * 4 * -2 + 2 * 12 + + +def rot13(ss): + mapping = {} + orda = ord('a') + ordA = ord('A') + for n in range(13): + c1 = chr(orda + n) + c2 = chr(orda + n + 13) + c3 = chr(ordA + n) + c4 = chr(ordA + n + 13) + mapping[c1] = c2 + mapping[c2] = c1 + mapping[c3] = c4 + mapping[c4] = c3 + del c1, c2, c3, c4, orda, ordA + res = '' + for c in ss: + res = res + mapping.get(c, c) + return res + + +def nested_scopes_1(): + # Fails if 'a' is consumed by the first function. + a = 1 + + def f1(): + return a + + def f2(): + return a + return f1() + f2() + + +class Classic: + pass diff --git a/tests/fixtures/restricted_module_py3.py b/tests/fixtures/restricted_module_py3.py new file mode 100644 index 0000000..7ef777d --- /dev/null +++ b/tests/fixtures/restricted_module_py3.py @@ -0,0 +1,212 @@ +import sys + + +def print0(): + print('Hello, world!', end="") + return printed + + +def print1(): + print('Hello,', end="") + print('world!', end="") + return printed + + +def printStuff(): + print('a', 'b', 'c', end="") + return printed + + +def printToNone(): + x = None + print('Hello, world!', end="", file=x) + return printed + + +def printLines(): + # This failed before Zope 2.4.0a2 + r = range(3) + for n in r: + for m in r: + print(m + n * len(r), end="") + print("") + return printed + + +def try_map(): + inc = lambda i: i + 1 + x = [1, 2, 3] + print(map(inc, x), end="") + return printed + + +def try_apply(): + def f(x, y, z): + return x + y + z + print(f(*(300, 20), **{'z': 1}), end="") + return printed + + +def try_inplace(): + x = 1 + x += 3 + + +def primes(): + # Somewhat obfuscated code on purpose + print(filter(None,map(lambda y:y*reduce(lambda x,y:x*y!=0, + map(lambda x,y=y:y%x,range(2,int(pow(y,0.5)+1))),1),range(2,20))), end="") + return printed + + +def allowed_read(ob): + print(ob.allowed) + print(ob.s) + print(ob[0]) + print(ob[2]) + print(ob[3:-1]) + print(len(ob)) + return printed + + +def allowed_default_args(ob): + def f(a=ob.allowed, s=ob.s): + return a, s + + +def allowed_simple(): + q = {'x': 'a'} + q['y'] = 'b' + q.update({'z': 'c'}) + r = ['a'] + r.append('b') + r[2:2] = ['c'] + s = 'a' + s = s[:100] + 'b' + s += 'c' + if sys.version_info >= (2, 3): + t = ['l', 'm', 'n', 'o', 'p', 'q'] + t[1:5:2] = ['n', 'p'] + _ = q + + return q['x'] + q['y'] + q['z'] + r[0] + r[1] + r[2] + s + + +def allowed_write(ob): + ob.writeable = 1 + # ob.writeable += 1 + [1 for ob.writeable in (1, 2)] + ob['safe'] = 2 + # ob['safe'] += 2 + [1 for ob['safe'] in (1, 2)] + + +def denied_print(ob): + print('Hello, world!', end="", file=ob) + + +def denied_getattr(ob): + # ob.disallowed += 1 + ob.disallowed = 1 + return ob.disallowed + + +def denied_default_args(ob): + def f(d=ob.disallowed): + return d + + +def denied_setattr(ob): + ob.allowed = -1 + + +def denied_setattr2(ob): + # ob.allowed += -1 + ob.allowed = -1 + + +def denied_setattr3(ob): + [1 for ob.allowed in (1, 2)] + + +def denied_getitem(ob): + ob[1] + + +def denied_getitem2(ob): + # ob[1] += 1 + ob[1] + + +def denied_setitem(ob): + ob['x'] = 2 + + +def denied_setitem2(ob): + # ob[0] += 2 + ob['x'] = 2 + + +def denied_setitem3(ob): + [1 for ob['x'] in (1, 2)] + + +def denied_setslice(ob): + ob[0:1] = 'a' + + +def denied_setslice2(ob): + # ob[0:1] += 'a' + ob[0:1] = 'a' + + +def denied_setslice3(ob): + [1 for ob[0:1] in (1, 2)] + + +##def strange_attribute(): +## # If a guard has attributes with names that don't start with an +## # underscore, those attributes appear to be an attribute of +## # anything. +## return [].attribute_of_anything + +def order_of_operations(): + return 3 * 4 * -2 + 2 * 12 + + +def rot13(ss): + mapping = {} + orda = ord('a') + ordA = ord('A') + for n in range(13): + c1 = chr(orda + n) + c2 = chr(orda + n + 13) + c3 = chr(ordA + n) + c4 = chr(ordA + n + 13) + mapping[c1] = c2 + mapping[c2] = c1 + mapping[c3] = c4 + mapping[c4] = c3 + del c1, c2, c3, c4, orda, ordA + res = '' + for c in ss: + res = res + mapping.get(c, c) + return res + + +def nested_scopes_1(): + # Fails if 'a' is consumed by the first function. + a = 1 + + def f1(): + return a + + def f2(): + return a + return f1() + f2() + + +#class Classic: +# +# def __init__(self): +# pass diff --git a/tests/fixtures/security_in_syntax.py b/tests/fixtures/security_in_syntax.py new file mode 100644 index 0000000..2731ed9 --- /dev/null +++ b/tests/fixtures/security_in_syntax.py @@ -0,0 +1,126 @@ +# These are all supposed to raise a SyntaxError when using +# compile_restricted() but not when using compile(). +# Each function in this module is compiled using compile_restricted(). + + +def overrideGuardWithFunction(): + def _getattr(o): + return o + + +def overrideGuardWithLambda(): + lambda o, _getattr=None: o + + +def overrideGuardWithClass(): + class _getattr: + pass + + +def overrideGuardWithName(): + _getattr = None + + +def overrideGuardWithArgument(): + def f(_getattr=None): + pass + + +def reserved_names(): + printed = '' + + +def bad_name(): # ported + __ = 12 + + +def bad_attr(): # ported + some_ob._some_attr = 15 + + +def no_exec(): # ported + exec('q = 1') + + +def no_yield(): # ported + yield 42 + + +def check_getattr_in_lambda(arg=lambda _getattr=(lambda ob, name: name): + _getattr): + 42 + + +def import_as_bad_name(): + import os as _leading_underscore + + +def from_import_as_bad_name(): + from x import y as _leading_underscore + + +def except_using_bad_name(): + try: + foo + except NameError: #, _leading_underscore: + # The name of choice (say, _write) is now assigned to an exception + # object. Hard to exploit, but conceivable. + pass + + +def keyword_arg_with_bad_name(): + def f(okname=1, __badname=2): + pass + + +def no_augmeneted_assignment_to_sub(): + a[b] += c + + +def no_augmeneted_assignment_to_attr(): + a.b += c + + +def no_augmeneted_assignment_to_slice(): + a[x:y] += c + + +def no_augmeneted_assignment_to_slice2(): + a[x:y:z] += c + +# These are all supposed to raise a SyntaxError when using +# compile_restricted() but not when using compile(). +# Each function in this module is compiled using compile_restricted(). + + +def with_as_bad_name(): + with x as _leading_underscore: + pass + + +def relative_import_as_bad_name(): + from .x import y as _leading_underscore + + +def except_as_bad_name(): + try: + 1 / 0 + except Exception as _leading_underscore: + pass + +# These are all supposed to raise a SyntaxError when using +# compile_restricted() but not when using compile(). +# Each function in this module is compiled using compile_restricted(). + + +def dict_comp_bad_name(): + {y: y for _restricted_name in x} + + +def set_comp_bad_name(): + {y for _restricted_name in x} + + +def compound_with_bad_name(): + with a as b, c as _restricted_name: + pass diff --git a/tests/fixtures/unpack.py b/tests/fixtures/unpack.py new file mode 100644 index 0000000..dd57fa3 --- /dev/null +++ b/tests/fixtures/unpack.py @@ -0,0 +1,91 @@ +# A series of short tests for unpacking sequences. + + +def u1(L): + x, y = L + assert x == 1 + assert y == 2 + +u1([1, 2]) +u1((1, 2)) + + +def u1a(L): + x, y = L + assert x == '1' + assert y == '2' + +u1a("12") + +try: + u1([1]) +except ValueError: + pass +else: + raise AssertionError("expected 'unpack list of wrong size'") + + +def u2(L): + x, (a, b), y = L + assert x == 1 + assert a == 2 + assert b == 3 + assert y == 4 + +u2([1, [2, 3], 4]) +u2((1, (2, 3), 4)) + +try: + u2([1, 2, 3]) +except TypeError: + pass +else: + raise AssertionError("expected 'iteration over non-sequence'") + + +def u3((x, y)): + assert x == 'a' + assert y == 'b' + return x, y + +u3(('a', 'b')) + + +def u4(x): + (a, b), c = d, (e, f) = x + assert a == 1 and b == 2 and c == (3, 4) + assert d == (1, 2) and e == 3 and f == 4 + + +u4(((1, 2), (3, 4))) + + +def u5(x): + try: + raise TypeError(x) + # This one is tricky to test, because the first level of unpacking + # has a TypeError instance. That's a headache for the test driver. + except TypeError, [(a, b)]: + assert a == 42 + assert b == 666 + +u5([42, 666]) + + +def u6(x): + expected = 0 + for i, j in x: + assert i == expected + expected += 1 + assert j == expected + expected += 1 + +u6([[0, 1], [2, 3], [4, 5]]) + + +def u7(x): + stuff = [i + j for toplevel, in x for i, j in toplevel] + assert stuff == [3, 7] + + +u7(([[[1, 2]]], [[[3, 4]]])) diff --git a/tests/fixtures/unpack_py3.py b/tests/fixtures/unpack_py3.py new file mode 100644 index 0000000..ead2177 --- /dev/null +++ b/tests/fixtures/unpack_py3.py @@ -0,0 +1,92 @@ +# A series of short tests for unpacking sequences. + + +def u1(L): + x, y = L + assert x == 1 + assert y == 2 + +u1([1, 2]) +u1((1, 2)) + + +def u1a(L): + x, y = L + assert x == '1' + assert y == '2' + +u1a("12") + +try: + u1([1]) +except ValueError: + pass +else: + raise AssertionError("expected 'unpack list of wrong size'") + + +def u2(L): + x, (a, b), y = L + assert x == 1 + assert a == 2 + assert b == 3 + assert y == 4 + +u2([1, [2, 3], 4]) +u2((1, (2, 3), 4)) + +try: + u2([1, 2, 3]) +except TypeError: + pass +else: + raise AssertionError("expected 'iteration over non-sequence'") + + +def u3(x, y): + assert x == 'a' + assert y == 'b' + return x, y + +u3(('a', 'b')) + + +def u4(x): + (a, b), c = d, (e, f) = x + assert a == 1 and b == 2 and c == (3, 4) + assert d == (1, 2) and e == 3 and f == 4 + + +u4(((1, 2), (3, 4))) + + +def u5(x): + try: + raise TypeError(x) + # This one is tricky to test, because the first level of unpacking + # has a TypeError instance. That's a headache for the test driver. + except TypeError as e: + import pdb; pdb.set_strace() + assert a == 42 + assert b == 666 + +u5([42, 666]) + + +def u6(x): + expected = 0 + for i, j in x: + assert i == expected + expected += 1 + assert j == expected + expected += 1 + +u6([[0, 1], [2, 3], [4, 5]]) + + +def u7(x): + stuff = [i + j for toplevel, in x for i, j in toplevel] + assert stuff == [3, 7] + + +u7(([[[1, 2]]], [[[3, 4]]])) diff --git a/tests/test_print.py b/tests/test_print.py index 838d4c7..2b5ef45 100644 --- a/tests/test_print.py +++ b/tests/test_print.py @@ -39,13 +39,13 @@ """ ALLOWED_FUTURE_PRINT_FUNCTION = """\ -from __future import print_function +from __future__ import print_function print('Hello World!') """ ALLOWED_FUTURE_MULTI_PRINT_FUNCTION = """\ -from __future import print_function +from __future__ import print_function print('Hello World!', 'Hello Earth!') """ diff --git a/tests/test_restrictions.py b/tests/test_restrictions.py new file mode 100644 index 0000000..5dba904 --- /dev/null +++ b/tests/test_restrictions.py @@ -0,0 +1,694 @@ +from RestrictedPython import PrintCollector +from RestrictedPython.test_helper import verify + +import os +import pytest +import re +import sys + + +if sys.version_info.major > 2: + from RestrictedPython.compile import compile_restricted +else: + from RestrictedPython.RCompile import RFunction + from RestrictedPython.Eval import RestrictionCapableEval + from RestrictedPython.RCompile import compile_restricted + from RestrictedPython.tests import restricted_module + +try: + __file__ +except NameError: + __file__ = os.path.abspath(sys.argv[1]) +_FILEPATH = os.path.abspath(__file__) +_HERE = os.path.dirname(_FILEPATH) + + +def _getindent(line): + """Returns the indentation level of the given line.""" + indent = 0 + for c in line: + if c == ' ': + indent = indent + 1 + elif c == '\t': + indent = indent + 8 + else: + break + return indent + + +def find_source(fn, func): + """Given a func_code object, this function tries to find and return + the python source code of the function. + Originally written by + Harm van der Heijden (H.v.d.Heijden@phys.tue.nl)""" + f = open(fn, "r") + for i in range(func.co_firstlineno): + line = f.readline() + ind = _getindent(line) + msg = "" + while line: + msg = msg + line + line = f.readline() + # the following should be <= ind, but then we get + # confused by multiline docstrings. Using == works most of + # the time... but not always! + if _getindent(line) == ind: + break + f.close() + return fn, msg + + +def get_source(func): + """Less silly interface to find_source""" + file = func.func_globals['__file__'] + if file.endswith('.pyc'): + file = file[:-1] + source = find_source(file, func.func_code)[1] + assert source.strip(), "Source should not be empty!" + return source + + +def create_rmodule(): + global rmodule + if sys.version_info.major > 2: + fn = os.path.join(_HERE, 'fixtures', 'restricted_module_py3.py') + else: + fn = os.path.join(_HERE, 'fixtures', 'restricted_module.py') + + f = open(fn, 'r') + source = f.read() + f.close() + # Sanity check + compile(source, fn, 'exec') + # Now compile it for real + code = compile_restricted(source, fn, 'exec') + rmodule = {'__builtins__': {'__import__': __import__, + 'None': None, + '__name__': 'restricted_module' + } + } + + builtins = getattr(__builtins__, '__dict__', __builtins__) + words = ('map', 'int', 'pow', 'range', 'filter', + 'len', 'chr', 'ord', 'print') + for name in words: + rmodule[name] = builtins[name] + + if sys.version_info.major < 3: + rmodule['reduce'] = builtins['reduce'] + + exec(code, rmodule) + + +class AccessDenied (Exception): + pass + +DisallowedObject = [] + + +class TestGuard: + '''A guard class''' + def __init__(self, _ob, write=None): + self.__dict__['_ob'] = _ob + + # Write guard methods + + def __setattr__(self, name, value): + _ob = self.__dict__['_ob'] + writeable = getattr(_ob, '__writeable_attrs__', ()) + if name not in writeable: + raise AccessDenied + if name[:5] == 'func_': + raise AccessDenied + setattr(_ob, name, value) + + def __setitem__(self, index, value): + _ob = self.__dict__['_ob'] + _ob[index] = value + + def __setslice__(self, lo, hi, value): + _ob = self.__dict__['_ob'] + _ob[lo:hi] = value + +# A wrapper for _apply_. +apply_wrapper_called = [] + + +def apply_wrapper(func, *args, **kws): + apply_wrapper_called.append('yes') + return func(*args, **kws) + +inplacevar_wrapper_called = {} + + +def inplacevar_wrapper(op, x, y): + inplacevar_wrapper_called[op] = x, y + # This is really lame. But it's just a test. :) + globs = {'x': x, 'y': y} + exec('x' + op + 'y', globs) + return globs['x'] + + +class RestrictedObject: + disallowed = DisallowedObject + allowed = 1 + _ = 2 + __ = 3 + _some_attr = 4 + __some_other_attr__ = 5 + s = 'Another day, another test...' + __writeable_attrs__ = ('writeable',) + + def __getitem__(self, idx): + if idx == 'protected': + raise AccessDenied + elif idx == 0 or idx == 'safe': + return 1 + elif idx == 1: + return DisallowedObject + else: + return self.s[idx] + + def __getslice__(self, lo, hi): + return self.s[lo:hi] + + def __len__(self): + return len(self.s) + + def __setitem__(self, idx, v): + if idx == 'safe': + self.safe = v + else: + raise AccessDenied + + def __setslice__(self, lo, hi, value): + raise AccessDenied + + write = DisallowedObject + + +def guarded_getattr(ob, name): + v = getattr(ob, name) + if v is DisallowedObject: + raise AccessDenied + return v + +SliceType = type(slice(0)) + + +def guarded_getitem(ob, index): + if type(index) is SliceType and index.step is None: + start = index.start + stop = index.stop + if start is None: + start = 0 + if stop is None: + v = ob[start:] + else: + v = ob[start:stop] + else: + v = ob[index] + if v is DisallowedObject: + raise AccessDenied + return v + + +def minimal_import(name, _globals, _locals, names): + if name != "__future__": + raise ValueError("Only future imports are allowed") + import __future__ + return __future__ + + +def exec_func(name, *args, **kw): + func = rmodule[name] + func_dict = {'_getattr_': guarded_getattr, + '_getitem_': guarded_getitem, + '_write_': TestGuard, + '_print_': PrintCollector, + '_getiter_': iter, + '_apply_': apply_wrapper, + '_inplacevar_': inplacevar_wrapper, + } + if sys.version_info.major < 3: + verify(func.func_code) + func.func_globals.update(func_dict) + else: + verify(func.__code__) + func.__globals__.update(func_dict) + return func(*args, **kw) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_print(): + for i in range(2): + res = exec_func('print%s' % i) + assert res == 'Hello, world!' + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_print_to_None(): + try: + res = exec_func('printToNone') + except AttributeError: + # Passed. "None" has no "write" attribute. + pass + else: + raise AssertionError(res) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_print_stuff(): + res = exec_func('printStuff') + assert res == 'a b c' + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_print_lines(): + res = exec_func('printLines') + assert res == '0 1 2\n3 4 5\n6 7 8\n' + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_primes(): + res = exec_func('primes') + assert res == '[2, 3, 5, 7, 11, 13, 17, 19]' + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_allowed_simple(): + res = exec_func('allowed_simple') + assert res == 'abcabcabc' + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_allowed_read(): + res = exec_func('allowed_read', RestrictedObject()) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_allowed_write(): + res = exec_func('allowed_write', RestrictedObject()) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_allowed_args(): + res = exec_func('allowed_default_args', RestrictedObject()) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_try_map(): + res = exec_func('try_map') + assert res == "[2, 3, 4]" + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="print statement no longer exists in python 3") +def test_apply(): + del apply_wrapper_called[:] + res = exec_func("try_apply") + assert apply_wrapper_called == ["yes"] + assert res == "321" + + +def test_inplace(): + inplacevar_wrapper_called.clear() + exec_func('try_inplace') + inplacevar_wrapper_called['+='] == (1, 3) + + +def test_denied(): + for k in [k for k in rmodule.keys() if k.startswith("denied")]: + try: + exec_func(k, RestrictedObject()) + + except (AccessDenied, TypeError): + # Passed the test + # TODO: TypeError is not 100% correct ... + # remove this and fixed `denied` + pass + else: + raise AssertionError('%s() did not trip security' % k) + + +def test_syntax_security(): + # Ensures that each of the functions in security_in_syntax.py + # throws a SyntaxError when using compile_restricted. + fn = os.path.join(_HERE, 'fixtures', 'security_in_syntax.py') + f = open(fn, 'r') + source = f.read() + f.close() + # Unrestricted compile. + code = compile(source, fn, 'exec') + m = {'__builtins__': {'__import__': minimal_import}} + exec(code, m) + for k, v in m.items(): + if hasattr(v, 'func_code'): + filename, source = find_source(fn, v.func_code) + # Now compile it with restrictions + try: + code = compile_restricted(source, filename, 'exec') + except SyntaxError: + # Passed the test. + pass + else: + raise AssertionError('%s should not have compiled' % k) + + +def test_order_of_operations(): + res = exec_func('order_of_operations') + assert res == 0 + + +def test_rot13(): + res = exec_func('rot13', 'Zope is k00l') + assert res == 'Mbcr vf x00y' + + +def test_nested_scopes1(): + res = exec_func('nested_scopes_1') + assert res == 2 + +# TODO: check if this need py3 love +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_unrestricted_eval(): + expr = RestrictionCapableEval("{'a':[m.pop()]}['a'] + [m[0]]") + v = [12, 34] + expect = v[:] + expect.reverse() + res = expr.eval({'m': v}) + assert res == expect + v = [12, 34] + res = expr(m=v) + assert res == expect + + +def test_stacksize(): + for k, rfunc in rmodule.items(): + if not k.startswith('_') and hasattr(rfunc, 'func_code'): + rss = rfunc.func_code.co_stacksize + ss = getattr(restricted_module, k).func_code.co_stacksize + + if not rss >= ss: + raise AssertionError( + 'The stack size estimate for %s() ' + 'should have been at least %d, but was only %d' + % (k, ss, rss)) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_before_and_after(): + from RestrictedPython.RCompile import RModule + from RestrictedPython.tests import before_and_after + from compiler import parse + + defre = re.compile(r'def ([_A-Za-z0-9]+)_(after|before)\(') + + beforel = [name for name in before_and_after.__dict__ + if name.endswith("_before")] + + for name in beforel: + before = getattr(before_and_after, name) + before_src = get_source(before) + before_src = re.sub(defre, r'def \1(', before_src) + rm = RModule(before_src, '') + tree_before = rm._get_tree() + + after = getattr(before_and_after, name[:-6] + 'after') + after_src = get_source(after) + after_src = re.sub(defre, r'def \1(', after_src) + tree_after = parse(after_src) + + assert str(tree_before) == str(tree_after) + + rm.compile() + verify(rm.getCode()) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def _test_before_and_after(mod): + from RestrictedPython.RCompile import RModule + from compiler import parse + + defre = re.compile(r'def ([_A-Za-z0-9]+)_(after|before)\(') + + beforel = [name for name in mod.__dict__ + if name.endswith("_before")] + + for name in beforel: + before = getattr(mod, name) + before_src = get_source(before) + before_src = re.sub(defre, r'def \1(', before_src) + rm = RModule(before_src, '') + tree_before = rm._get_tree() + + after = getattr(mod, name[:-6] + 'after') + after_src = get_source(after) + after_src = re.sub(defre, r'def \1(', after_src) + tree_after = parse(after_src) + + assert str(tree_before) == str(tree_after) + + rm.compile() + verify(rm.getCode()) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_BeforeAndAfter24(): + from RestrictedPython.tests import before_and_after24 + _test_before_and_after(before_and_after24) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_BeforeAndAfter25(): + from RestrictedPython.tests import before_and_after25 + _test_before_and_after(before_and_after25) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_BeforeAndAfter26(): + from RestrictedPython.tests import before_and_after26 + _test_before_and_after(before_and_after26) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_BeforeAndAfter27(): + from RestrictedPython.tests import before_and_after27 + _test_before_and_after(before_and_after27) + + +def _compile_file(name): + path = os.path.join(_HERE, 'fixtures', name) + f = open(path, "r") + source = f.read() + f.close() + + co = compile_restricted(source, path, "exec") + verify(co) + return co + + +# TODO: alex, unpacking syntax in Python3 has slightly changed +# We need to carefully design the tests in unpack_py3.py +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_unpack_sequence(): + if sys.version_info.major > 2: + co = _compile_file("unpack_py3.py") + else: + co = _compile_file("unpack.py") + + calls = [] + + def getiter(seq): + calls.append(seq) + return list(seq) + globals = {"_getiter_": getiter, '_inplacevar_': inplacevar_wrapper} + exec(co, globals, {}) + # The comparison here depends on the exact code that is + # contained in unpack.py. + # The test doing implicit unpacking in an "except:" clause is + # a pain, because there are two levels of unpacking, and the top + # level is unpacking the specific TypeError instance constructed + # by the test. We have to worm around that one. + ineffable = "a TypeError instance" + expected = [[1, 2], + (1, 2), + "12", + [1], + [1, [2, 3], 4], + [2, 3], + (1, (2, 3), 4), + (2, 3), + [1, 2, 3], + 2, + ('a', 'b'), + ((1, 2), (3, 4)), (1, 2), + ((1, 2), (3, 4)), (3, 4), + ineffable, [42, 666], + [[0, 1], [2, 3], [4, 5]], [0, 1], [2, 3], [4, 5], + ([[[1, 2]]], [[[3, 4]]]), [[[1, 2]]], [[1, 2]], [1, 2], + [[[3, 4]]], [[3, 4]], [3, 4], + ] + i = expected.index(ineffable) + assert isinstance(calls[i], TypeError) is True + expected[i] = calls[i] + assert calls == expected + + +# TODO: alex, unpacking syntax in Python3 has slightly changed +# We need to carefully design the tests in unpack_py3.py +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_unpack_sequence_expression(): + """ + On python3 this fails with: + + SyntaxError: ('Line None: Expression statements are not allowed.',) + """ + co = compile_restricted("[x for x, y in [(1, 2)]]", "", "eval") + verify(co) + calls = [] + + def getiter(s): + calls.append(s) + return list(s) + globals = {"_getiter_": getiter} + exec(co, globals, {}) + assert calls == [[(1, 2)], (1, 2)] + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_unpack_sequence_single(): + """ + On python3 this fails with: + + SyntaxError: ('Line None: Interactive statements are not allowed.',) + """ + co = compile_restricted("x, y = 1, 2", "", "single") + verify(co) + calls = [] + + def getiter(s): + calls.append(s) + return list(s) + globals = {"_getiter_": getiter} + exec(co, globals, {}) + assert calls == [(1, 2)] + + +def test_class(): + getattr_calls = [] + setattr_calls = [] + + def test_getattr(obj, attr): + getattr_calls.append(attr) + return getattr(obj, attr) + + def test_setattr(obj): + setattr_calls.append(obj.__class__.__name__) + return obj + + co = _compile_file("class.py") + globals = {"_getattr_": test_getattr, + "_write_": test_setattr, + } + exec(co, globals, {}) + # Note that the getattr calls don't correspond to the method call + # order, because the x.set method is fetched before its arguments + # are evaluated. + assert getattr_calls == ["set", "set", "get", "state", "get", "state"] + assert setattr_calls == ["MyClass", "MyClass"] + + +def test_lambda(): + co = _compile_file("lambda.py") + exec(co, {}, {}) + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_empty(): + """ + Rfunction depends on compiler + """ + rf = RFunction("", "", "issue945", "empty.py", {}) + rf.parse() + rf2 = RFunction("", "# still empty\n\n# by", "issue945", "empty.py", {}) + rf2.parse() + + +def test_SyntaxError(): + err = ("def f(x, y):\n" + " if x, y < 2 + 1:\n" + " return x + y\n" + " else:\n" + " return x - y\n") + with pytest.raises(SyntaxError): + compile_restricted(err, "", "exec") + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_line_endings_RFunction(): + from RestrictedPython.RCompile import RFunction + gen = RFunction( + p='', + body='# testing\r\nprint "testing"\r\nreturn printed\n', + name='test', + filename='', + globals=(), + ) + gen.mode = 'exec' + # if the source has any line ending other than \n by the time + # parse() is called, then you'll get a syntax error. + gen.parse() + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_line_endings_RestrictedCompileMode(): + from RestrictedPython.RCompile import RestrictedCompileMode + gen = RestrictedCompileMode( + '# testing\r\nprint "testing"\r\nreturn printed\n', + '' + ) + gen.mode = 'exec' + # if the source has any line ending other than \n by the time + # parse() is called, then you'll get a syntax error. + gen.parse() + + +@pytest.mark.skipif(sys.version_info >= (3, 0), + reason="compiler no longer exists in python 3") +def test_Collector2295(): + from RestrictedPython.RCompile import RestrictedCompileMode + gen = RestrictedCompileMode( + 'if False:\n pass\n# Me Grok, Say Hi', + '' + ) + gen.mode = 'exec' + # if the source has any line ending other than \n by the time + # parse() is called, then you'll get a syntax error. + gen.parse() + + +create_rmodule()