From 52f01978ae4bf64942181eb842bfe1ced7d9c974 Mon Sep 17 00:00:00 2001 From: Heiko Rothe Date: Sun, 28 Feb 2021 00:20:20 +0100 Subject: [PATCH] feat(entities): bunch entity updates together --- src/entities/entities.service.spec.ts | 74 ++++++++++++++++++++++++++- src/entities/entity.proxy.ts | 20 +++++++- 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/src/entities/entities.service.spec.ts b/src/entities/entities.service.spec.ts index d9b3b66..e23091b 100644 --- a/src/entities/entities.service.spec.ts +++ b/src/entities/entities.service.spec.ts @@ -131,11 +131,15 @@ describe('EntitiesService', () => { }); it('should send state updates to publishers', () => { + jest.useFakeTimers('modern'); + const entity = new Sensor('test_sensor', 'Test Sensor'); const spy = jest.spyOn(emitter, 'emit'); const entityProxy = service.add(entity); entityProxy.state = 1337; + jest.advanceTimersByTime(250); + expect(spy).toHaveBeenCalledWith('entityUpdate', entity, [{ newValue: 1337, oldValue: undefined, @@ -144,11 +148,15 @@ describe('EntitiesService', () => { }); it('should send attribute updates to publishers', () => { + jest.useFakeTimers('modern'); + const entity = new Sensor('attributes_sensor', 'Sensor with attributes'); const spy = jest.spyOn(emitter, 'emit'); const entityProxy = service.add(entity); entityProxy.attributes.test = '123'; + jest.advanceTimersByTime(250); + expect(spy).toHaveBeenCalledWith( 'entityUpdate', entityProxy, @@ -162,6 +170,8 @@ describe('EntitiesService', () => { }); it("should not send updates for non-changed values", () => { + jest.useFakeTimers('modern'); + const entity = new Sensor('test_sensor', 'Test Sensor'); const spy = jest.spyOn(emitter, 'emit'); @@ -169,12 +179,36 @@ describe('EntitiesService', () => { spy.mockClear(); entityProxy.state = 'abc'; + jest.advanceTimersByTime(250); entityProxy.state = 'abc'; + jest.advanceTimersByTime(250); expect(spy).toHaveBeenCalledTimes(1); }); + it("should not include diffs for non-changed values", () => { + jest.useFakeTimers('modern'); + + const entity = new Sensor('test_sensor', 'Test Sensor'); + const spy = jest.spyOn(emitter, 'emit'); + + const entityProxy = service.add(entity); + spy.mockClear(); + + entityProxy.state = 'abc'; + entityProxy.state = 'abc'; + jest.advanceTimersByTime(250); + + expect(spy).toHaveBeenCalledWith('entityUpdate', entityProxy, [{ + newValue: 'abc', + oldValue: undefined, + path: '/state' + }], true); + }); + it("should send updates for type-changed values", () => { + jest.useFakeTimers('modern'); + const entity = new Sensor('test_sensor', 'Test Sensor'); const spy = jest.spyOn(emitter, 'emit'); @@ -182,12 +216,16 @@ describe('EntitiesService', () => { spy.mockClear(); entityProxy.state = '123'; + jest.advanceTimersByTime(250); entityProxy.state = 123; + jest.advanceTimersByTime(250); expect(spy).toHaveBeenCalledTimes(2); }); it("should send include old values in entity updates", () => { + jest.useFakeTimers('modern'); + const entity = new Sensor('test_sensor', 'Test Sensor'); const spy = jest.spyOn(emitter, 'emit'); @@ -195,7 +233,9 @@ describe('EntitiesService', () => { spy.mockClear(); entityProxy.state = 'abc'; + jest.advanceTimersByTime(250); entityProxy.state = 'def'; + jest.advanceTimersByTime(250); expect(spy).toHaveBeenCalledWith('entityUpdate', entity, [{ newValue: 'def', @@ -205,6 +245,8 @@ describe('EntitiesService', () => { }); it("should emit entity updates for array changes", () => { + jest.useFakeTimers('modern') + const entity = new Sensor('test_sensor', 'Test Sensor'); const spy = jest.spyOn(emitter, 'emit'); @@ -212,6 +254,8 @@ describe('EntitiesService', () => { spy.mockClear(); entityProxy.attributes.test = ['item1']; + jest.advanceTimersByTime(250); + expect(spy).toHaveBeenCalledWith('entityUpdate', entityProxy, [{ newValue: ['item1'], oldValue: undefined, @@ -219,6 +263,8 @@ describe('EntitiesService', () => { }], true) entityProxy.attributes.test.push('item2'); + jest.advanceTimersByTime(250); + expect(spy).toHaveBeenCalledWith('entityUpdate', entityProxy, [{ newValue: 'item2', oldValue: undefined, @@ -227,6 +273,8 @@ describe('EntitiesService', () => { }); it("should send updates for nested objects", () => { + jest.useFakeTimers('modern'); + const entity = new Sensor('test_sensor', 'Test Sensor'); const spy = jest.spyOn(emitter, 'emit'); @@ -236,7 +284,9 @@ describe('EntitiesService', () => { entityProxy.attributes.test = { key1: 'value1' }; + jest.advanceTimersByTime(250); entityProxy.attributes.test.key1 = 'value2'; + jest.advanceTimersByTime(250); expect(spy).toHaveBeenCalledWith('entityUpdate', entity, [{ newValue: 'value2', @@ -248,6 +298,8 @@ describe('EntitiesService', () => { describe('Entity Authority', () => { it('should always mark non-distributed entity updates as authority', () => { + jest.useFakeTimers('modern'); + const entity = new Sensor('distributed_sensor', 'Distribution', false); const spy = jest.spyOn(emitter, 'emit'); clusterService.isMajorityLeader.mockReturnValue(false); @@ -255,12 +307,15 @@ describe('EntitiesService', () => { const entityProxy = service.add(entity); spy.mockClear(); entityProxy.state = true; + jest.advanceTimersByTime(250); expect(spy).toHaveBeenCalledTimes(1); expect(spy).toHaveBeenCalledWith('entityUpdate', expect.anything(), expect.anything(), true) }); it('should mark distributed entity updates as non-authority if not the leader', () => { + jest.useFakeTimers('modern'); + const entity = new Sensor('distributed_sensor', 'Distribution', true); const spy = jest.spyOn(emitter, 'emit'); clusterService.isMajorityLeader.mockReturnValue(false); @@ -268,12 +323,15 @@ describe('EntitiesService', () => { const entityProxy = service.add(entity); spy.mockClear(); entityProxy.state = true; + jest.advanceTimersByTime(250); expect(spy).toHaveBeenCalledTimes(1); expect(spy).toHaveBeenCalledWith('entityUpdate', expect.anything(), expect.anything(), false) }); it('should mark distributed entity updates as authority if the leader', () => { + jest.useFakeTimers('modern'); + const entity = new Sensor('distributed_sensor', 'Distribution', true); const spy = jest.spyOn(emitter, 'emit'); clusterService.isMajorityLeader.mockReturnValue(true); @@ -281,12 +339,15 @@ describe('EntitiesService', () => { const entityProxy = service.add(entity); spy.mockClear(); entityProxy.state = true; + jest.advanceTimersByTime(250); expect(spy).toHaveBeenCalledTimes(1); expect(spy).toHaveBeenCalledWith('entityUpdate', expect.anything(), expect.anything(), true) }); it('should mark distributed entity updates as authority if state is not locked', () => { + jest.useFakeTimers('modern') + const entity = new Sensor( 'distributed_sensor', 'Distribution', @@ -299,6 +360,7 @@ describe('EntitiesService', () => { const entityProxy = service.add(entity); spy.mockClear(); entityProxy.state = 'test'; + jest.advanceTimersByTime(250); expect(spy).toHaveBeenCalledTimes(1); expect(spy).toHaveBeenCalledWith('entityUpdate', expect.anything(), expect.anything(), true) @@ -447,6 +509,8 @@ describe('EntitiesService', () => { spy.mockClear(); entityProxy.state = 'test1'; + jest.advanceTimersByTime(250); + expect(entityProxy.state).toBe('test1'); expect(spy).toHaveBeenCalledWith( 'entityUpdate', @@ -493,6 +557,8 @@ describe('EntitiesService', () => { entityProxy.state = 10; jest.advanceTimersByTime(1000); expect(entityProxy.state).toBe(10); + + jest.advanceTimersByTime(250); expect(spy).toHaveBeenCalledWith( 'entityUpdate', entityProxy, @@ -504,12 +570,14 @@ describe('EntitiesService', () => { true ); - jest.advanceTimersByTime(9 * 1000); + jest.advanceTimersByTime(9 * 1000 - 250); entityProxy.state = 20; expect(entityProxy.state).toBe(10); jest.advanceTimersByTime(6 * 1000); expect(entityProxy.state).toBe(13.75); + + jest.advanceTimersByTime(250) expect(spy).toHaveBeenCalledWith( 'entityUpdate', entityProxy, @@ -521,8 +589,10 @@ describe('EntitiesService', () => { true ); - jest.advanceTimersByTime(55 * 1000); + jest.advanceTimersByTime(55 * 1000 - 250); expect(entityProxy.state).toBe(20); + + jest.advanceTimersByTime(250) expect(spy).toHaveBeenCalledWith( 'entityUpdate', entityProxy, diff --git a/src/entities/entity.proxy.ts b/src/entities/entity.proxy.ts index 62be82c..9e974e8 100644 --- a/src/entities/entity.proxy.ts +++ b/src/entities/entity.proxy.ts @@ -1,7 +1,12 @@ import { Entity } from "./entity.dto"; import { EntitiesEventEmitter, PropertyDiff } from "./entities.events"; +import Timeout = NodeJS.Timeout; export class EntityProxyHandler implements ProxyHandler { + private diff: Array = []; + private target?: Entity; + private flushTimeout?: Timeout; + constructor(private readonly emitter: EntitiesEventEmitter, private readonly isLeader: () => boolean) { } @@ -26,9 +31,20 @@ export class EntityProxyHandler implements ProxyHandler { } private emitEntityUpdate(entity: Entity, diff: Array): void { - const hasAuthority = !entity.distributed || !entity.stateLocked || this.isLeader(); + this.target = entity; + this.diff.push(...diff) + + if (!this.flushTimeout) { + this.flushTimeout = setTimeout(this.flushEntityUpdates.bind(this), 100) + } + } + + private flushEntityUpdates(): void { + const hasAuthority = !this.target.distributed || !this.target.stateLocked || this.isLeader(); + this.emitter.emit('entityUpdate', this.target, this.diff, hasAuthority) - this.emitter.emit('entityUpdate', entity, diff, hasAuthority) + this.diff = [] + this.flushTimeout = undefined; } }