backend/btrixcloud/crawls.py: 55 additions & 18 deletions
@@ -502,6 +502,30 @@ async def shutdown_crawl(self, crawl_id: str, org: Organization, graceful: bool)
        # return whatever detail may be included in the response
        raise HTTPException(status_code=400, detail=result.get("error"))

+    async def _crawl_queue_len(self, redis, key):
+        try:
+            return await redis.zcard(key)
+        except exceptions.ResponseError:
+            # fallback to old crawler queue
+            return await redis.llen(key)
+
+    async def _crawl_queue_range(self, redis, key, offset, count):
+        try:
+            return await redis.zrangebyscore(key, 0, "inf", offset, count)
+        except exceptions.ResponseError:
+            # fallback to old crawler queue
+            return reversed(await redis.lrange(key, -offset - count, -offset - 1))
+
+    async def _crawl_queue_rem(self, redis, key, values, dircount=1):
+        try:
+            return await redis.zrem(key, *values)
+        except exceptions.ResponseError:
+            # fallback to old crawler queue
+            res = 0
+            for value in values:
+                res += await redis.lrem(key, dircount, value)
+            return res
+
    async def get_crawl_queue(self, crawl_id, offset, count, regex):
        """get crawl queue"""

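The three new `_crawl_queue_*` helpers all follow the same pattern: try the sorted-set command used by the newer crawler's queue first, and fall back to the equivalent list command when Redis rejects the call for a key of the old type. Below is a minimal, self-contained sketch of that fallback, not the actual btrixcloud code path; it assumes a `redis.asyncio` client and a made-up `examplecrawl:q` key, whereas the PR uses whichever async Redis client `crawls.py` already imports as `exceptions`.

```python
# Sketch only: mirrors the try/except ResponseError fallback in _crawl_queue_len.
import asyncio

from redis import asyncio as aioredis
from redis import exceptions


async def queue_len(redis, key):
    """Queue length whether the crawler stored it as a zset or a list."""
    try:
        # newer crawler: queue is a sorted set
        return await redis.zcard(key)
    except exceptions.ResponseError:
        # WRONGTYPE response -> older crawler stored the queue as a plain list
        return await redis.llen(key)


async def main():
    redis = aioredis.from_url("redis://localhost:6379/0", decode_responses=True)
    # "examplecrawl:q" is an illustrative key name, not one used by btrixcloud
    print(await queue_len(redis, "examplecrawl:q"))


if __name__ == "__main__":
    asyncio.run(main())
```

The same shape covers `_crawl_queue_range` (ZRANGEBYSCORE, falling back to a reversed LRANGE slice taken from the tail of the list) and `_crawl_queue_rem` (one ZREM over all values, falling back to repeated LREM calls).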
@@ -511,9 +535,12 @@ async def get_crawl_queue(self, crawl_id, offset, count, regex):

        try:
            redis = await self.get_redis(crawl_id)
-            total = await redis.llen(f"{crawl_id}:q")
-            results = await redis.lrange(f"{crawl_id}:q", -offset - count, -offset - 1)
-            results = [json.loads(result)["url"] for result in reversed(results)]
+
+            total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
+            results = await self._crawl_queue_range(
+                redis, f"{crawl_id}:q", offset, count
+            )
+            results = [json.loads(result)["url"] for result in results]
        except exceptions.ConnectionError:
            # can't connect to redis, likely not initialized yet
            pass
@@ -525,17 +552,14 @@ async def get_crawl_queue(self, crawl_id, offset, count, regex):

        return {"total": total, "results": results, "matched": matched}

-    async def iter_crawl_queue(self, regex, redis, crawl_id, total, step=50):
-        """iterate over urls that match regex in crawl queue list"""
-
    async def match_crawl_queue(self, crawl_id, regex):
        """get list of urls that match regex"""
        total = 0
        redis = None

        try:
            redis = await self.get_redis(crawl_id)
-            total = await redis.llen(f"{crawl_id}:q")
+            total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
        except exceptions.ConnectionError:
            # can't connect to redis, likely not initialized yet
            pass
@@ -545,8 +569,8 @@ async def match_crawl_queue(self, crawl_id, regex):
        step = 50

        for count in range(0, total, step):
-            results = await redis.lrange(f"{crawl_id}:q", -count - step, -count - 1)
-            for result in reversed(results):
+            results = await self._crawl_queue_range(redis, f"{crawl_id}:q", count, step)
+            for result in results:
                url = json.loads(result)["url"]
                if regex.search(url):
                    matched.append(url)
@@ -555,6 +579,7 @@ async def match_crawl_queue(self, crawl_id, regex):

    async def filter_crawl_queue(self, crawl_id, regex):
        """filter out urls that match regex"""
+        # pylint: disable=too-many-locals
        total = 0
        redis = None

@@ -563,7 +588,7 @@ async def filter_crawl_queue(self, crawl_id, regex):

        try:
            redis = await self.get_redis(crawl_id)
-            total = await redis.llen(q_key)
+            total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
        except exceptions.ConnectionError:
            # can't connect to redis, likely not initialized yet
            pass
@@ -583,17 +608,29 @@ async def filter_crawl_queue(self, crawl_id, regex):
        while count < total:
            if dircount == -1 and count > total / 2:
                dircount = 1
-            results = await redis.lrange(q_key, -count - step, -count - 1)
+            results = await self._crawl_queue_range(redis, q_key, count, step)
            count += step
-            for result in reversed(results):
+
+            qrems = []
+            srems = []
+
+            for result in results:
                url = json.loads(result)["url"]
                if regex.search(url):
-                    await redis.srem(s_key, url)
-                    res = await redis.lrem(q_key, dircount, result)
-                    if res:
-                        count -= res
-                        num_removed += res
-                        print(f"Removed {result}: {res}", flush=True)
+                    srems.append(url)
+                    # await redis.srem(s_key, url)
+                    # res = await self._crawl_queue_rem(redis, q_key, result, dircount)
+                    qrems.append(result)
+
+            if not srems:
+                continue
+
+            await redis.srem(s_key, *srems)
+            res = await self._crawl_queue_rem(redis, q_key, qrems, dircount)
+            if res:
+                count -= res
+                num_removed += res
+                print(f"Removed {res} from queue", flush=True)

        return num_removed

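The main behavioral change in `filter_crawl_queue` is batching: matched entries are collected into `qrems`/`srems` and removed with one `SREM` and one `_crawl_queue_rem` call per scanned page, instead of one round trip per matched URL. The sketch below shows that batched removal against a zset-backed queue only (no list fallback); the key names, the `seen`-set key, and the standalone `filter_queue` function are illustrative assumptions, not the module's actual state or API.

```python
# Sketch only: batched removal of regex-matched entries from a zset-backed
# crawl queue, mirroring the qrems/srems batching introduced in this PR.
import asyncio
import json
import re

from redis import asyncio as aioredis


async def filter_queue(redis, q_key, s_key, regex, step=50):
    """Remove queue entries whose URL matches regex; return number removed."""
    total = await redis.zcard(q_key)
    removed = 0
    offset = 0
    while offset < total:
        results = await redis.zrangebyscore(q_key, 0, "inf", offset, step)
        offset += step

        qrems = []
        srems = []
        for result in results:
            url = json.loads(result)["url"]
            if regex.search(url):
                qrems.append(result)
                srems.append(url)

        if not qrems:
            continue

        # one SREM and one ZREM per page instead of one round trip per URL
        await redis.srem(s_key, *srems)
        res = await redis.zrem(q_key, *qrems)
        # removed members shift later entries down, so rewind the scan window
        offset -= res
        removed += res

    return removed


async def main():
    redis = aioredis.from_url("redis://localhost:6379/0", decode_responses=True)
    # hypothetical key names; the real code derives them from the crawl id
    n = await filter_queue(
        redis, "examplecrawl:q", "examplecrawl:s", re.compile(r"\.pdf$")
    )
    print(f"Removed {n} from queue")


if __name__ == "__main__":
    asyncio.run(main())
```

Rewinding the offset by the number of removed members plays the same role as the PR's `count -= res`: it keeps the scan window aligned after deletions shift the remaining queue entries.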