forked from Staples-Inc/snap-plugin-publisher-blueflood
-
Notifications
You must be signed in to change notification settings - Fork 0
/
blueflood.go
executable file
·183 lines (148 loc) · 5.53 KB
/
blueflood.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
/*
Copyright 2016 Staples, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package blueflood
import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
log "github.com/Sirupsen/logrus"
"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
"github.com/intelsdi-x/snap/core/ctypes"
)
// ingestMetric is a single entry of the JSON array POSTed to Blueflood by
// publishMetrics. Field order is significant only in that it fixes the order
// of keys in the encoded JSON object.
type ingestMetric struct {
	// CollectionTime is the sample time; Publish fills it with the current
	// wall-clock time expressed in milliseconds since the Unix epoch.
	CollectionTime int64 `json:"collectionTime"`
	// TTLInSeconds is copied from the "ttlInSeconds" config value by Publish.
	TTLInSeconds int `json:"ttlInSeconds"`
	// MetricValue holds the metric's value; Publish only stores numeric
	// values here (numeric strings are parsed to float64 first).
	MetricValue interface{} `json:"metricValue"`
	// MetricName is the metric's namespace key as produced by Publish.
	MetricName string `json:"metricName"`
}
// Identity of this plugin; all three values are handed to
// plugin.NewPluginMeta by Meta.
const (
	// pluginName is the name the plugin registers under.
	pluginName = "blueflood"
	// pluginVersion is the plugin's version number.
	pluginVersion = 1
	// pluginType marks the plugin as a publisher.
	pluginType = plugin.PublisherPluginType
)
// Meta builds the plugin metadata from the package-level pluginName,
// pluginVersion and pluginType constants. The only content type listed in
// either content-type argument is the snap GOB content type, which is also the
// only one Publish knows how to decode.
func Meta() *plugin.PluginMeta {
	// Two independent slices, one per content-type argument of NewPluginMeta.
	// NOTE(review): presumably "accepted" then "returned" content types —
	// confirm against the snap plugin package.
	accepted := []string{plugin.SnapGOBContentType}
	returned := []string{plugin.SnapGOBContentType}

	return plugin.NewPluginMeta(pluginName, pluginVersion, pluginType, accepted, returned)
}
// BluefloodPublisher publishes snap metrics to a Blueflood ingest endpoint.
// It is an empty struct: every setting it uses (server, rollupNum,
// ttlInSeconds, timeout) is read from the config map passed to Publish on
// each call.
type BluefloodPublisher struct{}
// NewBluefloodPublisher allocates a BluefloodPublisher and returns a pointer
// to it. The type has no fields, so there is nothing to initialise.
func NewBluefloodPublisher() *BluefloodPublisher {
	return new(BluefloodPublisher)
}
// publishMetrics JSON-encodes data and POSTs it to server with a
// "Content-Type: application/json" header.
//
// timeout is the whole-request HTTP timeout in seconds; 0 means no timeout
// (http.Client semantics). The function returns nothing: every failure
// (encoding, request construction, transport error, or a response status
// other than 200) is reported through logger only.
func publishMetrics(data []ingestMetric, server string, timeout int, logger *log.Logger) {
	payload, err := json.Marshal(data)
	if err != nil {
		logger.Printf("Error marshalling json data: %s\n", err.Error())
		return
	}

	req, err := http.NewRequest("POST", server, bytes.NewReader(payload))
	if err != nil {
		logger.Printf("Error creating Ingest POST request, error: %s\n", err.Error())
		return
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: time.Duration(timeout) * time.Second}
	response, err := client.Do(req)
	if err != nil {
		// Log the error itself: on failure the response is normally nil and
		// carries no information about what went wrong.
		logger.Printf("Error performing request, err: %v", err)
		return
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked.
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		logger.Printf("Error: Metrics request not ingested, status: %v", response.StatusCode)
	}
}
// Publish decodes a GOB-encoded batch of snap metrics and forwards the numeric
// ones to the Blueflood server named in config.
//
// contentType must be plugin.SnapGOBContentType; any other value is logged and
// returned as an error. content is the encoded []plugin.MetricType.
//
// config must contain:
//   - "server"       (string) URL the batches are POSTed to
//   - "rollupNum"    (int)    maximum metrics per POST; values <= 0 send
//     everything in a single request
//   - "ttlInSeconds" (int)    TTL attached to every metric
//   - "timeout"      (int)    HTTP timeout in seconds, 0 for none
//
// A missing or wrongly typed config value yields an error instead of a panic.
//
// Metrics with an empty namespace, non-numeric data, unparseable strings, or
// NaN/Inf values are skipped. Batches are sent by publishMetrics on
// fire-and-forget goroutines: Publish does not wait for them and returns nil
// once the batches are handed off, so HTTP failures are only visible in the
// log.
func (b *BluefloodPublisher) Publish(contentType string, content []byte, config map[string]ctypes.ConfigValue) error {
	logger := log.New()

	var metrics []plugin.MetricType
	switch contentType {
	case plugin.SnapGOBContentType:
		dec := gob.NewDecoder(bytes.NewBuffer(content))
		if err := dec.Decode(&metrics); err != nil {
			logger.Printf("Error decoding GOB: error=%v content=%v", err, content)
			return err
		}
	default:
		logger.Printf("Error unknown content type '%v'", contentType)
		return fmt.Errorf("Unknown content type '%s'", contentType)
	}

	// Checked config lookups: a bare type assertion would panic on a missing
	// key (nil interface) or on a value of the wrong type.
	strOption := func(key string) (string, error) {
		v, ok := config[key].(ctypes.ConfigValueStr)
		if !ok {
			return "", fmt.Errorf("blueflood publisher: config value %q is missing or not a string", key)
		}
		return v.Value, nil
	}
	intOption := func(key string) (int, error) {
		v, ok := config[key].(ctypes.ConfigValueInt)
		if !ok {
			return 0, fmt.Errorf("blueflood publisher: config value %q is missing or not an integer", key)
		}
		return v.Value, nil
	}

	server, err := strOption("server")
	if err != nil {
		return err
	}
	rollUpNum, err := intOption("rollupNum")
	if err != nil {
		return err
	}
	ttlInSeconds, err := intOption("ttlInSeconds")
	if err != nil {
		return err
	}
	timeout, err := intOption("timeout")
	if err != nil {
		return err
	}

	data := []ingestMetric{}
	for _, m := range metrics {
		if m.Namespace().String() == "" { // Ensure empty namespaces are not sent to blueflood
			continue
		}

		var (
			value   interface{}
			asFloat float64 // stays 0 (finite) for integer values
		)
		switch v := m.Data().(type) {
		case float32:
			value, asFloat = v, float64(v)
		case float64:
			value, asFloat = v, v
		case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
			value = v
		case string:
			parsed, perr := strconv.ParseFloat(v, 64)
			if perr != nil {
				logger.Infof("String '%v' was not able to be parsed into a numeric", v)
				continue
			}
			value, asFloat = parsed, parsed
		default:
			logger.Warningf("Unknown data received for metric '%v': Type %T", m.Namespace(), v)
			continue
		}

		// x-x is 0 for every finite x and NaN for NaN and ±Inf. json.Marshal
		// rejects non-finite floats, which would drop the entire batch, so
		// such values are skipped individually.
		if asFloat-asFloat != 0 {
			logger.Warningf("Non-finite value received for metric '%v': %v", m.Namespace(), value)
			continue
		}

		// NOTE(review): the collection time is the publish-time wall clock at
		// one-second resolution, not a timestamp taken from the metric —
		// confirm this is intended.
		data = append(data, ingestMetric{
			MetricName:     m.Namespace().Key(),
			MetricValue:    value,
			TTLInSeconds:   ttlInSeconds,
			CollectionTime: time.Now().Unix() * 1000,
		})

		// Flush only right after an append, so an empty batch is never sent.
		if rollUpNum > 0 && len(data) >= rollUpNum {
			go publishMetrics(data, server, timeout, logger)
			// Start a fresh slice: the goroutine owns the one just handed off.
			data = []ingestMetric{}
		}
	}

	if len(data) > 0 {
		go publishMetrics(data, server, timeout, logger)
	}
	return nil
}
// handleConfigErr aborts via log.Panicf when e is non-nil and does nothing
// otherwise. GetConfigPolicy calls it after constructing each policy rule.
func handleConfigErr(e error) {
	if e == nil {
		return
	}
	log.Panicf("Error: Config Policy not set correctly: %v", e)
}
// GetConfigPolicy builds the config policy for the blueflood publisher: one
// policy node holding four rules, registered under the namespace []string{""}.
//
// Rules, in the order they are added:
//   - "server"       string rule, created with the flag true
//   - "rollupNum"    integer rule, flag false, value 20
//   - "ttlInSeconds" integer rule, flag false, value 172800
//   - "timeout"      integer rule, flag false, value 0
//
// NOTE(review): the boolean is presumably "required" and the trailing integer
// the default value — confirm against the cpolicy package.
//
// The returned error is always nil; a rule-construction error is passed to
// handleConfigErr, which panics instead of returning it.
func (b *BluefloodPublisher) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) {
	node := cpolicy.NewPolicyNode()

	serverRule, err := cpolicy.NewStringRule("server", true)
	handleConfigErr(err)
	serverRule.Description = "Blueflood host address"
	node.Add(serverRule)

	rollUpRule, err := cpolicy.NewIntegerRule("rollupNum", false, 20)
	handleConfigErr(err)
	rollUpRule.Description = "Configurable value to break up blueflood ingest requests into chunks of metrics"
	node.Add(rollUpRule)

	ttlRule, err := cpolicy.NewIntegerRule("ttlInSeconds", false, 172800)
	handleConfigErr(err)
	ttlRule.Description = "Blueflood ingest setting for number of seconds before data expires in blueflood ingest"
	node.Add(ttlRule)

	// publishMetrics passes this value to http.Client.Timeout, where 0 means
	// the request never times out.
	timeoutRule, err := cpolicy.NewIntegerRule("timeout", false, 0)
	handleConfigErr(err)
	timeoutRule.Description = "Number of seconds to timeout out requests to the blueflood server"
	node.Add(timeoutRule)

	policy := cpolicy.New()
	policy.Add([]string{""}, node)
	return policy, nil
}