Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/base/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
from odoo import modules
from odoo.tools import mute_logger

try:
from odoo.sql_db import db_connect
except ImportError:
from openerp.sql_db import db_connect

from odoo.addons.base.maintenance.migrations import util
from odoo.addons.base.maintenance.migrations.testing import UnitTestCase, parametrize
from odoo.addons.base.maintenance.migrations.util import snippets
Expand Down Expand Up @@ -1481,6 +1486,27 @@ def test_iter(self):
self.assertEqual(result, expected)


class TestQueryIds(UnitTestCase):
def test_straight(self):
result = list(util.query_ids(self.env.cr, "SELECT * FROM (VALUES (1), (2)) AS x(x)", itersize=2))
self.assertEqual(result, [1, 2])

def test_chunks(self):
with util.query_ids(self.env.cr, "SELECT * FROM (VALUES (1), (2)) AS x(x)") as ids:
result = list(util.chunks(ids, 100, fmt=list))
self.assertEqual(result, [[1, 2]])

def test_destructor(self):
ids = util.query_ids(self.env.cr, "SELECT id from res_users")
del ids

def test_pk_violation(self):
with db_connect(self.env.cr.dbname).cursor() as cr, mute_logger("odoo.sql_db"), self.assertRaises(
ValueError
), util.query_ids(cr, "SELECT * FROM (VALUES (1), (1)) AS x(x)") as ids:
list(ids)


class TestRecords(UnitTestCase):
def test_rename_xmlid(self):
cr = self.env.cr
Expand Down
23 changes: 12 additions & 11 deletions src/util/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
get_m2m_tables,
get_value_or_en_translation,
parallel_execute,
query_ids,
table_exists,
update_m2m_tables,
view_exists,
Expand Down Expand Up @@ -128,17 +129,17 @@ def remove_model(cr, model, drop_table=True, ignore_m2m=()):
'SELECT id FROM "{}" r WHERE {}'.format(ir.table, ir.model_filter(prefix="r.")), [model]
).decode()

cr.execute(query)
if ir.table == "ir_ui_view":
for (view_id,) in cr.fetchall():
remove_view(cr, view_id=view_id, silent=True)
else:
# remove in batch
size = (cr.rowcount + chunk_size - 1) / chunk_size
it = chunks([id for (id,) in cr.fetchall()], chunk_size, fmt=tuple)
for sub_ids in log_progress(it, _logger, qualifier=ir.table, size=size):
remove_records(cr, ref_model, sub_ids)
_rm_refs(cr, ref_model, sub_ids)
with query_ids(cr, query, itersize=chunk_size) as ids_:
if ir.table == "ir_ui_view":
for view_id in ids_:
remove_view(cr, view_id=view_id, silent=True)
else:
# remove in batch
size = (len(ids_) + chunk_size - 1) / chunk_size
it = chunks(ids_, chunk_size, fmt=tuple)
for sub_ids in log_progress(it, _logger, qualifier=ir.table, size=size):
remove_records(cr, ref_model, sub_ids)
_rm_refs(cr, ref_model, sub_ids)

if ir.set_unknown:
# Link remaining records not linked to a XMLID
Expand Down
28 changes: 8 additions & 20 deletions src/util/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from .exceptions import MigrationError
from .helpers import table_of_model
from .misc import chunks, log_progress, version_between, version_gte
from .pg import SQLStr, column_exists, format_query, get_columns, named_cursor
from .pg import SQLStr, column_exists, format_query, get_columns, query_ids

# python3 shims
try:
Expand Down Expand Up @@ -288,27 +288,16 @@ def recompute_fields(cr, model, fields, ids=None, logger=_logger, chunk_size=256
Model = env(cr)[model] if isinstance(model, basestring) else model
model = Model._name

if ids is None:
query = format_query(cr, "SELECT id FROM {}", table_of_model(cr, model)) if query is None else SQLStr(query)
cr.execute(
format_query(cr, "CREATE UNLOGGED TABLE _upgrade_rf(id) AS (WITH query AS ({}) SELECT * FROM query)", query)
ids_ = ids
if ids_ is None:
ids_ = query_ids(
cr,
format_query(cr, "SELECT id FROM {}", table_of_model(cr, model)) if query is None else SQLStr(query),
itersize=2**20,
)
count = cr.rowcount
cr.execute("ALTER TABLE _upgrade_rf ADD CONSTRAINT pk_upgrade_rf_id PRIMARY KEY (id)")

def get_ids():
with named_cursor(cr, itersize=2**20) as ncr:
ncr.execute("SELECT id FROM _upgrade_rf ORDER BY id")
for (id_,) in ncr:
yield id_

ids_ = get_ids()
else:
count = len(ids)
ids_ = ids

count = len(ids_)
if not count:
cr.execute("DROP TABLE IF EXISTS _upgrade_rf")
return

_logger.info("Computing fields %s of %r on %d records", fields, model, count)
Expand Down Expand Up @@ -338,7 +327,6 @@ def get_ids():
else:
flush(records)
invalidate(records)
cr.execute("DROP TABLE IF EXISTS _upgrade_rf")


class iter_browse(object):
Expand Down
80 changes: 80 additions & 0 deletions src/util/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -1932,3 +1932,83 @@ def bulk_update_table(cr, table, columns, mapping, key_col="id"):
key_col=key_col,
)
cr.execute(query, [Json(mapping)])


class query_ids(object):
"""
Iterator over ids returned by a query.

This iterator can memory efficiently query a potentially huge number of ids.

:param str query: the query that returns the ids. It can be DML, e.g. `UPDATE table WHERE ... RETURNING id`.
:param int itersize: passed to a named_cursor, determines the number of rows fetched from PG at once.
"""

def __init__(self, cr, query, itersize=None):
self._ncr = None
self._cr = cr
self._tmp_tbl = "_upgrade_query_ids_{}".format(uuid.uuid4().hex)
cr.execute(
format_query(
cr,
"CREATE UNLOGGED TABLE {}(id) AS (WITH query AS ({}) SELECT * FROM query)",
self._tmp_tbl,
SQLStr(query),
)
)
self._len = cr.rowcount
try:
cr.execute(
format_query(
cr,
"ALTER TABLE {} ADD CONSTRAINT {} PRIMARY KEY (id)",
self._tmp_tbl,
"pk_{}_id".format(self._tmp_tbl),
)
)
except psycopg2.IntegrityError as e:
if e.pgcode == errorcodes.UNIQUE_VIOLATION:
raise ValueError("The query for ids is producing duplicate values:\n{}".format(query))
raise
self._ncr = named_cursor(cr, itersize)
self._ncr.execute(format_query(cr, "SELECT id FROM {} ORDER BY id", self._tmp_tbl))
self._it = iter(self._ncr)

def _close(self):
if self._ncr:
if self._ncr.closed:
return
self._ncr.close()
try:
self._cr.execute(format_query(self._cr, "DROP TABLE IF EXISTS {}", self._tmp_tbl))
except psycopg2.InternalError as e:
if e.pgcode != errorcodes.IN_FAILED_SQL_TRANSACTION:
raise

def __len__(self):
return self._len

def __iter__(self):
return self

def __next__(self):
if self._ncr.closed:
raise StopIteration
try:
return next(self._it)[0]
except StopIteration:
self._close()
raise

def next(self):
return self.__next__()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self._close()
return False

def __del__(self):
self._close()