Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add post install logic for namespaced brokers #3889

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2024 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"fmt"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)

type kafkaDeploymentDeleter struct {
k8s kubernetes.Interface
}

func (k *kafkaDeploymentDeleter) DeleteBrokerDeployments(ctx context.Context) error {
deployments := make([]types.NamespacedName, 0)
c := ""
for {
deploymentList, err := k.k8s.AppsV1().Deployments("").List(ctx, metav1.ListOptions{
LabelSelector: "app.kubernetes.io/component=kafka-broker-dispatcher",
Continue: c,
})
if err != nil {
return fmt.Errorf("failed to list deployments: %w", err)

Check warning on line 44 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L35-L44

Added lines #L35 - L44 were not covered by tests
}
for _, d := range deploymentList.Items {
deployments = append(deployments, types.NamespacedName{
Namespace: d.Namespace,
Name: d.Name,
})

Check warning on line 50 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L46-L50

Added lines #L46 - L50 were not covered by tests
}

if deploymentList.Continue == "" {
break

Check warning on line 54 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}
c = deploymentList.Continue

Check warning on line 56 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L56

Added line #L56 was not covered by tests
}

for _, deployment := range deployments {
if err := k.deleteDeployment(ctx, deployment); err != nil {
return fmt.Errorf("failed to delete deployment %s: %v", deployment, err)

Check warning on line 61 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L59-L61

Added lines #L59 - L61 were not covered by tests
}
}

return nil

Check warning on line 65 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L65

Added line #L65 was not covered by tests
}

func (k *kafkaDeploymentDeleter) deleteDeployment(ctx context.Context, deployment types.NamespacedName) error {
err := k.waitStatefulSetExists(ctx, deployment)
if err != nil {
return fmt.Errorf("failed while waiting for statefulset to come up: %w", err)

Check warning on line 71 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L68-L71

Added lines #L68 - L71 were not covered by tests
}

err = k.k8s.
AppsV1().
Deployments(deployment.Namespace).
Delete(ctx, deployment.Name, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete deployment %s/%s: %w", deployment.Namespace, deployment.Name, err)

Check warning on line 79 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L74-L79

Added lines #L74 - L79 were not covered by tests
}

return nil

Check warning on line 82 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L82

Added line #L82 was not covered by tests
}

func (k *kafkaDeploymentDeleter) waitStatefulSetExists(ctx context.Context, sts types.NamespacedName) error {
return wait.PollUntilContextTimeout(ctx, 10*time.Second, 10*time.Minute, false, func(ctx context.Context) (done bool, err error) {
_, err = k.k8s.AppsV1().StatefulSets(sts.Namespace).Get(ctx, sts.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil

Check warning on line 89 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L85-L89

Added lines #L85 - L89 were not covered by tests
}
if err != nil {
return false, fmt.Errorf("failed to get statefulset %s/%s: %w", sts.Namespace, sts.Name, err)

Check warning on line 92 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L91-L92

Added lines #L91 - L92 were not covered by tests
}
return true, nil

Check warning on line 94 in control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_namespaced_broker_deployment_deleter.go#L94

Added line #L94 was not covered by tests
})
}
49 changes: 49 additions & 0 deletions control-plane/cmd/post-install/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,54 @@

package main

import (
"context"
"flag"
"fmt"
"log"

"k8s.io/client-go/kubernetes"

"knative.dev/pkg/environment"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
)

func main() {
ctx := signals.NewContext()

Check warning on line 33 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L33

Added line #L33 was not covered by tests

config, err := logging.NewConfigFromMap(nil)
if err != nil {
log.Fatal("Failed to create logging config: ", err)

Check warning on line 37 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L35-L37

Added lines #L35 - L37 were not covered by tests
}

logger, _ := logging.NewLoggerFromConfig(config, "kafka-broker-post-install")
defer logger.Sync()

Check warning on line 41 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L40-L41

Added lines #L40 - L41 were not covered by tests

logging.WithLogger(ctx, logger)

Check warning on line 43 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L43

Added line #L43 was not covered by tests

if err := run(ctx); err != nil {
logger.Fatal(err)

Check warning on line 46 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L45-L46

Added lines #L45 - L46 were not covered by tests
}
}

func run(ctx context.Context) error {
env := environment.ClientConfig{}
env.InitFlags(flag.CommandLine)
flag.Parse()

Check warning on line 53 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L50-L53

Added lines #L50 - L53 were not covered by tests

config, err := env.GetRESTConfig()
if err != nil {
return fmt.Errorf("failed to get kubeconfig: %w", err)

Check warning on line 57 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L55-L57

Added lines #L55 - L57 were not covered by tests
}

deploymentDeleter := &kafkaDeploymentDeleter{
k8s: kubernetes.NewForConfigOrDie(config),

Check warning on line 61 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L60-L61

Added lines #L60 - L61 were not covered by tests
}

if err := deploymentDeleter.DeleteBrokerDeployments(ctx); err != nil {
return fmt.Errorf("broker migration failed: %v", err)

Check warning on line 65 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}

return nil

Check warning on line 68 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L68

Added line #L68 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,20 @@ metadata:
name: knative-kafka-controller-post-install
labels:
app.kubernetes.io/version: devel
rules: []
rules:
# we need to be able to delete old deployments
- apiGroups:
- "apps"
resources:
- "deployments"
verbs:
- "delete"
pierDipi marked this conversation as resolved.
Show resolved Hide resolved
- "list"
# we need to get statefulsets
- apiGroups:
- "apps"
resources:
- "statefulsets"
verbs:
- "get"
- "list"
Loading