Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify WARC writing + CDXJ indexing into single class #507

Merged
merged 5 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
97 changes: 76 additions & 21 deletions src/crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import { CDPSession, Frame, HTTPRequest, Page } from "puppeteer-core";
import { Recorder } from "./util/recorder.js";
import { SitemapReader } from "./util/sitemapper.js";
import { ScopedSeed } from "./util/seeds.js";
import { WARCWriter } from "./util/warcwriter.js";

const HTTPS_AGENT = new HTTPSAgent({
rejectUnauthorized: false,
Expand Down Expand Up @@ -149,6 +150,11 @@ export class Crawler {
pagesFile: string;

archivesDir: string;
tempdir: string;
tempCdxDir: string;

screenshotWriter: WARCWriter | null;
textWriter: WARCWriter | null;

blockRules: BlockRules | null;
adBlockRules: AdBlockRules | null;
Expand Down Expand Up @@ -177,8 +183,6 @@ export class Crawler {
maxHeapUsed = 0;
maxHeapTotal = 0;

warcPrefix: string;

driver!: (opts: {
page: Page;
data: PageState;
Expand Down Expand Up @@ -271,6 +275,11 @@ export class Crawler {

// archives dir
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(os.tmpdir(), "tmp-dl");
this.tempCdxDir = path.join(this.collDir, "tmp-cdx");

this.screenshotWriter = null;
this.textWriter = null;

this.blockRules = null;
this.adBlockRules = null;
Expand All @@ -288,12 +297,6 @@ export class Crawler {
this.customBehaviors = "";

this.browser = new Browser();

this.warcPrefix = process.env.WARC_PREFIX || this.params.warcPrefix || "";

if (this.warcPrefix) {
this.warcPrefix += "-" + this.crawlId + "-";
}
}

protected parseArgs() {
Expand Down Expand Up @@ -447,14 +450,10 @@ export class Crawler {

subprocesses.push(this.launchRedis());

//const initRes = child_process.spawnSync("wb-manager", ["init", this.params.collection], {cwd: this.params.cwd});

//if (initRes.status) {
// logger.info("wb-manager init failed, collection likely already exists");
//}

await fsp.mkdir(this.logDir, { recursive: true });
await fsp.mkdir(this.archivesDir, { recursive: true });
await fsp.mkdir(this.tempdir, { recursive: true });
await fsp.mkdir(this.tempCdxDir, { recursive: true });

this.logFH = fs.createWriteStream(this.logFilename);
logger.setExternalLogStream(this.logFH);
Expand Down Expand Up @@ -514,6 +513,13 @@ export class Crawler {
{ detached: RUN_DETACHED },
);
}

if (this.params.screenshot) {
this.screenshotWriter = this.createExtraResourceWarcWriter("screenshots");
}
if (this.params.text) {
this.textWriter = this.createExtraResourceWarcWriter("text");
}
}

extraChromeArgs() {
Expand Down Expand Up @@ -812,16 +818,15 @@ self.__bx_behaviors.selectMainBehavior();

const logDetails = { page: url, workerid };

if (this.params.screenshot) {
if (this.params.screenshot && this.screenshotWriter) {
if (!data.isHTMLPage) {
logger.debug("Skipping screenshots for non-HTML page", logDetails);
}
const screenshots = new Screenshots({
warcPrefix: this.warcPrefix,
browser: this.browser,
page,
url,
directory: this.archivesDir,
writer: this.screenshotWriter,
});
if (this.params.screenshot.includes("view")) {
await screenshots.take("view", saveOutput ? data : null);
Expand All @@ -836,11 +841,10 @@ self.__bx_behaviors.selectMainBehavior();

let textextract = null;

if (data.isHTMLPage) {
if (data.isHTMLPage && this.textWriter) {
textextract = new TextExtractViaSnapshot(cdp, {
warcPrefix: this.warcPrefix,
writer: this.textWriter,
url,
directory: this.archivesDir,
skipDocs: this.skipTextDocs,
});
const { text } = await textextract.extractAndStoreText(
Expand Down Expand Up @@ -1151,6 +1155,7 @@ self.__bx_behaviors.selectMainBehavior();
if (this.interrupted) {
await this.browser.close();
await closeWorkers(0);
await this.closeFiles();
await this.setStatusAndExit(13, "interrupted");
} else {
await this.setStatusAndExit(0, "done");
Expand Down Expand Up @@ -1298,6 +1303,8 @@ self.__bx_behaviors.selectMainBehavior();
await this.pagesFH.close();
}

await this.closeFiles();

await this.writeStats();

// if crawl has been stopped, mark as final exit for post-crawl tasks
Expand All @@ -1308,6 +1315,15 @@ self.__bx_behaviors.selectMainBehavior();
await this.postCrawl();
}

async closeFiles() {
  // Flush any buffered extra-resource WARC records (text, screenshots)
  // so their files are complete before the crawl process exits.
  // Flushed one at a time, text first, matching shutdown expectations.
  for (const writer of [this.textWriter, this.screenshotWriter]) {
    if (writer) {
      await writer.flush();
    }
  }
}

protected async _addInitialSeeds() {
for (let i = 0; i < this.params.scopedSeeds.length; i++) {
const seed = this.params.scopedSeeds[i];
Expand Down Expand Up @@ -2363,15 +2379,54 @@ self.__bx_behaviors.selectMainBehavior();
}
}

getWarcPrefix(defaultValue = "") {
  // Precedence: WARC_PREFIX env var, then CLI param, then the caller's default.
  const base =
    process.env.WARC_PREFIX || this.params.warcPrefix || defaultValue;

  // Append the crawl id so WARCs from different crawls never collide;
  // an empty prefix stays empty (no id suffix is added).
  return base ? `${base}-${this.crawlId}-` : base;
}

createExtraResourceWarcWriter(resourceName: string, gzip = true) {
  // Dedicated WARC for one resource type (e.g. "screenshots", "text", "info"),
  // named with the crawl-wide prefix and tagged in log output.
  return this.createWarcWriter(
    `${this.getWarcPrefix()}${resourceName}`,
    gzip,
    { resourceName },
  );
}

createWarcWriter(
  filenameBase: string,
  gzip: boolean,
  logDetails: Record<string, string>,
) {
  // ".warc.gz" when compressing, plain ".warc" otherwise.
  const extension = gzip ? ".warc.gz" : ".warc";

  // Writer handles rollover at rolloverSize and emits CDXJ entries
  // into tempCdxDir alongside the archives.
  return new WARCWriter({
    archivesDir: this.archivesDir,
    tempCdxDir: this.tempCdxDir,
    filenameTemplate: filenameBase + extension,
    rolloverSize: this.params.rolloverSize,
    gzip,
    logDetails,
  });
}

createRecorder(id: number): Recorder | null {
if (!this.recording) {
return null;
}

const filenameBase = `${this.getWarcPrefix("rec")}$ts-${id}`;
tw4l marked this conversation as resolved.
Show resolved Hide resolved

const writer = this.createWarcWriter(filenameBase, true, { id: id + "" });
ikreymer marked this conversation as resolved.
Show resolved Hide resolved

const res = new Recorder({
workerid: id,
collDir: this.collDir,
crawler: this,
writer,
tempdir: this.tempdir,
});

this.browser.recorders.push(res);
Expand Down
26 changes: 15 additions & 11 deletions src/replaycrawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { ZipRangeReader } from "@webrecorder/wabac/src/wacz/ziprangereader.js";
import { createLoader } from "@webrecorder/wabac/src/blockloaders.js";

import { AsyncIterReader } from "warcio";
import { WARCResourceWriter } from "./util/warcresourcewriter.js";
import { parseArgs } from "./util/argParser.js";

import { PNG } from "pngjs";
Expand All @@ -25,6 +24,7 @@ import pixelmatch from "pixelmatch";
import levenshtein from "js-levenshtein";
import { MAX_URL_LENGTH } from "./util/reqresp.js";
import { openAsBlob } from "fs";
import { WARCWriter } from "./util/warcwriter.js";

// RWP Replay Prefix
const REPLAY_PREFIX = "http://localhost:9990/replay/w/replay/";
Expand Down Expand Up @@ -67,6 +67,7 @@ export class ReplayCrawler extends Crawler {
qaSource: string;

pageInfos: Map<Page, ReplayPageInfoRecord>;
infoWriter: WARCWriter | null;

reloadTimeouts: WeakMap<Page, NodeJS.Timeout>;

Expand Down Expand Up @@ -98,6 +99,14 @@ export class ReplayCrawler extends Crawler {
this.params.serviceWorker = "enabled";

this.reloadTimeouts = new WeakMap<Page, NodeJS.Timeout>();

this.infoWriter = null;
}

async bootstrap(): Promise<void> {
  // Run base crawler setup first (creates archive/temp dirs the writer needs).
  await super.bootstrap();

  // Writer for per-page "pageinfo" resource records produced during QA replay.
  this.infoWriter = this.createExtraResourceWarcWriter("info");
}

protected parseArgs() {
Expand Down Expand Up @@ -666,18 +675,13 @@ export class ReplayCrawler extends Crawler {
(state as ComparisonPageState).comparison = comparison;
}

const writer = new WARCResourceWriter({
await this.infoWriter?.writeNewResourceRecord({
buffer: new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)),
resourceType: "pageinfo",
contentType: "application/json",
url: pageInfo.url,
directory: this.archivesDir,
warcPrefix: this.warcPrefix,
date: new Date(),
warcName: "info.warc.gz",
});
await writer.writeBufferToWARC(
new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)),
"pageinfo",
"application/json",
);

this.pageInfos.delete(page);
}
}
Expand Down
61 changes: 18 additions & 43 deletions src/util/recorder.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import fs from "fs";
import path from "path";
import os from "os";

import { v4 as uuidv4 } from "uuid";

Expand All @@ -24,7 +22,6 @@ import { WARCWriter } from "./warcwriter.js";
import { RedisCrawlState, WorkerId } from "./state.js";
import { CDPSession, Protocol } from "puppeteer-core";
import { Crawler } from "../crawler.js";
import { WARCResourceWriter } from "./warcresourcewriter.js";

const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000;
const MAX_BROWSER_TEXT_FETCH_SIZE = 25_000_000;
Expand Down Expand Up @@ -70,7 +67,6 @@ export type PageInfoRecord = {
// =================================================================
export class Recorder {
workerid: WorkerId;
collDir: string;

crawler: Crawler;

Expand All @@ -94,9 +90,7 @@ export class Recorder {

allowFull206 = false;

archivesDir: string;
tempdir: string;
tempCdxDir: string;

gzip = true;

Expand All @@ -107,46 +101,26 @@ export class Recorder {

constructor({
workerid,
collDir,
writer,
crawler,
tempdir,
}: {
workerid: WorkerId;
collDir: string;
writer: WARCWriter;
crawler: Crawler;
tempdir: string;
}) {
this.workerid = workerid;
this.crawler = crawler;
this.crawlState = crawler.crawlState;

this.writer = writer;

this.tempdir = tempdir;

this.warcQ = new PQueue({ concurrency: 1 });

this.fetcherQ = new PQueue({ concurrency: 1 });

this.collDir = collDir;

this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(os.tmpdir(), "tmp-dl");
this.tempCdxDir = path.join(this.collDir, "tmp-cdx");

fs.mkdirSync(this.tempdir, { recursive: true });
fs.mkdirSync(this.archivesDir, { recursive: true });
// fs.mkdirSync(this.tempCdxDir, { recursive: true });

const prefix =
process.env.WARC_PREFIX || crawler.params.warcPrefix || "rec";
const crawlId = process.env.CRAWL_ID || os.hostname();
const filenameTemplate = `${prefix}-${crawlId}-$ts-${this.workerid}.warc${
this.gzip ? ".gz" : ""
}`;

this.writer = new WARCWriter({
archivesDir: this.archivesDir,
// tempCdxDir: this.tempCdxDir,
filenameTemplate,
rolloverSize: crawler.params.rolloverSize,
gzip: this.gzip,
logDetails: this.logDetails,
});
}

async onCreatePage({ cdp }: { cdp: CDPSession }) {
Expand Down Expand Up @@ -733,18 +707,19 @@ export class Recorder {
}
}

async writePageInfoRecord() {
writePageInfoRecord() {
const text = JSON.stringify(this.pageInfo, null, 2);

const resourceRecord = await WARCResourceWriter.createResourceRecord(
new TextEncoder().encode(text),
"pageinfo",
"application/json",
this.pageUrl,
new Date(),
);
const url = this.pageUrl;

this.warcQ.add(() => this.writer.writeSingleRecord(resourceRecord));
this.warcQ.add(() =>
this.writer.writeNewResourceRecord({
buffer: new TextEncoder().encode(text),
resourceType: "pageinfo",
contentType: "application/json",
url,
}),
);

return this.pageInfo.ts;
}
Expand Down