Skip to content

Commit

Permalink
fix(browser): use worker timers to prevent unexpected client close (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsLando committed Dec 4, 2023
1 parent a50e85c commit 35448f3
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 38 deletions.
7 changes: 4 additions & 3 deletions examples/vite-example/src/App.vue
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import { ref } from 'vue'
import mqtt from 'mqtt'
console.log('mqtt', mqtt)
const connected = ref(false)
const client = mqtt.connect('wss://test.mosquitto.org:8081');
const client = mqtt.connect('wss://test.mosquitto.org:8081', {
log: console.log.bind(console),
keepalive: 30,
});
const messages = ref([])
Expand Down
151 changes: 131 additions & 20 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
"reinterval": "^1.1.0",
"rfdc": "^1.3.0",
"split2": "^4.2.0",
"worker-timers": "^7.0.78",
"ws": "^8.14.2"
},
"devDependencies": {
Expand Down
39 changes: 39 additions & 0 deletions src/lib/PingTimer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { clearTimeout as clearT, setTimeout as setT } from 'worker-timers'
import isBrowser from './is-browser'

export default class PingTimer {
private keepalive: number

private timer: any

private checkPing: () => void

private setTimeout = isBrowser ? setT : setTimeout

private clearTimeout = isBrowser ? clearT : clearTimeout

constructor(keepalive: number, checkPing: () => void) {
this.keepalive = keepalive * 1000
this.checkPing = checkPing
this.setup()
}

private setup() {
this.timer = this.setTimeout(() => {
this.checkPing()
this.reschedule()
}, this.keepalive)
}

clear() {
if (this.timer) {
this.clearTimeout(this.timer)
this.timer = null
}
}

reschedule() {
this.clear()
this.setup()
}
}
23 changes: 9 additions & 14 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import DefaultMessageIdProvider, {
IMessageIdProvider,
} from './default-message-id-provider'
import { DuplexOptions, Writable } from 'readable-stream'
import reInterval from 'reinterval'
import clone from 'rfdc/default'
import * as validations from './validations'
import _debug from 'debug'
Expand All @@ -34,24 +33,20 @@ import {
IStream,
StreamBuilder,
VoidCallback,
nextTick,
} from './shared'
import TopicAliasSend from './topic-alias-send'
import { TypedEventEmitter } from './TypedEmitter'

const nextTick = process
? process.nextTick
: (callback: () => void) => {
setTimeout(callback, 0)
}
import PingTimer from './PingTimer'

const setImmediate =
globalThis.setImmediate ||
((...args: any[]) => {
(((...args: any[]) => {
const callback = args.shift()
nextTick(() => {
callback(...args)
})
})
}) as typeof globalThis.setImmediate)

const defaultConnectOptions = {
keepalive: 60,
Expand Down Expand Up @@ -359,7 +354,7 @@ export type OnConnectCallback = (packet: IConnackPacket) => void
export type OnDisconnectCallback = (packet: IDisconnectPacket) => void
export type ClientSubscribeCallback = (
err: Error | null,
granted: ISubscriptionGrant[],
granted?: ISubscriptionGrant[],
) => void
export type OnMessageCallback = (
topic: string,
Expand Down Expand Up @@ -430,7 +425,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

public noop: (error?: any) => void

public pingTimer: any
public pingTimer: PingTimer

/**
* The connection to the Broker. In browsers env this also have `socket` property
Expand Down Expand Up @@ -2068,9 +2063,9 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

if (!this.pingTimer && this.options.keepalive) {
this.pingResp = true
this.pingTimer = reInterval(() => {
this.pingTimer = new PingTimer(this.options.keepalive, () => {
this._checkPing()
}, this.options.keepalive * 1000)
})
}
}

Expand All @@ -2085,7 +2080,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this.options.keepalive &&
this.options.reschedulePings
) {
this.pingTimer.reschedule(this.options.keepalive * 1000)
this.pingTimer.reschedule()
}
}

Expand Down

0 comments on commit 35448f3

Please sign in to comment.