From 8243cbaeffbf41047427ccf4e12aea01adef17f7 Mon Sep 17 00:00:00 2001 From: George Stagas Date: Tue, 15 May 2012 11:50:13 +0300 Subject: [PATCH] Version 0.1.0 --- .gitignore | 1 + examples/akka-tutorial-first.js | 63 --------- examples/ask.js | 12 ++ examples/async-db.js | 32 +++++ examples/chat-client.js | 21 +++ examples/chat.js | 91 ++++++++++++ examples/distributed-db.js | 75 ++++++++++ examples/distributed-pi.js | 87 ++++++++++++ examples/error.js | 13 ++ examples/ping-pong.js | 29 ++++ examples/pingpong.js | 41 ------ examples/recursive.js | 28 ++++ examples/remote-hello.js | 15 ++ examples/remote-pass-future.js | 17 +++ examples/remote-ping-pong.js | 38 +++++ examples/slaves.js | 52 ------- index.js | 14 ++ lib/actor.js | 244 +++++++++++++++----------------- lib/context.js | 15 -- lib/future.js | 38 ----- lib/message.js | 6 + lib/ref.js | 182 ++++++++++++++---------- lib/registry.js | 22 ++- lib/system.js | 148 ++++++++++++++----- lib/worker.js | 27 ++++ package.json | 16 +++ 26 files changed, 876 insertions(+), 451 deletions(-) create mode 100644 .gitignore delete mode 100644 examples/akka-tutorial-first.js create mode 100644 examples/ask.js create mode 100644 examples/async-db.js create mode 100644 examples/chat-client.js create mode 100644 examples/chat.js create mode 100644 examples/distributed-db.js create mode 100644 examples/distributed-pi.js create mode 100644 examples/error.js create mode 100644 examples/ping-pong.js delete mode 100644 examples/pingpong.js create mode 100644 examples/recursive.js create mode 100644 examples/remote-hello.js create mode 100644 examples/remote-pass-future.js create mode 100644 examples/remote-ping-pong.js delete mode 100644 examples/slaves.js delete mode 100644 lib/context.js delete mode 100644 lib/future.js create mode 100644 lib/message.js create mode 100644 lib/worker.js create mode 100644 package.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3c3629e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/examples/akka-tutorial-first.js b/examples/akka-tutorial-first.js deleted file mode 100644 index c74e692..0000000 --- a/examples/akka-tutorial-first.js +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Port of the Akka first tutorial - * Distributed PI calculation - */ - -var ActorSystem = require('../').ActorSystem - -var sys = new ActorSystem - -sys.define("PiWorker", { - calculatePiFor: function (m, reply) { - var acc = 0.0 - for (var i = m.start; i < m.start + m.nrOfElements; i++) { - acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) - } - reply(acc) - } -, work: function (m) { - var sender = this.sender - this.sender.tell({ _: 'result' - , $: this.self.ask({ _: 'calculatePiFor', $: m }) - }) - } -}) - -var master = sys.create('pi calculator') - -master.become({ - init: function (opts) { - this.nrOfWorkers = opts.nrOfWorkers - this.workers = [] - for (var w = 0; w < this.nrOfWorkers; w++) { - this.workers.push(this.self.create("pi-w", "PiWorker")) - } - this.startTime = Date.now() - } -, calculate: function (opts) { - var i = 0 - this.pi = 0 - this.nrOfResults = 0 - this.nrOfElements = opts.nrOfElements - this.nrOfMessages = opts.nrOfMessages - while (i < this.nrOfMessages) { - for (var w = 0; w < this.nrOfWorkers; w++) { - this.self.tell(this.workers[w], { _: 'work', $: { - start: (i++) * this.nrOfElements - , nrOfElements: this.nrOfElements - }}) - } - } - } -, result: function (val) { - this.pi += val - this.nrOfResults++ - if (this.nrOfResults === this.nrOfMessages) { - console.log('Pi approximation:', this.pi) - console.log('Calculation time:', Date.now() - this.startTime) - } - } -}) - -master.init({ nrOfWorkers: 4 }) -master.tell({ _: 'calculate', $: { nrOfElements: 10000, nrOfMessages: 10000 } }) diff --git a/examples/ask.js b/examples/ask.js new file mode 100644 index 0000000..7f0144d --- /dev/null +++ b/examples/ask.js @@ -0,0 +1,12 @@ +var drama = require('../') +var sys = drama('sys') + +var actor = sys.actor({ + hello: function () { + this.reply('hello back') + } +}).pick('?hello') + +actor.hello(function (reply) { + console.log(reply) +}) diff --git a/examples/async-db.js b/examples/async-db.js new file mode 100644 index 0000000..ef58892 --- /dev/null +++ b/examples/async-db.js @@ -0,0 +1,32 @@ +var drama = require('../') +var sys = drama('sys') +var atomic = require('atomic') + +var db = sys.actor(function () { + var data = {} + var lock = atomic() + return { + get: function (key) { + var reply = this.reply.bind(this) + lock(key, function (done) { + setTimeout(function () { + reply(data[key], key) + done() + }, 500) + }) + } + , set: function (key, val) { + lock(key, function (done) { + setTimeout(function () { + data[key] = val + done() + }, 1000) + }) + } + } +}).init().pick('?get', 'set') + +db.set('foo', 'bar') +db.get('foo', function (val, key) { + console.log('%s: %s', key, val) +}) diff --git a/examples/chat-client.js b/examples/chat-client.js new file mode 100644 index 0000000..c813516 --- /dev/null +++ b/examples/chat-client.js @@ -0,0 +1,21 @@ +var readline = require('readline') +var net = require('net') + +var rl = readline.createInterface(process.stdin, process.stdout) +rl.on('close', function () { + console.log('Have a great day!') + process.exit() +}) + +var client = net.connect(6886, 'localhost') + +client.setEncoding('utf8') +client.on('data', function (data) { + process.stdout.write(data) +}) + +rl.on('line', function (line) { + client.write(line) + rl.setPrompt('>', 2) + rl.prompt() +}) diff --git a/examples/chat.js b/examples/chat.js new file mode 100644 index 0000000..ab1d71a --- /dev/null +++ b/examples/chat.js @@ -0,0 +1,91 @@ +var drama = require('../') +var sys = drama('sys') +var $ = sys.actor.bind(sys) + +var channels = [] + +var Client = function (socket) { + var nick + + var Enter = function () { + socket.write('\nEnter your nickname: ') + return function (newNick) { + if (newNick && newNick.length > 2) { + nick = newNick + socket.write('Welcome, ' + nick + '!\n') + socket.write('To enter the main room type: /join main\n') + return Lobby + } + else { + socket.write('Not a valid nickname (<2 chars)\n') + return Enter() + } + } + } + + var Lobby = { + join: function (name) { + if (!~channels.indexOf(name)) { + $(name, Channel).init(name) + channels.push(name) + } + var channel = 'sys@' + name + this.send('join', nick).to(channel) + return { + msg: function (msg) { + this.send('msg', msg, nick).to(channel) + } + , echo: function (msg) { + socket.write(msg + '\n') + } + , part: function (msg) { + this.send('part', msg || 'Bye.', nick).to(channel) + this.echo('Leaving ' + name) + return Lobby + } + , _: function () { + this.echo("I don't understand: " + this.originalMessage) + } + } + } + , _: function () { + socket.write('You have to join a channel before you can take action.\n') + } + } + return Enter() +} + +var Channel = function (name) { + var clients = {} + return { + join: function (nick) { + clients[nick] = this.sender + this.broadcast(nick + ' has joined the room.') + } + , part: function (msg, nick) { + delete clients[nick] + this.broadcast(nick + ' has left the room: ' + msg) + } + , msg: function (msg, nick) { + this.broadcast(nick + ': ' + msg, nick) + } + , broadcast: function (msg, nick) { + for (var k in clients) { + if (k !== nick) clients[k].tell('echo', msg) + } + } + } +} + +var net = require('net') +net.createServer(function (socket) { + socket.uid = (Math.random() * 100000).toString(36) + var client = $(socket.uid, Client).init(socket) + socket.setEncoding('utf8') + socket.on('data', function (data) { + if (data[0] === '/') { + client.tell(data.split(' ')[0].substr(1), data.split(' ').slice(1).join(' ')) + } + else client.tell('msg', data) + }) +}).listen(6886) diff --git a/examples/distributed-db.js b/examples/distributed-db.js new file mode 100644 index 0000000..eca1e29 --- /dev/null +++ b/examples/distributed-db.js @@ -0,0 +1,75 @@ +var drama = require('../') +var sys = drama('sys') +var nextTick = require('nexttick') + +var Db = function (isChild) { + var atomic = require('atomic') + var crypto = require('crypto') + + function hash (key) { + return crypto.createHash('sha1').update(key).digest('hex') + } + + var data = {} + var lock = atomic() + var workers = {} + var w = 0 + + var Child = { + get: function (key) { + var reply = this.reply.bind(this) + lock(key, function (done) { + setTimeout(function () { // simulate an async db get function + reply(data[key]) + done() + }, 50) + }) + } + , set: function (key, val) { + lock(key, function (done) { + setTimeout(function () { // simulate an async db set function + data[key] = val + done() + }, 300) + }) + } + } + + var Router = { + get: function (key) { + var reply = this.reply.bind(this) + workers[hash(key).substr(0, 1)].get(key, function (val) { + reply(val, key) + }) + } + , set: function (key, val) { + workers[hash(key).substr(0, 1)].set(key, val) + } + } + + var Master = function () { + ;'abcdefghijklmnopqrstuvwxyz0123456789'.split('').forEach(function (l) { + var worker = sys.fork('remote-' + l) + .actor('worker-' + l, Db) + .init(true) + .pick('?get', 'set') + workers[l] = worker + }) + return Router + } + + if (isChild) return Child + else return Master +} + +var db = sys.actor('db', Db).init().init().pick('?get', 'set') + +var x = 0, y = 0 +nextTick.while(function () { return ++y < 100 }, function () { + for (var x = 0; x < 100; x++) { + db.set('foo-' + y, 'bar-' + x) + db.get('foo-' + y, function (val, key) { + console.log('%s: %s', key, val) + }) + } +}) diff --git a/examples/distributed-pi.js b/examples/distributed-pi.js new file mode 100644 index 0000000..3effb9b --- /dev/null +++ b/examples/distributed-pi.js @@ -0,0 +1,87 @@ +/** + * Port of the Akka first tutorial + * Distributed PI calculator + */ + +var drama = require('../') +var sys = drama('sys') +var $ = sys.actor.bind(sys) + +var PiWorker = function () { + var it = 0 + return { + calculatePiFor: function (m) { + var acc = 0.0 + for (var i = m.start; i < m.start + m.elements; i++) { + acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) + } + it += m.elements + this.reply(acc) + } + , getPiPiece: function (needed) { + var ctx = this + var self = this.self + var piPiece = 0 + var count = needed.pieces.length + if (!count) ctx.reply(0) + else { + needed.pieces.forEach(function (piece) { + self.actor().ask(self, 'calculatePiFor', { start: piece, elements: needed.elements }, function (result) { + piPiece += result + --count || ctx.reply(piPiece) + }) + }) + } + } + } +} + +var piCalculator = sys.actor('pi calculator', function (opts) { + var numOfWorkers = opts.workers + var workers = [] + var threads = new Array(opts.threads) + for (var t = 0; t < opts.threads; t++) { + threads[t] = sys.fork('thread-' + t) + for (var w = 0, actor; w < numOfWorkers; w++) { + actor = threads[t].actor('pi-t' + t + '-w' + w, PiWorker) //, function (actor) { + actor.init() + workers.push(actor) + } + } + var totalWorkers = workers.length + var startTime = Date.now() + return { + calculate: function (elements) { + var pi = 0 + var nrOfResults = 0 + var count = elements + var ctx = this + + var c = 0 + var w = 0 + var needed = {} + for (var i = 0; i < count; i++) { + if (++w >= totalWorkers) w = 0 + needed[w] = needed[w] || [] + needed[w].push(i * elements) + } + + count = totalWorkers + for (var w in needed) { + $().ask(workers[w], 'getPiPiece', { + elements: elements + , pieces: needed[w] }, function (result) { + pi += result + --count || ctx.reply(pi, Date.now() - startTime) + }) + } + } + } +}) + +piCalculator.init({ threads: 3, workers: 2 }) +$().ask(piCalculator, 'calculate', 10000, function (pi, time) { + console.log('Pi approximation:', pi) + console.log('Time to finish:', time) +}) + diff --git a/examples/error.js b/examples/error.js new file mode 100644 index 0000000..7314f6d --- /dev/null +++ b/examples/error.js @@ -0,0 +1,13 @@ +var drama = require('../') +var sys = drama('sys') + +var actor = sys.actor({ + hello: function () { + throw new Error('Some error!') + } +}) + +actor.tell('onUncaughtException', function (e) { + console.error('Got error:', e.message) + console.error(e.stack) +}).tell('hello') diff --git a/examples/ping-pong.js b/examples/ping-pong.js new file mode 100644 index 0000000..1188356 --- /dev/null +++ b/examples/ping-pong.js @@ -0,0 +1,29 @@ +var drama = require('../') +var sys = drama('sys') + +var PingPong = function () { + var i = 0 + var me = this.self.pick('ping', 'pong') + var Ping = { + ping: function () { + console.log('ping') + if (++i > 100) { + console.log('finished') + return + } + me.pong() + return Pong + } + } + var Pong = { + pong: function () { + console.log('pong') + me.ping() + return Ping + } + } + return Ping +} + +var pingPong = sys.actor('ping-pong', PingPong).init().pick('ping') +pingPong.ping() diff --git a/examples/pingpong.js b/examples/pingpong.js deleted file mode 100644 index 38b6f84..0000000 --- a/examples/pingpong.js +++ /dev/null @@ -1,41 +0,0 @@ -var ActorSystem = require('../').ActorSystem - -var sys = new ActorSystem - -var pong = sys.create('pong') -pong.become({ - init: function () { - this.pongCount = 0 - } -, ping: function (value, reply) { - if (this.pongCount % 1000 == 0) { - console.log("Pong: ping "+this.pongCount) - } - this.sender.tell({ _: 'pong' }) - this.pongCount = this.pongCount + 1 - } -}) -pong.init() - -var ping = sys.create('ping') -ping.become({ - init: function () { - this.pingsLeft = 10000 - this.self.tell(pong, { _: 'ping' }) - } -, pong: function () { - if (this.pingsLeft % 1000 == 0) { - console.log("Ping: pong") - } - if (this.pingsLeft > 0) { - this.self.tell(pong, { _: 'ping' }) - this.pingsLeft -= 1 - } else { - console.log("Ping: stop") - pong.stop() - this.self.stop() - } - } -}) - -ping.init() diff --git a/examples/recursive.js b/examples/recursive.js new file mode 100644 index 0000000..a131cd3 --- /dev/null +++ b/examples/recursive.js @@ -0,0 +1,28 @@ +var drama = require('../') +var sys = drama('sys') + +var other = sys.actor('other') + +function buildChain (size, next) { + var a = sys.actor({ + die: function () { + console.log('dying', size) + var from = this.sender + if (next) { + this.send('die').to(next) + return { + ack: function () { + console.log('acked', size) + this.send('ack').to(from) + } + } + } else this.send('ack').to(from) + } + }) + if (size > 0) return buildChain(size - 1, a) + else return a +} + +sys.actor({ ack: function () { + console.log('done') +}}).send('die').to(buildChain(1000)) diff --git a/examples/remote-hello.js b/examples/remote-hello.js new file mode 100644 index 0000000..57595dc --- /dev/null +++ b/examples/remote-hello.js @@ -0,0 +1,15 @@ +var drama = require('../') + +var sys = drama('sys') +var remote = sys.fork('remote') + +var actor = remote.actor('bob', { + hello: function () { + console.log('you:', this.method) + this.reply('hiya') + } +}).pick('?hello') + +actor.hello(function (reply) { + console.log('%s: %s', actor.address, reply) +}) diff --git a/examples/remote-pass-future.js b/examples/remote-pass-future.js new file mode 100644 index 0000000..aa2dcac --- /dev/null +++ b/examples/remote-pass-future.js @@ -0,0 +1,17 @@ +var drama = require('../') + +var sys = drama('sys') +var remote = sys.fork('remote') + +var actor = remote.actor('bob', { + _multiply: function (x, y) { + this.reply(x * y) + } +, multiply: function (x, y) { + this.reply(this.ask(this, '_multiply', x, y)) + } +}).pick('??multiply') + +actor.multiply(4, 6, function (result) { + console.log('result is:', result) +}) diff --git a/examples/remote-ping-pong.js b/examples/remote-ping-pong.js new file mode 100644 index 0000000..02aa64c --- /dev/null +++ b/examples/remote-ping-pong.js @@ -0,0 +1,38 @@ +var drama = require('../') +var sys = drama('sys') + +var Pong = function () { + var pings = 0 + return { + ping: function () { + if (!(++pings % 1000)) console.log('ping %d', pings) + this.send('pong').to(this.sender) + } + } +} + +var Ping = function (opts) { + var pings = opts.pings || 10000 + var target = opts.target + this.send('ping').to(target) + return { + pong: function() { + if (pings--) { + if (!((opts.pings - pings) % 1000)) console.log('pong %d', opts.pings - pings) + this.send('ping').to(target) + } + else { + console.log('done') + } + } + } +} + +var ponger = sys.fork('ponger') +var pinger = sys.fork('pinger') + +var pong = ponger.actor('pong', Pong) +var ping = pinger.actor('ping', Ping) + +pong.init() +ping.init({ target: pong, pings: 10000 }) diff --git a/examples/slaves.js b/examples/slaves.js deleted file mode 100644 index 6d2760e..0000000 --- a/examples/slaves.js +++ /dev/null @@ -1,52 +0,0 @@ -var ActorSystem = require('../').ActorSystem - -var system = new ActorSystem() -system.define('Slave', { - greetings: function (name, reply, next) { - var self = this - this.self.ask(counter, { _: 'inc' }).when(function () { - self.self.ask(counter, { _: 'get' }).when(function (val) { - self.name = self.name + '-' + val - reply('Salut ' + name + '! We are ' + val + '! I am ' + self.name) - next() - }) - }) - } -}) - -var slaves = []; - -for (var i = 10; i--;) { - slaves.push(system.create('slave', 'Slave')) -} - -var counter = system.create('counter') -var one = system.create('one') - -counter.become({ - init: function () { - this.count = 0 - } -, get: function (m, reply) { - reply(this.count) - } -, inc: function (m, reply, next) { - this.self.ask({ _: 'get' }).when(function (count) { - reply() - next(function () { - this.count = 1 + count - }) - }) - } -}) -counter.init() - -var master = system.create('master') -master.init() - -slaves.forEach(function (slave) { - master.ask(slave, { _: 'greetings', $: master.name }).when(function (reply) { - console.log(reply) - console.log(slave.name) - }) -}) diff --git a/index.js b/index.js index d0b1c09..374d952 100644 --- a/index.js +++ b/index.js @@ -3,5 +3,19 @@ */ var ActorSystem = require('./lib/system').ActorSystem +var Actor = require('./lib/actor').Actor +var Ref = require('./lib/ref').Ref +var Message = require('./lib/message').Message +var Registry = require('./lib/registry').Registry + exports = module.exports = ActorSystem + exports.ActorSystem = ActorSystem +exports.Actor = Actor +exports.Ref = Ref +exports.Message = Message +exports.Registry = Registry + +exports.actor = function (address, beh) { + return new Actor(address, beh) +} diff --git a/lib/actor.js b/lib/actor.js index 8216e88..afb1f88 100644 --- a/lib/actor.js +++ b/lib/actor.js @@ -1,164 +1,152 @@ var util = require('util') +var EventEmitter = require('events').EventEmitter +var Message = require('./message').Message +var Ref = require('./ref').Ref +var slice = [].slice -function uid () { - return Math.floor(Math.random() * Date.now() * 10000).toString(36) -} +function Actor (address, beh) { + EventEmitter.call(this) -function Actor () { - this.address = uid() - this.mailbox = [] + this.address = address + if ('string' !== typeof this.address) + throw new Error('Not a valid address: ' + + util.inspect(this.address)) - this.running = true - this._locked = {} - this._flushing = false - this._reading = false + this.react(beh) - this.beh = { - init: function () {} - } + this.inbox = [] + this.self = this.ref = new Ref(this) + this.started = false + this.running = false } -Actor.prototype.__defineGetter__('name', function () { - return this.context.name -}) +util.inherits(Actor, EventEmitter) -Actor.prototype.toString = function () { - return this.context.name + ':' + this.address +Actor.prototype.belongsTo = function (system) { + system.register(this) + return this } -Actor.prototype.isLocked = function (actorRef) { - if (!actorRef || !actorRef.address) return false - return !!this._locked[actorRef.address] +Actor.prototype.setSystem = function (system) { + if (this.system) this.address = this.address.split('@')[1] + this.system = system + this.ref.system = system + this.address = system.address + '@' + this.address + this.ref.address = this.address + return this } -Actor.prototype.lock = function (actorRef) { - if (!actorRef || !actorRef.address) return false - if (this._locked[actorRef.address]) throw new Error('Attempt to lock twice') - this._locked[actorRef.address] = true - //console.log('%s locked %s', this, actorRef) +Actor.prototype.start = function (system) { + if (system) this.belongsTo(system) + this.started = true + this.running = true + this.on('message', function (message) { + this.enqueue(message) + }) + return this } -Actor.prototype.unlock = function (actorRef) { - if (!actorRef || !actorRef.address) return false - if ('undefined' === typeof this._locked[actorRef.address]) throw new Error('No lock existing like that') - if (!this._locked[actorRef.address]) throw new Error('Attempt to unlock twice') - this._locked[actorRef.address] = false - //console.log('%s unlocked %s', this, actorRef) - - // possible memory leak, it should delete - // but it would be less costly to - // keep the same memory position - // perhaps a garbage collector of last access time - // would work here +Actor.prototype.stop = function () { + var system = this.system + this.running = false + this.system.unregister(this) + return this } +Actor.prototype.react = Actor.prototype.become = function (beh) { + if ('function' !== typeof beh && 'object' !== typeof beh) { + throw new Error(this.address + ': Cannot set behavior "' + + beh + '"') + } this.beh = beh + return this } -Actor.prototype.processNext = function () { - var self = this - !function read () { - var env = self.mailbox[0] - if (!env) return - //console.log('locked' + env.value.sender) - if (self.isLocked(env.value.sender)) return //process.nextTick(read) - self.mailbox.shift() - self.lock(env.value.sender) - self.receive(env.value) - }() -} +Actor.prototype.processNext = function (message) { + var method = message.args[0] + var messageArgs = message.args.slice(1) + var fn -Actor.prototype.receive = function (message) { - var replyfn - if ('object' === typeof message && '_' in message) { - if (message.sender) { - this.context.sender = message.sender - } - if (message.replyTo) { - replyfn = function (m) { - message.replyTo.tell({ _: 'fulfill', $: m }) - } + fn = this[method] + if ('function' === typeof fn) { + return fn.apply(this, messageArgs) + } + else { + if ('function' === typeof this.beh) fn = this.beh + else { + fn = this.beh[method] + if (!fn) fn = this.beh._ // catch-all } + } - // standard behavior - if ('become' === message._) { - this.become(message.$) - this.unlock(message.sender) - return - } - else if ('stop' === message._) { - this.stop() - this.unlock(message.sender) - return - } + if ('function' !== typeof fn) + throw new Error(this.address + ': No action or method exists: "' + + method + '". Message: ' + util.inspect(message)) + + var context = {} + context.address = this.address + for (var k in this.beh) context[k] = this.beh[k] + for (var k in message) context[k] = message[k] + context.method = method + context.originalMessage = message.args.join(' ') + context.self = this.self + context.send = this.self.send.bind(this.self) + context.tell = this.self.tell.bind(this.self) + context.ask = this.self.ask.bind(this.self) + context.stop = this.self.stop.bind(this.self) + context.reply = function () { + var args = [].slice.call(arguments) + args.unshift('__reply__') + this.self.send.apply(this.self, args).to(this.sender) + }.bind(context) + context.sender = this.system.ref(context.sender) + + var beh = fn.apply(context, messageArgs) + if (beh) this.react(beh) + + return this +} - var method = this.beh[message._] +Actor.prototype.onUncaughtException = function (fn) { + this._uncaughtExceptionHandler = fn +} - if (!method && message._ === 'init') method = function () {} - if (!method) { - console.log(this) - throw new Error('No such method ' + message._ + ' ' + util.inspect(message.$)) - } - var handle = function handle (val) { - if (len <= 2) { - var fn = method.call(this.context, val, replyfn) - fn && fn.call(this.context) - this.unlock(message.sender) - return - } - else { - method.call(this.context, val, replyfn, function (fn) { - fn && fn.call(this.context) - this.unlock(message.sender) - }.bind(this)) - } - }.bind(this) - - var len = method.length - if ('object' === typeof message.$ && message.$.isFuture) { - message.$.when(function (val) { - handle(val) - }) - } - else handle(message.$) +Actor.prototype._uncaughtExceptionHandler = function (error) { + throw error +} - } else { - // +Actor.prototype.enqueue = function (message) { + this.inbox.push(message) + if (!this.flushing) { + this.flush() } + return this +} + +Actor.prototype.send = function (message) { + this.emit('message', message) + return this } Actor.prototype.flush = function (callback) { - var self = this - if (this._flushing) return callback && callback.call(self.context) - this._flushing = true + var message + this.flushing = true !function next () { - if (self.reading) return - - self.reading = true - self.processNext() - self.reading = false - - if (self.mailbox.length) next() + var message = this.inbox.shift() + if (!message) throw new Error('Empty message') + try { + this.processNext(message) + } catch (e) { this._uncaughtExceptionHandler(e) } + if (this.inbox.length) { + next.call(this) + } else { - self._flushing = false - callback && callback.call(self.context) + this.flushing = false + if (callback) callback() } - }() -} - -Actor.prototype.send = function (env) { - if (!this.running) return - this.mailbox[this.isLocked(env.sender) ? 'unshift' : 'push'](env) - this.flush() -} - -Actor.prototype.stop = function () { - var self = this - this.running = false - this.flush(function () { - self.context.self.destroy() - }) + }.call(this) + return this } exports.Actor = Actor diff --git a/lib/context.js b/lib/context.js deleted file mode 100644 index 9e8daa4..0000000 --- a/lib/context.js +++ /dev/null @@ -1,15 +0,0 @@ -var Props = require('./props') - -function Context (ctx) { - this.children = new Registry() - this.parent = ctx.parent - this.props = ctx.props - this.self = ctx.self - this.sender = ctx.sender -} - -Context - -Context.prototype.stop = function (actorRef, callback) { - actorRef.stop(callback) -} \ No newline at end of file diff --git a/lib/future.js b/lib/future.js deleted file mode 100644 index 9902435..0000000 --- a/lib/future.js +++ /dev/null @@ -1,38 +0,0 @@ -var util = require('util') -var Actor = require('./actor').Actor - -function Future () { - Actor.call(this) - - var self = this - - this.isFuture = true - this.fulfilled = false - this.value = null - this.listeners = [] - - this.beh = { - fulfill: function (value) { - self.value = value - self.fulfilled = true - if (!self.listeners.length) return - self.listeners.forEach(function (fn) { - fn(value) - }) - this.self.stop() - } - } -} - -util.inherits(Future, Actor) - -Future.prototype.when = function (fn) { - if (!this.fulfilled) { - this.listeners.push(fn) - } else { - fn(this.value) - this.context.self.stop() - } -} - -exports.Future = Future diff --git a/lib/message.js b/lib/message.js new file mode 100644 index 0000000..28d77f7 --- /dev/null +++ b/lib/message.js @@ -0,0 +1,6 @@ +function Message (sender, args) { + this.sender = 'object' === typeof sender ? sender.address : sender + this.args = args +} + +exports.Message = Message diff --git a/lib/ref.js b/lib/ref.js index 9b8f83e..3c2b012 100644 --- a/lib/ref.js +++ b/lib/ref.js @@ -1,97 +1,131 @@ -var Future = require('./future').Future +var Message = require('./message').Message +var Actor = require('./actor').Actor +var slice = [].slice -function ActorRef (system, address, name) { +function Ref (item, system) { + this.address = 'object' == typeof item ? item.address : item this.system = system - this.address = address - this.context = new Context(this) - this.context.name = name } -ActorRef.prototype.__defineGetter__('name', function () { - return this.context.name -}) - -ActorRef.prototype.__defineSetter__('name', function (s) { - return (this.context.name = s) +;[ 'init', 'start', 'react', 'stop' ].forEach(function (method) { + Ref.prototype[method] = function () { + var args = slice.call(arguments) + args.unshift(method) + this.tell.apply(this, args) + return this + } }) -ActorRef.prototype.toString = function () { - return this.context.name + '::' + this.address +Ref.prototype.actor = +Ref.prototype.create = +function (address, beh) { + return this.system.create(address, beh) } -ActorRef.prototype.create = function (name, behName) { - return this.system.create(name, behName) +Ref.prototype.tell = function () { + var message = new Message(this, slice.call(arguments)) + message.to = this.address + this.system.deliver(message) + return this } -ActorRef.prototype.tell = function (receiver, message, isProxy) { - if (!(receiver instanceof ActorRef)) { - message = receiver - receiver = this - } - if (isProxy) { - message.replyTo = this - message.sender = message.sender || this - } - else { - message.sender = this - } - -/* - console.log( - pad(message.sender.toString().grey + ' > '.green, 53) - + pad(receiver.toString().white - , 35).l - + pad('.'.white + message._.cyan + '(' + (util.inspect(message.$ || '') || ' ').yellow + ')' - , 53).r - + ( message.replyTo ? ' replyTo'.red + ': '.yellow - + (message.replyTo && message.replyTo.toString().cyan || '') - + ' 0&&l||0).join(c!=u?c:' ');return {l:c+s,r:s+c,toString:function(){return c+s}}} +exports.Ref = Ref diff --git a/lib/registry.js b/lib/registry.js index 57c2604..aec4ce8 100644 --- a/lib/registry.js +++ b/lib/registry.js @@ -3,15 +3,23 @@ function Registry () { } Registry.prototype.add = function (item) { - this._list[item.address] = item + this._list[this.resolve(item)] = item } -Registry.prototype.get = function (address) { - return this._list[address] +Registry.prototype.resolve = function (item) { + return 'object' === typeof item ? item.address : item } -Registry.prototype.remove = function (address) { - delete this._list[address] +Registry.prototype.get = function (item) { + return this._list[this.resolve(item)] +} + +Registry.prototype.remove = function (item) { + delete this._list[this.resolve(item)] +} + +Registry.prototype.has = function (address) { + return address in this._list } Registry.prototype.forEach = function (fn) { @@ -19,4 +27,8 @@ Registry.prototype.forEach = function (fn) { for (var k in this._list) fn(this._list[k], k, i++, this._list) } +Registry.prototype.keys = function () { + return Object.keys(this._list) +} + exports.Registry = Registry diff --git a/lib/system.js b/lib/system.js index 1305157..041373b 100644 --- a/lib/system.js +++ b/lib/system.js @@ -1,63 +1,141 @@ var util = require('util') -var clone = require('clone') +var EventEmitter = require('events').EventEmitter +var cp = require('child_process') + var Registry = require('./registry').Registry var Actor = require('./actor').Actor -var EventEmitter = require('events').EventEmitter -var ActorRef = require('./ref').ActorRef -require('colors') +var Ref = require('./ref').Ref -function ActorSystem (opts) { - opts = opts || {} +var toSource = require('tosource') +function ActorSystem (address, parent) { + if (!(this instanceof ActorSystem)) return new ActorSystem(address, parent) EventEmitter.call(this) - this.name = opts.name || uid() - this.behs = {} - this.registry = new Registry() + this.address = address + this.registry = new Registry(this) + this.router = new Registry(this) + this.parent = parent + if (this.parent) this.router.add(this.parent) + this.utils = utils } util.inherits(ActorSystem, EventEmitter) -ActorSystem.prototype.define = function (behName, beh) { - this.behs[behName] = beh -} - -ActorSystem.prototype.create = function (name, behName) { - var actor = new Actor() - +ActorSystem.prototype.register = function (actor) { + actor.setSystem(this) this.registry.add(actor) +} - var actorRef = new ActorRef(this, actor.address, name) +ActorSystem.prototype.unregister = function (actor) { + this.registry.remove(actor) +} - actor.context = actorRef.context +ActorSystem.prototype.deliver = function (message) { + if (process.argv[2] === 'debug') { + console.log('%s %s: %s', message.sender, message.to, message.args + .map(function (el) { return util.inspect(el).substr(0,30) }).join(' ')) + } + var target = message.to.split('@') + if (target.length < 2) { + throw new Error('Target system missing: ' + + message.to) + } - var beh = this.behs[behName] - if (beh) actorRef.become(beh) + if (this.address === target[0]) { + this.registry.get(message.to).send(message) + } + else if (this.router.has(target[0])) { + this.router.get(target[0]).send(message) + } + else if (this.parent) { + this.parent.send(message) + } + else { + throw new Error(this.address + ': Don\'t know what to do with: ' + + '"' + target[0] + '". Message: ' + util.inspect(message)) + } +} - return actorRef +ActorSystem.prototype.ref = function (item) { + return new Ref(item, this) } -ActorSystem.prototype.send = function (actorRef, message) { - var actor = this.registry.get(actorRef.address) - if (actor) { - var env = new Envelope(actor, message) - actor.send(env) +ActorSystem.prototype.fork = function (address) { + var sys = this + var child = cp.fork(__dirname + '/worker.js') + + var remote = {} + + remote.address = address + remote.child = child + remote.send = function (message) { + //console.log('safe:', safe(message)) + this.child.send(utils.safe(message)) } + + remote.ctrl = remote.controller = sys.ref(remote.address + '@controller').pick('create', '?getRegistry') + + remote.actor = + remote.create = function (address, beh, callback) { + remote.controller.create(address, toSource(beh)) + return sys.ref(remote.address + '@' + address) + } + + this.router.add(remote) + + remote.child.on('message', function (message) { + //console.log('got message from child', message) + sys.deliver(message) + }) + + remote.send({ address: address, parent: this.address }) + + return remote } -ActorSystem.prototype.destroy = function (actorRef) { - this.registry.remove(actorRef.address) +ActorSystem.prototype.actor = +ActorSystem.prototype.create = +function () { + var opts = parseArgs([].slice.call(arguments)) + return new Actor(opts.address, opts.beh).start(this).ref } -// utils +function parseArgs (args) { + var address = args[0] + var beh = args[1] + + if ('object' === typeof address || 'function' === typeof address) { + beh = address + address = utils.uid() + } -function uid () { - return Math.floor(Math.random () * Date.now() * 10000).toString(36) + return { address: address || utils.uid(), beh: beh || {} } } -function Envelope (actor, message) { - this.value = message - this.context = actor.context +exports.ActorSystem = ActorSystem + +/* utils */ + +var utils = exports.utils = {} + +utils.safe = function safe (o, isArray) { + var s = null == isArray ? {} : new Array(isArray) + for (var k in o) { + if (o[k] instanceof Actor || o[k] instanceof Ref || o[k] instanceof ActorSystem) { + s[k] = o[k].address + } + else if (Array.isArray(o[k])) { + s[k] = safe(o[k], o[k].length) + } + else if ('object' === typeof o[k]) { + s[k] = safe(o[k]) + } + else s[k] = o[k] + } + return s } -exports.ActorSystem = ActorSystem +utils.uid = function uid () { + return Math.floor(Math.random () * Date.now() * 10000).toString(36) +} diff --git a/lib/worker.js b/lib/worker.js new file mode 100644 index 0000000..7822ac6 --- /dev/null +++ b/lib/worker.js @@ -0,0 +1,27 @@ +var drama = require('../') + +process.once('message', function (setup) { + var parent = {} + parent.address = setup.parent + parent.send = function (message) { + //console.log('should send', safe(message)) + process.send(sys.utils.safe(message)) + } + var sys = drama(setup.address, parent) + + sys.create('controller', { + create: function (address, beh) { + beh = eval('(' + beh + ')') + //console.log('should create', address, beh) + var actor = this.self.create(address, beh) + } + , getRegistry: function () { + this.reply(sys.registry.keys()) + } + }) + + process.on('message', function (message) { + //console.log('got message from parent:', message) + sys.deliver(message) + }) +}) diff --git a/package.json b/package.json new file mode 100644 index 0000000..eae1714 --- /dev/null +++ b/package.json @@ -0,0 +1,16 @@ +{ + "author": "George Stagas ", + "name": "drama", + "version": "0.1.0", + "dependencies": { + "tosource": "~0.1.1" + }, + "devDependencies": { + "atomic": "~0.0.2", + "nexttick": "~0.1.0" + }, + "optionalDependencies": {}, + "engines": { + "node": "~0.6.17" + } +}