-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
152 lines (130 loc) · 4.92 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
const MongoClient = require('mongodb').MongoClient;
const bcrypt = require('bcryptjs');
let mysql = require('mysql2');
require('dotenv').config();
var log = require('npmlog');
// Logger heading: every log line is prefixed with the current UTC time,
// formatted as "YYYY-MM-DD HH:MM:SS" (ISO string with the "T" replaced by a
// space and the fractional seconds / zone suffix removed).
Object.defineProperty(log, 'heading', {
  get() {
    const isoNow = new Date().toISOString();
    return isoNow.replace(/T/, ' ').replace(/\..+/, '');
  },
});
log.headingStyle = { bg: '', fg: 'yellow' };
// Verbosity is taken from the environment
log.level = process.env.DEBUG_LEVEL;
// MySQL: connection settings come from the environment. The `mysql` binding
// is reassigned from the driver module to the pool created here, so the rest
// of the file queries through the pool.
const poolSettings = {
  host: process.env.MYSQL_HOST,
  port: process.env.MYSQL_PORT,
  user: process.env.MYSQL_USER,
  password: process.env.MYSQL_PASSWORD,
  database: process.env.MYSQL_DB,
};
mysql = mysql.createPool(poolSettings);
// Aedes broker instance
const createBroker = require('aedes');
const aedes = createBroker({
  // broker id
  id: process.env.SERVER_ID,
  // override MQTT 3.1.0 clients Id length limit
  maxClientsIdLength: 35,
});

// MQTT server: a plain TCP server whose connections are handed to the broker
const net = require('net');
const server = net.createServer(aedes.handle);
server.listen(process.env.MQTT_PORT, () => {
  log.info('', 'Server listening on port:', process.env.MQTT_PORT);
});
/**
 * Identity of an authenticated team.
 *
 * Instances carry exactly two own properties, in this order:
 *   - id:       the value passed as `id`
 *   - username: the value passed as `username`
 */
class Team {
  /**
   * @param {*} id - team identifier
   * @param {*} username - team login name
   */
  constructor(id, username) {
    Object.assign(this, { id, username });
  }
}

// Module-level holder for a Team instance; starts out undefined.
let team;
// authentication
/**
 * Aedes authentication hook: validates the CONNECT credentials against the
 * `teams` table (bcrypt-hashed passwords).
 *
 * Every failure path answers through `callback` with an error whose
 * `returnCode` is 4 (CONNACK "bad user name or password"); nothing is thrown
 * from the asynchronous callbacks, so a bad login cannot take the broker down.
 *
 * On success the resolved Team is stored on the client object as
 * `client.team`, so it can be read per connection. The module-level `team`
 * is still updated as well for code that reads it.
 *
 * @param {object} client - connecting Aedes client
 * @param {string} [username] - user name from the CONNECT packet
 * @param {Buffer} [password] - password from the CONNECT packet
 * @param {function(Error|null, boolean|null)} callback - Aedes completion callback
 */
aedes.authenticate = function (client, username, password, callback) {
  const mqerr = new Error('Auth error');
  mqerr.returnCode = 4;
  log.info('', 'Client', client.id, 'connected. Waiting login...');

  // Credentials are optional in a CONNECT packet; calling toString() on a
  // missing password would throw inside the query callback.
  if (!username || password == null) {
    log.info('', 'Client', client.id, 'sent no username or password');
    callback(mqerr, null);
    return;
  }

  mysql.query('SELECT id, password FROM teams WHERE username = ?', [username], function (error, results) {
    if (error) {
      log.error('', "MySQL error:", error);
      callback(mqerr, null);
      return;
    }
    if (results.length === 0) {
      // User doesn't exist
      log.info('', "User doesn't exist");
      callback(mqerr, null);
      return;
    }

    const row = results[0];
    // compare password
    bcrypt.compare(password.toString(), row.password.toString(), function (err, res) {
      if (err) {
        // Report the failure to the client instead of throwing
        log.error('', 'bcrypt error:', err);
        callback(mqerr, null);
        return;
      }
      if (!res) {
        // Wrong Password
        log.info('', "User", username, "wrong password");
        callback(mqerr, null);
        return;
      }
      log.info('', 'Client', client.id, 'logged in with username', username, 'and teamID', row.id);
      client.team = new Team(row.id, username);
      team = client.team;
      callback(null, true);
    });
  });
};
// publish
/**
 * Stores every message published by a client in MongoDB.
 *
 * The stored document is a copy of the packet's own fields plus:
 *   - timestamp: Unix time in whole seconds
 *   - date:      that time as a "pt-PT" locale date string
 *   - teamid:    id of the publishing team
 *   - one field per key of the JSON payload ("ref" is kept as-is, every other
 *     value goes through parseFloat)
 *
 * Invalid payloads and MongoDB failures are logged and the message is
 * dropped; nothing is thrown from the handler or its callbacks.
 *
 * NOTE(review): a MongoDB connection is opened and closed for every message;
 * sharing one connection would be cheaper but changes the lifecycle.
 *
 * @param {object} packet - published packet (topic, payload, ...)
 * @param {object|null} client - publishing client; falsy packets are skipped
 *   (per the Aedes docs these are broker-originated — confirm)
 */
aedes.on('publish', function (packet, client) {
  if (!client) {
    return;
  }

  // Use a team attached to this client when one is present; otherwise fall
  // back to the module-level `team`.
  const publisher = client.team || team;
  if (!publisher) {
    log.error('', 'Client', client.id, 'published without a known team');
    return;
  }

  const rawPayload = packet.payload.toString();
  log.info('', 'TeamID', publisher.id, 'published:', rawPayload, 'on topic', packet.topic);

  // parse the payload as json; it is client-supplied, so it may be anything
  let data;
  try {
    data = JSON.parse(rawPayload);
  } catch (parseError) {
    log.error('', 'Invalid JSON payload on topic', packet.topic, ':', parseError.message);
    return;
  }
  if (data === null || typeof data !== 'object') {
    log.error('', 'Payload on topic', packet.topic, 'is not a JSON object');
    return;
  }

  // Math.floor instead of `| 0`: the bitwise form truncates to 32 bits
  const timestamp = Math.floor(Date.now() / 1000);

  // Build the document on a copy so the broker's packet object is untouched
  const doc = Object.assign({}, packet, {
    timestamp: timestamp,
    date: new Date(timestamp * 1000).toLocaleDateString("pt-PT"),
    teamid: publisher.id,
  });
  // add the payload keys to the document root
  Object.keys(data).forEach(function (key) {
    doc[key] = key === 'ref' ? data[key] : parseFloat(data[key]);
  });

  MongoClient.connect(process.env.MONGO_URL, { useUnifiedTopology: true }, function (connectError, mongo) {
    if (connectError) {
      log.error('', 'MongoDB connection error:', connectError);
      return;
    }
    const collection = mongo.db(process.env.MONGO_DB).collection(process.env.MONGO_PUB_COL);
    collection.insertOne(doc, function (insertError) {
      if (insertError) {
        log.error('', 'MongoDB insert error:', insertError);
      }
      // Close on success and on failure so connections are not leaked
      mongo.close();
    });
  });
});
// new connection
/**
 * Marks the client's team as connected (`teams.connected = 1`).
 *
 * The listener takes only the client: this handler is never handed a
 * completion callback, so a MySQL failure is logged rather than passed on.
 *
 * @param {object} client - the client that connected
 */
aedes.on('client', function (client) {
  // Use a team attached to this client when one is present; otherwise fall
  // back to the module-level `team`.
  const owner = client.team || team;
  if (!owner) {
    log.error('', 'New client', client.id, 'connected without a known team');
    return;
  }
  log.info('', 'New client', client.id, 'with teamID', owner.id, 'connected');
  mysql.query('UPDATE teams SET connected = 1 WHERE id = ?', [owner.id], function (error) {
    if (error) {
      log.error('', "MySQL error:", error);
    }
  });
});
// disconnect
/**
 * Marks the client's team as disconnected (`teams.connected = 0`).
 *
 * A MySQL failure is only logged: there is no completion callback in scope
 * for this listener, so there is nothing to report the error to.
 *
 * @param {object} client - the client that disconnected
 */
aedes.on("clientDisconnect", function (client) {
  // Use a team attached to this client when one is present; otherwise fall
  // back to the module-level `team`.
  const owner = client.team || team;
  if (!owner) {
    log.error('', 'Client', client.id, 'disconnected without a known team');
    return;
  }
  log.info('', 'Client', client.id, 'with teamID', owner.id, 'disconnected');
  mysql.query('UPDATE teams SET connected = 0 WHERE id = ?', [owner.id], function (error) {
    if (error) {
      log.error('', "MySQL error:", error);
    }
  });
});
// subscribe
/**
 * Logs each subscription request together with the subscribing team's id.
 *
 * @param {Array} subscriptions - the subscriptions requested
 * @param {object|null} client - subscribing client; nothing is logged when falsy
 */
aedes.on('subscribe', function (subscriptions, client) {
  if (!client) {
    return;
  }
  // Use a team attached to this client when one is present; otherwise fall
  // back to the module-level `team`. The id is logged as undefined rather
  // than throwing when neither is set.
  const owner = client.team || team;
  const ownerId = owner ? owner.id : undefined;
  log.info('', 'Client', client.id, 'with teamID', ownerId, 'subscribed:', subscriptions);
});
/**
 * Builds a listener that logs a client failure at error level.
 *
 * @param {string} label - text placed between the client id and the error
 * @returns {function(object, Error): void} listener taking (client, err)
 */
function logClientFailure(label) {
  return (client, err) => {
    log.error('', 'Client', client.id, label, err);
  };
}

// Both error events are only logged
aedes.on('clientError', logClientFailure('error:'));
aedes.on('connectionError', logClientFailure('connection error:'));