-
Notifications
You must be signed in to change notification settings - Fork 24.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FIX] stock: prevent stock_quant.quantity corruption on concurrent _u… #32160
Conversation
my CLA signature is in unmerged #32092 |
…pdate_available_quantity calls by invalidating the quant cache after having obtained the lock for update. we make sure our record update using the ORM ('quantity' : quant.quantity + quantity) is based on up-to-date quantity value. Even if an other worker did update the quantity since it was first fetched (before the lock) - test_quant_mt simulates multiple workers updating same quant in parallel to demonstrate the bug and the fix
452473b
to
198d414
Compare
Hi @bletourmy,
Odoo has default transaction isolation level of repeatable read and, in this particular part of the code, a single transaction will directly lock the quant candidate row. That's why quant.quantity cannot change because of a concurrent transaction:
In your test, you're setting the transaction isolation level as read committed. That's probably why it is failing. Regards |
Hello @sle-odoo , I've last question there. Best regards, |
I suspect it's because of the way we handle the track/untracked products here but i still didn't find a way to reproduce it. We use a quant without a lot on a move line with a lot. |
Could you use the following patch and share us your tests? with self._cr.savepoint():
- self._cr.execute("SELECT 1 FROM stock_quant WHERE id = %s FOR UPDATE NOWAIT", [quant.id], log_exceptions=False)
+ self._cr.execute("SELECT quantity, in_date FROM stock_quant WHERE id = %s FOR UPDATE NOWAIT", [quant.id], log_exceptions=False)
+ self._cr.execute("UPDATE stock_quant SET quantity=quantity + %s, in_date=%s WHERE id=%s" % (quantity, in_date), [quant.id])
+ quant.invalidate_cache(fnames=['quantity', 'in_date'], ids=[quant.id])
- quant.write({
- 'quantity': quant.quantity + quantity,
- 'in_date': in_date,
- })
break FYI it is the way like odoo works for no-gap sequence, if you doesn't have issue over there then maybe you can use the same way here too.
Maybe we are missing something using cache and ISOLATION_LEVEL flags. |
Hello @moylop260, Also @sle-odoo I've adapted my unit tests to support odoo standard ISOLATION_LEVEL_REPEATABLE_READ (see diff bellow) and I can confirm there is no issue with quantity nor with reservation (#32434) in this context. My tests pass once accounted for the 50% to 60% of TransactionRollbackError / "could not serialize access due to concurrent update". This level of transaction abortion was why I originally modified the isolation level in the context of our simulation application to allow for more "unattended" parallel execution of many odoo transactions diff --git a/addons/stock/tests/test_quant_mt.py b/addons/stock/tests/test_quant_mt.py
index 385ce89dea9..ccd418f57b1 100644
--- a/addons/stock/tests/test_quant_mt.py
+++ b/addons/stock/tests/test_quant_mt.py
@@ -3,13 +3,16 @@
import concurrent
import logging, threading
+from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from contextlib import closing
-from psycopg2.extensions import ISOLATION_LEVEL_READ_COMMITTED
+from typing import DefaultDict
+
import odoo
from odoo.exceptions import UserError
from odoo.tests.common import tagged, BaseCase, get_db_name
from odoo import api
+from psycopg2._psycopg import TransactionRollbackError
_logger = logging.getLogger(__name__)
@@ -74,9 +77,8 @@ class MTStockQuant(CommitCase):
db_name = get_db_name()
registry = odoo.registry(db_name)
threading.current_thread().dbname = db_name
- user_error_count = 0
+ err_counts = defaultdict(int)
with closing(registry.cursor()) as thread_cr:
- thread_cr._cnx.set_isolation_level(ISOLATION_LEVEL_READ_COMMITTED)
with api.Environment.manage():
env = api.Environment(thread_cr, odoo.SUPERUSER_ID, {})
@@ -87,19 +89,27 @@ class MTStockQuant(CommitCase):
try:
func(env, stock_location, product1, **kwargs)
except UserError as e:
- user_error_count+=1
+ err_counts['user_error_count']+=1
_logger.error(
'%s encountered an Exception: %s', func.__name__, e,
- exc_info=True)
+ exc_info=False)
+ thread_cr.rollback()
+ except TransactionRollbackError as e:
+ err_counts['transaction_rollback_error_count']+=1
+ # _logger.warning('%s encountered a TransactionRollbackError Exception: %s rolling back', func.__name__, e, exc_info=False)
+ thread_cr.rollback()
+
except Exception as e:
+ err_counts['other_error_count']+=1
_logger.error(
'%s encountered an Exception: %s rolling back', func.__name__, e,
exc_info=True)
thread_cr.rollback()
else:
thread_cr.commit()
+ env.clear()
thread_cr.commit()
- return user_error_count
+ return err_counts
@classmethod
def set_initial_stock(cls, env, stock_location, product1):
@@ -125,34 +135,37 @@ class MTStockQuant(CommitCase):
# then empty the odoo cursor pool before starting multiprocessing,
# as cursors can't be shared among forked processes.
odoo.sql_db.close_all()
- user_error_count = 0
+ all_errors=defaultdict(int)
with ProcessPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(self.thread_env_wrapper, stress_function_list, self.id_stock_location,
self.id_product1) for stress_function_list in jobs]
# wait for all jobs submitted the worker processes to terminate
concurrent.futures.wait(futures)
for future in concurrent.futures.as_completed(futures):
- user_error_count += future.result()
+ errors=future.result()
+ for k in errors:
+ all_errors[k]+=errors[k]
+
# reinit class registries after end of children test processes
self.setUpClass()
- return user_error_count
+ return all_errors
def test_available_quantity(self):
-
self.set_initial_stock(self.env,
self.env['stock.location'].browse(self.id_stock_location),
self.env['product.product'].browse(self.id_product1))
- user_error_count = self.run_mp_test_jobs(NBO_TEST_JOBS * [128 * [self.decrease_quant]])
+ err_counts = self.run_mp_test_jobs(NBO_TEST_JOBS * [128 * [self.decrease_quant]])
stock_location = self.env['stock.location'].browse(self.id_stock_location)
product1 = self.env['product.product'].browse(self.id_product1)
+ _logger.info('test_available_quantity: errors counts:%s / %s calls',err_counts, NBO_TEST_JOBS * 128)
- self.assertEqual(self.env['stock.quant']._get_available_quantity(product1, stock_location), 0)
- self.assertEqual(user_error_count, 0)
+ self.assertEqual(self.env['stock.quant']._get_available_quantity(product1, stock_location), err_counts['transaction_rollback_error_count']*5)
+ self.assertEqual(err_counts['user_error_count'], 0)
@classmethod
def reserve_quant(cls, env, stock_location, product1):
@@ -163,22 +176,32 @@ class MTStockQuant(CommitCase):
env['stock.quant']._update_reserved_quantity(product1, stock_location, -10)
env['stock.quant']._update_available_quantity(product1, stock_location, -10)
+ @classmethod
+ def reserve_and_consume_quant(cls, env, stock_location, product1):
+ # call both functions within a single transaction
+ # to prevent trying to un-reserve a failed reservation
+ cls.reserve_quant(env, stock_location, product1)
+ cls.consume_quant(env, stock_location, product1)
+
+
def test_reserved_quantity(self):
self.set_initial_stock(self.env,
self.env['stock.location'].browse(self.id_stock_location),
self.env['product.product'].browse(self.id_product1))
- user_error_count = self.run_mp_test_jobs(NBO_TEST_JOBS * [64 * [self.reserve_quant, self.consume_quant]])
+ err_counts = self.run_mp_test_jobs(NBO_TEST_JOBS * [64 * [self.reserve_and_consume_quant]])
stock_location = self.env['stock.location'].browse(self.id_stock_location)
product1 = self.env['product.product'].browse(self.id_product1)
- self.assertEqual(self.env['stock.quant']._get_available_quantity(product1, stock_location), 0)
+ _logger.info('test_reserved_quantity: errors counts:%s / %s calls',err_counts, NBO_TEST_JOBS * 128)
+
+ self.assertEqual(self.env['stock.quant']._get_available_quantity(product1, stock_location), err_counts['transaction_rollback_error_count']*10)
# we must not get any UserError
# "It is not possible to reserve more products of Product A than you have in stock"
# or
# "It is not possible to unreserve more products of Product A than you have in stock"
- self.assertEqual(user_error_count, 0)
+ self.assertEqual(err_counts['user_error_count'] , 0) |
Impacted versions: tested on 12, ( similar code on 11)
Description of the issue/feature this PR addresses:
by invalidating the quant cache after having obtained the lock for update.
we make sure our record update using the ORM ('quantity' : quant.quantity + quantity)
is based on up-to-date quantity value. Even if another worker did update the quantity since it was
first fetched (before the lock)
Steps to reproduce:
Current behavior before PR:
Under heavy usage in parallel by multiple workers (e.g. multiple Purchase of raw material product and MO in production using that product)
stock_quant available quantity calculation might be wrong, leading to corrupted value:
stock_quant for a given product/location does not match the sum of all corresponding stock_move_line quantities.
Desired behavior after PR is merged:
stock_quant for a given product/location does match the sum of all corresponding stock_move_line quantities whatever odoo usage in parallel
should fix #30008
--
I confirm I have signed the CLA and read the PR guidelines at www.odoo.com/submit-pr
see PR #32092