Skip to content

Commit

Permalink
Add options for tuning connection-loss detection
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Jun 12, 2024
1 parent 7b070fa commit 76e8e83
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 3 deletions.
7 changes: 7 additions & 0 deletions plugins/inputs/mqtt_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ to use them.
## Connection timeout for initial connection in seconds
# connection_timeout = "30s"

## Interval and ping timeout for keep-alive messages
## The sum of those options defines when a connection loss is detected.
## Note: The keep-alive interval needs to be in second granularity e.g. 1m
## but not 100ms.
# keep_alive = "60s"
# ping_timeout = "10s"

## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
Expand Down
13 changes: 10 additions & 3 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type MQTTConsumer struct {
Password config.Secret `toml:"password"`
QoS int `toml:"qos"`
ConnectionTimeout config.Duration `toml:"connection_timeout"`
KeepAliveInterval config.Duration `toml:"keep_alive"`
PingTimeout config.Duration `toml:"ping_timeout"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
PersistentSession bool `toml:"persistent_session"`
ClientID string `toml:"client_id"`
Expand Down Expand Up @@ -324,7 +326,9 @@ func (m *MQTTConsumer) Stop() {
m.client.Disconnect(200)
m.Log.Debugf("Disconnected %v", m.Servers)
}
m.cancel()
if m.cancel != nil {
m.cancel()
}
}
func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error {
if !m.client.IsConnected() {
Expand Down Expand Up @@ -385,7 +389,8 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts.AddBroker(server)
}
opts.SetAutoReconnect(false)
opts.SetKeepAlive(time.Second * 60)
opts.SetKeepAlive(time.Duration(m.KeepAliveInterval))
opts.SetPingTimeout(time.Duration(m.PingTimeout))
opts.SetCleanSession(!m.PersistentSession)
opts.SetAutoAckDisabled(m.PersistentSession)
opts.SetConnectionLostHandler(m.onConnectionLost)
Expand Down Expand Up @@ -446,8 +451,10 @@ func typeConvert(types map[string]string, topicValue string, key string) (interf
// New returns an MQTTConsumer that stores the given client factory and is
// pre-populated with defaults: a single broker at tcp://127.0.0.1:1883, the
// package-level default connection timeout and undelivered-message limit, a
// keep-alive interval of 60 seconds and a ping timeout of 10 seconds.
func New(factory ClientFactory) *MQTTConsumer {
return &MQTTConsumer{
Servers: []string{"tcp://127.0.0.1:1883"},
ConnectionTimeout: defaultConnectionTimeout,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
ConnectionTimeout: defaultConnectionTimeout,
KeepAliveInterval: config.Duration(60 * time.Second),
PingTimeout: config.Duration(10 * time.Second),
clientFactory: factory,
}
}
Expand Down
66 changes: 66 additions & 0 deletions plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,3 +900,69 @@ func TestStartupErrorBehaviorRetryIntegration(t *testing.T) {
plugin.Stop()
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}

// TestReconnectIntegration checks how the plugin behaves when the broker
// becomes unreachable and later comes back. It starts a Mosquitto container,
// connects the plugin, pauses the container to simulate a connection loss,
// verifies that Gather reports an error while disconnected, and finally
// resumes the container and expects Gather to succeed again.
// The test is skipped in short mode.
func TestReconnectIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

// Startup the container
conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
require.NoError(t, err, "missing file mosquitto.conf")

const servicePort = "1883"
container := testutil.Container{
Image: "eclipse-mosquitto:2",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(servicePort),
Files: map[string]string{
"/mosquitto/config/mosquitto.conf": conf,
},
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()

// Setup the plugin and connect to the broker
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
topic := "/telegraf/test"
factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) }
plugin := &MQTTConsumer{
Servers: []string{url},
Topics: []string{topic},
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
ConnectionTimeout: config.Duration(5 * time.Second),
// Use a short keep-alive interval and ping timeout so that the connection
// loss is noticed well within the 5 second wait further down
KeepAliveInterval: config.Duration(1 * time.Second),
PingTimeout: config.Duration(100 * time.Millisecond),
Log: testutil.Logger{Name: "mqtt-integration-test"},
clientFactory: factory,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
require.NoError(t, plugin.Init())

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

// Pause the container to simulate losing the connection
require.NoError(t, container.Pause())
defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway

// Wait until we really lost the connection
require.Eventually(t, func() bool {
return !plugin.client.IsConnected()
}, 5*time.Second, 100*time.Millisecond)

// While the broker is paused, Gather is expected to fail with a network
// error and the client must remain disconnected
require.ErrorContains(t, plugin.Gather(&acc), "network Error")
require.False(t, plugin.client.IsConnected())

// Unpause the container, now we should be able to reconnect
require.NoError(t, container.Resume())
require.NoError(t, plugin.Gather(&acc))

// Gather should keep succeeding once the broker is reachable again
require.Eventually(t, func() bool {
return plugin.Gather(&acc) == nil
}, 5*time.Second, 200*time.Millisecond)
}
7 changes: 7 additions & 0 deletions plugins/inputs/mqtt_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
## Connection timeout for initial connection in seconds
# connection_timeout = "30s"

## Interval and ping timeout for keep-alive messages
## The sum of those options defines when a connection loss is detected.
## Note: The keep-alive interval needs to be in second granularity e.g. 1m
## but not 100ms.
# keep_alive = "60s"
# ping_timeout = "10s"

## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
Expand Down

0 comments on commit 76e8e83

Please sign in to comment.