/
controllerutil.go
107 lines (94 loc) · 3.61 KB
/
controllerutil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package controller
/*
Copyright 2020 - 2021 Crunchy Data Solutions, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"context"
"encoding/json"
"errors"
"github.com/percona/percona-postgresql-operator/internal/config"
"github.com/percona/percona-postgresql-operator/internal/kubeapi"
crv1 "github.com/percona/percona-postgresql-operator/pkg/apis/crunchydata.com/v1"
pgo "github.com/percona/percona-postgresql-operator/pkg/generated/clientset/versioned"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
// ErrControllerGroupExists is the sentinel error reported when a controller group for the
// specified namespace already exists. Compare against it with errors.Is rather than by
// matching the message text.
var ErrControllerGroupExists = errors.New("A controller group for the namespace specified already exists")
// WorkerRunner is an interface for controllers that have worker queues that need to be run.
type WorkerRunner interface {
// RunWorker receives a receive-only stop channel and a send-only done channel.
// NOTE(review): presumably the worker runs until stopCh is closed and then signals on
// doneCh; confirm against the implementing controllers.
RunWorker(stopCh <-chan struct{}, doneCh chan<- struct{})
// WorkerCount returns an int.
// NOTE(review): presumably the number of workers to start for the controller; confirm
// against the implementing controllers.
WorkerCount() int
}
// Manager defines the interface for a controller manager. Groups are identified by namespace.
// NOTE(review): the per-method notes below are inferred from the method names and
// signatures only; confirm against the concrete manager implementation.
type Manager interface {
// AddGroup presumably registers a controller group for the namespace without running it.
AddGroup(namespace string) error
// AddAndRunGroup presumably registers a controller group for the namespace and runs it.
AddAndRunGroup(namespace string) error
// RemoveAll presumably removes every controller group; it reports no error.
RemoveAll()
// RemoveGroup presumably removes the controller group for the namespace; it reports no error.
RemoveGroup(namespace string)
// RunAll presumably runs every registered controller group.
RunAll() error
// RunGroup presumably runs the controller group for the namespace.
RunGroup(namespace string) error
}
// InitializeReplicaCreation initializes the creation of replicas for a cluster. For a regular
// (i.e. non-standby) cluster this is called following the creation of the initial cluster backup,
// which is needed to bootstrap replicas. However, for a standby cluster this is called as
// soon as the primary PG pod reports ready and the cluster is marked as initialized.
//
// It lists every pgreplica in namespace labeled with config.LABEL_PG_CLUSTER=clusterName and
// merge-patches each one to set the config.ANNOTATION_PGHA_BOOTSTRAP_REPLICA annotation to
// "true".
//
// An error is returned when the pgreplicas cannot be listed or when the patch document cannot
// be built. A failure to patch an individual pgreplica is logged and does not stop the
// remaining pgreplicas from being patched, nor does it affect the returned value.
func InitializeReplicaCreation(clientset pgo.Interface, clusterName,
	namespace string) error {
	ctx := context.TODO()

	selector := config.LABEL_PG_CLUSTER + "=" + clusterName
	pgreplicaList, err := clientset.CrunchydataV1().Pgreplicas(namespace).
		List(ctx, metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		log.Error(err)
		return err
	}

	// Nothing to annotate, so there is no need to build a patch at all.
	if len(pgreplicaList.Items) == 0 {
		return nil
	}

	// The patch is identical for every pgreplica, so build it once up front. If it cannot
	// be built there is nothing valid to send, so bail out instead of submitting a bad patch.
	patch, err := kubeapi.NewMergePatch().
		Add("metadata", "annotations")(map[string]string{
		config.ANNOTATION_PGHA_BOOTSTRAP_REPLICA: "true",
	}).Bytes()
	if err != nil {
		log.Error(err)
		return err
	}

	for i := range pgreplicaList.Items {
		// Best effort: a failure on one pgreplica must not block the others.
		if _, err := clientset.CrunchydataV1().Pgreplicas(namespace).
			Patch(ctx, pgreplicaList.Items[i].GetName(), types.MergePatchType, patch,
				metav1.PatchOptions{}); err != nil {
			log.Error(err)
		}
	}

	return nil
}
// SetClusterInitializedStatus marks the pgcluster CR named clusterName in namespace as
// initialized. It does so by sending a merge patch whose "status" field carries the
// crv1.PgclusterStateInitialized state along with a human-readable message.
//
// Any error from encoding the patch or from the patch request itself is logged and returned.
func SetClusterInitializedStatus(clientset pgo.Interface, clusterName,
	namespace string) error {
	initialized := crv1.PgclusterStatus{
		State:   crv1.PgclusterStateInitialized,
		Message: "Cluster has been initialized",
	}

	body, err := json.Marshal(map[string]interface{}{"status": initialized})
	if err != nil {
		log.Error(err)
		return err
	}

	pgclusters := clientset.CrunchydataV1().Pgclusters(namespace)
	if _, err := pgclusters.Patch(context.TODO(), clusterName, types.MergePatchType, body,
		metav1.PatchOptions{}); err != nil {
		log.Error(err)
		return err
	}

	return nil
}