diff --git a/CHANGES.rst b/CHANGES.rst index 6411c86..5cf6ca3 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -5,7 +5,11 @@ CHANGES 4.0.1 (unreleased) ------------------ -- TBD +- Added ability to perform checks based on values assigned to + attributes, or operands of in-place mutation operations. + +- Added ability to check for attribute deletion separately from + attribute assignment. 4.0.0 (2013-07-09) ------------------ diff --git a/docs/api/interfaces.rst b/docs/api/interfaces.rst index cf4e757..741756e 100644 --- a/docs/api/interfaces.rst +++ b/docs/api/interfaces.rst @@ -47,6 +47,10 @@ Utilities :members: :member-order: bysource + .. autointerface:: IValueBasedChecker + :members: + :member-order: bysource + .. autointerface:: ISecurityPolicy :members: :member-order: bysource diff --git a/src/zope/security/_proxy.c b/src/zope/security/_proxy.c index 27edbb3..66144c3 100644 --- a/src/zope/security/_proxy.c +++ b/src/zope/security/_proxy.c @@ -72,8 +72,11 @@ static PyObject *__class__str = 0, *__name__str = 0, *__module__str = 0; DECLARE_STRING(__3pow__); DECLARE_STRING(__call__); DECLARE_STRING(check); +DECLARE_STRING(check_with_value); DECLARE_STRING(check_getattr); DECLARE_STRING(check_setattr); +DECLARE_STRING(check_setattr_with_value); +DECLARE_STRING(check_delattr); DECLARE_STRING(__cmp__); DECLARE_STRING(__coerce__); DECLARE_STRING(__contains__); @@ -161,7 +164,7 @@ static PyTypeObject SecurityProxyType; */ static int -check(SecurityProxy *self, PyObject *meth, PyObject *name) +check(SecurityProxy *self, PyObject *meth, PyObject *name, PyObject *value) { PyObject *r; @@ -177,9 +180,33 @@ check(SecurityProxy *self, PyObject *meth, PyObject *name) return self->proxy_checker->ob_type->tp_as_mapping-> mp_ass_subscript(self->proxy_checker, self->proxy.proxy_object, name); - r = PyObject_CallMethodObjArgs(self->proxy_checker, meth, - self->proxy.proxy_object, name, - NULL); + if (value != NULL + && meth == str_check_setattr + && PyObject_HasAttr(self->proxy_checker, str_check_setattr_with_value) == 1) + // value is NULL if this is really a delattr call + r = PyObject_CallMethodObjArgs(self->proxy_checker, + str_check_setattr_with_value, + self->proxy.proxy_object, name, + value, NULL); + else if (value != NULL + && meth == str_check + && PyObject_HasAttr(self->proxy_checker, str_check_with_value) == 1) + // value is NULL if this is really a delattr call + r = PyObject_CallMethodObjArgs(self->proxy_checker, + str_check_with_value, + self->proxy.proxy_object, name, + value, NULL); + else if (value == NULL + && meth == str_check_setattr + && PyObject_HasAttr(self->proxy_checker, str_check_delattr) == 1) + r = PyObject_CallMethodObjArgs(self->proxy_checker, + str_check_delattr, + self->proxy.proxy_object, name, + value, NULL); + else + r = PyObject_CallMethodObjArgs(self->proxy_checker, meth, + self->proxy.proxy_object, name, + NULL); if (r == NULL) return -1; @@ -214,7 +241,7 @@ check1(SecurityProxy *self, PyObject *opname, function1 operation) { PyObject *result = NULL; - if (check(self, str_check, opname) >= 0) { + if (check(self, str_check, opname, (PyObject *)NULL) >= 0) { result = operation(self->proxy.proxy_object); PROXY_RESULT(self, result); } @@ -229,7 +256,7 @@ check2(PyObject *self, PyObject *other, if (Proxy_Check(self)) { - if (check((SecurityProxy*)self, str_check, opname) >= 0) + if (check((SecurityProxy*)self, str_check, opname, (PyObject *)NULL) >= 0) { result = operation(((SecurityProxy*)self)->proxy.proxy_object, other); @@ -238,7 +265,7 @@ check2(PyObject *self, PyObject *other, } else if (Proxy_Check(other)) { - if (check((SecurityProxy*)other, str_check, ropname) >= 0) + if (check((SecurityProxy*)other, str_check, ropname, (PyObject *)NULL) >= 0) { result = operation(self, ((SecurityProxy*)other)->proxy.proxy_object); @@ -261,7 +288,7 @@ check2i(SecurityProxy *self, PyObject *other, { PyObject *result = NULL; - if (check(self, str_check, opname) >= 0) + if (check(self, str_check, opname, other) >= 0) { result = operation(self->proxy.proxy_object, other); if (result == self->proxy.proxy_object) @@ -372,7 +399,7 @@ proxy_iter(SecurityProxy *self) { PyObject *result = NULL; - if (check(self, str_check, str___iter__) >= 0) + if (check(self, str_check, str___iter__, (PyObject *)NULL) >= 0) { result = PyObject_GetIter(self->proxy.proxy_object); PROXY_RESULT(self, result); @@ -385,7 +412,7 @@ proxy_iternext(SecurityProxy *self) { PyObject *result = NULL; - if (check(self, str_check_getattr, str_next) >= 0) + if (check(self, str_check_getattr, str_next, (PyObject *)NULL) >= 0) { result = PyIter_Next(self->proxy.proxy_object); PROXY_RESULT(self, result); @@ -398,7 +425,7 @@ proxy_getattro(SecurityProxy *self, PyObject *name) { PyObject *result = NULL; - if (check(self, str_check_getattr, name) >= 0) + if (check(self, str_check_getattr, name, (PyObject *)NULL) >= 0) { result = PyObject_GetAttr(self->proxy.proxy_object, name); PROXY_RESULT(self, result); @@ -409,7 +436,7 @@ proxy_getattro(SecurityProxy *self, PyObject *name) static int proxy_setattro(SecurityProxy *self, PyObject *name, PyObject *value) { - if (check(self, str_check_setattr, name) >= 0) + if (check(self, str_check_setattr, name, value) >= 0) return PyObject_SetAttr(self->proxy.proxy_object, name, value); return -1; } @@ -458,7 +485,7 @@ proxy_str(SecurityProxy *self) { PyObject *result = NULL; - if (check(self, str_check, str___str__) >= 0) + if (check(self, str_check, str___str__, (PyObject *)NULL) >= 0) { result = PyObject_Str(self->proxy.proxy_object); } @@ -475,7 +502,7 @@ proxy_repr(SecurityProxy *self) { PyObject *result = NULL; - if (check(self, str_check, str___repr__) >= 0) { + if (check(self, str_check, str___repr__, (PyObject *)NULL) >= 0) { result = PyObject_Repr(self->proxy.proxy_object); } else { @@ -504,7 +531,7 @@ proxy_call(SecurityProxy *self, PyObject *args, PyObject *kwds) { PyObject *result = NULL; - if (check(self, str_check, str___call__) >= 0) + if (check(self, str_check, str___call__, (PyObject *)NULL) >= 0) { result = PyObject_Call(self->proxy.proxy_object, args, kwds); PROXY_RESULT(self, result); @@ -560,7 +587,7 @@ proxy_pow(PyObject *self, PyObject *other, PyObject *modulus) if (Proxy_Check(self)) { - if (check((SecurityProxy*)self, str_check, str___pow__) >= 0) + if (check((SecurityProxy*)self, str_check, str___pow__, (PyObject *)NULL) >= 0) { result = PyNumber_Power(((SecurityProxy*)self)->proxy.proxy_object, other, modulus); @@ -569,7 +596,7 @@ proxy_pow(PyObject *self, PyObject *other, PyObject *modulus) } else if (Proxy_Check(other)) { - if (check((SecurityProxy*)other, str_check, str___rpow__) >= 0) + if (check((SecurityProxy*)other, str_check, str___rpow__, (PyObject *)NULL) >= 0) { result = PyNumber_Power(self, ((SecurityProxy*)other)->proxy.proxy_object, @@ -579,7 +606,7 @@ proxy_pow(PyObject *self, PyObject *other, PyObject *modulus) } else if (modulus != NULL && Proxy_Check(modulus)) { - if (check((SecurityProxy*)modulus, str_check, str___3pow__) >= 0) + if (check((SecurityProxy*)modulus, str_check, str___3pow__, (PyObject *)NULL) >= 0) { result = PyNumber_Power(self, other, ((SecurityProxy*)modulus)->proxy.proxy_object); @@ -608,7 +635,7 @@ proxy_coerce(PyObject **p_self, PyObject **p_other) assert(Proxy_Check(self)); - if (check((SecurityProxy*)self, str_check, str___coerce__) >= 0) + if (check((SecurityProxy*)self, str_check, str___coerce__, (PyObject *)NULL) >= 0) { PyObject *left = ((SecurityProxy*)self)->proxy.proxy_object; PyObject *right = other; @@ -692,7 +719,7 @@ INPLACE(truediv, PyNumber_InPlaceTrueDivide) static Py_ssize_t proxy_length(SecurityProxy *self) { - if (check(self, str_check, str___len__) >= 0) + if (check(self, str_check, str___len__, (PyObject *)NULL) >= 0) return PyObject_Length(self->proxy.proxy_object); return -1; } @@ -733,7 +760,7 @@ proxy_slice(SecurityProxy *self, Py_ssize_t start, Py_ssize_t end) { PyObject *result = NULL; - if (check(self, str_check, str___getslice__) >= 0) { + if (check(self, str_check, str___getslice__, (PyObject *)NULL) >= 0) { result = PySequence_GetSlice(self->proxy.proxy_object, start, end); PROXY_RESULT(self, result); } @@ -743,7 +770,7 @@ proxy_slice(SecurityProxy *self, Py_ssize_t start, Py_ssize_t end) static int proxy_ass_slice(SecurityProxy *self, Py_ssize_t i, Py_ssize_t j, PyObject *value) { - if (check(self, str_check, str___setslice__) >= 0) + if (check(self, str_check, str___setslice__, value) >= 0) return PySequence_SetSlice(self->proxy.proxy_object, i, j, value); return -1; } @@ -751,7 +778,7 @@ proxy_ass_slice(SecurityProxy *self, Py_ssize_t i, Py_ssize_t j, PyObject *value static int proxy_contains(SecurityProxy *self, PyObject *value) { - if (check(self, str_check, str___contains__) >= 0) + if (check(self, str_check, str___contains__, (PyObject *)NULL) >= 0) return PySequence_Contains(self->proxy.proxy_object, value); return -1; } @@ -765,7 +792,7 @@ proxy_getitem(SecurityProxy *self, PyObject *key) { PyObject *result = NULL; - if (check(self, str_check, str___getitem__) >= 0) + if (check(self, str_check, str___getitem__, (PyObject *)NULL) >= 0) { result = PyObject_GetItem(self->proxy.proxy_object, key); PROXY_RESULT(self, result); @@ -777,11 +804,11 @@ static int proxy_setitem(SecurityProxy *self, PyObject *key, PyObject *value) { if (value == NULL) { - if (check(self, str_check, str___delitem__) >= 0) + if (check(self, str_check, str___delitem__, (PyObject *)NULL) >= 0) return PyObject_DelItem(self->proxy.proxy_object, key); } else { - if (check(self, str_check, str___setitem__) >= 0) + if (check(self, str_check, str___setitem__, value) >= 0) return PyObject_SetItem(self->proxy.proxy_object, key, value); } return -1; @@ -999,8 +1026,11 @@ if((str_op_##S = INTERN("__" #S "__")) == NULL) return MOD_ERROR_VAL INIT_STRING(__3pow__); INIT_STRING(__call__); INIT_STRING(check); + INIT_STRING(check_with_value); INIT_STRING(check_getattr); INIT_STRING(check_setattr); + INIT_STRING(check_setattr_with_value); + INIT_STRING(check_delattr); INIT_STRING(__cmp__); INIT_STRING(__coerce__); INIT_STRING(__contains__); diff --git a/src/zope/security/interfaces.py b/src/zope/security/interfaces.py index 9831b29..16e2cfd 100644 --- a/src/zope/security/interfaces.py +++ b/src/zope/security/interfaces.py @@ -149,6 +149,28 @@ def proxy(value): """ +class IValueBasedChecker(IChecker): + """Security checker that also checks the operands of attribute + assignment and in-place mutation operations. + """ + + def check_delattr(ob, name): + """As ``check_delattr`` but only applies when the attribute + is being deleted via ``del`` or ``delattr``. + """ + + def check_setattr_with_value(ob, name, value): + """As ``check_setattr`` but also includes the value to be + assigned. + """ + + def check_with_value(ob, name, value): + """As ``check`` but also includes the operand. + + Only called for in-place mutation operations. + """ + + class INameBasedChecker(IChecker): """Security checker that uses permissions to check attribute access.""" diff --git a/src/zope/security/proxy.py b/src/zope/security/proxy.py index 7f7d626..ba240fc 100644 --- a/src/zope/security/proxy.py +++ b/src/zope/security/proxy.py @@ -101,7 +101,10 @@ def __setattr__(self, name, value): return super(PyProxyBase, self).__setattr__(name, value) wrapped = super(PyProxyBase, self).__getattribute__('_wrapped') checker = super(PyProxyBase, self).__getattribute__('_checker') - checker.check_setattr(wrapped, name) + if getattr(checker, 'check_setattr_with_value', None) is not None: + checker.check_setattr_with_value(wrapped, name, value) + else: + checker.check_setattr(wrapped, name) setattr(wrapped, name, value) def __delattr__(self, name): @@ -109,7 +112,10 @@ def __delattr__(self, name): raise AttributeError() wrapped = super(PyProxyBase, self).__getattribute__('_wrapped') checker = super(PyProxyBase, self).__getattribute__('_checker') - checker.check_setattr(wrapped, name) + if getattr(checker, 'check_delattr', None) is not None: + checker.check_delattr(wrapped, name) + else: + checker.check_setattr(wrapped, name) delattr(wrapped, name) @_check_name diff --git a/src/zope/security/tests/test_proxy.py b/src/zope/security/tests/test_proxy.py index d89e355..c949077 100644 --- a/src/zope/security/tests/test_proxy.py +++ b/src/zope/security/tests/test_proxy.py @@ -1848,6 +1848,148 @@ def test_coerce(self): self.assertTrue(type(removeSecurityProxy(a)) is float and b is y) +class Test_ValuesChecker(unittest.TestCase): + + def _callFUT(self, object, *args, **kw): + from zope.interface import implementer + from zope.security.checker import ProxyFactory + from zope.security.interfaces import IValueBasedChecker + from zope.security.interfaces import Unauthorized + + @implementer(IValueBasedChecker) + class ValuesChecker(object): + def __init__(self, forbidden_values=None, + forbidden_operands=None, forbidden_del=None): + self.forbidden_values = forbidden_values \ + if forbidden_values is not None \ + else set() + self.forbidden_operands = forbidden_operands \ + if forbidden_operands is not None \ + else set() + self.forbidden_del = forbidden_del \ + if forbidden_del is not None \ + else set() + + def check(self, ob, name): + return True + + def check_getattr(self, ob, name): + return True + + def check_setattr(self, ob, name): + return True + + def check_delattr(self, ob, name): + if name in self.forbidden_del: + raise Unauthorized(ob, name) + return True + + def check_setattr_with_value(self, ob, name, value): + if value in self.forbidden_values: + raise Unauthorized(ob, name) + return self.check_setattr(ob, name) + + def check_with_value(self, ob, name, value): + if value in self.forbidden_operands: + raise Unauthorized(ob, name) + return self.check(ob, name) + + checker = ValuesChecker(*args, **kw) + return ProxyFactory(object, checker) + + def test_in_place_mutation(self): + from zope.security.interfaces import Unauthorized + + class AddableObject: + def __init__(self, value): + self.value = value + + def __add__(self, other): + return AddableObject(self.value + other.value) + + def __iadd__(self, other): + self.value += other.value + + foo = AddableObject(1) + bar = AddableObject(2) + proxied_bar = self._callFUT(bar) + proxied_foo = self._callFUT(foo, forbidden_operands=(proxied_bar,)) + + def add(): + return proxied_foo + proxied_bar + + def iadd(): + proxied_foo += proxied_bar + return proxied_foo + + def riadd(): + proxied_bar += proxied_foo + return proxied_bar + + try: + add() + except Unauthorized: + self.assertTrue(False, + "Addition unexpectedly raised 'Unauthorized'.") + self.assertRaises(Unauthorized, iadd) + try: + riadd() + except Unauthorized: + self.assertTrue(False, + "Addition unexpectedly raised 'Unauthorized'.") + + def test_setattr(self): + from zope.security.interfaces import Unauthorized + + class AssignableObject: + def __init__(self, value): + self.value = None + self.optional_attribute = None + + foo = AssignableObject() + bar = AssignableObject() + baz = AssignableObject() + proxied_bar = self._callFUT(bar) + proxied_baz = self._callFUT(baz) + proxied_foo = self._callFUT(foo, + forbidden_values=(proxied_bar,), + forbidden_del=('value',)) + + def assign(): + proxied_foo.value = proxied_baz + + def bad_assign(): + proxied_foo.value = proxied_bar + + def get(): + proxied_foo.value = proxied_baz + return proxied_foo.value + + def delete(): + del proxied_foo.optional_attribute + + def bad_delete(): + del proxied_foo.value + + try: + assign() + except Unauthorized: + self.assertTrue(False, + "Assignment unexpectedly raised 'Unauthorized'.") + self.assertRaises(Unauthorized, bad_assign) + try: + get() + except Unauthorized: + self.assertTrue(False, + "Retrieval unexpectedly raised 'Unauthorized'.") + try: + delete() + except Unauthorized: + self.assertTrue(False, + "Deletion unexpectedly raised 'Unauthorized'.") + self.assertRaises(Unauthorized, bad_delete) + + def test_using_mapping_slots_hack(): """The security proxy will use mapping slots, on the checker to go faster