Skip to content

Commit

Permalink
Add post install logic for namespaced brokers (#3889) (#3899)
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed May 16, 2024
1 parent b90b525 commit a95e409
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,37 @@ import (

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/system"
)

type kafkaDeploymentDeleter struct {
k8s kubernetes.Interface
}

func (k *kafkaDeploymentDeleter) DeleteBrokerDeployments(ctx context.Context) error {
deployments := []string{
"kafka-broker-dispatcher",
deployments := make([]types.NamespacedName, 0)
c := ""
for {
deploymentList, err := k.k8s.AppsV1().Deployments("").List(ctx, metav1.ListOptions{
LabelSelector: "app.kubernetes.io/component=kafka-broker-dispatcher",
Continue: c,
})
if err != nil {
return fmt.Errorf("failed to list deployments: %w", err)
}
for _, d := range deploymentList.Items {
deployments = append(deployments, types.NamespacedName{
Namespace: d.Namespace,
Name: d.Name,
})
}

if deploymentList.Continue == "" {
break
}
c = deploymentList.Continue
}

for _, deployment := range deployments {
Expand All @@ -46,31 +65,31 @@ func (k *kafkaDeploymentDeleter) DeleteBrokerDeployments(ctx context.Context) er
return nil
}

func (k *kafkaDeploymentDeleter) deleteDeployment(ctx context.Context, deploymentName string) error {
err := k.waiteStatefulSetExists(ctx, deploymentName)
func (k *kafkaDeploymentDeleter) deleteDeployment(ctx context.Context, deployment types.NamespacedName) error {
err := k.waitStatefulSetExists(ctx, deployment)
if err != nil {
return fmt.Errorf("failed while waiting for statefulset to come up: %w", err)
}

err = k.k8s.
AppsV1().
Deployments(system.Namespace()).
Delete(ctx, deploymentName, metav1.DeleteOptions{})
Deployments(deployment.Namespace).
Delete(ctx, deployment.Name, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete deployment %s/%s: %w", system.Namespace(), deploymentName, err)
return fmt.Errorf("failed to delete deployment %s/%s: %w", deployment.Namespace, deployment.Name, err)
}

return nil
}

func (k *kafkaDeploymentDeleter) waiteStatefulSetExists(ctx context.Context, statefulSetName string) error {
func (k *kafkaDeploymentDeleter) waitStatefulSetExists(ctx context.Context, sts types.NamespacedName) error {
return wait.PollUntilContextTimeout(ctx, 10*time.Second, 10*time.Minute, false, func(ctx context.Context) (done bool, err error) {
_, err = k.k8s.AppsV1().StatefulSets(system.Namespace()).Get(ctx, statefulSetName, metav1.GetOptions{})
_, err = k.k8s.AppsV1().StatefulSets(sts.Namespace).Get(ctx, sts.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, fmt.Errorf("failed to get statefulset %s/%s: %w", system.Namespace(), statefulSetName, err)
return false, fmt.Errorf("failed to get statefulset %s/%s: %w", sts.Namespace, sts.Name, err)
}
return true, nil
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ rules:
- "deployments"
verbs:
- "delete"
- "list"
# we need to get statefulsets
- apiGroups:
- "apps"
Expand All @@ -36,4 +37,3 @@ rules:
verbs:
- "get"
- "list"

0 comments on commit a95e409

Please sign in to comment.