Fix TracePipeline has wrong status #16666

Merged
15 commits merged on Jan 30, 2023
8 changes: 8 additions & 0 deletions components/telemetry-operator/config/rbac/role.yaml
@@ -72,6 +72,14 @@ rules:
  - patch
  - update
  - watch
- apiGroups:
  - apps
  resources:
  - replicasets
  verbs:
  - get
  - list
  - watch
- apiGroups:
  - monitoring.coreos.com
  resources:
87 changes: 75 additions & 12 deletions components/telemetry-operator/internal/kubernetes/deployment.go
@@ -4,32 +4,95 @@ import (
	"context"
	"fmt"
	"sort"

	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8slabels "k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

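// DeploymentProber checks whether a Deployment's current ReplicaSet has reached the desired number of ready replicas.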
type DeploymentProber struct {
	client.Client
}

func (dp *DeploymentProber) IsReady(ctx context.Context, name types.NamespacedName) (bool, error) {
	var d v1.Deployment
	if err := dp.Get(ctx, name, &d); err != nil {
		return false, fmt.Errorf("failed to get %s/%s Deployment: %v", name.Namespace, name.Name, err)
	}

	desiredReplicas := *d.Spec.Replicas

	// List all ReplicaSets that match the Deployment's selector in its namespace.
	var allReplicaSets v1.ReplicaSetList
	listOps := &client.ListOptions{
		LabelSelector: k8slabels.SelectorFromSet(d.Spec.Selector.MatchLabels),
		Namespace:     d.Namespace,
	}
	if err := dp.List(ctx, &allReplicaSets, listOps); err != nil {
		return false, fmt.Errorf("failed to list ReplicaSets: %v", err)
	}

	// Readiness is judged against the latest ReplicaSet, i.e. the one created for the current pod template.
	replicaSet, err := getLatestReplicaSet(&d, &allReplicaSets)
	if err != nil || replicaSet == nil {
		return false, fmt.Errorf("failed to get latest ReplicaSet: %v", err)
	}

	isReady := replicaSet.Status.ReadyReplicas >= desiredReplicas
	return isReady, nil
}

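// getLatestReplicaSet returns the ReplicaSet owned by the given Deployment whose pod template matches the
// Deployment's current template, or nil if no such ReplicaSet exists yet.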
func getLatestReplicaSet(deployment *v1.Deployment, allReplicaSets *v1.ReplicaSetList) (*v1.ReplicaSet, error) {
	var ownedReplicaSets []*v1.ReplicaSet
	for i := range allReplicaSets.Items {
		if metav1.IsControlledBy(&allReplicaSets.Items[i], deployment) {
			ownedReplicaSets = append(ownedReplicaSets, &allReplicaSets.Items[i])
		}
	}

	if len(ownedReplicaSets) == 0 {
		return nil, nil
	}

	return findNewReplicaSet(deployment, ownedReplicaSets), nil
}

// findNewReplicaSet returns the new ReplicaSet that the given Deployment targets (the one with the same pod template).
func findNewReplicaSet(deployment *v1.Deployment, rsList []*v1.ReplicaSet) *v1.ReplicaSet {
	sort.Sort(replicaSetsByCreationTimestamp(rsList))
	for i := range rsList {
		if equalIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) {
			// In rare cases, such as after cluster upgrades, a Deployment may end up with
			// more than one new ReplicaSet that has the same pod template as its own,
			// see https://github.com/kubernetes/kubernetes/issues/40415
			// We deterministically choose the oldest new ReplicaSet.
			return rsList[i]
		}
	}
	// The new ReplicaSet does not exist yet.
	return nil
}

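// equalIgnoreHash compares two pod template specs while ignoring the pod-template-hash label,
// which the Deployment controller adds to every ReplicaSet it creates.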
func equalIgnoreHash(template1, template2 *corev1.PodTemplateSpec) bool {
	t1Copy := template1.DeepCopy()
	t2Copy := template2.DeepCopy()
	delete(t1Copy.Labels, v1.DefaultDeploymentUniqueLabelKey)
	delete(t2Copy.Labels, v1.DefaultDeploymentUniqueLabelKey)
	return apiequality.Semantic.DeepEqual(t1Copy, t2Copy)
}

// replicaSetsByCreationTimestamp sorts ReplicaSets by creation timestamp, using their names as a tie-breaker.
type replicaSetsByCreationTimestamp []*v1.ReplicaSet

func (o replicaSetsByCreationTimestamp) Len() int      { return len(o) }
func (o replicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o replicaSetsByCreationTimestamp) Less(i, j int) bool {
	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}
	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}
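
For orientation, the following is a minimal sketch (not part of this diff) of how a reconciler could consume the prober to derive a pipeline status. The interface, function, and state names are illustrative assumptions rather than code from the telemetry-operator:

package example // illustrative sketch only, not part of this PR

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// deploymentProber is the minimal behaviour the sketch relies on;
// (*DeploymentProber).IsReady from this PR satisfies it.
type deploymentProber interface {
	IsReady(ctx context.Context, name types.NamespacedName) (bool, error)
}

// resolvePipelineState maps the probe result onto a coarse pipeline state.
// The state names are placeholders, not the actual TracePipeline condition types.
func resolvePipelineState(ctx context.Context, prober deploymentProber, gateway types.NamespacedName) (string, error) {
	ready, err := prober.IsReady(ctx, gateway)
	if err != nil {
		return "", fmt.Errorf("failed to probe %s/%s: %w", gateway.Namespace, gateway.Name, err)
	}
	if ready {
		return "Running", nil
	}
	return "Pending", nil
}

The switch in this file from Deployment status counters to the ready replicas of the latest ReplicaSet is what feeds the ready value in such a check.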
@@ -0,0 +1,80 @@
package kubernetes

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestDeploymentProber_IsReady(t *testing.T) {
	tests := []struct {
		summary          string
		desiredScheduled int32
		numberReady      int32
		expected         bool
	}{
		{summary: "all scheduled all ready", desiredScheduled: 1, numberReady: 1, expected: true},
		{summary: "all scheduled one ready", desiredScheduled: 2, numberReady: 1, expected: false},
		{summary: "all scheduled zero ready", desiredScheduled: 1, numberReady: 0, expected: false},
	}

	for _, test := range tests {
		tc := test
		t.Run(tc.summary, func(t *testing.T) {
			t.Parallel()

			matchLabels := map[string]string{"test.deployment.name": "test-deployment"}

			deployment := &appsv1.Deployment{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "kyma-system"},
				Spec: appsv1.DeploymentSpec{
					Replicas: &tc.desiredScheduled,
					Selector: &metav1.LabelSelector{MatchLabels: matchLabels},
				},
				Status: appsv1.DeploymentStatus{
					ReadyReplicas: tc.numberReady,
				},
			}

			rs := &appsv1.ReplicaSet{
				ObjectMeta: metav1.ObjectMeta{
					Name:            "foo",
					Namespace:       "kyma-system",
					Labels:          deployment.Spec.Selector.MatchLabels,
					OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(deployment, deployment.GroupVersionKind())},
				},
				Spec: appsv1.ReplicaSetSpec{
					Selector: deployment.Spec.Selector,
					Replicas: &tc.desiredScheduled,
					Template: deployment.Spec.Template,
				},
				Status: appsv1.ReplicaSetStatus{
					ReadyReplicas: tc.numberReady,
					Replicas:      tc.numberReady,
				},
			}

			// Build the list with only the real ReplicaSet; allocating it with make([]appsv1.ReplicaSet, 1)
			// and then appending would leave an extra zero-valued element in the list.
			rsList := &appsv1.ReplicaSetList{
				Items: []appsv1.ReplicaSet{*rs},
			}

			fakeClient := fake.NewClientBuilder().WithObjects(deployment).WithLists(rsList).Build()

			sut := DeploymentProber{fakeClient}
			ready, err := sut.IsReady(context.Background(), types.NamespacedName{Name: "foo", Namespace: "kyma-system"})

			require.NoError(t, err)
			require.Equal(t, tc.expected, ready)
		})
	}
}
1 change: 1 addition & 0 deletions components/telemetry-operator/main.go
@@ -144,6 +144,7 @@ func getEnvOrDefault(envVar string, defaultValue string) string {

//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=daemonsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=replicasets,verbs=get;list;watch

//+kubebuilder:rbac:groups=monitoring.coreos.com,resources=servicemonitors,verbs=get;list;watch;create;update;patch;delete

8 changes: 8 additions & 0 deletions resources/telemetry/charts/operator/templates/role.yaml
@@ -72,6 +72,14 @@ rules:
  - patch
  - update
  - watch
- apiGroups:
  - apps
  resources:
  - replicasets
  verbs:
  - get
  - list
  - watch
- apiGroups:
  - monitoring.coreos.com
  resources:
2 changes: 1 addition & 1 deletion resources/telemetry/values.yaml
@@ -7,7 +7,7 @@ global:
version: "PR-16573"
telemetry_operator:
name: "telemetry-operator"
version: "PR-16682"
version: "PR-16666"
telemetry_webhook_cert_init:
name: "webhook-cert-init"
version: "PR-16573"