forked from CrunchyData/postgres-operator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
common.go
410 lines (344 loc) · 14 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
package operator
/*
Copyright 2017 - 2021 Crunchy Data Solutions, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"bytes"
"encoding/json"
"os"
"strings"
"github.com/crunchydata/postgres-operator/internal/config"
"github.com/crunchydata/postgres-operator/internal/ns"
crv1 "github.com/crunchydata/postgres-operator/pkg/apis/crunchydata.com/v1"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
const (
// defaultRegistry is the default registry to pull the container images from
defaultRegistry = "registry.developers.crunchydata.com/crunchydata"
)
var CRUNCHY_DEBUG bool
var NAMESPACE string
var InstallationName string
var PgoNamespace string
var EventTCPAddress = "localhost:4150"
var Pgo config.PgoConfig
// ContainerImageOverrides contains a list of container images that are
// overridden by the RELATED_IMAGE_* environmental variables that can be set by
// people deploying the Operator
var ContainerImageOverrides = map[string]string{}
// NamespaceOperatingMode defines the namespace operating mode for the cluster,
// e.g. "dynamic", "readonly" or "disabled". See type NamespaceOperatingMode
// for detailed explanations of each mode available.
var namespaceOperatingMode ns.NamespaceOperatingMode
type containerResourcesTemplateFields struct {
// LimitsMemory and LimitsCPU detemrine the memory/CPU limits
LimitsMemory, LimitsCPU string
// RequestsMemory and RequestsCPU determine how much memory/CPU resources to
// request
RequestsMemory, RequestsCPU string
}
func Initialize(clientset kubernetes.Interface) {
tmp := os.Getenv("CRUNCHY_DEBUG")
if tmp == "true" {
CRUNCHY_DEBUG = true
log.Debug("CRUNCHY_DEBUG flag set to true")
} else {
CRUNCHY_DEBUG = false
log.Info("CRUNCHY_DEBUG flag set to false")
}
NAMESPACE = os.Getenv("NAMESPACE")
log.Infof("NAMESPACE %s", NAMESPACE)
InstallationName = os.Getenv("PGO_INSTALLATION_NAME")
log.Infof("InstallationName %s", InstallationName)
if InstallationName == "" {
log.Error("PGO_INSTALLATION_NAME env var is required")
os.Exit(2)
}
PgoNamespace = os.Getenv("PGO_OPERATOR_NAMESPACE")
if PgoNamespace == "" {
log.Error("PGO_OPERATOR_NAMESPACE environment variable is not set and is required, this is the namespace that the Operator is to run within.")
os.Exit(2)
}
var err error
err = Pgo.GetConfig(clientset, PgoNamespace)
if err != nil {
log.Error(err)
log.Error("pgo-config files and templates did not load")
os.Exit(2)
}
log.Printf("PrimaryStorage=%v\n", Pgo.Storage["storage1"])
if Pgo.Cluster.CCPImagePrefix == "" {
log.Debugf("pgo.yaml CCPImagePrefix not set, using default %q", defaultRegistry)
Pgo.Cluster.CCPImagePrefix = defaultRegistry
} else {
log.Debugf("pgo.yaml CCPImagePrefix set, using %s", Pgo.Cluster.CCPImagePrefix)
}
if Pgo.Pgo.PGOImagePrefix == "" {
log.Debugf("pgo.yaml PGOImagePrefix not set, using default %q", defaultRegistry)
Pgo.Pgo.PGOImagePrefix = defaultRegistry
} else {
log.Debugf("PGOImagePrefix set, using %s", Pgo.Pgo.PGOImagePrefix)
}
if Pgo.Cluster.PgmonitorPassword == "" {
log.Debug("pgo.yaml PgmonitorPassword not set, using default")
Pgo.Cluster.PgmonitorPassword = "password"
}
// In a RELATED_IMAGE_* world, this does not _need_ to be set, but our
// installer does set it up so we could be ok...
if Pgo.Pgo.PGOImageTag == "" {
log.Error("pgo.yaml PGOImageTag not set, required ")
os.Exit(2)
}
// initialize any container image overrides that are set by the "RELATED_*"
// variables
initializeContainerImageOverrides()
tmp = os.Getenv("EVENT_TCP_ADDRESS")
if tmp != "" {
EventTCPAddress = tmp
}
log.Info("EventTCPAddress set to " + EventTCPAddress)
// set controller refresh intervals and worker counts
initializeControllerRefreshIntervals()
initializeControllerWorkerCounts()
}
// GetPodSecurityContext will generate the security context required for a
// Deployment by incorporating the standard fsGroup for the user that runs the
// container (typically the "postgres" user), and adds any supplemental groups
// that may need to be added, e.g. for NFS storage.
//
// Following the legacy method, this returns a JSON string, which will be
// modified in the future. Mainly this is transitioning from the legacy function
// by adding the expected types
func GetPodSecurityContext(supplementalGroups []int64) string {
// set up the security context struct
securityContext := v1.PodSecurityContext{
// add any supplemental groups that the user passed in
SupplementalGroups: supplementalGroups,
}
// determine if we should use the PostgreSQL FSGroup.
if !Pgo.Cluster.DisableFSGroup {
// we store the PostgreSQL FSGroup in this constant as an int64, so it's
// just carried over
securityContext.FSGroup = &crv1.PGFSGroup
}
// ...convert to JSON. Errors are ignored
doc, err := json.Marshal(securityContext)
// if there happens to be an error, warn about it
if err != nil {
log.Warn(err)
}
// for debug purposes, we can look at the document
log.Debug(doc)
// return a string of the security context
return string(doc)
}
// GetResourcesJSON is a pseudo-legacy method that creates JSON that applies the
// CPU and Memory settings. The settings are only included if:
// a) they exist
// b) they are nonzero
func GetResourcesJSON(resources, limits v1.ResourceList) string {
fields := containerResourcesTemplateFields{}
// first, if the contents of the resources list happen to be nil, exit out
if resources == nil && limits == nil {
return ""
}
if resources != nil {
if resources.Cpu() != nil && !resources.Cpu().IsZero() {
fields.RequestsCPU = resources.Cpu().String()
}
if resources.Memory() != nil && !resources.Memory().IsZero() {
fields.RequestsMemory = resources.Memory().String()
}
}
if limits != nil {
if limits.Cpu() != nil && !limits.Cpu().IsZero() {
fields.LimitsCPU = limits.Cpu().String()
}
if limits.Memory() != nil && !limits.Memory().IsZero() {
fields.LimitsMemory = limits.Memory().String()
}
}
doc := bytes.Buffer{}
if err := config.ContainerResourcesTemplate.Execute(&doc, fields); err != nil {
log.Error(err)
return ""
}
if log.GetLevel() == log.DebugLevel {
config.ContainerResourcesTemplate.Execute(os.Stdout, fields)
}
return doc.String()
}
// GetRepoType returns the proper repo type to set in container based on the
// backrest storage type provided
func GetRepoType(backrestStorageType string) string {
if backrestStorageType != "" && backrestStorageType == "s3" {
return "s3"
} else {
return "posix"
}
}
// IsLocalAndS3Storage a boolean indicating whether or not local and s3 storage should
// be enabled for pgBackRest based on the backrestStorageType string provided
func IsLocalAndS3Storage(backrestStorageType string) bool {
if backrestStorageType != "" && strings.Contains(backrestStorageType, "s3") &&
strings.Contains(backrestStorageType, "local") {
return true
}
return false
}
// SetContainerImageOverride determines if there is an override available for
// a container image, and sets said value on the Kubernetes Container image
// definition
func SetContainerImageOverride(containerImageName string, container *v1.Container) {
// if a container image name override is available, set it!
overrideImageName := ContainerImageOverrides[containerImageName]
if overrideImageName != "" {
log.Debugf("overriding image %s with %s", containerImageName, overrideImageName)
container.Image = overrideImageName
}
}
// initializeContainerImageOverrides initializes the container image overrides
// that could be set if there are any `RELATED_IMAGE_*` environmental variables
func initializeContainerImageOverrides() {
// the easiest way to handle this is to iterate over the RelatedImageMap,
// check if said image exist in the environmental variable, and if it does
// load it in as an override. Otherwise, ignore.
for relatedImageEnvVar, imageName := range config.RelatedImageMap {
// see if the envirionmental variable overrides the image name or not
overrideImageName := os.Getenv(relatedImageEnvVar)
// if it is overridden, set the image name the map
if overrideImageName != "" {
ContainerImageOverrides[imageName] = overrideImageName
log.Infof("image %s overridden by: %s", imageName, overrideImageName)
}
}
}
// initControllerRefreshIntervals initializes the refresh intervals for any informers
// created by the Operator requiring a refresh interval. This includes first attempting
// to utilize the refresh interval(s) defined in the pgo.yaml config file, and if not
// present then falling back to a default value.
func initializeControllerRefreshIntervals() {
// set the namespace controller refresh interval if not provided in the pgo.yaml
if Pgo.Pgo.NamespaceRefreshInterval == nil {
log.Debugf("NamespaceRefreshInterval not set, defaulting to %d seconds",
config.DefaultNamespaceRefreshInterval)
defaultVal := int(config.DefaultNamespaceRefreshInterval)
Pgo.Pgo.NamespaceRefreshInterval = &defaultVal
} else {
log.Debugf("NamespaceRefreshInterval is set, using %d seconds",
*Pgo.Pgo.NamespaceRefreshInterval)
}
// set the default controller group refresh interval if not provided in the pgo.yaml
if Pgo.Pgo.ControllerGroupRefreshInterval == nil {
log.Debugf("ControllerGroupRefreshInterval not set, defaulting to %d seconds",
config.DefaultControllerGroupRefreshInterval)
defaultVal := int(config.DefaultControllerGroupRefreshInterval)
Pgo.Pgo.ControllerGroupRefreshInterval = &defaultVal
} else {
log.Debugf("ControllerGroupRefreshInterval is set, using %d seconds",
*Pgo.Pgo.ControllerGroupRefreshInterval)
}
}
// initControllerWorkerCounts sets the number of workers that will be created for any worker
// queues created within the various controllers created by the Operator. This includes first
// attempting to utilize the worker counts defined in the pgo.yaml config file, and if not
// present then falling back to a default value.
func initializeControllerWorkerCounts() {
if Pgo.Pgo.ConfigMapWorkerCount == nil {
log.Debugf("ConfigMapWorkerCount not set, defaulting to %d worker(s)",
config.DefaultConfigMapWorkerCount)
defaultVal := int(config.DefaultConfigMapWorkerCount)
Pgo.Pgo.ConfigMapWorkerCount = &defaultVal
} else {
log.Debugf("ConfigMapWorkerCount is set, using %d worker(s)",
*Pgo.Pgo.ConfigMapWorkerCount)
}
if Pgo.Pgo.NamespaceWorkerCount == nil {
log.Debugf("NamespaceWorkerCount not set, defaulting to %d worker(s)",
config.DefaultNamespaceWorkerCount)
defaultVal := int(config.DefaultNamespaceWorkerCount)
Pgo.Pgo.NamespaceWorkerCount = &defaultVal
} else {
log.Debugf("NamespaceWorkerCount is set, using %d worker(s)",
*Pgo.Pgo.NamespaceWorkerCount)
}
if Pgo.Pgo.PGClusterWorkerCount == nil {
log.Debugf("PGClusterWorkerCount not set, defaulting to %d worker(s)",
config.DefaultPGClusterWorkerCount)
defaultVal := int(config.DefaultPGClusterWorkerCount)
Pgo.Pgo.PGClusterWorkerCount = &defaultVal
} else {
log.Debugf("PGClusterWorkerCount is set, using %d worker(s)",
*Pgo.Pgo.PGClusterWorkerCount)
}
if Pgo.Pgo.PGReplicaWorkerCount == nil {
log.Debugf("PGReplicaWorkerCount not set, defaulting to %d worker(s)",
config.DefaultPGReplicaWorkerCount)
defaultVal := int(config.DefaultPGReplicaWorkerCount)
Pgo.Pgo.PGReplicaWorkerCount = &defaultVal
} else {
log.Debugf("PGReplicaWorkerCount is set, using %d worker(s)",
*Pgo.Pgo.PGReplicaWorkerCount)
}
if Pgo.Pgo.PGTaskWorkerCount == nil {
log.Debugf("PGTaskWorkerCount not set, defaulting to %d worker(s)",
config.DefaultPGTaskWorkerCount)
defaultVal := int(config.DefaultPGTaskWorkerCount)
Pgo.Pgo.PGTaskWorkerCount = &defaultVal
} else {
log.Debugf("PGTaskWorkerCount is set, using %d worker(s)",
*Pgo.Pgo.PGTaskWorkerCount)
}
}
// SetupNamespaces is responsible for the initial namespace configuration for the Operator
// install. This includes setting the proper namespace operating mode, creating and/or updating
// namespaces as needed (or as permitted by the current operator mode), and returning a valid list
// of namespaces for the current Operator install.
func SetupNamespaces(clientset kubernetes.Interface) ([]string, error) {
// First set the proper namespace operating mode for the Operator install. The mode identified
// determines whether or not certain namespace capabilities are enabled.
if err := setNamespaceOperatingMode(clientset); err != nil {
log.Errorf("Error detecting namespace operating mode: %v", err)
return nil, err
}
log.Debugf("Namespace operating mode is '%s'", NamespaceOperatingMode())
namespaceList, err := ns.GetInitialNamespaceList(clientset, NamespaceOperatingMode(),
InstallationName, PgoNamespace)
if err != nil {
return nil, err
}
// proceed with creating and/or updating any namespaces provided for the installation
if err := ns.ConfigureInstallNamespaces(clientset, InstallationName,
PgoNamespace, namespaceList, NamespaceOperatingMode()); err != nil {
log.Errorf("Unable to setup namespaces: %v", err)
return nil, err
}
return namespaceList, nil
}
// setNamespaceOperatingMode set the namespace operating mode for the Operator by calling the
// proper utility function to determine which mode is applicable based on the current
// permissions assigned to the Operator Service Account.
func setNamespaceOperatingMode(clientset kubernetes.Interface) error {
nsOpMode, err := ns.GetNamespaceOperatingMode(clientset)
if err != nil {
return err
}
namespaceOperatingMode = nsOpMode
return nil
}
// NamespaceOperatingMode returns the namespace operating mode for the current Operator
// installation, which is stored in the "namespaceOperatingMode" variable
func NamespaceOperatingMode() ns.NamespaceOperatingMode {
return namespaceOperatingMode
}