From fc659078cafeea7ab81cfb3530170ee956577b7c Mon Sep 17 00:00:00 2001 From: supergrover Date: Sun, 24 Mar 2024 11:46:46 +0100 Subject: [PATCH] feat: implement browsing opcua tags on initial connect Introduce a `browse` flag in opcua and opcua_listener configurations which causes the plugin to browse the tag using the OPC servers' browse feature. --- plugins/common/opcua/input/input_client.go | 106 +++++++++++++++++- plugins/inputs/opcua/README.md | 7 ++ plugins/inputs/opcua/read_client.go | 2 +- plugins/inputs/opcua/sample.conf | 7 ++ plugins/inputs/opcua_listener/README.md | 1 + .../opcua_listener/opcua_listener_test.go | 1 + plugins/inputs/opcua_listener/sample.conf | 1 + .../inputs/opcua_listener/subscribe_client.go | 32 +++--- 8 files changed, 137 insertions(+), 20 deletions(-) diff --git a/plugins/common/opcua/input/input_client.go b/plugins/common/opcua/input/input_client.go index 1412f011a4bcb..2ea3048f1067d 100644 --- a/plugins/common/opcua/input/input_client.go +++ b/plugins/common/opcua/input/input_client.go @@ -9,6 +9,7 @@ import ( "strings" "time" + gopcua_id "github.com/gopcua/opcua/id" "github.com/gopcua/opcua/ua" "github.com/influxdata/telegraf" @@ -57,6 +58,7 @@ type NodeSettings struct { TagsSlice [][]string `toml:"tags" deprecated:"1.25.0;use 'default_tags' instead"` DefaultTags map[string]string `toml:"default_tags"` MonitoringParams MonitoringParameters `toml:"monitoring_params"` + Browse bool `toml:"browse"` } // NodeID returns the OPC UA node id @@ -69,6 +71,7 @@ type NodeGroupSettings struct { MetricName string `toml:"name"` // Overrides plugin's setting Namespace string `toml:"namespace"` // Can be overridden by node setting IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting + Browse bool `toml:"browse"` // Can be overridden by node setting, although only to true state Nodes []NodeSettings `toml:"nodes"` TagsSlice [][]string `toml:"tags" deprecated:"1.26.0;use default_tags"` DefaultTags map[string]string `toml:"default_tags"` @@ -352,6 +355,9 @@ func (o *OpcUAInputClient) InitNodeMetricMapping() error { if node.MonitoringParams.SamplingInterval == 0 { node.MonitoringParams.SamplingInterval = group.SamplingInterval } + if node.Browse == false { + node.Browse = group.Browse + } nmm, err := NewNodeMetricMapping(group.MetricName, node, groupTags) if err != nil { @@ -368,19 +374,111 @@ func (o *OpcUAInputClient) InitNodeMetricMapping() error { return nil } -func (o *OpcUAInputClient) InitNodeIDs() error { +func (o *OpcUAInputClient) InitNodeIDs(ctx context.Context) error { o.NodeIDs = make([]*ua.NodeID, 0, len(o.NodeMetricMapping)) for _, node := range o.NodeMetricMapping { - nid, err := ua.ParseNodeID(node.Tag.NodeID()) - if err != nil { - return err + var nid *ua.NodeID = nil + + if node.Tag.Browse && node.Tag.IdentifierType == "s" { + browsedNodeID, err := o.browseNode(ctx, node.Tag.Namespace, node.Tag.Identifier) + if err != nil { + o.Log.Errorf("Error browsing node %s: %s", node.Tag.FieldName, err) + // continue, other nodes may be browseable + // also, add the node as a string identifier. The server will not be able to return data + // but we must add a node to the o.NodeIDs array in order for it to match the o.NodeMetricMappings + parsedNodeID, err := ua.ParseNodeID(node.Tag.NodeID()) + if err != nil { + return err + } + nid = parsedNodeID + } else { + nid = browsedNodeID + } + } else { + parsedNodeID, err := ua.ParseNodeID(node.Tag.NodeID()) + if err != nil { + return err + } + nid = parsedNodeID } + o.NodeIDs = append(o.NodeIDs, nid) } return nil } +func parseNamespaceId(ns string) (uint16, error) { + nsId, err := strconv.ParseUint(ns, 10, 16) + if err != nil { + return 0, fmt.Errorf("invalid namespace number: %s", err) + } + + return uint16(nsId), nil +} + +// Parse a browse path somewhat as specified in OPC UA Part 4, Annex A. +// basically a list of segments of ns:BrowseName separated by slashes. +// for example: 4:Folder/3:Object/Member.Value +// Use the initialNamespace as the namespace if no ns is specified in the path segment +func parseBrowsePath(initialNamespace string, browsePath string) ([]*ua.QualifiedName, error) { + segmentSplitter := func(r rune) bool { + return r == '/' || r == '.' + } + segments := strings.FieldsFunc(browsePath, segmentSplitter) + + ns, err := parseNamespaceId(initialNamespace) + if err != nil { + return nil, err + } + + pathNames := make([]*ua.QualifiedName, 0, len(segments)) + + for _, segment := range segments { + browseName := strings.Split(segment, ":") + + segmentNs := ns + segmentName := "" + + if len(browseName) == 1 { + segmentName = browseName[0] + } else if len(browseName) == 2 { + segmentNs, err = parseNamespaceId(browseName[0]) + if err != nil { + return nil, fmt.Errorf("Namespace part of segment %s cannot be parsed as an namespace id: %s", segment, err) + } + segmentName = browseName[1] + } else { + return nil, fmt.Errorf("Browse name segment %s contains more than one colon", segment) + } + + pathNames = append(pathNames, &ua.QualifiedName{ + NamespaceIndex: segmentNs, + Name: segmentName, + }) + } + + return pathNames, nil +} + +func (o *OpcUAInputClient) browseNode(ctx context.Context, ns string, browsePath string) (*ua.NodeID, error) { + o.Log.Infof("Browsing node %s", browsePath) + + objectsFolderId := ua.NewNumericNodeID(0, gopcua_id.ObjectsFolder) + + pathNames, err := parseBrowsePath(ns, browsePath) + if err != nil { + return nil, fmt.Errorf("invalid browse path, %s", err) + } + + targetNodeId, err := o.Client.Node(objectsFolderId).TranslateBrowsePathsToNodeIDs(ctx, pathNames) + if err != nil { + return nil, fmt.Errorf("unable to browse for node, %s", err) + } + + return targetNodeId, nil +} + func (o *OpcUAInputClient) initLastReceivedValues() { o.LastReceivedData = make([]NodeValue, len(o.NodeMetricMapping)) for nodeIdx, nmm := range o.NodeMetricMapping { diff --git a/plugins/inputs/opcua/README.md b/plugins/inputs/opcua/README.md index eff23f7ac4219..d3fde56250f4c 100644 --- a/plugins/inputs/opcua/README.md +++ b/plugins/inputs/opcua/README.md @@ -89,6 +89,11 @@ to use them. ## identifier - OPC UA ID (tag as shown in opcua browser) ## tags - extra tags to be added to the output metric (optional); deprecated in 1.25.0; use default_tags ## default_tags - extra tags to be added to the output metric (optional) + ## browse - browse the OPC tag using BrowseName attributes (optional). + ## identifier_type must be "s" and identifier must be set to a valid browse path according to OPC UA Part 4, Annex A, + ## e.g.: 3:Folder/4:SubFolder/4:Object.Member.Value + ## Objects are browsed starting from the ObjectsFolder folder. + ## The namespace in the namespace field is used as the initial namespace, but may be overridden by the browse path ## ## Use either the inline notation or the bracketed notation, not both. # @@ -96,6 +101,7 @@ to use them. # nodes = [ # {name="", namespace="", identifier_type="", identifier="", tags=[["tag1", "value1"], ["tag2", "value2"]}, # {name="", namespace="", identifier_type="", identifier=""}, + # {name="", namespace="", identifier_type="s", identifier="3:Folder/4:SubFolder/4:Object.Member.Value", browse=true} # ] # ## Bracketed notation @@ -104,6 +110,7 @@ to use them. # namespace = "" # identifier_type = "" # identifier = "" + # browse = false # default_tags = { tag1 = "value1", tag2 = "value2" } # # [[inputs.opcua.nodes]] diff --git a/plugins/inputs/opcua/read_client.go b/plugins/inputs/opcua/read_client.go index e52e03bba6b1f..cc398862d92b9 100644 --- a/plugins/inputs/opcua/read_client.go +++ b/plugins/inputs/opcua/read_client.go @@ -61,7 +61,7 @@ func (o *ReadClient) Connect() error { // Make sure we setup the node-ids correctly after reconnect // as the server might be restarted and IDs changed - if err := o.OpcUAInputClient.InitNodeIDs(); err != nil { + if err := o.OpcUAInputClient.InitNodeIDs(o.ctx); err != nil { return fmt.Errorf("initializing node IDs failed: %w", err) } diff --git a/plugins/inputs/opcua/sample.conf b/plugins/inputs/opcua/sample.conf index 46002f8df806b..3d08157ecf369 100644 --- a/plugins/inputs/opcua/sample.conf +++ b/plugins/inputs/opcua/sample.conf @@ -61,6 +61,11 @@ ## identifier - OPC UA ID (tag as shown in opcua browser) ## tags - extra tags to be added to the output metric (optional); deprecated in 1.25.0; use default_tags ## default_tags - extra tags to be added to the output metric (optional) + ## browse - browse the OPC tag using BrowseName attributes (optional). + ## identifier_type must be "s" and identifier must be set to a valid browse path according to OPC UA Part 4, Annex A, + ## e.g.: 3:Folder/4:SubFolder/4:Object.Member.Value + ## Objects are browsed starting from the ObjectsFolder folder. + ## The namespace in the namespace field is used as the initial namespace, but may be overridden by the browse path ## ## Use either the inline notation or the bracketed notation, not both. # @@ -68,6 +73,7 @@ # nodes = [ # {name="", namespace="", identifier_type="", identifier="", tags=[["tag1", "value1"], ["tag2", "value2"]}, # {name="", namespace="", identifier_type="", identifier=""}, + # {name="", namespace="", identifier_type="s", identifier="3:Folder/4:SubFolder/4:Object.Member.Value", browse=true} # ] # ## Bracketed notation @@ -76,6 +82,7 @@ # namespace = "" # identifier_type = "" # identifier = "" + # browse = false # default_tags = { tag1 = "value1", tag2 = "value2" } # # [[inputs.opcua.nodes]] diff --git a/plugins/inputs/opcua_listener/README.md b/plugins/inputs/opcua_listener/README.md index a1738cdf930fe..892f12ba7efe3 100644 --- a/plugins/inputs/opcua_listener/README.md +++ b/plugins/inputs/opcua_listener/README.md @@ -218,6 +218,7 @@ to use them. # nodes = [ # {name="node1", namespace="", identifier_type="", identifier="",} # {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}}, + # {name="node3", namespace="3", identifier_type="s" identifier=3:Folder/4.Object.Member.Field"} #] # ## Bracketed notation diff --git a/plugins/inputs/opcua_listener/opcua_listener_test.go b/plugins/inputs/opcua_listener/opcua_listener_test.go index 9aaa44ddcc98c..66c5937a3590d 100644 --- a/plugins/inputs/opcua_listener/opcua_listener_test.go +++ b/plugins/inputs/opcua_listener/opcua_listener_test.go @@ -800,6 +800,7 @@ func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) { }) subClient, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + subClient.Connect() require.NoError(t, err) require.Equal(t, &ua.MonitoringParameters{ SamplingInterval: 50, diff --git a/plugins/inputs/opcua_listener/sample.conf b/plugins/inputs/opcua_listener/sample.conf index 65343a7ba3a92..e2728c4b1fd25 100644 --- a/plugins/inputs/opcua_listener/sample.conf +++ b/plugins/inputs/opcua_listener/sample.conf @@ -179,6 +179,7 @@ # nodes = [ # {name="node1", namespace="", identifier_type="", identifier="",} # {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}}, + # {name="node3", namespace="3", identifier_type="s" identifier=3:Folder/4.Object.Member.Field"} #] # ## Bracketed notation diff --git a/plugins/inputs/opcua_listener/subscribe_client.go b/plugins/inputs/opcua_listener/subscribe_client.go index ba64994212fbe..b9123ee22f849 100644 --- a/plugins/inputs/opcua_listener/subscribe_client.go +++ b/plugins/inputs/opcua_listener/subscribe_client.go @@ -87,15 +87,11 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su return nil, err } - if err := client.InitNodeIDs(); err != nil { - return nil, err - } - processingCtx, processingCancel := context.WithCancel(context.Background()) subClient := &SubscribeClient{ OpcUAInputClient: client, Config: *sc, - monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeIDs)), + monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeMetricMapping)), // 100 was chosen to make sure that the channels will not block when multiple changes come in at the same time. // The channel size should be increased if reports come in on Telegraf blocking when many changes come in at // the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval. @@ -105,16 +101,6 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su cancel: processingCancel, } - log.Debugf("Creating monitored items") - for i, nodeID := range client.NodeIDs { - // The node id index (i) is used as the handle for the monitored item - req := opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, uint32(i)) - if err := assignConfigValuesToRequest(req, &client.NodeMetricMapping[i].Tag.MonitoringParams); err != nil { - return nil, err - } - subClient.monitoredItemsReqs[i] = req - } - return subClient, nil } @@ -124,6 +110,22 @@ func (o *SubscribeClient) Connect() error { return err } + // Make sure we setup the node-ids correctly after reconnect + // as the server might be restarted and IDs changed + o.Log.Debugf("Creating monitored items on first connection") + if err := o.OpcUAInputClient.InitNodeIDs(o.ctx); err != nil { + return err + } + + for i, nodeID := range o.OpcUAInputClient.NodeIDs { + // The node id index (i) is used as the handle for the monitored item + req := opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, uint32(i)) + if err := assignConfigValuesToRequest(req, &o.OpcUAInputClient.NodeMetricMapping[i].Tag.MonitoringParams); err != nil { + return err + } + o.monitoredItemsReqs[i] = req + } + o.Log.Debugf("Creating OPC UA subscription") o.sub, err = o.Client.Subscribe(o.ctx, &opcua.SubscriptionParameters{ Interval: time.Duration(o.Config.SubscriptionInterval),