26 changes: 7 additions & 19 deletions backend/btrixcloud/crawls.py
@@ -370,21 +370,7 @@ async def get_active_crawls_size(self, oid: UUID) -> int:
cursor = self.crawls.aggregate(
[
{"$match": {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}},
{"$group": {"_id": None, "totalSum": {"$sum": "$stats.size"}}},
]
)
results = await cursor.to_list(length=1)
if not results:
return 0

return results[0].get("totalSum") or 0

async def get_active_crawls_uploaded_wacz_size(self, oid: UUID) -> int:
"""get size of all waczs already uploaded for running/paused crawls"""
cursor = self.crawls.aggregate(
[
{"$match": {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}},
{"$group": {"_id": None, "totalSum": {"$sum": "$fileSize"}}},
{"$group": {"_id": None, "totalSum": {"$sum": "$pendingSize"}}},
]
)
results = await cursor.to_list(length=1)
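Note: the consolidated helper now sums the new `pendingSize` field directly, replacing the old pair of aggregations over `stats.size` and `fileSize`. A minimal sketch of the resulting query, assuming a motor `AsyncIOMotorCollection` and the same `RUNNING_AND_WAITING_STATES` list used above (the standalone function name is illustrative):

```python
from uuid import UUID

async def sum_active_pending_size(crawls, oid: UUID, active_states: list) -> int:
    """Sum bytes crawled but not yet uploaded across running/paused crawls."""
    cursor = crawls.aggregate(
        [
            # match active crawls in this org only
            {"$match": {"state": {"$in": active_states}, "oid": oid}},
            # sum the per-crawl pendingSize field into a single document
            {"$group": {"_id": None, "totalSum": {"$sum": "$pendingSize"}}},
        ]
    )
    results = await cursor.to_list(length=1)
    if not results:
        return 0
    return results[0].get("totalSum") or 0
```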
@@ -669,14 +655,16 @@ async def update_crawl_state_if_allowed(
return res is not None

async def update_running_crawl_stats(
self, crawl_id: str, is_qa: bool, stats: CrawlStats
self, crawl_id: str, is_qa: bool, stats: CrawlStats, pending_size: int
) -> bool:
"""update running crawl stats"""
prefix = "" if not is_qa else "qa."
query = {"_id": crawl_id, "type": "crawl", f"{prefix}state": "running"}
res = await self.crawls.find_one_and_update(
query, {"$set": {f"{prefix}stats": stats.dict()}}
)
update: dict[str, dict | int] = {f"{prefix}stats": stats.dict()}
if not is_qa:
update["pendingSize"] = pending_size

res = await self.crawls.find_one_and_update(query, {"$set": update})
return res is not None

async def inc_crawl_exec_time(
5 changes: 3 additions & 2 deletions backend/btrixcloud/models.py
@@ -304,8 +304,6 @@ class CrawlStats(BaseModel):
done: int = 0
size: int = 0

profile_update: Optional[str] = ""


# ============================================================================

Expand Down Expand Up @@ -907,6 +905,7 @@ class CrawlOut(BaseMongoModel):

fileSize: int = 0
fileCount: int = 0
pendingSize: int = 0

tags: Optional[List[str]] = []

Expand Down Expand Up @@ -1091,6 +1090,8 @@ class Crawl(BaseCrawl, CrawlConfigCore):
qa: Optional[QARun] = None
qaFinished: Optional[Dict[str, QARun]] = {}

pendingSize: int = 0


# ============================================================================
class CrawlCompleteIn(BaseModel):
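The same field lands on both the API output model and the stored crawl document. A minimal Pydantic sketch of the size-related fields (class name illustrative; surrounding fields elided, comments are interpretation rather than from the source):

```python
from pydantic import BaseModel

class CrawlSizeFieldsSketch(BaseModel):
    # bytes already uploaded as finished WACZ files
    fileSize: int = 0
    fileCount: int = 0
    # bytes crawled but not yet uploaded as WACZs; presumably lets quota
    # checks avoid double-counting storage already in org.bytesStored
    pendingSize: int = 0
```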
137 changes: 64 additions & 73 deletions backend/btrixcloud/operator/crawls.py
@@ -22,19 +22,16 @@
TYPE_RUNNING_STATES,
TYPE_ALL_CRAWL_STATES,
TYPE_PAUSED_STATES,
AUTO_PAUSED_STATES,
RUNNING_STATES,
WAITING_STATES,
RUNNING_AND_STARTING_ONLY,
RUNNING_AND_WAITING_STATES,
SUCCESSFUL_STATES,
FAILED_STATES,
PAUSED_STATES,
CrawlStats,
CrawlFile,
CrawlCompleteIn,
StorageRef,
Organization,
)

from btrixcloud.utils import (
@@ -48,6 +45,7 @@
from .models import (
CrawlSpec,
CrawlStatus,
OpCrawlStats,
StopReason,
MCBaseRequest,
MCSyncData,
@@ -398,7 +396,13 @@ async def sync_crawls(self, data: MCSyncData):
if status.pagesFound < status.desiredScale:
status.desiredScale = max(1, status.pagesFound)

is_paused = bool(crawl.paused_at) and status.state in PAUSED_STATES
        # pause and shut down pods only once pending size is <= 4096 (an empty
        # dir), paused_at is set, and state is a valid paused state
is_paused = (
bool(crawl.paused_at)
and status.sizePending <= 4096
and status.state in PAUSED_STATES
)

for i in range(0, status.desiredScale):
if status.pagesFound < i * num_browsers_per_pod:
@@ -686,7 +690,7 @@ async def set_state(
crawl: CrawlSpec,
allowed_from: Sequence[TYPE_ALL_CRAWL_STATES],
finished: Optional[datetime] = None,
stats: Optional[CrawlStats] = None,
stats: Optional[OpCrawlStats] = None,
):
"""set status state and update db, if changed
if allowed_from passed in, can only transition from allowed_from state,
@@ -837,7 +841,7 @@ async def fail_crawl(
crawl: CrawlSpec,
status: CrawlStatus,
pods: dict,
stats: CrawlStats,
stats: OpCrawlStats,
redis: Redis,
) -> bool:
"""Mark crawl as failed, log crawl state and print crawl logs, if possible"""
@@ -980,6 +984,10 @@ async def sync_crawl_state(
)

if not crawler_running and redis:
            # clear paused key now so that the crawl can be resumed
if crawl.paused_at:
await redis.delete(f"{crawl.id}:paused")

# if crawler is not running for REDIS_TTL seconds, also stop redis
# but not right away in case crawler pod is just restarting.
# avoids keeping redis pods around while no crawler pods are up
@@ -1006,12 +1014,12 @@
status.lastActiveTime = date_to_str(dt_now())

file_done = await redis.rpop(self.done_key)

while file_done:
msg = json.loads(file_done)
# add completed file
if msg.get("filename"):
await self.add_file_to_crawl(msg, crawl, redis)
await redis.incr("filesAdded")

# get next file done
file_done = await redis.rpop(self.done_key)
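The loop above drains the crawler's done queue one message at a time. A self-contained sketch of the pattern, assuming a `redis.asyncio` client and a handler that persists the file and returns its size in bytes (names here are illustrative):

```python
import json

async def drain_done_queue(redis, done_key: str, handle_file) -> int:
    """Pop completed-file messages until the list is empty; return bytes added."""
    added_size = 0
    msg_raw = await redis.rpop(done_key)
    while msg_raw:
        msg = json.loads(msg_raw)
        # only messages carrying a filename describe a finished WACZ
        if msg.get("filename"):
            added_size += await handle_file(msg)
        # get next completed file, if any
        msg_raw = await redis.rpop(done_key)
    return added_size
```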
@@ -1381,7 +1389,7 @@ def get_log_line(self, message, details):
}
return json.dumps(err)

async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis):
async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis) -> int:
"""Handle finished CrawlFile to db"""

filecomplete = CrawlCompleteIn(**cc_data)
@@ -1398,6 +1406,11 @@ async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis):
)

await redis.incr("filesAddedSize", filecomplete.size)
await redis.incr("filesAdded")

# sizes = await redis.hkeys(f"{crawl.id}:size")
# for size in sizes:
# await redis.hmset(f"{crawl.id}:size", {size: 0 for size in sizes})

await self.crawl_ops.add_crawl_file(
crawl.db_crawl_id, crawl.is_qa, crawl_file, filecomplete.size
@@ -1407,7 +1420,7 @@ async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis):

# no replicas for QA for now
if crawl.is_qa:
return True
return filecomplete.size

try:
await self.background_job_ops.create_replica_jobs(
@@ -1417,7 +1430,7 @@ async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis):
except Exception as exc:
print("Replicate Exception", exc, flush=True)

return True
return filecomplete.size

async def is_crawl_stopping(
self, crawl: CrawlSpec, status: CrawlStatus
@@ -1446,8 +1459,7 @@

# pause crawl if org is set read-only
if org.readOnly:
await self.pause_crawl(crawl, org)
return "paused_org_readonly"
return self.request_pause_crawl("paused_org_readonly", crawl)

# pause crawl if storage quota is reached
if org.quotas.storageQuota:
@@ -1457,44 +1469,35 @@
active_crawls_total_size = await self.crawl_ops.get_active_crawls_size(
crawl.oid
)
print(f"Active crawls total size: {active_crawls_total_size}", flush=True)
already_uploaded_size = (
await self.crawl_ops.get_active_crawls_uploaded_wacz_size(crawl.oid)
)
print(
f"Active crawls already uploaded size: {already_uploaded_size}",
flush=True,
)
active_crawls_not_uploaded_size = (
active_crawls_total_size - already_uploaded_size
)
print(
f"Active crawls not yet uploaded size: {active_crawls_not_uploaded_size}",
flush=True,
)
if self.org_ops.storage_quota_reached(org, active_crawls_not_uploaded_size):
await self.pause_crawl(crawl, org)
return "paused_storage_quota_reached"

if self.org_ops.storage_quota_reached(org, active_crawls_total_size):
return self.request_pause_crawl("paused_storage_quota_reached", crawl)

# pause crawl if execution time quota is reached
if self.org_ops.exec_mins_quota_reached(org):
await self.pause_crawl(crawl, org)
return "paused_time_quota_reached"
return self.request_pause_crawl("paused_time_quota_reached", crawl)

if crawl.paused_at and status.stopReason not in PAUSED_STATES:
return "paused"

return None

async def pause_crawl(self, crawl: CrawlSpec, org: Organization):
"""Pause crawl and update crawl spec"""
paused_at = dt_now()
await self.crawl_ops.pause_crawl(crawl.id, org, pause=True, paused_at=paused_at)
crawl.paused_at = paused_at
def request_pause_crawl(
self, reason: StopReason, crawl: CrawlSpec
) -> Optional[StopReason]:
"""Request crawl to be paused asynchronously, equivalent of user clicking 'pause' button
if crawl is paused, then use the specified reason instead of default paused state
"""
if crawl.paused_at:
return reason

print(f"request pause for {reason}")
self.run_task(self.crawl_ops.pause_crawl(crawl.id, crawl.org, pause=True))
return None
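`run_task` lets the operator request the pause without blocking the reconcile pass. A common fire-and-forget shape for such a helper, shown as a sketch (the operator's actual implementation may differ):

```python
import asyncio
from typing import Coroutine, Set

_background_tasks: Set[asyncio.Task] = set()

def run_task(coro: Coroutine) -> None:
    # keep a strong reference so the task isn't garbage-collected
    # mid-flight, then drop it once the coroutine completes
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
```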

async def get_redis_crawl_stats(
self, redis: Redis, crawl_id: str
) -> tuple[CrawlStats, dict[str, Any]]:
) -> tuple[OpCrawlStats, dict[str, Any]]:
"""get page stats"""
try:
# crawler >0.9.0, done key is a value
@@ -1514,7 +1517,7 @@ async def get_redis_crawl_stats(

profile_update = await redis.get(f"{crawl_id}:profileUploaded")

stats = CrawlStats(
stats = OpCrawlStats(
found=pages_found,
done=pages_done,
size=archive_size,
@@ -1534,45 +1537,36 @@ async def update_crawl_state(
results = await redis.hgetall(f"{crawl.id}:status")
stats, sizes = await self.get_redis_crawl_stats(redis, crawl.id)

print(f"crawl.paused_at: {crawl.paused_at}", flush=True)
print(f"crawl.stopping: {crawl.stopping}", flush=True)
print(f"status.stopReason: {status.stopReason}", flush=True)
pending_size = stats.size

print(f"stats.size initial: {stats.size}", flush=True)
stats.size += status.filesAddedSize

total_size = stats.size

print(f"pending size: {pending_size}", flush=True)
print(f"status.filesAdded: {status.filesAdded}", flush=True)
print(f"status.filesAddedSize: {status.filesAddedSize}", flush=True)

# need to add size of previously completed WACZ files as well!
# TODO: Fix this so that it works as expected with pausing
# - The if clause here is close to a solution except it still results
# in pauses after the first showing a smaller-than-expected size
# because it no longer counts files added previous to resuming the crawl.
# - Kind of seems like what we need here is either a way of still adding
# files added prior to the current pause without double-adding files
# that are currently being uploaded.
# - Another way to do that might be to have the crawler decrement the size
# of a crawl by the amount of WACZs that are uploaded, so that this here
# in the operator can stay simpler?
if status.stopReason not in PAUSED_STATES:
stats.size += status.filesAddedSize
print(f"stats.size after adding filesAddedSize: {stats.size}", flush=True)
else:
print(
"not adding filesAddedSize to stats.size, crawl is pausing", flush=True
)
print(f"total: {total_size}", flush=True)
print(
f"org quota: {crawl.org.bytesStored + stats.size} <= {crawl.org.quotas.storageQuota}",
flush=True,
)

# update status
status.pagesDone = stats.done
status.pagesFound = stats.found
status.size = stats.size

status.sizePending = pending_size
status.size = total_size
status.sizeHuman = humanize.naturalsize(status.size)

await self.crawl_ops.update_running_crawl_stats(
crawl.db_crawl_id, crawl.is_qa, stats
crawl.db_crawl_id, crawl.is_qa, stats, pending_size
)
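To make the accounting above concrete, a worked example, assuming `org.bytesStored` already includes uploaded WACZs (consistent with the quota debug print shown above); the numbers are illustrative:

```python
stats_size = 5_000_000_000         # redis-reported bytes, not yet uploaded
files_added_size = 12_000_000_000  # WACZs already uploaded for this crawl

pending_size = stats_size                    # 5 GB: drives quota checks
total_size = stats_size + files_added_size   # 17 GB: user-visible crawl size
```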

for key, value in sizes.items():
increase_storage = False
pod_info = None
value = int(value)
if value > 0 and status.podStatus:
pod_info = status.podStatus[key]
@@ -1588,11 +1582,11 @@
increase_storage = True

# out of storage
if pod_info.isNewExit and pod_info.exitCode == 3:
if pod_info and pod_info.isNewExit and pod_info.exitCode == 3:
pod_info.used.storage = pod_info.allocated.storage
increase_storage = True

if increase_storage:
if pod_info and increase_storage:
new_storage = math.ceil(
pod_info.used.storage * self.min_avail_storage_ratio / 1_000_000_000
)
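The resize math rounds scaled usage up to whole gigabytes. A quick worked example, assuming a `min_avail_storage_ratio` of 1.2 (the real ratio is configured elsewhere):

```python
import math

used_storage = 9_500_000_000   # bytes used by the crawler pod
min_avail_storage_ratio = 1.2  # assumed value for illustration

# ceil(9.5 GB * 1.2 / 1 GB) -> 12, i.e. grow the volume to 12 Gi
new_storage = math.ceil(used_storage * min_avail_storage_ratio / 1_000_000_000)
```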
@@ -1655,18 +1649,15 @@ async def update_crawl_state(
else:
paused_state = "paused"

await redis.delete(f"{crawl.id}:paused")
# await redis.delete(f"{crawl.id}:paused")
await self.set_state(
paused_state,
status,
crawl,
allowed_from=RUNNING_AND_WAITING_STATES,
)

if (
paused_state in AUTO_PAUSED_STATES
and not status.autoPausedEmailsSent
):
if paused_state != "paused" and not status.autoPausedEmailsSent:
await self.crawl_ops.notify_org_admins_of_auto_paused_crawl(
paused_reason=paused_state,
cid=crawl.cid,
@@ -1731,7 +1722,7 @@ async def mark_finished(
crawl: CrawlSpec,
status: CrawlStatus,
state: TYPE_NON_RUNNING_STATES,
stats: Optional[CrawlStats] = None,
stats: Optional[OpCrawlStats] = None,
) -> bool:
"""mark crawl as finished, set finished timestamp and final state"""

@@ -1775,7 +1766,7 @@ async def do_crawl_finished_tasks(
crawl: CrawlSpec,
status: CrawlStatus,
state: TYPE_NON_RUNNING_STATES,
stats: Optional[CrawlStats],
stats: Optional[OpCrawlStats],
) -> None:
"""Run tasks after crawl completes in asyncio.task coroutine."""
await self.crawl_config_ops.stats_recompute_last(