Merged
55 changes: 53 additions & 2 deletions backend/btrixcloud/background_jobs.py
@@ -21,6 +21,7 @@
     PaginatedResponse,
     AnyJob,
     StorageRef,
+    User,
 )
 from .pagination import DEFAULT_PAGE_SIZE, paginated_format

@@ -413,7 +414,7 @@ async def get_replica_job_file(
     async def retry_background_job(
         self, job_id: str, org: Organization
     ) -> Dict[str, Union[bool, Optional[str]]]:
-        """Retry background job and return new job id"""
+        """Retry background job"""
         job = await self.get_background_job(job_id, org.id)
         if not job:
             raise HTTPException(status_code=404, detail="job_not_found")
@@ -455,11 +456,42 @@ async def retry_background_job(

         return {"success": True}

+    async def retry_failed_background_jobs(
+        self, org: Organization
+    ) -> Dict[str, Union[bool, Optional[str]]]:
+        """Retry all failed background jobs in an org
+
+        Keep track of tasks in set to prevent them from being garbage collected
+        See: https://stackoverflow.com/a/74059981
+        """
+        bg_tasks = set()
+        async for job in self.jobs.find({"oid": org.id, "success": False}):
+            task = asyncio.create_task(self.retry_background_job(job["_id"], org))
+            bg_tasks.add(task)
+            task.add_done_callback(bg_tasks.discard)
+        return {"success": True}
+
+    async def retry_all_failed_background_jobs(
+        self,
+    ) -> Dict[str, Union[bool, Optional[str]]]:
+        """Retry all failed background jobs from all orgs
+
+        Keep track of tasks in set to prevent them from being garbage collected
+        See: https://stackoverflow.com/a/74059981
+        """
+        bg_tasks = set()
+        async for job in self.jobs.find({"success": False}):
+            org = await self.org_ops.get_org_by_id(job["oid"])
+            task = asyncio.create_task(self.retry_background_job(job["_id"], org))
+            bg_tasks.add(task)
+            task.add_done_callback(bg_tasks.discard)
+        return {"success": True}


 # ============================================================================
 # pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
 def init_background_jobs_api(
-    mdb, email, user_manager, org_ops, crawl_manager, storage_ops
+    app, mdb, email, user_manager, org_ops, crawl_manager, storage_ops, user_dep
 ):
     """init background jobs system"""
     # pylint: disable=invalid-name
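The bg_tasks set in the methods above follows the standard asyncio pattern for fire-and-forget work: asyncio.create_task only keeps a weak reference to the task it creates, so without a strong reference held elsewhere the task can be garbage-collected before it finishes, which is the pitfall the linked Stack Overflow answer describes. A minimal standalone sketch of the same pattern (do_work and main are illustrative names, not part of this PR):

import asyncio


async def do_work(n: int) -> None:
    # Stand-in for retry_background_job.
    await asyncio.sleep(0.1)
    print(f"job {n} retried")


async def main() -> None:
    bg_tasks = set()
    for n in range(3):
        task = asyncio.create_task(do_work(n))
        # Hold a strong reference so the task isn't garbage-collected mid-flight...
        bg_tasks.add(task)
        # ...and drop the reference once the task completes.
        task.add_done_callback(bg_tasks.discard)
    # In this sketch, wait long enough for the fire-and-forget tasks to finish.
    await asyncio.sleep(0.5)


asyncio.run(main())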
@@ -494,6 +526,25 @@ async def retry_background_job(
         """Retry background job"""
         return await ops.retry_background_job(job_id, org)

+    @app.post(
+        "/orgs/all/jobs/retryFailed",
+    )
+    async def retry_all_failed_background_jobs(user: User = Depends(user_dep)):
+        """Retry failed background jobs from all orgs"""
+        if not user.is_superuser:
+            raise HTTPException(status_code=403, detail="Not Allowed")
+
+        return await ops.retry_all_failed_background_jobs()
+
+    @router.post(
+        "/retryFailed",
+    )
+    async def retry_failed_background_jobs(
+        org: Organization = Depends(org_crawl_dep),
+    ):
+        """Retry failed background jobs"""
+        return await ops.retry_failed_background_jobs(org)
+
     @router.get("", response_model=PaginatedResponse)
     async def list_background_jobs(
         org: Organization = Depends(org_crawl_dep),
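Together, the two new routes expose a per-org retry endpoint and a superuser-only cross-org endpoint. A hedged sketch of calling them, assuming the jobs router is mounted at /orgs/{oid}/jobs (the base URL, org id, and tokens below are placeholders, not values from this PR):

import requests

API_PREFIX = "https://app.example.com/api"  # placeholder deployment URL
org_id = "<org-uuid>"  # placeholder
org_headers = {"Authorization": "Bearer <jwt>"}  # org member with crawler access
superuser_headers = {"Authorization": "Bearer <superuser-jwt>"}

# Per-org: retry all failed background jobs in one org.
r = requests.post(f"{API_PREFIX}/orgs/{org_id}/jobs/retryFailed", headers=org_headers)
print(r.json())  # expected: {"success": true}

# Cross-org: superuser only; non-superusers get 403 "Not Allowed".
r = requests.post(f"{API_PREFIX}/orgs/all/jobs/retryFailed", headers=superuser_headers)
print(r.json())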
9 changes: 8 additions & 1 deletion backend/btrixcloud/main.py
@@ -90,7 +90,14 @@ def main():
     storage_ops = init_storages_api(org_ops, crawl_manager)

     background_job_ops = init_background_jobs_api(
-        mdb, email, user_manager, org_ops, crawl_manager, storage_ops
+        app,
+        mdb,
+        email,
+        user_manager,
+        org_ops,
+        crawl_manager,
+        storage_ops,
+        current_active_user,
     )

     profiles = init_profiles_api(
29 changes: 29 additions & 0 deletions backend/test_nightly/conftest.py
@@ -10,6 +10,9 @@
 ADMIN_USERNAME = "admin@example.com"
 ADMIN_PW = "PASSW0RD!"

+CRAWLER_USERNAME = "crawlernightly@example.com"
+CRAWLER_PW = "crawlerPASSWORD!"
+

 @pytest.fixture(scope="session")
 def admin_auth_headers():
@@ -44,6 +47,32 @@ def default_org_id(admin_auth_headers):
         time.sleep(5)


+@pytest.fixture(scope="session")
+def crawler_auth_headers(admin_auth_headers, default_org_id):
+    requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/add-user",
+        json={
+            "email": CRAWLER_USERNAME,
+            "password": CRAWLER_PW,
+            "name": "new-crawler",
+            "description": "crawler test crawl",
+            "role": 20,
+        },
+        headers=admin_auth_headers,
+    )
+    r = requests.post(
+        f"{API_PREFIX}/auth/jwt/login",
+        data={
+            "username": CRAWLER_USERNAME,
+            "password": CRAWLER_PW,
+            "grant_type": "password",
+        },
+    )
+    data = r.json()
+    access_token = data.get("access_token")
+    return {"Authorization": f"Bearer {access_token}"}
+
+
 @pytest.fixture(scope="session")
 def crawl_id_wr(admin_auth_headers, default_org_id):
     # Start crawl.
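An aside on the fixture above: neither request checks its response, so a failed add-user or login only surfaces later as an opaque 401 in dependent tests. One way to fail fast, sketched as a hypothetical helper (login_headers is not part of this PR):

import requests


def login_headers(api_prefix: str, username: str, password: str) -> dict:
    """Log in and return auth headers, asserting the login succeeded."""
    r = requests.post(
        f"{api_prefix}/auth/jwt/login",
        data={"username": username, "password": password, "grant_type": "password"},
    )
    # Fail fast here rather than with a confusing 401 in a dependent test.
    assert r.status_code == 200, r.text
    return {"Authorization": f"Bearer {r.json()['access_token']}"}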
8 changes: 8 additions & 0 deletions backend/test_nightly/test_z_background_jobs.py
@@ -98,3 +98,11 @@ def test_get_background_job(admin_auth_headers, default_org_id):
     assert data["object_type"]
     assert data["object_id"]
     assert data["replica_storage"]
+
+
+def test_retry_all_failed_bg_jobs_not_superuser(crawler_auth_headers):
+    r = requests.post(
+        f"{API_PREFIX}/orgs/all/jobs/retryFailed", headers=crawler_auth_headers
+    )
+    assert r.status_code == 403
+    assert r.json()["detail"] == "Not Allowed"
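The PR tests only the 403 path. Success-path counterparts might look like the sketch below; these are hedged, since they assume the nightly admin account is a superuser and that crawler-role members satisfy org_crawl_dep on the per-org route, and neither test is part of this PR:

def test_retry_all_failed_bg_jobs_superuser(admin_auth_headers):
    r = requests.post(
        f"{API_PREFIX}/orgs/all/jobs/retryFailed", headers=admin_auth_headers
    )
    assert r.status_code == 200
    assert r.json()["success"]


def test_retry_failed_bg_jobs_in_org(crawler_auth_headers, default_org_id):
    r = requests.post(
        f"{API_PREFIX}/orgs/{default_org_id}/jobs/retryFailed",
        headers=crawler_auth_headers,
    )
    assert r.status_code == 200
    assert r.json()["success"]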