Skip to content

Commit

Permalink
add setTimeout to throttle actor shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: wanderer <mjbecze@gmail.com>
  • Loading branch information
wanderer committed Jan 4, 2018
1 parent 818c7e2 commit c77c761
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 77 deletions.
16 changes: 7 additions & 9 deletions actor.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const Pipe = require('buffer-pipe')
const Cap = require('primea-capability')
const Message = require('primea-message')
const leb128 = require('leb128').unsigned
const Inbox = require('./inbox.js')

Expand Down Expand Up @@ -65,19 +64,18 @@ module.exports = class Actor {
// waits for the next message
async _startMessageLoop () {
// this ensure we only every have one loop running at a time
while (1) {
const message = await this.inbox.nextMessage(0, true)
if (!message) break

// run the next message
while (!this.inbox.isEmpty) {
const message = await this.inbox.nextMessage(0)
await this.runMessage(message)
}
this.running = false
// wait for state ops to finish
await this.state.done()
if (!this.running) {
this.container.onIdle()
}
setTimeout(() => {
if (!this.running) {
this.container.onIdle()
}
}, 0)
}

serializeMetaData () {
Expand Down
20 changes: 8 additions & 12 deletions inbox.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ module.exports = class Inbox {
})
}

get isEmpty () {
return !this._queue.length
}

/**
* queues a message
* @param {Message} message
Expand Down Expand Up @@ -70,23 +74,14 @@ module.exports = class Inbox {
* @param {Integer} timeout
* @returns {Promise}
*/
nextMessage (timeout, getCurrent = false) {
async nextMessage (timeout) {
if (!this._gettingNextMessage) {
this._gettingNextMessage = this._nextMessage(timeout)
this._gettingNextMessage.then(() => {
this._gettingNextMessage = false
})
} else if (!getCurrent) {
this._gettingNextMessage = true
} else {
throw new Error('already waiting for next message')
}
return this._gettingNextMessage
}

async _nextMessage (timeout) {
let message = this._getOldestMessage()
if (message === undefined && timeout === 0) {
return
}

timeout += this.actor.ticks
let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
Expand Down Expand Up @@ -123,6 +118,7 @@ module.exports = class Inbox {
])
oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
}
this._gettingNextMessage = false
return this._deQueueMessage()
}

Expand Down
7 changes: 0 additions & 7 deletions scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ module.exports = class Scheduler {
constructor () {
this._waits = []
this._running = new Set()
this._checkingWaits = false
this.instances = new SortedMap(comparator)
}

Expand Down Expand Up @@ -110,8 +109,6 @@ module.exports = class Scheduler {
if (!this.instances.size) {
this._waits.forEach(wait => wait.resolve())
this._waits = []
this._checkingWaits = false

return
}

Expand Down Expand Up @@ -140,11 +137,7 @@ module.exports = class Scheduler {
instance.ticks = oldest
this._update(instance)
}
this._checkingWaits = false

return this._checkWaits()
}

this._checkingWaits = false
}
}
49 changes: 0 additions & 49 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,54 +437,6 @@ tape('basic tagged caps', async t => {
t.deepEquals(stateRoot, expectedState, 'expected root!')
})

tape('return while waiting for tag', async t => {
t.plan(4)
const expectedState = {
'/': Buffer.from('b8eb399087a990e30373e954b627a9512c9af40b', 'hex')
}

const tree = new RadixTree({
db: db
})

class testVMContainerA extends BaseContainer {
async onMessage (m) {
if (m.tag === 1) {
t.true(m, 'should recive second message')
} else {
t.true(m, 'should recive first message')
const rCap = this.actor.mintCap(1)
const message = new Message({caps: [rCap]})
this.actor.send(m.caps[0], message)
this.actor.inbox.nextTaggedMessage([1], 44)
}
}
}

class testVMContainerB extends BaseContainer {
onMessage (m) {
t.true(m, 'should recive a message')
this.actor.send(m.caps[0], new Message())
}

static get typeId () {
return 8
}
}

const hypervisor = new Hypervisor(tree)
hypervisor.registerContainer(testVMContainerA)
hypervisor.registerContainer(testVMContainerB)

let capA = await hypervisor.createActor(testVMContainerA.typeId, new Message())
let capB = await hypervisor.createActor(testVMContainerB.typeId, new Message())

hypervisor.send(capA, new Message({caps: [capB]}))

const stateRoot = await hypervisor.createStateRoot()
t.deepEquals(stateRoot, expectedState, 'expected root!')
})

tape('trying to listen for caps more then once', async t => {
t.plan(4)
const expectedState = {
Expand All @@ -498,7 +450,6 @@ tape('trying to listen for caps more then once', async t => {
class testVMContainerA extends BaseContainer {
async onMessage (m) {
t.true(m, 'should recive first message')
const rCap = this.actor.mintCap(1)
const message = new Message({data: 'first'})
this.actor.send(m.caps[0], message)
const promise = this.actor.inbox.nextTaggedMessage([1], 44)
Expand Down

0 comments on commit c77c761

Please sign in to comment.