Skip to content
Permalink
Browse files

[ADD] api: flag `su` on environments

The flag defines a "superuser mode" on environments, which allows to
bypass access rights without changing the current user id.
  • Loading branch information
rco-odoo committed May 31, 2019
1 parent 541012c commit 1e6c3bec2c5ca621344b09e58dbed535e80fb343
@@ -64,7 +64,7 @@ def get_param(self, key, default=False):
return self._get_param(key) or default

@api.model
@ormcache('self._uid', 'key')
@ormcache('self.env.uid', 'self.env.su', 'key')
def _get_param(self, key):
params = self.search_read([('key', '=', key)], fields=['value'], limit=1)
return params[0]['value'] if params else None
@@ -988,7 +988,7 @@ def _module_data_uninstall(self):
"""
Delete PostgreSQL foreign keys and constraints tracked by this model.
"""
if not (self._uid == SUPERUSER_ID or self.env.user.has_group('base.group_system')):
if not (self.env.su or self.env.user.has_group('base.group_system')):
raise AccessError(_('Administrator access is required to uninstall a module'))

ids_set = set(self.ids)
@@ -1101,7 +1101,7 @@ def _module_data_uninstall(self):
"""
Delete PostgreSQL many2many relations tracked by this model.
"""
if not (self._uid == SUPERUSER_ID or self.env.user.has_group('base.group_system')):
if not (self.env.su or self.env.user.has_group('base.group_system')):
raise AccessError(_('Administrator access is required to uninstall a module'))

ids_set = set(self.ids)
@@ -1216,9 +1216,9 @@ def group_names_with_access(self, model_name, access_mode):
# not be really necessary as a cache key, unless the `ormcache_context`
# decorator catches the exception (it does not at the moment.)
@api.model
@tools.ormcache_context('self._uid', 'model', 'mode', 'raise_exception', keys=('lang',))
@tools.ormcache_context('self.env.uid', 'self.env.su', 'model', 'mode', 'raise_exception', keys=('lang',))
def check(self, model, mode='read', raise_exception=True):
if self._uid == SUPERUSER_ID:
if self.env.su:
# User root have all accesses
return True

@@ -1566,7 +1566,7 @@ def _module_data_uninstall(self, modules_to_remove):
the chance of gracefully deleting all records.
This step is performed as part of the full uninstallation of a module.
"""
if not (self._uid == SUPERUSER_ID or self.env.user.has_group('base.group_system')):
if not (self.env.su or self.env.user.has_group('base.group_system')):
raise AccessError(_('Administrator access is required to uninstall a module'))

# enable model/field deletion
@@ -115,7 +115,7 @@ def _get_rules(self, model_name, mode='read'):
if mode not in self._MODES:
raise ValueError('Invalid mode: %r' % (mode,))

if self._uid == SUPERUSER_ID:
if self.env.su:
return self.browse(())

query = """ SELECT r.id FROM ir_rule r JOIN ir_model m ON (r.model_id=m.id)
@@ -132,7 +132,7 @@ def _get_rules(self, model_name, mode='read'):
@api.model
@tools.conditional(
'xml' not in config['dev_mode'],
tools.ormcache('self._uid', 'model_name', 'mode',
tools.ormcache('self.env.uid', 'self.env.su', 'model_name', 'mode',
'tuple(self._compute_domain_context_values())'),
)
def _compute_domain(self, model_name, mode="read"):
@@ -579,7 +579,7 @@ def write(self, vals):

result = True
# To write in SUPERUSER on field is_company and avoid access rights problems.
if 'is_company' in vals and self.user_has_groups('base.group_partner_manager') and not self.env.uid == SUPERUSER_ID:
if 'is_company' in vals and self.user_has_groups('base.group_partner_manager') and not self.env.su:
result = super(Partner, self.sudo()).write({'is_company': vals.get('is_company')})
del vals['is_company']
result = result and super(Partner, self).write(vals)
@@ -437,7 +437,7 @@ def read_group(self, domain, fields, groupby, offset=0, limit=None, orderby=Fals

@api.model
def _search(self, args, offset=0, limit=None, order=None, count=False, access_rights_uid=None):
if self._uid != SUPERUSER_ID and args:
if not self.env.su and args:
domain_fields = {term[0] for term in args if isinstance(term, (tuple, list))}
if domain_fields.intersection(USER_PRIVATE_FIELDS):
raise AccessError(_('Invalid search criterion'))
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
from odoo import SUPERUSER_ID
from odoo.exceptions import AccessError
from odoo.tests import common, TransactionCase

@@ -17,6 +18,64 @@ def setUp(self):
})


class TestSudo(Feedback):
""" Test the behavior of method sudo(). """
def test_sudo(self):
record = self.env['test_access_right.some_obj'].create({'val': 5})
user1 = self.user
user2 = self.env.ref('base.user_demo')

# with_user(user)
record1 = record.with_user(user1)
self.assertEqual(record1.env.uid, user1.id)
self.assertFalse(record1.env.su)

record2 = record1.with_user(user2)
self.assertEqual(record2.env.uid, user2.id)
self.assertFalse(record2.env.su)

# the superuser is always in superuser mode
record3 = record2.with_user(SUPERUSER_ID)
self.assertEqual(record3.env.uid, SUPERUSER_ID)
self.assertTrue(record3.env.su)

# sudo()
surecord1 = record1.sudo()
self.assertEqual(surecord1.env.uid, user1.id)
self.assertTrue(surecord1.env.su)

surecord2 = record2.sudo()
self.assertEqual(surecord2.env.uid, user2.id)
self.assertTrue(surecord2.env.su)

surecord3 = record3.sudo()
self.assertEqual(surecord3.env.uid, SUPERUSER_ID)
self.assertTrue(surecord3.env.su)

# sudo().sudo()
surecord1 = surecord1.sudo()
self.assertEqual(surecord1.env.uid, user1.id)
self.assertTrue(surecord1.env.su)

# sudo(False)
record1 = surecord1.sudo(False)
self.assertEqual(record1.env.uid, user1.id)
self.assertFalse(record1.env.su)

record2 = surecord2.sudo(False)
self.assertEqual(record2.env.uid, user2.id)
self.assertFalse(record2.env.su)

record3 = surecord3.sudo(False)
self.assertEqual(record3.env.uid, SUPERUSER_ID)
self.assertTrue(record3.env.su)

# sudo().with_user(user)
record2 = surecord1.with_user(user2)
self.assertEqual(record2.env.uid, user2.id)
self.assertFalse(record2.env.su)


class TestACLFeedback(Feedback):
""" Tests that proper feedback is returned on ir.model.access errors
"""
@@ -750,7 +750,8 @@ class Environment(Mapping):
- :attr:`cr`, the current database cursor;
- :attr:`uid`, the current user id;
- :attr:`context`, the current context dictionary.
- :attr:`context`, the current context dictionary;
- :attr:`su`, whether in superuser mode.
It provides access to the registry by implementing a mapping from model
names to new api models. It also holds a cache for records, and a data
@@ -782,9 +783,11 @@ def reset(cls):
"""
cls._local.environments = Environments()

def __new__(cls, cr, uid, context):
def __new__(cls, cr, uid, context, su=False):
if uid == SUPERUSER_ID:
su = True
assert context is not None
args = (cr, uid, context)
args = (cr, uid, context, su)

# if env already exists, return it
env, envs = None, cls.envs
@@ -794,10 +797,11 @@ def __new__(cls, cr, uid, context):

# otherwise create environment, and add it in the set
self = object.__new__(cls)
self.cr, self.uid, self.context = self.args = (cr, uid, frozendict(context))
args = (cr, uid, frozendict(context), su)
self.cr, self.uid, self.context, self.su = self.args = args
self.registry = Registry(cr.dbname)
self.cache = envs.cache
self._cache_key = (cr, uid)
self._cache_key = (cr, uid, su)
self._protected = StackMap() # {field: ids, ...}
self.dirty = defaultdict(set) # {record: set(field_name), ...}
self.all = envs
@@ -833,17 +837,19 @@ def __ne__(self, other):
def __hash__(self):
return object.__hash__(self)

def __call__(self, cr=None, user=None, context=None):
def __call__(self, cr=None, user=None, context=None, su=None):
""" Return an environment based on ``self`` with modified parameters.
:param cr: optional database cursor to change the current cursor
:param user: optional user/user id to change the current user
:param context: optional context dictionary to change the current context
:param su: optional boolean to change the superuser mode
"""
cr = self.cr if cr is None else cr
uid = self.uid if user is None else int(user)
context = self.context if context is None else context
return Environment(cr, uid, context)
su = (user is None and self.su) if su is None else su
return Environment(cr, uid, context, su)

def ref(self, xml_id, raise_if_not_found=True):
""" return the record corresponding to the given ``xml_id`` """
@@ -852,7 +858,7 @@ def ref(self, xml_id, raise_if_not_found=True):
@property
def user(self):
""" return the current user (as an instance) """
return self(user=SUPERUSER_ID)['res.users'].browse(self.uid)
return self(su=True)['res.users'].browse(self.uid)

@property
def company(self):
@@ -659,7 +659,7 @@ def _compute_company_dependent(self, records):
if 'force_company' not in context:
company = records.env.company
context = dict(context, force_company=company.id)
Property = records.env(user=SUPERUSER_ID, context=context)['ir.property']
Property = records.env(context=context, su=True)['ir.property']
values = Property.get_multi(self.name, self.model_name, records.ids)
for record in records:
record[self.name] = values.get(record.id)
@@ -670,7 +670,7 @@ def _inverse_company_dependent(self, records):
if 'force_company' not in context:
company = records.env.company
context = dict(context, force_company=company.id)
Property = records.env(user=SUPERUSER_ID, context=context)['ir.property']
Property = records.env(context=context, su=True)['ir.property']
values = {
record.id: self.convert_to_write(record[self.name], record)
for record in records
@@ -2929,6 +2929,5 @@ def prefetch_value_ids(record, field):


# imported here to avoid dependency cycle issues
from odoo import SUPERUSER_ID
from .exceptions import AccessError, MissingError, UserError
from .models import check_pg_name, BaseModel, NewId, IdType
@@ -2735,7 +2735,7 @@ def check_field_access_rights(self, operation, fields):
fields (as is if the fields is not falsy, or the readable/writable
fields if fields is falsy).
"""
if self._uid == SUPERUSER_ID:
if self.env.su:
return fields or list(self._fields)

def valid(fname):
@@ -2922,7 +2922,7 @@ def _read_from_database(self, field_names, inherited_field_names=[]):
return

env = self.env
cr, user, context = env.args
cr, user, context, su = env.args

# make a query object for selecting ids, and apply security rules to it
param_ids = object()
@@ -3103,7 +3103,7 @@ def check_access_rule(self, operation):
:raise UserError: * if current ir.rules do not permit this operation.
:return: None if the operation is allowed
"""
if self._uid == SUPERUSER_ID:
if self.env.su:
return

invalid = self - self._filter_access_rules(operation)
@@ -3135,7 +3135,7 @@ def check_access_rule(self, operation):

def _filter_access_rules(self, operation):
""" Return the subset of ``self`` for which ``operation`` is allowed. """
if self._uid == SUPERUSER_ID:
if self.env.su:
return self

if not self._ids:
@@ -3337,7 +3337,7 @@ def write(self, vals):
bad_names = {'id', 'parent_path'}
if self._log_access:
# the superuser can set log_access fields while loading registry
if not(self.env.uid == SUPERUSER_ID and not self.pool.ready):
if not(self.env.su and not self.pool.ready):
bad_names.update(LOG_ACCESS_COLUMNS)

# distribute fields into sets for various purposes
@@ -3579,7 +3579,7 @@ def create(self, vals_list):
bad_names = {'id', 'parent_path'}
if self._log_access:
# the superuser can set log_access fields while loading registry
if not(self.env.uid == SUPERUSER_ID and not self.pool.ready):
if not(self.env.su and not self.pool.ready):
bad_names.update(LOG_ACCESS_COLUMNS)
unknown_names = set()

@@ -3995,7 +3995,7 @@ def _apply_ir_rules(self, query, mode='read'):
:param query: the current query object
"""
if self._uid == SUPERUSER_ID:
if self.env.su:
return

def apply_rule(clauses, params, tables, parent_model=None):
@@ -4813,14 +4813,12 @@ def with_env(self, env):
"""
return self._browse(env, self._ids, self._prefetch_ids)

def sudo(self, user=SUPERUSER_ID):
""" sudo([user=SUPERUSER])
def sudo(self, flag=True):
""" sudo([flag=True])
Returns a new version of this recordset attached to the provided
user.
By default this returns a ``SUPERUSER`` recordset, where access
control and record rules are bypassed.
Returns a new version of this recordset with superuser mode enabled or
disabled, depending on `flag`. The superuser mode does not change the
current user, and simply bypasses access rights checks.
.. note::
@@ -4842,7 +4840,19 @@ def sudo(self, user=SUPERUSER_ID):
The returned recordset has the same prefetch object as ``self``.
"""
return self.with_env(self.env(user=user))
if not isinstance(flag, bool):
_logger.warning("deprecated use of sudo(user), use with_user(user) instead", stack_info=True)
return self.with_user(flag)
return self.with_env(self.env(su=flag))

def with_user(self, user):
""" with_user(user)
Return a new version of this recordset attached to the given user, in
non-superuser mode, unless `user` is the superuser (by convention, the
superuser is always in superuser mode.)
"""
return self.with_env(self.env(user=user, su=False))

def with_context(self, *args, **kwargs):
""" with_context([context][, **overrides]) -> records
@@ -713,7 +713,7 @@ def parse(self):
:var obj comodel: relational model of field (field.comodel)
(res_partner.bank_ids -> res.partner.bank)
"""
cr, uid, context = self.root_model.env.args
cr, uid, context, su = self.root_model.env.args

def to_ids(value, comodel, leaf):
""" Normalize a single id or name, or a list of those, into a list of ids

0 comments on commit 1e6c3be

Please sign in to comment.
You can’t perform that action at this time.