Concurrent Crawl Limit (#874)
concurrent crawl limits (addresses #866):
- support limits on concurrent crawls that can be run within a single org
- change 'waiting' state to 'waiting_org_limit' for the concurrent crawl limit and 'waiting_capacity' for capacity-based limits

orgs:
- add 'maxConcurrentCrawl' to new 'quotas' object on orgs
- add /quotas endpoint for updating the quotas object (example request below)
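
  For illustration, updating an org's quota might look like the request below; the route prefix, HTTP method, and auth header are assumptions, while the 'maxConcurrentCrawl' field name is taken from this commit message:

      import requests

      # set the org's concurrent crawl limit to 2 (illustrative values)
      resp = requests.post(
          "https://<deployment>/api/orgs/<org-id>/quotas",  # assumed route
          headers={"Authorization": "Bearer <access-token>"},
          json={"maxConcurrentCrawl": 2},
      )
      resp.raise_for_status()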

operator:
- add all crawljobs as related objects; these appear to be returned in creation order
- operator: if a concurrent crawl limit is set, ensure the current job is within the first N crawljobs (as provided via the 'related' list of crawljob objects) before it can proceed to 'starting'; otherwise, set it to 'waiting_org_limit' (see the sketch after this list)
- api: add org /quotas endpoint for configuring quotas
- remove 'new' state, always start with 'starting'
- crawljob: add 'oid' to crawljob spec and label for easier querying
- more stringent state transitions: add allowed_from to set_state()
- ensure state transitions only happen from allowed states, while failed/canceled can happen from any state
- ensure 'finished' and 'state' are synced from the db if a transition is not allowed
- add crawl indices by oid and cid
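
A minimal sketch of the first-N check described above, assuming the related crawljobs arrive in creation order (function and field names here are illustrative, not the actual operator code):

    def can_start(job_name: str, related_crawljobs: list[dict], max_crawls: int) -> bool:
        """Allow 'starting' only if this job is among the first max_crawls related jobs."""
        if not max_crawls:
            # no org limit configured
            return True
        first_n = [cj["metadata"]["name"] for cj in related_crawljobs[:max_crawls]]
        return job_name in first_n

A job that falls outside the first N stays in 'waiting_org_limit' and is rechecked on the next reconcile.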

frontend: 
- show different waiting states on frontend: 'Waiting (Crawl Limit)' and 'Waiting (At Capacity)'
- add gear icon on orgs admin page
- add initial popup for setting org quotas, showing all properties from the org 'quotas' object

tests:
- add concurrent crawl limit nightly tests (rough shape sketched below)
- fix state waiting -> waiting_capacity
- ci: add logging of operator output on test failure
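
The rough shape of such a test, as a sketch only (routes and helper functions are assumptions, not the actual nightly test code):

    import requests

    def test_concurrent_crawl_limit(api_prefix, org_id, admin_auth_headers):
        # limit the org to a single concurrent crawl (assumed route)
        r = requests.post(
            f"{api_prefix}/orgs/{org_id}/quotas",
            headers=admin_auth_headers,
            json={"maxConcurrentCrawl": 1},
        )
        assert r.status_code == 200

        # start two crawls; the second should queue behind the org limit
        first = start_crawl(org_id)   # assumed helper
        second = start_crawl(org_id)  # assumed helper

        assert get_state(org_id, second) == "waiting_org_limit"
        assert get_state(org_id, first) in ("starting", "running")
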
ikreymer committed May 30, 2023
1 parent ab518f5 commit 00fb8ac
Showing 19 changed files with 484 additions and 76 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/k3d-ci.yaml
@@ -84,6 +84,10 @@ jobs:
       - name: Run Tests
         run: pytest -vv ./backend/test/*.py
 
-      - name: Print Backend Logs
+      - name: Print Backend Logs (API)
         if: ${{ failure() }}
-        run: kubectl logs svc/browsertrix-cloud-backend
+        run: kubectl logs svc/browsertrix-cloud-backend -c api
+
+      - name: Print Backend Logs (Operator)
+        if: ${{ failure() }}
+        run: kubectl logs svc/browsertrix-cloud-backend -c op
11 changes: 7 additions & 4 deletions backend/btrixcloud/crawlmanager.py
@@ -125,7 +125,12 @@ async def create_crawl_job(self, crawlconfig, userid: str):
         cid = str(crawlconfig.id)
 
         return await self.new_crawl_job(
-            cid, userid, crawlconfig.scale, crawlconfig.crawlTimeout, manual=True
+            cid,
+            userid,
+            crawlconfig.oid,
+            crawlconfig.scale,
+            crawlconfig.crawlTimeout,
+            manual=True,
         )
 
     async def update_crawl_config(self, crawlconfig, update, profile_filename=None):
@@ -275,9 +280,7 @@ async def shutdown_crawl(self, crawl_id, oid, graceful=True):
             patch = {"stopping": True}
             return await self._patch_job(crawl_id, patch)
 
-        await self.delete_crawl_job(crawl_id)
-
-        return {"success": True}
+        return await self.delete_crawl_job(crawl_id)
 
     async def delete_crawl_configs_for_org(self, org):
         """Delete all crawl configs for given org"""
61 changes: 51 additions & 10 deletions backend/btrixcloud/crawls.py
@@ -35,13 +35,17 @@
 
 RUNNING_STATES = ("running", "pending-wait", "generate-wacz", "uploading-wacz")
 
-RUNNING_AND_STARTING_STATES = ("starting", "waiting", *RUNNING_STATES)
+STARTING_STATES = ("starting", "waiting_capacity", "waiting_org_limit")
 
 FAILED_STATES = ("canceled", "failed")
 
 SUCCESSFUL_STATES = ("complete", "partial_complete", "timed_out")
 
-ALL_CRAWL_STATES = (*RUNNING_AND_STARTING_STATES, *FAILED_STATES, *SUCCESSFUL_STATES)
+RUNNING_AND_STARTING_STATES = (*STARTING_STATES, *RUNNING_STATES)
+
+NON_RUNNING_STATES = (*FAILED_STATES, *SUCCESSFUL_STATES)
+
+ALL_CRAWL_STATES = (*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES)
 
 
 # ============================================================================
@@ -218,6 +222,9 @@ def __init__(self, mdb, users, crawl_manager, crawl_configs, orgs):
     async def init_index(self):
         """init index for crawls db collection"""
         await self.crawls.create_index([("finished", pymongo.DESCENDING)])
+        await self.crawls.create_index([("oid", pymongo.HASHED)])
+        await self.crawls.create_index([("cid", pymongo.HASHED)])
+        await self.crawls.create_index([("state", pymongo.HASHED)])
 
     async def list_crawls(
         self,
@@ -591,6 +598,22 @@ async def update_crawl_scale(
 
         return True
 
+    async def update_crawl_state(self, crawl_id: str, state: str):
+        """called only when job container is being stopped/canceled"""
+
+        data = {"state": state}
+        # if cancelation, set the finish time here
+        if state == "canceled":
+            data["finished"] = dt_now()
+
+        await self.crawls.find_one_and_update(
+            {
+                "_id": crawl_id,
+                "state": {"$in": RUNNING_AND_STARTING_STATES},
+            },
+            {"$set": data},
+        )
+
     async def shutdown_crawl(self, crawl_id: str, org: Organization, graceful: bool):
         """stop or cancel specified crawl"""
         result = None
@@ -614,6 +637,15 @@ async def shutdown_crawl(self, crawl_id: str, org: Organization, graceful: bool)
                 status_code=404, detail=f"crawl_not_found, (details: {exc})"
             )
 
+        # if job no longer running, canceling is considered success,
+        # but graceful stoppage is not possible, so would be a failure
+        if result.get("error") == "Not Found":
+            if not graceful:
+                await self.update_crawl_state(crawl_id, "canceled")
+                crawl = await self.get_crawl_raw(crawl_id, org)
+                await self.crawl_configs.stats_recompute_remove_crawl(crawl["cid"], 0)
+                return {"success": True}
+
         # return whatever detail may be included in the response
         raise HTTPException(status_code=400, detail=result)
 
@@ -880,16 +912,25 @@ async def add_new_crawl(
 
 
 # ============================================================================
-async def update_crawl_state_if_changed(crawls, crawl_id, state, **kwargs):
+async def update_crawl_state_if_allowed(
+    crawls, crawl_id, state, allowed_from, **kwargs
+):
     """update crawl state and other properties in db if state has changed"""
     kwargs["state"] = state
-    res = await crawls.find_one_and_update(
-        {"_id": crawl_id, "state": {"$ne": state}},
-        {"$set": kwargs},
-        return_document=pymongo.ReturnDocument.AFTER,
-    )
-    print("** UPDATE", crawl_id, state, res is not None)
-    return res
+    query = {"_id": crawl_id}
+    if allowed_from:
+        query["state"] = {"$in": allowed_from}
+
+    return await crawls.find_one_and_update(query, {"$set": kwargs})
+
+
+# ============================================================================
+async def get_crawl_state(crawls, crawl_id):
+    """return current crawl state of a crawl"""
+    res = await crawls.find_one({"_id": crawl_id}, projection=["state", "finished"])
+    if not res:
+        return None, None
+    return res.get("state"), res.get("finished")
 
 
 # ============================================================================
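
To illustrate how the stricter helper above is meant to be called (a sketch with assumed call-site variables; the real callers live in the operator code), if the transition is rejected the caller re-reads state and finished from the db, matching the commit note about re-syncing:

    # only a running crawl may be marked complete; otherwise leave the db untouched
    result = await update_crawl_state_if_allowed(
        crawls,
        crawl_id,
        state="complete",
        allowed_from=RUNNING_STATES,
        finished=dt_now(),
    )
    if not result:
        # transition not allowed: re-sync state/finished from the db
        state, finished = await get_crawl_state(crawls, crawl_id)
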
19 changes: 9 additions & 10 deletions backend/btrixcloud/k8sapi.py
@@ -11,7 +11,7 @@
 from kubernetes_asyncio.client.api_client import ApiClient
 from kubernetes_asyncio.client.api import custom_objects_api
 from kubernetes_asyncio.utils import create_from_dict
-
+from kubernetes_asyncio.client.exceptions import ApiException
 
 from fastapi.templating import Jinja2Templates
 from .utils import get_templates_dir, dt_now, to_k8s_date
@@ -63,7 +63,9 @@ def get_redis_url(self, crawl_id):
         return redis_url
 
     # pylint: disable=too-many-arguments
-    async def new_crawl_job(self, cid, userid, scale=1, crawl_timeout=0, manual=True):
+    async def new_crawl_job(
+        self, cid, userid, oid, scale=1, crawl_timeout=0, manual=True
+    ):
         """load job template from yaml"""
         if crawl_timeout:
             crawl_expire_time = to_k8s_date(dt_now() + timedelta(seconds=crawl_timeout))
@@ -77,6 +79,7 @@ async def new_crawl_job(self, cid, userid, scale=1, crawl_timeout=0, manual=True
         params = {
             "id": crawl_id,
             "cid": cid,
+            "oid": oid,
             "userid": userid,
             "scale": scale,
             "expire_time": crawl_expire_time,
@@ -135,12 +138,10 @@ async def delete_crawl_job(self, crawl_id):
                 grace_period_seconds=0,
                 propagation_policy="Foreground",
             )
-            return True
+            return {"success": True}
 
-        # pylint: disable=broad-except
-        except Exception as exc:
-            print("CrawlJob delete failed", exc)
-            return False
+        except ApiException as api_exc:
+            return {"error": str(api_exc.reason)}
 
     async def delete_profile_browser(self, browserid):
         """delete custom crawljob object"""
@@ -156,9 +157,7 @@ async def delete_profile_browser(self, browserid):
             )
             return True
 
-        # pylint: disable=broad-except
-        except Exception as exc:
-            print("ProfileJob delete failed", exc)
+        except ApiException:
             return False
 
     async def get_profile_browser(self, browserid):
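
The delete helper now reports its outcome as a dict instead of a bare bool, so callers can distinguish a missing job from a successful delete. A caller might branch on it like this (illustrative sketch, not a verbatim call site):

    result = await crawl_manager.delete_crawl_job(crawl_id)
    if result.get("error") == "Not Found":
        # job already gone: a cancel is treated as success (see the crawls.py hunk above)
        ...
    elif result.get("success"):
        # deletion was issued normally
        ...
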
3 changes: 2 additions & 1 deletion backend/btrixcloud/main_scheduled_job.py
@@ -37,12 +37,13 @@ async def run(self):
         userid = data["USER_ID"]
         scale = int(data.get("INITIAL_SCALE", 0))
         crawl_timeout = int(data.get("CRAWL_TIMEOUT", 0))
+        oid = data["ORG_ID"]
 
         crawlconfig = await get_crawl_config(self.crawlconfigs, uuid.UUID(self.cid))
 
         # k8s create
         crawl_id = await self.new_crawl_job(
-            self.cid, userid, scale, crawl_timeout, manual=False
+            self.cid, userid, oid, scale, crawl_timeout, manual=False
         )
 
         # db create
(Diffs for the remaining changed files are not shown.)
