forked from grafana/grafana-plugin-sdk-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
serve.go
310 lines (260 loc) · 10.1 KB
/
serve.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
package backend
import (
"context"
"errors"
"fmt"
"net"
"os"
"os/signal"
"strconv"
"syscall"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
"github.com/kosimas/grafana-plugin-sdk-go/backend/grpcplugin"
"github.com/kosimas/grafana-plugin-sdk-go/backend/log"
"github.com/kosimas/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/kosimas/grafana-plugin-sdk-go/internal/standalone"
"github.com/kosimas/grafana-plugin-sdk-go/internal/tracerprovider"
)
const defaultServerMaxReceiveMessageSize = 1024 * 1024 * 16
// GRPCSettings settings for gRPC.
type GRPCSettings struct {
// MaxReceiveMsgSize the max gRPC message size in bytes the plugin can receive.
// If this is <= 0, gRPC uses the default 16MB.
MaxReceiveMsgSize int
// MaxSendMsgSize the max gRPC message size in bytes the plugin can send.
// If this is <= 0, gRPC uses the default `math.MaxInt32`.
MaxSendMsgSize int
}
// ServeOpts options for serving plugins.
type ServeOpts struct {
// CheckHealthHandler handler for health checks.
CheckHealthHandler CheckHealthHandler
// CallResourceHandler handler for resource calls.
// Optional to implement.
CallResourceHandler CallResourceHandler
// QueryDataHandler handler for data queries.
// Required to implement if data source.
QueryDataHandler QueryDataHandler
// StreamHandler handler for streaming queries.
// This is EXPERIMENTAL and is a subject to change till Grafana 8.
StreamHandler StreamHandler
// GRPCSettings settings for gPRC.
GRPCSettings GRPCSettings
}
func asGRPCServeOpts(opts ServeOpts) grpcplugin.ServeOpts {
pluginOpts := grpcplugin.ServeOpts{
DiagnosticsServer: newDiagnosticsSDKAdapter(prometheus.DefaultGatherer, opts.CheckHealthHandler),
}
if opts.CallResourceHandler != nil {
pluginOpts.ResourceServer = newResourceSDKAdapter(opts.CallResourceHandler)
}
if opts.QueryDataHandler != nil {
pluginOpts.DataServer = newDataSDKAdapter(opts.QueryDataHandler)
}
if opts.StreamHandler != nil {
pluginOpts.StreamServer = newStreamSDKAdapter(opts.StreamHandler)
}
return pluginOpts
}
func defaultGRPCMiddlewares(opts ServeOpts) []grpc.ServerOption {
if opts.GRPCSettings.MaxReceiveMsgSize <= 0 {
opts.GRPCSettings.MaxReceiveMsgSize = defaultServerMaxReceiveMessageSize
}
grpcMiddlewares := []grpc.ServerOption{
grpc.MaxRecvMsgSize(opts.GRPCSettings.MaxReceiveMsgSize),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(),
grpc_prometheus.StreamServerInterceptor,
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
otelgrpc.UnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor,
)),
}
if opts.GRPCSettings.MaxSendMsgSize > 0 {
grpcMiddlewares = append([]grpc.ServerOption{grpc.MaxSendMsgSize(opts.GRPCSettings.MaxSendMsgSize)}, grpcMiddlewares...)
}
return grpcMiddlewares
}
// Serve starts serving the plugin over gRPC.
func Serve(opts ServeOpts) error {
grpc_prometheus.EnableHandlingTimeHistogram()
pluginOpts := asGRPCServeOpts(opts)
pluginOpts.GRPCServer = func(grpcOptions []grpc.ServerOption) *grpc.Server {
return grpc.NewServer(append(defaultGRPCMiddlewares(opts), grpcOptions...)...)
}
return grpcplugin.Serve(pluginOpts)
}
// GracefulStandaloneServe starts a gRPC server that is not managed by hashicorp.
// The provided standalone.Args must have an Address set, or the function returns an error.
// The function handles creating/cleaning up the standalone address file, and graceful GRPC server termination.
// The function returns after the GRPC server has been terminated.
func GracefulStandaloneServe(dsopts ServeOpts, info standalone.ServerSettings) error {
// We must have an address if we want to run the plugin in standalone mode
if info.Address == "" {
return errors.New("standalone address must be specified")
}
if info.Dir == "" {
return errors.New("directory must be specified")
}
// Write the address and PID to local files
log.DefaultLogger.Info("Creating standalone address and pid files", "dir", info.Dir)
if err := standalone.CreateStandaloneAddressFile(info.Address, info.Dir); err != nil {
return fmt.Errorf("create standalone address file: %w", err)
}
if err := standalone.CreateStandalonePIDFile(os.Getpid(), info.Dir); err != nil {
return fmt.Errorf("create standalone pid file: %w", err)
}
// sadly vs-code can not listen to shutdown events
// https://github.com/golang/vscode-go/issues/120
// Cleanup function that deletes standalone.txt and pid.txt, if it exists. Fails silently.
// This is so the address file is deleted when the plugin shuts down gracefully, if possible.
defer func() {
log.DefaultLogger.Info("Cleaning up standalone address and pid files")
if err := standalone.CleanupStandaloneAddressFile(info); err != nil {
log.DefaultLogger.Error("Error while cleaning up standalone address file", "error", err)
}
if err := standalone.CleanupStandalonePIDFile(info); err != nil {
log.DefaultLogger.Error("Error while cleaning up standalone pid file", "error", err)
}
// Kill the dummy locator so Grafana reloads the plugin
standalone.FindAndKillCurrentPlugin(info.Dir)
}()
// When debugging, be sure to kill the running instances, so that we can reconnect
standalone.FindAndKillCurrentPlugin(info.Dir)
// Start GRPC server
pluginOpts := asGRPCServeOpts(dsopts)
if pluginOpts.GRPCServer == nil {
pluginOpts.GRPCServer = func(grpcOptions []grpc.ServerOption) *grpc.Server {
return grpc.NewServer(append(defaultGRPCMiddlewares(dsopts), grpcOptions...)...)
}
}
server := pluginOpts.GRPCServer(nil)
var plugKeys []string
if pluginOpts.DiagnosticsServer != nil {
pluginv2.RegisterDiagnosticsServer(server, pluginOpts.DiagnosticsServer)
plugKeys = append(plugKeys, "diagnostics")
}
if pluginOpts.ResourceServer != nil {
pluginv2.RegisterResourceServer(server, pluginOpts.ResourceServer)
plugKeys = append(plugKeys, "resources")
}
if pluginOpts.DataServer != nil {
pluginv2.RegisterDataServer(server, pluginOpts.DataServer)
plugKeys = append(plugKeys, "data")
}
if pluginOpts.StreamServer != nil {
pluginv2.RegisterStreamServer(server, pluginOpts.StreamServer)
plugKeys = append(plugKeys, "stream")
}
// Start the GRPC server and handle graceful shutdown to ensure we execute deferred functions correctly
log.DefaultLogger.Debug("Standalone plugin server", "capabilities", plugKeys)
listener, err := net.Listen("tcp", info.Address)
if err != nil {
return err
}
signalChan := make(chan os.Signal, 1)
serverErrChan := make(chan error, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
// Unregister signal handlers before returning
defer signal.Stop(signalChan)
// Start GRPC server in a separate goroutine
go func() {
serverErrChan <- server.Serve(listener)
}()
// Block until signal or GRPC server termination
select {
case <-signalChan:
// Signal received, stop the server
server.Stop()
if err := <-serverErrChan; err != nil {
// Bubble up error
return err
}
case err := <-serverErrChan:
// Server stopped prematurely, bubble up the error
return err
}
log.DefaultLogger.Debug("Plugin server exited")
return nil
}
// Manage runs the plugin in either standalone mode, dummy locator or normal (hashicorp) mode.
func Manage(pluginID string, serveOpts ServeOpts) error {
defer func() {
tp, ok := otel.GetTracerProvider().(tracerprovider.TracerProvider)
if !ok {
return
}
Logger.Debug("Closing tracing")
ctx, canc := context.WithTimeout(context.Background(), time.Second*5)
defer canc()
if err := tp.Shutdown(ctx); err != nil {
Logger.Error("error while shutting down tracer", "error", err)
}
}()
if s, enabled := standalone.ServerModeEnabled(pluginID); enabled {
// Run the standalone GRPC server
return GracefulStandaloneServe(serveOpts, s)
}
if s, enabled := standalone.ClientModeEnabled(pluginID); enabled {
// Grafana is trying to run the dummy plugin locator to connect to the standalone GRPC server (separate process)
Logger.Debug("Running dummy plugin locator", "addr", s.TargetAddress, "pid", strconv.Itoa(s.TargetPID))
standalone.RunDummyPluginLocator(s.TargetAddress)
return nil
}
// The default/normal hashicorp path.
return Serve(serveOpts)
}
// TestStandaloneServe starts a gRPC server that is not managed by hashicorp.
// The function returns the gRPC server which should be closed by the consumer.
func TestStandaloneServe(opts ServeOpts, address string) (*grpc.Server, error) {
pluginOpts := asGRPCServeOpts(opts)
if pluginOpts.GRPCServer == nil {
pluginOpts.GRPCServer = func(grpcOptions []grpc.ServerOption) *grpc.Server {
return grpc.NewServer(append(defaultGRPCMiddlewares(opts), grpcOptions...)...)
}
}
server := pluginOpts.GRPCServer(nil)
var plugKeys []string
if pluginOpts.DiagnosticsServer != nil {
pluginv2.RegisterDiagnosticsServer(server, pluginOpts.DiagnosticsServer)
plugKeys = append(plugKeys, "diagnostics")
}
if pluginOpts.ResourceServer != nil {
pluginv2.RegisterResourceServer(server, pluginOpts.ResourceServer)
plugKeys = append(plugKeys, "resources")
}
if pluginOpts.DataServer != nil {
pluginv2.RegisterDataServer(server, pluginOpts.DataServer)
plugKeys = append(plugKeys, "data")
}
if pluginOpts.StreamServer != nil {
pluginv2.RegisterStreamServer(server, pluginOpts.StreamServer)
plugKeys = append(plugKeys, "stream")
}
// Start the GRPC server and handle graceful shutdown to ensure we execute deferred functions correctly
log.DefaultLogger.Info("Standalone plugin server", "capabilities", plugKeys)
listener, err := net.Listen("tcp", address)
if err != nil {
return nil, err
}
serverErrChan := make(chan error, 1)
// Start GRPC server in a separate goroutine
go func() {
serverErrChan <- server.Serve(listener)
}()
// Wait until signal or GRPC server termination in a separate goroutine
go func() {
err := <-serverErrChan
if err != nil {
log.DefaultLogger.Error("Server experienced an error", "error", err)
}
}()
return server, nil
}