Write large blobs in chunks to avoid write file limits in Node
JakeLane committed Nov 2, 2023
1 parent a1391ed commit be2ebd2
Showing 8 changed files with 84 additions and 25 deletions.
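Node's fs.writeFile and fs.readFile cannot reliably handle a single buffer of 2 GB or more, so the cache now splits every large blob across numbered files (`${key}-0`, `${key}-1`, ...) and reassembles them on read. Below is a minimal sketch of the slicing step the new setLargeBlob relies on; `chunkBuffer` is an invented helper for illustration, not part of the diff.

// Sketch only: mirrors how setLargeBlob slices a Buffer with subarray() so
// each piece stays under the write limit. Nothing here is Parcel API.
const WRITE_LIMIT_CHUNK = 2 * 1024 ** 3; // 2 GiB, same constant as constants.js

function chunkBuffer(contents, chunkSize = WRITE_LIMIT_CHUNK) {
  const chunks = [];
  for (let i = 0; i < Math.ceil(contents.length / chunkSize); i += 1) {
    // subarray() returns a view into the same memory, so nothing is copied here
    chunks.push(contents.subarray(i * chunkSize, (i + 1) * chunkSize));
  }
  return chunks;
}

// With a small chunk size for illustration: two chunks, 'abcd' and 'ef'.
console.log(chunkBuffer(Buffer.from('abcdef'), 4).map(c => c.toString()));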
35 changes: 30 additions & 5 deletions packages/core/cache/src/FSCache.js
@@ -13,6 +13,8 @@ import {serialize, deserialize, registerSerializableClass} from '@parcel/core';
 // flowlint-next-line untyped-import:off
 import packageJson from '../package.json';
 
+import {WRITE_LIMIT_CHUNK} from './constants';
+
 const pipeline: (Readable, Writable) => Promise<void> = promisify(
   stream.pipeline,
 );
@@ -81,16 +83,39 @@ export class FSCache implements Cache {
     }
   }
 
+  #getFilePath(key: string, index: number): string {
+    return path.join(this.dir, `${key}-${index}`);
+  }
+
   hasLargeBlob(key: string): Promise<boolean> {
-    return this.fs.exists(this._getCachePath(`${key}-large`));
+    return this.fs.exists(this.#getFilePath(key, 0));
   }
 
-  getLargeBlob(key: string): Promise<Buffer> {
-    return this.fs.readFile(this._getCachePath(`${key}-large`));
+  async getLargeBlob(key: string): Promise<Buffer> {
+    const buffers: Promise<Buffer>[] = [];
+    for (let i = 0; await this.fs.exists(this.#getFilePath(key, i)); i += 1) {
+      const file: Promise<Buffer> = this.fs.readFile(this.#getFilePath(key, i));
+      buffers.push(file);
+    }
+    return Buffer.concat(await Promise.all(buffers));
   }
 
-  async setLargeBlob(key: string, contents: Buffer | string): Promise<void> {
-    await this.fs.writeFile(this._getCachePath(`${key}-large`), contents);
+  async setLargeBlob(key: string, contents: Buffer): Promise<void> {
+    const chunks = Math.ceil(contents.length / WRITE_LIMIT_CHUNK);
+    const writePromises: Promise<void>[] = [];
+    for (let i = 0; i < chunks; i += 1) {
+      writePromises.push(
+        this.fs.writeFile(
+          this.#getFilePath(key, i),
+          contents.subarray(i * WRITE_LIMIT_CHUNK, (i + 1) * WRITE_LIMIT_CHUNK),
+        ),
+      );
+    }
+    await Promise.all(writePromises);
   }
 
   async get<T>(key: string): Promise<?T> {
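For context, a hypothetical round trip through the new API. The FSCache constructor arguments, the ensure() call, and the `@parcel/cache` import are assumptions based on how the class is used elsewhere, not part of this diff.

// Illustration only; constructor signature and setup are assumed.
import {NodeFS} from '@parcel/fs';
import {FSCache} from '@parcel/cache';

async function example() {
  const cache = new FSCache(new NodeFS(), '/tmp/.parcel-cache'); // assumed signature
  await cache.ensure(); // assumed setup step
  const blob = Buffer.alloc(100); // stands in for a multi-GB serialized graph
  await cache.setLargeBlob('request-graph', blob); // writes request-graph-0, -1, ...
  const restored = await cache.getLargeBlob('request-graph');
  console.log(restored.equals(blob)); // true
}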
2 changes: 1 addition & 1 deletion packages/core/cache/src/IDBCache.browser.js
@@ -114,7 +114,7 @@ export class IDBCache implements Cache {
     return this.getBlob(key);
   }
 
-  setLargeBlob(key: string, contents: Buffer | string): Promise<void> {
+  setLargeBlob(key: string, contents: Buffer): Promise<void> {
     return this.setBlob(key, contents);
   }
 
34 changes: 29 additions & 5 deletions packages/core/cache/src/LMDBCache.js
@@ -12,6 +12,7 @@ import {NodeFS} from '@parcel/fs';
 import packageJson from '../package.json';
 // $FlowFixMe
 import lmdb from 'lmdb';
+import {WRITE_LIMIT_CHUNK} from './constants';
 
 const pipeline: (Readable, Writable) => Promise<void> = promisify(
   stream.pipeline,
@@ -91,16 +92,39 @@ export class LMDBCache implements Cache {
     return Promise.resolve(this.store.get(key));
   }
 
+  #getFilePath(key: string, index: number): string {
+    return path.join(this.dir, `${key}-${index}`);
+  }
+
   hasLargeBlob(key: string): Promise<boolean> {
-    return this.fs.exists(path.join(this.dir, key));
+    return this.fs.exists(this.#getFilePath(key, 0));
   }
 
-  getLargeBlob(key: string): Promise<Buffer> {
-    return this.fs.readFile(path.join(this.dir, key));
+  async getLargeBlob(key: string): Promise<Buffer> {
+    const buffers: Promise<Buffer>[] = [];
+    for (let i = 0; await this.fs.exists(this.#getFilePath(key, i)); i += 1) {
+      const file: Promise<Buffer> = this.fs.readFile(this.#getFilePath(key, i));
+
+      buffers.push(file);
+    }
+
+    return Buffer.concat(await Promise.all(buffers));
   }
 
-  async setLargeBlob(key: string, contents: Buffer | string): Promise<void> {
-    await this.fs.writeFile(path.join(this.dir, key), contents);
+  async setLargeBlob(key: string, contents: Buffer): Promise<void> {
+    const chunks = Math.ceil(contents.length / WRITE_LIMIT_CHUNK);
+
+    const writePromises: Promise<void>[] = [];
+    for (let i = 0; i < chunks; i += 1) {
+      writePromises.push(
+        this.fs.writeFile(
+          this.#getFilePath(key, i),
+          contents.subarray(i * WRITE_LIMIT_CHUNK, (i + 1) * WRITE_LIMIT_CHUNK),
+        ),
+      );
+    }
+
+    await Promise.all(writePromises);
   }
 
   refresh(): void {
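The read path probes `${key}-0`, `${key}-1`, ... and stops at the first missing index, so the chunk files for a key must exist contiguously from `<key>-0`. The same read loop expressed with plain Node APIs, as a rough sketch; fs/promises stands in for Parcel's FileSystem abstraction and the helper name is invented.

import {promises as fsp} from 'fs';
import path from 'path';

// Sketch of getLargeBlob's read loop using plain Node APIs.
async function readChunkedBlob(cacheDir, key) {
  const buffers = [];
  for (let i = 0; ; i += 1) {
    const chunkPath = path.join(cacheDir, `${key}-${i}`);
    try {
      await fsp.access(chunkPath); // equivalent of this.fs.exists(...)
    } catch {
      break; // the first missing index ends the blob
    }
    buffers.push(await fsp.readFile(chunkPath));
  }
  return Buffer.concat(buffers);
}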
4 changes: 4 additions & 0 deletions packages/core/cache/src/constants.js
@@ -0,0 +1,4 @@
+// @flow strict-local
+
+// Node has a file size limit of 2 GB
+export const WRITE_LIMIT_CHUNK = 2 * 1024 ** 3;
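A quick worked example of the chunk arithmetic used with this constant; nothing is allocated, the numbers just show how many chunk files a given blob produces.

const WRITE_LIMIT_CHUNK = 2 * 1024 ** 3; // 2 GiB
const blobSize = 5 * 1024 ** 3; // hypothetical 5 GiB blob
const chunkCount = Math.ceil(blobSize / WRITE_LIMIT_CHUNK); // 3
// chunk 0 covers bytes [0, 2 GiB), chunk 1 [2 GiB, 4 GiB), chunk 2 [4 GiB, 5 GiB)
console.log(chunkCount);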
2 changes: 1 addition & 1 deletion packages/core/cache/src/types.js
@@ -12,7 +12,7 @@ export interface Cache {
   setBlob(key: string, contents: Buffer | string): Promise<void>;
   hasLargeBlob(key: string): Promise<boolean>;
   getLargeBlob(key: string): Promise<Buffer>;
-  setLargeBlob(key: string, contents: Buffer | string): Promise<void>;
+  setLargeBlob(key: string, contents: Buffer): Promise<void>;
   getBuffer(key: string): Promise<?Buffer>;
   /**
    * In a multi-threaded environment, where there are potentially multiple Cache
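Since the interface now only accepts a Buffer for setLargeBlob, callers that previously passed a string have to convert explicitly. A tiny illustrative sketch, where `cache` stands for any Cache implementation:

// Illustrative only: `cache` is any Cache implementation, `serialized` a string.
async function storeString(cache, key, serialized) {
  // The old signature accepted a string directly; now convert explicitly.
  await cache.setLargeBlob(key, Buffer.from(serialized, 'utf8'));
}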
1 change: 1 addition & 0 deletions packages/dev/query/package.json
@@ -7,6 +7,7 @@
   "dependencies": {
     "@parcel/core": "2.10.2",
     "@parcel/graph": "3.0.2",
+    "@parcel/cache": "2.10.2",
     "nullthrows": "^1.1.1",
     "table": "^6.8.1",
     "v8-compile-cache": "^2.0.0"
7 changes: 4 additions & 3 deletions packages/dev/query/src/cli.js
@@ -20,7 +20,7 @@ import {Priority} from '@parcel/core/src/types';
 
 import {loadGraphs} from './index.js';
 
-export function run(input: string[]) {
+export async function run(input: string[]) {
   let args = input;
   let cacheDir = path.join(process.cwd(), '.parcel-cache');
   if (args[0] === '--cache') {
@@ -37,8 +37,9 @@ export function run(input: string[]) {
   }
 
   console.log('Loading graphs...');
-  let {assetGraph, bundleGraph, bundleInfo, requestTracker} =
-    loadGraphs(cacheDir);
+  let {assetGraph, bundleGraph, bundleInfo, requestTracker} = await loadGraphs(
+    cacheDir,
+  );
 
   if (requestTracker == null) {
     console.error('Request Graph could not be found');
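run() is now async, so whatever invokes it has to await (or otherwise handle) the returned promise. A sketch of what an entry point could look like; the actual bin wrapper for the query tool is not part of this diff and may differ.

// Hypothetical entry point; not part of this change.
import {run} from './cli';

run(process.argv.slice(2)).catch(err => {
  console.error(err);
  process.exit(1);
});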
24 changes: 14 additions & 10 deletions packages/dev/query/src/index.js
@@ -8,6 +8,7 @@ import path from 'path';
 import v8 from 'v8';
 import nullthrows from 'nullthrows';
 import invariant from 'assert';
+import {LMDBCache} from '@parcel/cache/src/LMDBCache';
 
 const {
   AssetGraph,
@@ -19,12 +20,12 @@
   },
 } = require('./deep-imports.js');
 
-export function loadGraphs(cacheDir: string): {|
+export async function loadGraphs(cacheDir: string): Promise<{|
   assetGraph: ?AssetGraph,
   bundleGraph: ?BundleGraph,
   requestTracker: ?RequestTracker,
   bundleInfo: ?Map<ContentKey, PackagedBundleInfo>,
-|} {
+|}> {
   function filesBySizeAndModifiedTime() {
     let files = fs.readdirSync(cacheDir).map(f => {
       let stat = fs.statSync(path.join(cacheDir, f));
@@ -38,11 +39,14 @@ export function loadGraphs(cacheDir: string): {|
   }
 
   let requestTracker;
+  const cache = new LMDBCache(cacheDir);
   for (let f of filesBySizeAndModifiedTime()) {
     // if (bundleGraph && assetGraph && requestTracker) break;
-    if (path.extname(f) !== '') continue;
+    // Empty filename or not the first chunk
+    if (path.extname(f) !== '' && !f.endsWith('-0')) continue;
     try {
-      let obj = v8.deserialize(fs.readFileSync(f));
+      let obj = v8.deserialize(
+        await cache.getLargeBlob(path.basename(f).slice(0, -'-0'.length)),
+      );
       /* if (obj.assetGraph != null && obj.assetGraph.value.hash != null) {
         assetGraph = AssetGraph.deserialize(obj.assetGraph.value);
       } else if (obj.bundleGraph != null) {
@@ -90,7 +94,7 @@
     );
     if (bundleGraphRequestNode != null) {
       bundleGraph = BundleGraph.deserialize(
-        loadLargeBlobRequestRequestSync(cacheDir, bundleGraphRequestNode)
+        (await loadLargeBlobRequestRequestSync(cache, bundleGraphRequestNode))
          .bundleGraph.value,
      );
 
@@ -99,8 +103,8 @@
     ).find(n => n.type === 'request' && n.value.type === 'asset_graph_request');
     if (assetGraphRequest != null) {
       assetGraph = AssetGraph.deserialize(
-        loadLargeBlobRequestRequestSync(cacheDir, assetGraphRequest).assetGraph
-          .value,
+        (await loadLargeBlobRequestRequestSync(cache, assetGraphRequest))
+          .assetGraph.value,
       );
     }
   }
@@ -120,9 +124,9 @@
   return {assetGraph, bundleGraph, requestTracker, bundleInfo};
 }
 
-function loadLargeBlobRequestRequestSync(cacheDir, node) {
+async function loadLargeBlobRequestRequestSync(cache, node) {
   invariant(node.type === 'request');
   return v8.deserialize(
-    fs.readFileSync(path.join(cacheDir, nullthrows(node.value.resultCacheKey))),
+    await cache.getLargeBlob(nullthrows(node.value.resultCacheKey)),
   );
 }
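loadGraphs now only considers `<key>-0` files and strips the suffix to recover the cache key before asking the cache for the reassembled blob. A small sketch of that key-recovery step; the helper name is invented and only mirrors the `path.basename(f).slice(0, -'-0'.length)` expression in the diff.

import path from 'path';

// Illustration: recover the cache key from the first chunk of a large blob.
function cacheKeyFromFirstChunk(filePath) {
  const base = path.basename(filePath);
  if (!base.endsWith('-0')) {
    return null; // only the `<key>-0` file marks the start of a chunked blob
  }
  return base.slice(0, -'-0'.length);
}

console.log(cacheKeyFromFirstChunk('/cache/abc123-0')); // 'abc123'
console.log(cacheKeyFromFirstChunk('/cache/abc123-1')); // null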
