From c95b07925db8a1a56efc2886b1b57d68bdb4e8b2 Mon Sep 17 00:00:00 2001 From: Benjamin Pasero Date: Wed, 10 Jul 2019 19:58:17 +0200 Subject: [PATCH] files - first cut allow stream for writeFile --- src/vs/base/common/buffer.ts | 6 ++ .../services/files/common/fileService.ts | 85 +++++++++++++------ .../files/test/node/diskFileService.test.ts | 44 +++++++++- 3 files changed, 108 insertions(+), 27 deletions(-) diff --git a/src/vs/base/common/buffer.ts b/src/vs/base/common/buffer.ts index 7b4e9cc8d6686..74f515650895f 100644 --- a/src/vs/base/common/buffer.ts +++ b/src/vs/base/common/buffer.ts @@ -182,6 +182,12 @@ export interface VSBufferReadableStream { destroy(): void; } +export function isVSBufferReadableStream(obj: any): obj is VSBufferReadableStream { + const candidate: VSBufferReadableStream = obj; + + return candidate && [candidate.on, candidate.pause, candidate.resume, candidate.destroy].every(fn => typeof fn === 'function'); +} + /** * Helper to fully read a VSBuffer readable into a single buffer. */ diff --git a/src/vs/workbench/services/files/common/fileService.ts b/src/vs/workbench/services/files/common/fileService.ts index 84cd11315cb76..38063562380b3 100644 --- a/src/vs/workbench/services/files/common/fileService.ts +++ b/src/vs/workbench/services/files/common/fileService.ts @@ -14,7 +14,7 @@ import { TernarySearchTree } from 'vs/base/common/map'; import { isNonEmptyArray, coalesce } from 'vs/base/common/arrays'; import { getBaseLabel } from 'vs/base/common/labels'; import { ILogService } from 'vs/platform/log/common/log'; -import { VSBuffer, VSBufferReadable, readableToBuffer, bufferToReadable, streamToBuffer, bufferToStream, VSBufferReadableStream, writeableBufferStream, VSBufferWriteableStream } from 'vs/base/common/buffer'; +import { VSBuffer, VSBufferReadable, readableToBuffer, bufferToReadable, streamToBuffer, bufferToStream, VSBufferReadableStream, writeableBufferStream, VSBufferWriteableStream, isVSBufferReadableStream } from 'vs/base/common/buffer'; import { Queue } from 'vs/base/common/async'; import { CancellationTokenSource, CancellationToken } from 'vs/base/common/cancellation'; import { Schemas } from 'vs/base/common/network'; @@ -281,7 +281,7 @@ export class FileService extends Disposable implements IFileService { return fileStat; } - async writeFile(resource: URI, bufferOrReadable: VSBuffer | VSBufferReadable, options?: IWriteFileOptions): Promise { + async writeFile(resource: URI, bufferOrReadableOrStream: VSBuffer | VSBufferReadable | VSBufferReadableStream, options?: IWriteFileOptions): Promise { const provider = this.throwIfFileSystemIsReadonly(await this.withReadWriteProvider(resource)); try { @@ -296,12 +296,12 @@ export class FileService extends Disposable implements IFileService { // write file: buffered if (hasOpenReadWriteCloseCapability(provider)) { - await this.doWriteBuffered(provider, resource, bufferOrReadable instanceof VSBuffer ? bufferToReadable(bufferOrReadable) : bufferOrReadable); + await this.doWriteBuffered(provider, resource, bufferOrReadableOrStream instanceof VSBuffer ? bufferToReadable(bufferOrReadableOrStream) : bufferOrReadableOrStream); } // write file: unbuffered else { - await this.doWriteUnbuffered(provider, resource, bufferOrReadable); + await this.doWriteUnbuffered(provider, resource, bufferOrReadableOrStream); } } catch (error) { throw new FileOperationError(localize('err.write', "Unable to write file ({0})", this.ensureError(error).toString()), toFileOperationResult(error), options); @@ -857,29 +857,60 @@ export class FileService extends Disposable implements IFileService { return isPathCaseSensitive ? resource.toString() : resource.toString().toLowerCase(); } - private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readable: VSBufferReadable): Promise { - return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteBufferedQueued(provider, resource, readable)); - } + private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readableOrStream: VSBufferReadable | VSBufferReadableStream): Promise { + return this.ensureWriteQueue(provider, resource).queue(async () => { - private async doWriteBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readable: VSBufferReadable): Promise { + // open handle + const handle = await provider.open(resource, { create: true }); - // open handle - const handle = await provider.open(resource, { create: true }); + // write into handle until all bytes from buffer have been written + try { + if (isVSBufferReadableStream(readableOrStream)) { + await this.doWriteStreamBufferedQueued(provider, handle, readableOrStream); + } else { + await this.doWriteReadableBufferedQueued(provider, handle, readableOrStream); + } + } catch (error) { + throw this.ensureError(error); + } finally { - // write into handle until all bytes from buffer have been written - try { + // close handle always + await provider.close(handle); + } + }); + } + + private doWriteStreamBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, stream: VSBufferReadableStream): Promise { + return new Promise((resolve, reject) => { let posInFile = 0; - let chunk: VSBuffer | null; - while (chunk = readable.read()) { - await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0); + stream.on('data', async chunk => { + stream.pause(); + + try { + await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0); + } catch (error) { + reject(error); + } posInFile += chunk.byteLength; - } - } catch (error) { - throw this.ensureError(error); - } finally { - await provider.close(handle); + + stream.resume(); + }); + + stream.on('error', error => reject(error)); + stream.on('end', () => resolve()); + }); + } + + private async doWriteReadableBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, readable: VSBufferReadable): Promise { + let posInFile = 0; + + let chunk: VSBuffer | null; + while (chunk = readable.read()) { + await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0); + + posInFile += chunk.byteLength; } } @@ -891,16 +922,18 @@ export class FileService extends Disposable implements IFileService { } } - private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadable: VSBuffer | VSBufferReadable): Promise { - return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadable)); + private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStream: VSBuffer | VSBufferReadable | VSBufferReadableStream): Promise { + return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadableOrStream)); } - private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadable: VSBuffer | VSBufferReadable): Promise { + private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStream: VSBuffer | VSBufferReadable | VSBufferReadableStream): Promise { let buffer: VSBuffer; - if (bufferOrReadable instanceof VSBuffer) { - buffer = bufferOrReadable; + if (bufferOrReadableOrStream instanceof VSBuffer) { + buffer = bufferOrReadableOrStream; + } else if (isVSBufferReadableStream(bufferOrReadableOrStream)) { + buffer = await streamToBuffer(bufferOrReadableOrStream); } else { - buffer = readableToBuffer(bufferOrReadable); + buffer = readableToBuffer(bufferOrReadableOrStream); } return provider.writeFile(resource, buffer.buffer, { create: true, overwrite: true }); diff --git a/src/vs/workbench/services/files/test/node/diskFileService.test.ts b/src/vs/workbench/services/files/test/node/diskFileService.test.ts index 7c563ae1c9d51..d74db006d1b1f 100644 --- a/src/vs/workbench/services/files/test/node/diskFileService.test.ts +++ b/src/vs/workbench/services/files/test/node/diskFileService.test.ts @@ -20,7 +20,7 @@ import { NullLogService } from 'vs/platform/log/common/log'; import { isLinux, isWindows } from 'vs/base/common/platform'; import { DisposableStore } from 'vs/base/common/lifecycle'; import { isEqual } from 'vs/base/common/resources'; -import { VSBuffer, VSBufferReadable } from 'vs/base/common/buffer'; +import { VSBuffer, VSBufferReadable, bufferToStream } from 'vs/base/common/buffer'; function getByName(root: IFileStat, name: string): IFileStat | null { if (root.children === undefined) { @@ -1545,6 +1545,48 @@ suite('Disk File Service', () => { assert.equal(readFileSync(resource.fsPath), newContent); }); + test('writeFile (large file - stream) - buffered', async () => { + setCapabilities(fileProvider, FileSystemProviderCapabilities.FileOpenReadWriteClose); + + const resource = URI.file(join(testDir, 'lorem.txt')); + + const content = readFileSync(resource.fsPath); + const newContent = content.toString() + content.toString(); + + const fileStat = await service.writeFile(resource, bufferToStream(VSBuffer.fromString(newContent))); + assert.equal(fileStat.name, 'lorem.txt'); + + assert.equal(readFileSync(resource.fsPath), newContent); + }); + + test('writeFile (stream) - unbuffered', async () => { + setCapabilities(fileProvider, FileSystemProviderCapabilities.FileReadWrite); + + const resource = URI.file(join(testDir, 'small.txt')); + + const content = readFileSync(resource.fsPath); + assert.equal(content, 'Small File'); + + const newContent = 'Updates to the small file'; + await service.writeFile(resource, bufferToStream(VSBuffer.fromString(newContent))); + + assert.equal(readFileSync(resource.fsPath), newContent); + }); + + test('writeFile (large file - stream) - unbuffered', async () => { + setCapabilities(fileProvider, FileSystemProviderCapabilities.FileReadWrite); + + const resource = URI.file(join(testDir, 'lorem.txt')); + + const content = readFileSync(resource.fsPath); + const newContent = content.toString() + content.toString(); + + const fileStat = await service.writeFile(resource, bufferToStream(VSBuffer.fromString(newContent))); + assert.equal(fileStat.name, 'lorem.txt'); + + assert.equal(readFileSync(resource.fsPath), newContent); + }); + test('writeFile (file is created including parents)', async () => { const resource = URI.file(join(testDir, 'other', 'newfile.txt'));