Skip to content

Commit

Permalink
feat: 🎸 make basic WriteStream work
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Jun 20, 2023
1 parent a568afd commit 69281ff
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 81 deletions.
105 changes: 41 additions & 64 deletions src/fsa-to-node/FsaNodeWriteStream.ts
@@ -1,6 +1,6 @@
import { Writable } from 'stream';
import { Defer } from 'thingies/es6/Defer';
import { codeMutex } from 'thingies/es6/codeMutex';
import { concurrency } from 'thingies/es6/concurrency';
import type { IFileSystemFileHandle, IFileSystemWritableFileStream } from '../fsa/types';
import type { IWriteStream } from '../node/types/misc';
import type { IWriteStreamOptions } from '../node/types/options';
Expand All @@ -23,9 +23,9 @@ import type { IWriteStreamOptions } from '../node/types/options';
*/
export class FsaNodeWriteStream extends Writable implements IWriteStream {
protected __pending__: boolean = true;
protected __stream__: Promise<IFileSystemWritableFileStream>;
protected __closed__: boolean = false;
protected readonly __mutex__ = codeMutex();
protected readonly __stream__: Promise<IFileSystemWritableFileStream>;
protected readonly __mutex__ = concurrency(1);

public constructor(
handle: Promise<IFileSystemFileHandle>,
Expand All @@ -45,6 +45,35 @@ export class FsaNodeWriteStream extends Writable implements IWriteStream {
});
}

private async ___write___(buffers: Buffer[]): Promise<void> {
await this.__mutex__(async () => {
if (this.__closed__) return;
// if (this.__closed__) throw new Error('WriteStream is closed');
const writable = await this.__stream__;
for (const buffer of buffers) {
await writable.write(buffer);
}
});
}

private async __close__(): Promise<void> {
await this.__mutex__(async () => {
if (this.__closed__) {
process.nextTick(() => this.emit('close'));
return;
}
try {
const writable = await this.__stream__;
this.__closed__ = true;
await writable.close();
this.emit('close');
} catch (error) {
this.emit('error', error);
this.emit('close', error);
}
});
}

// ------------------------------------------------------------- IWriteStream

public get bytesWritten(): number {
Expand All @@ -57,77 +86,25 @@ export class FsaNodeWriteStream extends Writable implements IWriteStream {

public close(cb): void {
if (cb) this.once('close', cb);
if (this.__closed__) {
process.nextTick(() => this.emit('close'));
return;
}
this.__closed__ = true;
(async () => {
try {
const writable = await this.__stream__;
this.__mutex__(async () => {
console.log('closing')
await writable.close();
});
this.emit('close');
} catch (error) {
this.emit('error', error);
this.emit('close', error);
}
})().catch(() => {});
this.__close__().catch(() => {});
}

// ----------------------------------------------------------------- Writable

private async ___write___(buffers: Buffer[]): Promise<void> {
const writable = await this.__stream__;
this.__mutex__(async () => {
for (const buffer of buffers) {
try {
console.log(1);
await writable.write(buffer);
console.log(2);
} catch (error) {
console.log('err', error);
}
}
});
}

_write(chunk: any, encoding: string, callback: (error?: Error | null) => void): void {
(async () => {
try {
await this.___write___([chunk]);
callback(null);
} catch (error) {
callback(error);
}
})().catch(() => {});
this.___write___([chunk])
.then(() => callback(null))
.catch(error => callback(error));
}

_writev(chunks: Array<{ chunk: any; encoding: string }>, callback: (error?: Error | null) => void): void {
(async () => {
try {
const buffers = chunks.map(({chunk}) => chunk);
await this.___write___(buffers);
callback(null);
} catch (error) {
callback(error);
}
})().catch(() => {});
const buffers = chunks.map(({chunk}) => chunk);
this.___write___(buffers)
.then(() => callback(null))
.catch(error => callback(error));
}

_final(callback: (error?: Error | null) => void): void {
(async () => {
try {
const writable = await this.__stream__;
this.__mutex__(async () => {
await writable.close();
});
callback(null);
} catch (error) {
callback(error);
}
})().catch(() => {});
this.__close__().then(() => callback(null)).catch(error => callback(error));
}
}
22 changes: 13 additions & 9 deletions src/fsa-to-node/__tests__/FsaNodeFs.test.ts
Expand Up @@ -633,15 +633,19 @@ describe('.read()', () => {
});

describe('.createWriteStream()', () => {
test.only('can use stream to write to a file', async () => {
test('can use stream to write to a new file', async () => {
const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const stream = fs.createWriteStream('/folder/file');
stream.write('a');
const onClose = new Promise(resolve => stream.on('close', (err) => {
resolve(err);
}));
stream.close();
await onClose;
console.log(mfs.__vol.toJSON());
const stream = fs.createWriteStream('/folder/file2');
stream.write(Buffer.from('A'));
stream.write(Buffer.from('BC'));
stream.write(Buffer.from('DEF'));
stream.end();
await new Promise(resolve => stream.once('close', resolve));
expect(mfs.__vol.toJSON()).toStrictEqual({
'/mountpoint/folder/file': 'test',
'/mountpoint/folder/file2': 'ABCDEF',
'/mountpoint/empty-folder': null,
'/mountpoint/f.html': 'test',
});
});
});
9 changes: 1 addition & 8 deletions src/node-to-fsa/NodeFileSystemWritableFileStream.ts
Expand Up @@ -120,15 +120,10 @@ export class NodeFileSystemWritableFileStream extends WritableStream implements
}

protected async writeBase(chunk: Data): Promise<void> {
console.log('base')
const writer = this.getWriter();
console.log('writer')
try {
console.log('write', chunk)
await writer.write(new Uint8Array([123]));
console.log('done')
await writer.write(chunk);
} finally {
console.log('finally');
writer.releaseLock();
}
}
Expand All @@ -153,8 +148,6 @@ export class NodeFileSystemWritableFileStream extends WritableStream implements
return this.writeBase(params);
default: {
if (ArrayBuffer.isView(params)) {

console.log('ERIT', params)
return this.writeBase(params);
}
else {
Expand Down

0 comments on commit 69281ff

Please sign in to comment.