From 99f9510f2e42862ec2b1d0137b5f6b0434163ccd Mon Sep 17 00:00:00 2001 From: Benjamin Pasero Date: Thu, 11 Jun 2020 12:43:24 +0200 Subject: [PATCH] web - use highWaterMark for upload (#83565) --- src/vs/platform/files/common/fileService.ts | 34 +++++++--------- src/vs/platform/files/common/files.ts | 7 ---- .../electron-browser/diskFileService.test.ts | 24 +++--------- .../files/browser/views/explorerViewer.ts | 39 +++++++++++++++---- 4 files changed, 51 insertions(+), 53 deletions(-) diff --git a/src/vs/platform/files/common/fileService.ts b/src/vs/platform/files/common/fileService.ts index 4ee9b3dea1ab7..65652d2a81ef3 100644 --- a/src/vs/platform/files/common/fileService.ts +++ b/src/vs/platform/files/common/fileService.ts @@ -338,12 +338,12 @@ export class FileService extends Disposable implements IFileService { // write file: unbuffered (only if data to write is a buffer, or the provider has no buffered write capability) if (!hasOpenReadWriteCloseCapability(provider) || (hasReadWriteCapability(provider) && bufferOrReadableOrStreamOrBufferedStream instanceof VSBuffer)) { - await this.doWriteUnbuffered(provider, resource, bufferOrReadableOrStreamOrBufferedStream, options); + await this.doWriteUnbuffered(provider, resource, bufferOrReadableOrStreamOrBufferedStream); } // write file: buffered else { - await this.doWriteBuffered(provider, resource, bufferOrReadableOrStreamOrBufferedStream instanceof VSBuffer ? bufferToReadable(bufferOrReadableOrStreamOrBufferedStream) : bufferOrReadableOrStreamOrBufferedStream, options); + await this.doWriteBuffered(provider, resource, bufferOrReadableOrStreamOrBufferedStream instanceof VSBuffer ? bufferToReadable(bufferOrReadableOrStreamOrBufferedStream) : bufferOrReadableOrStreamOrBufferedStream); } } catch (error) { throw new FileOperationError(localize('err.write', "Unable to write file '{0}' ({1})", this.resourceForError(resource), ensureFileSystemProviderError(error).toString()), toFileOperationResult(error), options); @@ -954,7 +954,7 @@ export class FileService extends Disposable implements IFileService { return writeQueue; } - private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readableOrStreamOrBufferedStream: VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream, options?: IWriteFileOptions): Promise { + private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readableOrStreamOrBufferedStream: VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise { return this.ensureWriteQueue(provider, resource).queue(async () => { // open handle @@ -963,9 +963,9 @@ export class FileService extends Disposable implements IFileService { // write into handle until all bytes from buffer have been written try { if (isReadableStream(readableOrStreamOrBufferedStream) || isReadableBufferedStream(readableOrStreamOrBufferedStream)) { - await this.doWriteStreamBufferedQueued(provider, handle, readableOrStreamOrBufferedStream, options); + await this.doWriteStreamBufferedQueued(provider, handle, readableOrStreamOrBufferedStream); } else { - await this.doWriteReadableBufferedQueued(provider, handle, readableOrStreamOrBufferedStream, options); + await this.doWriteReadableBufferedQueued(provider, handle, readableOrStreamOrBufferedStream); } } catch (error) { throw ensureFileSystemProviderError(error); @@ -977,7 +977,7 @@ export class FileService extends Disposable implements IFileService { }); } - private async doWriteStreamBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, streamOrBufferedStream: VSBufferReadableStream | VSBufferReadableBufferedStream, options?: IWriteFileOptions): Promise { + private async doWriteStreamBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, streamOrBufferedStream: VSBufferReadableStream | VSBufferReadableBufferedStream): Promise { let posInFile = 0; let stream: VSBufferReadableStream; @@ -986,7 +986,7 @@ export class FileService extends Disposable implements IFileService { if (isReadableBufferedStream(streamOrBufferedStream)) { if (streamOrBufferedStream.buffer.length > 0) { const chunk = VSBuffer.concat(streamOrBufferedStream.buffer); - await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0, options); + await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0); posInFile += chunk.byteLength; } @@ -1012,7 +1012,7 @@ export class FileService extends Disposable implements IFileService { stream.pause(); try { - await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0, options); + await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0); } catch (error) { return reject(error); } @@ -1031,35 +1031,32 @@ export class FileService extends Disposable implements IFileService { }); } - private async doWriteReadableBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, readable: VSBufferReadable, options?: IWriteFileOptions): Promise { + private async doWriteReadableBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, readable: VSBufferReadable): Promise { let posInFile = 0; let chunk: VSBuffer | null; while ((chunk = readable.read()) !== null) { - await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0, options); + await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0); posInFile += chunk.byteLength; } } - private async doWriteBuffer(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, buffer: VSBuffer, length: number, posInFile: number, posInBuffer: number, options?: IWriteFileOptions): Promise { + private async doWriteBuffer(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, buffer: VSBuffer, length: number, posInFile: number, posInBuffer: number): Promise { let totalBytesWritten = 0; while (totalBytesWritten < length) { // Write through the provider const bytesWritten = await provider.write(handle, posInFile + totalBytesWritten, buffer.buffer, posInBuffer + totalBytesWritten, length - totalBytesWritten); totalBytesWritten += bytesWritten; - - // report progress as needed - options?.progress?.(bytesWritten); } } - private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStreamOrBufferedStream: VSBuffer | VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream, options?: IWriteFileOptions): Promise { - return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadableOrStreamOrBufferedStream, options)); + private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStreamOrBufferedStream: VSBuffer | VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise { + return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadableOrStreamOrBufferedStream)); } - private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStreamOrBufferedStream: VSBuffer | VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream, options?: IWriteFileOptions): Promise { + private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStreamOrBufferedStream: VSBuffer | VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise { let buffer: VSBuffer; if (bufferOrReadableOrStreamOrBufferedStream instanceof VSBuffer) { buffer = bufferOrReadableOrStreamOrBufferedStream; @@ -1073,9 +1070,6 @@ export class FileService extends Disposable implements IFileService { // Write through the provider await provider.writeFile(resource, buffer.buffer, { create: true, overwrite: true }); - - // Report progress as needed - options?.progress?.(buffer.byteLength); } private async doPipeBuffered(sourceProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, source: URI, targetProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, target: URI): Promise { diff --git a/src/vs/platform/files/common/files.ts b/src/vs/platform/files/common/files.ts index fa02a4b2520e6..1a27a27071542 100644 --- a/src/vs/platform/files/common/files.ts +++ b/src/vs/platform/files/common/files.ts @@ -731,13 +731,6 @@ export interface IWriteFileOptions { * The etag of the file. This can be used to prevent dirty writes. */ readonly etag?: string; - - /** - * The progress callback can be used to get accurate information how many - * bytes have been written. Each call carries the length of bytes written - * since the last call was made. - */ - readonly progress?: (byteLength: number) => void; } export interface IResolveFileOptions { diff --git a/src/vs/platform/files/test/electron-browser/diskFileService.test.ts b/src/vs/platform/files/test/electron-browser/diskFileService.test.ts index 1ec2b39e92ab5..b170a7628b47e 100644 --- a/src/vs/platform/files/test/electron-browser/diskFileService.test.ts +++ b/src/vs/platform/files/test/electron-browser/diskFileService.test.ts @@ -1681,11 +1681,9 @@ suite('Disk File Service', function () { assert.equal(content, 'Small File'); const newContent = 'Updates to the small file'; - let totalBytes = 0; - await service.writeFile(resource, VSBuffer.fromString(newContent), { progress: byteLength => totalBytes += byteLength }); + await service.writeFile(resource, VSBuffer.fromString(newContent)); assert.equal(readFileSync(resource.fsPath), newContent); - assert.equal(totalBytes, newContent.length); } test('writeFile (large file) - default', async () => { @@ -1710,12 +1708,10 @@ suite('Disk File Service', function () { const content = readFileSync(resource.fsPath); const newContent = content.toString() + content.toString(); - let totalBytes = 0; - const fileStat = await service.writeFile(resource, VSBuffer.fromString(newContent), { progress: byteLength => totalBytes += byteLength }); + const fileStat = await service.writeFile(resource, VSBuffer.fromString(newContent)); assert.equal(fileStat.name, 'lorem.txt'); assert.equal(readFileSync(resource.fsPath), newContent); - assert.equal(totalBytes, newContent.length); } test('writeFile - buffered - readonly throws', async () => { @@ -1786,11 +1782,9 @@ suite('Disk File Service', function () { assert.equal(content, 'Small File'); const newContent = 'Updates to the small file'; - let totalBytes = 0; - await service.writeFile(resource, toLineByLineReadable(newContent), { progress: byteLength => totalBytes += byteLength }); + await service.writeFile(resource, toLineByLineReadable(newContent)); assert.equal(readFileSync(resource.fsPath), newContent); - assert.equal(totalBytes, newContent.length); } test('writeFile (large file - readable) - default', async () => { @@ -1815,12 +1809,10 @@ suite('Disk File Service', function () { const content = readFileSync(resource.fsPath); const newContent = content.toString() + content.toString(); - let totalBytes = 0; - const fileStat = await service.writeFile(resource, toLineByLineReadable(newContent), { progress: byteLength => totalBytes += byteLength }); + const fileStat = await service.writeFile(resource, toLineByLineReadable(newContent)); assert.equal(fileStat.name, 'lorem.txt'); assert.equal(readFileSync(resource.fsPath), newContent); - assert.equal(totalBytes, newContent.length); } test('writeFile (stream) - default', async () => { @@ -1843,13 +1835,11 @@ suite('Disk File Service', function () { const source = URI.file(join(testDir, 'small.txt')); const target = URI.file(join(testDir, 'small-copy.txt')); - let totalBytes = 0; - const fileStat = await service.writeFile(target, streamToBufferReadableStream(createReadStream(source.fsPath)), { progress: byteLength => totalBytes += byteLength }); + const fileStat = await service.writeFile(target, streamToBufferReadableStream(createReadStream(source.fsPath))); assert.equal(fileStat.name, 'small-copy.txt'); const targetContents = readFileSync(target.fsPath).toString(); assert.equal(readFileSync(source.fsPath).toString(), targetContents); - assert.equal(totalBytes, targetContents.length); } test('writeFile (large file - stream) - default', async () => { @@ -1872,13 +1862,11 @@ suite('Disk File Service', function () { const source = URI.file(join(testDir, 'lorem.txt')); const target = URI.file(join(testDir, 'lorem-copy.txt')); - let totalBytes = 0; - const fileStat = await service.writeFile(target, streamToBufferReadableStream(createReadStream(source.fsPath)), { progress: byteLength => totalBytes += byteLength }); + const fileStat = await service.writeFile(target, streamToBufferReadableStream(createReadStream(source.fsPath))); assert.equal(fileStat.name, 'lorem-copy.txt'); const targetContents = readFileSync(target.fsPath).toString(); assert.equal(readFileSync(source.fsPath).toString(), targetContents); - assert.equal(totalBytes, targetContents.length); } test('writeFile (file is created including parents)', async () => { diff --git a/src/vs/workbench/contrib/files/browser/views/explorerViewer.ts b/src/vs/workbench/contrib/files/browser/views/explorerViewer.ts index ee5c3e6c9fed3..dc82b457c131e 100644 --- a/src/vs/workbench/contrib/files/browser/views/explorerViewer.ts +++ b/src/vs/workbench/contrib/files/browser/views/explorerViewer.ts @@ -1052,12 +1052,12 @@ export class FileDragAndDrop implements ITreeDragAndDrop { // Chrome/Edge/Firefox support stream method if (typeof file.stream === 'function') { - await this.doUploadWebFileEntryBuffered(resource, file, reportProgress); + await this.doUploadWebFileEntryBuffered(resource, file, reportProgress, token); } // Fallback to unbuffered upload for other browsers else { - await this.doUploadWebFileEntryUnbuffered(resource, file, reportProgress); + await this.doUploadWebFileEntryUnbuffered(resource, file); } return { isFile: true, resource }; @@ -1099,9 +1099,14 @@ export class FileDragAndDrop implements ITreeDragAndDrop { } } - private async doUploadWebFileEntryBuffered(resource: URI, file: File, progressReporter: (fileSize: number, bytesUploaded: number) => void): Promise { - const writeableStream = newWriteableBufferStream(); - const writeFilePromise = this.fileService.writeFile(resource, writeableStream, { progress: byteLength => progressReporter(file.size, byteLength) }); + private async doUploadWebFileEntryBuffered(resource: URI, file: File, progressReporter: (fileSize: number, bytesUploaded: number) => void, token: CancellationToken): Promise { + const writeableStream = newWriteableBufferStream({ + // Set a highWaterMark to prevent the stream + // for file upload to produce large buffers + // in-memory + highWaterMark: 10 + }); + const writeFilePromise = this.fileService.writeFile(resource, writeableStream); // Read the file in chunks using File.stream() web APIs try { @@ -1109,7 +1114,21 @@ export class FileDragAndDrop implements ITreeDragAndDrop { let res = await reader.read(); while (!res.done) { - writeableStream.write(VSBuffer.wrap(res.value)); + if (token.isCancellationRequested) { + return undefined; + } + + // Write buffer into stream but make sure to wait + // in case the highWaterMark is reached + const buffer = VSBuffer.wrap(res.value); + await writeableStream.write(buffer); + + if (token.isCancellationRequested) { + return undefined; + } + + // Report progress + progressReporter(file.size, buffer.byteLength); res = await reader.read(); } @@ -1118,17 +1137,21 @@ export class FileDragAndDrop implements ITreeDragAndDrop { writeableStream.end(error); } + if (token.isCancellationRequested) { + return undefined; + } + // Wait for file being written to target await writeFilePromise; } - private doUploadWebFileEntryUnbuffered(resource: URI, file: File, progressReporter: (fileSize: number, bytesUploaded: number) => void): Promise { + private doUploadWebFileEntryUnbuffered(resource: URI, file: File): Promise { return new Promise((resolve, reject) => { const reader = new FileReader(); reader.onload = async event => { try { if (event.target?.result instanceof ArrayBuffer) { - await this.fileService.writeFile(resource, VSBuffer.wrap(new Uint8Array(event.target.result)), { progress: byteLength => progressReporter(file.size, byteLength) }); + await this.fileService.writeFile(resource, VSBuffer.wrap(new Uint8Array(event.target.result))); } else { throw new Error('Could not read from dropped file.'); }