From 4e3079dc3c3cf760121ed29560c0216da0e61135 Mon Sep 17 00:00:00 2001 From: "Carsten Wolff (cawo)" Date: Mon, 24 Nov 2025 12:30:06 +0000 Subject: [PATCH 1/3] [ADD] pg.query_ids: query large numbers of ids memory-safely This is mainly the code that has been recently added to `orm.recompute_fields`, here we're making it re-usable. --- src/base/tests/test_util.py | 26 ++++++++++++ src/util/pg.py | 80 +++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/src/base/tests/test_util.py b/src/base/tests/test_util.py index 424b8153a..e7f7f9d5c 100644 --- a/src/base/tests/test_util.py +++ b/src/base/tests/test_util.py @@ -18,6 +18,11 @@ from odoo import modules from odoo.tools import mute_logger +try: + from odoo.sql_db import db_connect +except ImportError: + from openerp.sql_db import db_connect + from odoo.addons.base.maintenance.migrations import util from odoo.addons.base.maintenance.migrations.testing import UnitTestCase, parametrize from odoo.addons.base.maintenance.migrations.util import snippets @@ -1481,6 +1486,27 @@ def test_iter(self): self.assertEqual(result, expected) +class TestQueryIds(UnitTestCase): + def test_straight(self): + result = list(util.query_ids(self.env.cr, "SELECT * FROM (VALUES (1), (2)) AS x(x)", itersize=2)) + self.assertEqual(result, [1, 2]) + + def test_chunks(self): + with util.query_ids(self.env.cr, "SELECT * FROM (VALUES (1), (2)) AS x(x)") as ids: + result = list(util.chunks(ids, 100, fmt=list)) + self.assertEqual(result, [[1, 2]]) + + def test_destructor(self): + ids = util.query_ids(self.env.cr, "SELECT id from res_users") + del ids + + def test_pk_violation(self): + with db_connect(self.env.cr.dbname).cursor() as cr, mute_logger("odoo.sql_db"), self.assertRaises( + ValueError + ), util.query_ids(cr, "SELECT * FROM (VALUES (1), (1)) AS x(x)") as ids: + list(ids) + + class TestRecords(UnitTestCase): def test_rename_xmlid(self): cr = self.env.cr diff --git a/src/util/pg.py b/src/util/pg.py index 
0ab92a9c2..0c66cf2e7 100644 --- a/src/util/pg.py +++ b/src/util/pg.py @@ -1932,3 +1932,83 @@ def bulk_update_table(cr, table, columns, mapping, key_col="id"): key_col=key_col, ) cr.execute(query, [Json(mapping)]) + + +class query_ids(object): + """ + Iterator over ids returned by a query. + + This iterator can memory efficiently query a potentially huge number of ids. + + :param str query: the query that returns the ids. It can be DML, e.g. `UPDATE table WHERE ... RETURNING id`. + :param int itersize: passed to a named_cursor, determines the number of rows fetched from PG at once. + """ + + def __init__(self, cr, query, itersize=None): + self._ncr = None + self._cr = cr + self._tmp_tbl = "_upgrade_query_ids_{}".format(uuid.uuid4().hex) + cr.execute( + format_query( + cr, + "CREATE UNLOGGED TABLE {}(id) AS (WITH query AS ({}) SELECT * FROM query)", + self._tmp_tbl, + SQLStr(query), + ) + ) + self._len = cr.rowcount + try: + cr.execute( + format_query( + cr, + "ALTER TABLE {} ADD CONSTRAINT {} PRIMARY KEY (id)", + self._tmp_tbl, + "pk_{}_id".format(self._tmp_tbl), + ) + ) + except psycopg2.IntegrityError as e: + if e.pgcode == errorcodes.UNIQUE_VIOLATION: + raise ValueError("The query for ids is producing duplicate values:\n{}".format(query)) + raise + self._ncr = named_cursor(cr, itersize) + self._ncr.execute(format_query(cr, "SELECT id FROM {} ORDER BY id", self._tmp_tbl)) + self._it = iter(self._ncr) + + def _close(self): + if self._ncr: + if self._ncr.closed: + return + self._ncr.close() + try: + self._cr.execute(format_query(self._cr, "DROP TABLE IF EXISTS {}", self._tmp_tbl)) + except psycopg2.InternalError as e: + if e.pgcode != errorcodes.IN_FAILED_SQL_TRANSACTION: + raise + + def __len__(self): + return self._len + + def __iter__(self): + return self + + def __next__(self): + if self._ncr.closed: + raise StopIteration + try: + return next(self._it)[0] + except StopIteration: + self._close() + raise + + def next(self): + return self.__next__() + + def 
__enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._close() + return False + + def __del__(self): + self._close() From 4f0d4c16519025b69a7500aa81be39a649554c39 Mon Sep 17 00:00:00 2001 From: "Carsten Wolff (cawo)" Date: Mon, 24 Nov 2025 13:34:07 +0000 Subject: [PATCH 2/3] [IMP] orm.recompute_fields: use new pg.query_ids This code in recompute_fields has been made re-usable in a new util pg.query_ids. Use that. --- src/util/orm.py | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/src/util/orm.py b/src/util/orm.py index f43213a3d..c192e817b 100644 --- a/src/util/orm.py +++ b/src/util/orm.py @@ -42,7 +42,7 @@ from .exceptions import MigrationError from .helpers import table_of_model from .misc import chunks, log_progress, version_between, version_gte -from .pg import SQLStr, column_exists, format_query, get_columns, named_cursor +from .pg import SQLStr, column_exists, format_query, get_columns, query_ids # python3 shims try: @@ -288,27 +288,16 @@ def recompute_fields(cr, model, fields, ids=None, logger=_logger, chunk_size=256 Model = env(cr)[model] if isinstance(model, basestring) else model model = Model._name - if ids is None: - query = format_query(cr, "SELECT id FROM {}", table_of_model(cr, model)) if query is None else SQLStr(query) - cr.execute( - format_query(cr, "CREATE UNLOGGED TABLE _upgrade_rf(id) AS (WITH query AS ({}) SELECT * FROM query)", query) + ids_ = ids + if ids_ is None: + ids_ = query_ids( + cr, + format_query(cr, "SELECT id FROM {}", table_of_model(cr, model)) if query is None else SQLStr(query), + itersize=2**20, ) - count = cr.rowcount - cr.execute("ALTER TABLE _upgrade_rf ADD CONSTRAINT pk_upgrade_rf_id PRIMARY KEY (id)") - - def get_ids(): - with named_cursor(cr, itersize=2**20) as ncr: - ncr.execute("SELECT id FROM _upgrade_rf ORDER BY id") - for (id_,) in ncr: - yield id_ - - ids_ = get_ids() - else: - count = len(ids) - ids_ = ids + count = len(ids_) 
if not count: - cr.execute("DROP TABLE IF EXISTS _upgrade_rf") return _logger.info("Computing fields %s of %r on %d records", fields, model, count) @@ -338,7 +327,6 @@ def get_ids(): else: flush(records) invalidate(records) - cr.execute("DROP TABLE IF EXISTS _upgrade_rf") class iter_browse(object): From 894c027f54c068494c568d873e991082a4ee84de Mon Sep 17 00:00:00 2001 From: "Carsten Wolff (cawo)" Date: Mon, 24 Nov 2025 14:26:03 +0000 Subject: [PATCH 3/3] [FIX] models.remove_model: MemoryError ``` Traceback (most recent call last): [...] File "/tmp/tmpipxrg2eq/migrations/util/models.py", line 563, in merge_model remove_model(cr, source, drop_table=drop_table, ignore_m2m=ignore_m2m) File "/tmp/tmpipxrg2eq/migrations/util/models.py", line 138, in remove_model it = chunks([id for (id,) in cr.fetchall()], chunk_size, fmt=tuple) MemoryError ``` Some IR tables can be large. Avoid `cr.fetchall()` when getting ids by use of pg.query_ids() --- src/util/models.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/util/models.py b/src/util/models.py index c92d42a0b..d3c769b56 100644 --- a/src/util/models.py +++ b/src/util/models.py @@ -27,6 +27,7 @@ get_m2m_tables, get_value_or_en_translation, parallel_execute, + query_ids, table_exists, update_m2m_tables, view_exists, @@ -128,17 +129,17 @@ def remove_model(cr, model, drop_table=True, ignore_m2m=()): 'SELECT id FROM "{}" r WHERE {}'.format(ir.table, ir.model_filter(prefix="r.")), [model] ).decode() - cr.execute(query) - if ir.table == "ir_ui_view": - for (view_id,) in cr.fetchall(): - remove_view(cr, view_id=view_id, silent=True) - else: - # remove in batch - size = (cr.rowcount + chunk_size - 1) / chunk_size - it = chunks([id for (id,) in cr.fetchall()], chunk_size, fmt=tuple) - for sub_ids in log_progress(it, _logger, qualifier=ir.table, size=size): - remove_records(cr, ref_model, sub_ids) - _rm_refs(cr, ref_model, sub_ids) + with query_ids(cr, query, itersize=chunk_size) as ids_: 
+ if ir.table == "ir_ui_view": + for view_id in ids_: + remove_view(cr, view_id=view_id, silent=True) + else: + # remove in batch + size = (len(ids_) + chunk_size - 1) / chunk_size + it = chunks(ids_, chunk_size, fmt=tuple) + for sub_ids in log_progress(it, _logger, qualifier=ir.table, size=size): + remove_records(cr, ref_model, sub_ids) + _rm_refs(cr, ref_model, sub_ids) if ir.set_unknown: # Link remaining records not linked to a XMLID