/
opensearch.go
246 lines (223 loc) · 8.65 KB
/
opensearch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// Copyright (c) 2022, 2023, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package opensearch
import (
"context"
"fmt"
"time"
vmov1 "github.com/verrazzano/verrazzano-monitoring-operator/pkg/apis/vmcontroller/v1"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/verrazzano/verrazzano/pkg/k8s/ready"
"github.com/verrazzano/verrazzano/pkg/log/vzlog"
"github.com/verrazzano/verrazzano/pkg/vzcr"
vzapi "github.com/verrazzano/verrazzano/platform-operator/apis/verrazzano/v1alpha1"
"github.com/verrazzano/verrazzano/platform-operator/constants"
"github.com/verrazzano/verrazzano/platform-operator/controllers/verrazzano/component/common"
"github.com/verrazzano/verrazzano/platform-operator/controllers/verrazzano/component/spi"
"k8s.io/apimachinery/pkg/types"
)
// Workload names used by the legacy OpenSearch deployment.
const (
	// esMasterStatefulset is the statefulset whose existence doesOSExist checks in ComponentNamespace.
	esMasterStatefulset = "vmi-system-es-master"

	// esDataDeployment is the legacy data-node deployment name (usage not visible in this file).
	esDataDeployment = "vmi-system-es-data"

	// esIngestDeployment is the legacy ingest-node deployment name (usage not visible in this file).
	esIngestDeployment = "vmi-system-os-ingest"

	// nodeNamePrefix is a fmt format string; %s is replaced by the node-group name
	// to build the legacy controller (statefulset/deployment) name.
	nodeNamePrefix = "vmi-system-%s"
)
// doesOSExist is the IsInstalled check. It reports whether the OpenSearch master
// statefulset (esMasterStatefulset) exists in ComponentNamespace. A lookup error is
// logged, and the existence flag returned alongside it is passed through unchanged.
func doesOSExist(ctx spi.ComponentContext) bool {
	masterSts := types.NamespacedName{Namespace: ComponentNamespace, Name: esMasterStatefulset}
	found, lookupErr := ready.DoesStatefulsetExist(ctx.Client(), masterSts)
	if lookupErr != nil {
		ctx.Log().Errorf("Component %s failed getting statefulset %v: %v", ctx.GetComponent(), masterSts, lookupErr)
	}
	return found
}
// IsSingleDataNodeCluster reports whether the effective CR configures at most one
// OpenSearch data replica in total (0 or 1), as counted by findESReplicas.
func IsSingleDataNodeCluster(ctx spi.ComponentContext) bool {
	dataReplicas := findESReplicas(ctx, "data")
	if dataReplicas > 1 {
		return false
	}
	return true
}
// isOSReady reports whether the OpenSearch resources are ready.
// When OpenSearch is enabled and the effective CR has an Elasticsearch component,
// every configured node group must pass isOSNodeReady. The final answer is then
// common.IsVMISecretReady, which is consulted even when OpenSearch is disabled.
func isOSReady(ctx spi.ComponentContext) bool {
	prefix := fmt.Sprintf("Component %s", ctx.GetComponent())
	cr := ctx.EffectiveCR()
	osConfigured := vzcr.IsOpenSearchEnabled(cr) && cr.Spec.Components.Elasticsearch != nil
	if osConfigured {
		for _, nodeGroup := range cr.Spec.Components.Elasticsearch.Nodes {
			if !isOSNodeReady(ctx, nodeGroup, prefix) {
				return false
			}
		}
	}
	return common.IsVMISecretReady(ctx)
}
// nodesToObjectKeys collects the namespaced names of the workloads that back each
// configured OpenSearch node group, for availability checking.
//
// Node groups with a nil or non-positive replica count are skipped. For the rest:
//   - non-legacy OpenSearch: one statefulset per node group in VerrazzanoLoggingNamespace;
//   - legacy node group with the master role: one statefulset in ComponentNamespace;
//   - legacy node group with the data role: one deployment per replica (dataDeploymentObjectKeys);
//   - any other legacy node group (e.g. ingest): one deployment in ComponentNamespace.
//
// An empty result is returned when OpenSearch is disabled or has no Elasticsearch component.
// If common.IsLegacyOS fails, the error is logged and the flag it returned is used as-is
// (the log message says legacy OS is then treated as disabled).
func nodesToObjectKeys(ctx spi.ComponentContext) *ready.AvailabilityObjects {
	result := &ready.AvailabilityObjects{}
	effectiveCR := ctx.EffectiveCR()
	if !vzcr.IsOpenSearchEnabled(effectiveCR) || effectiveCR.Spec.Components.Elasticsearch == nil {
		return result
	}

	legacy, err := common.IsLegacyOS(ctx)
	if err != nil {
		ctx.Log().ErrorfThrottled("Failed to get VMI, considering legacy OS to be disabled: %v", err)
	}
	namespace := constants.VerrazzanoLoggingNamespace
	if legacy {
		namespace = ComponentNamespace
	}

	for _, nodeGroup := range effectiveCR.Spec.Components.Elasticsearch.Nodes {
		if nodeGroup.Replicas == nil || *nodeGroup.Replicas < 1 {
			continue
		}
		controllerName := getNodeControllerName(nodeGroup, legacy)
		switch {
		case !legacy || hasRole(nodeGroup.Roles, vmov1.MasterRole):
			result.StatefulsetNames = append(result.StatefulsetNames,
				types.NamespacedName{Name: controllerName, Namespace: namespace})
		case hasRole(nodeGroup.Roles, vmov1.DataRole):
			result.DeploymentNames = append(result.DeploymentNames,
				dataDeploymentObjectKeys(nodeGroup, controllerName)...)
		default:
			result.DeploymentNames = append(result.DeploymentNames,
				types.NamespacedName{Name: controllerName, Namespace: ComponentNamespace})
		}
	}
	return result
}
// isOSNodeReady reports whether the workloads backing one OpenSearch node group are ready.
// A node group with a nil or non-positive replica count is considered ready.
// If common.IsLegacyOS fails, the error is logged and the flag it returned is used as-is.
func isOSNodeReady(ctx spi.ComponentContext, node vzapi.OpenSearchNode, prefix string) bool {
	if node.Replicas == nil || *node.Replicas < 1 {
		return true
	}
	expected := *node.Replicas

	legacy, err := common.IsLegacyOS(ctx)
	if err != nil {
		ctx.Log().ErrorfThrottled("Failed to get VMI, considering legacy OS to be disabled: %v", err)
	}
	controllerName := getNodeControllerName(node, legacy)

	switch {
	case !legacy || hasRole(node.Roles, vmov1.MasterRole):
		// Master node groups are statefulsets; when OpenSearch is not legacy
		// (operator-managed), every node group is a statefulset.
		stsNamespace := constants.VerrazzanoLoggingNamespace
		if legacy {
			stsNamespace = ComponentNamespace
		}
		keys := []types.NamespacedName{{Name: controllerName, Namespace: stsNamespace}}
		return AreOpensearchStsReady(ctx.Log(), ctx.Client(), keys, expected, prefix)
	case hasRole(node.Roles, vmov1.DataRole):
		// Each data replica is its own deployment, so each needs exactly 1 ready replica.
		return ready.DeploymentsAreReady(ctx.Log(), ctx.Client(), dataDeploymentObjectKeys(node, controllerName), 1, prefix)
	default:
		// Remaining legacy node groups (e.g. ingest) are a single ordinary deployment.
		keys := []types.NamespacedName{{Name: controllerName, Namespace: ComponentNamespace}}
		return ready.DeploymentsAreReady(ctx.Log(), ctx.Client(), keys, expected, prefix)
	}
}
// getNodeControllerName returns the name of the statefulset/deployment backing the
// node group. Legacy OpenSearch uses nodeNamePrefix ("vmi-system-<name>");
// otherwise the name is "opensearch-<name>".
func getNodeControllerName(node vzapi.OpenSearchNode, isLegacyOS bool) string {
	format := "opensearch-%s"
	if isLegacyOS {
		format = nodeNamePrefix
	}
	return fmt.Sprintf(format, node.Name)
}
// dataDeploymentObjectKeys returns one key per data replica of the node group, named
// "<nodeControllerName>-<i>" for i = 0..Replicas-1, all in ComponentNamespace.
// It returns nil when Replicas is nil or not positive.
func dataDeploymentObjectKeys(node vzapi.OpenSearchNode, nodeControllerName string) []types.NamespacedName {
	var keys []types.NamespacedName
	if node.Replicas != nil {
		count := *node.Replicas
		for idx := int32(0); idx < count; idx++ {
			keys = append(keys, types.NamespacedName{
				Namespace: ComponentNamespace,
				Name:      fmt.Sprintf("%s-%d", nodeControllerName, idx),
			})
		}
	}
	return keys
}
// hasRole reports whether roleToHave appears anywhere in roles.
func hasRole(roles []vmov1.NodeRole, roleToHave vmov1.NodeRole) bool {
	for i := range roles {
		if roles[i] == roleToHave {
			return true
		}
	}
	return false
}
// findESReplicas returns the total replica count of all OpenSearch node groups in the
// effective CR that have the role nodeType.
//
// It returns 0 when OpenSearch is disabled or the CR has no Elasticsearch component.
// Node groups with a nil or non-positive replica count contribute nothing. A node group
// is counted at most once, even if its role list repeats nodeType.
func findESReplicas(ctx spi.ComponentContext, nodeType vmov1.NodeRole) int32 {
	var replicas int32
	cr := ctx.EffectiveCR()
	if !vzcr.IsOpenSearchEnabled(cr) || cr.Spec.Components.Elasticsearch == nil {
		return replicas
	}
	for _, node := range cr.Spec.Components.Elasticsearch.Nodes {
		// Match the "< 1 means nothing to deploy" rule used by the other node helpers.
		if node.Replicas == nil || *node.Replicas < 1 {
			continue
		}
		for _, role := range node.Roles {
			if role == nodeType {
				// Stop at the first match so a duplicated role does not double-count.
				replicas += *node.Replicas
				break
			}
		}
	}
	return replicas
}
// AreOpensearchStsReady Check that the OS statefulsets have the minimum number of specified replicas ready and available. It ignores the updated replicas check if updated replicas are zero or cluster is not healthy.
func AreOpensearchStsReady(log vzlog.VerrazzanoLogger, client client.Client, namespacedNames []types.NamespacedName, expectedReplicas int32, prefix string) bool {
for _, namespacedName := range namespacedNames {
statefulset := appsv1.StatefulSet{}
if err := client.Get(context.TODO(), namespacedName, &statefulset); err != nil {
if errors.IsNotFound(err) {
log.Progressf("%s is waiting for statefulset %v to exist", prefix, namespacedName)
// StatefulSet not found
return false
}
log.Errorf("Failed getting statefulset %v: %v", namespacedName, err)
return false
}
if statefulset.Status.ReadyReplicas < expectedReplicas {
log.Progressf("%s is waiting for statefulset %s replicas to be %v. Current ready replicas is %v", prefix, namespacedName,
expectedReplicas, statefulset.Status.ReadyReplicas)
return false
}
if !areOSReplicasUpdated(log, statefulset, expectedReplicas, client, prefix, namespacedName) {
return false
}
log.Oncef("%s has enough ready replicas for statefulsets %v", prefix, namespacedName)
}
return true
}
// areOSReplicasUpdated reports whether the updated-replicas check passes for an
// OpenSearch statefulset.
//
// The check only applies while UpdatedReplicas is positive but below expectedReplicas;
// in every other case it passes (returns true). When it applies, the OpenSearch cluster
// health is polled up to 3 times (2s base interval, Factor 1, 20% jitter):
//   - if reading the OS secret fails, or the health query still errors on the final
//     attempt, the error is logged and false is returned;
//   - if the cluster never reports healthy (e.g. yellow), the check is skipped and
//     true is returned;
//   - if the cluster is healthy, false is returned because the rollout is still
//     in progress.
func areOSReplicasUpdated(log vzlog.VerrazzanoLogger, statefulset appsv1.StatefulSet, expectedReplicas int32, client client.Client, prefix string, namespacedName types.NamespacedName) bool {
	updated := statefulset.Status.UpdatedReplicas
	if updated <= 0 || updated >= expectedReplicas {
		return true
	}

	pas, err := GetVerrazzanoPassword(client)
	if err != nil {
		log.Errorf("Failed getting OS secret to check OS cluster health: %v", err)
		return false
	}
	// The client depends only on the password, so build it once rather than per attempt.
	osClient := NewOSClient(pas)

	var isHealthy bool
	var healthErr error
	// The condition below never returns an error, so ExponentialBackoff can only fail
	// by exhausting its steps. That outcome is already captured by isHealthy == false,
	// so the returned error is intentionally discarded.
	_ = wait.ExponentialBackoff(wait.Backoff{
		Duration: 2 * time.Second,
		Factor:   1,
		Jitter:   0.2,
		Steps:    3,
	}, func() (bool, error) {
		// Keep retrying on query errors; only the last attempt's error is reported below.
		isHealthy, healthErr = osClient.IsClusterHealthy(client)
		return isHealthy, nil
	})
	if healthErr != nil {
		log.Errorf("Failed getting OpenSearch cluster health: %v", healthErr)
		return false
	}
	if !isHealthy {
		log.Progressf("Skipping updated replicas check for OpenSearch because cluster health is not green")
		return true
	}
	log.Progressf("%s is waiting for statefulset %s replicas to be %v. Current updated replicas is %v", prefix, namespacedName,
		expectedReplicas, updated)
	return false
}