Skip to content

Commit

Permalink
add thrift serializer & test
Browse files Browse the repository at this point in the history
  • Loading branch information
icebob committed Jun 9, 2018
1 parent 071daf1 commit 5e30a28
Show file tree
Hide file tree
Showing 11 changed files with 2,206 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -6,3 +6,4 @@ npm-debug.log
stats.json
yarn-error.log
benchmark/results
*.exe
3 changes: 2 additions & 1 deletion .vscode/launch.json
Expand Up @@ -11,7 +11,8 @@
"program": "${workspaceRoot}/dev/index.js",
"cwd": "${workspaceRoot}",
"args": [
"duplex-streaming"
"client",
"client"
],
},
{
Expand Down
2 changes: 1 addition & 1 deletion dev/client.js
Expand Up @@ -22,7 +22,7 @@ let broker = new ServiceBroker({
},
//transporter: "kafka://192.168.51.29:2181",
//transporter: "amqp://192.168.0.181:5672",
//serializer: "MsgPack",
serializer: "Thrift",
//requestTimeout: 1000,

//disableBalancer: true,
Expand Down
5 changes: 3 additions & 2 deletions dev/server.js
Expand Up @@ -10,8 +10,9 @@ let { MoleculerError, MoleculerRetryableError } = require("../src/errors");
let broker = new ServiceBroker({
namespace: "",
nodeID: process.argv[2] || "server-" + process.pid,
transporter: "nats://demo.nats.io:4222",
//serializer: "MsgPack",
//transporter: "nats://demo.nats.io:4222",
transporter: "NATS",
serializer: "Thrift",

//disableBalancer: true,

Expand Down
17 changes: 17 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Expand Up @@ -15,6 +15,7 @@
"perf": "nodemon --allow-natives-syntax benchmark/perf-runner.js",
"pperf": "node --inspect --expose-gc benchmark/perf-runner.js",
"proto": "pbjs -t static-module -w commonjs -o src/serializers/proto/packets.proto.js src/serializers/proto/packets.proto",
"thrift": "thrift -gen js:node -o src\\serializers\\thrift src\\serializers\\thrift\\packets.thrift",
"test": "jest --coverage --no-cache",
"test:travis": "npm test && npm run test:trans && npm run test:amqp && npm run test:ts",
"test:unit": "jest --testMatch \"**/unit/**spec.js\" --coverage --no-cache",
Expand Down Expand Up @@ -69,6 +70,7 @@
"npm-check": "5.7.1",
"pino": "4.17.3",
"protobufjs": "6.8.6",
"thrift": "0.11.0",
"ts-node": "6.1.0",
"typescript": "2.9.1",
"v8-natives": "1.1.0",
Expand Down
3 changes: 2 additions & 1 deletion src/serializers/index.js
Expand Up @@ -14,7 +14,8 @@ const Serializers = {
JSON: require("./json"),
Avro: require("./avro"),
MsgPack: require("./msgpack"),
ProtoBuf: require("./protobuf")
ProtoBuf: require("./protobuf"),
Thrift: require("./thrift")
};

function getByName(name) {
Expand Down
178 changes: 178 additions & 0 deletions src/serializers/thrift.js
@@ -0,0 +1,178 @@
/*
* moleculer
* Copyright (c) 2018 MoleculerJS (https://github.com/moleculerjs/moleculer)
* MIT Licensed
*/

"use strict";

const BaseSerializer = require("./base");
const P = require("../packets");
const { MoleculerServerError } = require("../errors");

/**
* Apache Thrift Serializer for Moleculer
*
* http://thrift.apache.org/
*
* @class ThriftSerializer
*/
class ThriftSerializer extends BaseSerializer {

/**
* Initialize Serializer
*
* @param {any} broker
*
* @memberof Serializer
*/
init(broker) {
super.init(broker);

try {
const Thrift = require("thrift");
this.TBufferedTransport = Thrift.TBufferedTransport;
this.TBinaryProtocol = Thrift.TBinaryProtocol;

const transport = new Thrift.TBufferedTransport(null, (res) => this.serialized = res);
this.protocol = new Thrift.TBinaryProtocol(transport);

} catch(err) {
/* istanbul ignore next */
this.broker.fatal("The 'thrift' package is missing! Please install it with 'npm install thrift --save' command!", err, true);
}

this.packets = require("./thrift/gen-nodejs/packets_types.js");
}

getPacketFromType(type) {
switch(type) {
case P.PACKET_EVENT: return this.packets.PacketEvent;
case P.PACKET_REQUEST: return this.packets.PacketRequest;
case P.PACKET_RESPONSE: return this.packets.PacketResponse;
case P.PACKET_DISCOVER: return this.packets.PacketDiscover;
case P.PACKET_INFO: return this.packets.PacketInfo;
case P.PACKET_DISCONNECT: return this.packets.PacketDisconnect;
case P.PACKET_HEARTBEAT: return this.packets.PacketHeartbeat;
case P.PACKET_PING: return this.packets.PacketPing;
case P.PACKET_PONG: return this.packets.PacketPong;
case P.PACKET_GOSSIP_HELLO: return this.packets.PacketGossipHello;
case P.PACKET_GOSSIP_REQ: return this.packets.PacketGossipRequest;
case P.PACKET_GOSSIP_RES: return this.packets.PacketGossipResponse;
}
}


/**
* Serialize custom fields (stringify)
*
* @param {String} type
* @param {Packet} obj
* @returns {Packet}
* @memberof Serializer
*/
serializeCustomFields(type, obj) {
obj = super.serializeCustomFields(type, obj);

switch(type) {
case P.PACKET_REQUEST: {
if (!obj.stream)
obj.params = Buffer.from(obj.params);
break;
}
case P.PACKET_RESPONSE: {
if (obj.data && !obj.stream)
obj.data = Buffer.from(obj.data);
break;
}
}

return obj;
}

/**
* Deserialize custom fields
*
* @param {String} type
* @param {Packet} obj
* @returns {Packet}
* @memberof Serializer
*/
deserializeCustomFields(type, obj) {
switch(type) {
case P.PACKET_REQUEST: {
if (!obj.stream)
obj.params = obj.params.toString("utf8");
break;
}
case P.PACKET_RESPONSE: {
if (obj.data && !obj.stream) {
if (obj.data.length)
obj.data = obj.data.toString("utf8");
else
obj.data = undefined;
}
break;
}
}

return super.deserializeCustomFields(type, obj);
}

/**
* Serializer a JS object to Buffer
*
* @param {Object} obj
* @param {String} type of packet
* @returns {Buffer}
*
* @memberof Serializer
*/
serialize(obj, type) {
const P = this.getPacketFromType(type);
if (!P) {
/* istanbul ignore next */
throw new MoleculerServerError("Invalid packet type!", 500, "INVALID_PACKET_TYPE");
}

this.serializeCustomFields(type, obj);

const t = new P(obj);
t.write(this.protocol);
this.protocol.flush();
return this.serialized;
}

/**
* Deserialize Buffer to JS object
*
* @param {Buffer} buf
* @param {String} type of packet
* @returns {Object}
*
* @memberof Serializer
*/
deserialize(buf, type) {
const P = this.getPacketFromType(type);
if (!P) {
/* istanbul ignore next */
throw new MoleculerServerError("Invalid packet type!", 500, "INVALID_PACKET_TYPE");
}

let obj;
const transport = this.TBufferedTransport.receiver(reader => {
const protocol = new this.TBinaryProtocol(reader);
const t = new P();
t.read(protocol);
obj = t;
});

transport(buf);

this.deserializeCustomFields(type, obj);

return obj;
}
}

module.exports = ThriftSerializer;

0 comments on commit 5e30a28

Please sign in to comment.