diff --git a/ipalib/plugins/permission.py b/ipalib/plugins/permission.py index ff5335d1600..da1c41d656f 100644 --- a/ipalib/plugins/permission.py +++ b/ipalib/plugins/permission.py @@ -18,6 +18,7 @@ # along with this program. If not, see . import re +import traceback from ipalib.plugins import baseldap from ipalib import errors @@ -366,24 +367,40 @@ def add_aci(self, permission_entry): self.api.env.basedn) self.log.debug('Adding ACI %r to %s' % (acistring, location)) - entry = ldap.get_entry(location, ['aci']) + try: + entry = ldap.get_entry(location, ['aci']) + except errors.NotFound: + raise errors.NotFound(reason=_('Entry %s not found') % location) entry.setdefault('aci', []).append(acistring) ldap.update_entry(entry) def remove_aci(self, permission_entry): - """Remove the ACI corresponding to the given permission entry""" - self._replace_aci(permission_entry) + """Remove the ACI corresponding to the given permission entry + + :return: tuple: + - entry + - removed ACI string, or None if none existed previously + """ + return self._replace_aci(permission_entry) def update_aci(self, permission_entry, old_name=None): - """Update the ACI corresponding to the given permission entry""" + """Update the ACI corresponding to the given permission entry + + :return: tuple: + - entry + - removed ACI string, or None if none existed previously + """ new_acistring = self.make_aci(permission_entry) - self._replace_aci(permission_entry, old_name, new_acistring) + return self._replace_aci(permission_entry, old_name, new_acistring) def _replace_aci(self, permission_entry, old_name=None, new_acistring=None): """Replace ACI corresponding to permission_entry :param old_name: the old name of the permission, if different from new :param new_acistring: new ACI string; if None the ACI is just deleted + :return: tuple: + - entry + - removed ACI string, or None if none existed previously """ ldap = self.api.Backend.ldap2 acientry, acistring = self._get_aci_entry_and_string( @@ -402,6 +419,7 @@ def _replace_aci(self, permission_entry, old_name=None, new_acistring=None): ldap.update_entry(acientry) except errors.EmptyModlist: self.log.info('No changes to ACI') + return acientry, acistring def _get_aci_entry_and_string(self, permission_entry, name=None, notfound_ok=False): @@ -422,7 +440,7 @@ def _get_aci_entry_and_string(self, permission_entry, name=None, try: acientry = ldap.get_entry(location, ['aci']) except errors.NotFound: - acientry = {} + acientry = ldap.make_entry(location) acis = acientry.get('aci', ()) for acistring in acis: aci = ACI(acistring) @@ -759,7 +777,7 @@ def pre_callback(self, ldap, dn, entry, attrs_list, *keys, **options): else: context.permision_moving_aci = True try: - self.obj.remove_aci(old_entry) + context.old_aci_info = self.obj.remove_aci(old_entry) except errors.NotFound, e: self.log.error('permission ACI not found: %s' % e) @@ -768,13 +786,36 @@ def pre_callback(self, ldap, dn, entry, attrs_list, *keys, **options): return dn + def exc_callback(self, keys, options, exc, call_func, *call_args, **call_kwargs): + if call_func.func_name == 'update_entry': + self._revert_aci() + raise exc + + def _revert_aci(self): + old_aci_info = getattr(context, 'old_aci_info', None) + if old_aci_info: + # Try to roll back the old ACI + entry, old_aci_string = old_aci_info + if old_aci_string: + self.log.warn('Reverting ACI on %s to %s' % (entry.dn, + old_aci_string)) + entry['aci'].append(old_aci_string) + self.Backend.ldap2.update_entry(entry) + def post_callback(self, ldap, dn, entry, *keys, **options): old_entry = context.old_entry - if context.permision_moving_aci: - self.obj.add_aci(entry) - else: - self.obj.update_aci(entry, old_entry.single_value['cn']) + try: + if context.permision_moving_aci: + self.obj.add_aci(entry) + else: + self.obj.update_aci(entry, old_entry.single_value['cn']) + except Exception: + self.log.error('Error updating ACI: %s' % traceback.format_exc()) + self.log.warn('Reverting entry') + ldap.update_entry(old_entry) + self._revert_aci() + raise self.obj.postprocess_result(entry, options) entry['dn'] = entry.dn return dn diff --git a/ipatests/test_xmlrpc/test_permission_plugin.py b/ipatests/test_xmlrpc/test_permission_plugin.py index 82436b3bb72..8b125a90bfb 100644 --- a/ipatests/test_xmlrpc/test_permission_plugin.py +++ b/ipatests/test_xmlrpc/test_permission_plugin.py @@ -87,6 +87,7 @@ users_dn = DN(api.env.container_user, api.env.basedn) groups_dn = DN(api.env.container_group, api.env.basedn) +etc_dn = DN('cn=etc', api.env.basedn) def verify_permission_aci(name, dn, acistring): @@ -1470,6 +1471,106 @@ class test_permission(Declarative): ] +class test_permission_rollback(Declarative): + """Test rolling back changes after failed update""" + cleanup_commands = [ + ('permission_del', [permission1], {'force': True}), + ] + + _verifications = [ + dict( + desc='Retrieve %r' % permission1, + command=('permission_show', [permission1], {}), + expected=dict( + value=permission1, + summary=None, + result={ + 'dn': permission1_dn, + 'cn': [permission1], + 'objectclass': objectclasses.permission, + 'ipapermright': [u'write'], + 'ipapermallowedattr': [u'sn'], + 'ipapermbindruletype': [u'permission'], + 'ipapermissiontype': [u'SYSTEM', u'V2'], + 'ipapermlocation': [users_dn], + 'ipapermtarget': [DN(('uid', 'admin'), users_dn)], + }, + ), + ), + + verify_permission_aci( + permission1, users_dn, + '(targetattr = "sn")' + + '(target = "ldap:///%s")' % DN(('uid', 'admin'), users_dn) + + '(version 3.0;acl "permission:%s";' % permission1 + + 'allow (write) groupdn = "ldap:///%s";)' % permission1_dn, + ), + + verify_permission_aci_missing(permission1, etc_dn) + ] + + tests = [ + dict( + desc='Create %r' % permission1, + command=( + 'permission_add', [permission1], dict( + ipapermlocation=users_dn, + ipapermtarget=DN('uid=admin', users_dn), + ipapermright=[u'write'], + ipapermallowedattr=[u'sn'], + ) + ), + expected=dict( + value=permission1, + summary=u'Added permission "%s"' % permission1, + result=dict( + dn=permission1_dn, + cn=[permission1], + objectclass=objectclasses.permission, + ipapermright=[u'write'], + ipapermallowedattr=[u'sn'], + ipapermbindruletype=[u'permission'], + ipapermissiontype=[u'SYSTEM', u'V2'], + ipapermlocation=[users_dn], + ipapermtarget=[DN(('uid', 'admin'), users_dn)], + ), + ), + ), + + ] + _verifications + [ + + dict( + desc='Move %r to non-existent DN' % permission1, + command=( + 'permission_mod', [permission1], dict( + ipapermlocation=DN('foo=bar'), + ) + ), + expected=errors.NotFound(reason='Entry foo=bar not found'), + ), + + ] + _verifications + [ + + dict( + desc='Move %r to another DN' % permission1, + command=('permission_mod', [permission1], + dict(ipapermlocation=etc_dn) + ), + expected=errors.InvalidSyntax( + attr=r'ACL Invalid Target Error(-8): ' + r'Target is beyond the scope of the ACL' + r'(SCOPE:%(sdn)s) ' + r'(targetattr = \22sn\22)' + r'(target = \22ldap:///%(tdn)s\22)' + r'(version 3.0;acl \22permission:testperm\22;' + r'allow (write) groupdn = \22ldap:///%(pdn)s\22;)' % dict( + sdn=etc_dn, + tdn=DN('uid=admin', users_dn), + pdn=permission1_dn)), + ), + + ] + _verifications + class test_permission_sync_attributes(Declarative): """Test the effects of setting permission attributes""" cleanup_commands = [