Skip to content

Commit

Permalink
rename waitOnTags -> nextTaggedMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
wanderer committed Nov 21, 2017
1 parent 5a462da commit 67c9bed
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
9 changes: 6 additions & 3 deletions actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ module.exports = class Actor {
* @param {object} message
* @returns {Promise}
*/
async create (message) {
await this.message(message, 'onCreation')
create (message) {
// start loop before running intializtion message so the the container state
// will be "running" incase the actor recievse a message will running
// creation code
this._startMessageLoop()
return this.message(message, 'onCreation')
}

// waits for the next message
Expand All @@ -68,7 +71,7 @@ module.exports = class Actor {
if (this.containerState !== 'running') {
this.containerState = 'running'
while (1) {
const message = await this.inbox.getNextMessage()
const message = await this.inbox.nextMessage()
if (!message) break

// if the message we recived had more ticks then we currently have then
Expand Down
17 changes: 10 additions & 7 deletions inbox.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,12 @@ module.exports = class Inbox {
* @param {Integer} timeout
* @returns {Promise}
*/
async waitOnTag (tags, timeout) {
if (this._waitingTags) {
throw new Error('already getting next message')
}

async nextTaggedMessage (tags, timeout) {
this._waitingTags = new Set(tags)
this._queue = this._queue.filter(message => !this._queueTaggedMessage(message))

// todo: add saturation test
const message = await this.getNextMessage(timeout)
const message = await this.nextMessage(timeout)
delete this._waitingTags
this._waitingTagsQueue.forEach(message => this._queueMessage(message))
this._waitingTagsQueue = []
Expand All @@ -74,7 +70,13 @@ module.exports = class Inbox {
* @param {Integer} timeout
* @returns {Promise}
*/
async getNextMessage (timeout = 0) {
async nextMessage (timeout = 0) {
if (this._gettingNextMessage) {
throw new Error('already getting next message')
} else {
this._gettingNextMessage = true
}

let message = this._getOldestMessage()
if (message === undefined && timeout === 0) {
return
Expand Down Expand Up @@ -102,6 +104,7 @@ module.exports = class Inbox {
])
oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
}
this._gettingNextMessage = false
return this._deQueueMessage()
}

Expand Down
22 changes: 11 additions & 11 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,14 @@ tape('actor creation', async t => {
}

onMessage (m) {
t.true(m, 'should recive a response message')
t.equals(m.data, 'test', 'should recive a response message')
}
}

class testVMContainerB extends BaseContainer {
onCreation (m) {
const cap = m.caps[0]
return this.actor.send(cap, new Message())
return this.actor.send(cap, new Message({data: 'test'}))
}

static get typeId () {
Expand Down Expand Up @@ -483,7 +483,7 @@ tape('basic tagged caps', async t => {
const message = new Message()
message.responseCap = rCap
await this.actor.send(m.caps[0], message)
const rMessage = await this.actor.inbox.waitOnTag([1], 44)
const rMessage = await this.actor.inbox.nextTaggedMessage([1], 44)
t.true(rMessage, 'should recive a response message')
}
}
Expand Down Expand Up @@ -529,9 +529,9 @@ tape('trying to listen for caps more then once', async t => {
const message = new Message({data: 'first'})
message.responseCap = rCap
await this.actor.send(m.caps[0], message)
const promise = this.actor.inbox.waitOnTag([1], 44)
const promise = this.actor.inbox.nextTaggedMessage([1], 44)
try {
await this.actor.inbox.waitOnTag([1], 44)
await this.actor.inbox.nextTaggedMessage([1], 44)
} catch (e) {
t.true(e, 'should error if waiting twice')
}
Expand Down Expand Up @@ -591,7 +591,7 @@ tape('multple messages to restore on waiting for tags', async t => {
this.actor.send(m.caps[0], message1),
this.actor.send(m.caps[1], message2)
])
const rMessage = await this.actor.inbox.waitOnTag([1, 2], 44)
const rMessage = await this.actor.inbox.nextTaggedMessage([1, 2], 44)
t.true(rMessage, 'should recive a response message')
}
}
Expand Down Expand Up @@ -655,7 +655,7 @@ tape('multple messages to backup on waiting for tags', async t => {
message2.caps.push(cap2)
await this.actor.send(m.caps[0], message1)
await this.actor.send(m.caps[1], message2)
const rMessage = await this.actor.inbox.waitOnTag([1, 2], 44)
const rMessage = await this.actor.inbox.nextTaggedMessage([1, 2], 44)
t.true(rMessage, 'should recive a response message')
}
}
Expand Down Expand Up @@ -719,7 +719,7 @@ tape('multple messages, but single tag', async t => {
message2.caps.push(cap2)
await this.actor.send(m.caps[0], message1)
await this.actor.send(m.caps[1], message2)
const rMessage = await this.actor.inbox.waitOnTag([2], 44)
const rMessage = await this.actor.inbox.nextTaggedMessage([2], 44)
t.true(rMessage, 'should recive a response message')
}
}
Expand Down Expand Up @@ -770,7 +770,7 @@ tape('deadlock test', async t => {
class testVMContainerA extends BaseContainer {
async onMessage (m) {
t.true(m, 'should recive first message 1')
const rMessage = await this.actor.inbox.waitOnTag([1], 50)
const rMessage = await this.actor.inbox.nextTaggedMessage([1], 50)
t.equals(rMessage, undefined, 'should recive a response message 1')
}
}
Expand All @@ -779,7 +779,7 @@ tape('deadlock test', async t => {
async onMessage (m) {
t.true(m, 'should recive first message 2')
this.actor.incrementTicks(47)
const rMessage = await this.actor.inbox.waitOnTag([1], 1)
const rMessage = await this.actor.inbox.nextTaggedMessage([1], 1)
t.equals(rMessage, undefined, 'should recive a response message 2')
}

Expand All @@ -792,7 +792,7 @@ tape('deadlock test', async t => {
async onMessage (m) {
t.true(m, 'should recive first message 3')
this.actor.incrementTicks(45)
const rMessage = await this.actor.inbox.waitOnTag([1], 1)
const rMessage = await this.actor.inbox.nextTaggedMessage([1], 1)
t.equals(rMessage, undefined, 'should recive a response message 3')
}

Expand Down

0 comments on commit 67c9bed

Please sign in to comment.