Skip to content

Commit

Permalink
feat(connection): added timeout property and timeout event
Browse files Browse the repository at this point in the history
  • Loading branch information
fenying committed Oct 25, 2023
1 parent c009f92 commit 46119e1
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- feat(protocol): added unix socket supports.
- feat(protocol): accepts socket factory function to produce sockets.
- feat(connection): added ended/finished properties and finished event.
- feat(connection): added timeout property and timeout event.
- feat(encoder): accepts string as data to send.
- feat(connector): added `IConnection.writable` property.

Expand Down
1 change: 1 addition & 0 deletions src/examples/tcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import * as LwDFX from '../../lib';
const server = LwDFX.createServer({
alpWhitelist: ['b1', 'b2'],
maxFrameSize: 1024,
timeout: 500,
});

const tcpGateway = LwDFX.Tcp.createGateway(server, {
Expand Down
1 change: 1 addition & 0 deletions src/examples/tls/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import * as FS from 'node:fs';
const server = LwDFX.createServer({
alpWhitelist: ['b1', 'b2'],
maxFrameSize: 1024,
timeout: 500,
});

const gateway = LwDFX.Tls.createGateway(server, {
Expand Down
1 change: 1 addition & 0 deletions src/examples/unixsocket/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import * as LwDFX from '../../lib';
const server = LwDFX.createServer({
alpWhitelist: ['b1', 'b2'],
maxFrameSize: 1024,
timeout: 500,
});

const socketGateway = LwDFX.UnixSocket.createGateway(server, {
Expand Down
43 changes: 29 additions & 14 deletions src/lib/AbstractConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export abstract class AbstractConnection extends $Events.EventEmitter implements

public constructor(
socket: $Net.Socket,
public timeout: number
private _timeout: number
) {

super();
Expand All @@ -61,6 +61,22 @@ export abstract class AbstractConnection extends $Events.EventEmitter implements
socket.setNoDelay(true);
}

public get timeout(): number {

return this._timeout;
}

public set timeout(v: number) {

if (!Number.isSafeInteger(v) || v < 0) {

throw new LwDFXError('invalid_timeout', 'Invalid timeout value.');
}

this._timeout = v;
this._socket?.setTimeout(v);
}

public get finished(): boolean {

return this._socket?.writableFinished ?? true;
Expand Down Expand Up @@ -172,8 +188,9 @@ export abstract class AbstractConnection extends $Events.EventEmitter implements

this._socket!
.removeAllListeners()
.setTimeout(this.timeout, () => {
.on('timeout', () => {

this.emit('timeout');
this._socket?.destroy(new LwDFXError('timeout', 'Connection timeout'));
})
.on('close', () => {
Expand Down Expand Up @@ -201,6 +218,11 @@ export abstract class AbstractConnection extends $Events.EventEmitter implements
this.destroy();
}
});

if (this._timeout) {

this._socket!.setTimeout(this._timeout);
}
}

protected abstract _handshake(
Expand Down Expand Up @@ -237,23 +259,18 @@ export abstract class AbstractConnection extends $Events.EventEmitter implements

callback = once(callback);

const timer = setTimeout(() => {

callback(new LwDFXError('timeout', 'Handshake timeout'));
if (handshakeTimeout > 0) {

}, handshakeTimeout);
this._socket!.setTimeout(handshakeTimeout, () => {

this._socket!.setTimeout(handshakeTimeout, () => {

this._socket!.destroy(new LwDFXError('timeout', 'Handshake timeout'));
});
this._socket!.destroy(new LwDFXError('timeout', 'Handshake timeout'));
});
}

this._socket!.on('close', () => {

callback(new LwDFXError('conn_lost', 'Connection closed'));

clearTimeout(timer);

this._socket?.removeAllListeners();
this._socket = null;
});
Expand All @@ -272,8 +289,6 @@ export abstract class AbstractConnection extends $Events.EventEmitter implements
return;
}

clearTimeout(timer);

this._setupSocket();

callback(null, this);
Expand Down
15 changes: 12 additions & 3 deletions src/lib/Decl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ export interface IConnection {
*/
readonly ended: boolean;

/**
* The timeout in milliseconds for the connection.
*/
timeout: number;

/**
* Register a callback for the `frame` event.
*
Expand All @@ -99,7 +104,7 @@ export interface IConnection {
* @param event The event name.
* @param callback The callback function.
*/
on(event: 'end' | 'finish', callback: () => void): this;
on(event: 'end' | 'finish' | 'timeout', callback: () => void): this;

/**
* Register a callback for the `error` event.
Expand Down Expand Up @@ -159,7 +164,9 @@ export interface IConnectOptions {
alpWhitelist?: string[];

/**
* The timeout in milliseconds for the connections.
* The timeout in milliseconds for the connections after connection is established.
*
* > Timeout means the connection is idle for a long time, and the connection will be closed.
*
* > Set to `0` to disable the timeout.
*
Expand All @@ -169,7 +176,9 @@ export interface IConnectOptions {
timeout?: number;

/**
* The timeout in milliseconds for the handshake process.
* The timeout in milliseconds during the handshake process.
*
* > Timeout means the connection is idle for a long time, and the connection will be closed.
*
* > Set to `0` to disable the timeout.
*
Expand Down
19 changes: 17 additions & 2 deletions src/lib/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,29 @@ class LwDFXServer extends $Events.EventEmitter implements D.IServer {
public constructor(
public alpWhitelist: string[],
public maxConnections: number,
public timeout: number,
private _timeout: number,
public handshakeTimeout: number,
public maxFrameSize: number,
) {

super();
}

public get timeout(): number {

return this._timeout;
}

public set timeout(v: number) {

if (!Number.isSafeInteger(v) || v < 0) {

throw new LwDFXError('invalid_timeout', 'Invalid timeout value.');
}

this._timeout = v;
}

public get connections(): number {

return Object.keys(this._conns).length;
Expand Down Expand Up @@ -85,7 +100,7 @@ class LwDFXServer extends $Events.EventEmitter implements D.IServer {
return;
}

const conn = new ServerConnection(this._generateNextId(), socket, this.timeout, this.maxFrameSize);
const conn = new ServerConnection(this._generateNextId(), socket, this._timeout, this.maxFrameSize);

conn.setup(this.alpWhitelist, this.handshakeTimeout, (err) => {

Expand Down
18 changes: 17 additions & 1 deletion src/lib/Tcp/TcpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ export interface ITcpClientOptions extends D.IConnectOptions {
* @default null
*/
socket?: $Net.Socket | D.ISocketFactory | null;

/**
* The timeout for connecting to the server, in milliseconds.
*
* @default 30000
*/
connectTimeout?: number;
}

function netConnect(opts: ITcpClientOptions): Promise<$Net.Socket> {
Expand All @@ -72,7 +79,6 @@ function netConnect(opts: ITcpClientOptions): Promise<$Net.Socket> {
const socket = $Net.connect({
'host': opts.hostname ?? C.DEFAULT_HOSTNAME,
'port': opts.port ?? C.DEFAULT_PORT,
'timeout': opts.handshakeTimeout ?? Constants.DEFAULT_HANDSHAKE_TIMEOUT,
}, () => {

socket.removeAllListeners('error');
Expand All @@ -82,6 +88,16 @@ function netConnect(opts: ITcpClientOptions): Promise<$Net.Socket> {
resolve(socket);
});

const connectTimeout = opts.connectTimeout ?? C.DEFAULT_CONNECT_TIMEOUT;

if (connectTimeout) {

socket.setTimeout(connectTimeout, () => {

socket.destroy(new LwDFXError('connect_timeout', 'Timeout for connecting to remote server'));
});
}

socket.on('error', (e) => {

socket.removeAllListeners('error');
Expand Down
7 changes: 7 additions & 0 deletions src/lib/Tcp/TcpCommon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,10 @@ export const DEFAULT_HOSTNAME: string = 'localhost';
* @type uint32
*/
export const DEFAULT_BACKLOG: number = 1023;

/**
* The default timeout for connecting to the server, in milliseconds.
*
* @type uint32
*/
export const DEFAULT_CONNECT_TIMEOUT: number = 30_000;
18 changes: 17 additions & 1 deletion src/lib/Tls/TlsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ export interface ITlsClientOptions extends D.IConnectOptions {
*/
socket?: $Tls.TLSSocket | D.ISocketFactory | null;

/**
* The timeout for connecting to the server, in milliseconds.
*
* @default 30000
*/
connectTimeout?: number;

/**
* The TLS options for new connections.
*/
Expand All @@ -76,7 +83,6 @@ function netConnect(opts: ITlsClientOptions): Promise<$Net.Socket> {
const socket = $Tls.connect(opts.port ?? C.DEFAULT_PORT, opts.hostname ?? C.DEFAULT_HOSTNAME, {
// eslint-disable-next-line @typescript-eslint/naming-convention
'ALPNProtocols': [C.DEFAULT_ALPN_PROTOCOL],
'timeout': opts.handshakeTimeout ?? Constants.DEFAULT_HANDSHAKE_TIMEOUT,
...(opts.tlsOptions ?? {})
}, () => {

Expand All @@ -87,6 +93,16 @@ function netConnect(opts: ITlsClientOptions): Promise<$Net.Socket> {
resolve(socket);
});

const connectTimeout = opts.connectTimeout ?? C.DEFAULT_CONNECT_TIMEOUT;

if (connectTimeout) {

socket.setTimeout(connectTimeout, () => {

socket.destroy(new LwDFXError('connect_timeout', 'Timeout for connecting to remote server'));
});
}

socket.on('error', (e) => {

socket.removeAllListeners('error');
Expand Down
7 changes: 7 additions & 0 deletions src/lib/Tls/TlsCommon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,10 @@ export const DEFAULT_BACKLOG: number = 1023;
* The default TLS ALPN protocol of LwDFXv1.
*/
export const DEFAULT_ALPN_PROTOCOL: string = 'lwdfx1';

/**
* The default timeout for connecting to the server, in milliseconds.
*
* @type uint32
*/
export const DEFAULT_CONNECT_TIMEOUT: number = 30_000;

0 comments on commit 46119e1

Please sign in to comment.