Skip to content

Commit

Permalink
fix(cluster): ensure there is only one leader
Browse files Browse the repository at this point in the history
Closes #195
  • Loading branch information
mKeRix committed Jun 7, 2020
1 parent 8481883 commit 286adc2
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 10 deletions.
66 changes: 56 additions & 10 deletions src/cluster/cluster.service.spec.ts
Expand Up @@ -19,9 +19,14 @@ const mockMdns = {
},
dns_sd: [],
};
const mockSocket = {
bind: jest.fn(),
on: jest.fn(),
send: jest.fn(),
};

import { networkInterfaces } from 'os';
import Democracy, { Node } from 'democracy';
import { Node } from 'democracy';
import { Test, TestingModule } from '@nestjs/testing';
import { ClusterService } from './cluster.service';
import { ConfigModule } from '../config/config.module';
Expand All @@ -30,12 +35,17 @@ import { ConfigService } from '../config/config.service';
import c from 'config';

jest.mock('os');
jest.mock('democracy');
jest.mock('dgram', () => {
return {
createSocket: jest.fn().mockReturnValue(mockSocket),
};
});
jest.mock('mdns', () => mockMdns, { virtual: true });
jest.useFakeTimers();

describe('ClusterService', () => {
let service: ClusterService;
const mockConfig = new ClusterConfig();
const mockConfig = { ...new ClusterConfig(), weight: 50 };
const configService = {
get: jest.fn().mockImplementation((key: string) => {
return key === 'cluster' ? mockConfig : c.get(key);
Expand Down Expand Up @@ -81,13 +91,11 @@ describe('ClusterService', () => {
});

it('should determine the local IP', () => {
expect(Democracy).toHaveBeenCalledWith({
id: 'test-instance',
source: '192.168.1.108:6425',
peers: [],
timeout: 60000,
weight: undefined,
});
expect(mockSocket.bind).toHaveBeenCalledWith(
'6425',
'192.168.1.108',
expect.any(Function)
);
});

it('should start advertising room-assistant via Bonjour', () => {
Expand Down Expand Up @@ -177,4 +185,42 @@ describe('ClusterService', () => {

expect(service.isMajorityLeader()).toBeFalsy();
});

it('should handle cluster leader conflicts', () => {
mockSocket.bind.mock.calls[0][2]();
const socketCallback = mockSocket.on.mock.calls[0][1];

const msg1 = Buffer.from(
JSON.stringify({
event: 'hello',
id: 'node1',
weight: 60,
state: 'leader',
channels: [],
source: '127.0.0.1:6425',
}),
'utf8'
);
const msg2 = Buffer.from(
JSON.stringify({
event: 'hello',
id: 'node2',
weight: 55,
state: 'leader',
channels: [],
source: '127.0.0.1:6426',
}),
'utf8'
);

socketCallback(msg1);
socketCallback(msg2);

const leaders = Object.entries(service.nodes()).filter(
(node) => node[1].state === 'leader'
);

expect(service.leader().id).toBe('node1');
expect(leaders).toHaveLength(1);
});
});
26 changes: 26 additions & 0 deletions src/cluster/cluster.service.ts
Expand Up @@ -150,6 +150,32 @@ export class ClusterService extends Democracy
});
}

/**
* Process events that are received over the network.
*
* @param msg - Received message
* @returns this
*/
protected processEvent(msg: Buffer): this {
super.processEvent(msg);
const data = this.decodeMsg(msg);

if (!data.chunk && data.state === 'leader') {
const leaders = Object.entries(this._nodes).filter(
(node) => node[1].state === 'leader'
);

if (leaders.length > 1) {
leaders.forEach((leader) => {
this._nodes[leader[0]].state = 'citizen';
});
this.holdElections();
}
}

return this;
}

/**
* Check if a node should be removed from the cluster.
*
Expand Down
3 changes: 3 additions & 0 deletions typings/democracy/index.d.ts
Expand Up @@ -13,11 +13,14 @@ declare module 'democracy' {
constructor(opts?: Options);
protected addNodeToList(node: Node): void;
protected checkBallots(candidate: string): this;
protected processEvent(msg: Buffer): this;
protected decodeMsg(msg: Buffer): Record<string, unknown>;
hello(): this;
nodes(): { [key: string]: Node };
leader(): Node | undefined;
resign(): this;
isLeader(): boolean;
holdElections(): this;
send(customEvent: string, extraData?: any, id?: string): this;
subscribe(channel: string): this;
publish(channel: string, msg: any): this;
Expand Down

0 comments on commit 286adc2

Please sign in to comment.