Skip to content

Commit

Permalink
fix(inputs.modbus): Do not fail if a single slave reports errors (#11785)
Browse files Browse the repository at this point in the history

(cherry picked from commit d637a66)
  • Loading branch information
srebhan authored and reimda committed Sep 19, 2022
1 parent d8232c2 commit 2fb79bf
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 38 deletions.
74 changes: 44 additions & 30 deletions plugins/inputs/modbus/modbus.go
Expand Up @@ -146,26 +146,24 @@ func (m *Modbus) Gather(acc telegraf.Accumulator) error {
}
}

timestamp := time.Now()
for retry := 0; retry <= m.Retries; retry++ {
timestamp = time.Now()
if err := m.gatherFields(); err != nil {
if mberr, ok := err.(*mb.Error); ok && mberr.ExceptionCode == mb.ExceptionCodeServerDeviceBusy && retry < m.Retries {
m.Log.Infof("Device busy! Retrying %d more time(s)...", m.Retries-retry)
time.Sleep(time.Duration(m.RetriesWaitTime))
continue
}
// Show the disconnect error this way to not shadow the initial error
if discerr := m.disconnect(); discerr != nil {
m.Log.Errorf("Disconnecting failed: %v", discerr)
for slaveID, requests := range m.requests {
m.Log.Debugf("Reading slave %d for %s...", slaveID, m.Controller)
if err := m.readSlaveData(slaveID, requests); err != nil {
acc.AddError(fmt.Errorf("slave %d: %w", slaveID, err))
mberr, ok := err.(*mb.Error)
if !ok || mberr.ExceptionCode != mb.ExceptionCodeServerDeviceBusy {
m.Log.Debugf("Reconnecting to %s...", m.Controller)
if err := m.disconnect(); err != nil {
return fmt.Errorf("disconnecting failed: %w", err)
}
if err := m.connect(); err != nil {
return fmt.Errorf("connecting failed: %w", err)
}
}
return err
continue
}
// Reading was successful, leave the retry loop
break
}
timestamp := time.Now()

for slaveID, requests := range m.requests {
tags := map[string]string{
"name": m.Name,
"type": cCoils,
Expand Down Expand Up @@ -276,24 +274,40 @@ func (m *Modbus) disconnect() error {
return err
}

func (m *Modbus) gatherFields() error {
for slaveID, requests := range m.requests {
m.handler.SetSlave(slaveID)
if err := m.gatherRequestsCoil(requests.coil); err != nil {
return err
}
if err := m.gatherRequestsDiscrete(requests.discrete); err != nil {
return err
}
if err := m.gatherRequestsHolding(requests.holding); err != nil {
return err
func (m *Modbus) readSlaveData(slaveID byte, requests requestSet) error {
m.handler.SetSlave(slaveID)

for retry := 0; retry < m.Retries; retry++ {
err := m.gatherFields(requests)
if err == nil {
// Reading was successful
return nil
}
if err := m.gatherRequestsInput(requests.input); err != nil {

// Exit in case a non-recoverable error occurred
mberr, ok := err.(*mb.Error)
if !ok || mberr.ExceptionCode != mb.ExceptionCodeServerDeviceBusy {
return err
}

// Wait some time and try again reading the slave.
m.Log.Infof("Device busy! Retrying %d more time(s)...", m.Retries-retry)
time.Sleep(time.Duration(m.RetriesWaitTime))
}
return m.gatherFields(requests)
}

return nil
// gatherFields runs the coil, discrete, holding and input requests of the
// given request set, in that order. It stops at the first request group that
// fails and returns that error unchanged; nil means all four groups succeeded.
func (m *Modbus) gatherFields(requests requestSet) error {
	err := m.gatherRequestsCoil(requests.coil)
	if err != nil {
		return err
	}

	err = m.gatherRequestsDiscrete(requests.discrete)
	if err != nil {
		return err
	}

	err = m.gatherRequestsHolding(requests.holding)
	if err != nil {
		return err
	}

	// The input registers are last, so their result is the overall result.
	return m.gatherRequestsInput(requests.input)
}

func (m *Modbus) gatherRequestsCoil(requests []request) error {
Expand Down
118 changes: 110 additions & 8 deletions plugins/inputs/modbus/modbus_test.go
Expand Up @@ -1050,9 +1050,9 @@ func TestRetryFailExhausted(t *testing.T) {
require.NoError(t, modbus.Init())
require.NotEmpty(t, modbus.requests)

err := modbus.Gather(&acc)
require.Error(t, err)
require.Equal(t, "modbus: exception '6' (server device busy), function '129'", err.Error())
require.NoError(t, modbus.Gather(&acc))
require.Len(t, acc.Errors, 1)
require.EqualError(t, acc.FirstError(), "slave 1: modbus: exception '6' (server device busy), function '129'")
}

func TestRetryFailIllegal(t *testing.T) {
Expand All @@ -1072,7 +1072,8 @@ func TestRetryFailIllegal(t *testing.T) {
data[1] = byte(0)

return data, &mbserver.IllegalFunction
})
},
)

modbus := Modbus{
Name: "TestRetryFailExhausted",
Expand All @@ -1092,9 +1093,9 @@ func TestRetryFailIllegal(t *testing.T) {
require.NoError(t, modbus.Init())
require.NotEmpty(t, modbus.requests)

err := modbus.Gather(&acc)
require.Error(t, err)
require.Equal(t, "modbus: exception '1' (illegal function), function '129'", err.Error())
require.NoError(t, modbus.Gather(&acc))
require.Len(t, acc.Errors, 1)
require.EqualError(t, acc.FirstError(), "slave 1: modbus: exception '1' (illegal function), function '129'")
require.Equal(t, counter, 1)
}

Expand Down Expand Up @@ -1866,7 +1867,8 @@ func TestRequestsStartingWithOmits(t *testing.T) {
Log: testutil.Logger{},
}
modbus.Requests = []requestDefinition{
{SlaveID: 1,
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "holding",
Fields: []requestFieldDefinition{
Expand Down Expand Up @@ -1942,3 +1944,103 @@ func TestRequestsEmptyFields(t *testing.T) {
err := modbus.Init()
require.EqualError(t, err, `configuraton invalid: found request section without fields`)
}

// TestMultipleSlavesOneFail checks that one failing slave does not abort the
// whole gather cycle. Three slaves are configured with the same holding
// register request; the mock server rejects slave 2 with a gateway exception.
// Gather must succeed, emit metrics for slaves 1 and 3, and report exactly
// one accumulator error naming slave 2.
func TestMultipleSlavesOneFail(t *testing.T) {
	telegraf.Debug = true

	// Every slave is asked for the same single INT16 holding register. A
	// fresh slice is built per request so the definitions share no storage.
	holdingFields := func() []requestFieldDefinition {
		return []requestFieldDefinition{
			{
				Name:      "holding-0",
				Address:   uint16(0),
				InputType: "INT16",
			},
		}
	}

	modbus := Modbus{
		Name:              "Test",
		Controller:        "tcp://localhost:1502",
		Retries:           1,
		ConfigurationType: "request",
		Log:               testutil.Logger{},
	}
	modbus.Requests = []requestDefinition{
		{SlaveID: 1, ByteOrder: "ABCD", RegisterType: "holding", Fields: holdingFields()},
		{SlaveID: 2, ByteOrder: "ABCD", RegisterType: "holding", Fields: holdingFields()},
		{SlaveID: 3, ByteOrder: "ABCD", RegisterType: "holding", Fields: holdingFields()},
	}
	require.NoError(t, modbus.Init())

	server := mbserver.NewServer()
	require.NoError(t, server.ListenTCP("localhost:1502"))
	defer server.Close()

	// Handler for function code 3, which serves the holding register reads
	// issued by this test.
	server.RegisterFunctionHandler(3,
		func(_ *mbserver.Server, frame mbserver.Framer) ([]byte, *mbserver.Exception) {
			tcp, isTCP := frame.(*mbserver.TCPFrame)
			switch {
			case !isTCP:
				return nil, &mbserver.IllegalFunction
			case tcp.Device == 2:
				// Simulate device 2 being unavailable
				return []byte{}, &mbserver.GatewayTargetDeviceFailedtoRespond
			}
			return []byte{0x02, 0x00, 0x42}, &mbserver.Success
		},
	)

	// Only the healthy slaves are expected to produce a metric.
	metricFor := func(slave string) telegraf.Metric {
		return testutil.MustMetric(
			"modbus",
			map[string]string{
				"type":     cHoldingRegisters,
				"slave_id": slave,
				"name":     modbus.Name,
			},
			map[string]interface{}{"holding-0": int16(0x42)},
			time.Unix(0, 0),
		)
	}
	expected := []telegraf.Metric{metricFor("1"), metricFor("3")}

	var acc testutil.Accumulator
	require.NoError(t, modbus.Gather(&acc))
	acc.Wait(len(expected))
	testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), testutil.SortMetrics())

	// The failure of slave 2 surfaces through the accumulator, not as a Gather error.
	require.Len(t, acc.Errors, 1)
	require.EqualError(t, acc.FirstError(), "slave 2: modbus: exception '11' (gateway target device failed to respond), function '131'")
}

0 comments on commit 2fb79bf

Please sign in to comment.