From e31d853e2b26d334d9ea78338b61da3d963cf0dd Mon Sep 17 00:00:00 2001 From: Zachary Hueras Date: Sat, 20 Apr 2019 14:56:24 -0400 Subject: [PATCH] Sketching some more stuff out --- lib/adapters/cluster-slave.js | 6 +++++ lib/adapters/cluster.js | 42 +++++++++++++++++++++-------------- lib/otp/supervisor.js | 29 +++++++++++++++++++++--- 3 files changed, 57 insertions(+), 20 deletions(-) diff --git a/lib/adapters/cluster-slave.js b/lib/adapters/cluster-slave.js index 0bf9481..adf2c18 100644 --- a/lib/adapters/cluster-slave.js +++ b/lib/adapters/cluster-slave.js @@ -63,6 +63,8 @@ function create_adapter(emitter) { switch (message.cmd) { case 'rpc-reply': return handle_rpc_reply(message); + case 'rpc': + return handle_rpc(message); case 'deliver': return deliver(message.payload, false); } @@ -80,6 +82,10 @@ function create_adapter(emitter) { return resolve(reply.result); } + async function handle_rpc(message) { + log('rpc : %o', message); + } + async function deliver(message, tryMaster = true) { log('deliver : %O', message); diff --git a/lib/adapters/cluster.js b/lib/adapters/cluster.js index 16c6902..bc3be2e 100644 --- a/lib/adapters/cluster.js +++ b/lib/adapters/cluster.js @@ -3,6 +3,7 @@ const path = require('path'); const os = require('os'); const serialize = require('../serialize'); const debug = require('debug'); +const {PID} = require('../types'); const log = debug('open-telecom:adapters:cluster'); @@ -12,29 +13,36 @@ function create_cluster_manager(cluster = node_cluster) { this.workers = []; this.node_ids = 0; this.cpus = os.cpus(); + this.resolvers = new Map(); + this.node_workers = new Map(); } - start() { + async start() { cluster.setupMaster({ - exec: path.resolve(__dirname, 'cluster-slave.js'), + exec: path.resolve(__dirname, 'slave-start.js'), }); - for (let cpu of this.cpus) { - const worker = cluster.fork(); - this.workers.push(worker); - } - cluster.on('message', (worker, message) => { log('message : %o', message); const parsed = serialize.parse(message); log('parsed : %o', parsed); this.handle_message(worker, parsed); }); + + for (let cpu of this.cpus) { + log('fork'); + const worker = cluster.fork(); + this.workers.push(worker); + await new Promise(resolve => setTimeout(resolve, 50)); + } } - async load_module(load_module) { - for (let worker in this.workers) { - await this.rpc(worker, 'load_module', [module]); + async load_module(module) { + log('load_module : %o', module); + for (let node_id of this.node_workers.keys()) { + log('load_module : deliver : %o', node_id); + await this.deliver_message( + {to: PID.of(node_id, 0), msg: {cmd: 'load_module', args: [module]}}); } } @@ -49,6 +57,7 @@ function create_cluster_manager(cluster = node_cluster) { break; case 'register_node': const node_id = ++this.node_ids; + this.node_workers.set(node_id, worker); result = node_id; break; default: @@ -62,14 +71,13 @@ function create_cluster_manager(cluster = node_cluster) { worker.send(serialize.stringify({cmd: 'rpc-reply', ref: message.ref, error, result})); } - deliver_message(message) {} + deliver_message(message) { + log('deliver_message : %o', message); - rpc(worker, cmd, args = []) { - return new Promise((resolve, reject) => { - const ref = this.refs++; - worker.send(serialize.stringify({cmd, args, ref})); - this._resolvers.set(ref, {resolve, reject}); - }); + const node_id = message.to.node; + const worker = this.node_workers.get(node_id); + + worker.send(serialize.stringify({cmd: 'deliver', payload: message})) } } diff --git a/lib/otp/supervisor.js b/lib/otp/supervisor.js index a96e74a..f448f57 100644 --- a/lib/otp/supervisor.js +++ b/lib/otp/supervisor.js @@ -1,5 +1,8 @@ const proc_lib = require('./proc_lib'); +const ONE_FOR_ONE = Symbol.for('$otp:supervisor:one_for_one'); +const ONE_FOR_ALL = Symbol.for('$otp:supervisor:one_for_all'); + async function start(ctx, callbacks) { return proc_lib.start((ctx) => init(ctx, callbacks)); } @@ -8,11 +11,31 @@ async function start_link() { return proc_lib.start_link((ctx) => init(ctx, callbacks)); } -async function init(ctx) {} +async function init(ctx, callbacks) { + const {ok, strategy, children} = callbacks.init(ctx); + + if (!ok) + throw new Error('invalid initial state'); -async function loop(state) {} + const spawned = children.map(descriptor => { + const pid = ctx.spawn_link(descriptor); + return pid; + }); + + return loop(ctx, {children, spawned, strategy}); +} + +async function loop(ctx, state) { + let running = true; + while (running) { + const message = ctx.receive(); + } +} module.exports = { start, - start_link + start_link, + + ONE_FOR_ONE, + ONE_FOR_ALL, };