-
Notifications
You must be signed in to change notification settings - Fork 10
/
kernel.js
182 lines (159 loc) · 4.67 KB
/
kernel.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
const Message = require('primea-message')
const BN = require('bn.js')
const PortManager = require('./portManager.js')
const DeleteMessage = require('./deleteMessage')
module.exports = class Kernel {
/**
* the Kernel manages the varous message passing functions and provides
* an interface for the containers to use
* @param {Object} opts
* @param {Object} opts.id - the UUID of the Kernel
* @param {Object} opts.state - the state of the container
* @param {Object} opts.hypervisor
* @param {Object} opts.container - the container constuctor and argments
*/
constructor (opts) {
this.state = opts.state
this.code = opts.code
this.hypervisor = opts.hypervisor
this.id = opts.id
this.container = new opts.container.Constructor(this, opts.container.args)
this.timeout = 0
this.ticks = 0
this.containerState = 'idle'
// create the port manager
this.ports = new PortManager(Object.assign({
kernel: this
}, opts))
}
/**
* adds a message to this containers message queue
* @param {string} portName
* @param {object} message
*/
queue (portName, message) {
this.ports.queue(portName, message)
return this._startMessageLoop()
}
async initialize (message) {
await this.run(message, 'initialize')
return this._startMessageLoop()
}
// waits for the next message
async _startMessageLoop () {
// this ensure we only every have one loop running at a time
if (this.containerState !== 'running') {
this.containerState = 'running'
while (1) {
const message = await this.ports.getNextMessage(this.timeout)
if (!message) break
// dequqe message
message.fromPort.messages.shift()
// if the message we recived had more ticks then we currently have the
// update it
if (message._fromTicks > this.ticks) {
this.ticks = message._fromTicks
this.hypervisor.scheduler.update(this)
}
// run the next message
await this.run(message)
}
this.containerState = 'idle'
this.container.onIdle()
}
}
shutdown () {
this.hypervisor.scheduler.done(this.id)
}
/**
* run the kernels code with a given enviroment
* @param {object} message - the message to run
* @param {boolean} init - whether or not to run the intialization routine
* @returns {Promise}
*/
async run (message, method = 'run') {
if (message.constructor === DeleteMessage) {
this.ports._delete(message.fromName)
} else {
const responsePort = message.responsePort
delete message.responsePort
this.ports.addReceivedPorts(message)
let result
try {
result = await this.container[method](message)
} catch (e) {
result = {
exception: true,
exceptionError: e
}
}
if (responsePort) {
this.send(responsePort, new Message({
data: result
}))
}
}
this.ports.clearUnboundedPorts()
}
getResponsePort (message) {
if (message.responsePort) {
return message.responsePort.destPort
} else {
const [portRef1, portRef2] = this.ports.createChannel()
message.responsePort = portRef2
this.ports._unboundPorts.delete(portRef2)
return portRef1
}
}
/**
* updates the number of ticks that the container has run
* @param {Number} count - the number of ticks to add
*/
incrementTicks (count) {
this.ticks += count
this.hypervisor.scheduler.update(this)
}
/**
* creates a new message
* @param {*} data
*/
createMessage (opts) {
const message = new Message(opts)
this.ports.checkSendingPorts(message)
return message
}
/**
* creates a new container. Returning a port to it.
* @param {String} type
* @param {*} data - the data to populate the initail state with
* @returns {Object}
*/
createInstance (type, message) {
let nonce = this.state.nonce
const id = {
nonce: nonce,
parent: this.id
}
// incerment the nonce
nonce = new BN(nonce)
nonce.iaddn(1)
this.state.nonce = nonce.toArray()
this.ports.removeSentPorts(message)
return this.hypervisor.createInstance(type, message, id)
}
/**
* sends a message to a given port
* @param {Object} portRef - the port
* @param {Message} message - the message
*/
send (port, message) {
message._hops++
// set the port that the message came from
message._fromTicks = this.ticks
this.ports.removeSentPorts(message)
// if (this.currentMessage !== message && !message.responsePort) {
// this.currentMessage._addSubMessage(message)
// }
return this.hypervisor.send(port, message)
}
}