Skip to content

Commit

Permalink
fix(entities): re-publish all entity states when elected
Browse files Browse the repository at this point in the history
Previously the state that was published out to e.g. Home Assistant could
get desynced with the room-assistant state on a switch of leaders.
Re-publishing all data ensures that the state stays synced after a
re-election happened.
  • Loading branch information
mKeRix committed Feb 14, 2020
1 parent 3e7cf6e commit be380cd
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 4 deletions.
60 changes: 59 additions & 1 deletion src/entities/entities.service.spec.ts
Expand Up @@ -13,7 +13,8 @@ describe('EntitiesService', () => {
let service: EntitiesService;
const emitter: EventEmitter = new EventEmitter();
const clusterService = {
isMajorityLeader: jest.fn()
isMajorityLeader: jest.fn(),
on: jest.fn()
};

beforeEach(async () => {
Expand All @@ -29,6 +30,23 @@ describe('EntitiesService', () => {
jest.resetAllMocks();
});

it('should register a callback to leader election on bootstrap', () => {
service.onApplicationBootstrap();
expect(clusterService.on).toHaveBeenCalled();
});

it('should refresh entity states if elected', () => {
const refreshSpy = jest
.spyOn(service, 'refreshStates')
.mockImplementation(() => undefined);

service.onApplicationBootstrap();
const electedCallback = clusterService.on.mock.calls[0][1].bind(service);
electedCallback();

expect(refreshSpy).toHaveBeenCalled();
});

it('should return information about whether entity ids are registered or not', () => {
const entity = new Sensor('test_sensor', 'Test Sensor');
service.add(entity);
Expand Down Expand Up @@ -156,4 +174,44 @@ describe('EntitiesService', () => {
expect.anything()
);
});

it('should send out events for all non-distributed entities when refreshing as non-leader', () => {
clusterService.isMajorityLeader.mockReturnValue(false);
const spy = jest.spyOn(emitter, 'emit');

const sensor1 = new Sensor('sensor1', 'Sensor 1');
sensor1.state = 1;
sensor1.attributes = {
test: 'abc'
};
service.add(sensor1);
const sensor2 = new Sensor('sensor2', 'Sensor 2', true);
sensor2.state = 2;
service.add(sensor2);
spy.mockClear();

service.refreshStates();

expect(spy).toHaveBeenCalledTimes(2);
});

it('should send out events for all entities when refreshing as majority leader', () => {
clusterService.isMajorityLeader.mockReturnValue(true);
const spy = jest.spyOn(emitter, 'emit');

const sensor1 = new Sensor('sensor1', 'Sensor 1');
sensor1.state = 1;
sensor1.attributes = {
test: 'abc'
};
service.add(sensor1);
const sensor2 = new Sensor('sensor2', 'Sensor 2', true);
sensor2.state = 2;
service.add(sensor2);
spy.mockClear();

service.refreshStates();

expect(spy).toHaveBeenCalledTimes(4);
});
});
35 changes: 32 additions & 3 deletions src/entities/entities.service.ts
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
import { Entity } from './entity';
import { EntityProxyHandler } from './entity.proxy';
import { InjectEventEmitter } from 'nest-emitter';
Expand All @@ -7,13 +7,22 @@ import { EntityCustomization } from './entity-customization.interface';
import { ClusterService } from '../cluster/cluster.service';

@Injectable()
export class EntitiesService {
export class EntitiesService implements OnApplicationBootstrap {
private readonly entities: Map<string, Entity> = new Map<string, Entity>();
private readonly logger: Logger;

constructor(
private readonly clusterService: ClusterService,
@InjectEventEmitter() private readonly emitter: EntitiesEventEmitter
) {}
) {
this.logger = new Logger(EntitiesService.name);
}

onApplicationBootstrap(): void {
this.clusterService.on('elected', () => {
this.refreshStates();
});
}

has(id: string): boolean {
return this.entities.has(id);
Expand Down Expand Up @@ -42,4 +51,24 @@ export class EntitiesService {
this.emitter.emit('newEntity', proxy, customizations);
return proxy;
}

refreshStates(): void {
this.logger.log('Refreshing entity states');
this.entities.forEach(entity => {
if (!entity.distributed || this.clusterService.isMajorityLeader()) {
this.emitter.emit(
'stateUpdate',
entity.id,
entity.state,
entity.distributed
);
this.emitter.emit(
'attributesUpdate',
entity.id,
entity.attributes,
entity.distributed
);
}
});
}
}

0 comments on commit be380cd

Please sign in to comment.