diff --git a/docs/.vuepress/config.js b/docs/.vuepress/config.js index fdb19cb..db8e646 100644 --- a/docs/.vuepress/config.js +++ b/docs/.vuepress/config.js @@ -81,6 +81,7 @@ module.exports = { children: [ '/integrations/', '/integrations/home-assistant', + '/integrations/mqtt', '/integrations/bluetooth-low-energy', '/integrations/bluetooth-classic', '/integrations/xiaomi-mi', diff --git a/docs/integrations/README.md b/docs/integrations/README.md index a1b3cf7..59e281e 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -13,6 +13,7 @@ Looking for a way to integrate OpenHAB? You can use the [Home Assistant Core](ho | Integration | Supported entities | | ------------------------------------------ | ------------------ | | [Home Assistant Core](./home-assistant.md) | All | +| [MQTT](./mqtt.md) | All | ## Presence Detection diff --git a/docs/integrations/mqtt.md b/docs/integrations/mqtt.md new file mode 100644 index 0000000..85b8922 --- /dev/null +++ b/docs/integrations/mqtt.md @@ -0,0 +1,132 @@ +# MQTT + +**Integration Key:** `mqtt` + +::: tip + +If you are looking to integrate with [Home Assistant Core](https://www.home-assistant.io) via MQTT take a look at the [Home Assistant integration](./home-assistant.md) instead. + +::: + +The MQTT integration will send messages with room-assistant entity update information to an [MQTT broker](https://mqtt.org). + +## Message Format + +Entity updates are sent into unique topics for each entity, grouped by instance name and entity type. The topic format is `baseTopic/instanceName/entityType/entityId`. + +Each message will have the following properties: + +- **entity** - This includes the whole current entity state, like you would see it in the [API](../guide/api.md). +- **hasAuthority** - This boolean value shows whether this update message comes from an entity that has authority over the entity. It will be `false` for distributed entity updates that are emitted from non-leader instances. You may use this to respect the room-assistant leader in your own automations, but you can of course also just pick a single instance to work off. + +Optionally the message will also include the following properties: + +- **diff** - This is an array of changes to the previous entity state. Each array item includes a `path` to the changed property based off the entity JSON root, the `oldValue` and the `newValue`. Could be used to monitor for specific changes only. This property will not be included when instances emit an entity refresh (e.g. after re-connecting to the MQTT broker). + +::: details Example Message + +This message could have been posted to `room-assistant/entity/living-room/bluetooth-low-energy-presence-sensor/ble-some-id`: + +```json +{ + "entity": { + "attributes": { + "distance": 3.6, + "lastUpdatedAt": "2021-02-28T14:17:33.141Z" + }, + "id": "ble-some-id", + "name": "Something Room Presence", + "distributed": true, + "stateLocked": true, + "distances": { + "bedroom": { + "lastUpdatedAt": "2021-02-28T14:17:32.605Z", + "distance": 9.4, + "outOfRange": false + }, + "living-room": { + "lastUpdatedAt": "2021-02-28T14:17:33.141Z", + "distance": 3.6, + "outOfRange": false + } + }, + "timeout": 180, + "measuredValues": { + "bedroom": { + "rssi": -79.81192947697268, + "measuredPower": -59 + }, + "living-room": { + "rssi": -70.40174705168248, + "measuredPower": -59 + } + }, + "state": "living-room" + }, + "diff": [ + { + "path": "/measuredValues/living-room", + "oldValue": { + "rssi": -73.37709762753302, + "measuredPower": -59 + }, + "newValue": { + "rssi": -70.40174705168248, + "measuredPower": -59 + } + }, + { + "path": "/distances/living-room", + "oldValue": { + "lastUpdatedAt": "2021-02-28T14:17:32.308Z", + "distance": 3.8, + "outOfRange": false + }, + "newValue": { + "lastUpdatedAt": "2021-02-28T14:17:33.141Z", + "distance": 3.6, + "outOfRange": false + } + } + ], + "hasAuthority": true +} +``` + +::: + +To get started with your automations based on these topics it is recommended to just explore the data provided in the topics using e.g. a GUI MQTT tool. + +## Settings + +| Name | Type | Default | Description | +| ------------- | ----------------------------- | ----------------------- | ------------------------------------------------------------ | +| `mqttUrl` | String | `mqtt://localhost:1883` | Connection string for your MQTT broker. | +| `mqttOptions` | [MQTT Options](#mqtt-options) | | Additional options for the MQTT connection. | +| `baseTopic` | String | `room-assistant/entity` | Base for the entity update topics. | +| `qos` | Number | `0` | Quality of Service level that the messages will be sent with. | +| `retain` | Boolean | `false` | Whether to mark the messages as to retain or not. | + +### MQTT Options + +| Name | Type | Default | Description | +| -------------------- | ------- | ------- | ------------------------------------------------------------ | +| `username` | String | | Username for authentication | +| `password` | String | | Password for authentication | +| `rejectUnauthorized` | Boolean | `true` | Whether MQTTS connections should fail for invalid certificates or not. Set this to `false` if you are using a self-signed certificate and connect via TLS. | + +::: details Example Config + +```yaml +global: + integrations: + - mqtt +mqtt: + mqttUrl: mqtt://localhost:1883 + mqttOptions: + username: youruser + password: yourpass + retain: false +``` + +::: \ No newline at end of file diff --git a/src/config/definitions/default.ts b/src/config/definitions/default.ts index c08fc13..e40ad60 100644 --- a/src/config/definitions/default.ts +++ b/src/config/definitions/default.ts @@ -10,6 +10,7 @@ import { ShellConfig } from '../../integrations/shell/shell.config'; import { XiaomiMiConfig } from '../../integrations/xiaomi-mi/xiaomi-mi.config'; import { EntitiesConfig } from '../../entities/entities.config'; import { LoggerConfig } from '../logger.config'; +import { MqttConfig } from "../../integrations/mqtt/mqtt.config"; export class AppConfig { global: GlobalConfig = new GlobalConfig(); @@ -24,6 +25,7 @@ export class AppConfig { shell: ShellConfig = new ShellConfig(); xiaomiMi: XiaomiMiConfig = new XiaomiMiConfig(); homeAssistant: HomeAssistantConfig = new HomeAssistantConfig(); + mqtt: MqttConfig = new MqttConfig(); } module.exports = new AppConfig(); diff --git a/src/integrations/mqtt/mqtt.config.ts b/src/integrations/mqtt/mqtt.config.ts new file mode 100644 index 0000000..13c140d --- /dev/null +++ b/src/integrations/mqtt/mqtt.config.ts @@ -0,0 +1,9 @@ +import { IClientOptions } from "async-mqtt"; + +export class MqttConfig { + mqttUrl = 'mqtt://localhost:1883'; + mqttOptions: IClientOptions = {}; + baseTopic = 'room-assistant/entity'; + qos: 0 | 1 | 2 = 0; + retain = false; +} diff --git a/src/integrations/mqtt/mqtt.health.spec.ts b/src/integrations/mqtt/mqtt.health.spec.ts new file mode 100644 index 0000000..a2ea6cb --- /dev/null +++ b/src/integrations/mqtt/mqtt.health.spec.ts @@ -0,0 +1,30 @@ +import { mocked } from "ts-jest/utils"; +import { MqttService } from "./mqtt.service"; +import { MqttHealthIndicator } from "./mqtt.health"; +import { HealthCheckError } from "@nestjs/terminus"; + +jest.mock('./mqtt.service') + +describe('MqttHealthIndicator', () => { + const serviceMock = mocked(new MqttService(undefined, undefined, undefined)); + const healthIndicator = new MqttHealthIndicator(serviceMock); + + it('should report healthy if connection is established', () => { + serviceMock.isConnected.mockReturnValue(true); + + const result = healthIndicator.connectionCheck(); + expect(result['mqtt_connected'].status).toEqual('up'); + }); + + it('should report unhealthy if connection not established yet', () => { + serviceMock.isConnected.mockReturnValue(undefined); + + expect(() => healthIndicator.connectionCheck()).toThrow(HealthCheckError); + }); + + it('should report unhealthy if connection lost', () => { + serviceMock.isConnected.mockReturnValue(false); + + expect(() => healthIndicator.connectionCheck()).toThrow(HealthCheckError); + }); +}) diff --git a/src/integrations/mqtt/mqtt.health.ts b/src/integrations/mqtt/mqtt.health.ts new file mode 100644 index 0000000..8590978 --- /dev/null +++ b/src/integrations/mqtt/mqtt.health.ts @@ -0,0 +1,25 @@ +import { HealthCheckError, HealthIndicator, HealthIndicatorResult } from "@nestjs/terminus"; +import { Injectable, Optional } from "@nestjs/common"; +import { MqttService } from "./mqtt.service"; +import { HealthIndicatorService } from "../../status/health-indicator.service"; + +@Injectable() +export class MqttHealthIndicator extends HealthIndicator { + constructor(private readonly mqttService: MqttService, @Optional() healthIndicatorService?: HealthIndicatorService) { + super(); + healthIndicatorService?.registerHealthIndicator(async () => + this.connectionCheck() + ); + } + + connectionCheck(): HealthIndicatorResult { + const isHealthy = this.mqttService.isConnected(); + const result = this.getStatus('mqtt_connected', isHealthy); + + if (isHealthy) { + return result; + } + + throw new HealthCheckError('No connection to MQTT broker', result); + } +} diff --git a/src/integrations/mqtt/mqtt.module.ts b/src/integrations/mqtt/mqtt.module.ts new file mode 100644 index 0000000..b0bce4d --- /dev/null +++ b/src/integrations/mqtt/mqtt.module.ts @@ -0,0 +1,16 @@ +import { DynamicModule, Module } from "@nestjs/common"; +import { ConfigModule } from "../../config/config.module"; +import { EntitiesModule } from "../../entities/entities.module"; +import { StatusModule } from "../../status/status.module"; +import { MqttService } from "./mqtt.service"; + +@Module({}) +export default class MqttModule { + static forRoot(): DynamicModule { + return { + module: MqttModule, + imports: [ConfigModule, EntitiesModule, StatusModule], + providers: [MqttService] + } + } +} diff --git a/src/integrations/mqtt/mqtt.service.spec.ts b/src/integrations/mqtt/mqtt.service.spec.ts new file mode 100644 index 0000000..15bad31 --- /dev/null +++ b/src/integrations/mqtt/mqtt.service.spec.ts @@ -0,0 +1,197 @@ +import { Sensor } from "../../entities/sensor"; + +const mockMqttClient = { + on: jest.fn(), + publish: jest.fn(), + subscribe: jest.fn(), + end: jest.fn(), +}; + +import { ClusterService } from "../../cluster/cluster.service"; +import { MqttService } from "./mqtt.service"; +import { Test, TestingModule } from "@nestjs/testing"; +import { NestEmitterModule } from "nest-emitter"; +import { ConfigModule } from "../../config/config.module"; +import { EntitiesModule } from "../../entities/entities.module"; +import { EventEmitter } from "events"; +import { EntitiesService } from "../../entities/entities.service"; +import { ConfigService } from "../../config/config.service"; +import c from "config"; +import { MqttConfig } from "./mqtt.config"; +import { mocked } from "ts-jest/utils"; +import * as mqtt from "async-mqtt"; +import { BinarySensor } from "../../entities/binary-sensor"; +import { DeviceTracker } from "../../entities/device-tracker"; +import { Switch } from "../../entities/switch"; +import { Camera } from "../../entities/camera"; +import { Entity } from "../../entities/entity.dto"; + +jest.mock('mdns', () => ({}), { virtual: true }); +jest.mock('async-mqtt', () => { + return { + connectAsync: jest.fn().mockReturnValue(mockMqttClient), + }; +}); +const mockMqtt = mocked(mqtt, true); + +describe('MqttService', () => { + let service: MqttService; + let mockConfig: MqttConfig; + const entitiesEmitter = new EventEmitter(); + const mqttEmitter = new EventEmitter(); + const entitiesService = { + refreshStates: jest.fn(), + }; + const configService = { + get: jest.fn().mockImplementation((key: string) => { + return key === 'mqtt' ? mockConfig : c.get(key); + }), + }; + + beforeEach(async () => { + jest.clearAllMocks(); + entitiesEmitter.removeAllListeners(); + mqttEmitter.removeAllListeners(); + mockMqttClient.on.mockImplementation((event, callback) => { + mqttEmitter.on(event, callback); + }); + mockConfig = new MqttConfig(); + + const module: TestingModule = await Test.createTestingModule({ + imports: [ + NestEmitterModule.forRoot(entitiesEmitter), + ConfigModule, + EntitiesModule, + ], + providers: [MqttService] + }) + .overrideProvider(EntitiesService) + .useValue(entitiesService) + .overrideProvider(ConfigService) + .useValue(configService) + .overrideProvider(ClusterService) + .useValue({}) + .compile(); + + service = module.get(MqttService); + }) + + it('should subscribe to entity events on init', async () => { + const emitterOnSpy = jest.spyOn(entitiesEmitter, 'on'); + + await service.onModuleInit(); + expect(emitterOnSpy).toHaveBeenCalledWith( + 'entityUpdate', + expect.any(Function) + ); + expect(emitterOnSpy).toHaveBeenCalledWith( + 'entityRefresh', + expect.any(Function) + ); + }); + + it('should connect to MQTT on init', async () => { + await service.onModuleInit(); + expect(mockMqtt.connectAsync).toHaveBeenCalledWith( + expect.any(String), + expect.any(Object), + false + ); + }); + + it('should request an entity state refresh on broker reconnect', async () => { + await service.onModuleInit(); + + mqttEmitter.emit('connect'); + + expect(entitiesService.refreshStates).toHaveBeenCalledTimes(1); + }); + + it.each([ + ['sensor', new Sensor('test', 'Test')], + ['binary-sensor', new BinarySensor('test', 'Test')], + ['device-tracker', new DeviceTracker('test', 'Test')], + ['switch', new Switch('test', 'Test')], + ['camera', new Camera('test', 'Test')] + ])("should post %s entity update messages", async (entityTopic: string, entity: Entity) => { + const diff = [{ + newValue: 'new-state', + oldValue: 'old-state', + path: '/state' + }]; + const hasAuthority = true; + + await service.onModuleInit(); + + entitiesEmitter.emit('entityUpdate', entity, diff, hasAuthority); + + expect(mockMqttClient.publish).toHaveBeenCalledWith( + `room-assistant/entity/test-instance/${entityTopic}/${entity.id}`, + JSON.stringify({ entity, diff, hasAuthority }), + expect.any(Object) + ) + }); + + it.each([ + ['sensor', new Sensor('test', 'Test')], + ['binary-sensor', new BinarySensor('test', 'Test')], + ['device-tracker', new DeviceTracker('test', 'Test')], + ['switch', new Switch('test', 'Test')], + ['camera', new Camera('test', 'Test')] + ])("should post %s entity refresh messages", async (entityTopic: string, entity: Entity) => { + const hasAuthority = true; + + await service.onModuleInit(); + + entitiesEmitter.emit('entityRefresh', entity, hasAuthority); + + expect(mockMqttClient.publish).toHaveBeenCalledWith( + `room-assistant/entity/test-instance/${entityTopic}/${entity.id}`, + JSON.stringify({ entity, hasAuthority }), + expect.any(Object) + ) + }); + + it("should use the configured base topic", async () => { + mockConfig.baseTopic = 'my-new-topic'; + + await service.onModuleInit(); + entitiesEmitter.emit('entityUpdate', new Sensor('test', 'Test'), [], true); + + expect(mockMqttClient.publish).toHaveBeenCalledWith( + 'my-new-topic/test-instance/sensor/test', + expect.any(String), + expect.any(Object) + ) + }); + + it("should pass the configured retain setting", async () => { + mockConfig.retain = true; + + await service.onModuleInit(); + entitiesEmitter.emit('entityUpdate', new Sensor('test', 'Test'), [], true); + + expect(mockMqttClient.publish).toHaveBeenCalledWith( + expect.any(String), + expect.any(String), + expect.objectContaining({ + retain: true + }) + ) + }); + + it("should pass the configured qos setting", async () => { + mockConfig.qos = 2; + + await service.onModuleInit(); + entitiesEmitter.emit('entityUpdate', new Sensor('test', 'Test'), [], true); + + expect(mockMqttClient.publish).toHaveBeenCalledWith( + expect.any(String), + expect.any(String), + expect.objectContaining({ + qos: 2 + }) + ) + }); +}) diff --git a/src/integrations/mqtt/mqtt.service.ts b/src/integrations/mqtt/mqtt.service.ts new file mode 100644 index 0000000..178d98a --- /dev/null +++ b/src/integrations/mqtt/mqtt.service.ts @@ -0,0 +1,106 @@ +import { Injectable, Logger, OnModuleInit } from "@nestjs/common"; +import { ConfigService } from "../../config/config.service"; +import { EntitiesService } from "../../entities/entities.service"; +import { InjectEventEmitter } from "nest-emitter"; +import { EntitiesEventEmitter, PropertyDiff } from "../../entities/entities.events"; +import { MqttConfig } from "./mqtt.config"; +import mqtt, { AsyncMqttClient } from "async-mqtt"; +import { Entity } from "../../entities/entity.dto"; +import { makeId } from "../../util/id"; +import _ from "lodash"; + +@Injectable() +export class MqttService implements OnModuleInit { + private config: MqttConfig; + private mqttClient: AsyncMqttClient; + private readonly logger: Logger = new Logger(MqttService.name); + + constructor(private readonly configService: ConfigService, + private readonly entitiesService: EntitiesService, + @InjectEventEmitter() private readonly emitter: EntitiesEventEmitter) { + this.config = configService.get('mqtt'); + } + + /** + * Lifecycle hook, called once the host module has been initialized. + */ + async onModuleInit(): Promise { + try { + this.mqttClient = await mqtt.connectAsync( + this.config.mqttUrl, + { ...this.config.mqttOptions }, + false + ); + + this.mqttClient.on('error', e => this.logger.error(e.message, e.stack)) + this.mqttClient.on('connect', this.handleReconnect.bind(this)); + this.logger.log( + `Successfully connected to MQTT broker at ${this.config.mqttUrl}` + ); + + this.emitter.on('entityUpdate', this.handleEntityUpdate.bind(this)); + this.emitter.on('entityRefresh', this.handleEntityRefresh.bind(this)); + } catch (e) { + this.logger.error(e.message, e.stack); + } + } + + /** + * Checks if room-assistant is connected to the MQTT broker. + */ + isConnected(): boolean { + return this.mqttClient?.connected; + } + + /** + * Passes entity updates to the MQTT broker. + * + * @param entity - Entity in its updated state + * @param diff - Difference between old and new state + * @param hasAuthority - Whether this instance has control of the entity or not + */ + private handleEntityUpdate(entity: Entity, diff: Array, hasAuthority: boolean): void { + const message: EntityUpdateMessage = { + entity, + diff, + hasAuthority + }; + this.mqttClient.publish(this.getTopicName(entity), JSON.stringify(message), { + qos: this.config.qos, + retain: this.config.retain + }) + } + + /** + * Passes entity information to MQTT during a refresh. + * + * @param entity - Entity in its current state + * @param hasAuthority - Whether this instance has control of the entity or not + */ + private handleEntityRefresh(entity: Entity, hasAuthority: boolean): void { + this.handleEntityUpdate(entity, undefined, hasAuthority) + } + + /** + * Handles broker re-connection events. + */ + private handleReconnect(): void { + this.logger.log('Re-connected to broker'); + this.entitiesService.refreshStates(); + } + + /** + * Generates a unique topic name from an entity. + * + * @param entity - Entity that the topic should contain + */ + private getTopicName(entity: Entity) { + return `${this.config.baseTopic}/${makeId(this.configService.get('global').instanceName)}/${_.kebabCase(entity.constructor.name)}/${makeId(entity.id)}`; + } +} + +interface EntityUpdateMessage { + entity: Entity, + diff?: Array, + hasAuthority: boolean +}