-
-
Notifications
You must be signed in to change notification settings - Fork 283
/
index.ts
80 lines (69 loc) · 2.52 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import { Base64Message } from './Model/Base64Message'
import { DataSource, MqttSource } from './DataSource'
import {
AddMqttConnection,
MqttMessage,
addMqttConnectionEvent,
backendEvents,
makeConnectionMessageEvent,
makeConnectionStateEvent,
makePublishEvent,
removeConnection,
} from '../../events'
import { SparkplugDecoder } from './Model/sparkplugb'
export class ConnectionManager {
private connections: { [s: string]: DataSource<any> } = {}
private handleConnectionRequest = (event: AddMqttConnection) => {
const connectionId = event.id
// Prevent double connections when reloading
if (this.connections[connectionId]) {
this.removeConnection(connectionId)
}
const options = event.options
const connection = new MqttSource()
this.connections[connectionId] = connection
const connectionStateEvent = makeConnectionStateEvent(connectionId)
connection.stateMachine.onUpdate.subscribe(state => {
backendEvents.emit(connectionStateEvent, state)
})
connection.connect(options)
this.handleNewMessagesForConnection(connectionId, connection)
backendEvents.subscribe(makePublishEvent(connectionId), (msg: MqttMessage) => {
this.connections[connectionId].publish(msg)
})
}
private handleNewMessagesForConnection(connectionId: string, connection: MqttSource) {
const messageEvent = makeConnectionMessageEvent(connectionId)
connection.onMessage((topic: string, payload: Buffer, packet: any) => {
let buffer = payload
if (buffer.length > 20000) {
buffer = buffer.slice(0, 20000)
}
backendEvents.emit(messageEvent, {
topic,
payload: SparkplugDecoder.decode(buffer) ?? Base64Message.fromBuffer(buffer),
qos: packet.qos,
retain: packet.retain,
messageId: packet.messageId,
})
})
}
public manageConnections() {
backendEvents.subscribe(addMqttConnectionEvent, this.handleConnectionRequest)
backendEvents.subscribe(removeConnection, (connectionId: string) => {
this.removeConnection(connectionId)
})
}
public removeConnection(connectionId: string) {
const connection = this.connections[connectionId]
if (connection) {
backendEvents.unsubscribeAll(makePublishEvent(connectionId))
connection.disconnect()
delete this.connections[connectionId]
connection.stateMachine.onUpdate.removeAllListeners()
}
}
public closeAllConnections() {
Object.keys(this.connections).forEach(connectionId => this.removeConnection(connectionId))
}
}