/
cluster.js
87 lines (72 loc) · 2.85 KB
/
cluster.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
const node_cluster = require('cluster');
const path = require('path');
const os = require('os');
const serialize = require('../serialize');
const debug = require('debug');
const {PID} = require('../types');
const log = debug('open-telecom:adapters:cluster');
function create_cluster_manager(cluster = node_cluster) {
class ClusterManager {
constructor() {
this.workers = [];
this.node_ids = 0;
this.cpus = os.cpus();
this.resolvers = new Map();
this.node_workers = new Map();
}
async start() {
cluster.setupMaster({
exec: path.resolve(__dirname, 'slave-start.js'),
});
cluster.on('message', (worker, message) => {
log('message : %o', message);
const parsed = serialize.parse(message);
log('parsed : %o', parsed);
this.handle_message(worker, parsed);
});
for (let cpu of this.cpus) {
log('fork');
const worker = cluster.fork();
this.workers.push(worker);
await new Promise(resolve => setTimeout(resolve, 50));
}
}
async load_module(module) {
log('load_module : %o', module);
for (let node_id of this.node_workers.keys()) {
log('load_module : deliver : %o', node_id);
await this.deliver_message(
{to: PID.of(node_id, 0), msg: {cmd: 'load_module', args: [module]}});
}
}
async handle_message(worker, message) {
let result = null;
let error = null;
switch (message.cmd) {
case 'deliver':
await this.deliver_message(message.payload);
result = true;
break;
case 'register_node':
const node_id = ++this.node_ids;
this.node_workers.set(node_id, worker);
result = node_id;
break;
default:
error = new Error(`Unknown Command: ${message.cmd}`);
console.error(`UNKNOWN MESSAGE: %o`, message);
}
log('handle_message : command : %o', message.cmd);
log('handle_message : result : %o', result);
worker.send(serialize.stringify({cmd: 'rpc-reply', ref: message.ref, error, result}));
}
deliver_message(message) {
log('deliver_message : %o', message);
const node_id = message.to.node;
const worker = this.node_workers.get(node_id);
worker.send(serialize.stringify({cmd: 'deliver', payload: message}))
}
}
return ClusterManager;
}
module.exports = {create_cluster_manager};