Skip to content

Commit

Permalink
Merge pull request #19 from moleculerjs/kafka_adapter
Browse files Browse the repository at this point in the history
Add Kafka adapter
  • Loading branch information
icebob committed Oct 9, 2021
2 parents f3d1cd2 + 2ec9e53 commit b320397
Show file tree
Hide file tree
Showing 18 changed files with 1,472 additions and 772 deletions.
6 changes: 3 additions & 3 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
"type": "node",
"request": "launch",
"name": "Launch selected demo",
"program": "${file}",
"program": "examples/index.js",
"cwd": "${workspaceRoot}",
"args": [
"dead-letter"
"serializer"
],
"env": {
"ADAPTER": "nats://localhost:4222"
//"ADAPTER": "kafka://localhost:9093"
}
},
{
Expand Down
122 changes: 98 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Reliable messages for Moleculer services via external queue/channel/topic. Unlik
## Features

- reliable messages with acknowledgement.
- multiple adapters (Redis, RabbitMQ, NATS JetStream).
- multiple adapters (Redis, RabbitMQ, NATS JetStream, Kafka).
- plugable adapters.
- configurable max-in-flight.
- retry messages.
Expand Down Expand Up @@ -126,7 +126,7 @@ module.exports = {
// Default options
ChannelsMiddleware({
adapter: {
type: "Redis",
type: "Kafka",
options: {}
}
}),
Expand Down Expand Up @@ -192,7 +192,7 @@ module.exports = {
| `sendMethodName` | `String` | `"sendToChannel"` | Name of the method in ServiceBroker to send message to the channels. |
| `adapterPropertyName` | `String` | `"channelAdapter"` | Name of the property in ServiceBroker to access the `Adapter` instance directly. |

**Example**s
**Examples**
```js
// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;
Expand Down Expand Up @@ -229,6 +229,9 @@ module.exports = {
| `amqp.queueOptions` | `Object` | AMQP | AMQP lib queue configuration. More info [here](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue).
| `amqp.exchangeOptions` | `Object` | AMQP | AMQP lib exchange configuration. More info [here](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange).
| `amqp.consumeOptions` | `Object` | AMQP | AMQP lib consume configuration. More info [here](http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume).
| `kafka.consumerOptions` | `Object` | Kafka | Kafka consumer configuration. More info [here](https://kafka.js.org/docs/consuming#options).
| `kafka.fromBeginning` | `Boolean` | Kafka | Kafka consumer `fromBeginning` option. More info [here](https://kafka.js.org/docs/consuming#frombeginning).
| `kafka.partitionsConsumedConcurrently` | `Number` | Kafka | Kafka consumer `partitionsConsumedConcurrently` option. More info [here](https://kafka.js.org/docs/consuming#partition-aware-concurrency).

## Failed message
If the service is not able to process a message, it should throw an `Error` inside the handler function. In case of error and if `maxRetries` option is a positive number, the adapter will redeliver the message to one of all consumers.
Expand All @@ -249,13 +252,18 @@ Use the `broker.sendToChannel(channelName, payload, opts)` method to send a mess

| Name | Type | Supported adapters | Description |
| ---- | ---- | ------------------ | ----------- |
| `raw` | `Boolean` | Redis, AMQP | If truthy, the payload won't be serialized. |
| `raw` | `Boolean` | * | If truthy, the payload won't be serialized. |
| `persistent` | `Boolean` | AMQP | If truthy, the message will survive broker restarts provided it’s in a queue that also survives restarts. |
| `ttl` | `Number` | AMQP | If supplied, the message will be discarded from a queue once it’s been there longer than the given number of milliseconds. |
| `priority` | `Number` | AMQP | Priority of the message. |
| `correlationId` | `String` | AMQP | Request identifier. |
| `headers` | `Object` | AMQP | Application specific headers to be carried along with the message content. |
| `headers` | `Object` | AMQP, JetStream, Kafka | Application specific headers to be carried along with the message content. |
| `routingKey` | `Object` | AMQP | The AMQP `publish` method's second argument. If you want to send the message into an external queue instead of exchange, set the `channelName` to `""` and set the queue name to `routingKey` |
| `key` | `String` | Kafka | Key of Kafka message. |
| `partition` | `String` | Kafka | Partition of Kafka message. |
| `acks` | `Number` | Kafka | Control the number of required acks. |
| `timeout` | `Number` | Kafka | The time to await a response in ms. Default: `30000` |
| `compression` | `any` | Kafka | Compression codec. Default: `CompressionTypes.None` |

## Middleware hooks
It is possible to wrap the handlers and the send method in Moleculer middleware. The module defines two hooks to cover it. The `localChannel` hook is similar to [`localAction`](https://moleculer.services/docs/0.14/middlewares.html#localAction-next-action) but it wraps the channel handlers in service schema. The `sendToChannel` hook is similar to [`emit`](https://moleculer.services/docs/0.14/middlewares.html#emit-next) or [`broadcast`](https://moleculer.services/docs/0.14/middlewares.html#broadcast-next) but it wraps the `broker.sendToChannel` publisher method.
Expand All @@ -271,9 +279,9 @@ const MyMiddleware = {

// Wrap the channel handlers
localChannel(next, chan) {
return async msg => {
return async (msg, raw) => {
this.logger.info(kleur.magenta(` Before localChannel for '${chan.name}'`), msg);
await next(msg);
await next(msg, raw);
this.logger.info(kleur.magenta(` After localChannel for '${chan.name}'`), msg);
};
},
Expand Down Expand Up @@ -318,7 +326,7 @@ module.exports = {
| `readTimeoutInternal` | `Number`| `0` | Redis | Maximum time (in milliseconds) while waiting for new messages. By default equals to 0, i.e., never timeout. More info [here](https://redis.io/commands/XREADGROUP#differences-between-xread-and-xreadgroup)
| `minIdleTime` | `Number` | `60 * 60 * 1000` | Redis | Time (in milliseconds) after which pending messages are considered NACKed and should be claimed. Defaults to 1 hour.
| `claimInterval` | `Number` | `100` | Redis | Interval (in milliseconds) between message claims.
| `maxInFlight` | `Number` | `1` | Redis, AMQP | Max number of messages under processing at the same time.
| `maxInFlight` | `Number` | `1` | Redis, AMQP, Kafka | Max number of messages under processing at the same time.
| `startID` | `String` | `$` | Redis | Starting point when consumers fetch data from the consumer group. By default equals to `$`, i.e., consumers will only see new elements arriving in the stream. More info [here](https://redis.io/commands/XGROUP).
| `processingAttemptsInterval` | `Number` | `0` | Redis | Interval (in milliseconds) between message transfer into `FAILED_MESSAGES` channel.
| `amqp.url` | `String` | `null` | AMQP | Connection URI.
Expand All @@ -328,13 +336,19 @@ module.exports = {
| `amqp.messageOptions` | `Object` | `null` | AMQP | AMQP lib message configuration. More info [here](http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish).
| `amqp.consumeOptions` | `Object` | `null` | AMQP | AMQP lib consume configuration. More info [here](http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume).
| `nats.streamConfig` | `Object` | `null` | NATS | NATS JetStream storage configuration. More info [here](https://docs.nats.io/jetstream/concepts/streams).
| `nats.consumerOpts` | `Object` | `null` | NATS | NATS JetStream consumer configuration. More info [here](https://docs.nats.io/jetstream/concepts/consumers).
| `nats.consumerOptions` | `Object` | `null` | NATS | NATS JetStream consumer configuration. More info [here](https://docs.nats.io/jetstream/concepts/consumers).
| `kafka.brokers` | `String[]` | `null` | Kafka | Kafka bootstrap brokers.
| `kafka.logCreator` | `Function` | `null` | Kafka | Kafka logCreator. More info [here](https://kafka.js.org/docs/custom-logger).
| `kafka.producerOptions` | `Object` | `null` | Kafka | Kafka producer constructor configuration. More info [here](https://kafka.js.org/docs/producing#options).
| `kafka.consumerOptions` | `Object` | `null` | Kafka | Kafka consumer constructor configuration. More info [here](https://kafka.js.org/docs/consuming#options).


### Redis Streams

[Redis Streams](https://redis.io/topics/streams-intro) was introduced in Redis 5.0. Hoverer, since this module relies on the [XAUTOCLAIM](https://redis.io/commands/xautoclaim) command, Redis >= 6.2.0 is required.

>To use this adapter, install the `ioredis` module with npm install `ioredis` command.
**Redis Adapter Overview**

![Dead-Letter](assets/redis_queue.png)
Expand Down Expand Up @@ -455,9 +469,11 @@ module.exports = {
};
```

### RabbitMQ
### AMQP (RabbitMQ)

The AMQP adapter uses the exchange-queue logic of RabbitMQ for creating consumer groups. It means the `sendToChannel` method sends the message to the exchange and not for a queue.

The RabbitMQ adapter uses the exchange-queue logic of RabbitMQ for creating consumer groups. It means the `sendToChannel` method sends the message to the exchange and not for a queue.
>To use this adapter, install the `amqplib` module with npm install `amqplib` command.
**Example**

Expand Down Expand Up @@ -515,11 +531,63 @@ module.exports = {

### Kafka

Coming soon.
The Kafka adapter uses Apache Kafka topics.

> In Kafka adapter, the `maxInFlight` function works differently than other adapters. Reading messages from a partition is processed sequentially in order. So if you want to process multiple messages, you should read messages from multiple partition. To enable it, use the `kafka.partitionsConsumedConcurrently` option in channel options. [More info](https://kafka.js.org/docs/consuming#a-name-concurrent-processing-a-partition-aware-concurrency).
>To use this adapter, install the `kafkajs` module with npm install `kafkajs` command.
**Example**

```js
// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
middlewares: [
ChannelsMiddleware({
adapter: "kafka://localhost:9092"
})
]
};
```

**Example with options**

```js
// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
middlewares: [
ChannelsMiddleware({
adapter: {
type: "Kafka",
options: {
kafka: {
brokers: ["kafka-1:9092", "kafka-1:9092"]
// Options for `producer()`
producerOptions: {},
// Options for `consumer()`
consumerOptions: {},
},
maxRetries: 3,
deadLettering: {
enabled: false,
queueName: "DEAD_LETTER",
}
}
}
})
]
};
```


### NATS JetStream

>To use this adapter, install the `nats` module with npm install `nats` command.
```js
// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;
Expand Down Expand Up @@ -590,33 +658,39 @@ In this test, we send one message at a time. After processing the current messag

| Adapter | Time | msg/sec |
| -------------- | ----:| -------:|
| Redis | 3ms | 283 |
| RedisCluster | 2ms | 360 |
| AMQP | 46ms | 22 |
| Redis | 2ms | 452 |
| RedisCluster | 2ms | 433 |
| AMQP | 51ms | 20 |
| NATS JetStream | 1ms | 584 |
| Kafka | 1ms | 637 |

![chart](https://image-charts.com/chart?chd=a%3A3%2C2%2C22&chf=b0%2Clg%2C90%2C03a9f4%2C0%2C3f51b5%2C1&chg=0%2C50&chma=0%2C0%2C10%2C10&chs=999x500&cht=bvs&chtt=Latency%20test%20%28milliseconds%29%7Clower%20is%20better&chxl=0%3A%7CRedis%7CRedisCluster%7CAMQP&chxs=0%2C333%2C10%7C1%2C333%2C10&chxt=x%2Cy)
![chart](https://image-charts.com/chart?chd=a%3A3%2C2%2C51%2C1%2C1&chf=b0%2Clg%2C90%2C03a9f4%2C0%2C3f51b5%2C1&chg=0%2C50&chma=0%2C0%2C10%2C10&chs=999x500&cht=bvs&chtt=Latency%20test%20%28milliseconds%29%7Clower%20is%20better&chxl=0%3A%7CRedis%7CRedisCluster%7CAMQP%7CNATS%7CKafka&chxs=0%2C333%2C10%7C1%2C333%2C10&chxt=x%2Cy)

### Throughput test (maxInFligth: 10)
In this test, we send 10k messages and wait for all be processed. This test measures the throughput. The `maxInFlight` is `10`.

| Adapter | msg/sec |
| -------------- | -------:|
| Redis | 1164 |
| RedisCluster | 1275 |
| AMQP | 10431 |
| Redis | 1294 |
| RedisCluster | 4118 |
| AMQP | 11143 |
| NATS JetStream | 589 |
| Kafka | 1831 |

![chart](https://image-charts.com/chart?chd=a%3A1164%2C1275%2C10431&chf=b0%2Clg%2C90%2C03a9f4%2C0%2C3f51b5%2C1&chg=0%2C50&chma=0%2C0%2C10%2C10&chs=999x500&cht=bvs&chtt=Throughtput%20test%20%28msg%2Fsec%29%7C%28maxInFligth%3A%2010%29%7Chigher%20is%20better&chxl=0%3A%7CRedis%7CRedisCluster%7CAMQP&chxs=0%2C333%2C10%7C1%2C333%2C10&chxt=x%2Cy)
![chart](https://image-charts.com/chart?chd=a%3A1294%2C4118%2C11143%2C589%2C1831&chf=b0%2Clg%2C90%2C03a9f4%2C0%2C3f51b5%2C1&chg=0%2C50&chma=0%2C0%2C10%2C10&chs=999x500&cht=bvs&chtt=Throughtput%20test%20%28msg%2Fsec%29%7C%28maxInFligth%3A%2010%29%7Chigher%20is%20better&chxl=0%3A%7CRedis%7CRedisCluster%7CAMQP%7CNATS%7CKafka&chxs=0%2C333%2C10%7C1%2C333%2C10&chxt=x%2Cy)

### Throughput test (maxInFligth: 100)
In this test, we send 10k messages and wait for all be processed. This test measures the throughput. The `maxInFlight` is `100`.

| Adapter | msg/sec |
| -------------- | -------:|
| Redis | 6260 |
| RedisCluster | 4071 |
| AMQP | 20988 |
| Redis | 4081 |
| RedisCluster | 4198 |
| AMQP | 21438 |
| NATS JetStream | 646 |
| Kafka | 1916 |

![chart](https://image-charts.com/chart?chd=a%3A6260%2C4071%2C20988&chf=b0%2Clg%2C90%2C03a9f4%2C0%2C3f51b5%2C1&chg=0%2C50&chma=0%2C0%2C10%2C10&chs=999x500&cht=bvs&chtt=Throughtput%20test%20%28msg%2Fsec%29%7C%28maxInFligth%3A%20100%29%7Chigher%20is%20better&chxl=0%3A%7CRedis%7CRedisCluster%7CAMQP&chxs=0%2C333%2C10%7C1%2C333%2C10&chxt=x%2Cy)
![chart](https://image-charts.com/chart?chd=a%3A4081%2C4198%2C21438%2C646%2C1916&chf=b0%2Clg%2C90%2C03a9f4%2C0%2C3f51b5%2C1&chg=0%2C50&chma=0%2C0%2C10%2C10&chs=999x500&cht=bvs&chtt=Throughtput%20test%20%28msg%2Fsec%29%7C%28maxInFligth%3A%20100%29%7Chigher%20is%20better&chxl=0%3A%7CRedis%7CRedisCluster%7CAMQP%7CNATS%7CKafka&chxs=0%2C333%2C10%7C1%2C333%2C10&chxt=x%2Cy)


## License
Expand Down
3 changes: 2 additions & 1 deletion benchmark/suites/latency.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const Adapters = [
}
},
{ type: "AMQP", options: {} },
{ type: "NATS", options: {} }
{ type: "NATS", options: {} },
{ type: "Kafka", options: { kafka: { brokers: ["localhost:9093"] } } }
];

Promise.mapSeries(Adapters, async adapterDef => {
Expand Down
3 changes: 2 additions & 1 deletion benchmark/suites/throughput.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const Adapters = [
}
},
{ type: "AMQP", options: {} },
{ type: "NATS", options: {} }
{ type: "NATS", options: {} },
{ type: "Kafka", options: { kafka: { brokers: ["localhost:9093"] } } }
];

Promise.mapSeries(Adapters, async adapterDef => {
Expand Down
34 changes: 29 additions & 5 deletions examples/dead-letter/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ const broker = new ServiceBroker({
command: "publish",
alias: ["p"],
async action(broker, args) {
const { options } = args;
//console.log(options);
const payload = {
id: 2,
name: "Jane Doe",
Expand All @@ -33,7 +31,7 @@ const broker = new ServiceBroker({
pid: process.pid
};

await broker.sendToChannel("my.fail.topic", payload);
await broker.sendToChannel("my.fail.topic", payload, { key: "" + c });
}
}
]
Expand All @@ -43,7 +41,7 @@ broker.createService({
name: "sub1",
channels: {
"my.fail.topic": {
group: "mygroup",
group: "failgroup",
minIdleTime: 1000,
claimInterval: 500,
maxRetries: 3,
Expand All @@ -59,8 +57,34 @@ broker.createService({
}
}
});
broker.createService({
name: "sub2",
channels: {
"my.fail.topic": {
group: "goodgroup",
handler(msg) {
this.logger.info(">>> I processed", msg);
}
}
}
});

broker.createService(deadServiceSchema);
broker.createService({
name: "sub3",
channels: {
DEAD_LETTER: {
group: "failgroup",
handler(msg, raw) {
this.logger.info("--> FAILED HANDLER <--");
this.logger.info(msg);
// Send a notification about the failure

this.logger.info("--> RAW (ENTIRE) MESSAGE <--");
this.logger.info(raw);
}
}
}
});

broker
.start()
Expand Down
Loading

0 comments on commit b320397

Please sign in to comment.