Skip to content

Commit

Permalink
web - use highWaterMark for upload (#83565)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpasero committed Jun 11, 2020
1 parent 6f02589 commit 99f9510
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 53 deletions.
34 changes: 14 additions & 20 deletions src/vs/platform/files/common/fileService.ts
Expand Up @@ -338,12 +338,12 @@ export class FileService extends Disposable implements IFileService {

// write file: unbuffered (only if data to write is a buffer, or the provider has no buffered write capability)
if (!hasOpenReadWriteCloseCapability(provider) || (hasReadWriteCapability(provider) && bufferOrReadableOrStreamOrBufferedStream instanceof VSBuffer)) {
await this.doWriteUnbuffered(provider, resource, bufferOrReadableOrStreamOrBufferedStream, options);
await this.doWriteUnbuffered(provider, resource, bufferOrReadableOrStreamOrBufferedStream);
}

// write file: buffered
else {
await this.doWriteBuffered(provider, resource, bufferOrReadableOrStreamOrBufferedStream instanceof VSBuffer ? bufferToReadable(bufferOrReadableOrStreamOrBufferedStream) : bufferOrReadableOrStreamOrBufferedStream, options);
await this.doWriteBuffered(provider, resource, bufferOrReadableOrStreamOrBufferedStream instanceof VSBuffer ? bufferToReadable(bufferOrReadableOrStreamOrBufferedStream) : bufferOrReadableOrStreamOrBufferedStream);
}
} catch (error) {
throw new FileOperationError(localize('err.write', "Unable to write file '{0}' ({1})", this.resourceForError(resource), ensureFileSystemProviderError(error).toString()), toFileOperationResult(error), options);
Expand Down Expand Up @@ -954,7 +954,7 @@ export class FileService extends Disposable implements IFileService {
return writeQueue;
}

private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readableOrStreamOrBufferedStream: VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream, options?: IWriteFileOptions): Promise<void> {
private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readableOrStreamOrBufferedStream: VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(async () => {

// open handle
Expand All @@ -963,9 +963,9 @@ export class FileService extends Disposable implements IFileService {
// write into handle until all bytes from buffer have been written
try {
if (isReadableStream(readableOrStreamOrBufferedStream) || isReadableBufferedStream(readableOrStreamOrBufferedStream)) {
await this.doWriteStreamBufferedQueued(provider, handle, readableOrStreamOrBufferedStream, options);
await this.doWriteStreamBufferedQueued(provider, handle, readableOrStreamOrBufferedStream);
} else {
await this.doWriteReadableBufferedQueued(provider, handle, readableOrStreamOrBufferedStream, options);
await this.doWriteReadableBufferedQueued(provider, handle, readableOrStreamOrBufferedStream);
}
} catch (error) {
throw ensureFileSystemProviderError(error);
Expand All @@ -977,7 +977,7 @@ export class FileService extends Disposable implements IFileService {
});
}

private async doWriteStreamBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, streamOrBufferedStream: VSBufferReadableStream | VSBufferReadableBufferedStream, options?: IWriteFileOptions): Promise<void> {
private async doWriteStreamBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, streamOrBufferedStream: VSBufferReadableStream | VSBufferReadableBufferedStream): Promise<void> {
let posInFile = 0;
let stream: VSBufferReadableStream;

Expand All @@ -986,7 +986,7 @@ export class FileService extends Disposable implements IFileService {
if (isReadableBufferedStream(streamOrBufferedStream)) {
if (streamOrBufferedStream.buffer.length > 0) {
const chunk = VSBuffer.concat(streamOrBufferedStream.buffer);
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0, options);
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);

posInFile += chunk.byteLength;
}
Expand All @@ -1012,7 +1012,7 @@ export class FileService extends Disposable implements IFileService {
stream.pause();

try {
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0, options);
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);
} catch (error) {
return reject(error);
}
Expand All @@ -1031,35 +1031,32 @@ export class FileService extends Disposable implements IFileService {
});
}

private async doWriteReadableBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, readable: VSBufferReadable, options?: IWriteFileOptions): Promise<void> {
private async doWriteReadableBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, readable: VSBufferReadable): Promise<void> {
let posInFile = 0;

let chunk: VSBuffer | null;
while ((chunk = readable.read()) !== null) {
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0, options);
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);

posInFile += chunk.byteLength;
}
}

private async doWriteBuffer(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, buffer: VSBuffer, length: number, posInFile: number, posInBuffer: number, options?: IWriteFileOptions): Promise<void> {
private async doWriteBuffer(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, buffer: VSBuffer, length: number, posInFile: number, posInBuffer: number): Promise<void> {
let totalBytesWritten = 0;
while (totalBytesWritten < length) {

// Write through the provider
const bytesWritten = await provider.write(handle, posInFile + totalBytesWritten, buffer.buffer, posInBuffer + totalBytesWritten, length - totalBytesWritten);
totalBytesWritten += bytesWritten;

// report progress as needed
options?.progress?.(bytesWritten);
}
}

private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStreamOrBufferedStream: VSBuffer | VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream, options?: IWriteFileOptions): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadableOrStreamOrBufferedStream, options));
private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStreamOrBufferedStream: VSBuffer | VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadableOrStreamOrBufferedStream));
}

private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStreamOrBufferedStream: VSBuffer | VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream, options?: IWriteFileOptions): Promise<void> {
private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStreamOrBufferedStream: VSBuffer | VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise<void> {
let buffer: VSBuffer;
if (bufferOrReadableOrStreamOrBufferedStream instanceof VSBuffer) {
buffer = bufferOrReadableOrStreamOrBufferedStream;
Expand All @@ -1073,9 +1070,6 @@ export class FileService extends Disposable implements IFileService {

// Write through the provider
await provider.writeFile(resource, buffer.buffer, { create: true, overwrite: true });

// Report progress as needed
options?.progress?.(buffer.byteLength);
}

private async doPipeBuffered(sourceProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, source: URI, targetProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, target: URI): Promise<void> {
Expand Down
7 changes: 0 additions & 7 deletions src/vs/platform/files/common/files.ts
Expand Up @@ -731,13 +731,6 @@ export interface IWriteFileOptions {
* The etag of the file. This can be used to prevent dirty writes.
*/
readonly etag?: string;

/**
* The progress callback can be used to get accurate information how many
* bytes have been written. Each call carries the length of bytes written
* since the last call was made.
*/
readonly progress?: (byteLength: number) => void;
}

export interface IResolveFileOptions {
Expand Down
Expand Up @@ -1681,11 +1681,9 @@ suite('Disk File Service', function () {
assert.equal(content, 'Small File');

const newContent = 'Updates to the small file';
let totalBytes = 0;
await service.writeFile(resource, VSBuffer.fromString(newContent), { progress: byteLength => totalBytes += byteLength });
await service.writeFile(resource, VSBuffer.fromString(newContent));

assert.equal(readFileSync(resource.fsPath), newContent);
assert.equal(totalBytes, newContent.length);
}

test('writeFile (large file) - default', async () => {
Expand All @@ -1710,12 +1708,10 @@ suite('Disk File Service', function () {
const content = readFileSync(resource.fsPath);
const newContent = content.toString() + content.toString();

let totalBytes = 0;
const fileStat = await service.writeFile(resource, VSBuffer.fromString(newContent), { progress: byteLength => totalBytes += byteLength });
const fileStat = await service.writeFile(resource, VSBuffer.fromString(newContent));
assert.equal(fileStat.name, 'lorem.txt');

assert.equal(readFileSync(resource.fsPath), newContent);
assert.equal(totalBytes, newContent.length);
}

test('writeFile - buffered - readonly throws', async () => {
Expand Down Expand Up @@ -1786,11 +1782,9 @@ suite('Disk File Service', function () {
assert.equal(content, 'Small File');

const newContent = 'Updates to the small file';
let totalBytes = 0;
await service.writeFile(resource, toLineByLineReadable(newContent), { progress: byteLength => totalBytes += byteLength });
await service.writeFile(resource, toLineByLineReadable(newContent));

assert.equal(readFileSync(resource.fsPath), newContent);
assert.equal(totalBytes, newContent.length);
}

test('writeFile (large file - readable) - default', async () => {
Expand All @@ -1815,12 +1809,10 @@ suite('Disk File Service', function () {
const content = readFileSync(resource.fsPath);
const newContent = content.toString() + content.toString();

let totalBytes = 0;
const fileStat = await service.writeFile(resource, toLineByLineReadable(newContent), { progress: byteLength => totalBytes += byteLength });
const fileStat = await service.writeFile(resource, toLineByLineReadable(newContent));
assert.equal(fileStat.name, 'lorem.txt');

assert.equal(readFileSync(resource.fsPath), newContent);
assert.equal(totalBytes, newContent.length);
}

test('writeFile (stream) - default', async () => {
Expand All @@ -1843,13 +1835,11 @@ suite('Disk File Service', function () {
const source = URI.file(join(testDir, 'small.txt'));
const target = URI.file(join(testDir, 'small-copy.txt'));

let totalBytes = 0;
const fileStat = await service.writeFile(target, streamToBufferReadableStream(createReadStream(source.fsPath)), { progress: byteLength => totalBytes += byteLength });
const fileStat = await service.writeFile(target, streamToBufferReadableStream(createReadStream(source.fsPath)));
assert.equal(fileStat.name, 'small-copy.txt');

const targetContents = readFileSync(target.fsPath).toString();
assert.equal(readFileSync(source.fsPath).toString(), targetContents);
assert.equal(totalBytes, targetContents.length);
}

test('writeFile (large file - stream) - default', async () => {
Expand All @@ -1872,13 +1862,11 @@ suite('Disk File Service', function () {
const source = URI.file(join(testDir, 'lorem.txt'));
const target = URI.file(join(testDir, 'lorem-copy.txt'));

let totalBytes = 0;
const fileStat = await service.writeFile(target, streamToBufferReadableStream(createReadStream(source.fsPath)), { progress: byteLength => totalBytes += byteLength });
const fileStat = await service.writeFile(target, streamToBufferReadableStream(createReadStream(source.fsPath)));
assert.equal(fileStat.name, 'lorem-copy.txt');

const targetContents = readFileSync(target.fsPath).toString();
assert.equal(readFileSync(source.fsPath).toString(), targetContents);
assert.equal(totalBytes, targetContents.length);
}

test('writeFile (file is created including parents)', async () => {
Expand Down
39 changes: 31 additions & 8 deletions src/vs/workbench/contrib/files/browser/views/explorerViewer.ts
Expand Up @@ -1052,12 +1052,12 @@ export class FileDragAndDrop implements ITreeDragAndDrop<ExplorerItem> {

// Chrome/Edge/Firefox support stream method
if (typeof file.stream === 'function') {
await this.doUploadWebFileEntryBuffered(resource, file, reportProgress);
await this.doUploadWebFileEntryBuffered(resource, file, reportProgress, token);
}

// Fallback to unbuffered upload for other browsers
else {
await this.doUploadWebFileEntryUnbuffered(resource, file, reportProgress);
await this.doUploadWebFileEntryUnbuffered(resource, file);
}

return { isFile: true, resource };
Expand Down Expand Up @@ -1099,17 +1099,36 @@ export class FileDragAndDrop implements ITreeDragAndDrop<ExplorerItem> {
}
}

private async doUploadWebFileEntryBuffered(resource: URI, file: File, progressReporter: (fileSize: number, bytesUploaded: number) => void): Promise<void> {
const writeableStream = newWriteableBufferStream();
const writeFilePromise = this.fileService.writeFile(resource, writeableStream, { progress: byteLength => progressReporter(file.size, byteLength) });
private async doUploadWebFileEntryBuffered(resource: URI, file: File, progressReporter: (fileSize: number, bytesUploaded: number) => void, token: CancellationToken): Promise<void> {
const writeableStream = newWriteableBufferStream({
// Set a highWaterMark to prevent the stream
// for file upload to produce large buffers
// in-memory
highWaterMark: 10
});
const writeFilePromise = this.fileService.writeFile(resource, writeableStream);

// Read the file in chunks using File.stream() web APIs
try {
const reader: ReadableStreamDefaultReader<Uint8Array> = file.stream().getReader();

let res = await reader.read();
while (!res.done) {
writeableStream.write(VSBuffer.wrap(res.value));
if (token.isCancellationRequested) {
return undefined;
}

// Write buffer into stream but make sure to wait
// in case the highWaterMark is reached
const buffer = VSBuffer.wrap(res.value);
await writeableStream.write(buffer);

if (token.isCancellationRequested) {
return undefined;
}

// Report progress
progressReporter(file.size, buffer.byteLength);

res = await reader.read();
}
Expand All @@ -1118,17 +1137,21 @@ export class FileDragAndDrop implements ITreeDragAndDrop<ExplorerItem> {
writeableStream.end(error);
}

if (token.isCancellationRequested) {
return undefined;
}

// Wait for file being written to target
await writeFilePromise;
}

private doUploadWebFileEntryUnbuffered(resource: URI, file: File, progressReporter: (fileSize: number, bytesUploaded: number) => void): Promise<void> {
private doUploadWebFileEntryUnbuffered(resource: URI, file: File): Promise<void> {
return new Promise<void>((resolve, reject) => {
const reader = new FileReader();
reader.onload = async event => {
try {
if (event.target?.result instanceof ArrayBuffer) {
await this.fileService.writeFile(resource, VSBuffer.wrap(new Uint8Array(event.target.result)), { progress: byteLength => progressReporter(file.size, byteLength) });
await this.fileService.writeFile(resource, VSBuffer.wrap(new Uint8Array(event.target.result)));
} else {
throw new Error('Could not read from dropped file.');
}
Expand Down

0 comments on commit 99f9510

Please sign in to comment.