Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel Input Conformance Tests #6823

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 149 additions & 4 deletions test/rekt/features/channel/data_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@ import (
"fmt"
"strconv"

"github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/ptr"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/resources/service"
"knative.dev/reconciler-test/pkg/state"

v1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/test/rekt/features/knconf"
"knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/eventing/test/rekt/resources/subscription"
)

func DataPlaneConformance(channelName string) *feature.FeatureSet {
Expand All @@ -56,12 +61,12 @@ func DataPlaneChannel(channelName string) *feature.Feature {
f.Requirement("Channel is Ready", channel_impl.IsReady(channelName))

f.Stable("Input").
Must("Every Channel MUST expose either an HTTP or HTTPS endpoint.", todo).
Must("Every Channel MUST expose either an HTTP or HTTPS endpoint.", shouldExposeHTTPorHTTPS).
Must("The endpoint(s) MUST conform to 0.3 or 1.0 CloudEvents specification.",
channelAcceptsCEVersions).
MustNot("The Channel MUST NOT perform an upgrade of the passed in version. It MUST emit the event with the same version.", todo).
Must("It MUST support both Binary Content Mode and Structured Content Mode of the HTTP Protocol Binding for CloudEvents.", todo).
May("When dispatching the event, the channel MAY use a different HTTP Message mode of the one used by the event.", todo).
MustNot("The Channel MUST NOT perform an upgrade of the passed in version. It MUST emit the event with the same version.", channelableEventVersionNotUpgraded).
Must("It MUST support both Binary Content Mode and Structured Content Mode of the HTTP Protocol Binding for CloudEvents.", shouldSupportBinaryAndStructuredContentMode).
May("When dispatching the event, the channel MAY use a different HTTP Message mode of the one used by the event.", dispachModeDifferentFromRecieved).
// For example, It MAY receive an event in Structured Content Mode and dispatch in Binary Content Mode.
May("The HTTP(S) endpoint MAY be on any port, not just the standard 80 and 443.", todo).
May("Channels MAY expose other, non-HTTP endpoints in addition to HTTP at their discretion.", todo)
Expand Down Expand Up @@ -105,6 +110,17 @@ func DataPlaneChannel(channelName string) *feature.Feature {
return f
}

func shouldExposeHTTPorHTTPS(ctx context.Context, t feature.T) {
c := getChannelable(ctx, t)
addr := c.Status.AddressStatus.Address.URL
if addr == nil {
addr = new(apis.URL)
}
if addr.Scheme != "http" && addr.Scheme != "https" {
t.Fatalf("expected channel scheme to be HTTP or HTTPS, found: %s", addr.Scheme)
}
}

func channelRejectsMalformedCE(ctx context.Context, t feature.T) {
channelName := state.GetStringOrFail(ctx, t, ChannelableNameKey)

Expand Down Expand Up @@ -154,6 +170,135 @@ func channelAcceptsCEVersions(ctx context.Context, t feature.T) {
knconf.AcceptsCEVersions(ctx, t, channel_impl.GVR(), name)
}

func shouldSupportBinaryAndStructuredContentMode(ctx context.Context, t feature.T) {
channelName := state.GetStringOrFail(ctx, t, ChannelableNameKey)

binarycontenttypes := []string{
"application/vnd.apache.thrift.binary",
"application/xml",
"application/json",
}

for _, contenttype := range binarycontenttypes {
source1 := feature.MakeRandomK8sName("source")
eventshub.Install(source1,
eventshub.StartSenderToResource(channel_impl.GVR(), channelName),
eventshub.InputHeader("ce-specversion", "1.0"),
eventshub.InputHeader("ce-type", "sometype"),
eventshub.InputHeader("ce-source", "200.request.sender.test.knative.dev"),
eventshub.InputHeader("ce-id", uuid.New().String()),
eventshub.InputHeader("content-type", contenttype),
eventshub.InputBody("{}"),
eventshub.InputMethod("POST"),
)(ctx, t)

store := eventshub.StoreFromContext(ctx, source1)
events := knconf.Correlate(store.AssertAtLeast(t, 2, knconf.SentEventMatcher("")))
for _, e := range events {
if e.Response.StatusCode < 200 || e.Response.StatusCode > 299 {
t.Errorf("Expected statuscode 2XX for sequence %d got %d", e.Response.Sequence, e.Response.StatusCode)
}
}
}

structuredcontenttype := "application/cloudevents+json"
bodycontent := `{
"specversion" : "1.0",
"type" : "sometype",
"source" : "json.request.sender.test.knative.dev",
"id" : "2222-4444-6666",
"time" : "2020-07-06T09:23:12Z",
"datacontenttype" : "application/json",
"data" : {
"message" : "helloworld"
}
}`

source2 := feature.MakeRandomK8sName("source2")
eventshub.Install(source2,
eventshub.StartSenderToResource(channel_impl.GVR(), channelName),
eventshub.InputHeader("content-type", structuredcontenttype),
eventshub.InputBody(bodycontent),
eventshub.InputMethod("POST"),
)(ctx, t)

store := eventshub.StoreFromContext(ctx, source2)
events := knconf.Correlate(store.AssertAtLeast(t, 2, knconf.SentEventMatcher("")))
for _, e := range events {
if e.Response.StatusCode < 200 || e.Response.StatusCode > 299 {
t.Errorf("Expected statuscode 2XX for sequence %d got %d", e.Response.Sequence, e.Response.StatusCode)
}
}
}

func dispachModeDifferentFromRecieved(ctx context.Context, t feature.T) {
channelName := state.GetStringOrFail(ctx, t, ChannelableNameKey)

structuredcontenttype := "application/cloudevents+json"
bodycontent := `{
"specversion" : "1.0",
"type" : "sometype",
"source" : "json.request.sender.test.knative.dev",
"id" : "2222-4444-6666",
"time" : "2020-07-06T09:23:12Z",
"datacontenttype" : "application/json",
"data" : {
"message" : "helloworld"
}
}`

source := feature.MakeRandomK8sName("source")
sink := feature.MakeRandomK8sName("sink")
sub := feature.MakeRandomK8sName("subscription")

eventshub.Install(sink, eventshub.StartReceiver)(ctx, t)

subscription.Install(sub,
subscription.WithChannel(channel_impl.AsRef(channelName)),
subscription.WithSubscriber(service.AsKReference(sink), ""),
)(ctx, t)

eventshub.Install(source,
eventshub.StartSenderToResource(channel_impl.GVR(), channelName),
eventshub.InputHeader("content-type", structuredcontenttype),
eventshub.InputBody(bodycontent),
eventshub.InputMethod("POST"),
)(ctx, t)

store := eventshub.StoreFromContext(ctx, source)
events := knconf.Correlate(store.AssertAtLeast(t, 2, knconf.SentEventMatcher("")))
for _, e := range events {
if e.Response.StatusCode < 200 || e.Response.StatusCode > 299 {
t.Errorf("Expected statuscode 2XX for sequence %d got %d", e.Response.Sequence, e.Response.StatusCode)
}
}
}

func channelableEventVersionNotUpgraded(ctx context.Context, t feature.T) {
channelName := state.GetStringOrFail(ctx, t, ChannelableNameKey)

source := feature.MakeRandomK8sName("source")
sink := feature.MakeRandomK8sName("sink")
sub := feature.MakeRandomK8sName("subscription")

event := test.FullEvent()
event.SetSpecVersion("0.3")

// Creating sink
eventshub.Install(sink, eventshub.StartReceiver)(ctx, t)

subscription.Install(sub,
subscription.WithChannel(channel_impl.AsRef(channelName)),
subscription.WithSubscriber(service.AsKReference(sink), ""),
)(ctx, t)

eventshub.Install(source, eventshub.StartSenderToResource(channel_impl.GVR(), channelName), eventshub.InputEvent(event))(ctx, t)

assert.OnStore(sink).
MatchEvent(test.HasId(event.ID()), test.HasSpecVersion("0.3")).
AtLeast(1)(ctx, t)
}

func addControlPlaneDelivery(fs *feature.FeatureSet) {
//Should("Channels SHOULD retry resending CloudEvents when they fail to either connect or send CloudEvents to subscribers.", todo).
//Should("Channels SHOULD support various retry configuration parameters: [the maximum number of retries]", todo).
Expand Down