Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions learning_resources/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def get_content_tasks(
@app.task(bind=True)
def import_all_mit_edx_files(self, chunk_size=None):
"""Ingest MIT edX files from an S3 bucket"""
raise self.replace(
return self.replace(
get_content_tasks(
ETLSource.mit_edx.name,
chunk_size=chunk_size,
Expand All @@ -182,7 +182,7 @@ def import_all_mit_edx_files(self, chunk_size=None):
@app.task(bind=True)
def import_all_oll_files(self, chunk_size=None):
"""Ingest MIT edX files from an S3 bucket"""
raise self.replace(
return self.replace(
get_content_tasks(
ETLSource.oll.name,
chunk_size=chunk_size,
Expand All @@ -195,7 +195,7 @@ def import_all_oll_files(self, chunk_size=None):
@app.task(bind=True)
def import_all_mitxonline_files(self, chunk_size=None):
"""Ingest MITx Online files from an S3 bucket"""
raise self.replace(
return self.replace(
get_content_tasks(
ETLSource.mitxonline.name,
chunk_size=chunk_size,
Expand All @@ -207,7 +207,7 @@ def import_all_mitxonline_files(self, chunk_size=None):
def import_all_xpro_files(self, chunk_size=None):
"""Ingest xPRO OLX files from an S3 bucket"""

raise self.replace(
return self.replace(
get_content_tasks(
ETLSource.xpro.name,
chunk_size=chunk_size,
Expand Down Expand Up @@ -274,7 +274,7 @@ def get_ocw_data( # noqa: PLR0913
and settings.OCW_LIVE_BUCKET
):
log.warning("Required settings missing for get_ocw_data")
return
return None

# get all the courses prefixes we care about
raw_data_bucket = boto3.resource(
Expand All @@ -300,7 +300,7 @@ def get_ocw_data( # noqa: PLR0913

if len(ocw_courses) == 0:
log.info("No courses matching url substring")
return
return None

log.info("Backpopulating %d OCW courses...", len(ocw_courses))

Expand All @@ -317,7 +317,7 @@ def get_ocw_data( # noqa: PLR0913
)
]
)
raise self.replace(ocw_tasks)
return self.replace(ocw_tasks)


@app.task
Expand Down
12 changes: 8 additions & 4 deletions learning_resources_search/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def send_subscription_emails(self, subscription_type, period="daily"):
)
]
)
raise self.replace(email_tasks)
return self.replace(email_tasks)


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
Expand Down Expand Up @@ -625,7 +625,7 @@ def start_recreate_index(self, indexes, remove_existing_reindexing_tags):

# Use self.replace so that code waiting on this task will also wait on the indexing
# and finish tasks
raise self.replace(
return self.replace(
celery.chain(index_tasks, finish_recreate_index.s(new_backing_indices))
)

Expand Down Expand Up @@ -678,7 +678,7 @@ def start_update_index(self, indexes, etl_source):
error = "start_update_index threw an error"
log.exception(error)
return [error]
raise self.replace(celery.chain(index_tasks, finish_update_index.s()))
return self.replace(celery.chain(index_tasks, finish_update_index.s()))


def get_update_resource_files_tasks(blocklisted_ids, etl_source):
Expand Down Expand Up @@ -841,7 +841,11 @@ def get_update_learning_resource_tasks(resource_type):


@app.task(
acks_late=True, autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m"
acks_late=True,
reject_on_worker_lost=True,
autoretry_for=(RetryError,),
retry_backoff=True,
rate_limit="600/m",
)
def finish_recreate_index(results, backing_indices):
"""
Expand Down