Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/vs/base/common/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ export interface VSBufferReadableStream {
destroy(): void;
}

export function isVSBufferReadableStream(obj: any): obj is VSBufferReadableStream {
const candidate: VSBufferReadableStream = obj;

return candidate && [candidate.on, candidate.pause, candidate.resume, candidate.destroy].every(fn => typeof fn === 'function');
}

/**
* Helper to fully read a VSBuffer readable into a single buffer.
*/
Expand Down
85 changes: 59 additions & 26 deletions src/vs/workbench/services/files/common/fileService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { TernarySearchTree } from 'vs/base/common/map';
import { isNonEmptyArray, coalesce } from 'vs/base/common/arrays';
import { getBaseLabel } from 'vs/base/common/labels';
import { ILogService } from 'vs/platform/log/common/log';
import { VSBuffer, VSBufferReadable, readableToBuffer, bufferToReadable, streamToBuffer, bufferToStream, VSBufferReadableStream, writeableBufferStream, VSBufferWriteableStream } from 'vs/base/common/buffer';
import { VSBuffer, VSBufferReadable, readableToBuffer, bufferToReadable, streamToBuffer, bufferToStream, VSBufferReadableStream, writeableBufferStream, VSBufferWriteableStream, isVSBufferReadableStream } from 'vs/base/common/buffer';
import { Queue } from 'vs/base/common/async';
import { CancellationTokenSource, CancellationToken } from 'vs/base/common/cancellation';
import { Schemas } from 'vs/base/common/network';
Expand Down Expand Up @@ -281,7 +281,7 @@ export class FileService extends Disposable implements IFileService {
return fileStat;
}

async writeFile(resource: URI, bufferOrReadable: VSBuffer | VSBufferReadable, options?: IWriteFileOptions): Promise<IFileStatWithMetadata> {
async writeFile(resource: URI, bufferOrReadableOrStream: VSBuffer | VSBufferReadable | VSBufferReadableStream, options?: IWriteFileOptions): Promise<IFileStatWithMetadata> {
const provider = this.throwIfFileSystemIsReadonly(await this.withReadWriteProvider(resource));

try {
Expand All @@ -296,12 +296,12 @@ export class FileService extends Disposable implements IFileService {

// write file: buffered
if (hasOpenReadWriteCloseCapability(provider)) {
await this.doWriteBuffered(provider, resource, bufferOrReadable instanceof VSBuffer ? bufferToReadable(bufferOrReadable) : bufferOrReadable);
await this.doWriteBuffered(provider, resource, bufferOrReadableOrStream instanceof VSBuffer ? bufferToReadable(bufferOrReadableOrStream) : bufferOrReadableOrStream);
}

// write file: unbuffered
else {
await this.doWriteUnbuffered(provider, resource, bufferOrReadable);
await this.doWriteUnbuffered(provider, resource, bufferOrReadableOrStream);
}
} catch (error) {
throw new FileOperationError(localize('err.write', "Unable to write file ({0})", this.ensureError(error).toString()), toFileOperationResult(error), options);
Expand Down Expand Up @@ -857,29 +857,60 @@ export class FileService extends Disposable implements IFileService {
return isPathCaseSensitive ? resource.toString() : resource.toString().toLowerCase();
}

private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readable: VSBufferReadable): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteBufferedQueued(provider, resource, readable));
}
private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readableOrStream: VSBufferReadable | VSBufferReadableStream): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(async () => {

private async doWriteBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readable: VSBufferReadable): Promise<void> {
// open handle
const handle = await provider.open(resource, { create: true });

// open handle
const handle = await provider.open(resource, { create: true });
// write into handle until all bytes from buffer have been written
try {
if (isVSBufferReadableStream(readableOrStream)) {
await this.doWriteStreamBufferedQueued(provider, handle, readableOrStream);
} else {
await this.doWriteReadableBufferedQueued(provider, handle, readableOrStream);
}
} catch (error) {
throw this.ensureError(error);
} finally {

// write into handle until all bytes from buffer have been written
try {
// close handle always
await provider.close(handle);
}
});
}

private doWriteStreamBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, stream: VSBufferReadableStream): Promise<void> {
return new Promise((resolve, reject) => {
let posInFile = 0;

let chunk: VSBuffer | null;
while (chunk = readable.read()) {
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);
stream.on('data', async chunk => {
stream.pause();

try {
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);
} catch (error) {
reject(error);
}

posInFile += chunk.byteLength;
}
} catch (error) {
throw this.ensureError(error);
} finally {
await provider.close(handle);

stream.resume();
});

stream.on('error', error => reject(error));
stream.on('end', () => resolve());
});
}

private async doWriteReadableBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, readable: VSBufferReadable): Promise<void> {
let posInFile = 0;

let chunk: VSBuffer | null;
while (chunk = readable.read()) {
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);

posInFile += chunk.byteLength;
}
}

Expand All @@ -891,16 +922,18 @@ export class FileService extends Disposable implements IFileService {
}
}

private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadable: VSBuffer | VSBufferReadable): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadable));
private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStream: VSBuffer | VSBufferReadable | VSBufferReadableStream): Promise<void> {
return this.ensureWriteQueue(provider, resource).queue(() => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadableOrStream));
}

private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadable: VSBuffer | VSBufferReadable): Promise<void> {
private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStream: VSBuffer | VSBufferReadable | VSBufferReadableStream): Promise<void> {
let buffer: VSBuffer;
if (bufferOrReadable instanceof VSBuffer) {
buffer = bufferOrReadable;
if (bufferOrReadableOrStream instanceof VSBuffer) {
buffer = bufferOrReadableOrStream;
} else if (isVSBufferReadableStream(bufferOrReadableOrStream)) {
buffer = await streamToBuffer(bufferOrReadableOrStream);
} else {
buffer = readableToBuffer(bufferOrReadable);
buffer = readableToBuffer(bufferOrReadableOrStream);
}

return provider.writeFile(resource, buffer.buffer, { create: true, overwrite: true });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { NullLogService } from 'vs/platform/log/common/log';
import { isLinux, isWindows } from 'vs/base/common/platform';
import { DisposableStore } from 'vs/base/common/lifecycle';
import { isEqual } from 'vs/base/common/resources';
import { VSBuffer, VSBufferReadable } from 'vs/base/common/buffer';
import { VSBuffer, VSBufferReadable, bufferToStream } from 'vs/base/common/buffer';

function getByName(root: IFileStat, name: string): IFileStat | null {
if (root.children === undefined) {
Expand Down Expand Up @@ -1545,6 +1545,48 @@ suite('Disk File Service', () => {
assert.equal(readFileSync(resource.fsPath), newContent);
});

test('writeFile (large file - stream) - buffered', async () => {
setCapabilities(fileProvider, FileSystemProviderCapabilities.FileOpenReadWriteClose);

const resource = URI.file(join(testDir, 'lorem.txt'));

const content = readFileSync(resource.fsPath);
const newContent = content.toString() + content.toString();

const fileStat = await service.writeFile(resource, bufferToStream(VSBuffer.fromString(newContent)));
assert.equal(fileStat.name, 'lorem.txt');

assert.equal(readFileSync(resource.fsPath), newContent);
});

test('writeFile (stream) - unbuffered', async () => {
setCapabilities(fileProvider, FileSystemProviderCapabilities.FileReadWrite);

const resource = URI.file(join(testDir, 'small.txt'));

const content = readFileSync(resource.fsPath);
assert.equal(content, 'Small File');

const newContent = 'Updates to the small file';
await service.writeFile(resource, bufferToStream(VSBuffer.fromString(newContent)));

assert.equal(readFileSync(resource.fsPath), newContent);
});

test('writeFile (large file - stream) - unbuffered', async () => {
setCapabilities(fileProvider, FileSystemProviderCapabilities.FileReadWrite);

const resource = URI.file(join(testDir, 'lorem.txt'));

const content = readFileSync(resource.fsPath);
const newContent = content.toString() + content.toString();

const fileStat = await service.writeFile(resource, bufferToStream(VSBuffer.fromString(newContent)));
assert.equal(fileStat.name, 'lorem.txt');

assert.equal(readFileSync(resource.fsPath), newContent);
});

test('writeFile (file is created including parents)', async () => {
const resource = URI.file(join(testDir, 'other', 'newfile.txt'));

Expand Down