Skip to content

Commit

Permalink
handle ERR_STREAM_WRITE_AFTER_END
Browse files Browse the repository at this point in the history
  • Loading branch information
panzi committed Apr 10, 2023
1 parent d889252 commit 9be2ffd
Showing 1 changed file with 43 additions and 19 deletions.
62 changes: 43 additions & 19 deletions src/index.ts
Expand Up @@ -381,7 +381,7 @@ export class CarbonClient {
private _sendBufferOffset: number = 0;
private _sendIntervalTimer: NodeJS.Timeout|null = null;
private _connectionRetryTimer: NodeJS.Timeout|null = null;
private _connectionCallbacks: ((error?: Error) => void)[]|null = null;
private _connectionCallbacks: ((error: Error|null, retryCount: number) => void)[]|null = null;

/**
*
Expand Down Expand Up @@ -538,7 +538,7 @@ export class CarbonClient {
}

connect(): Promise<void>;
connect(callback: (error?: Error) => void): void;
connect(callback: (error: Error|null, retryCount: number) => void): void;

/**
* Connect client to server.
Expand All @@ -561,7 +561,7 @@ export class CarbonClient {
* the operation. (*should never happen*)
* @throws `Error` based on the errors that can be thrown by the underlying used NodeJS APIs.
*/
connect(callback?: (error?: Error) => void): Promise<void>|void {
connect(callback?: (error: Error|null, retryCount: number) => void): Promise<void>|void {
return this._connectWithRetry(0, callback);
}

Expand Down Expand Up @@ -786,7 +786,7 @@ export class CarbonClient {
}

if (this._connectionCallbacks !== null) {
this._connectDone(new Disconnected());
this._connectDone(new Disconnected(), 0);
}

if (!this._socket) {
Expand Down Expand Up @@ -1060,7 +1060,7 @@ export class CarbonClient {
}
}

private _connectDone(error?: Error): void {
private _connectDone(error: Error|null, retryCount: number): void {
if (this._connectionRetryTimer !== null) {
clearTimeout(this._connectionRetryTimer);
this._connectionRetryTimer = null;
Expand All @@ -1070,7 +1070,7 @@ export class CarbonClient {
if (callbacks) {
for (const callback of callbacks) {
try {
callback(error);
callback(error, retryCount);
} catch (error2) {
setImmediate(() =>
this._emit(
Expand All @@ -1082,10 +1082,10 @@ export class CarbonClient {
}

private _connectWithRetry(retryOnError: number): Promise<void>;
private _connectWithRetry(retryOnError: number, callback: (error?: Error) => void): void;
private _connectWithRetry(retryOnError: number, callback?: (error?: Error) => void): Promise<void>|void;
private _connectWithRetry(retryOnError: number, callback: (error: Error|null, retryCount: number) => void): void;
private _connectWithRetry(retryOnError: number, callback?: (error: Error|null, retryCount: number) => void): Promise<void>|void;

private _connectWithRetry(retryOnError: number, callback?: (error?: Error) => void): Promise<void>|void {
private _connectWithRetry(retryOnError: number, callback?: (error: Error|null, retryCount: number) => void): Promise<void>|void {
let retryCount = 0;
const connectCallback = (error?: Error) => {
if (error) {
Expand All @@ -1094,16 +1094,16 @@ export class CarbonClient {
this._connectionRetryTimer = setTimeout(() => {
this._connectionRetryTimer = null;
if (this._socket) {
this._connectDone();
this._connectDone(null, retryCount);
} else if (this._connectionCallbacks !== null) {
this._connect(connectCallback);
}
});
return;
}
return this._connectDone(error);
return this._connectDone(error, retryCount);
}
this._connectDone();
this._connectDone(null, retryCount);
};

if (callback) {
Expand Down Expand Up @@ -1136,13 +1136,36 @@ export class CarbonClient {
if (error) {
if (this._socket && sendRetryCount < this.retryOnError) {
++ sendRetryCount;
setTimeout(() => {
if (this._socket) {
doSend();
} else {
reject(error);
if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') {
try {
if (this._socket instanceof DgramSocket) {
this._socket.close(resolve);
} else if (this._socket instanceof NetSocket) {
this._socket.destroy();
}
} catch (error) {
// return reject(error instanceof Error ? error : new Error(String(error)));
}
}, this.retryTimeout);
this._socket = null;
if (this.autoConnect) {
return this._connectWithRetry(this.retryOnError - sendRetryCount, (error, connectRetryCount) => {
if (error) {
return reject(error);
}
sendRetryCount -= connectRetryCount;
doSend();
});
}
return reject(new NotConnected());
} else {
setTimeout(() => {
if (this._socket) {
doSend();
} else {
reject(error);
}
}, this.retryTimeout);
}
return;
}
return reject(error);
Expand All @@ -1164,10 +1187,11 @@ export class CarbonClient {

if (!this._socket) {
if (this.autoConnect) {
return this._connectWithRetry(this.retryOnError, error => {
return this._connectWithRetry(this.retryOnError, (error, connectRetryCount) => {
if (error) {
return reject(error);
}
sendRetryCount -= connectRetryCount;
doSend();
});
}
Expand Down

0 comments on commit 9be2ffd

Please sign in to comment.