-
Notifications
You must be signed in to change notification settings - Fork 183
/
python.go
180 lines (151 loc) · 5.55 KB
/
python.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// Package python contains a monitor that runs Collectd Python plugins
// directly in a subprocess. It uses the logic in internal/monitors/pyrunner
// to do most of the work of managing a Python subprocess and doing the
// configuration/shutdown calls.
package python
import (
"bytes"
"fmt"
"html/template"
"io"
"strconv"
"github.com/davecgh/go-spew/spew"
"github.com/mailru/easyjson"
"github.com/signalfx/golib/datapoint"
"github.com/signalfx/golib/event"
mpCollectd "github.com/signalfx/metricproxy/protocol/collectd"
"github.com/signalfx/metricproxy/protocol/collectd/format"
"github.com/signalfx/signalfx-agent/internal/core/config"
"github.com/signalfx/signalfx-agent/internal/monitors"
"github.com/signalfx/signalfx-agent/internal/monitors/collectd"
"github.com/signalfx/signalfx-agent/internal/monitors/pyrunner"
"github.com/signalfx/signalfx-agent/internal/monitors/types"
"github.com/signalfx/signalfx-agent/internal/utils/collectdutil"
log "github.com/sirupsen/logrus"
)
const (
	// messageTypeValueList is the message type whose payload handleMessage
	// decodes as a collectd JSON value list.
	messageTypeValueList pyrunner.MessageType = 100

	// monitorType is the name under which this monitor is registered.
	monitorType = "collectd/python"
)
func init() {
monitors.Register(monitorType, func() interface{} {
return &PyMonitor{
MonitorCore: pyrunner.New("sfxcollectd"),
}
}, &Config{})
}
// PyConfig is an interface for passing in Config structs derived from the
// Python Config struct. Implementations must satisfy
// config.MonitorCustomConfig and expose the underlying *Config through
// PythonConfig.
type PyConfig interface {
config.MonitorCustomConfig
// PythonConfig returns the *Config that the implementation wraps or is.
PythonConfig() *Config
}
// MONITOR(collectd/python): This monitor runs arbitrary collectd Python
// plugins directly, apart from collectd. It implements a mock collectd Python
// interface that supports most, but not all, of the real collectd.
// Config specifies configurations that are specific to the individual python based monitor
type Config struct {
config.MonitorConfig `yaml:",inline" acceptsEndpoints:"true"`
// Host will be filled in by auto-discovery if this monitor has a discovery
// rule. It can then be used under pluginConfig by the template
// `{{.Host}}`
Host string `yaml:"host"`
// Port will be filled in by auto-discovery if this monitor has a discovery
// rule. It can then be used under pluginConfig by the template
// `{{.Port}}`
Port uint16 `yaml:"port"`
// Corresponds to the ModuleName option in collectd-python
ModuleName string `yaml:"moduleName" json:"moduleName"`
// Corresponds to a set of ModulePath options in collectd-python
ModulePaths []string `yaml:"modulePaths" json:"modulePaths"`
// This is a yaml form of the collectd config. Non-empty string values are
// rendered as templates against the monitor config when the monitor is
// configured, and rendered values that parse as integers are stored as
// integers.
PluginConfig map[string]interface{} `yaml:"pluginConfig" json:"pluginConfig" neverLog:"true"`
// A set of paths to [types.db files](https://collectd.org/documentation/manpages/types.db.5.shtml)
// that are needed by your plugin. If not specified, the runner will use
// the global collectd types.db file.
TypesDBPaths []string `yaml:"typesDBPaths" json:"typesDBPaths"`
}
// PythonConfig returns the receiver itself, which provides the PythonConfig
// method required by the PyConfig interface.
func (c *Config) PythonConfig() *Config {
return c
}
// PyMonitor is a monitor that runs collectd python plugins directly. It embeds
// *pyrunner.MonitorCore and forwards the datapoints and events it receives to
// Output.
type PyMonitor struct {
*pyrunner.MonitorCore
// Output is the sink that handleMessage sends datapoints and events to.
Output types.Output
}
// Configure starts the subprocess and configures the plugin.
//
// If conf carries no types.db paths, the path returned by
// collectd.MakePath("types.db") is used. Every non-empty string value in
// PluginConfig is rendered as a template with conf as its data; a rendered
// value that parses as an integer is stored as an int, otherwise as a string.
// A value whose template fails to parse or render is logged and left
// unchanged. The resulting config is handed to ConfigureInPython together with
// a callback that receives and handles messages until receiving fails or
// shutdown has been called.
//
// The returned error is whatever ConfigureInPython returns.
func (m *PyMonitor) Configure(conf PyConfig) error {
	// get the python config from the supplied config
	pyconf := conf.PythonConfig()
	if len(pyconf.TypesDBPaths) == 0 {
		pyconf.TypesDBPaths = append(pyconf.TypesDBPaths, collectd.MakePath("types.db"))
	}

	// Only existing keys are overwritten below, which is safe while ranging.
	for k, raw := range pyconf.PluginConfig {
		v, ok := raw.(string)
		if !ok || v == "" {
			continue
		}

		// NOTE(review): template is html/template per this file's imports, so
		// rendered values are HTML-escaped — confirm that is intended here.
		// The local is named tmpl so it does not shadow the template package.
		tmpl, err := template.New("nested").Parse(v)
		if err != nil {
			// Errorf, not Error: the message contains a format verb.
			m.Logger().WithError(err).Errorf("Could not parse value '%s' as template", v)
			continue
		}

		var out bytes.Buffer
		// fill in any templates with the whole config struct passed into this method
		if err := tmpl.Option("missingkey=error").Execute(&out, conf); err != nil {
			m.Logger().WithFields(log.Fields{
				"template": v,
				"error":    err,
				"context":  spew.Sdump(conf),
			}).Error("Could not render nested config template")
			continue
		}

		// Store integers as ints so the plugin sees a number rather than text.
		rendered := out.String()
		if i, err := strconv.Atoi(rendered); err == nil {
			pyconf.PluginConfig[k] = i
		} else {
			pyconf.PluginConfig[k] = rendered
		}
	}

	return m.MonitorCore.ConfigureInPython(pyconf, func(dataReader pyrunner.MessageReceiver) {
		for {
			m.Logger().Debug("Waiting for messages")
			msgType, payloadReader, err := dataReader.RecvMessage()
			// This is usually due to the pipe being closed
			if err != nil {
				m.Logger().WithError(err).Error("Could not receive messages")
				return
			}
			// Logged only after the error check, since msgType is not
			// meaningful when RecvMessage fails.
			m.Logger().Debugf("Got message of type %d", msgType)

			if m.ShutdownCalled() {
				return
			}

			if err := m.handleMessage(msgType, payloadReader); err != nil {
				m.Logger().WithError(err).Error("Could not handle message from Python")
			}
		}
	})
}
// handleMessage processes one message received from Python according to
// msgType. Log messages are delegated to HandleLogMessage. Value-list messages
// are unmarshalled from payloadReader, converted into datapoints and events,
// and sent to m.Output. Any other message type, or a failure to unmarshal the
// payload, results in an error.
func (m *PyMonitor) handleMessage(msgType pyrunner.MessageType, payloadReader io.Reader) error {
	if msgType == pyrunner.MessageTypeLog {
		return m.HandleLogMessage(payloadReader)
	}
	if msgType != messageTypeValueList {
		return fmt.Errorf("Unknown message type received %d", msgType)
	}

	var decoded collectdformat.JSONWriteFormat
	if decodeErr := easyjson.UnmarshalFromReader(payloadReader, &decoded); decodeErr != nil {
		return decodeErr
	}

	datapoints := make([]*datapoint.Datapoint, 0)
	evts := make([]*event.Event, 0)
	collectdutil.ConvertWriteFormat((*mpCollectd.JSONWriteFormat)(&decoded), &datapoints, &evts)

	for _, dp := range datapoints {
		m.Output.SendDatapoint(dp)
	}
	for _, ev := range evts {
		m.Output.SendEvent(ev)
	}
	return nil
}