Skip to content

Commit

Permalink
feat: implement browsing opcua tags on initial connect
Browse files Browse the repository at this point in the history
Introduce a `browse` flag in opcua and opcua_listener configurations
which causes the plugin to browse the tag using the OPC servers' browse
feature.
  • Loading branch information
supergrover committed Mar 26, 2024
1 parent 4344972 commit b36af19
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 20 deletions.
106 changes: 102 additions & 4 deletions plugins/common/opcua/input/input_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

gopcua_id "github.com/gopcua/opcua/id"
"github.com/gopcua/opcua/ua"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -57,6 +58,7 @@ type NodeSettings struct {
TagsSlice [][]string `toml:"tags" deprecated:"1.25.0;use 'default_tags' instead"`
DefaultTags map[string]string `toml:"default_tags"`
MonitoringParams MonitoringParameters `toml:"monitoring_params"`
Browse bool `toml:"browse"`
}

// NodeID returns the OPC UA node id
Expand All @@ -69,6 +71,7 @@ type NodeGroupSettings struct {
MetricName string `toml:"name"` // Overrides plugin's setting
Namespace string `toml:"namespace"` // Can be overridden by node setting
IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting
Browse bool `toml:"browse"` // Can be overridden by node setting, although only to true state
Nodes []NodeSettings `toml:"nodes"`
TagsSlice [][]string `toml:"tags" deprecated:"1.26.0;use default_tags"`
DefaultTags map[string]string `toml:"default_tags"`
Expand Down Expand Up @@ -352,6 +355,9 @@ func (o *OpcUAInputClient) InitNodeMetricMapping() error {
if node.MonitoringParams.SamplingInterval == 0 {
node.MonitoringParams.SamplingInterval = group.SamplingInterval
}
if node.Browse == false {
node.Browse = group.Browse
}

nmm, err := NewNodeMetricMapping(group.MetricName, node, groupTags)
if err != nil {
Expand All @@ -368,19 +374,111 @@ func (o *OpcUAInputClient) InitNodeMetricMapping() error {
return nil
}

func (o *OpcUAInputClient) InitNodeIDs() error {
func (o *OpcUAInputClient) InitNodeIDs(ctx context.Context) error {
o.NodeIDs = make([]*ua.NodeID, 0, len(o.NodeMetricMapping))
for _, node := range o.NodeMetricMapping {
nid, err := ua.ParseNodeID(node.Tag.NodeID())
if err != nil {
return err
var nid *ua.NodeID = nil;

if node.Tag.Browse && node.Tag.IdentifierType == "s" {
browsedNodeID, err := o.browseNode(ctx, node.Tag.Namespace, node.Tag.Identifier)
if err != nil {
o.Log.Errorf("Error browsing node %s: %s", node.Tag.FieldName, err)
// continue, other nodes may be browseable
// also, add the node as a string identifier. The server will not be able to return data
// but we must add a node to the o.NodeIDs array in order for it to match the o.NodeMetricMappings
parsedNodeID, err := ua.ParseNodeID(node.Tag.NodeID())
if err != nil {
return err
}
nid = parsedNodeID
} else {
nid = browsedNodeID
}
} else {
parsedNodeID, err := ua.ParseNodeID(node.Tag.NodeID())
if err != nil {
return err
}
nid = parsedNodeID
}

o.NodeIDs = append(o.NodeIDs, nid)
}

return nil
}

func parseNamespaceId(ns string) (uint16, error) {
nsId, err := strconv.ParseUint(ns, 10, 16)
if err != nil {
return 0, fmt.Errorf("invalid namespace number: %s", err)
}

return uint16(nsId), nil
}

// Parse a browse path somewhat as specified in OPC UA Part 4, Annex A.
// basically a list of segments of ns:BrowseName separated by slashes.
// for example: 4:Folder/3:Object/Member.Value
// Use the initialNamespace as the namespace if no ns is specified in the path segment
func parseBrowsePath(initialNamespace string, browsePath string) ([]*ua.QualifiedName, error) {
segmentSplitter := func(r rune) bool {
return r == '/' || r == '.'
}
segments := strings.FieldsFunc(browsePath, segmentSplitter)

ns, err := parseNamespaceId(initialNamespace)
if err != nil {
return nil, err
}

pathNames := make([]*ua.QualifiedName, 0, len(segments))

for _, segment := range segments {
browseName := strings.Split(segment, ":")

segmentNs := ns
segmentName := ""

if len(browseName) == 1 {
segmentName = browseName[0]
} else if len(browseName) == 2 {
segmentNs, err = parseNamespaceId(browseName[0])
if err != nil {
return nil, fmt.Errorf("Namespace part of segment %s cannot be parsed as an namespace id: %s", segment, err)
}
segmentName = browseName[1]
} else {
return nil, fmt.Errorf("Browse name segment %s contains more than one colon", segment)
}

pathNames = append(pathNames, &ua.QualifiedName {
NamespaceIndex: segmentNs,
Name: segmentName,
});
}

return pathNames, nil
}

func (o *OpcUAInputClient) browseNode(ctx context.Context, ns string, browsePath string) (*ua.NodeID, error) {
o.Log.Infof("Browsing node %s", browsePath)

objectsFolderId := ua.NewNumericNodeID(0, gopcua_id.ObjectsFolder)

pathNames, err := parseBrowsePath(ns, browsePath)
if err != nil {
return nil, fmt.Errorf("invalid browse path, %s", err)
}

targetNodeId, err := o.Client.Node(objectsFolderId).TranslateBrowsePathsToNodeIDs(ctx, pathNames)
if err != nil {
return nil, fmt.Errorf("unable to browse for node, %s", err)
}

return targetNodeId, nil
}

func (o *OpcUAInputClient) initLastReceivedValues() {
o.LastReceivedData = make([]NodeValue, len(o.NodeMetricMapping))
for nodeIdx, nmm := range o.NodeMetricMapping {
Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/opcua/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,19 @@ to use them.
## identifier - OPC UA ID (tag as shown in opcua browser)
## tags - extra tags to be added to the output metric (optional); deprecated in 1.25.0; use default_tags
## default_tags - extra tags to be added to the output metric (optional)
## browse - browse the OPC tag using BrowseName attributes (optional).
## identifier_type must be "s" and identifier must be set to a valid browse path according to OPC UA Part 4, Annex A,
## e.g.: 3:Folder/4:SubFolder/4:Object.Member.Value
## Objects are browsed starting from the ObjectsFolder folder.
## The namespace in the namespace field is used as the initial namespace, but may be overridden by the browse path
##
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier="", tags=[["tag1", "value1"], ["tag2", "value2"]},
# {name="", namespace="", identifier_type="", identifier=""},
# {name="", namespace="", identifier_type="s", identifier="3:Folder/4:SubFolder/4:Object.Member.Value", browse=true}
# ]
#
## Bracketed notation
Expand All @@ -104,6 +110,7 @@ to use them.
# namespace = ""
# identifier_type = ""
# identifier = ""
# browse = false
# default_tags = { tag1 = "value1", tag2 = "value2" }
#
# [[inputs.opcua.nodes]]
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/opcua/read_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (o *ReadClient) Connect() error {

// Make sure we setup the node-ids correctly after reconnect
// as the server might be restarted and IDs changed
if err := o.OpcUAInputClient.InitNodeIDs(); err != nil {
if err := o.OpcUAInputClient.InitNodeIDs(o.ctx); err != nil {
return fmt.Errorf("initializing node IDs failed: %w", err)
}

Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/opcua/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,19 @@
## identifier - OPC UA ID (tag as shown in opcua browser)
## tags - extra tags to be added to the output metric (optional); deprecated in 1.25.0; use default_tags
## default_tags - extra tags to be added to the output metric (optional)
## browse - browse the OPC tag using BrowseName attributes (optional).
## identifier_type must be "s" and identifier must be set to a valid browse path according to OPC UA Part 4, Annex A,
## e.g.: 3:Folder/4:SubFolder/4:Object.Member.Value
## Objects are browsed starting from the ObjectsFolder folder.
## The namespace in the namespace field is used as the initial namespace, but may be overridden by the browse path
##
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier="", tags=[["tag1", "value1"], ["tag2", "value2"]},
# {name="", namespace="", identifier_type="", identifier=""},
# {name="", namespace="", identifier_type="s", identifier="3:Folder/4:SubFolder/4:Object.Member.Value", browse=true}
# ]
#
## Bracketed notation
Expand All @@ -76,6 +82,7 @@
# namespace = ""
# identifier_type = ""
# identifier = ""
# browse = false
# default_tags = { tag1 = "value1", tag2 = "value2" }
#
# [[inputs.opcua.nodes]]
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/opcua_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ to use them.
# nodes = [
# {name="node1", namespace="", identifier_type="", identifier="",}
# {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}},
# {name="node3", namespace="3", identifier_type="s" identifier=3:Folder/4.Object.Member.Field"}
#]
#
## Bracketed notation
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/opcua_listener/opcua_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,7 @@ func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) {
})

subClient, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
subClient.Connect()
require.NoError(t, err)
require.Equal(t, &ua.MonitoringParameters{
SamplingInterval: 50,
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/opcua_listener/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@
# nodes = [
# {name="node1", namespace="", identifier_type="", identifier="",}
# {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}},
# {name="node3", namespace="3", identifier_type="s" identifier=3:Folder/4.Object.Member.Field"}
#]
#
## Bracketed notation
Expand Down
32 changes: 17 additions & 15 deletions plugins/inputs/opcua_listener/subscribe_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,11 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
return nil, err
}

if err := client.InitNodeIDs(); err != nil {
return nil, err
}

processingCtx, processingCancel := context.WithCancel(context.Background())
subClient := &SubscribeClient{
OpcUAInputClient: client,
Config: *sc,
monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeIDs)),
monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeMetricMapping)),
// 100 was chosen to make sure that the channels will not block when multiple changes come in at the same time.
// The channel size should be increased if reports come in on Telegraf blocking when many changes come in at
// the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval.
Expand All @@ -105,16 +101,6 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
cancel: processingCancel,
}

log.Debugf("Creating monitored items")
for i, nodeID := range client.NodeIDs {
// The node id index (i) is used as the handle for the monitored item
req := opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, uint32(i))
if err := assignConfigValuesToRequest(req, &client.NodeMetricMapping[i].Tag.MonitoringParams); err != nil {
return nil, err
}
subClient.monitoredItemsReqs[i] = req
}

return subClient, nil
}

Expand All @@ -124,6 +110,22 @@ func (o *SubscribeClient) Connect() error {
return err
}

// Make sure we setup the node-ids correctly after reconnect
// as the server might be restarted and IDs changed
o.Log.Debugf("Creating monitored items on first connection")
if err := o.OpcUAInputClient.InitNodeIDs(o.ctx); err != nil {
return err
}

for i, nodeID := range o.OpcUAInputClient.NodeIDs {
// The node id index (i) is used as the handle for the monitored item
req := opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, uint32(i))
if err := assignConfigValuesToRequest(req, &o.OpcUAInputClient.NodeMetricMapping[i].Tag.MonitoringParams); err != nil {
return err
}
o.monitoredItemsReqs[i] = req
}

o.Log.Debugf("Creating OPC UA subscription")
o.sub, err = o.Client.Subscribe(o.ctx, &opcua.SubscriptionParameters{
Interval: time.Duration(o.Config.SubscriptionInterval),
Expand Down

0 comments on commit b36af19

Please sign in to comment.