Skip to content

Commit

Permalink
test: added channel KEDA scaling test (#3925)
Browse files Browse the repository at this point in the history
Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 committed Jun 5, 2024
1 parent 5c4b6a4 commit 614c5d6
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
16 changes: 16 additions & 0 deletions test/e2e_new_channel/kafka_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"knative.dev/reconciler-test/pkg/knative"
"knative.dev/reconciler-test/pkg/state"

"knative.dev/eventing-kafka-broker/test/rekt/features"
"knative.dev/eventing-kafka-broker/test/rekt/features/kafkachannel"
kafkachannelresource "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkachannel"
)
Expand Down Expand Up @@ -109,3 +110,18 @@ func TestKafkaChannelOIDC(t *testing.T) {

env.TestSet(ctx, t, oidc.AddressableOIDCConformance(kafkachannelresource.GVR(), "KafkaChannel", name, env.Namespace()))
}

func TestKafkaChannelKedaScaling(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.WithPollTimings(2*time.Second, 12*time.Minute),
environment.Managed(t),
)

env.Test(ctx, t, features.ChannelScalesToZeroWithKeda())
}
60 changes: 60 additions & 0 deletions test/rekt/features/keda_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ import (
testingpkg "knative.dev/eventing-kafka-broker/test/pkg"
testpkg "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/rekt/features/kafkafeatureflags"
kafkachannelresources "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkachannel"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasource"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/test/rekt/resources/broker"
subscriptionresources "knative.dev/eventing/test/rekt/resources/subscription"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
Expand Down Expand Up @@ -127,6 +130,47 @@ func TriggerScalesToZeroWithKeda() *feature.Feature {
return f
}

func ChannelScalesToZeroWithKeda() *feature.Feature {
f := feature.NewFeature()

f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())

event := cetest.FullEvent()

channelName := feature.MakeRandomK8sName("channel")
subscriptionName := feature.MakeRandomK8sName("subscription")
sourceName := feature.MakeRandomK8sName("source")
sinkName := feature.MakeRandomK8sName("sink")

// check that the trigger initially has replicas = 0
f.Setup("Subscription should start with replicas = 0", verifyConsumerGroupReplicas(getSubscriptionCg(subscriptionName), 0, true))

f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiver))
f.Setup("install channel", kafkachannelresources.Install(channelName,
kafkachannelresources.WithNumPartitions("3"),
kafkachannelresources.WithReplicationFactor("1"),
kafkachannelresources.WithRetentionDuration("P1D"),
))

f.Setup("install subscription", subscriptionresources.Install(subscriptionName,
subscriptionresources.WithChannel(&duckv1.KReference{
Kind: "KafkaChannel",
APIVersion: "messaging.knative.dev/v1beta1",
Name: channelName,
}),
subscriptionresources.WithSubscriber(service.AsKReference(sinkName), "", ""),
))

f.Requirement("install source", eventshub.Install(sourceName, eventshub.StartSenderToResource(kafkachannelresources.GVR(), channelName), eventshub.InputEvent(event)))

f.Requirement("sink receives event", assert.OnStore(sinkName).MatchEvent(test.HasId(event.ID())).Exact(1))

//after the event is sent, the subscription should scale down to zero replicas
f.Alpha("Subscription").Must("Scale down to zero", verifyConsumerGroupReplicas(getSubscriptionCg(subscriptionName), 0, false))

return f
}

type getCgName func(ctx context.Context) (string, error)

func getKafkaSourceCg(source string) getCgName {
Expand All @@ -145,6 +189,22 @@ func getKafkaSourceCg(source string) getCgName {
}
}

func getSubscriptionCg(subscriptionName string) getCgName {
return func(ctx context.Context) (string, error) {
ns := environment.FromContext(ctx).Namespace()

sub, err := eventingclient.Get(ctx).
MessagingV1().
Subscriptions(ns).
Get(ctx, subscriptionName, metav1.GetOptions{})
if err != nil {
return "", err
}

return string(sub.UID), nil
}
}

func getTriggerCg(triggerName string) getCgName {
return func(ctx context.Context) (string, error) {
ns := environment.FromContext(ctx).Namespace()
Expand Down

0 comments on commit 614c5d6

Please sign in to comment.