From 0c9159e62182b28f7bfc0cd2a8a60ed5c11bc102 Mon Sep 17 00:00:00 2001 From: Anastasia Beglova Date: Tue, 9 Jul 2024 16:31:06 -0400 Subject: [PATCH] reindexing fixes --- learning_resources_search/indexing_api.py | 42 ++++++-- .../indexing_api_test.py | 46 ++++++--- .../management/commands/recreate_index.py | 36 +++++-- learning_resources_search/tasks.py | 25 ++++- learning_resources_search/tasks_test.py | 99 ++++++++++++++++++- 5 files changed, 210 insertions(+), 38 deletions(-) diff --git a/learning_resources_search/indexing_api.py b/learning_resources_search/indexing_api.py index a51dcc8c0c..6b6c3dd21c 100644 --- a/learning_resources_search/indexing_api.py +++ b/learning_resources_search/indexing_api.py @@ -514,20 +514,46 @@ def switch_indices(backing_index, object_type): ) -def delete_orphaned_indices(): +def delete_orphaned_indexes(obj_types, delete_reindexing_tags): """ - Delete any indices without aliases and any reindexing aliases + Delete any indices without aliases """ conn = get_conn() indices = conn.indices.get_alias(index="*") for index in indices: aliases = indices[index]["aliases"] keys = list(aliases) + + if delete_reindexing_tags: + for alias in aliases: + if "reindexing" in alias: + log.info("Deleting alias %s for index %s", alias, index) + conn.indices.delete_alias(name=alias, index=index) + keys.remove(alias) + + if not keys and not index.startswith("."): + for object_type in obj_types: + if object_type in index: + log.info("Deleting orphaned index %s", index) + conn.indices.delete(index) + break + + +def get_existing_reindexing_indexes(obj_types): + """ + Check for existing indexes with reindexing tag + """ + conn = get_conn() + reindexing_indexes = [] + indices = conn.indices.get_alias(index="*") + for index in indices: + aliases = indices[index]["aliases"] + for alias in aliases: if "reindexing" in alias: - log.info("Deleting alias %s for index %s", alias, index) - conn.indices.delete_alias(name=alias, index=index) - keys.remove(alias) - if not keys: - log.info("Deleting index %s", index) - conn.indices.delete(index) + for object_type in obj_types: + if object_type in index: + reindexing_indexes.append(index) + break + + return reindexing_indexes diff --git a/learning_resources_search/indexing_api_test.py b/learning_resources_search/indexing_api_test.py index 4befc98b58..50f9f31491 100644 --- a/learning_resources_search/indexing_api_test.py +++ b/learning_resources_search/indexing_api_test.py @@ -32,7 +32,7 @@ deindex_learning_resources, deindex_percolators, deindex_run_content_files, - delete_orphaned_indices, + delete_orphaned_indexes, get_reindexing_alias_name, index_content_files, index_course_content_files, @@ -349,7 +349,8 @@ def test_index_items_size_limits(settings, mocker, max_size, chunks, exceeds_siz assert mock_log.call_count == (10 if exceeds_size else 0) -def test_delete_orphaned_indices(mocker, mocked_es): +@pytest.mark.parametrize("delete_reindexing_tags", [True, False]) +def test_delete_orphaned_indexes(mocker, mocked_es, delete_reindexing_tags): """ Delete any indices without aliases and any reindexing aliases """ @@ -369,6 +370,12 @@ def test_delete_orphaned_indices(mocker, mocked_es): "some_other_alias": {}, } }, + "discussions_local_podcast_1e61d15ff35b8c9b4f6884bba05484cb": { + "aliases": { + "discussions_local_program_reindexing": {}, + "some_other_alias": {}, + } + }, "discussions_local_program_5484cbb8c9b1e61d15ff354f6884bba0": {"aliases": {}}, "discussions_local_course_15ff354d6884bba05484cbb8c9b1e61d": { "aliases": { @@ -380,23 +387,32 @@ def test_delete_orphaned_indices(mocker, mocked_es): mocked_es.conn.indices = mocker.Mock( delete_alias=mocker.Mock(), get_alias=mocker.Mock(return_value=mock_aliases) ) - delete_orphaned_indices() + delete_orphaned_indexes(["program"], delete_reindexing_tags=delete_reindexing_tags) mocked_es.conn.indices.get_alias.assert_called_once_with(index="*") - mocked_es.conn.indices.delete_alias.assert_any_call( - name="discussions_local_program_reindexing", - index="discussions_local_program_b8c9b1e61d15ff354f6884bba05484cb", - ) - mocked_es.conn.indices.delete_alias.assert_any_call( - name="discussions_local_program_reindexing", - index="discussions_local_program_1e61d15ff35b8c9b4f6884bba05484cb", - ) + + if delete_reindexing_tags: + mocked_es.conn.indices.delete_alias.assert_any_call( + name="discussions_local_program_reindexing", + index="discussions_local_program_b8c9b1e61d15ff354f6884bba05484cb", + ) + mocked_es.conn.indices.delete_alias.assert_any_call( + name="discussions_local_program_reindexing", + index="discussions_local_program_1e61d15ff35b8c9b4f6884bba05484cb", + ) + else: + mocked_es.conn.indices.delete_alias.assert_not_called() + mocked_es.conn.indices.delete.assert_any_call( "discussions_local_program_5484cbb8c9b1e61d15ff354f6884bba0" ) - mocked_es.conn.indices.delete.assert_any_call( - "discussions_local_program_b8c9b1e61d15ff354f6884bba05484cb" - ) - assert mocked_es.conn.indices.delete.call_count == 2 + + if delete_reindexing_tags: + mocked_es.conn.indices.delete.assert_any_call( + "discussions_local_program_b8c9b1e61d15ff354f6884bba05484cb" + ) + assert mocked_es.conn.indices.delete.call_count == 2 + else: + assert mocked_es.conn.indices.delete.call_count == 1 def test_bulk_content_file_deindex_on_course_deletion(mocker): diff --git a/learning_resources_search/management/commands/recreate_index.py b/learning_resources_search/management/commands/recreate_index.py index 6969f9c7f8..6d47b10b63 100644 --- a/learning_resources_search/management/commands/recreate_index.py +++ b/learning_resources_search/management/commands/recreate_index.py @@ -3,6 +3,7 @@ from django.core.management.base import BaseCommand, CommandError from learning_resources_search.constants import ALL_INDEX_TYPES +from learning_resources_search.indexing_api import get_existing_reindexing_indexes from learning_resources_search.tasks import start_recreate_index from main.utils import now_in_utc @@ -13,6 +14,13 @@ class Command(BaseCommand): help = "Recreate opensearch index" def add_arguments(self, parser): + parser.add_argument( + "--remove_existing_reindexing_tags", + dest="remove_existing_reindexing_tags", + action="store_true", + help="Overwrite any existing reindexing tags and remove those indexes", + ) + parser.add_argument( "--all", dest="all", action="store_true", help="Recreate all indexes" ) @@ -28,11 +36,9 @@ def add_arguments(self, parser): def handle(self, *args, **options): # noqa: ARG002 """Index all LEARNING_RESOURCE_TYPES""" + remove_existing_reindexing_tags = options["remove_existing_reindexing_tags"] if options["all"]: - task = start_recreate_index.delay(list(ALL_INDEX_TYPES)) - self.stdout.write( - f"Started celery task {task} to index content for all indexes" - ) + indexes_to_update = list(ALL_INDEX_TYPES) else: indexes_to_update = list( filter(lambda object_type: options[object_type], ALL_INDEX_TYPES) @@ -44,12 +50,24 @@ def handle(self, *args, **options): # noqa: ARG002 for object_type in sorted(ALL_INDEX_TYPES): self.stdout.write(f" --{object_type}s") return - - task = start_recreate_index.delay(indexes_to_update) - self.stdout.write( - f"Started celery task {task} to index content for the following" - f" indexes: {indexes_to_update}" + if not remove_existing_reindexing_tags: + existing_reindexing_indexes = get_existing_reindexing_indexes( + indexes_to_update ) + if existing_reindexing_indexes: + self.stdout.write( + f"Reindexing in progress. Reindexing indexes already exist:" + f" {', '.join(existing_reindexing_indexes)}" + ) + return + + task = start_recreate_index.delay( + indexes_to_update, remove_existing_reindexing_tags + ) + self.stdout.write( + f"Started celery task {task} to index content for the following" + f" indexes: {indexes_to_update}" + ) self.stdout.write("Waiting on task...") start = now_in_utc() diff --git a/learning_resources_search/tasks.py b/learning_resources_search/tasks.py index 8d941b4cb1..1d94d7b935 100644 --- a/learning_resources_search/tasks.py +++ b/learning_resources_search/tasks.py @@ -458,11 +458,26 @@ def wrap_retry_exception(*exception_classes): @app.task(bind=True) -def start_recreate_index(self, indexes): +def start_recreate_index(self, indexes, remove_existing_reindexing_tags): """ Wipe and recreate index and mapping, and index all items. """ try: + if not remove_existing_reindexing_tags: + existing_reindexing_indexes = api.get_existing_reindexing_indexes(indexes) + + if existing_reindexing_indexes: + error = ( + f"Reindexing in progress. Reindexing indexes already exist: " + f"{', '.join(existing_reindexing_indexes)}" + ) + log.exception(error) + return error + + api.delete_orphaned_indexes( + indexes, delete_reindexing_tags=remove_existing_reindexing_tags + ) + new_backing_indices = { obj_type: api.create_backing_index(obj_type) for obj_type in indexes } @@ -768,7 +783,9 @@ def get_update_learning_resource_tasks(resource_type): ] -@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m") +@app.task( + acks_late=True, autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m" +) def finish_recreate_index(results, backing_indices): """ Swap reindex backing index with default backing index @@ -780,7 +797,9 @@ def finish_recreate_index(results, backing_indices): errors = merge_strings(results) if errors: try: - api.delete_orphaned_indices() + api.delete_orphaned_indexes( + list(backing_indices.keys()), delete_reindexing_tags=True + ) except RequestError as ex: raise RetryError(str(ex)) from ex msg = f"Errors occurred during recreate_index: {errors}" diff --git a/learning_resources_search/tasks_test.py b/learning_resources_search/tasks_test.py index 701b650a13..a65ac7d0e9 100644 --- a/learning_resources_search/tasks_test.py +++ b/learning_resources_search/tasks_test.py @@ -165,15 +165,27 @@ def test_start_recreate_index(mocker, mocked_celery, user, indexes): autospec=True, return_value=backing_index, ) + mocker.patch( + "learning_resources_search.indexing_api.get_existing_reindexing_indexes", + autospec=True, + return_value=[], + ) + delete_orphaned_indexes_mock = mocker.patch( + "learning_resources_search.indexing_api.delete_orphaned_indexes", autospec=True + ) finish_recreate_index_mock = mocker.patch( "learning_resources_search.tasks.finish_recreate_index", autospec=True ) with pytest.raises(mocked_celery.replace_exception_class): - start_recreate_index.delay(indexes) + start_recreate_index.delay(indexes, remove_existing_reindexing_tags=False) finish_recreate_index_dict = {} + delete_orphaned_indexes_mock.assert_called_once_with( + indexes, delete_reindexing_tags=False + ) + for doctype in LEARNING_RESOURCE_TYPES: if doctype in indexes: finish_recreate_index_dict[doctype] = backing_index @@ -240,6 +252,87 @@ def test_start_recreate_index(mocker, mocked_celery, user, indexes): assert mocked_celery.replace.call_args[0][1] == mocked_celery.chain.return_value +@pytest.mark.parametrize( + "remove_existing_reindexing_tags", + [True, False], +) +def test_start_recreate_index_existing_reindexing_index( + mocker, mocked_celery, user, remove_existing_reindexing_tags +): + settings.OPENSEARCH_INDEXING_CHUNK_SIZE = 2 + settings.OPENSEARCH_DOCUMENT_INDEXING_CHUNK_SIZE = 2 + indexes = ["program"] + + programs = sorted( + ProgramFactory.create_batch(4), + key=lambda program: program.learning_resource_id, + ) + + index_learning_resources_mock = mocker.patch( + "learning_resources_search.tasks.index_learning_resources", autospec=True + ) + + backing_index = "backing" + mocker.patch( + "learning_resources_search.indexing_api.create_backing_index", + autospec=True, + return_value=backing_index, + ) + delete_orphaned_indexes_mock = mocker.patch( + "learning_resources_search.indexing_api.delete_orphaned_indexes", autospec=True + ) + finish_recreate_index_mock = mocker.patch( + "learning_resources_search.tasks.finish_recreate_index", autospec=True + ) + + mocker.patch( + "learning_resources_search.indexing_api.get_existing_reindexing_indexes", + autospec=True, + return_value=["another_reindexing_index"], + ) + + if remove_existing_reindexing_tags: + with pytest.raises(mocked_celery.replace_exception_class): + start_recreate_index.delay( + indexes, remove_existing_reindexing_tags=remove_existing_reindexing_tags + ) + else: + start_recreate_index.delay( + indexes, remove_existing_reindexing_tags=remove_existing_reindexing_tags + ) + + finish_recreate_index_dict = {"program": "backing"} + + if remove_existing_reindexing_tags: + delete_orphaned_indexes_mock.assert_called_once_with( + indexes, delete_reindexing_tags=True + ) + + finish_recreate_index_mock.s.assert_called_once_with(finish_recreate_index_dict) + assert mocked_celery.group.call_count == 1 + + # Celery's 'group' function takes a generator as an argument. In order to make assertions about the items + # in that generator, 'list' is being called to force iteration through all of those items. + list(mocked_celery.group.call_args[0][0]) + assert index_learning_resources_mock.si.call_count == 2 + index_learning_resources_mock.si.assert_any_call( + [programs[0].learning_resource_id, programs[1].learning_resource_id], + PROGRAM_TYPE, + index_types=IndexestoUpdate.reindexing_index.value, + ) + index_learning_resources_mock.si.assert_any_call( + [programs[2].learning_resource_id, programs[3].learning_resource_id], + PROGRAM_TYPE, + index_types=IndexestoUpdate.reindexing_index.value, + ) + + assert mocked_celery.replace.call_count == 1 + assert mocked_celery.replace.call_args[0][1] == mocked_celery.chain.return_value + else: + assert index_learning_resources_mock.si.call_count == 0 + assert mocked_celery.replace.call_count == 0 + + @pytest.mark.parametrize("with_error", [True, False]) def test_finish_recreate_index(mocker, with_error): """ @@ -251,7 +344,7 @@ def test_finish_recreate_index(mocker, with_error): "learning_resources_search.indexing_api.switch_indices", autospec=True ) mock_delete_orphans = mocker.patch( - "learning_resources_search.indexing_api.delete_orphaned_indices" + "learning_resources_search.indexing_api.delete_orphaned_indexes" ) if with_error: @@ -280,7 +373,7 @@ def test_finish_recreate_index_retry_exceptions(mocker, with_error): side_effect=[mock_error, None], ) mock_delete_orphans = mocker.patch( - "learning_resources_search.indexing_api.delete_orphaned_indices", + "learning_resources_search.indexing_api.delete_orphaned_indexes", side_effect=[mock_error, None], )