/
enterprise.go
224 lines (196 loc) · 5.83 KB
/
enterprise.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package enterprise
import (
"container/ring"
"context"
"net/url"
"strings"
cmp "github.com/snetsystems/cmp/backend"
"github.com/snetsystems/cmp/backend/influx"
)
var _ cmp.TimeSeries = &Client{}
// Ctrl represents administrative controls over an Influx Enterprise cluster
type Ctrl interface {
ShowCluster(ctx context.Context) (*Cluster, error)
Users(ctx context.Context, name *string) (*Users, error)
User(ctx context.Context, name string) (*User, error)
CreateUser(ctx context.Context, name, passwd string) error
DeleteUser(ctx context.Context, name string) error
ChangePassword(ctx context.Context, name, passwd string) error
SetUserPerms(ctx context.Context, name string, perms Permissions) error
UserRoles(ctx context.Context) (map[string]Roles, error)
Roles(ctx context.Context, name *string) (*Roles, error)
Role(ctx context.Context, name string) (*Role, error)
CreateRole(ctx context.Context, name string) error
DeleteRole(ctx context.Context, name string) error
SetRolePerms(ctx context.Context, name string, perms Permissions) error
SetRoleUsers(ctx context.Context, name string, users []string) error
AddRoleUsers(ctx context.Context, name string, users []string) error
RemoveRoleUsers(ctx context.Context, name string, users []string) error
}
// Client is a device for retrieving time series data from an Influx Enterprise
// cluster. It is configured using the addresses of one or more meta node URLs.
// Data node URLs are retrieved automatically from the meta nodes and queries
// are appropriately load balanced across the cluster.
type Client struct {
Ctrl
UsersStore cmp.UsersStore
RolesStore cmp.RolesStore
Logger cmp.Logger
dataNodes *ring.Ring
opened bool
}
// NewClientWithTimeSeries initializes a Client with a known set of TimeSeries.
func NewClientWithTimeSeries(lg cmp.Logger, mu string, authorizer influx.Authorizer, tls, insecure bool, series ...cmp.TimeSeries) (*Client, error) {
metaURL, err := parseMetaURL(mu, tls)
if err != nil {
return nil, err
}
ctrl := NewMetaClient(metaURL, insecure, authorizer)
c := &Client{
Ctrl: ctrl,
UsersStore: &UserStore{
Ctrl: ctrl,
Logger: lg,
},
RolesStore: &RolesStore{
Ctrl: ctrl,
Logger: lg,
},
}
c.dataNodes = ring.New(len(series))
for _, s := range series {
c.dataNodes.Value = s
c.dataNodes = c.dataNodes.Next()
}
return c, nil
}
// NewClientWithURL initializes an Enterprise client with a URL to a Meta Node.
// Acceptable URLs include host:port combinations as well as scheme://host:port
// varieties. TLS is used when the URL contains "https" or when the TLS
// parameter is set. authorizer will add the correct `Authorization` headers
// on the out-bound request.
func NewClientWithURL(mu string, authorizer influx.Authorizer, tls bool, insecure bool, lg cmp.Logger) (*Client, error) {
metaURL, err := parseMetaURL(mu, tls)
if err != nil {
return nil, err
}
ctrl := NewMetaClient(metaURL, insecure, authorizer)
return &Client{
Ctrl: ctrl,
UsersStore: &UserStore{
Ctrl: ctrl,
Logger: lg,
},
RolesStore: &RolesStore{
Ctrl: ctrl,
Logger: lg,
},
Logger: lg,
}, nil
}
// Connect prepares a Client to process queries. It must be called prior to calling Query
func (c *Client) Connect(ctx context.Context, src *cmp.Source) error {
c.opened = true
// return early if we already have dataNodes
if c.dataNodes != nil {
return nil
}
cluster, err := c.Ctrl.ShowCluster(ctx)
if err != nil {
return err
}
c.dataNodes = ring.New(len(cluster.DataNodes))
for _, dn := range cluster.DataNodes {
cl := &influx.Client{
Logger: c.Logger,
}
dataSrc := &cmp.Source{}
*dataSrc = *src
dataSrc.URL = dn.HTTPAddr
if err := cl.Connect(ctx, dataSrc); err != nil {
continue
}
c.dataNodes.Value = cl
c.dataNodes = c.dataNodes.Next()
}
return nil
}
// Query retrieves timeseries information pertaining to a specified query. It
// can be cancelled by using a provided context.
func (c *Client) Query(ctx context.Context, q cmp.Query) (cmp.Response, error) {
if !c.opened {
return nil, cmp.ErrUninitialized
}
return c.nextDataNode().Query(ctx, q)
}
// Write records points into a time series
func (c *Client) Write(ctx context.Context, points []cmp.Point) error {
if !c.opened {
return cmp.ErrUninitialized
}
return c.nextDataNode().Write(ctx, points)
}
// Users is the interface to the users within Influx Enterprise
func (c *Client) Users(context.Context) cmp.UsersStore {
return c.UsersStore
}
// Roles provide a grouping of permissions given to a grouping of users
func (c *Client) Roles(ctx context.Context) (cmp.RolesStore, error) {
return c.RolesStore, nil
}
// Permissions returns all Influx Enterprise permission strings
func (c *Client) Permissions(context.Context) cmp.Permissions {
all := cmp.Allowances{
"NoPermissions",
"ViewAdmin",
"ViewCMP",
"CreateDatabase",
"CreateUserAndRole",
"AddRemoveNode",
"DropDatabase",
"DropData",
"ReadData",
"WriteData",
"Rebalance",
"ManageShard",
"ManageContinuousQuery",
"ManageQuery",
"ManageSubscription",
"Monitor",
"CopyShard",
"KapacitorAPI",
"KapacitorConfigAPI",
}
return cmp.Permissions{
{
Scope: cmp.AllScope,
Allowed: all,
},
{
Scope: cmp.DBScope,
Allowed: all,
},
}
}
// nextDataNode retrieves the next available data node
func (c *Client) nextDataNode() cmp.TimeSeries {
c.dataNodes = c.dataNodes.Next()
return c.dataNodes.Value.(cmp.TimeSeries)
}
// parseMetaURL constructs a url from either a host:port combination or a
// scheme://host:port combo. The optional TLS parameter takes precedence over
// any TLS preference found in the provided URL
func parseMetaURL(mu string, tls bool) (metaURL *url.URL, err error) {
if strings.Contains(mu, "http") {
metaURL, err = url.Parse(mu)
} else {
metaURL = &url.URL{
Scheme: "http",
Host: mu,
}
}
if tls {
metaURL.Scheme = "https"
}
return
}