Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions learning_resources_search/indexing_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,20 +514,46 @@ def switch_indices(backing_index, object_type):
)


def delete_orphaned_indexes(obj_types, delete_reindexing_tags):
    """
    Delete any indices without aliases, optionally stripping reindexing
    aliases first.

    Args:
        obj_types (list of str): object types (e.g. "program"); an index is
            only deleted if its name contains one of these types
        delete_reindexing_tags (bool): if True, remove any alias containing
            "reindexing" before deciding whether an index is orphaned, so
            indexes left over from an aborted reindex are cleaned up too
    """
    conn = get_conn()
    indices = conn.indices.get_alias(index="*")
    for index in indices:
        aliases = indices[index]["aliases"]
        # Track remaining aliases separately so removals below don't
        # mutate the dict we are iterating over.
        keys = list(aliases)

        if delete_reindexing_tags:
            for alias in aliases:
                if "reindexing" in alias:
                    log.info("Deleting alias %s for index %s", alias, index)
                    conn.indices.delete_alias(name=alias, index=index)
                    keys.remove(alias)

        # An index with no remaining aliases is orphaned; skip system
        # indexes (names starting with ".") and indexes for types we
        # weren't asked to clean up.
        if not keys and not index.startswith("."):
            for object_type in obj_types:
                if object_type in index:
                    log.info("Deleting orphaned index %s", index)
                    conn.indices.delete(index)
                    break


def get_existing_reindexing_indexes(obj_types):
    """
    Return the names of indexes that currently carry a reindexing alias.

    Used to detect a reindex already in progress before starting another.

    Args:
        obj_types (list of str): object types to check; only indexes whose
            name contains one of these types are reported

    Returns:
        list of str: names of matching indexes that have an alias
            containing "reindexing"
    """
    conn = get_conn()
    reindexing_indexes = []
    indices = conn.indices.get_alias(index="*")
    for index in indices:
        aliases = indices[index]["aliases"]

        for alias in aliases:
            if "reindexing" in alias:
                for object_type in obj_types:
                    if object_type in index:
                        reindexing_indexes.append(index)
                        # Stop after the first matching type so each
                        # alias contributes the index at most once.
                        break

    return reindexing_indexes
46 changes: 31 additions & 15 deletions learning_resources_search/indexing_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
deindex_learning_resources,
deindex_percolators,
deindex_run_content_files,
delete_orphaned_indices,
delete_orphaned_indexes,
get_reindexing_alias_name,
index_content_files,
index_course_content_files,
Expand Down Expand Up @@ -349,7 +349,8 @@ def test_index_items_size_limits(settings, mocker, max_size, chunks, exceeds_siz
assert mock_log.call_count == (10 if exceeds_size else 0)


def test_delete_orphaned_indices(mocker, mocked_es):
@pytest.mark.parametrize("delete_reindexing_tags", [True, False])
def test_delete_orphaned_indexes(mocker, mocked_es, delete_reindexing_tags):
"""
Delete any indices without aliases and any reindexing aliases
"""
Expand All @@ -369,6 +370,12 @@ def test_delete_orphaned_indices(mocker, mocked_es):
"some_other_alias": {},
}
},
"discussions_local_podcast_1e61d15ff35b8c9b4f6884bba05484cb": {
"aliases": {
"discussions_local_program_reindexing": {},
"some_other_alias": {},
}
},
"discussions_local_program_5484cbb8c9b1e61d15ff354f6884bba0": {"aliases": {}},
"discussions_local_course_15ff354d6884bba05484cbb8c9b1e61d": {
"aliases": {
Expand All @@ -380,23 +387,32 @@ def test_delete_orphaned_indices(mocker, mocked_es):
mocked_es.conn.indices = mocker.Mock(
delete_alias=mocker.Mock(), get_alias=mocker.Mock(return_value=mock_aliases)
)
delete_orphaned_indices()
delete_orphaned_indexes(["program"], delete_reindexing_tags=delete_reindexing_tags)
mocked_es.conn.indices.get_alias.assert_called_once_with(index="*")
mocked_es.conn.indices.delete_alias.assert_any_call(
name="discussions_local_program_reindexing",
index="discussions_local_program_b8c9b1e61d15ff354f6884bba05484cb",
)
mocked_es.conn.indices.delete_alias.assert_any_call(
name="discussions_local_program_reindexing",
index="discussions_local_program_1e61d15ff35b8c9b4f6884bba05484cb",
)

if delete_reindexing_tags:
mocked_es.conn.indices.delete_alias.assert_any_call(
name="discussions_local_program_reindexing",
index="discussions_local_program_b8c9b1e61d15ff354f6884bba05484cb",
)
mocked_es.conn.indices.delete_alias.assert_any_call(
name="discussions_local_program_reindexing",
index="discussions_local_program_1e61d15ff35b8c9b4f6884bba05484cb",
)
else:
mocked_es.conn.indices.delete_alias.assert_not_called()

mocked_es.conn.indices.delete.assert_any_call(
"discussions_local_program_5484cbb8c9b1e61d15ff354f6884bba0"
)
mocked_es.conn.indices.delete.assert_any_call(
"discussions_local_program_b8c9b1e61d15ff354f6884bba05484cb"
)
assert mocked_es.conn.indices.delete.call_count == 2

if delete_reindexing_tags:
mocked_es.conn.indices.delete.assert_any_call(
"discussions_local_program_b8c9b1e61d15ff354f6884bba05484cb"
)
assert mocked_es.conn.indices.delete.call_count == 2
else:
assert mocked_es.conn.indices.delete.call_count == 1


def test_bulk_content_file_deindex_on_course_deletion(mocker):
Expand Down
36 changes: 27 additions & 9 deletions learning_resources_search/management/commands/recreate_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.core.management.base import BaseCommand, CommandError

from learning_resources_search.constants import ALL_INDEX_TYPES
from learning_resources_search.indexing_api import get_existing_reindexing_indexes
from learning_resources_search.tasks import start_recreate_index
from main.utils import now_in_utc

Expand All @@ -13,6 +14,13 @@ class Command(BaseCommand):
help = "Recreate opensearch index"

def add_arguments(self, parser):
parser.add_argument(
"--remove_existing_reindexing_tags",
dest="remove_existing_reindexing_tags",
action="store_true",
help="Overwrite any existing reindexing tags and remove those indexes",
)

parser.add_argument(
"--all", dest="all", action="store_true", help="Recreate all indexes"
)
Expand All @@ -28,11 +36,9 @@ def add_arguments(self, parser):

def handle(self, *args, **options): # noqa: ARG002
"""Index all LEARNING_RESOURCE_TYPES"""
remove_existing_reindexing_tags = options["remove_existing_reindexing_tags"]
if options["all"]:
task = start_recreate_index.delay(list(ALL_INDEX_TYPES))
self.stdout.write(
f"Started celery task {task} to index content for all indexes"
)
indexes_to_update = list(ALL_INDEX_TYPES)
else:
indexes_to_update = list(
filter(lambda object_type: options[object_type], ALL_INDEX_TYPES)
Expand All @@ -44,12 +50,24 @@ def handle(self, *args, **options): # noqa: ARG002
for object_type in sorted(ALL_INDEX_TYPES):
self.stdout.write(f" --{object_type}s")
return

task = start_recreate_index.delay(indexes_to_update)
self.stdout.write(
f"Started celery task {task} to index content for the following"
f" indexes: {indexes_to_update}"
if not remove_existing_reindexing_tags:
existing_reindexing_indexes = get_existing_reindexing_indexes(
indexes_to_update
)
if existing_reindexing_indexes:
self.stdout.write(
f"Reindexing in progress. Reindexing indexes already exist:"
f" {', '.join(existing_reindexing_indexes)}"
)
return

task = start_recreate_index.delay(
indexes_to_update, remove_existing_reindexing_tags
)
self.stdout.write(
f"Started celery task {task} to index content for the following"
f" indexes: {indexes_to_update}"
)

self.stdout.write("Waiting on task...")
start = now_in_utc()
Expand Down
25 changes: 22 additions & 3 deletions learning_resources_search/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,26 @@ def wrap_retry_exception(*exception_classes):


@app.task(bind=True)
def start_recreate_index(self, indexes):
def start_recreate_index(self, indexes, remove_existing_reindexing_tags):
"""
Wipe and recreate index and mapping, and index all items.
"""
try:
if not remove_existing_reindexing_tags:
existing_reindexing_indexes = api.get_existing_reindexing_indexes(indexes)
Copy link
Contributor Author

@abeglova abeglova Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to check both here and in the management command in case start_recreate_index is stuck behind other jobs and another indexing job is started while this task is in the queue. We could just have the check here but it's nicer for the user to know about the existing reindexing indexes right away if it already exists when the management command runs


if existing_reindexing_indexes:
error = (
f"Reindexing in progress. Reindexing indexes already exist: "
f"{', '.join(existing_reindexing_indexes)}"
)
log.exception(error)
return error

api.delete_orphaned_indexes(
indexes, delete_reindexing_tags=remove_existing_reindexing_tags
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are reindex orphans to delete, remove_existing_reindexing_tags has to be True to get to this point. If there aren't any, it seems like delete_reindexing_tags could still be True and it would just step through that conditional block in delete_orphaned_indexes without doing anything? So maybe this argument is not strictly necessary. But it works fine as is anyway, and does avoid running the code in that conditional block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the conditional is not strictly necessary.

)

new_backing_indices = {
obj_type: api.create_backing_index(obj_type) for obj_type in indexes
}
Expand Down Expand Up @@ -768,7 +783,9 @@ def get_update_learning_resource_tasks(resource_type):
]


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
@app.task(
acks_late=True, autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added acks_late=True here just in case, although I don't think this job failing is how orphaned indexes are generally created

)
def finish_recreate_index(results, backing_indices):
"""
Swap reindex backing index with default backing index
Expand All @@ -780,7 +797,9 @@ def finish_recreate_index(results, backing_indices):
errors = merge_strings(results)
if errors:
try:
api.delete_orphaned_indices()
api.delete_orphaned_indexes(
list(backing_indices.keys()), delete_reindexing_tags=True
)
except RequestError as ex:
raise RetryError(str(ex)) from ex
msg = f"Errors occurred during recreate_index: {errors}"
Expand Down
99 changes: 96 additions & 3 deletions learning_resources_search/tasks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,27 @@ def test_start_recreate_index(mocker, mocked_celery, user, indexes):
autospec=True,
return_value=backing_index,
)
mocker.patch(
"learning_resources_search.indexing_api.get_existing_reindexing_indexes",
autospec=True,
return_value=[],
)
delete_orphaned_indexes_mock = mocker.patch(
"learning_resources_search.indexing_api.delete_orphaned_indexes", autospec=True
)
finish_recreate_index_mock = mocker.patch(
"learning_resources_search.tasks.finish_recreate_index", autospec=True
)

with pytest.raises(mocked_celery.replace_exception_class):
start_recreate_index.delay(indexes)
start_recreate_index.delay(indexes, remove_existing_reindexing_tags=False)

finish_recreate_index_dict = {}

delete_orphaned_indexes_mock.assert_called_once_with(
indexes, delete_reindexing_tags=False
)

for doctype in LEARNING_RESOURCE_TYPES:
if doctype in indexes:
finish_recreate_index_dict[doctype] = backing_index
Expand Down Expand Up @@ -240,6 +252,87 @@ def test_start_recreate_index(mocker, mocked_celery, user, indexes):
assert mocked_celery.replace.call_args[0][1] == mocked_celery.chain.return_value


@pytest.mark.parametrize(
    "remove_existing_reindexing_tags",
    [True, False],
)
def test_start_recreate_index_existing_reindexing_index(
    mocker, mocked_celery, user, remove_existing_reindexing_tags
):
    """
    start_recreate_index should abort when a reindexing index already exists,
    unless remove_existing_reindexing_tags is True, in which case it deletes
    the orphaned indexes and proceeds with the reindex.
    """
    # NOTE(review): `settings` is not a parameter of this test — presumably a
    # module-level/global settings object is in scope; verify it is the
    # pytest-django fixture or django.conf.settings as intended.
    # NOTE(review): the `user` fixture appears unused here — confirm whether
    # it is needed for a side effect (e.g. percolator setup) or can be dropped.
    settings.OPENSEARCH_INDEXING_CHUNK_SIZE = 2
    settings.OPENSEARCH_DOCUMENT_INDEXING_CHUNK_SIZE = 2
    indexes = ["program"]

    # Four programs + chunk size 2 => two expected index_learning_resources
    # chunks when the reindex proceeds.
    programs = sorted(
        ProgramFactory.create_batch(4),
        key=lambda program: program.learning_resource_id,
    )

    index_learning_resources_mock = mocker.patch(
        "learning_resources_search.tasks.index_learning_resources", autospec=True
    )

    backing_index = "backing"
    mocker.patch(
        "learning_resources_search.indexing_api.create_backing_index",
        autospec=True,
        return_value=backing_index,
    )
    delete_orphaned_indexes_mock = mocker.patch(
        "learning_resources_search.indexing_api.delete_orphaned_indexes", autospec=True
    )
    finish_recreate_index_mock = mocker.patch(
        "learning_resources_search.tasks.finish_recreate_index", autospec=True
    )

    # Simulate a reindex already in progress: an existing index with a
    # reindexing alias is reported for the requested types.
    mocker.patch(
        "learning_resources_search.indexing_api.get_existing_reindexing_indexes",
        autospec=True,
        return_value=["another_reindexing_index"],
    )

    if remove_existing_reindexing_tags:
        # Proceeding path: the task replaces itself with a celery chain,
        # which the mocked celery surfaces as replace_exception_class.
        with pytest.raises(mocked_celery.replace_exception_class):
            start_recreate_index.delay(
                indexes, remove_existing_reindexing_tags=remove_existing_reindexing_tags
            )
    else:
        # Abort path: the task returns early without raising.
        start_recreate_index.delay(
            indexes, remove_existing_reindexing_tags=remove_existing_reindexing_tags
        )

    finish_recreate_index_dict = {"program": "backing"}

    if remove_existing_reindexing_tags:
        # Orphaned indexes must be cleared before the new reindex starts.
        delete_orphaned_indexes_mock.assert_called_once_with(
            indexes, delete_reindexing_tags=True
        )

        finish_recreate_index_mock.s.assert_called_once_with(finish_recreate_index_dict)
        assert mocked_celery.group.call_count == 1

        # Celery's 'group' function takes a generator as an argument. In order to make assertions about the items
        # in that generator, 'list' is being called to force iteration through all of those items.
        list(mocked_celery.group.call_args[0][0])
        assert index_learning_resources_mock.si.call_count == 2
        index_learning_resources_mock.si.assert_any_call(
            [programs[0].learning_resource_id, programs[1].learning_resource_id],
            PROGRAM_TYPE,
            index_types=IndexestoUpdate.reindexing_index.value,
        )
        index_learning_resources_mock.si.assert_any_call(
            [programs[2].learning_resource_id, programs[3].learning_resource_id],
            PROGRAM_TYPE,
            index_types=IndexestoUpdate.reindexing_index.value,
        )

        assert mocked_celery.replace.call_count == 1
        assert mocked_celery.replace.call_args[0][1] == mocked_celery.chain.return_value
    else:
        # Aborted: no indexing chunks were scheduled and the task was not
        # replaced with a chain.
        assert index_learning_resources_mock.si.call_count == 0
        assert mocked_celery.replace.call_count == 0


@pytest.mark.parametrize("with_error", [True, False])
def test_finish_recreate_index(mocker, with_error):
"""
Expand All @@ -251,7 +344,7 @@ def test_finish_recreate_index(mocker, with_error):
"learning_resources_search.indexing_api.switch_indices", autospec=True
)
mock_delete_orphans = mocker.patch(
"learning_resources_search.indexing_api.delete_orphaned_indices"
"learning_resources_search.indexing_api.delete_orphaned_indexes"
)

if with_error:
Expand Down Expand Up @@ -280,7 +373,7 @@ def test_finish_recreate_index_retry_exceptions(mocker, with_error):
side_effect=[mock_error, None],
)
mock_delete_orphans = mocker.patch(
"learning_resources_search.indexing_api.delete_orphaned_indices",
"learning_resources_search.indexing_api.delete_orphaned_indexes",
side_effect=[mock_error, None],
)

Expand Down