Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: keepalive manager #1865

Merged
merged 16 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ npm test

This will run both `browser` and `node` tests.


### Running specific tests

For example, you can run `node -r esbuild-register --test test/pingTimer.ts`
For example, you can run `node -r esbuild-register --test test/keepaliveManager.ts`

### Browser

Expand Down
6 changes: 3 additions & 3 deletions example.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import mqtt from './src/index'

const client = mqtt.connect('mqtts://test.mosquitto.org', {
keepalive: 10,
port: 8883,
const client = mqtt.connect('mqtt://broker.hivemq.com', {
keepalive: 3,
port: 1883,
reconnectPeriod: 15000,
rejectUnauthorized: false,
})
Expand Down
107 changes: 107 additions & 0 deletions src/lib/KeepaliveManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import type MqttClient from './client'
import getTimer, { type Timer } from './get-timer'
import type { TimerVariant } from './shared'

export default class KeepaliveManager {
private _keepalive: number

private timerId: number

private timer: Timer

private destroyed = false

private counter: number

private client: MqttClient

private _keepaliveTimeoutTimestamp: number

private _intervalEvery: number

/** Timestamp of next keepalive timeout */
get keepaliveTimeoutTimestamp() {
return this._keepaliveTimeoutTimestamp
}

/** Milliseconds of the actual interval */
get intervalEvery() {
return this._intervalEvery
}

get keepalive() {
return this._keepalive
}

constructor(client: MqttClient, variant: TimerVariant) {
this.client = client
this.timer = getTimer(variant)
this.setKeepalive(client.options.keepalive)
}

private clear() {
if (this.timerId) {
this.timer.clear(this.timerId)
this.timerId = null
}
}

/** Change the keepalive */
setKeepalive(value: number) {
// keepalive is in seconds
value *= 1000

if (
// eslint-disable-next-line no-restricted-globals
isNaN(value) ||
value <= 0 ||
value > 2147483647
) {
throw new Error(
`Keepalive value must be an integer between 0 and 2147483647. Provided value is ${value}`,
)
}

this._keepalive = value

this.reschedule()

this.client['log'](`KeepaliveManager: set keepalive to ${value}ms`)
}

destroy() {
this.clear()
this.destroyed = true
}

reschedule() {
if (this.destroyed) {
return
}

this.clear()
this.counter = 0

// https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_3.5_Keep
const keepAliveTimeout = Math.ceil(this._keepalive * 1.5)

this._keepaliveTimeoutTimestamp = Date.now() + keepAliveTimeout
this._intervalEvery = Math.ceil(this._keepalive / 2)

this.timerId = this.timer.set(() => {
// this should never happen, but just in case
if (this.destroyed) {
return
}

this.counter += 1

// after keepalive seconds, send a pingreq
if (this.counter === 2) {
this.client.sendPing()
} else if (this.counter > 2) {
this.client.onKeepaliveTimeout()
}
}, this._intervalEvery)
}
}
56 changes: 0 additions & 56 deletions src/lib/PingTimer.ts

This file was deleted.

84 changes: 29 additions & 55 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import {
} from './shared'
import TopicAliasSend from './topic-alias-send'
import { TypedEventEmitter } from './TypedEmitter'
import PingTimer from './PingTimer'
import KeepaliveManager from './KeepaliveManager'
import isBrowser, { isWebWorker } from './is-browser'

const setImmediate =
Expand Down Expand Up @@ -433,10 +433,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

public noop: (error?: any) => void

/** Timestamp of last received control packet */
public pingResp: number

public pingTimer: PingTimer
public keepaliveManager: KeepaliveManager

/**
* The connection to the Broker. In browsers env this also have `socket` property
Expand Down Expand Up @@ -572,8 +569,8 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
// map of a subscribe messageId and a topic
this.messageIdToTopic = {}

// Ping timer, setup in _setupPingTimer
this.pingTimer = null
// Keepalive manager, setup in _setupKeepaliveManager
this.keepaliveManager = null
// Is the client connected?
this.connected = false
// Are we disconnecting?
Expand Down Expand Up @@ -660,7 +657,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this.log('close :: clearing connackTimer')
clearTimeout(this.connackTimer)

this._destroyPingTimer()
this._destroyKeepaliveManager()

if (this.topicAliasRecv) {
this.topicAliasRecv.clear()
Expand Down Expand Up @@ -1780,7 +1777,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this._setupReconnect()
}

this._destroyPingTimer()
this._destroyKeepaliveManager()

if (done && !this.connected) {
this.log(
Expand Down Expand Up @@ -2064,45 +2061,36 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
}

/**
* _setupPingTimer - setup the ping timer
*
* @api private
* _setupKeepaliveManager - setup the keepalive manager
*/
private _setupPingTimer() {
private _setupKeepaliveManager() {
this.log(
'_setupPingTimer :: keepalive %d (seconds)',
'_setupKeepaliveManager :: keepalive %d (seconds)',
this.options.keepalive,
)

if (!this.pingTimer && this.options.keepalive) {
this.pingTimer = new PingTimer(
this.options.keepalive,
() => {
this._checkPing()
},
if (!this.keepaliveManager && this.options.keepalive) {
this.keepaliveManager = new KeepaliveManager(
this,
this.options.timerVariant,
)
this.pingResp = Date.now()
}
}

private _destroyPingTimer() {
if (this.pingTimer) {
this.log('_destroyPingTimer :: destroying ping timer')
this.pingTimer.destroy()
this.pingTimer = null
private _destroyKeepaliveManager() {
if (this.keepaliveManager) {
this.log('_destroyKeepaliveManager :: destroying keepalive manager')
this.keepaliveManager.destroy()
this.keepaliveManager = null
}
}

/**

* _shiftPingInterval - reschedule the ping interval
*
* @api private
* Reschedule the ping interval
*/
private _shiftPingInterval() {
public reschedulePing() {
if (
this.pingTimer &&
this.keepaliveManager &&
this.options.keepalive &&
this.options.reschedulePings
) {
Expand All @@ -2115,34 +2103,20 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
*/
private _reschedulePing() {
this.log('_reschedulePing :: rescheduling ping')
this.pingTimer.reschedule()
this.keepaliveManager.reschedule()
}

/**
* _checkPing - check if a pingresp has come back, and ping the server again
*
* @api private
*/
private _checkPing() {
this.log('_checkPing :: checking ping...')
// give 100ms offset to avoid ping timeout when receiving fast responses
const timeSincePing = Date.now() - this.pingResp - 100
if (timeSincePing <= this.options.keepalive * 1000) {
this.log('_checkPing :: ping response received in time')
this._sendPing()
} else {
// do a forced cleanup since socket will be in bad shape
this.emit('error', new Error('Keepalive timeout'))
this.log('_checkPing :: calling _cleanUp with force true')
this._cleanUp(true)
}
}

private _sendPing() {
public sendPing() {
this.log('_sendPing :: sending pingreq')
this._sendPacket({ cmd: 'pingreq' })
}

public onKeepaliveTimeout() {
this.emit('error', new Error('Keepalive timeout'))
this.log('onKeepaliveTimeout :: calling _cleanUp with force true')
this._cleanUp(true)
}

/**
* _resubscribe
* @api private
Expand Down Expand Up @@ -2205,7 +2179,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

this.connackPacket = packet
this.messageIdProvider.clear()
this._setupPingTimer()
this._setupKeepaliveManager()

this.connected = true

Expand Down
14 changes: 7 additions & 7 deletions src/lib/get-timer.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import isBrowser, { isWebWorker, isReactNativeBrowser } from './is-browser'
import { clearTimeout as clearT, setTimeout as setT } from 'worker-timers'
import { clearInterval as clearI, setInterval as setI } from 'worker-timers'
import type { TimerVariant } from './shared'

// dont directly assign globals to class props otherwise this throws in web workers: Uncaught TypeError: Illegal invocation
// See: https://stackoverflow.com/questions/9677985/uncaught-typeerror-illegal-invocation-in-chrome

export interface Timer {
set: typeof setT
clear: typeof clearT
set: typeof setI
clear: typeof clearI
}

const workerTimer: Timer = {
set: setT,
clear: clearT,
set: setI,
clear: clearI,
}

const nativeTimer: Timer = {
set: (func, time) => setTimeout(func, time),
clear: (timerId) => clearTimeout(timerId),
set: (func, time) => setInterval(func, time),
clear: (timerId) => clearInterval(timerId),
}

const getTimer = (variant: TimerVariant): Timer => {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/handlers/connack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ const handleConnack: PacketHandler = (client, packet: IConnackPacket) => {
}
if (packet.properties.serverKeepAlive && options.keepalive) {
options.keepalive = packet.properties.serverKeepAlive
client['_shiftPingInterval']()
}

if (packet.properties.maximumPacketSize) {
if (!options.properties) {
options.properties = {}
Expand Down
Loading
Loading