Skip to content

Commit

Permalink
kafka: rewrite Consumer to ConsumerGroup
Browse files Browse the repository at this point in the history
Accept connection URI `kafka://127.0.0.1:2181`
  • Loading branch information
icebob committed Jan 24, 2018
1 parent 5de4406 commit 9b3f6b8
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"program": "${workspaceRoot}\\dev\\index.js",
"cwd": "${workspaceRoot}",
"args": [
"server"
"client"
]
},
{
Expand Down
9 changes: 4 additions & 5 deletions dev/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ let ServiceBroker = require("../src/service-broker");
let broker = new ServiceBroker({
namespace: "multi",
nodeID: process.argv[2] || "client-" + process.pid,
transporter: {
type: "kafka",
options: "192.168.51.29:2181"
},
_transporter: "NATS",
transporter: "kafka://192.168.51.29:2181",
//transporter: "amqp://192.168.0.181:5672",
//serializer: "ProtoBuf",
//requestTimeout: 1000,
Expand Down Expand Up @@ -111,4 +109,5 @@ broker.start()
});
}, 1000);

});
})
.then(() => broker.repl());
6 changes: 2 additions & 4 deletions dev/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ let { MoleculerError } = require("../src/errors");
let broker = new ServiceBroker({
namespace: "multi",
nodeID: process.argv[2] || "server-" + process.pid,
transporter: {
type: "kafka",
options: "192.168.51.29:2181"
},
_transporter: "NATS",
transporter: "kafka://192.168.51.29:2181",
//transporter: "amqp://192.168.0.181:5672",
//serializer: "ProtoBuf",

Expand Down
1 change: 1 addition & 0 deletions examples/loadtest/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ let ServiceBroker = require("../../src/service-broker");

// Create broker
let broker = new ServiceBroker({
namespace: "loadtest",
nodeID: process.argv[2] || "client",
transporter: process.env.TRANSPORTER || "NATS",
logger: console,
Expand Down
1 change: 1 addition & 0 deletions examples/loadtest/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ let hostname = os.hostname();

// Create broker
let broker = new ServiceBroker({
namespace: "loadtest",
nodeID: process.argv[2] || hostname + "-server",
transporter: process.env.TRANSPORTER || "NATS",
logger: console,
Expand Down
2 changes: 2 additions & 0 deletions src/service-broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ class ServiceBroker {
TransporterClass = Transporters.Redis;
else if (opt.startsWith("amqp://"))
TransporterClass = Transporters.AMQP;
else if (opt.startsWith("kafka://"))
TransporterClass = Transporters.Kafka;

if (TransporterClass)
return new TransporterClass(opt);
Expand Down
35 changes: 18 additions & 17 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,41 +164,42 @@ class Transit {
* @memberOf Transit
*/
makeSubscriptions() {
this.subscribing = Promise.all([
this.subscribing = this.tx.makeSubscriptions([

// Subscribe to broadcast events
this.subscribe(P.PACKET_EVENT, this.nodeID),
{ cmd: P.PACKET_EVENT, nodeID: this.nodeID },

// Subscribe to requests
this.subscribe(P.PACKET_REQUEST, this.nodeID),
{ cmd: P.PACKET_REQUEST, nodeID: this.nodeID },

// Subscribe to node responses of requests
this.subscribe(P.PACKET_RESPONSE, this.nodeID),
{ cmd: P.PACKET_RESPONSE, nodeID: this.nodeID },

// Discover handler
this.subscribe(P.PACKET_DISCOVER),
this.subscribe(P.PACKET_DISCOVER, this.nodeID),
{ cmd: P.PACKET_DISCOVER },
{ cmd: P.PACKET_DISCOVER, nodeID: this.nodeID },

// NodeInfo handler
this.subscribe(P.PACKET_INFO), // Broadcasted INFO. If a new node connected
this.subscribe(P.PACKET_INFO, this.nodeID), // Response INFO to DISCOVER packet
{ cmd: P.PACKET_INFO }, // Broadcasted INFO. If a new node connected
{ cmd: P.PACKET_INFO, nodeID: this.nodeID }, // Response INFO to DISCOVER packet

// Disconnect handler
this.subscribe(P.PACKET_DISCONNECT),
{ cmd: P.PACKET_DISCONNECT },

// Heartbeat handler
this.subscribe(P.PACKET_HEARTBEAT),
{ cmd: P.PACKET_HEARTBEAT },

// Ping handler
this.subscribe(P.PACKET_PING), // Broadcasted
this.subscribe(P.PACKET_PING, this.nodeID), // Targeted
{ cmd: P.PACKET_PING }, // Broadcasted
{ cmd: P.PACKET_PING, nodeID: this.nodeID }, // Targeted

// Pong handler
this.subscribe(P.PACKET_PONG, this.nodeID)
{ cmd: P.PACKET_PONG, nodeID: this.nodeID }

]).then(() => {
this.subscribing = null;
});

])
.then(() => {
this.subscribing = null;
});
return this.subscribing;
}

Expand Down
11 changes: 11 additions & 0 deletions src/transporters/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ class BaseTransporter {
throw new Error("Not implemented!");
}

/**
* Subscribe to all topics
*
* @param {Array<Object>} topics
*
* @memberOf BaseTransporter
*/
makeSubscriptions(topics) {
return Promise.all(topics.map(({ cmd, nodeID }) => this.subscribe(cmd, nodeID)));
}

/**
* Subscribe to a command
*
Expand Down
98 changes: 81 additions & 17 deletions src/transporters/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

const Promise = require("bluebird");
const Transporter = require("./base");
const { MoleculerError } = require("../errors");

/**
* Lightweight transporter for Kafka
Expand All @@ -27,11 +28,11 @@ class KafkaTransporter extends Transporter {
constructor(opts) {
if (typeof opts == "string")
opts = { kafka: {
connectionString: opts
host: opts.replace("kafka://", "")
} };

opts.kafka = Object.assign({
connectionString: undefined,
host: undefined,
client: {
zkOptions: undefined,
noAckBatchOptions: undefined,
Expand All @@ -40,11 +41,13 @@ class KafkaTransporter extends Transporter {
producer: {},
customPartitioner: undefined,

consumer: {
/*consumer: {
groupId: undefined, // No nodeID at here
encoding: "buffer"
encoding: "buffer",
fromOffset: false,
},
consumerPayloads: undefined,
*/

publish: {
partition: 0,
Expand All @@ -57,6 +60,8 @@ class KafkaTransporter extends Transporter {
this.client = null;
this.producer = null;
this.consumer = null;

this.topics = [];
}

/**
Expand All @@ -66,7 +71,6 @@ class KafkaTransporter extends Transporter {
*/
connect() {
const opts = this.opts.kafka;
opts.consumer.groupId = this.nodeID;

return new Promise((resolve, reject) => {
let Kafka;
Expand All @@ -77,10 +81,11 @@ class KafkaTransporter extends Transporter {
this.broker.fatal("The 'kafka-node' package is missing. Please install it with 'npm install kafka-node --save' command.", err, true);
}

this.client = new Kafka.Client(opts.connectionString, opts.client.zkOptions, opts.client.noAckBatchOptions, opts.client.sslOptions);
this.client = new Kafka.Client(opts.host, opts.client.zkOptions, opts.client.noAckBatchOptions, opts.client.sslOptions);
this.client.once("connect", () => {

/* Moved to ConsumerGroup
// Create Consumer
this.consumer = new Kafka.Consumer(this.client, opts.consumerPayloads || [], opts.consumer);
this.consumer.on("error", e => {
Expand All @@ -94,8 +99,10 @@ class KafkaTransporter extends Transporter {
this.consumer.on("message", message => {
const topic = message.topic;
const cmd = topic.split(".")[1];
console.log(cmd);
this.messageHandler(cmd, message.value);
});
});*/


// Create Producer
this.producer = new Kafka.Producer(this.client, opts.producer, opts.customPartitioner);
Expand Down Expand Up @@ -123,11 +130,66 @@ class KafkaTransporter extends Transporter {
this.client.close(() => {
this.client = null;
this.producer = null;
this.consumer = null;

if (this.consumer) {
this.consumer.close(() => {
this.consumer = null;
});
}
});
}
}

/**
* Subscribe to all topics
*
* @param {Array<Object>} topics
*
* @memberOf BaseTransporter
*/
makeSubscriptions(topics) {
topics = topics.map(({ cmd, nodeID }) => this.getTopicName(cmd, nodeID));

return new Promise((resolve, reject) => {

this.producer.createTopics(topics, true, (err, data) => {
if (err) {
this.logger.error("Unable to create topics!", topics, err);
return reject(err);
}

const consumerOptions = Object.assign({
id: "default-kafka-consumer",
host: this.opts.kafka.host,
groupId: this.nodeID,
fromOffset: "latest",
encoding: "buffer",
}, this.opts.kafka.consumer);

const Kafka = require("kafka-node");
this.consumer = new Kafka.ConsumerGroup(consumerOptions, topics);

this.consumer.on("error", e => {
this.logger.error("Kafka Consumer error", e.message);
this.logger.debug(e);

if (!this.connected)
reject(e);
});

this.consumer.on("message", message => {
const topic = message.topic;
const cmd = topic.split(".")[1];
this.messageHandler(cmd, message.value);
});

this.consumer.on("connect", () => {
resolve();
});
});
});
}

/**
* Subscribe to a command
*
Expand All @@ -136,27 +198,29 @@ class KafkaTransporter extends Transporter {
*
* @memberOf KafkaTransporter
*/
/*
subscribe(cmd, nodeID) {
const topic = this.getTopicName(cmd, nodeID);
this.topics.push(topic);
return new Promise((resolve, reject) => {
const topics = [this.getTopicName(cmd, nodeID)];
this.producer.createTopics(topics, true, (err, data) => {
this.producer.createTopics([topic], true, (err, data) => {
if (err) {
this.logger.error("Unable to create topics!", topics, err);
this.logger.error("Unable to create topics!", topic, err);
return reject(err);
}
this.consumer.addTopics(topics, (err, added) => {
this.consumer.addTopics([{ topic, offset: -1 }], (err, added) => {
if (err) {
this.logger.error("Unable to add topic!", topics, err);
this.logger.error("Unable to add topic!", topic, err);
return reject(err);
}
resolve();
});

}, false);
});
});
}
}*/

/**
* Publish a packet
Expand Down

0 comments on commit 9b3f6b8

Please sign in to comment.