diff --git a/pkg/channel/fanout/fanout_message_handler.go b/pkg/channel/fanout/fanout_message_handler.go index 9ad4cc4b42f..3b79fe6a8e8 100644 --- a/pkg/channel/fanout/fanout_message_handler.go +++ b/pkg/channel/fanout/fanout_message_handler.go @@ -23,9 +23,7 @@ package fanout import ( "context" - "encoding/json" "errors" - "fmt" nethttp "net/http" "net/url" "time" @@ -36,7 +34,6 @@ import ( "go.uber.org/zap" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/kncloudevents" ) @@ -46,16 +43,10 @@ const ( ) type Subscription struct { - eventingduck.SubscriberSpec - RetryConfig kncloudevents.RetryConfig -} - -func (s *Subscription) MarshalJSON() ([]byte, error) { - return json.Marshal(s.SubscriberSpec) -} - -func (s *Subscription) UnmarshalJSON(bytes []byte) error { - return json.Unmarshal(bytes, &s.SubscriberSpec) + Subscriber *url.URL + Reply *url.URL + DeadLetter *url.URL + RetryConfig *kncloudevents.RetryConfig } // Config for a fanout.MessageHandler. @@ -96,23 +87,35 @@ func NewMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDisp } handler.receiver = receiver - for i := range config.Subscriptions { - retriesConfig, err := retriesOf(config.Subscriptions[i].SubscriberSpec) - if err != nil { - return nil, fmt.Errorf("failed to create retries config from SubscriberSpec: %w", err) - } - config.Subscriptions[i].RetryConfig = retriesConfig - } - return handler, nil } -func retriesOf(spec eventingduck.SubscriberSpec) (kncloudevents.RetryConfig, error) { - delivery := &eventingduckv1.DeliverySpec{} +func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscription, error) { + var destination *url.URL + if sub.SubscriberURI != nil { + destination = sub.SubscriberURI.URL() + } + + var reply *url.URL + if sub.ReplyURI != nil { + reply = sub.ReplyURI.URL() + } + + var deadLetter *url.URL + if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil { + deadLetter = sub.Delivery.DeadLetterSink.URI.URL() + } - _ = spec.ConvertTo(context.Background(), delivery) + var retryConfig *kncloudevents.RetryConfig + if sub.Delivery != nil { + if rc, err := kncloudevents.RetryConfigFromDeliverySpec(*sub.Delivery); err != nil { + return nil, err + } else { + retryConfig = &rc + } + } - return kncloudevents.RetryConfigFromDeliverySpec(*delivery) + return &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig}, nil } func createMessageReceiverFunction(f *MessageHandler) func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error { @@ -198,21 +201,13 @@ func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.M // makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and // the `sink` portions of the subscription. func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) error { - - var destination *url.URL - if sub.SubscriberURI != nil { - destination = sub.SubscriberURI.URL() - } - - var reply *url.URL - if sub.ReplyURI != nil { - reply = sub.ReplyURI.URL() - } - - var deadLetterURL *url.URL - if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil { - deadLetterURL = sub.Delivery.DeadLetterSink.URI.URL() - } - - return f.dispatcher.DispatchMessageWithRetries(ctx, message, additionalHeaders, destination, reply, deadLetterURL, &sub.RetryConfig) + return f.dispatcher.DispatchMessageWithRetries( + ctx, + message, + additionalHeaders, + sub.Subscriber, + sub.Reply, + sub.DeadLetter, + sub.RetryConfig, + ) } diff --git a/pkg/channel/fanout/fanout_message_handler_test.go b/pkg/channel/fanout/fanout_message_handler_test.go index 4ff477f81a9..060556f5705 100644 --- a/pkg/channel/fanout/fanout_message_handler_test.go +++ b/pkg/channel/fanout/fanout_message_handler_test.go @@ -33,15 +33,14 @@ import ( "go.uber.org/zap" "knative.dev/pkg/apis" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/eventing/pkg/channel" ) // Domains used in subscriptions, which will be replaced by the real domains of the started HTTP // servers. var ( - replaceSubscriber = apis.HTTP("replaceSubscriber") - replaceReplier = apis.HTTP("replaceReplier") + replaceSubscriber = apis.HTTP("replaceSubscriber").URL() + replaceReplier = apis.HTTP("replaceReplier").URL() ) func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { @@ -77,9 +76,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { timeout: time.Millisecond, subs: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceSubscriber, - }, + Subscriber: replaceSubscriber, }, }, subscriber: func(writer http.ResponseWriter, _ *http.Request) { @@ -105,9 +102,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { "reply fails": { subs: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - ReplyURI: replaceReplier, - }, + Reply: replaceReplier, }, }, replier: func(writer http.ResponseWriter, _ *http.Request) { @@ -120,9 +115,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { "subscriber fails": { subs: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceSubscriber, - }, + Subscriber: replaceSubscriber, }, }, subscriber: func(writer http.ResponseWriter, _ *http.Request) { @@ -135,10 +128,8 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { "subscriber succeeds, result fails": { subs: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceSubscriber, - ReplyURI: replaceReplier, - }, + Subscriber: replaceSubscriber, + Reply: replaceReplier, }, }, subscriber: callableSucceed, @@ -153,10 +144,8 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { "one sub succeeds": { subs: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceSubscriber, - ReplyURI: replaceReplier, - }, + Subscriber: replaceSubscriber, + Reply: replaceReplier, }, }, subscriber: callableSucceed, @@ -171,16 +160,12 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { "one sub succeeds, one sub fails": { subs: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceSubscriber, - ReplyURI: replaceReplier, - }, + Subscriber: replaceSubscriber, + Reply: replaceReplier, }, { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceSubscriber, - ReplyURI: replaceReplier, - }, + Subscriber: replaceSubscriber, + Reply: replaceReplier, }, }, subscriber: callableSucceed, @@ -193,22 +178,16 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { "all subs succeed": { subs: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceSubscriber, - ReplyURI: replaceReplier, - }, + Subscriber: replaceSubscriber, + Reply: replaceReplier, }, { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceSubscriber, - ReplyURI: replaceReplier, - }, + Subscriber: replaceSubscriber, + Reply: replaceReplier, }, { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceSubscriber, - ReplyURI: replaceReplier, - }, + Subscriber: replaceSubscriber, + Reply: replaceReplier, }, }, subscriber: callableSucceed, @@ -257,11 +236,11 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb // Rewrite the subs to use the servers we just started. subs := make([]Subscription, 0) for _, sub := range inSubs { - if sub.SubscriberURI == replaceSubscriber { - sub.SubscriberURI = apis.HTTP(subscriberServer.URL[7:]) // strip the leading 'http://' + if sub.Subscriber == replaceSubscriber { + sub.Subscriber = apis.HTTP(subscriberServer.URL[7:]).URL() // strip the leading 'http://' } - if sub.ReplyURI == replaceReplier { - sub.ReplyURI = apis.HTTP(replyServer.URL[7:]) // strip the leading 'http://' + if sub.Reply == replaceReplier { + sub.Reply = apis.HTTP(replyServer.URL[7:]).URL() // strip the leading 'http://' } subs = append(subs, sub) } diff --git a/pkg/channel/multichannelfanout/config.go b/pkg/channel/multichannelfanout/config.go index d0acac96f2e..d3d0fcef71f 100644 --- a/pkg/channel/multichannelfanout/config.go +++ b/pkg/channel/multichannelfanout/config.go @@ -23,13 +23,13 @@ import ( // Config for a multichannelfanout.Handler. type Config struct { // The configuration of each channel in this handler. - ChannelConfigs []ChannelConfig `json:"channelConfigs"` + ChannelConfigs []ChannelConfig } // ChannelConfig is the configuration for a single Channel. type ChannelConfig struct { - Namespace string `json:"namespace"` - Name string `json:"name"` - HostName string `json:"hostname"` - FanoutConfig fanout.Config `json:"fanoutConfig"` + Namespace string + Name string + HostName string + FanoutConfig fanout.Config } diff --git a/pkg/channel/multichannelfanout/multi_channel_fanout_message_handler_test.go b/pkg/channel/multichannelfanout/multi_channel_fanout_message_handler_test.go index 83850e2f9ad..dec895bf685 100644 --- a/pkg/channel/multichannelfanout/multi_channel_fanout_message_handler_test.go +++ b/pkg/channel/multichannelfanout/multi_channel_fanout_message_handler_test.go @@ -30,14 +30,13 @@ import ( "go.uber.org/zap/zaptest" "knative.dev/pkg/apis" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/channel/fanout" ) var ( // The httptest.Server's host name will replace this value in all ChannelConfigs. - replaceDomain = apis.HTTP("replaceDomain") + replaceDomain = apis.HTTP("replaceDomain").URL() ) func TestNewMessageHandler(t *testing.T) { @@ -94,9 +93,7 @@ func TestCopyMessageHandlerWithNewConfig(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []fanout.Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: apis.HTTP("subscriberdomain"), - }, + Subscriber: apis.HTTP("subscriberdomain").URL(), }, }, }, @@ -111,9 +108,7 @@ func TestCopyMessageHandlerWithNewConfig(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []fanout.Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - ReplyURI: apis.HTTP("replydomain"), - }, + Reply: apis.HTTP("replydomain").URL(), }, }, }, @@ -162,9 +157,7 @@ func TestConfigDiffMessageHandler(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []fanout.Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: apis.HTTP("subscriberdomain"), - }, + Subscriber: apis.HTTP("subscriberdomain").URL(), }, }, }, @@ -194,9 +187,7 @@ func TestConfigDiffMessageHandler(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []fanout.Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: apis.HTTP("different"), - }, + Subscriber: apis.HTTP("different").URL(), }, }, }, @@ -250,9 +241,7 @@ func TestServeHTTPMessageHandler(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []fanout.Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - ReplyURI: replaceDomain, - }, + Reply: replaceDomain, }, }, }, @@ -274,9 +263,7 @@ func TestServeHTTPMessageHandler(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []fanout.Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - ReplyURI: apis.HTTP("first-to-domain"), - }, + Reply: apis.HTTP("first-to-domain").URL(), }, }, }, @@ -288,9 +275,7 @@ func TestServeHTTPMessageHandler(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []fanout.Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceDomain, - }, + Subscriber: replaceDomain, }, }, }, @@ -360,11 +345,11 @@ func fakeHandler(statusCode int) http.HandlerFunc { func replaceDomains(config Config, replacement string) { for i, cc := range config.ChannelConfigs { for j, sub := range cc.FanoutConfig.Subscriptions { - if sub.SubscriberURI == replaceDomain { - sub.SubscriberURI = apis.HTTP(replacement) + if sub.Subscriber == replaceDomain { + sub.Subscriber = apis.HTTP(replacement).URL() } - if sub.ReplyURI == replaceDomain { - sub.ReplyURI = apis.HTTP(replacement) + if sub.Reply == replaceDomain { + sub.Reply = apis.HTTP(replacement).URL() } cc.FanoutConfig.Subscriptions[j] = sub } diff --git a/pkg/channel/swappable/swappable_message_handler_test.go b/pkg/channel/swappable/swappable_message_handler_test.go index 9250de335cc..df616864308 100644 --- a/pkg/channel/swappable/swappable_message_handler_test.go +++ b/pkg/channel/swappable/swappable_message_handler_test.go @@ -29,13 +29,12 @@ import ( "go.uber.org/zap/zaptest" "knative.dev/pkg/apis" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/channel/fanout" "knative.dev/eventing/pkg/channel/multichannelfanout" ) -var replaceDomain = apis.HTTP("replaceDomain") +var replaceDomain = apis.HTTP("replaceDomain").URL() const ( hostName = "a.b.c.d" @@ -54,9 +53,7 @@ func TestMessageHandler(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []fanout.Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceDomain, - }, + Subscriber: replaceDomain, }, }, }, @@ -70,9 +67,7 @@ func TestMessageHandler(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []fanout.Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - ReplyURI: replaceDomain, - }, + Reply: replaceDomain, }, }, }, @@ -109,9 +104,7 @@ func TestMessageHandler_InvalidConfigChange(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []fanout.Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - SubscriberURI: replaceDomain, - }, + Subscriber: replaceDomain, }, }, }, @@ -222,11 +215,11 @@ func successHandler(w http.ResponseWriter, r *http.Request) { func replaceDomains(c multichannelfanout.Config, replacement string) multichannelfanout.Config { for i, cc := range c.ChannelConfigs { for j, sub := range cc.FanoutConfig.Subscriptions { - if sub.ReplyURI == replaceDomain { - sub.ReplyURI = apis.HTTP(replacement) + if sub.Subscriber == replaceDomain { + sub.Subscriber = apis.HTTP(replacement).URL() } - if sub.SubscriberURI == replaceDomain { - sub.SubscriberURI = apis.HTTP(replacement) + if sub.Reply == replaceDomain { + sub.Reply = apis.HTTP(replacement).URL() } cc.FanoutConfig.Subscriptions[j] = sub } diff --git a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go index e9157dd7c3f..ce641d9065c 100644 --- a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go +++ b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go @@ -28,7 +28,6 @@ import ( "github.com/cloudevents/sdk-go/v2/test" "go.uber.org/zap" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/channel/fanout" "knative.dev/eventing/pkg/channel/multichannelfanout" @@ -73,12 +72,8 @@ func BenchmarkDispatcher_dispatch_ok_through_2_channels(b *testing.B) { FanoutConfig: fanout.Config{ AsyncHandler: false, Subscriptions: []fanout.Subscription{{ - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "aaaa", - Generation: 1, - SubscriberURI: transformationsUrl, - ReplyURI: channelBUrl, - }, + Subscriber: transformationsUrl.URL(), + Reply: channelBUrl.URL(), }}, }, }, @@ -89,11 +84,7 @@ func BenchmarkDispatcher_dispatch_ok_through_2_channels(b *testing.B) { FanoutConfig: fanout.Config{ AsyncHandler: false, Subscriptions: []fanout.Subscription{{ - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "bbbb", - Generation: 1, - SubscriberURI: receiverUrl, - }, + Subscriber: receiverUrl.URL(), }}, }, }, diff --git a/pkg/inmemorychannel/message_dispatcher_test.go b/pkg/inmemorychannel/message_dispatcher_test.go index 894c8f88996..20071566be5 100644 --- a/pkg/inmemorychannel/message_dispatcher_test.go +++ b/pkg/inmemorychannel/message_dispatcher_test.go @@ -34,9 +34,7 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" "knative.dev/pkg/apis" - duckv1 "knative.dev/pkg/apis/duck/v1" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/channel/fanout" "knative.dev/eventing/pkg/channel/multichannelfanout" @@ -232,22 +230,12 @@ func TestDispatcher_dispatch(t *testing.T) { FanoutConfig: fanout.Config{ AsyncHandler: false, Subscriptions: []fanout.Subscription{{ - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "aaaa", - Generation: 1, - SubscriberURI: mustParseUrl(t, transformationsServer.URL), - ReplyURI: mustParseUrl(t, channelBProxy.URL), - }, + Subscriber: mustParseUrl(t, transformationsServer.URL).URL(), + Reply: mustParseUrl(t, channelBProxy.URL).URL(), }, { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "cccc", - Generation: 1, - SubscriberURI: mustParseUrl(t, transformationsFailureServer.URL), - ReplyURI: mustParseUrl(t, channelBProxy.URL), - Delivery: &eventingduck.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{URI: mustParseUrl(t, deadLetterServer.URL)}, - }, - }, + Subscriber: mustParseUrl(t, transformationsFailureServer.URL).URL(), + Reply: mustParseUrl(t, channelBProxy.URL).URL(), + DeadLetter: mustParseUrl(t, deadLetterServer.URL).URL(), }}, }, }, @@ -258,11 +246,7 @@ func TestDispatcher_dispatch(t *testing.T) { FanoutConfig: fanout.Config{ AsyncHandler: false, Subscriptions: []fanout.Subscription{{ - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "bbbb", - Generation: 1, - SubscriberURI: mustParseUrl(t, receiverServer.URL), - }, + Subscriber: mustParseUrl(t, receiverServer.URL).URL(), }}, }, }, diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index c68aca38c2a..6b48c19d91a 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -18,7 +18,6 @@ package dispatcher import ( "context" - "fmt" "knative.dev/pkg/reconciler" @@ -32,7 +31,6 @@ import ( "knative.dev/eventing/pkg/channel/multichannelfanout" listers "knative.dev/eventing/pkg/client/listers/messaging/v1beta1" "knative.dev/eventing/pkg/inmemorychannel" - "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/logging" ) @@ -84,24 +82,17 @@ func (r *Reconciler) newConfigFromInMemoryChannels(channels []*v1beta1.InMemoryC subs := make([]fanout.Subscription, len(c.Spec.Subscribers)) for _, sub := range c.Spec.Subscribers { - retriesConfig := kncloudevents.NoRetries() - if sub.Delivery != nil && sub.Delivery.Retry != nil { - delivery := eventingduckv1.DeliverySpec{} - err := sub.Delivery.ConvertTo(context.Background(), &delivery) - if err != nil { - return nil, err - } - _retriesConfig, err := kncloudevents.RetryConfigFromDeliverySpec(delivery) - if err != nil { - return nil, fmt.Errorf("failed to create retries config: %w", err) - } - retriesConfig = _retriesConfig + subSpecV1 := eventingduckv1.SubscriberSpec{} + if err := sub.ConvertTo(context.TODO(), &subSpecV1); err != nil { + return nil, err } - subs = append(subs, fanout.Subscription{ - SubscriberSpec: sub, - RetryConfig: retriesConfig, - }) + conf, err := fanout.SubscriberSpecToFanoutConfig(subSpecV1) + if err != nil { + return nil, err + } + + subs = append(subs, *conf) } channelConfig := multichannelfanout.ChannelConfig{