-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
azure_storage_queue.go
126 lines (106 loc) · 3.66 KB
/
azure_storage_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//go:generate ../../../tools/readme_config_includer/generator
package azure_storage_queue
import (
"context"
_ "embed"
"errors"
"net/url"
"strings"
"time"
"github.com/Azure/azure-storage-queue-go/azqueue"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
//go:embed sample.conf
var sampleConfig string
type AzureStorageQueue struct {
StorageAccountName string `toml:"account_name"`
StorageAccountKey string `toml:"account_key"`
PeekOldestMessageAge bool `toml:"peek_oldest_message_age"`
Log telegraf.Logger
serviceURL *azqueue.ServiceURL
}
func (*AzureStorageQueue) SampleConfig() string {
return sampleConfig
}
func (a *AzureStorageQueue) Init() error {
if a.StorageAccountName == "" {
return errors.New("account_name must be configured")
}
if a.StorageAccountKey == "" {
return errors.New("account_key must be configured")
}
return nil
}
func (a *AzureStorageQueue) GetServiceURL() (azqueue.ServiceURL, error) {
if a.serviceURL == nil {
_url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net")
if err != nil {
return azqueue.ServiceURL{}, err
}
credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey)
if err != nil {
return azqueue.ServiceURL{}, err
}
pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{})
serviceURL := azqueue.NewServiceURL(*_url, pipeline)
a.serviceURL = &serviceURL
}
return *a.serviceURL, nil
}
func (a *AzureStorageQueue) GatherQueueMetrics(acc telegraf.Accumulator, queueItem azqueue.QueueItem, properties *azqueue.QueueGetPropertiesResponse, peekedMessage *azqueue.PeekedMessage) {
fields := make(map[string]interface{})
tags := make(map[string]string)
tags["queue"] = strings.TrimSpace(queueItem.Name)
tags["account"] = a.StorageAccountName
fields["size"] = properties.ApproximateMessagesCount()
if peekedMessage != nil {
fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano()
}
acc.AddFields("azure_storage_queues", fields, tags)
}
func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error {
serviceURL, err := a.GetServiceURL()
if err != nil {
return err
}
ctx := context.TODO()
for marker := (azqueue.Marker{}); marker.NotDone(); {
a.Log.Debugf("Listing queues of storage account '%s'", a.StorageAccountName)
queuesSegment, err := serviceURL.ListQueuesSegment(ctx, marker,
azqueue.ListQueuesSegmentOptions{
Detail: azqueue.ListQueuesSegmentDetails{Metadata: false},
})
if err != nil {
return err
}
marker = queuesSegment.NextMarker
for _, queueItem := range queuesSegment.QueueItems {
a.Log.Debugf("Processing queue '%s' of storage account '%s'", queueItem.Name, a.StorageAccountName)
queueURL := serviceURL.NewQueueURL(queueItem.Name)
properties, err := queueURL.GetProperties(ctx)
if err != nil {
a.Log.Errorf("Error getting properties for queue %s: %s", queueItem.Name, err.Error())
continue
}
var peekedMessage *azqueue.PeekedMessage
if a.PeekOldestMessageAge {
messagesURL := queueURL.NewMessagesURL()
messagesResponse, err := messagesURL.Peek(ctx, 1)
if err != nil {
a.Log.Errorf("Error peeking queue %s: %s", queueItem.Name, err.Error())
} else if messagesResponse.NumMessages() > 0 {
peekedMessage = messagesResponse.Message(0)
}
}
a.GatherQueueMetrics(acc, queueItem, properties, peekedMessage)
}
}
return nil
}
func init() {
inputs.Add("azure_storage_queue", func() telegraf.Input {
return &AzureStorageQueue{PeekOldestMessageAge: true}
})
}