Skip to content

Commit

Permalink
Merge d1c05d3 into 072fc74
Browse files Browse the repository at this point in the history
  • Loading branch information
joyja committed Mar 23, 2020
2 parents 072fc74 + d1c05d3 commit 2ae3aee
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 21 deletions.
4 changes: 2 additions & 2 deletions src/__tests__/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ test('create mqtt with the proper headers and fields returns valid results', asy
throw error
})
expect(setInterval).toBeCalledTimes(1)
expect(clearInterval).toBeCalledTimes(0)
expect(clearInterval).toBeCalledTimes(1)
expect(mockSparkplug.on).toBeCalledTimes(7)
expect(mockSparkplug.publishNodeBirth).toBeCalledTimes(1)
expect(mockSparkplug.publishDeviceBirth).toBeCalledTimes(1)
Expand Down Expand Up @@ -515,7 +515,7 @@ test('update mqtt with the proper headers and fields returns valid results', asy
throw error
})
expect(setInterval).toBeCalledTimes(1)
expect(clearInterval).toBeCalledTimes(1)
expect(clearInterval).toBeCalledTimes(2)
expect(mockSparkplug.on).toBeCalledTimes(7)
expect(mockSparkplug.publishNodeBirth).toBeCalledTimes(1)
expect(mockSparkplug.publishDeviceBirth).toBeCalledTimes(1)
Expand Down
10 changes: 10 additions & 0 deletions src/database/model.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
const logger = require('../logger')

const executeQuery = function(db, sql, params = [], firstRowOnly = false) {
if (process.env.TENTACLE_DEBUG) {
console.log(new Date().toISOString())
console.log(sql)
console.log(params)
}
return new Promise((resolve, reject) => {
const callback = (error, rows) => {
if (error) {
Expand All @@ -18,6 +23,11 @@ const executeQuery = function(db, sql, params = [], firstRowOnly = false) {
}

const executeUpdate = function(db, sql, params = []) {
if (process.env.TENTACLE_DEBUG) {
console.log(new Date().toISOString())
console.log(sql)
console.log(params)
}
return new Promise((resolve, reject) => {
db.run(sql, params, function(error) {
if (error) {
Expand Down
22 changes: 10 additions & 12 deletions src/relations.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ Mqtt.prototype.publishHistory = async function() {
let historyToPublish = []
for (const host of hosts) {
const history = await host.getHistory(this.recordLimit)
console.log(history)
const newRecords = history.filter((record) => {
return !historyToPublish.some((row) => {
return row.id === record.id
Expand Down Expand Up @@ -346,18 +347,15 @@ Mqtt.prototype.publishHistory = async function() {
return record.id
})
await this.constructor.executeUpdate(sql, params)
sql = `SELECT a.id AS id
FROM mqttHistory AS a
LEFT JOIN mqttPrimaryHostHistory AS b ON a.id = b.mqttHistory
WHERE b.id IS NULL LIMIT 750`
const historyToDelete = await this.constructor.executeQuery(sql, [], false)
sql = `DELETE FROM mqttHistory WHERE id in (${'?,'
.repeat(historyToDelete.length)
.slice(0, -1)})`
params = historyToDelete.map((record) => {
return record.id
})
await this.constructor.executeUpdate(sql, params)
sql = `DELETE FROM mqttHistory
WHERE id IN (
SELECT a.id AS id
FROM mqttHistory AS a
LEFT JOIN mqttPrimaryHostHistory AS b
on a.id = b.mqttHistory
WHERE b.id IS NULL LIMIT 750
)`
await this.constructor.executeUpdate(sql)
}

Object.defineProperties(Mqtt.prototype, {
Expand Down
12 changes: 5 additions & 7 deletions src/service/__tests__/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,10 @@ afterAll(async () => {

beforeEach(() => {
jest.useFakeTimers()
mockSparkplug.on.mockClear()
mockSparkplug.publishNodeBirth.mockClear()
mockSparkplug.publishDeviceBirth.mockClear()
mockSparkplug.publishDeviceData.mockClear()
mockSparkplug.publishDeviceDeath.mockClear()
mockSparkplug.stop.mockClear()
})

afterEach(() => {
jest.clearAllMocks()
})

test(`Initializing Service, also initializes Mqtt, MqttSource and MqttHistory.`, async () => {
Expand Down Expand Up @@ -243,7 +241,7 @@ describe(`MQTT: `, () => {
})
test(`onReconnect stops and then starts publishing.`, () => {
mqtt.onReconnect()
expect(clearInterval).toBeCalledTimes(1)
expect(clearInterval).toBeCalledTimes(2)
expect(setInterval).toBeCalledTimes(1)
expect(setInterval).toBeCalledWith(expect.any(Function), mqtt.rate)
})
Expand Down
1 change: 1 addition & 0 deletions src/service/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ class Mqtt extends Model {
this.stopPublishing()
}
startPublishing() {
this.interval = clearInterval(this.interval)
this.interval = setInterval(() => {
this.publish()
this.publishHistory()
Expand Down

0 comments on commit 2ae3aee

Please sign in to comment.