Skip to content

Commit

Permalink
feat(entities): bunch entity updates together
Browse files Browse the repository at this point in the history
  • Loading branch information
mKeRix committed Feb 28, 2021
1 parent d861486 commit 52f0197
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 4 deletions.
74 changes: 72 additions & 2 deletions src/entities/entities.service.spec.ts
Expand Up @@ -131,11 +131,15 @@ describe('EntitiesService', () => {
});

it('should send state updates to publishers', () => {
jest.useFakeTimers('modern');

const entity = new Sensor('test_sensor', 'Test Sensor');
const spy = jest.spyOn(emitter, 'emit');

const entityProxy = service.add(entity);
entityProxy.state = 1337;
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledWith('entityUpdate', entity, [{
newValue: 1337,
oldValue: undefined,
Expand All @@ -144,11 +148,15 @@ describe('EntitiesService', () => {
});

it('should send attribute updates to publishers', () => {
jest.useFakeTimers('modern');

const entity = new Sensor('attributes_sensor', 'Sensor with attributes');
const spy = jest.spyOn(emitter, 'emit');

const entityProxy = service.add(entity);
entityProxy.attributes.test = '123';
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledWith(
'entityUpdate',
entityProxy,
Expand All @@ -162,40 +170,72 @@ describe('EntitiesService', () => {
});

it("should not send updates for non-changed values", () => {
jest.useFakeTimers('modern');

const entity = new Sensor('test_sensor', 'Test Sensor');
const spy = jest.spyOn(emitter, 'emit');

const entityProxy = service.add(entity);
spy.mockClear();

entityProxy.state = 'abc';
jest.advanceTimersByTime(250);
entityProxy.state = 'abc';
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledTimes(1);
});

it("should not include diffs for non-changed values", () => {
jest.useFakeTimers('modern');

const entity = new Sensor('test_sensor', 'Test Sensor');
const spy = jest.spyOn(emitter, 'emit');

const entityProxy = service.add(entity);
spy.mockClear();

entityProxy.state = 'abc';
entityProxy.state = 'abc';
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledWith('entityUpdate', entityProxy, [{
newValue: 'abc',
oldValue: undefined,
path: '/state'
}], true);
});

it("should send updates for type-changed values", () => {
jest.useFakeTimers('modern');

const entity = new Sensor('test_sensor', 'Test Sensor');
const spy = jest.spyOn(emitter, 'emit');

const entityProxy = service.add(entity);
spy.mockClear();

entityProxy.state = '123';
jest.advanceTimersByTime(250);
entityProxy.state = 123;
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledTimes(2);
});

it("should send include old values in entity updates", () => {
jest.useFakeTimers('modern');

const entity = new Sensor('test_sensor', 'Test Sensor');
const spy = jest.spyOn(emitter, 'emit');

const entityProxy = service.add(entity);
spy.mockClear();

entityProxy.state = 'abc';
jest.advanceTimersByTime(250);
entityProxy.state = 'def';
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledWith('entityUpdate', entity, [{
newValue: 'def',
Expand All @@ -205,20 +245,26 @@ describe('EntitiesService', () => {
});

it("should emit entity updates for array changes", () => {
jest.useFakeTimers('modern')

const entity = new Sensor('test_sensor', 'Test Sensor');
const spy = jest.spyOn(emitter, 'emit');

const entityProxy = service.add(entity);
spy.mockClear();

entityProxy.attributes.test = ['item1'];
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledWith('entityUpdate', entityProxy, [{
newValue: ['item1'],
oldValue: undefined,
path: '/attributes/test'
}], true)

entityProxy.attributes.test.push('item2');
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledWith('entityUpdate', entityProxy, [{
newValue: 'item2',
oldValue: undefined,
Expand All @@ -227,6 +273,8 @@ describe('EntitiesService', () => {
});

it("should send updates for nested objects", () => {
jest.useFakeTimers('modern');

const entity = new Sensor('test_sensor', 'Test Sensor');
const spy = jest.spyOn(emitter, 'emit');

Expand All @@ -236,7 +284,9 @@ describe('EntitiesService', () => {
entityProxy.attributes.test = {
key1: 'value1'
};
jest.advanceTimersByTime(250);
entityProxy.attributes.test.key1 = 'value2';
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledWith('entityUpdate', entity, [{
newValue: 'value2',
Expand All @@ -248,45 +298,56 @@ describe('EntitiesService', () => {

describe('Entity Authority', () => {
it('should always mark non-distributed entity updates as authority', () => {
jest.useFakeTimers('modern');

const entity = new Sensor('distributed_sensor', 'Distribution', false);
const spy = jest.spyOn(emitter, 'emit');
clusterService.isMajorityLeader.mockReturnValue(false);

const entityProxy = service.add(entity);
spy.mockClear();
entityProxy.state = true;
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith('entityUpdate', expect.anything(), expect.anything(), true)
});

it('should mark distributed entity updates as non-authority if not the leader', () => {
jest.useFakeTimers('modern');

const entity = new Sensor('distributed_sensor', 'Distribution', true);
const spy = jest.spyOn(emitter, 'emit');
clusterService.isMajorityLeader.mockReturnValue(false);

const entityProxy = service.add(entity);
spy.mockClear();
entityProxy.state = true;
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith('entityUpdate', expect.anything(), expect.anything(), false)
});

it('should mark distributed entity updates as authority if the leader', () => {
jest.useFakeTimers('modern');

const entity = new Sensor('distributed_sensor', 'Distribution', true);
const spy = jest.spyOn(emitter, 'emit');
clusterService.isMajorityLeader.mockReturnValue(true);

const entityProxy = service.add(entity);
spy.mockClear();
entityProxy.state = true;
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith('entityUpdate', expect.anything(), expect.anything(), true)
});

it('should mark distributed entity updates as authority if state is not locked', () => {
jest.useFakeTimers('modern')

const entity = new Sensor(
'distributed_sensor',
'Distribution',
Expand All @@ -299,6 +360,7 @@ describe('EntitiesService', () => {
const entityProxy = service.add(entity);
spy.mockClear();
entityProxy.state = 'test';
jest.advanceTimersByTime(250);

expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith('entityUpdate', expect.anything(), expect.anything(), true)
Expand Down Expand Up @@ -447,6 +509,8 @@ describe('EntitiesService', () => {
spy.mockClear();

entityProxy.state = 'test1';
jest.advanceTimersByTime(250);

expect(entityProxy.state).toBe('test1');
expect(spy).toHaveBeenCalledWith(
'entityUpdate',
Expand Down Expand Up @@ -493,6 +557,8 @@ describe('EntitiesService', () => {
entityProxy.state = 10;
jest.advanceTimersByTime(1000);
expect(entityProxy.state).toBe(10);

jest.advanceTimersByTime(250);
expect(spy).toHaveBeenCalledWith(
'entityUpdate',
entityProxy,
Expand All @@ -504,12 +570,14 @@ describe('EntitiesService', () => {
true
);

jest.advanceTimersByTime(9 * 1000);
jest.advanceTimersByTime(9 * 1000 - 250);
entityProxy.state = 20;
expect(entityProxy.state).toBe(10);

jest.advanceTimersByTime(6 * 1000);
expect(entityProxy.state).toBe(13.75);

jest.advanceTimersByTime(250)
expect(spy).toHaveBeenCalledWith(
'entityUpdate',
entityProxy,
Expand All @@ -521,8 +589,10 @@ describe('EntitiesService', () => {
true
);

jest.advanceTimersByTime(55 * 1000);
jest.advanceTimersByTime(55 * 1000 - 250);
expect(entityProxy.state).toBe(20);

jest.advanceTimersByTime(250)
expect(spy).toHaveBeenCalledWith(
'entityUpdate',
entityProxy,
Expand Down
20 changes: 18 additions & 2 deletions src/entities/entity.proxy.ts
@@ -1,7 +1,12 @@
import { Entity } from "./entity.dto";
import { EntitiesEventEmitter, PropertyDiff } from "./entities.events";
import Timeout = NodeJS.Timeout;

export class EntityProxyHandler implements ProxyHandler<Entity> {
private diff: Array<PropertyDiff> = [];
private target?: Entity;
private flushTimeout?: Timeout;

constructor(private readonly emitter: EntitiesEventEmitter, private readonly isLeader: () => boolean) {
}

Expand All @@ -26,9 +31,20 @@ export class EntityProxyHandler implements ProxyHandler<Entity> {
}

private emitEntityUpdate(entity: Entity, diff: Array<PropertyDiff>): void {
const hasAuthority = !entity.distributed || !entity.stateLocked || this.isLeader();
this.target = entity;
this.diff.push(...diff)

if (!this.flushTimeout) {
this.flushTimeout = setTimeout(this.flushEntityUpdates.bind(this), 100)
}
}

private flushEntityUpdates(): void {
const hasAuthority = !this.target.distributed || !this.target.stateLocked || this.isLeader();
this.emitter.emit('entityUpdate', this.target, this.diff, hasAuthority)

this.emitter.emit('entityUpdate', entity, diff, hasAuthority)
this.diff = []
this.flushTimeout = undefined;
}
}

Expand Down

0 comments on commit 52f0197

Please sign in to comment.