Skip to content

Commit

Permalink
Support StatefulSets for Kafka components
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Aug 2, 2022
1 parent e188b0e commit 62afe02
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 225 deletions.
Expand Up @@ -5,9 +5,16 @@ import (
"knative.dev/pkg/apis"
)

const (
// StatefulSetsAvailable is a Condition indicating whether or not the StatefulSets of
// the respective component have come up successfully.
StatefulSetsAvailable apis.ConditionType = "StatefulSetsAvailable"
)

var (
kafkaCondSet = apis.NewLivingConditionSet(
operatorv1alpha1.DeploymentsAvailable,
StatefulSetsAvailable,
operatorv1alpha1.InstallSucceeded,
)
)
Expand Down Expand Up @@ -49,3 +56,17 @@ func (is *KnativeKafkaStatus) MarkDeploymentsNotReady() {
"NotReady",
"Waiting on deployments")
}

// MarkStatefulSetsAvailable marks the StatefulSetAvailable status as true.
func (is *KnativeKafkaStatus) MarkStatefulSetsAvailable() {
kafkaCondSet.Manage(is).MarkTrue(StatefulSetsAvailable)
}

// MarkStatefulSetNotReady marks the StatefulSetsAvailable status as false and calls out
// it's waiting for StatefulSet.
func (is *KnativeKafkaStatus) MarkStatefulSetNotReady() {
kafkaCondSet.Manage(is).MarkFalse(
StatefulSetsAvailable,
"NotReady",
"Waiting on StatefulSets")
}
Expand Up @@ -18,11 +18,16 @@ func TestKnativeKafkaHappyPath(t *testing.T) {
ks.MarkInstallSucceeded()
// Dependencies are assumed successful too.
apistest.CheckConditionOngoing(ks, operatorv1alpha1.DeploymentsAvailable, t)
apistest.CheckConditionOngoing(ks, StatefulSetsAvailable, t)
apistest.CheckConditionSucceeded(ks, operatorv1alpha1.InstallSucceeded, t)
if ready := ks.IsReady(); ready {
t.Errorf("ks.IsReady() = %v, want false", ready)
}

// Deployments are not available at first.
ks.MarkDeploymentsNotReady()
apistest.CheckConditionFailed(ks, operatorv1alpha1.DeploymentsAvailable, t)
apistest.CheckConditionOngoing(ks, StatefulSetsAvailable, t)
apistest.CheckConditionSucceeded(ks, operatorv1alpha1.InstallSucceeded, t)
if ready := ks.IsReady(); ready {
t.Errorf("ks.IsReady() = %v, want false", ready)
Expand All @@ -31,10 +36,21 @@ func TestKnativeKafkaHappyPath(t *testing.T) {
// Deployments become ready and we're good.
ks.MarkDeploymentsAvailable()
apistest.CheckConditionSucceeded(ks, operatorv1alpha1.DeploymentsAvailable, t)
apistest.CheckConditionOngoing(ks, StatefulSetsAvailable, t)
apistest.CheckConditionSucceeded(ks, operatorv1alpha1.InstallSucceeded, t)
if ready := ks.IsReady(); ready {
t.Errorf("ks.IsReady() = %v, want false", ready)
}

ks.MarkStatefulSetsAvailable()
apistest.CheckConditionSucceeded(ks, operatorv1alpha1.InstallSucceeded, t)
apistest.CheckConditionSucceeded(ks, operatorv1alpha1.DeploymentsAvailable, t)
apistest.CheckConditionSucceeded(ks, operatorv1alpha1.InstallSucceeded, t)

if ready := ks.IsReady(); !ready {
t.Errorf("ks.IsReady() = %v, want true", ready)
}

}

func TestKnativeKafkaErrorPath(t *testing.T) {
Expand All @@ -57,9 +73,17 @@ func TestKnativeKafkaErrorPath(t *testing.T) {
t.Errorf("ks.IsReady() = %v, want false", ready)
}

ks.MarkStatefulSetNotReady()
apistest.CheckConditionFailed(ks, StatefulSetsAvailable, t)
if ready := ks.IsReady(); ready {
t.Errorf("ks.IsReady() = %v, want false", ready)
}

// Deployments become ready
ks.MarkDeploymentsAvailable()
ks.MarkStatefulSetsAvailable()
apistest.CheckConditionSucceeded(ks, operatorv1alpha1.DeploymentsAvailable, t)
apistest.CheckConditionSucceeded(ks, StatefulSetsAvailable, t)
apistest.CheckConditionSucceeded(ks, operatorv1alpha1.InstallSucceeded, t)
if ready := ks.IsReady(); !ready {
t.Errorf("ks.IsReady() = %v, want true", ready)
Expand Down

This file was deleted.

This file was deleted.

8 changes: 8 additions & 0 deletions knative-operator/pkg/controller/knativekafka/images.go
Expand Up @@ -37,6 +37,14 @@ func ImageTransform(overrideMap map[string]string) mf.Transformer {

obj = ds
podSpec = &ds.Spec.Template.Spec
case "StatefulSet":
ss := &appsv1.StatefulSet{}
if err := scheme.Scheme.Convert(u, ss, nil); err != nil {
return fmt.Errorf("failed to convert Unstructured to StatefulSet: %w", err)
}

obj = ss
podSpec = &ss.Spec.Template.Spec
case "Job":
job := &batchv1.Job{}
if err := scheme.Scheme.Convert(u, job, nil); err != nil {
Expand Down
Expand Up @@ -34,10 +34,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kafkaconfig "knative.dev/eventing-kafka/pkg/common/config"

serverlessoperatorv1alpha1 "github.com/openshift-knative/serverless-operator/knative-operator/pkg/apis/operator/v1alpha1"
"github.com/openshift-knative/serverless-operator/knative-operator/pkg/common"
"github.com/openshift-knative/serverless-operator/knative-operator/pkg/monitoring"
kafkaconfig "knative.dev/eventing-kafka/pkg/common/config"
)

const (
Expand Down Expand Up @@ -217,6 +218,7 @@ func (r *ReconcileKnativeKafka) executeInstallStages(instance *serverlessoperato
r.transform,
r.apply,
r.checkDeployments,
r.checkStatefulSets,
}

return executeStages(instance, manifest, stages)
Expand Down Expand Up @@ -279,8 +281,6 @@ func (r *ReconcileKnativeKafka) transform(manifest *mf.Manifest, instance *serve
operatorcommon.ConfigMapTransform(instance.Spec.Config, logging.FromContext(context.TODO())),
configureEventingKafka(instance.Spec),
ImageTransform(common.BuildImageOverrideMapFromEnviron(os.Environ(), "KAFKA_IMAGE_")),
replicasTransform(manifest.Client),
configMapHashTransform(manifest.Client),
socommon.VersionedJobNameTransform(),
rbacProxyTranform,
)
Expand Down Expand Up @@ -339,6 +339,31 @@ func (r *ReconcileKnativeKafka) checkDeployments(manifest *mf.Manifest, instance
return nil
}

func (r *ReconcileKnativeKafka) checkStatefulSets(manifest *mf.Manifest, instance *serverlessoperatorv1alpha1.KnativeKafka) error {
log.Info("Checking statefulsets")
for _, u := range manifest.Filter(mf.ByKind("StatefulSet")).Resources() {
u := u // To avoid memory aliasing
resource, err := manifest.Client.Get(&u)
if err != nil {
instance.Status.MarkStatefulSetNotReady()
if errors.IsNotFound(err) {
return nil
}
return err
}
ss := &appsv1.StatefulSet{}
if err := scheme.Scheme.Convert(resource, ss, nil); err != nil {
return err
}
if !isStatefulSetAvailable(ss) {
instance.Status.MarkStatefulSetNotReady()
return nil
}
}
instance.Status.MarkStatefulSetsAvailable()
return nil
}

// Delete Knative Kafka resources
func (r *ReconcileKnativeKafka) deleteResources(manifest *mf.Manifest, instance *serverlessoperatorv1alpha1.KnativeKafka) error {
if len(manifest.Resources()) <= 0 {
Expand All @@ -364,6 +389,13 @@ func isDeploymentAvailable(d *appsv1.Deployment) bool {
return false
}

func isStatefulSetAvailable(ss *appsv1.StatefulSet) bool {
if ss.Spec.Replicas == nil {
return ss.Status.ReadyReplicas == 1
}
return *ss.Spec.Replicas == ss.Status.ReadyReplicas
}

// general clean-up. required for the resources that cannot be garbage collected with the owner reference mechanism
func (r *ReconcileKnativeKafka) delete(instance *serverlessoperatorv1alpha1.KnativeKafka) error {
defer monitoring.KnativeUp.DeleteLabelValues("kafka_status")
Expand Down
46 changes: 0 additions & 46 deletions knative-operator/pkg/controller/knativekafka/replicastransform.go

This file was deleted.

0 comments on commit 62afe02

Please sign in to comment.