Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
stagas committed May 6, 2012
0 parents commit 8415784
Show file tree
Hide file tree
Showing 11 changed files with 571 additions and 0 deletions.
9 changes: 9 additions & 0 deletions README.md
@@ -0,0 +1,9 @@
# drama

Actor model implementation for JavaScript and Node.js (work in progress)

Inspirations: Scala Actors, Akka, Pykka

## licence

MIT/X11
63 changes: 63 additions & 0 deletions examples/akka-tutorial-first.js
@@ -0,0 +1,63 @@
/**
* Port of the Akka first tutorial
* Distributed PI calculation
*/

var ActorSystem = require('../').ActorSystem

var sys = new ActorSystem

sys.define("PiWorker", {
calculatePiFor: function (m, reply) {
var acc = 0.0
for (var i = m.start; i < m.start + m.nrOfElements; i++) {
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
}
reply(acc)
}
, work: function (m) {
var sender = this.sender
this.sender.tell({ _: 'result'
, $: this.self.ask({ _: 'calculatePiFor', $: m })
})
}
})

var master = sys.create('pi calculator')

master.become({
init: function (opts) {
this.nrOfWorkers = opts.nrOfWorkers
this.workers = []
for (var w = 0; w < this.nrOfWorkers; w++) {
this.workers.push(this.self.create("pi-w", "PiWorker"))
}
this.startTime = Date.now()
}
, calculate: function (opts) {
var i = 0
this.pi = 0
this.nrOfResults = 0
this.nrOfElements = opts.nrOfElements
this.nrOfMessages = opts.nrOfMessages
while (i < this.nrOfMessages) {
for (var w = 0; w < this.nrOfWorkers; w++) {
this.self.tell(this.workers[w], { _: 'work', $: {
start: (i++) * this.nrOfElements
, nrOfElements: this.nrOfElements
}})
}
}
}
, result: function (val) {
this.pi += val
this.nrOfResults++
if (this.nrOfResults === this.nrOfMessages) {
console.log('Pi approximation:', this.pi)
console.log('Calculation time:', Date.now() - this.startTime)
}
}
})

master.init({ nrOfWorkers: 4 })
master.tell({ _: 'calculate', $: { nrOfElements: 10000, nrOfMessages: 10000 } })
41 changes: 41 additions & 0 deletions examples/pingpong.js
@@ -0,0 +1,41 @@
var ActorSystem = require('../').ActorSystem

var sys = new ActorSystem

var pong = sys.create('pong')
pong.become({
init: function () {
this.pongCount = 0
}
, ping: function (value, reply) {
if (this.pongCount % 1000 == 0) {
console.log("Pong: ping "+this.pongCount)
}
this.sender.tell({ _: 'pong' })
this.pongCount = this.pongCount + 1
}
})
pong.init()

var ping = sys.create('ping')
ping.become({
init: function () {
this.pingsLeft = 10000
this.self.tell(pong, { _: 'ping' })
}
, pong: function () {
if (this.pingsLeft % 1000 == 0) {
console.log("Ping: pong")
}
if (this.pingsLeft > 0) {
this.self.tell(pong, { _: 'ping' })
this.pingsLeft -= 1
} else {
console.log("Ping: stop")
pong.stop()
this.self.stop()
}
}
})

ping.init()
52 changes: 52 additions & 0 deletions examples/slaves.js
@@ -0,0 +1,52 @@
var ActorSystem = require('../').ActorSystem

var system = new ActorSystem()
system.define('Slave', {
greetings: function (name, reply, next) {
var self = this
this.self.ask(counter, { _: 'inc' }).when(function () {
self.self.ask(counter, { _: 'get' }).when(function (val) {
self.name = self.name + '-' + val
reply('Salut ' + name + '! We are ' + val + '! I am ' + self.name)
next()
})
})
}
})

var slaves = [];

for (var i = 10; i--;) {
slaves.push(system.create('slave', 'Slave'))
}

var counter = system.create('counter')
var one = system.create('one')

counter.become({
init: function () {
this.count = 0
}
, get: function (m, reply) {
reply(this.count)
}
, inc: function (m, reply, next) {
this.self.ask({ _: 'get' }).when(function (count) {
reply()
next(function () {
this.count = 1 + count
})
})
}
})
counter.init()

var master = system.create('master')
master.init()

slaves.forEach(function (slave) {
master.ask(slave, { _: 'greetings', $: master.name }).when(function (reply) {
console.log(reply)
console.log(slave.name)
})
})
7 changes: 7 additions & 0 deletions index.js
@@ -0,0 +1,7 @@
/**
* drama
*/

var ActorSystem = require('./lib/system').ActorSystem
exports = module.exports = ActorSystem
exports.ActorSystem = ActorSystem
164 changes: 164 additions & 0 deletions lib/actor.js
@@ -0,0 +1,164 @@
var util = require('util')

function uid () {
return Math.floor(Math.random() * Date.now() * 10000).toString(36)
}

function Actor () {
this.address = uid()
this.mailbox = []

this.running = true
this._locked = {}
this._flushing = false
this._reading = false

this.beh = {
init: function () {}
}
}

Actor.prototype.__defineGetter__('name', function () {
return this.context.name
})

Actor.prototype.toString = function () {
return this.context.name + ':' + this.address
}

Actor.prototype.isLocked = function (actorRef) {
if (!actorRef || !actorRef.address) return false
return !!this._locked[actorRef.address]
}

Actor.prototype.lock = function (actorRef) {
if (!actorRef || !actorRef.address) return false
if (this._locked[actorRef.address]) throw new Error('Attempt to lock twice')
this._locked[actorRef.address] = true
//console.log('%s locked %s', this, actorRef)
}

Actor.prototype.unlock = function (actorRef) {
if (!actorRef || !actorRef.address) return false
if ('undefined' === typeof this._locked[actorRef.address]) throw new Error('No lock existing like that')
if (!this._locked[actorRef.address]) throw new Error('Attempt to unlock twice')
this._locked[actorRef.address] = false
//console.log('%s unlocked %s', this, actorRef)

// possible memory leak, it should delete
// but it would be less costly to
// keep the same memory position
// perhaps a garbage collector of last access time
// would work here
}

Actor.prototype.become = function (beh) {
this.beh = beh
}

Actor.prototype.processNext = function () {
var self = this
!function read () {
var env = self.mailbox[0]
if (!env) return
//console.log('locked' + env.value.sender)
if (self.isLocked(env.value.sender)) return //process.nextTick(read)
self.mailbox.shift()
self.lock(env.value.sender)
self.receive(env.value)
}()
}

Actor.prototype.receive = function (message) {
var replyfn
if ('object' === typeof message && '_' in message) {
if (message.sender) {
this.context.sender = message.sender
}
if (message.replyTo) {
replyfn = function (m) {
message.replyTo.tell({ _: 'fulfill', $: m })
}
}

// standard behavior
if ('become' === message._) {
this.become(message.$)
this.unlock(message.sender)
return
}
else if ('stop' === message._) {
this.stop()
this.unlock(message.sender)
return
}

var method = this.beh[message._]

if (!method && message._ === 'init') method = function () {}
if (!method) {
console.log(this)
throw new Error('No such method ' + message._ + ' ' + util.inspect(message.$))
}
var handle = function handle (val) {
if (len <= 2) {
var fn = method.call(this.context, val, replyfn)
fn && fn.call(this.context)
this.unlock(message.sender)
return
}
else {
method.call(this.context, val, replyfn, function (fn) {
fn && fn.call(this.context)
this.unlock(message.sender)
}.bind(this))
}
}.bind(this)

var len = method.length
if ('object' === typeof message.$ && message.$.isFuture) {
message.$.when(function (val) {
handle(val)
})
}
else handle(message.$)

} else {
//
}
}

Actor.prototype.flush = function (callback) {
var self = this
if (this._flushing) return callback && callback.call(self.context)
this._flushing = true
!function next () {
if (self.reading) return

self.reading = true
self.processNext()
self.reading = false

if (self.mailbox.length) next()
else {
self._flushing = false
callback && callback.call(self.context)
}
}()
}

Actor.prototype.send = function (env) {
if (!this.running) return
this.mailbox[this.isLocked(env.sender) ? 'unshift' : 'push'](env)
this.flush()
}

Actor.prototype.stop = function () {
var self = this
this.running = false
this.flush(function () {
self.context.self.destroy()
})
}

exports.Actor = Actor
15 changes: 15 additions & 0 deletions lib/context.js
@@ -0,0 +1,15 @@
var Props = require('./props')

function Context (ctx) {
this.children = new Registry()
this.parent = ctx.parent
this.props = ctx.props
this.self = ctx.self
this.sender = ctx.sender
}

Context

Context.prototype.stop = function (actorRef, callback) {
actorRef.stop(callback)
}
38 changes: 38 additions & 0 deletions lib/future.js
@@ -0,0 +1,38 @@
var util = require('util')
var Actor = require('./actor').Actor

function Future () {
Actor.call(this)

var self = this

this.isFuture = true
this.fulfilled = false
this.value = null
this.listeners = []

this.beh = {
fulfill: function (value) {
self.value = value
self.fulfilled = true
if (!self.listeners.length) return
self.listeners.forEach(function (fn) {
fn(value)
})
this.self.stop()
}
}
}

util.inherits(Future, Actor)

Future.prototype.when = function (fn) {
if (!this.fulfilled) {
this.listeners.push(fn)
} else {
fn(this.value)
this.context.self.stop()
}
}

exports.Future = Future

0 comments on commit 8415784

Please sign in to comment.