Skip to content

Commit

Permalink
src: update Blob implementation to use DataQueue / File-backed Blobs
Browse files Browse the repository at this point in the history
Co-authored-by: flakey5 <73616808+flakey5@users.noreply.github.com>
PR-URL: #45258
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
2 people authored and targos committed Mar 13, 2023
1 parent 9b6270a commit 36f36b9
Show file tree
Hide file tree
Showing 11 changed files with 989 additions and 323 deletions.
9 changes: 9 additions & 0 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ When operating on file handles, the mode cannot be changed from what it was set
to with [`fsPromises.open()`][]. Therefore, this is equivalent to
[`filehandle.writeFile()`][].

#### `filehandle.blob()`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
Returns a {Blob} whose data is backed by this file.

#### `filehandle.chmod(mode)`

<!-- YAML
Expand Down
134 changes: 78 additions & 56 deletions lib/internal/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ const {
MathMin,
ObjectDefineProperties,
ObjectDefineProperty,
PromiseResolve,
PromiseReject,
SafePromisePrototypeFinally,
PromiseResolve,
ReflectConstruct,
RegExpPrototypeExec,
RegExpPrototypeSymbolReplace,
Expand All @@ -22,7 +21,8 @@ const {

const {
createBlob: _createBlob,
FixedSizeBlobCopyJob,
createBlobFromFileHandle: _createBlobFromFileHandle,
concat,
getDataObject,
} = internalBinding('blob');

Expand Down Expand Up @@ -52,13 +52,13 @@ const {
const { inspect } = require('internal/util/inspect');

const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_THIS,
ERR_BUFFER_TOO_LARGE,
}
},
errnoException,
} = require('internal/errors');

const {
Expand All @@ -67,13 +67,8 @@ const {
} = require('internal/validators');

const kHandle = Symbol('kHandle');
const kState = Symbol('kState');
const kIndex = Symbol('kIndex');
const kType = Symbol('kType');
const kLength = Symbol('kLength');
const kArrayBufferPromise = Symbol('kArrayBufferPromise');

const kMaxChunkSize = 65536;

const disallowedTypeCharacters = /[^\u{0020}-\u{007E}]/u;

Expand Down Expand Up @@ -266,40 +261,28 @@ class Blob {
if (!isBlob(this))
return PromiseReject(new ERR_INVALID_THIS('Blob'));

// If there's already a promise in flight for the content,
// reuse it, but only while it's in flight. After the cached
// promise resolves it will be cleared, allowing it to be
// garbage collected as soon as possible.
if (this[kArrayBufferPromise])
return this[kArrayBufferPromise];

const job = new FixedSizeBlobCopyJob(this[kHandle]);

const ret = job.run();

// If the job returns a value immediately, the ArrayBuffer
// was generated synchronously and should just be returned
// directly.
if (ret !== undefined)
return PromiseResolve(ret);
if (this.size === 0) {
return PromiseResolve(new ArrayBuffer(0));
}

const {
promise,
resolve,
reject,
} = createDeferredPromise();

job.ondone = (err, ab) => {
if (err !== undefined)
return reject(new AbortError(undefined, { cause: err }));
resolve(ab);
const { promise, resolve } = createDeferredPromise();
const reader = this[kHandle].getReader();
const buffers = [];
const readNext = () => {
reader.pull((status, buffer) => {
if (status === -1) {
// EOS, concat & resolve
// buffer should be undefined here
resolve(concat(buffers));
return;
}
if (buffer !== undefined)
buffers.push(buffer);
readNext();
});
};
this[kArrayBufferPromise] =
SafePromisePrototypeFinally(
promise,
() => this[kArrayBufferPromise] = undefined);

return this[kArrayBufferPromise];
readNext();
return promise;
}

/**
Expand All @@ -321,24 +304,57 @@ class Blob {
if (!isBlob(this))
throw new ERR_INVALID_THIS('Blob');

const self = this;
if (this.size === 0) {
return new lazyReadableStream({
start(c) { c.close(); }
});
}

const reader = this[kHandle].getReader();
return new lazyReadableStream({
async start() {
this[kState] = await self.arrayBuffer();
this[kIndex] = 0;
start(c) {
// There really should only be one read at a time so using an
// array here is purely defensive.
this.pendingPulls = [];
},

pull(controller) {
if (this[kState].byteLength - this[kIndex] <= kMaxChunkSize) {
controller.enqueue(new Uint8Array(this[kState], this[kIndex]));
controller.close();
this[kState] = undefined;
} else {
controller.enqueue(new Uint8Array(this[kState], this[kIndex], kMaxChunkSize));
this[kIndex] += kMaxChunkSize;
pull(c) {
const { promise, resolve, reject } = createDeferredPromise();
this.pendingPulls.push({resolve, reject});
reader.pull((status, buffer) => {
// If pendingPulls is empty here, the stream had to have
// been canceled, and we don't really care about the result.
// we can simply exit.
if (this.pendingPulls.length === 0) {
return;
}
const pending = this.pendingPulls.shift();
if (status === -1 || (status === 0 && buffer === undefined)) {
// EOS
c.close();
pending.resolve();
return;
} else if (status < 0) {
const error = errnoException(status, 'read');
c.error(error);
pending.reject(error);
return;
}
c.enqueue(new Uint8Array(buffer));
pending.resolve();
});
return promise;
},
cancel(reason) {
// Reject any currently pending pulls here.
for (const pending of this.pendingPulls) {
pending.reject(reason);
}
this.pendingPulls = [];
}
});
// We set the highWaterMark to 0 because we do not want the stream to
// start reading immediately on creation. We want it to wait until read
// is called.
}, new CountQueuingStrategy({ highWaterMark: 0 }));
}
}

Expand Down Expand Up @@ -406,10 +422,16 @@ function resolveObjectURL(url) {
}
}

function createBlobFromFileHandle(handle) {
const [blob, length] = _createBlobFromFileHandle(handle);
return createBlob(blob, length);
}

module.exports = {
Blob,
ClonedBlob,
createBlob,
createBlobFromFileHandle,
isBlob,
kHandle,
resolveObjectURL,
Expand Down
10 changes: 10 additions & 0 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const {
S_IFREG
} = constants;

const { createBlobFromFileHandle } = require('internal/blob');

const binding = internalBinding('fs');
const { Buffer } = require('buffer');

Expand Down Expand Up @@ -310,6 +312,14 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
return new WriteStream(undefined, { ...options, fd: this });
}

/**
* @typedef {import('../blob').Blob} Blob
* @returns {Blob}
*/
blob() {
return createBlobFromFileHandle(this[kHandle]);
}

[kTransfer]() {
if (this[kClosePromise] || this[kRefs] > 1) {
throw lazyDOMException('Cannot transfer FileHandle while in use',
Expand Down
2 changes: 1 addition & 1 deletion src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace node {
V(ELDHISTOGRAM) \
V(FILEHANDLE) \
V(FILEHANDLECLOSEREQ) \
V(FIXEDSIZEBLOBCOPY) \
V(BLOBREADER) \
V(FSEVENTWRAP) \
V(FSREQCALLBACK) \
V(FSREQPROMISE) \
Expand Down
6 changes: 6 additions & 0 deletions src/env_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@
V(base_object_ctor_template, v8::FunctionTemplate) \
V(binding_data_ctor_template, v8::FunctionTemplate) \
V(blob_constructor_template, v8::FunctionTemplate) \
V(blob_reader_constructor_template, v8::FunctionTemplate) \
V(blocklist_constructor_template, v8::FunctionTemplate) \
V(contextify_global_template, v8::ObjectTemplate) \
V(contextify_wrapper_template, v8::ObjectTemplate) \
Expand All @@ -340,6 +341,7 @@
V(dir_instance_template, v8::ObjectTemplate) \
V(fd_constructor_template, v8::ObjectTemplate) \
V(fdclose_constructor_template, v8::ObjectTemplate) \
V(fdentry_constructor_template, v8::FunctionTemplate) \
V(filehandlereadwrap_template, v8::ObjectTemplate) \
V(fsreqpromise_constructor_template, v8::ObjectTemplate) \
V(handle_wrap_ctor_template, v8::FunctionTemplate) \
Expand All @@ -359,14 +361,18 @@
V(secure_context_constructor_template, v8::FunctionTemplate) \
V(shutdown_wrap_template, v8::ObjectTemplate) \
V(socketaddress_constructor_template, v8::FunctionTemplate) \
V(streambaseentry_ctor_template, v8::FunctionTemplate) \
V(streambaseoutputstream_constructor_template, v8::ObjectTemplate) \
V(streamentry_ctor_template, v8::FunctionTemplate) \
V(streamentry_opaque_ctor_template, v8::FunctionTemplate) \
V(qlogoutputstream_constructor_template, v8::ObjectTemplate) \
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tty_constructor_template, v8::FunctionTemplate) \
V(write_wrap_template, v8::ObjectTemplate) \
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
V(x509_constructor_template, v8::FunctionTemplate)


#define PER_REALM_STRONG_PERSISTENT_VALUES(V) \
V(async_hooks_after_function, v8::Function) \
V(async_hooks_before_function, v8::Function) \
Expand Down
Loading

0 comments on commit 36f36b9

Please sign in to comment.