Skip to content

Commit

Permalink
feat(cluster): add peer syncing
Browse files Browse the repository at this point in the history
When adding a new node all peers will be broadcasted now, which should
lead to remote nodes picking it up quickly as well.
  • Loading branch information
mKeRix committed Feb 17, 2020
1 parent f218eac commit 56cfe34
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
35 changes: 29 additions & 6 deletions src/cluster/cluster.service.ts
Expand Up @@ -2,7 +2,8 @@ import {
Injectable,
Logger,
OnApplicationBootstrap,
OnApplicationShutdown
OnApplicationShutdown,
OnModuleInit
} from '@nestjs/common';
import Democracy, { Node } from 'democracy';
import { Advertisement, Browser, Service } from 'mdns';
Expand All @@ -21,7 +22,7 @@ try {

@Injectable()
export class ClusterService extends Democracy
implements OnApplicationBootstrap, OnApplicationShutdown {
implements OnModuleInit, OnApplicationBootstrap, OnApplicationShutdown {
private readonly configService: ConfigService;
private readonly config: ClusterConfig;
private readonly logger: Logger;
Expand Down Expand Up @@ -54,6 +55,13 @@ export class ClusterService extends Democracy
this.logger = new Logger(ClusterService.name);
}

onModuleInit(): void {
this.on('added', this.handleNodeAdded);
this.on('removed', this.handleNodeRemoved);
this.on('elected', this.handleNodeElected);
this.on('peers', this.addMissingPeers);
}

onApplicationBootstrap(): void {
if (this.config.autoDiscovery) {
if (mdns !== undefined) {
Expand All @@ -65,9 +73,7 @@ export class ClusterService extends Democracy
}
}

this.on('added', this.handleNodeAdded);
this.on('removed', this.handleNodeRemoved);
this.on('elected', this.handleNodeElected);
this.broadcastPeers();
}

onApplicationShutdown(): void {
Expand All @@ -86,6 +92,22 @@ export class ClusterService extends Democracy
return !this.config.quorum || activeNodes.length >= this.config.quorum;
}

broadcastPeers(): void {
this.send('peers', this.options.peers);
}

protected addMissingPeers(peers: string[][]): void {
peers.forEach(peer => {
const index = this.options.peers.findIndex(
p => p[0] === peer[0] && p[1] === peer[1]
);

if (index < 0) {
this.options.peers.push(peer);
}
});
}

protected startBonjourDiscovery(): void {
this.advertisement = mdns.createAdvertisement(
mdns.udp('room-assistant'),
Expand Down Expand Up @@ -128,11 +150,12 @@ export class ClusterService extends Democracy
}

this.options.peers.push([service.addresses[0], service.port.toString()]);
this.hello();
this.send('hello');
}

private handleNodeAdded(node: Node): void {
this.logger.log(`Added ${node.source} to the cluster with id ${node.id}`);
this.broadcastPeers();
}

private handleNodeRemoved(node: Node): void {
Expand Down
7 changes: 6 additions & 1 deletion typings/democracy/index.d.ts
@@ -1,6 +1,11 @@
declare module 'democracy' {
import {Socket} from 'dgram';
export default class Democracy {
protected options: InternalOptions;
protected socket: Socket;
protected _id: string;
protected _weight: number;
protected _state: 'leader' | 'citizen' | 'removed';

constructor(opts?: Options);
protected addNodeToList(node: Node): void;
Expand All @@ -9,7 +14,7 @@ declare module 'democracy' {
leader(): Node;
resign(): this;
isLeader(): boolean;
send(customEvent: string, extraData: any, id?: string): this;
send(customEvent: string, extraData?: any, id?: string): this;
subscribe(channel: string): this;
publish(channel: string, msg: any): this;
on(event: string, func: (data: any) => void);
Expand Down

0 comments on commit 56cfe34

Please sign in to comment.