Skip to content

Commit

Permalink
[REF] core,project: remove _read_group_check_field_access_rights()
Browse files Browse the repository at this point in the history
_read_group_check_field_access_rights() is a superfluous hook of the
_read_group() refactor (#110737). But
this hook isn't very useful compared to check_field_access_rights(),
it only checks the usage of fields used in _read_group().

- For 'res.users': Check the security in _read_group_select() and
_read_group_groupby() instead. We cannot forbid USER_PRIVATE_CHECK in
check_field_access_rights() because we want to obfuscate this field,
and not throw an AccessError when reading it.
- For 'project.task': override check_field_access_rights() to cover all
cases and override _determine_fields_to_fetch() to avoid reading an
inaccessible field and having an accidental AccessError.

closes #146438

Related: odoo/enterprise#53409
Signed-off-by: Raphael Collet <rco@odoo.com>
  • Loading branch information
ryv-odoo authored and rco-odoo committed Jan 10, 2024
1 parent b5aeccf commit be4b013
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 41 deletions.
45 changes: 15 additions & 30 deletions addons/project/models/project_task.py
Expand Up @@ -829,45 +829,30 @@ def _ensure_fields_are_accessible(self, fields, operation='read', check_group_us
error_message = _('You cannot write on the following fields on tasks: %(field_list)s', field_list=unauthorized_field_list)
raise AccessError(error_message)

def read(self, fields=None, load='_classic_read'):
self._ensure_fields_are_accessible(fields)
return super(Task, self).read(fields=fields, load=load)
def _determine_fields_to_fetch(self, field_names, ignore_when_in_cache=False):
if not self.env.su and self.env.user.has_group('base.group_portal'):
valid_names = self.SELF_READABLE_FIELDS
field_names = [fname for fname in field_names if fname in valid_names]
return super()._determine_fields_to_fetch(field_names, ignore_when_in_cache)

@api.model
def _read_group_check_field_access_rights(self, field_names):
super()._read_group_check_field_access_rights(field_names)
self._ensure_fields_are_accessible(field_names)

@api.model
def _search(self, domain, offset=0, limit=None, order=None, access_rights_uid=None):
fields_list = {term[0] for term in domain if isinstance(term, (tuple, list)) and term not in [expression.TRUE_LEAF, expression.FALSE_LEAF]}
self._ensure_fields_are_accessible(fields_list)
return super()._search(domain, offset, limit, order, access_rights_uid)

def mapped(self, func):
# Note: This will protect the filtered method too
if func and isinstance(func, str):
fields_list = func.split('.')
self._ensure_fields_are_accessible(fields_list)
return super(Task, self).mapped(func)

def filtered_domain(self, domain):
fields_list = [term[0] for term in domain if isinstance(term, (tuple, list)) and term not in [expression.TRUE_LEAF, expression.FALSE_LEAF]]
self._ensure_fields_are_accessible(fields_list)
return super(Task, self).filtered_domain(domain)
def check_field_access_rights(self, operation, field_names):
if field_names and operation in ('read', 'write'):
self._ensure_fields_are_accessible(field_names, operation)
elif not field_names and not self.env.su and self.env.user.has_group('base.group_portal'):
valid_names = self.SELF_READABLE_FIELDS
return [
fname for fname in super().check_field_access_rights(operation, field_names)
if fname in valid_names
]
return super().check_field_access_rights(operation, field_names)

def copy_data(self, default=None):
defaults = super().copy_data(default=default)
if self.env.user.has_group('project.group_project_user'):
return defaults
return [{k: v for k, v in default.items() if k in self.SELF_READABLE_FIELDS} for default in defaults]

@api.model
def _ensure_portal_user_can_write(self, fields):
for field in fields:
if field not in self.SELF_WRITABLE_FIELDS:
raise AccessError(_("You don't have write access on the %(field)s field.", field=field))

def _set_stage_on_project_from_task(self):
stage_ids_per_project = defaultdict(list)
for task in self:
Expand Down
20 changes: 16 additions & 4 deletions odoo/addons/base/models/res_users.py
Expand Up @@ -575,10 +575,22 @@ def check_field_access_rights(self, operation, field_names):
return super(Users, self).check_field_access_rights(operation, field_names)

@api.model
def _read_group_check_field_access_rights(self, field_names):
super()._read_group_check_field_access_rights(field_names)
if set(field_names).intersection(USER_PRIVATE_FIELDS):
raise AccessError(_("Invalid 'group by' parameter"))
def _read_group_select(self, aggregate_spec, query):
try:
fname, __, __ = models.parse_read_group_spec(aggregate_spec)
except Exception:
# may happen if aggregate_spec == '__count', for instance
fname = None
if fname in USER_PRIVATE_FIELDS:
raise AccessError(_("Cannot aggregate on %s parameter", fname))
return super()._read_group_select(aggregate_spec, query)

@api.model
def _read_group_groupby(self, groupby_spec, query):
fname, __, __ = models.parse_read_group_spec(groupby_spec)
if fname in USER_PRIVATE_FIELDS:
raise AccessError(_("Cannot groupby on %s parameter", fname))
return super()._read_group_groupby(groupby_spec, query)

@api.model
def _search(self, domain, offset=0, limit=None, order=None, access_rights_uid=None):
Expand Down
7 changes: 0 additions & 7 deletions odoo/models.py
Expand Up @@ -1901,8 +1901,6 @@ def _read_group(self, domain, groupby=(), aggregates=(), having=(), offset=0, li
query_parts.append(SQL("OFFSET %s", offset))

self._flush_search(domain, fnames_to_flush)
if fnames_to_flush:
self._read_group_check_field_access_rights(fnames_to_flush)

self.env.cr.execute(SQL("\n").join(query_parts))
# row_values: [(a1, b1, c1), (a2, b2, c2), ...]
Expand Down Expand Up @@ -2099,11 +2097,6 @@ def _read_group_orderby(self, order: str, groupby_terms: dict[str, SQL],

return SQL(", ").join(orderby_terms), SQL(", ").join(extra_groupby_terms), fnames_used

@api.model
def _read_group_check_field_access_rights(self, field_names):
""" Check whether the given field names can be grouped or aggregated. """
self.check_field_access_rights('read', field_names)

@api.model
def _read_group_empty_value(self, spec):
""" Return the empty value corresponding to the given groupby spec or aggregate spec. """
Expand Down

0 comments on commit be4b013

Please sign in to comment.