-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpc_client_gen.js
executable file
·72 lines (60 loc) · 2.04 KB
/
rpc_client_gen.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
'use strict';
const config = require('../config.dev')
const amqp = require('amqplib').connect('amqp://' + config.username + ':' + config.password + '@' + config.host)
const co = require('co')
const uuid = require('uuid');
//TODO : wrap it as a class
// const args = process.argv.slice(2)
// if (args.length == 0) {
// console.log("Usage: rpc_client.js num");
// process.exit(1)
// }
function* init(){
let conn = yield amqp
let ch = yield conn.createChannel()
let cbQueue = yield ch.assertQueue('', {exclusive: true})
return {"conn": conn, "channel": ch, "cbQueue": cbQueue}
}
function* sender(initConfig, msg, resHandler) {
try {
let ch = initConfig.channel
let conn = initConfig.conn
let cbQueue = initConfig.cbQueue
const corr = uuid.v4()
console.log(' [x] [%s] Requesting fib(%s)',corr, msg)
ch.consume(cbQueue.queue, (resultMsg) => {
resHandler(resultMsg, corr, conn)
})
ch.sendToQueue('rpc_queue', new Buffer(msg.toString()), {"correlationId": corr, "replyTo": cbQueue.queue})
}
catch (ex) {
console.warn("ex:", ex)
}
}
//客户端最好需要对消息防止重复处理,超时没有返回结果的要进行超时处理
//服务端:先存储到db, 然后当做消费他.后续补充这些异常消息的处理
function responseHandler(res, corr, conn) {
// console.log("corr: %s - %s", corr, JSON.stringify(res.content.toString() ) );
if (res.properties.correlationId == corr)
{
// console.log(' [.] Got %s', JSON.stringify(res.content.toString() ));
setTimeout( () => {
conn.close()
}, 5000);
}
return res.content
};
exports.send = function * (msg, responseHandler) {
let initConfig = yield init();
yield sender(initConfig, msg, responseHandler );
}
function onerror(err) {
console.error(err.stack);
}
co(function*() {
let num = parseInt(args[0])
let initConfig = yield init();
yield [
sender(initConfig, num.toString(), responseHandler),
]
}, onerror)