Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inputs.modbus): Do not fail if a single slave reports errors #11785

Merged
merged 4 commits into from Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
74 changes: 44 additions & 30 deletions plugins/inputs/modbus/modbus.go
Expand Up @@ -150,26 +150,24 @@ func (m *Modbus) Gather(acc telegraf.Accumulator) error {
}
}

timestamp := time.Now()
for retry := 0; retry <= m.Retries; retry++ {
timestamp = time.Now()
if err := m.gatherFields(); err != nil {
if mberr, ok := err.(*mb.Error); ok && mberr.ExceptionCode == mb.ExceptionCodeServerDeviceBusy && retry < m.Retries {
m.Log.Infof("Device busy! Retrying %d more time(s)...", m.Retries-retry)
time.Sleep(time.Duration(m.RetriesWaitTime))
continue
}
// Show the disconnect error this way to not shadow the initial error
if discerr := m.disconnect(); discerr != nil {
m.Log.Errorf("Disconnecting failed: %v", discerr)
for slaveID, requests := range m.requests {
m.Log.Debugf("Reading slave %d for %s...", slaveID, m.Controller)
if err := m.readSlaveData(slaveID, requests); err != nil {
acc.AddError(fmt.Errorf("slave %d: %w", slaveID, err))
mberr, ok := err.(*mb.Error)
if !ok || mberr.ExceptionCode != mb.ExceptionCodeServerDeviceBusy {
m.Log.Debugf("Reconnecting to %s...", m.Controller)
if err := m.disconnect(); err != nil {
return fmt.Errorf("disconnecting failed: %w", err)
}
if err := m.connect(); err != nil {
return fmt.Errorf("connecting failed: %w", err)
}
}
return err
continue
}
// Reading was successful, leave the retry loop
break
}
timestamp := time.Now()

for slaveID, requests := range m.requests {
tags := map[string]string{
"name": m.Name,
"type": cCoils,
Expand Down Expand Up @@ -280,24 +278,40 @@ func (m *Modbus) disconnect() error {
return err
}

func (m *Modbus) gatherFields() error {
for slaveID, requests := range m.requests {
m.handler.SetSlave(slaveID)
if err := m.gatherRequestsCoil(requests.coil); err != nil {
return err
}
if err := m.gatherRequestsDiscrete(requests.discrete); err != nil {
return err
}
if err := m.gatherRequestsHolding(requests.holding); err != nil {
return err
func (m *Modbus) readSlaveData(slaveID byte, requests requestSet) error {
m.handler.SetSlave(slaveID)

for retry := 0; retry < m.Retries; retry++ {
err := m.gatherFields(requests)
if err == nil {
// Reading was successful
return nil
}
if err := m.gatherRequestsInput(requests.input); err != nil {

// Exit in case a non-recoverable error occurred
mberr, ok := err.(*mb.Error)
if !ok || mberr.ExceptionCode != mb.ExceptionCodeServerDeviceBusy {
return err
}

// Wait some time and try again reading the slave.
m.Log.Infof("Device busy! Retrying %d more time(s)...", m.Retries-retry)
time.Sleep(time.Duration(m.RetriesWaitTime))
}
return m.gatherFields(requests)
}

return nil
func (m *Modbus) gatherFields(requests requestSet) error {
if err := m.gatherRequestsCoil(requests.coil); err != nil {
return err
}
if err := m.gatherRequestsDiscrete(requests.discrete); err != nil {
return err
}
if err := m.gatherRequestsHolding(requests.holding); err != nil {
return err
}
return m.gatherRequestsInput(requests.input)
}

func (m *Modbus) gatherRequestsCoil(requests []request) error {
Expand Down
118 changes: 110 additions & 8 deletions plugins/inputs/modbus/modbus_test.go
Expand Up @@ -1050,9 +1050,9 @@ func TestRetryFailExhausted(t *testing.T) {
require.NoError(t, modbus.Init())
require.NotEmpty(t, modbus.requests)

err := modbus.Gather(&acc)
require.Error(t, err)
require.Equal(t, "modbus: exception '6' (server device busy), function '129'", err.Error())
require.NoError(t, modbus.Gather(&acc))
require.Len(t, acc.Errors, 1)
require.EqualError(t, acc.FirstError(), "slave 1: modbus: exception '6' (server device busy), function '129'")
}

func TestRetryFailIllegal(t *testing.T) {
Expand All @@ -1072,7 +1072,8 @@ func TestRetryFailIllegal(t *testing.T) {
data[1] = byte(0)

return data, &mbserver.IllegalFunction
})
},
)

modbus := Modbus{
Name: "TestRetryFailExhausted",
Expand All @@ -1092,9 +1093,9 @@ func TestRetryFailIllegal(t *testing.T) {
require.NoError(t, modbus.Init())
require.NotEmpty(t, modbus.requests)

err := modbus.Gather(&acc)
require.Error(t, err)
require.Equal(t, "modbus: exception '1' (illegal function), function '129'", err.Error())
require.NoError(t, modbus.Gather(&acc))
require.Len(t, acc.Errors, 1)
require.EqualError(t, acc.FirstError(), "slave 1: modbus: exception '1' (illegal function), function '129'")
require.Equal(t, counter, 1)
}

Expand Down Expand Up @@ -1866,7 +1867,8 @@ func TestRequestsStartingWithOmits(t *testing.T) {
Log: testutil.Logger{},
}
modbus.Requests = []requestDefinition{
{SlaveID: 1,
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "holding",
Fields: []requestFieldDefinition{
Expand Down Expand Up @@ -1942,3 +1944,103 @@ func TestRequestsEmptyFields(t *testing.T) {
err := modbus.Init()
require.EqualError(t, err, `configuraton invalid: found request section without fields`)
}

func TestMultipleSlavesOneFail(t *testing.T) {
telegraf.Debug = true
modbus := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
Retries: 1,
ConfigurationType: "request",
Log: testutil.Logger{},
}
modbus.Requests = []requestDefinition{
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "holding",
Fields: []requestFieldDefinition{
{
Name: "holding-0",
Address: uint16(0),
InputType: "INT16",
},
},
},
{
SlaveID: 2,
ByteOrder: "ABCD",
RegisterType: "holding",
Fields: []requestFieldDefinition{
{
Name: "holding-0",
Address: uint16(0),
InputType: "INT16",
},
},
},
{
SlaveID: 3,
ByteOrder: "ABCD",
RegisterType: "holding",
Fields: []requestFieldDefinition{
{
Name: "holding-0",
Address: uint16(0),
InputType: "INT16",
},
},
},
}
require.NoError(t, modbus.Init())

serv := mbserver.NewServer()
require.NoError(t, serv.ListenTCP("localhost:1502"))
defer serv.Close()

serv.RegisterFunctionHandler(3,
func(s *mbserver.Server, frame mbserver.Framer) ([]byte, *mbserver.Exception) {
tcpframe, ok := frame.(*mbserver.TCPFrame)
if !ok {
return nil, &mbserver.IllegalFunction
}

if tcpframe.Device == 2 {
// Simulate device 2 being unavailable
return []byte{}, &mbserver.GatewayTargetDeviceFailedtoRespond
}
return []byte{0x02, 0x00, 0x42}, &mbserver.Success
},
)

expected := []telegraf.Metric{
testutil.MustMetric(
"modbus",
map[string]string{
"type": cHoldingRegisters,
"slave_id": "1",
"name": modbus.Name,
},
map[string]interface{}{"holding-0": int16(0x42)},
time.Unix(0, 0),
),
testutil.MustMetric(
"modbus",
map[string]string{
"type": cHoldingRegisters,
"slave_id": "3",
"name": modbus.Name,
},
map[string]interface{}{"holding-0": int16(0x42)},
time.Unix(0, 0),
),
}

var acc testutil.Accumulator
require.NoError(t, modbus.Gather(&acc))
acc.Wait(len(expected))
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime(), testutil.SortMetrics())
require.Len(t, acc.Errors, 1)
require.EqualError(t, acc.FirstError(), "slave 2: modbus: exception '11' (gateway target device failed to respond), function '131'")
}