Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy process.on("exit") to avoid MaxListenersExceededWarning #42

Merged
merged 9 commits into from
Apr 14, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,4 @@

- Upgrade dependencies.
- **BREAKING:** Drop support for node 13.
- Prevent Node.js max listeners exceeded warnings if many `fs-capacitor` `ReadStream` instances are created at the same time, fixing [#30](https://github.com/mike-marcacci/fs-capacitor/issues/30) via [#42](https://github.com/mike-marcacci/fs-capacitor/pull/42).
30 changes: 18 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import fs from "fs";
import os from "os";
import path from "path";
import { Readable, ReadableOptions, Writable, WritableOptions } from "stream";
import { EventEmitter } from "events";

export class ReadAfterDestroyedError extends Error {}
export class ReadAfterReleasedError extends Error {}
Expand All @@ -12,6 +13,14 @@ export interface ReadStreamOptions {
encoding?: ReadableOptions["encoding"];
}

// Use a “proxy” event emitter configured to have an infinite maximum number of
// listeners to prevent Node.js max listeners exceeded warnings if many
// `fs-capacitor` `ReadStream` instances are created at the same time. See:
// https://github.com/mike-marcacci/fs-capacitor/issues/30
const processExitProxy = new EventEmitter();
processExitProxy.setMaxListeners(Infinity);
process.once("exit", () => processExitProxy.emit("exit"));

export class ReadStream extends Readable {
private _pos: number = 0;
private _writeStream: WriteStream;
Expand Down Expand Up @@ -66,13 +75,13 @@ export class ReadStream extends Readable {

// Otherwise, wait for the write stream to add more data or finish.
const retry = (): void => {
this._writeStream.removeListener("finish", retry);
this._writeStream.removeListener("write", retry);
this._writeStream.off("finish", retry);
this._writeStream.off("write", retry);
this._read(n);
};

this._writeStream.addListener("finish", retry);
this._writeStream.addListener("write", retry);
this._writeStream.on("finish", retry);
this._writeStream.on("write", retry);
}
);
}
Expand Down Expand Up @@ -117,7 +126,7 @@ export class WriteStream extends Writable {
}

// Cleanup when the process exits or is killed.
process.addListener("exit", this._cleanupSync);
processExitProxy.once("exit", this._cleanupSync);

this._fd = fd;
this.emit("ready");
Expand All @@ -126,7 +135,7 @@ export class WriteStream extends Writable {
}

_cleanupSync = (): void => {
process.removeListener("exit", this._cleanupSync);
processExitProxy.off("exit", this._cleanupSync);

if (typeof this._fd === "number")
try {
Expand Down Expand Up @@ -208,7 +217,7 @@ export class WriteStream extends Writable {

// We avoid removing this until now in case an exit occurs while
// asyncronously cleaning up.
process.removeListener("exit", this._cleanupSync);
processExitProxy.off("exit", this._cleanupSync);
callback(unlinkError || closeError || error);
});
});
Expand All @@ -232,16 +241,13 @@ export class WriteStream extends Writable {
const readStream = new ReadStream(this, options);
this._readStreams.add(readStream);

const remove = (): void => {
readStream.removeListener("close", remove);
readStream.once("close", (): void => {
this._readStreams.delete(readStream);

if (this._released && this._readStreams.size === 0) {
this.destroy();
}
};

readStream.addListener("close", remove);
});

return readStream;
}
Expand Down