diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index 9f48ab0056..78d18d6b1c 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -370,21 +370,7 @@ async def get_active_crawls_size(self, oid: UUID) -> int: cursor = self.crawls.aggregate( [ {"$match": {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}}, - {"$group": {"_id": None, "totalSum": {"$sum": "$stats.size"}}}, - ] - ) - results = await cursor.to_list(length=1) - if not results: - return 0 - - return results[0].get("totalSum") or 0 - - async def get_active_crawls_uploaded_wacz_size(self, oid: UUID) -> int: - """get size of all waczs already uploaded for running/paused crawls""" - cursor = self.crawls.aggregate( - [ - {"$match": {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}}, - {"$group": {"_id": None, "totalSum": {"$sum": "$fileSize"}}}, + {"$group": {"_id": None, "totalSum": {"$sum": "$pendingSize"}}}, ] ) results = await cursor.to_list(length=1) @@ -669,14 +655,16 @@ async def update_crawl_state_if_allowed( return res is not None async def update_running_crawl_stats( - self, crawl_id: str, is_qa: bool, stats: CrawlStats + self, crawl_id: str, is_qa: bool, stats: CrawlStats, pending_size: int ) -> bool: """update running crawl stats""" prefix = "" if not is_qa else "qa." 
query = {"_id": crawl_id, "type": "crawl", f"{prefix}state": "running"} - res = await self.crawls.find_one_and_update( - query, {"$set": {f"{prefix}stats": stats.dict()}} - ) + update: dict[str, dict | int] = {f"{prefix}stats": stats.dict()} + if not is_qa: + update["pendingSize"] = pending_size + + res = await self.crawls.find_one_and_update(query, {"$set": update}) return res is not None async def inc_crawl_exec_time( diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py index 39f4edc904..4d387074cc 100644 --- a/backend/btrixcloud/models.py +++ b/backend/btrixcloud/models.py @@ -304,8 +304,6 @@ class CrawlStats(BaseModel): done: int = 0 size: int = 0 - profile_update: Optional[str] = "" - # ============================================================================ @@ -907,6 +905,7 @@ class CrawlOut(BaseMongoModel): fileSize: int = 0 fileCount: int = 0 + pendingSize: int = 0 tags: Optional[List[str]] = [] @@ -1091,6 +1090,8 @@ class Crawl(BaseCrawl, CrawlConfigCore): qa: Optional[QARun] = None qaFinished: Optional[Dict[str, QARun]] = {} + pendingSize: int = 0 + # ============================================================================ class CrawlCompleteIn(BaseModel): diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py index 1292537880..33cabec192 100644 --- a/backend/btrixcloud/operator/crawls.py +++ b/backend/btrixcloud/operator/crawls.py @@ -22,7 +22,6 @@ TYPE_RUNNING_STATES, TYPE_ALL_CRAWL_STATES, TYPE_PAUSED_STATES, - AUTO_PAUSED_STATES, RUNNING_STATES, WAITING_STATES, RUNNING_AND_STARTING_ONLY, @@ -30,11 +29,9 @@ SUCCESSFUL_STATES, FAILED_STATES, PAUSED_STATES, - CrawlStats, CrawlFile, CrawlCompleteIn, StorageRef, - Organization, ) from btrixcloud.utils import ( @@ -48,6 +45,7 @@ from .models import ( CrawlSpec, CrawlStatus, + OpCrawlStats, StopReason, MCBaseRequest, MCSyncData, @@ -398,7 +396,13 @@ async def sync_crawls(self, data: MCSyncData): if status.pagesFound < status.desiredScale: 
status.desiredScale = max(1, status.pagesFound) - is_paused = bool(crawl.paused_at) and status.state in PAUSED_STATES + # pause and shut down pods if size is <= 4096 (empty dir) + # paused_at is set and state is a valid paused state + is_paused = ( + bool(crawl.paused_at) + and status.sizePending <= 4096 + and status.state in PAUSED_STATES + ) for i in range(0, status.desiredScale): if status.pagesFound < i * num_browsers_per_pod: @@ -686,7 +690,7 @@ async def set_state( crawl: CrawlSpec, allowed_from: Sequence[TYPE_ALL_CRAWL_STATES], finished: Optional[datetime] = None, - stats: Optional[CrawlStats] = None, + stats: Optional[OpCrawlStats] = None, ): """set status state and update db, if changed if allowed_from passed in, can only transition from allowed_from state, @@ -837,7 +841,7 @@ async def fail_crawl( crawl: CrawlSpec, status: CrawlStatus, pods: dict, - stats: CrawlStats, + stats: OpCrawlStats, redis: Redis, ) -> bool: """Mark crawl as failed, log crawl state and print crawl logs, if possible""" @@ -980,6 +984,10 @@ async def sync_crawl_state( ) if not crawler_running and redis: + # clear paused key now so can resume + if crawl.paused_at: + await redis.delete(f"{crawl.id}:paused") + + # if crawler is not running for REDIS_TTL seconds, also stop redis # but not right away in case crawler pod is just restarting. 
# avoids keeping redis pods around while no crawler pods are up @@ -1006,12 +1014,12 @@ async def sync_crawl_state( status.lastActiveTime = date_to_str(dt_now()) file_done = await redis.rpop(self.done_key) + while file_done: msg = json.loads(file_done) # add completed file if msg.get("filename"): await self.add_file_to_crawl(msg, crawl, redis) - await redis.incr("filesAdded") # get next file done file_done = await redis.rpop(self.done_key) @@ -1381,7 +1389,7 @@ def get_log_line(self, message, details): } return json.dumps(err) - async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis): + async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis) -> int: """Handle finished CrawlFile to db""" filecomplete = CrawlCompleteIn(**cc_data) @@ -1398,6 +1406,11 @@ async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis): ) await redis.incr("filesAddedSize", filecomplete.size) + await redis.incr("filesAdded") + + # sizes = await redis.hkeys(f"{crawl.id}:size") + # for size in sizes: + # await redis.hmset(f"{crawl.id}:size", {size: 0 for size in sizes}) await self.crawl_ops.add_crawl_file( crawl.db_crawl_id, crawl.is_qa, crawl_file, filecomplete.size @@ -1407,7 +1420,7 @@ async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis): # no replicas for QA for now if crawl.is_qa: - return True + return filecomplete.size try: await self.background_job_ops.create_replica_jobs( @@ -1417,7 +1430,7 @@ async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis): except Exception as exc: print("Replicate Exception", exc, flush=True) - return True + return filecomplete.size async def is_crawl_stopping( self, crawl: CrawlSpec, status: CrawlStatus @@ -1446,8 +1459,7 @@ async def is_crawl_stopping( # pause crawl if org is set read-only if org.readOnly: - await self.pause_crawl(crawl, org) - return "paused_org_readonly" + return self.request_pause_crawl("paused_org_readonly", crawl) # pause crawl if storage quota is reached if 
org.quotas.storageQuota: @@ -1457,44 +1469,35 @@ async def is_crawl_stopping( active_crawls_total_size = await self.crawl_ops.get_active_crawls_size( crawl.oid ) - print(f"Active crawls total size: {active_crawls_total_size}", flush=True) - already_uploaded_size = ( - await self.crawl_ops.get_active_crawls_uploaded_wacz_size(crawl.oid) - ) - print( - f"Active crawls already uploaded size: {already_uploaded_size}", - flush=True, - ) - active_crawls_not_uploaded_size = ( - active_crawls_total_size - already_uploaded_size - ) - print( - f"Active crawls not yet uploaded size: {active_crawls_not_uploaded_size}", - flush=True, - ) - if self.org_ops.storage_quota_reached(org, active_crawls_not_uploaded_size): - await self.pause_crawl(crawl, org) - return "paused_storage_quota_reached" + + if self.org_ops.storage_quota_reached(org, active_crawls_total_size): + return self.request_pause_crawl("paused_storage_quota_reached", crawl) # pause crawl if execution time quota is reached if self.org_ops.exec_mins_quota_reached(org): - await self.pause_crawl(crawl, org) - return "paused_time_quota_reached" + return self.request_pause_crawl("paused_time_quota_reached", crawl) if crawl.paused_at and status.stopReason not in PAUSED_STATES: return "paused" return None - async def pause_crawl(self, crawl: CrawlSpec, org: Organization): - """Pause crawl and update crawl spec""" - paused_at = dt_now() - await self.crawl_ops.pause_crawl(crawl.id, org, pause=True, paused_at=paused_at) - crawl.paused_at = paused_at + def request_pause_crawl( + self, reason: StopReason, crawl: CrawlSpec + ) -> Optional[StopReason]: + """Request crawl to be paused asynchronously, equivalent of user clicking 'pause' button + if crawl is paused, then use the specified reason instead of default paused state + """ + if crawl.paused_at: + return reason + + print(f"request pause for {reason}") + self.run_task(self.crawl_ops.pause_crawl(crawl.id, crawl.org, pause=True)) + return None async def get_redis_crawl_stats( 
self, redis: Redis, crawl_id: str - ) -> tuple[CrawlStats, dict[str, Any]]: + ) -> tuple[OpCrawlStats, dict[str, Any]]: """get page stats""" try: # crawler >0.9.0, done key is a value @@ -1514,7 +1517,7 @@ async def get_redis_crawl_stats( profile_update = await redis.get(f"{crawl_id}:profileUploaded") - stats = CrawlStats( + stats = OpCrawlStats( found=pages_found, done=pages_done, size=archive_size, @@ -1534,45 +1537,36 @@ async def update_crawl_state( results = await redis.hgetall(f"{crawl.id}:status") stats, sizes = await self.get_redis_crawl_stats(redis, crawl.id) - print(f"crawl.paused_at: {crawl.paused_at}", flush=True) - print(f"crawl.stopping: {crawl.stopping}", flush=True) - print(f"status.stopReason: {status.stopReason}", flush=True) + pending_size = stats.size - print(f"stats.size initial: {stats.size}", flush=True) + stats.size += status.filesAddedSize + + total_size = stats.size + + print(f"pending size: {pending_size}", flush=True) print(f"status.filesAdded: {status.filesAdded}", flush=True) print(f"status.filesAddedSize: {status.filesAddedSize}", flush=True) - - # need to add size of previously completed WACZ files as well! - # TODO: Fix this so that it works as expected with pausing - # - The if clause here is close to a solution except it still results - # in pauses after the first showing a smaller-than-expected size - # because it no longer counts files added previous to resuming the crawl. - # - Kind of seems like what we need here is either a way of still adding - # files added prior to the current pause without double-adding files - # that are currently being uploaded. - # - Another way to do that might be to have the crawler decrement the size - # of a crawl by the amount of WACZs that are uploaded, so that this here - # in the operator can stay simpler? 
- if status.stopReason not in PAUSED_STATES: - stats.size += status.filesAddedSize - print(f"stats.size after adding filesAddedSize: {stats.size}", flush=True) - else: - print( - "not adding filesAddedSize to stats.size, crawl is pausing", flush=True - ) + print(f"total: {total_size}", flush=True) + print( + f"org quota: {crawl.org.bytesStored + stats.size} <= {crawl.org.quotas.storageQuota}", + flush=True, + ) # update status status.pagesDone = stats.done status.pagesFound = stats.found - status.size = stats.size + + status.sizePending = pending_size + status.size = total_size status.sizeHuman = humanize.naturalsize(status.size) await self.crawl_ops.update_running_crawl_stats( - crawl.db_crawl_id, crawl.is_qa, stats + crawl.db_crawl_id, crawl.is_qa, stats, pending_size ) for key, value in sizes.items(): increase_storage = False + pod_info = None value = int(value) if value > 0 and status.podStatus: pod_info = status.podStatus[key] @@ -1588,11 +1582,11 @@ async def update_crawl_state( increase_storage = True # out of storage - if pod_info.isNewExit and pod_info.exitCode == 3: + if pod_info and pod_info.isNewExit and pod_info.exitCode == 3: pod_info.used.storage = pod_info.allocated.storage increase_storage = True - if increase_storage: + if pod_info and increase_storage: new_storage = math.ceil( pod_info.used.storage * self.min_avail_storage_ratio / 1_000_000_000 ) @@ -1655,7 +1649,7 @@ async def update_crawl_state( else: paused_state = "paused" - await redis.delete(f"{crawl.id}:paused") + # await redis.delete(f"{crawl.id}:paused") await self.set_state( paused_state, status, @@ -1663,10 +1657,7 @@ async def update_crawl_state( allowed_from=RUNNING_AND_WAITING_STATES, ) - if ( - paused_state in AUTO_PAUSED_STATES - and not status.autoPausedEmailsSent - ): + if paused_state != "paused" and not status.autoPausedEmailsSent: await self.crawl_ops.notify_org_admins_of_auto_paused_crawl( paused_reason=paused_state, cid=crawl.cid, @@ -1731,7 +1722,7 @@ async def 
mark_finished( crawl: CrawlSpec, status: CrawlStatus, state: TYPE_NON_RUNNING_STATES, - stats: Optional[CrawlStats] = None, + stats: Optional[OpCrawlStats] = None, ) -> bool: """mark crawl as finished, set finished timestamp and final state""" @@ -1775,7 +1766,7 @@ async def do_crawl_finished_tasks( crawl: CrawlSpec, status: CrawlStatus, state: TYPE_NON_RUNNING_STATES, - stats: Optional[CrawlStats], + stats: Optional[OpCrawlStats], ) -> None: """Run tasks after crawl completes in asyncio.task coroutine.""" await self.crawl_config_ops.stats_recompute_last( diff --git a/backend/btrixcloud/operator/models.py b/backend/btrixcloud/operator/models.py index 56275bdee1..60099ee857 100644 --- a/backend/btrixcloud/operator/models.py +++ b/backend/btrixcloud/operator/models.py @@ -6,7 +6,12 @@ from typing import Optional, DefaultDict, Literal, Annotated, Any from pydantic import BaseModel, Field from kubernetes.utils import parse_quantity -from btrixcloud.models import StorageRef, TYPE_ALL_CRAWL_STATES, Organization +from btrixcloud.models import ( + StorageRef, + TYPE_ALL_CRAWL_STATES, + Organization, + CrawlStats, +) BTRIX_API = "btrix.cloud/v1" @@ -203,6 +208,13 @@ def should_restart_pod(self, forced: bool = False) -> Optional[str]: return None +# ============================================================================ +class OpCrawlStats(CrawlStats): + """crawl stats + internal profile update""" + + profile_update: Optional[str] = "" + + # ============================================================================ # pylint: disable=invalid-name class CrawlStatus(BaseModel): @@ -215,6 +227,9 @@ class CrawlStatus(BaseModel): # human readable size string sizeHuman: str = "" + # pending size (not uploaded) + sizePending: int = 0 + # actual observed scale (number of pods active) scale: int = 0 # desired scale as computed by crawl state (number of pods that should be active)