A bridge transfer messages from emqtt to kafka
Switch branches/tags
Nothing to show
Clone or download
Permalink
Failed to load latest commit information.
etc Made configuration file available Jul 28, 2016
src Fix clientid to client_id in JSON Jul 28, 2016
.gitignore Initial commit Jul 27, 2016
LICENSE Initial commit Jul 27, 2016
README.md Update README.md Jul 28, 2016
rebar.config Initial commit Jul 27, 2016

README.md

emqttd_plugin_kafka_bridge

Emqttd kafka bridge Plugin, a bridge transfer messages from emqtt to kafka.

Build Plugin

Add the plugin as submodule of emqttd project.

If the submodules exist:

git submodule update --remote plugins/emqttd_plugin_kafka_bridge

Orelse:

git submodule add https://github.com/vowstar/emqttd_plugin_kafka_bridge.git plugins/emqttd_plugin_kafka_bridge

And then build emqttd project.

Configure Plugin

TODO: Move broker list to here

File: etc/plugin.config

[
  {emqttd_plugin_kafka_bridge, [
  	{kafka, [
      {bootstrap_broker, {"127.0.0.1", 9092} },
      {partition_strategy, strict_round_robin}
    ]}
  ]}
].

Broker URL and port setting:

bootstrap_broker, {"127.0.0.1", 9092}

Partition strategy setting:

Round robin

partition_strategy, strict_round_robin

Random

partition_strategy, random

Load Plugin

./bin/emqttd_ctl plugins load emqttd_plugin_kafka_bridge

Kafka Topic and messages

Topic

All message will be published on to kafka's broker_message Topic.

Messages

In the following circumstances, you will receive kafka messages

  • when a client connected broker

  • when a client disconnected broker

  • when a client subscribed a channel

  • when a client unsubscribed a channel

  • when a client published a message to a channel

  • when a client delivered a message

  • when a client acknowledged a messages

All these message will published on to kafka.

Connected

{
	"type":"connected",
	"client_id":"a client id",
	"cluster_node":"which emqtt node",
	"ts":1469690427
}

Note: key "ts" is unix timestamp.

Disconnected

{
	"type":"disconnected",
	"client_id":"a client id",
	"reason":"reason why disconnected",
	"cluster_node":"which emqtt node",
	"ts":1469690427
}

Subscribed

{
	"type":"subscribed",
	"client_id":"a client id",
	"topic":"which topic",
	"cluster_node":"which emqtt node",
	"ts":1469690427
}

Unsubscribed

{
	"type":"unsubscribed",
	"client_id":"a client id",
	"topic":"which topic",
	"cluster_node":"which emqtt node",
	"ts":1469690427
}

Published

{
	"type":"published",
	"client_id":"a client id",
	"topic":"which topic",
    "payload":"payload of message",
    "qos":1,
	"cluster_node":"which emqtt node",
	"ts":1469690427
}

Note: key "Qos" is QoS level of message.

Delivered

{
	"type":"delivered",
	"client_id":"a client id",
	"form":"from client id",
	"topic":"which topic",
    "payload":"payload of message",
    "qos":1,
	"cluster_node":"which emqtt node",
	"ts":1469690427
}

Acknowledged

{
	"type":"acked",
	"client_id":"a client id",
	"form":"from client id",
	"topic":"which topic",
    "payload":"payload of message",
    "qos":1,
	"cluster_node":"which emqtt node",
	"ts":1469690427
}

Author

Huang Rui vowstar@gmail.com

https://www.devicexx.com

Thanks

This project is based on the code of:

Erlang MQTT Broker EMQTTD

Helpshift Ekaf ekaf

And learn ideas form:

Gao dongchen's emqttd_plugin_blackhole

Abhishek Chawla's emqttd_publish_client_activity