Skip to content

Commit

Permalink
Merge f7d0913 into 68c8c01
Browse files Browse the repository at this point in the history
  • Loading branch information
joyja committed Mar 25, 2020
2 parents 68c8c01 + f7d0913 commit 3f64772
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 17 deletions.
4 changes: 4 additions & 0 deletions src/__tests__/tag.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ afterAll(async () => {
})
})

afterEach(() => {
jest.clearAllMocks()
})

test(`Tag: initialize initializes ScanClass to.`, async () => {
await Tag.initialize(db, pubsub)
expect(ScanClass.initialized).toBe(true)
Expand Down
20 changes: 12 additions & 8 deletions src/relations.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ Mqtt.prototype.publishHistory = async function() {
let historyToPublish = []
for (const host of hosts) {
const history = await host.getHistory(this.recordLimit)
console.log(history)
const newRecords = history.filter((record) => {
return !historyToPublish.some((row) => {
return row.id === record.id
Expand All @@ -329,7 +328,7 @@ Mqtt.prototype.publishHistory = async function() {
const tag = Tag.findById(record.tag)
return {
name: tag.name,
value: record.value,
value: tag.datatype === 'BOOLEAN' ? !!+record.value : record.value,
timestamp: record.timestamp,
type: tag.datatype,
isHistorical: true
Expand Down Expand Up @@ -387,15 +386,14 @@ MqttSource.prototype.log = async function(scanClassId) {
return false
}
})
for (tag of tags) {
if (tags.length > 0) {
await new Promise((resolve) => {
this.db.serialize(async () => {
const primaryHosts = this.mqtt.primaryHosts
let sql = `INSERT INTO mqttHistory (mqttSource, tag, timestamp, value)`
sql = `${sql} VALUES (?,?,?,?);`
let params = [this.id, tag.id, getTime(new Date()), tag.value]
let sql = `INSERT INTO mqttHistory (mqttSource, timestamp)`
sql = `${sql} VALUES (?,?);`
let params = [this.id, getTime(new Date())]
const result = await this.constructor.executeUpdate(sql, params)
for (host of primaryHosts) {
for (host of this.mqtt.primaryHosts) {
sql = `INSERT INTO mqttPrimaryHostHistory (mqttPrimaryHost, mqttHistory)`
sql = `${sql} VALUES (?,?);`
params = [host.id, result.lastID]
Expand All @@ -404,6 +402,12 @@ MqttSource.prototype.log = async function(scanClassId) {
serviceUpdate: this.mqtt.service
})
}
for (const tag of tags) {
let sql = `INSERT INTO mqttHistoryTag (mqttHistory, tag, value)`
sql = `${sql} VALUES (?,?,?);`
let params = [result.lastID, tag.id, tag.value]
await this.constructor.executeUpdate(sql, params)
}
resolve()
})
})
Expand Down
12 changes: 12 additions & 0 deletions src/resolvers/__tests__/resolvers.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ afterAll(async () => {
})
})

afterEach(async () => {
jest.clearAllMocks()
})

// ==============================
// Queries
// ==============================
Expand Down Expand Up @@ -1347,4 +1351,12 @@ describe('Subscription: ', () => {
resolvers.Subscription.tagUpdate.subscribe({}, {}, context)
expect(pubsub.asyncIterator).toBeCalledTimes(1)
})
test('deviceUpdate subscribe returns an asyncIterator', () => {
resolvers.Subscription.deviceUpdate.subscribe({}, {}, context)
expect(pubsub.asyncIterator).toBeCalledTimes(1)
})
test('serviceUpdate subscribe returns an asyncIterator', () => {
resolvers.Subscription.serviceUpdate.subscribe({}, {}, context)
expect(pubsub.asyncIterator).toBeCalledTimes(1)
})
})
2 changes: 1 addition & 1 deletion src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const { executeQuery } = require('./database/model')
const fs = require('fs')
const logger = require('./logger')

const desiredUserVersion = 4
const desiredUserVersion = 5

let db = undefined
let httpServer = undefined
Expand Down
38 changes: 30 additions & 8 deletions src/service/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,44 @@ const createTable = function(db, tableName, fields) {
}

class Mqtt extends Model {
static async initialize(db, pubsub) {
await MqttSource.initialize(db, pubsub)
await MqttPrimaryHost.initialize(db, pubsub)
const result = await super.initialize(db, pubsub)
static async upgradeToV3() {
if (this.tableExisted && this.version < 3) {
const newColumns = [{ colName: 'recordLimit', colType: 'TEXT' }]
for (const column of newColumns) {
let sql = `ALTER TABLE "${this.table}" ADD "${column.colName}" ${column.colType}`
await this.executeUpdate(sql)
}
}
}
static async upgradeToV5() {
let history = undefined
let sql = undefined
if (this.version < 5) {
sql = `DROP TABLE mqttHistory`
await this.executeUpdate(sql)
sql = `DROP TABLE mqttPrimaryHostHistory`
await this.executeUpdate(sql)
}
}
static async initialize(db, pubsub) {
await MqttSource.initialize(db, pubsub)
await MqttPrimaryHost.initialize(db, pubsub)
const result = await super.initialize(db, pubsub)
await this.upgradeToV3()
await this.upgradeToV5()
let history = undefined
let sql = undefined
const mqttHistoryFields = [
{ colName: 'mqttSource', colRef: 'mqttSource', onDelete: 'CASCADE' },
{ colName: 'timestamp', colType: 'INTEGER' }
]
await createTable(this.db, 'mqttHistory', mqttHistoryFields)
const mqttHistoryTagFields = [
{ colName: 'mqttHistory', colRef: 'mqttHistory', onDelete: 'CASCADE' },
{ colName: 'tag', colRef: 'tag', onDelete: 'CASCADE' },
{ colName: 'timestamp', colType: 'INTEGER' },
{ colName: 'value', colType: 'TEXT' }
]
await createTable(this.db, 'mqttHistory', mqttHistoryFields)
await createTable(this.db, 'mqttHistoryTag', mqttHistoryTagFields)
const mqttPrimaryHostHistoryFields = [
{
colName: 'mqttPrimaryHost',
Expand Down Expand Up @@ -441,12 +461,14 @@ class MqttPrimaryHost extends Model {
a.mqttPrimaryHost as hostId,
b.id as historyId,
b.mqttSource as source,
b.tag as tag,
c.tag as tag,
b.timestamp as timestamp,
b.value as value
c.value as value
FROM mqttPrimaryHostHistory AS a
JOIN mqttHistory AS b
ON a.mqttHistory=b.id
JOIN mqttHistoryTag AS c
ON b.id=c.mqttHistory
WHERE a.mqttPrimaryHost=?`
if (limit) {
sql = `${sql} LIMIT ${limit}`
Expand Down
1 change: 1 addition & 0 deletions src/tag.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ class ScanClass extends Model {
this.scanCount = 0
}
startScan() {
this.interval = clearInterval(this.interval)
this.interval = setInterval(async () => {
await this.scan()
this.scanCount += 1
Expand Down

0 comments on commit 3f64772

Please sign in to comment.