-
Notifications
You must be signed in to change notification settings - Fork 375
/
mqttServer.ts
122 lines (98 loc) · 2.85 KB
/
mqttServer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import { getLogger } from 'pinus-logger'; let logger = getLogger('pinus-admin', 'MqttServer');
import { EventEmitter } from 'events';
import * as mqtt_connection from 'mqtt-connection';
import * as Util from 'util';
import * as net from 'net';
export interface MqttSocket extends mqtt_connection
{
send(topic:string , msg:any):void;
}
export interface MqttServer
{
on(event: 'connection', listener: (socket:MqttSocket)=>void): this;
on(event: 'error', listener: (err:Error)=>void): this;
on(event: 'closed', listener: ()=>void): this;
on(event: string, listener: (msg:any)=>void): this;
}
let curId = 1;
export class MqttServer extends EventEmitter
{
inited = false;
closed = true;
server : net.Server;
socket : mqtt_connection;
constructor(private opts?: any, private cb?: Function)
{
super();
};
listen(port : number)
{
//check status
if (this.inited)
{
this.cb(new Error('already inited.'));
return;
}
this.inited = true;
let self = this;
this.server = new net.Server();
this.server.listen(port);
logger.info('[MqttServer] listen on %d', port);
this.server.on('listening', this.emit.bind(this, 'listening'));
this.server.on('error', function (err)
{
// logger.error('mqtt server is error: %j', err.stack);
self.emit('error', err);
});
this.server.on('connection', function (stream)
{
let socket = mqtt_connection(stream) as MqttSocket;
socket.id = curId++;
self.socket = socket;
socket.on('connect', (pkg : any)=>
{
socket.connack({
returnCode: 0
});
});
socket.on('publish', function (pkg : any)
{
let topic = pkg.topic;
let msg = pkg.payload.toString();
msg = JSON.parse(msg);
// logger.debug('[MqttServer] publish %s %j', topic, msg);
socket.emit(topic, msg);
});
socket.on('pingreq', function ()
{
socket.pingresp();
});
socket.send = function (topic : string, msg : any)
{
socket.publish({
topic: topic,
payload: JSON.stringify(msg)
});
};
self.emit('connection', socket);
});
};
send(topic : string, msg : any)
{
this.socket.publish({
topic: topic,
payload: msg
});
}
close()
{
if (this.closed)
{
return;
}
this.socket = undefined;
this.closed = true;
this.server.close();
this.emit('closed');
};
}