Skip to content

Commit

Permalink
Merge pull request #2 from nicocoul/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
nicocoul committed Nov 25, 2021
2 parents f9413ea + c9f452f commit b033eeb
Show file tree
Hide file tree
Showing 17 changed files with 238 additions and 85 deletions.
2 changes: 2 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ tmp/
obsolete/
cli/
coverage/
img/
mrd/
*.tpl
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
Ya Rfc (Remote Function Execution) is a [rpc](https://en.wikipedia.org/wiki/Remote_procedure_call) library for Node js.

Works well with [ya-pubsub](https://www.npmjs.com/package/ya-pubsub).
Integrates well with [ya-pubsub](https://www.npmjs.com/package/ya-pubsub).

### Key Features
* asynchronous
* embeddable
* designed for micro-services

### Execution Flow
![basic execution flow](https://github.com/nicocoul/ya-rfc/blob/dev/img/basicExecFlow.png)

### Basic Example
Given a module accessible by the RFC server
Expand Down Expand Up @@ -38,7 +40,7 @@ const broker = ya.broker()
broker.plug(ya.plugins.net(net.Server().listen(8002)))

// Server spawns worker processes at startup.
// Round-robin scheduling is used to balance load over child processes
// Round-robin scheduling is used to balance load over worker processes.
const modulePath = path.join(__dirname, 'procedures.js')
ya.server.net({ host: 'localhost', port: 8002 }, modulePath)
ya.server.net({ host: 'localhost', port: 8002 }, modulePath)
Expand Down Expand Up @@ -71,8 +73,8 @@ progress 6000/10000
progress 7000/10000
progress 8000/10000
progress 9000/10000
result is 10000
status end
result is 10000
*/

```
Expand Down
4 changes: 3 additions & 1 deletion README.md.tpl
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
Ya Rfc (Remote Function Execution) is a [rpc](https://en.wikipedia.org/wiki/Remote_procedure_call) library for Node js.

Works well with [ya-pubsub](https://www.npmjs.com/package/ya-pubsub).
Integrates well with [ya-pubsub](https://www.npmjs.com/package/ya-pubsub).

### Key Features
* asynchronous
* embeddable
* designed for micro-services

### Execution Flow
![basic execution flow](https://github.com/nicocoul/ya-rfc/blob/dev/img/basicExecFlow.png)

### Basic Example
Given a module accessible by the RFC server
Expand Down
27 changes: 26 additions & 1 deletion cli/render-readme.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,37 @@ const fs = require('fs')
const path = require('path')
const mustache = require('mustache')

const template = fs.readFileSync(path.join(__dirname, '..', 'README.md.tpl'), { encoding: 'utf8' })
const https = require('https')

function convert (mermaidPath, pngPath) {
const utf8 = fs.readFileSync(mermaidPath, { encoding: 'utf8' })
//const encoded = encodeURIComponent(utf8)
const base64 = Buffer.from(utf8).toString('base64')
const ws = fs.createWriteStream(pngPath)
const req = https.request({ hostname: 'mermaid.ink', path: `/img/${base64}`, method: 'GET', port: 443 }, res => {
res.on('data', data => {
ws.write(data)
})
})
req.on('error', console.error)
req.end()
}

console.log(encodeURIComponent('test?'))

// examples
const examples = {
procedures: fs.readFileSync(path.join(__dirname, '..', 'examples', 'procedures.js'), { encoding: 'utf8' }),
tcp: fs.readFileSync(path.join(__dirname, '..', 'examples', 'rpc-tcp.js'), { encoding: 'utf8' }),
ws: fs.readFileSync(path.join(__dirname, '..', 'examples', 'rpc-ws.js'), { encoding: 'utf8' })
}

const mrdPath = path.join(__dirname, '..', 'mrd')
const imgPath = path.join(__dirname, '..', 'img')
fs.readdirSync(mrdPath).forEach(entry => {
convert(path.join(mrdPath, entry), path.join(imgPath, `${path.parse(entry).name}.png`))
})

const template = fs.readFileSync(path.join(__dirname, '..', 'README.md.tpl'), { encoding: 'utf8' })
const result = mustache.render(template, { examples })
fs.writeFileSync(path.join(__dirname, '..', 'README.md'), result, { encoding: 'utf8' })
4 changes: 2 additions & 2 deletions examples/rpc-tcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const broker = ya.broker()
broker.plug(ya.plugins.net(net.Server().listen(8002)))

// Server spawns worker processes at startup.
// Round-robin scheduling is used to balance load over child processes
// Round-robin scheduling is used to balance load over worker processes.
const modulePath = path.join(__dirname, 'procedures.js')
ya.server.net({ host: 'localhost', port: 8002 }, modulePath)
ya.server.net({ host: 'localhost', port: 8002 }, modulePath)
Expand Down Expand Up @@ -41,6 +41,6 @@ progress 6000/10000
progress 7000/10000
progress 8000/10000
progress 9000/10000
result is 10000
status end
result is 10000
*/
Binary file added img/basicExecFlow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
18 changes: 9 additions & 9 deletions lib/brokers/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,18 @@ function create () {
if (!request) return
const reqChannel = channels.getById(request.channelId)
if (!reqChannel) return
reqChannel.write({ c: COMMANDS.RPC_EXECUTE, id: d.id, result: d.result, error: d.error, progress: d.progress })
if (d.result !== undefined || d.error !== undefined) {
requests.remove(d.id)
executeProcedures()
}
if (request.withStatus) {
if (d.error) {
reqChannel.write({ c: COMMANDS.RPC_EXECUTE, id: d.id, status: 'error' })
} else if (d.result) {
reqChannel.write({ c: COMMANDS.RPC_EXECUTE, id: d.id, status: 'end' })
}
}
reqChannel.write({ c: COMMANDS.RPC_EXECUTE, id: d.id, result: d.result, error: d.error, progress: d.progress })
if (d.result !== undefined || d.error !== undefined) {
requests.remove(d.id)
executeProcedures()
}
} else {
schedule(c, d.id, d.procedure, d.args, d.affinity, d.withStatus, d.withProgress)
executeProcedures()
Expand Down Expand Up @@ -183,14 +183,14 @@ function create () {
execContext.removeChannel(channel.id)
})
}
function destroy () {
_plugin.destroy()
channels.forEach(channel => channel.destroy())
function kill () {
_plugin.kill()
channels.forEach(channel => channel.kill())
}
return {
plug,
schedule,
destroy
kill
}
}

Expand Down
8 changes: 4 additions & 4 deletions lib/clients/rpc-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ function addRpcClientRole (channel) {
const request = requests.get(d.id)
if (resp.result) {
request.onResult(undefined, d.result === 'null' ? null : d.result)
delete resp.id
requests.remove(d.id)
} else if (resp.progress && request.onProgress) {
request.onProgress(d.progress)
} else if (resp.error) {
request.onResult(new Error(d.error), undefined)
delete resp.id
requests.remove(d.id)
} else if (resp.status && request.onStatus) {
request.onStatus(resp.status)
}
Expand All @@ -66,8 +66,8 @@ function create (channel) {
const rpcClientRole = addRpcClientRole(channel)
return {
execute: rpcClientRole.execute,
destroy: () => {
channel.destroy()
kill: () => {
channel.kill()
}
}
}
Expand Down
12 changes: 3 additions & 9 deletions lib/clients/rpc-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ function newProcedures (module = {}) {
exists: (name) => {
return state[name] !== undefined
},
add: (name, fct) => {
state[name] = fct
},
get: (name) => {
return state[name]
},
names: () => {
return Object.keys(state)
}
Expand Down Expand Up @@ -192,7 +186,7 @@ function create (channel, modulePath, options = {}) {
channel.write({ c: COMMANDS.RPC_EXECUTOR, procedures: procedures.names(), affinity })
})

function destroy () {
function kill () {
destroyed = true
loadMon.destroy()
loadMonNot.destroy()
Expand All @@ -204,7 +198,7 @@ function create (channel, modulePath, options = {}) {
logger.debug(`failed to kill worker ${error.message}`)
}
})
channel.destroy()
channel.kill()
}

for (let i = 0; i < workersCount; i++) {
Expand All @@ -214,7 +208,7 @@ function create (channel, modulePath, options = {}) {
processRequests()

return {
destroy
kill
}
}

Expand Down
29 changes: 29 additions & 0 deletions mrd/basicExecFlow.mrd
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
sequenceDiagram
participant C as client
participant B as broker
participant S1 as server1
participant S2 as server2
participant S1W1 as server1/worker1
participant S1W2 as server1/worker2
S1-->>B: {cpu:0.3, mem:0.60}
S2-->>B: {cpu:0.33, mem:0.60}
C->>B: execute
activate B
note over B: select sever with minimum load
B->>S1: execute
activate S1
B-->>C: {status: scheduled}
deactivate B
note over S1: wait for enough free CPU & memory
note over S1: select worker (round robin)
S1->>S1W1: execute
activate S1W1
S1-->>C: {status: started}
deactivate S1
S1W1-->>C: {progress: '50%'}
S1W1->>S1: {resullt: some result}
deactivate S1W1
activate S1
S1-->>C: {status: 'end'}
S1->>C: {result: 'some result'}
deactivate S1
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ya-rfc",
"version": "0.1.2",
"version": "0.1.8",
"description": "",
"main": "index.js",
"author": "Nicolas Coulon <coulon_nicolas@hotmail.com> (https://github.com/nicocoul)",
Expand All @@ -25,17 +25,17 @@
},
"license": "MIT",
"scripts": {
"test": "jest",
"test:detect": "jest --detectOpenHandles",
"test:watch": "jest --watch",
"test:coverage": "jest --collectCoverage",
"test": "jest --runInBand",
"test:detect": "jest --runInBand --detectOpenHandles",
"test:watch": "jest --runInBand --watch",
"test:coverage": "jest --runInBand --collectCoverage",
"lint": "eslint",
"lint:fix": "eslint --ext .js --fix",
"render-readme": "node cli/render-readme.js"
},
"dependencies": {
"uuid": "^8.3.2",
"ya-common": "^0.1.2"
"ya-common": "^0.1.6"
},
"devDependencies": {
"eslint": "^8.3.0",
Expand Down
9 changes: 5 additions & 4 deletions tests/brokers/rpc.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
const path = require('path')
const { pause, newDummyChannel } = require('../common')
const { duplexify } = require('../../lib/common')
const { COMMANDS } = require('../../lib/constants')
const yac = require('ya-common')
const { COMMANDS } = yac.constants
const { duplexify } = yac.common
const rpcBroker = require('../../lib/brokers/rpc')
const rpcServer = require('../../lib/clients/rpc-server')

Expand All @@ -21,8 +22,8 @@ describe('Rpc broker', () => {
received.push(data)
})
channelBroker.write({ c: COMMANDS.RPC_EXECUTE, id: 1, procedure: 'funcWithResult', args: [10] })
await pause(200)
server.destroy()
await pause(500)
server.kill()
expect(received.find(r => r.result !== undefined)).toStrictEqual({ c: COMMANDS.RPC_EXECUTE, id: 1, result: 10 })
expect(received.find(r => r.error !== undefined)).toBeUndefined()
})
Expand Down
17 changes: 7 additions & 10 deletions tests/clients/rpc-server.test.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const path = require('path')
const { pause, newDummyChannel } = require('../common')
const { COMMANDS } = require('../../lib/constants')
const yac = require('ya-common')
const { COMMANDS } = yac.constants
const rpc = require('../../lib/clients/rpc-server')
const logger = require('../../lib/logger')(__filename)
const logger = yac.logger(__filename)

const modulePath = path.join(__dirname, 'fixtures', 'rpc-module')

Expand All @@ -23,8 +24,7 @@ describe('Rcp server', () => {
channel.remote.writable.write({ c: COMMANDS.RPC_EXECUTE, id: 1, procedure: 'funcWithResult', args: [10] })

await pause(500)
server.destroy()
// channel.destroy()
server.kill()
expect(result).toStrictEqual({ c: COMMANDS.RPC_EXECUTE, id: 1, result: 10 })
expect(error).toBeUndefined()
})
Expand All @@ -45,8 +45,7 @@ describe('Rcp server', () => {
channel.remote.writable.write({ c: COMMANDS.RPC_EXECUTE, id: 1, procedure: 'no func', args: [10] })

await pause(500)
server.destroy()
// channel.destroy()
server.kill()
expect(result).toBeUndefined()
expect(error).toBeDefined()
})
Expand All @@ -67,8 +66,7 @@ describe('Rcp server', () => {
channel.remote.writable.write({ c: COMMANDS.RPC_EXECUTE, id: 1, procedure: 'functThatThrows', args: [10] })

await pause(500)
server.destroy()
// channel.destroy()
server.kill()
expect(result).toBeUndefined()
expect(error).toBeDefined()
})
Expand All @@ -94,8 +92,7 @@ describe('Rcp server', () => {
channel.remote.writable.write({ c: COMMANDS.RPC_EXECUTE, id: 1, procedure: 'funcWithProgress' })

await pause(500)
server.destroy()
// channel.destroy()
server.kill()
expect(result).toBeDefined()
expect(error).toBeUndefined()
expect(progress).toBeDefined()
Expand Down
Loading

0 comments on commit b033eeb

Please sign in to comment.