Skip to content

Commit

Permalink
Don't rely on counting objects when reindexing (#9442)
Browse files Browse the repository at this point in the history
Also try to limit memory consumption in Celery eager mode.
  • Loading branch information
patrys committed Apr 4, 2022
1 parent a5d963d commit 505ff51
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@


def update_user_search_document_values(apps, _schema_editor):
    """Schedule the user search-document backfill for after migrations finish.

    Nothing is written to the database here. A ``post_migrate`` receiver is
    registered that enqueues ``set_user_search_document_values`` as a Celery
    task with its default arguments.

    Args:
        apps: App registry passed in by the migration framework (unused).
        _schema_editor: Schema editor passed in by the migration framework
            (unused).
    """

    def on_migrations_complete(sender=None, **kwargs):
        set_user_search_document_values.delay()

    # weak=False is required: signals keep receivers as weak references by
    # default, and this receiver is a local function that would be garbage
    # collected as soon as this function returns, so it would never be called.
    # NOTE(review): no ``sender`` is given, so the receiver runs for every
    # app's post_migrate emission - confirm the repeated enqueue is acceptable.
    post_migrate.connect(on_migrations_complete, weak=False)

Expand Down
6 changes: 3 additions & 3 deletions saleor/core/management/commands/update_search_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ class Command(BaseCommand):
def handle(self, *args, **options):
    """Report how many rows lack a search document and queue the reindexing.

    For products, orders and users (in that order) the number of rows with
    an empty ``search_document`` is written to stdout, then the matching
    Celery task is enqueued with its default arguments. The count is for
    display only; the tasks do not depend on it.
    """
    reindex_plan = (
        ("products", Product, set_product_search_document_values),
        ("orders", Order, set_order_search_document_values),
        ("users", User, set_user_search_document_values),
    )
    for label, model, task in reindex_plan:
        pending = model.objects.filter(search_document="").count()
        self.stdout.write(f"Updating {label}: {pending}")
        task.delay()
115 changes: 64 additions & 51 deletions saleor/core/search_tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import List

from celery.utils.log import get_task_logger

from ..account.models import User
Expand All @@ -13,94 +15,105 @@

task_logger = get_task_logger(__name__)

# Upper bound on the number of rows a single task invocation loads and updates.
# NOTE(review): both assignments below come from the rendered diff (old value
# first, new value second); the later one, 1000, is the value in effect.
BATCH_SIZE = 10000
BATCH_SIZE = 1000


@app.task
def set_user_search_document_values(updated_count: int = 0) -> None:
    """Fill ``search_document`` for one batch of users, then re-queue itself.

    Each invocation loads at most ``BATCH_SIZE`` users whose
    ``search_document`` is empty, updates them in bulk, and schedules another
    invocation until a batch comes back shorter than ``BATCH_SIZE``.

    NOTE(review): termination assumes the prepared value is never an empty
    string; otherwise the same rows would be selected again - confirm.

    Args:
        updated_count: Running total of users updated by earlier invocations.
            Used only for progress logging.
    """
    # Evaluate the sliced queryset directly instead of through ``.iterator()``:
    # ``iterator()`` discards ``prefetch_related()`` on older Django releases
    # and needs an explicit ``chunk_size`` on newer ones, while the helper
    # below is told the relations are already prefetched.
    users = list(
        User.objects.filter(search_document="").prefetch_related("addresses")[
            :BATCH_SIZE
        ]
    )

    if not users:
        task_logger.info("No users to update.")
        return

    updated_count += set_search_document_values(
        users, prepare_user_search_document_value
    )

    task_logger.info("Updated %d users", updated_count)

    if len(users) < BATCH_SIZE:
        # A short batch means no more rows were left to pick up.
        task_logger.info("Setting user search document values finished.")
        return

    # Release the batch before re-queuing: in Celery eager mode ``delay()``
    # runs the next batch inline, so keeping this list alive would stack
    # every batch in memory.
    del users

    set_user_search_document_values.delay(updated_count)


@app.task
def set_order_search_document_values(updated_count: int = 0) -> None:
    """Fill ``search_document`` for one batch of orders, then re-queue itself.

    Each invocation loads at most ``BATCH_SIZE`` orders whose
    ``search_document`` is empty, updates them in bulk, and schedules another
    invocation until a batch comes back shorter than ``BATCH_SIZE``.

    NOTE(review): termination assumes the prepared value is never an empty
    string; otherwise the same rows would be selected again - confirm.

    Args:
        updated_count: Running total of orders updated by earlier invocations.
            Used only for progress logging.
    """
    # Evaluate the sliced queryset directly instead of through ``.iterator()``:
    # ``iterator()`` discards ``prefetch_related()`` on older Django releases
    # and needs an explicit ``chunk_size`` on newer ones, while the helper
    # below is told the relations are already prefetched.
    orders = list(
        Order.objects.filter(search_document="").prefetch_related(
            "user",
            "billing_address",
            "shipping_address",
            "payments",
            "discounts",
            "lines",
        )[:BATCH_SIZE]
    )

    if not orders:
        task_logger.info("No orders to update.")
        return

    updated_count += set_search_document_values(
        orders, prepare_order_search_document_value
    )

    task_logger.info("Updated %d orders", updated_count)

    if len(orders) < BATCH_SIZE:
        # A short batch means no more rows were left to pick up.
        task_logger.info("Setting order search document values finished.")
        return

    # Release the batch before re-queuing: in Celery eager mode ``delay()``
    # runs the next batch inline, so keeping this list alive would stack
    # every batch in memory.
    del orders

    set_order_search_document_values.delay(updated_count)


@app.task
def set_product_search_document_values(updated_count: int = 0) -> None:
    """Fill ``search_document`` for one batch of products, then re-queue itself.

    Each invocation loads at most ``BATCH_SIZE`` products whose
    ``search_document`` is empty, updates them in bulk, and schedules another
    invocation until a batch comes back shorter than ``BATCH_SIZE``.

    NOTE(review): termination assumes the prepared value is never an empty
    string; otherwise the same rows would be selected again - confirm.

    Args:
        updated_count: Running total of products updated by earlier
            invocations. Used only for progress logging.
    """
    # Evaluate the sliced queryset directly instead of through ``.iterator()``:
    # ``iterator()`` discards ``prefetch_related()`` on older Django releases
    # and needs an explicit ``chunk_size`` on newer ones, while the helper
    # below is told the relations are already prefetched.
    products = list(
        Product.objects.filter(search_document="").prefetch_related(
            *PRODUCT_FIELDS_TO_PREFETCH
        )[:BATCH_SIZE]
    )

    if not products:
        task_logger.info("No products to update.")
        return

    updated_count += set_search_document_values(
        products, prepare_product_search_document_value
    )

    task_logger.info("Updated %d products", updated_count)

    if len(products) < BATCH_SIZE:
        # A short batch means no more rows were left to pick up.
        task_logger.info("Setting product search document values finished.")
        return

    # Release the batch before re-queuing: in Celery eager mode ``delay()``
    # runs the next batch inline, so keeping this list alive would stack
    # every batch in memory.
    del products

    set_product_search_document_values.delay(updated_count)

def set_search_document_values(instances: List, prepare_search_document_func):
    """Recompute and persist ``search_document`` for a batch of model instances.

    Args:
        instances: Instances of a single model class, with the relations the
            prepare function needs already loaded.
        prepare_search_document_func: Callable invoked as
            ``func(instance, already_prefetched=True)`` that returns the new
            ``search_document`` value for that instance.

    Returns:
        The number of instances updated (``0`` for an empty batch).
    """
    if not instances:
        return 0

    # The model class is exposed on the options object as ``_meta.model``
    # (lowercase); ``_meta.Model`` does not exist and raises AttributeError.
    Model = instances[0]._meta.model

    for instance in instances:
        instance.search_document = prepare_search_document_func(
            instance, already_prefetched=True
        )

    # Write only the recomputed column for the whole batch.
    Model.objects.bulk_update(instances, ["search_document"])

    return len(instances)
5 changes: 1 addition & 4 deletions saleor/order/migrations/0123_update_order_search_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@


def update_order_search_document_values(apps, _schema_editor):
    """Schedule the order search-document backfill for after migrations finish.

    Nothing is written to the database here. A ``post_migrate`` receiver is
    registered that enqueues ``set_order_search_document_values`` as a Celery
    task with its default arguments.

    Args:
        apps: App registry passed in by the migration framework (unused).
        _schema_editor: Schema editor passed in by the migration framework
            (unused).
    """

    def on_migrations_complete(sender=None, **kwargs):
        set_order_search_document_values.delay()

    # weak=False is required: signals keep receivers as weak references by
    # default, and this receiver is a local function that would be garbage
    # collected as soon as this function returns, so it would never be called.
    # NOTE(review): no ``sender`` is given, so the receiver runs for every
    # app's post_migrate emission - confirm the repeated enqueue is acceptable.
    post_migrate.connect(on_migrations_complete, weak=False)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@


def update_product_search_document_values(apps, _schema_editor):
    """Schedule the product search-document backfill for after migrations finish.

    Nothing is written to the database here. A ``post_migrate`` receiver is
    registered that enqueues ``set_product_search_document_values`` as a
    Celery task with its default arguments.

    Args:
        apps: App registry passed in by the migration framework (unused).
        _schema_editor: Schema editor passed in by the migration framework
            (unused).
    """

    def on_migrations_complete(sender=None, **kwargs):
        set_product_search_document_values.delay()

    # weak=False is required: signals keep receivers as weak references by
    # default, and this receiver is a local function that would be garbage
    # collected as soon as this function returns, so it would never be called.
    # NOTE(review): no ``sender`` is given, so the receiver runs for every
    # app's post_migrate emission - confirm the repeated enqueue is acceptable.
    post_migrate.connect(on_migrations_complete, weak=False)

Expand Down

0 comments on commit 505ff51

Please sign in to comment.