-
Notifications
You must be signed in to change notification settings - Fork 16
/
pluginServer.go
145 lines (130 loc) · 4.68 KB
/
pluginServer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package grpc
import (
"fmt"
"github.com/hashicorp/go-plugin"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/v4/grpc/proto"
pluginshared "github.com/turbot/steampipe-plugin-sdk/v4/grpc/shared"
"github.com/turbot/steampipe-plugin-sdk/v4/version"
)
type PluginSchema struct {
Schema map[string]*proto.TableSchema
Mode string
}
type ExecuteFunc func(req *proto.ExecuteRequest, stream proto.WrapperPlugin_ExecuteServer) error
type GetSchemaFunc func(string) (*PluginSchema, error)
type SetConnectionConfigFunc func(string, string) error
type SetAllConnectionConfigsFunc func([]*proto.ConnectionConfig, int) error
type UpdateConnectionConfigsFunc func([]*proto.ConnectionConfig, []*proto.ConnectionConfig, []*proto.ConnectionConfig) error
// PluginServer is the server for a single plugin
type PluginServer struct {
proto.UnimplementedWrapperPluginServer
pluginName string
executeFunc ExecuteFunc
setConnectionConfigFunc SetConnectionConfigFunc
setAllConnectionConfigsFunc SetAllConnectionConfigsFunc
updateConnectionConfigsFunc UpdateConnectionConfigsFunc
getSchemaFunc GetSchemaFunc
}
func NewPluginServer(pluginName string,
setConnectionConfigFunc SetConnectionConfigFunc,
setAllConnectionConfigsFunc SetAllConnectionConfigsFunc,
updateConnectionConfigsFunc UpdateConnectionConfigsFunc,
getSchemaFunc GetSchemaFunc,
executeFunc ExecuteFunc) *PluginServer {
return &PluginServer{
pluginName: pluginName,
executeFunc: executeFunc,
setConnectionConfigFunc: setConnectionConfigFunc,
setAllConnectionConfigsFunc: setAllConnectionConfigsFunc,
updateConnectionConfigsFunc: updateConnectionConfigsFunc,
getSchemaFunc: getSchemaFunc,
}
}
func (s PluginServer) GetSchema(req *proto.GetSchemaRequest) (res *proto.GetSchemaResponse, err error) {
defer func() {
if r := recover(); r != nil {
err = helpers.ToError(r)
}
}()
schema, err := s.getSchemaFunc(req.Connection)
if err != nil {
return nil, err
}
return &proto.GetSchemaResponse{
Schema: &proto.Schema{
Schema: schema.Schema,
Mode: schema.Mode,
SdkVersion: version.String(),
ProtocolVersion: version.ProtocolVersion,
},
}, err
}
func (s PluginServer) Execute(req *proto.ExecuteRequest, stream proto.WrapperPlugin_ExecuteServer) (err error) {
defer func() {
if r := recover(); r != nil {
err = helpers.ToError(r)
}
}()
// NOTE: Compatibility
// if a pre-v16 version of Steampipe is being used,
// the deprecated Connection, CacheEnabled and CacheTtl will be set, and ExecuteConnectionData will be nil
// populate ExecuteConnectionData
if req.ExecuteConnectionData == nil {
if req.Connection == "" {
return fmt.Errorf("either ExecuteConnectionData or Connection must be provided")
}
req.ExecuteConnectionData = map[string]*proto.ExecuteConnectionData{
req.Connection: {
Limit: req.QueryContext.Limit,
CacheEnabled: req.CacheEnabled,
CacheTtl: req.CacheTtl,
},
}
}
return s.executeFunc(req, stream)
}
func (s PluginServer) SetConnectionConfig(req *proto.SetConnectionConfigRequest) (res *proto.SetConnectionConfigResponse, err error) {
defer func() {
if r := recover(); r != nil {
err = helpers.ToError(r)
}
}()
err = s.setConnectionConfigFunc(req.ConnectionName, req.ConnectionConfig)
return &proto.SetConnectionConfigResponse{}, err
}
func (s PluginServer) SetAllConnectionConfigs(req *proto.SetAllConnectionConfigsRequest) (res *proto.SetConnectionConfigResponse, err error) {
defer func() {
if r := recover(); r != nil {
err = helpers.ToError(r)
}
}()
err = s.setAllConnectionConfigsFunc(req.Configs, int(req.MaxCacheSizeMb))
return &proto.SetConnectionConfigResponse{}, err
}
func (s PluginServer) UpdateConnectionConfigs(req *proto.UpdateConnectionConfigsRequest) (res *proto.UpdateConnectionConfigsResponse, err error) {
defer func() {
if r := recover(); r != nil {
err = helpers.ToError(r)
}
}()
err = s.updateConnectionConfigsFunc(req.Added, req.Deleted, req.Changed)
return &proto.UpdateConnectionConfigsResponse{}, err
}
func (s PluginServer) GetSupportedOperations(*proto.GetSupportedOperationsRequest) (*proto.GetSupportedOperationsResponse, error) {
return &proto.GetSupportedOperationsResponse{
QueryCache: true,
MultipleConnections: true,
}, nil
}
func (s PluginServer) Serve() {
pluginMap := map[string]plugin.Plugin{
s.pluginName: &pluginshared.WrapperPlugin{Impl: s},
}
plugin.Serve(&plugin.ServeConfig{
Plugins: pluginMap,
GRPCServer: plugin.DefaultGRPCServer,
// A non-nil value here enables gRPC serving for this plugin...
HandshakeConfig: pluginshared.Handshake,
})
}