Skip to content

Commit

Permalink
Fix Save/Load State (#495)
Browse files Browse the repository at this point in the history
- Fixes state serialization, which was missing the done list. Instead,
adds a 'finished' list computed from the seen list, minus failed and
queued URLs.
- Also adds serialization support for 'extraSeeds', seeds added
dynamically from a redirect (via #475). Extra seeds are added to Redis
and also included in the serialization.

Fixes #491
  • Loading branch information
ikreymer committed Mar 16, 2024
1 parent fa37f62 commit 6d04c95
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 89 deletions.
37 changes: 26 additions & 11 deletions src/crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,18 @@ export class Crawler {
os.hostname(),
);

// load full state from config
if (this.params.state) {
await this.crawlState.load(
this.params.state,
this.params.scopedSeeds,
true,
);
// otherwise, just load extra seeds
} else {
await this.loadExtraSeeds();
}

// clear any pending URLs from this instance
await this.crawlState.clearOwnPendingLocks();

Expand All @@ -348,6 +360,15 @@ export class Crawler {
return this.crawlState;
}

async loadExtraSeeds() {
const extraSeeds = await this.crawlState.getExtraSeeds();

for (const { origSeedId, newUrl } of extraSeeds) {
const seed = this.params.scopedSeeds[origSeedId];
this.params.scopedSeeds.push(seed.newScopedSeed(newUrl));
}
}

initScreenCaster() {
let transport;

Expand Down Expand Up @@ -1190,14 +1211,6 @@ self.__bx_behaviors.selectMainBehavior();

await this.crawlState.setStatus("running");

if (this.params.state) {
await this.crawlState.load(
this.params.state,
this.params.scopedSeeds,
true,
);
}

await this.initPages();

this.adBlockRules = new AdBlockRules(
Expand Down Expand Up @@ -1577,9 +1590,11 @@ self.__bx_behaviors.selectMainBehavior();
const isChromeError = page.url().startsWith("chrome-error://");

if (depth === 0 && !isChromeError && respUrl !== url) {
const seed = this.params.scopedSeeds[data.seedId];
this.params.scopedSeeds.push(seed.newScopedSeed(respUrl));
data.seedId = this.params.scopedSeeds.length - 1;
data.seedId = await this.crawlState.addExtraSeed(
this.params.scopedSeeds,
data.seedId,
respUrl,
);
logger.info("Seed page redirected, adding redirected seed", {
origUrl: url,
newUrl: respUrl,
Expand Down
155 changes: 125 additions & 30 deletions src/util/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ export type QueueEntry = {
extraHops: number;
};

// ============================================================================
export type ExtraRedirectSeed = {
newUrl: string;
origSeedId: number;
};

// ============================================================================
export type PageCallbacks = {
addLink?: (url: string) => Promise<void>;
Expand Down Expand Up @@ -127,6 +133,17 @@ declare module "ioredis" {
}
}

// ============================================================================
type SaveState = {
done?: number | string[];
finished: string[];
queued: string[];
pending: string[];
failed: string[];
errors: string[];
extraSeeds: string[];
};

// ============================================================================
export class RedisCrawlState {
redis: Redis;
Expand All @@ -143,6 +160,7 @@ export class RedisCrawlState {
fkey: string;
ekey: string;
pageskey: string;
esKey: string;

constructor(redis: Redis, key: string, maxPageTime: number, uid: string) {
this.redis = redis;
Expand All @@ -163,6 +181,8 @@ export class RedisCrawlState {
// pages
this.pageskey = this.key + ":pages";

this.esKey = this.key + ":extraSeeds";

this._initLuaCommands(this.redis);
}

Expand Down Expand Up @@ -492,22 +512,50 @@ return 0;
return !!(await this.redis.sismember(this.skey, url));
}

async serialize() {
async serialize(): Promise<SaveState> {
//const queued = await this._iterSortKey(this.qkey);
const done = await this.numDone();
const queued = await this._iterSortedKey(this.qkey);
// const done = await this.numDone();
const seen = await this._iterSet(this.skey);
const queued = await this._iterSortedKey(this.qkey, seen);
const pending = await this.getPendingList();
const failed = await this._iterListKeys(this.fkey);
const failed = await this._iterListKeys(this.fkey, seen);
const errors = await this.getErrorList();
const extraSeeds = await this._iterListKeys(this.esKey, seen);

const finished = [...seen.values()];

return { done, queued, pending, failed, errors };
return { extraSeeds, finished, queued, pending, failed, errors };
}

_getScore(data: QueueEntry) {
return (data.depth || 0) + (data.extraHops || 0) * MAX_DEPTH;
}

async _iterSortedKey(key: string, inc = 100) {
async _iterSet(key: string, count = 100) {
const stream = this.redis.sscanStream(key, { count });

const results: Set<string> = new Set<string>();

stream.on("data", async (someResults: string[]) => {
stream.pause();

for (const result of someResults) {
results.add(result);
}

stream.resume();
});

await new Promise<void>((resolve) => {
stream.on("end", () => {
resolve();
});
});

return results;
}

async _iterSortedKey(key: string, seenSet: Set<string>, inc = 100) {
const results: string[] = [];

const len = await this.redis.zcard(key);
Expand All @@ -521,33 +569,35 @@ return 0;
i,
inc,
);
results.push(...someResults);

for (const result of someResults) {
const json = JSON.parse(result);
seenSet.delete(json.url);
results.push(result);
}
}

return results;
}

async _iterListKeys(key: string, inc = 100) {
async _iterListKeys(key: string, seenSet: Set<string>, inc = 100) {
const results: string[] = [];

const len = await this.redis.llen(key);

for (let i = 0; i < len; i += inc) {
const someResults = await this.redis.lrange(key, i, i + inc - 1);
results.push(...someResults);

for (const result of someResults) {
const json = JSON.parse(result);
seenSet.delete(json.url);
results.push(result);
}
}
return results;
}

async load(
// TODO: Fix this the next time the file is edited.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
state: Record<string, any>,
seeds: ScopedSeed[],
checkScope: boolean,
) {
const seen: string[] = [];

async load(state: SaveState, seeds: ScopedSeed[], checkScope: boolean) {
// need to delete existing keys, if exist to fully reset state
await this.redis.del(this.qkey);
await this.redis.del(this.pkey);
Expand All @@ -556,6 +606,21 @@ return 0;
await this.redis.del(this.skey);
await this.redis.del(this.ekey);

let seen: string[] = [];

if (state.finished) {
seen = state.finished;

await this.redis.set(this.dkey, state.finished.length);
}

if (state.extraSeeds) {
for (const extraSeed of state.extraSeeds) {
const { newUrl, origSeedId }: ExtraRedirectSeed = JSON.parse(extraSeed);
await this.addExtraSeed(seeds, origSeedId, newUrl);
}
}

for (const json of state.queued) {
const data = JSON.parse(json);
if (checkScope) {
Expand All @@ -580,19 +645,23 @@ return 0;
seen.push(data.url);
}

if (typeof state.done === "number") {
// done key is just an int counter
await this.redis.set(this.dkey, state.done);
} else if (state.done instanceof Array) {
// for backwards compatibility with old save states
for (const json of state.done) {
const data = JSON.parse(json);
if (data.failed) {
await this.redis.zadd(this.qkey, this._getScore(data), json);
} else {
await this.redis.incr(this.dkey);
// backwards compatibility: not using done, instead 'finished'
// contains list of finished URLs
if (state.done) {
if (typeof state.done === "number") {
// done key is just an int counter
await this.redis.set(this.dkey, state.done);
} else if (state.done instanceof Array) {
// for backwards compatibility with old save states
for (const json of state.done) {
const data = JSON.parse(json);
if (data.failed) {
await this.redis.zadd(this.qkey, this._getScore(data), json);
} else {
await this.redis.incr(this.dkey);
}
seen.push(data.url);
}
seen.push(data.url);
}
}

Expand Down Expand Up @@ -698,4 +767,30 @@ return 0;
async writeToPagesQueue(value: string) {
return await this.redis.lpush(this.pageskey, value);
}

// add extra seeds from redirect
async addExtraSeed(seeds: ScopedSeed[], origSeedId: number, newUrl: string) {
if (!seeds[origSeedId]) {
logger.fatal(
"State load, original seed missing",
{ origSeedId },
"state",
);
}
seeds.push(seeds[origSeedId].newScopedSeed(newUrl));
const newSeedId = seeds.length - 1;
const redirectSeed: ExtraRedirectSeed = { origSeedId, newUrl };
await this.redis.sadd(this.skey, newUrl);
await this.redis.lpush(this.esKey, JSON.stringify(redirectSeed));
return newSeedId;
}

async getExtraSeeds() {
const seeds: ExtraRedirectSeed[] = [];
const res = await this.redis.lrange(this.esKey, 0, -1);
for (const key of res) {
seeds.push(JSON.parse(key));
}
return seeds;
}
}

0 comments on commit 6d04c95

Please sign in to comment.