/
pools.go
134 lines (122 loc) · 4.85 KB
/
pools.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// This file is part of MinIO Operator
// Copyright (c) 2020 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package controller
import (
"context"
"errors"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
miniov2 "github.com/lgj101/operator/pkg/apis/minio.min.io/v2"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
)
// getSSForPool fetches the StatefulSet that backs the given pool of the tenant.
//
// The lookup is done under tenant.PoolStatefulsetName(pool) first. Only when that
// lookup reports "not found" is a second lookup attempted under
// tenant.LegacyStatefulsetName(pool). Any other error, or any error from the second
// lookup, is returned together with a nil StatefulSet.
func (c *Controller) getSSForPool(tenant *miniov2.Tenant, pool *miniov2.Pool) (*appsv1.StatefulSet, error) {
	statefulSets := c.kubeClientSet.AppsV1().StatefulSets(tenant.Namespace)

	current, err := statefulSets.Get(context.Background(), tenant.PoolStatefulsetName(pool), metav1.GetOptions{})
	if err == nil {
		return current, nil
	}
	if !k8serrors.IsNotFound(err) {
		return nil, err
	}

	// Nothing under the current name: check if there are legacy statefulsets.
	legacy, err := statefulSets.Get(context.Background(), tenant.LegacyStatefulsetName(pool), metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	return legacy, nil
}
// getAllSSForTenant collects the StatefulSets of every pool declared in tenant.Spec.Pools.
//
// The returned map is keyed by the pool's index in tenant.Spec.Pools. Pools whose
// StatefulSet lookup reports "not found" are left out of the map; any other lookup
// error aborts the scan and is returned with a nil map.
func (c *Controller) getAllSSForTenant(tenant *miniov2.Tenant) (map[int]*appsv1.StatefulSet, error) {
	found := make(map[int]*appsv1.StatefulSet)
	// TODO: Load all statefulsets by using the tenant label in a single list call
	for idx := range tenant.Spec.Pools {
		statefulSet, err := c.getSSForPool(tenant, &tenant.Spec.Pools[idx])
		switch {
		case err != nil && !k8serrors.IsNotFound(err):
			return nil, err
		case statefulSet != nil:
			found[idx] = statefulSet
		}
	}
	return found, nil
}
// poolSSMatchesSpec checks if the statefulset for the pool matches what is expected and described from the Tenant.
//
// The two objects are considered a match only when all of the following agree:
//   - their labels,
//   - their annotations, ignoring corev1.LastAppliedConfigAnnotation,
//   - their container environment, as reported by miniov2.IsContainersEnvUpdated,
//   - their full Spec.
//
// An error is returned only when either argument is nil. Neither StatefulSet is
// modified: annotations are compared through filtered copies.
func poolSSMatchesSpec(expectedStatefulSet, existingStatefulSet *appsv1.StatefulSet) (bool, error) {
	if expectedStatefulSet == nil || existingStatefulSet == nil {
		return false, errors.New("cannot process an empty MinIO StatefulSet")
	}
	// Try to detect changes in the labels or annotations
	if !equality.Semantic.DeepEqual(expectedStatefulSet.ObjectMeta.Labels, existingStatefulSet.ObjectMeta.Labels) {
		return false, nil
	}
	// Copying ObjectMeta does not copy its maps, so deleting a key in place would
	// strip the annotation from the caller's objects. Build filtered copies instead.
	withoutLastApplied := func(annotations map[string]string) map[string]string {
		filtered := make(map[string]string, len(annotations))
		for key, value := range annotations {
			if key != corev1.LastAppliedConfigAnnotation {
				filtered[key] = value
			}
		}
		return filtered
	}
	expectedAnnotations := withoutLastApplied(expectedStatefulSet.ObjectMeta.Annotations)
	currentAnnotations := withoutLastApplied(existingStatefulSet.ObjectMeta.Annotations)
	if !equality.Semantic.DeepEqual(expectedAnnotations, currentAnnotations) {
		return false, nil
	}
	if miniov2.IsContainersEnvUpdated(existingStatefulSet.Spec.Template.Spec.Containers, expectedStatefulSet.Spec.Template.Spec.Containers) {
		return false, nil
	}
	if !equality.Semantic.DeepEqual(expectedStatefulSet.Spec, existingStatefulSet.Spec) {
		// some field set by the operator has changed
		return false, nil
	}
	return true, nil
}
// restartInitializedPool restarts a pool that is assumed to have been initialized
//
// It lists the pods in the tenant namespace labelled miniov2.PoolLabel=<pool.Name>,
// falling back to miniov2.ZoneLabel=<pool.Name> when that yields no pods. It then picks
// the first pod in the Running phase, builds a MinIO admin client for that pod's
// headless hostname on port 9000 and asks MinIO to restart.
//
// An error is returned when the fallback listing fails, when no running pod is found,
// when the admin client cannot be created, or when the restart call fails.
func (c *Controller) restartInitializedPool(ctx context.Context, tenant *miniov2.Tenant, pool miniov2.Pool, tenantConfiguration map[string][]byte) error {
	// get a new admin client that points to a pod of an already initialized pool (ie: pool-0)
	livePods, err := c.kubeClientSet.CoreV1().Pods(tenant.Namespace).List(ctx, metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s=%s", miniov2.PoolLabel, pool.Name),
	})
	if err != nil {
		// Best effort: a failed lookup by pool label still falls through to the
		// zone label lookup below instead of aborting.
		klog.Warningf("Could not validate state of statefulset for pool: %v", err)
	}
	// The list may be nil when the call above failed, so guard before reading Items.
	if livePods == nil || len(livePods.Items) == 0 {
		livePods, err = c.kubeClientSet.CoreV1().Pods(tenant.Namespace).List(ctx, metav1.ListOptions{
			LabelSelector: fmt.Sprintf("%s=%s", miniov2.ZoneLabel, pool.Name),
		})
		if err != nil {
			klog.Warningf("Could not validate state of statefulset for zone: %v", err)
			return err
		}
	}
	// Index into the slice rather than taking the address of the range variable:
	// the pointer then refers to the listed pod itself and no Pod is copied per iteration.
	var livePod *corev1.Pod
	for i := range livePods.Items {
		if livePods.Items[i].Status.Phase == corev1.PodRunning {
			livePod = &livePods.Items[i]
			break
		}
	}
	if livePod == nil {
		return fmt.Errorf("no running pods found for statefulsets %s", pool.Name)
	}
	livePodAddress := fmt.Sprintf("%s:9000", tenant.MinIOHLPodHostname(livePod.Name))
	livePodAdminClnt, err := tenant.NewMinIOAdminForAddress(livePodAddress, tenantConfiguration, c.getTransport())
	if err != nil {
		return err
	}
	// Now tell MinIO to restart
	if err = livePodAdminClnt.ServiceRestart(ctx); err != nil {
		klog.Infof("We failed to restart MinIO to adopt the new pool: %v", err)
		return err
	}
	return nil
}