Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 37 additions & 42 deletions pkg/channel/fanout/fanout_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ package fanout

import (
"context"
"encoding/json"
"errors"
"fmt"
nethttp "net/http"
"net/url"
"time"
Expand All @@ -36,7 +34,6 @@ import (
"go.uber.org/zap"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/kncloudevents"
)
Expand All @@ -46,16 +43,10 @@ const (
)

type Subscription struct {
eventingduck.SubscriberSpec
RetryConfig kncloudevents.RetryConfig
}

func (s *Subscription) MarshalJSON() ([]byte, error) {
return json.Marshal(s.SubscriberSpec)
}

func (s *Subscription) UnmarshalJSON(bytes []byte) error {
return json.Unmarshal(bytes, &s.SubscriberSpec)
Subscriber *url.URL
Reply *url.URL
DeadLetter *url.URL
RetryConfig *kncloudevents.RetryConfig
}

// Config for a fanout.MessageHandler.
Expand Down Expand Up @@ -96,23 +87,35 @@ func NewMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDisp
}
handler.receiver = receiver

for i := range config.Subscriptions {
retriesConfig, err := retriesOf(config.Subscriptions[i].SubscriberSpec)
if err != nil {
return nil, fmt.Errorf("failed to create retries config from SubscriberSpec: %w", err)
}
config.Subscriptions[i].RetryConfig = retriesConfig
}

return handler, nil
}

func retriesOf(spec eventingduck.SubscriberSpec) (kncloudevents.RetryConfig, error) {
delivery := &eventingduckv1.DeliverySpec{}
func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscription, error) {
var destination *url.URL
if sub.SubscriberURI != nil {
destination = sub.SubscriberURI.URL()
}

var reply *url.URL
if sub.ReplyURI != nil {
reply = sub.ReplyURI.URL()
}

var deadLetter *url.URL
if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil {
deadLetter = sub.Delivery.DeadLetterSink.URI.URL()
}

_ = spec.ConvertTo(context.Background(), delivery)
var retryConfig *kncloudevents.RetryConfig
if sub.Delivery != nil {
if rc, err := kncloudevents.RetryConfigFromDeliverySpec(*sub.Delivery); err != nil {
return nil, err
} else {
retryConfig = &rc
}
}

return kncloudevents.RetryConfigFromDeliverySpec(*delivery)
return &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig}, nil
}

func createMessageReceiverFunction(f *MessageHandler) func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error {
Expand Down Expand Up @@ -198,21 +201,13 @@ func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.M
// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
// the `sink` portions of the subscription.
func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) error {

var destination *url.URL
if sub.SubscriberURI != nil {
destination = sub.SubscriberURI.URL()
}

var reply *url.URL
if sub.ReplyURI != nil {
reply = sub.ReplyURI.URL()
}

var deadLetterURL *url.URL
if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil {
deadLetterURL = sub.Delivery.DeadLetterSink.URI.URL()
}

return f.dispatcher.DispatchMessageWithRetries(ctx, message, additionalHeaders, destination, reply, deadLetterURL, &sub.RetryConfig)
return f.dispatcher.DispatchMessageWithRetries(
ctx,
message,
additionalHeaders,
sub.Subscriber,
sub.Reply,
sub.DeadLetter,
sub.RetryConfig,
)
}
67 changes: 23 additions & 44 deletions pkg/channel/fanout/fanout_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ import (
"go.uber.org/zap"
"knative.dev/pkg/apis"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/channel"
)

// Domains used in subscriptions, which will be replaced by the real domains of the started HTTP
// servers.
var (
replaceSubscriber = apis.HTTP("replaceSubscriber")
replaceReplier = apis.HTTP("replaceReplier")
replaceSubscriber = apis.HTTP("replaceSubscriber").URL()
replaceReplier = apis.HTTP("replaceReplier").URL()
)

func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
Expand Down Expand Up @@ -77,9 +76,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
timeout: time.Millisecond,
subs: []Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
},
Subscriber: replaceSubscriber,
},
},
subscriber: func(writer http.ResponseWriter, _ *http.Request) {
Expand All @@ -105,9 +102,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
"reply fails": {
subs: []Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
ReplyURI: replaceReplier,
},
Reply: replaceReplier,
},
},
replier: func(writer http.ResponseWriter, _ *http.Request) {
Expand All @@ -120,9 +115,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
"subscriber fails": {
subs: []Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
},
Subscriber: replaceSubscriber,
},
},
subscriber: func(writer http.ResponseWriter, _ *http.Request) {
Expand All @@ -135,10 +128,8 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
"subscriber succeeds, result fails": {
subs: []Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
Subscriber: replaceSubscriber,
Reply: replaceReplier,
},
},
subscriber: callableSucceed,
Expand All @@ -153,10 +144,8 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
"one sub succeeds": {
subs: []Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
Subscriber: replaceSubscriber,
Reply: replaceReplier,
},
},
subscriber: callableSucceed,
Expand All @@ -171,16 +160,12 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
"one sub succeeds, one sub fails": {
subs: []Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
Subscriber: replaceSubscriber,
Reply: replaceReplier,
},
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
Subscriber: replaceSubscriber,
Reply: replaceReplier,
},
},
subscriber: callableSucceed,
Expand All @@ -193,22 +178,16 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
"all subs succeed": {
subs: []Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
Subscriber: replaceSubscriber,
Reply: replaceReplier,
},
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
Subscriber: replaceSubscriber,
Reply: replaceReplier,
},
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
Subscriber: replaceSubscriber,
Reply: replaceReplier,
},
},
subscriber: callableSucceed,
Expand Down Expand Up @@ -257,11 +236,11 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb
// Rewrite the subs to use the servers we just started.
subs := make([]Subscription, 0)
for _, sub := range inSubs {
if sub.SubscriberURI == replaceSubscriber {
sub.SubscriberURI = apis.HTTP(subscriberServer.URL[7:]) // strip the leading 'http://'
if sub.Subscriber == replaceSubscriber {
sub.Subscriber = apis.HTTP(subscriberServer.URL[7:]).URL() // strip the leading 'http://'
}
if sub.ReplyURI == replaceReplier {
sub.ReplyURI = apis.HTTP(replyServer.URL[7:]) // strip the leading 'http://'
if sub.Reply == replaceReplier {
sub.Reply = apis.HTTP(replyServer.URL[7:]).URL() // strip the leading 'http://'
}
subs = append(subs, sub)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/channel/multichannelfanout/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
// Config for a multichannelfanout.Handler.
type Config struct {
// The configuration of each channel in this handler.
ChannelConfigs []ChannelConfig `json:"channelConfigs"`
ChannelConfigs []ChannelConfig
}

// ChannelConfig is the configuration for a single Channel.
type ChannelConfig struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
HostName string `json:"hostname"`
FanoutConfig fanout.Config `json:"fanoutConfig"`
Namespace string
Name string
HostName string
FanoutConfig fanout.Config
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@ import (
"go.uber.org/zap/zaptest"
"knative.dev/pkg/apis"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
)

var (
// The httptest.Server's host name will replace this value in all ChannelConfigs.
replaceDomain = apis.HTTP("replaceDomain")
replaceDomain = apis.HTTP("replaceDomain").URL()
)

func TestNewMessageHandler(t *testing.T) {
Expand Down Expand Up @@ -94,9 +93,7 @@ func TestCopyMessageHandlerWithNewConfig(t *testing.T) {
FanoutConfig: fanout.Config{
Subscriptions: []fanout.Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: apis.HTTP("subscriberdomain"),
},
Subscriber: apis.HTTP("subscriberdomain").URL(),
},
},
},
Expand All @@ -111,9 +108,7 @@ func TestCopyMessageHandlerWithNewConfig(t *testing.T) {
FanoutConfig: fanout.Config{
Subscriptions: []fanout.Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
ReplyURI: apis.HTTP("replydomain"),
},
Reply: apis.HTTP("replydomain").URL(),
},
},
},
Expand Down Expand Up @@ -162,9 +157,7 @@ func TestConfigDiffMessageHandler(t *testing.T) {
FanoutConfig: fanout.Config{
Subscriptions: []fanout.Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: apis.HTTP("subscriberdomain"),
},
Subscriber: apis.HTTP("subscriberdomain").URL(),
},
},
},
Expand Down Expand Up @@ -194,9 +187,7 @@ func TestConfigDiffMessageHandler(t *testing.T) {
FanoutConfig: fanout.Config{
Subscriptions: []fanout.Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: apis.HTTP("different"),
},
Subscriber: apis.HTTP("different").URL(),
},
},
},
Expand Down Expand Up @@ -250,9 +241,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {
FanoutConfig: fanout.Config{
Subscriptions: []fanout.Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
ReplyURI: replaceDomain,
},
Reply: replaceDomain,
},
},
},
Expand All @@ -274,9 +263,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {
FanoutConfig: fanout.Config{
Subscriptions: []fanout.Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
ReplyURI: apis.HTTP("first-to-domain"),
},
Reply: apis.HTTP("first-to-domain").URL(),
},
},
},
Expand All @@ -288,9 +275,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {
FanoutConfig: fanout.Config{
Subscriptions: []fanout.Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceDomain,
},
Subscriber: replaceDomain,
},
},
},
Expand Down Expand Up @@ -360,11 +345,11 @@ func fakeHandler(statusCode int) http.HandlerFunc {
func replaceDomains(config Config, replacement string) {
for i, cc := range config.ChannelConfigs {
for j, sub := range cc.FanoutConfig.Subscriptions {
if sub.SubscriberURI == replaceDomain {
sub.SubscriberURI = apis.HTTP(replacement)
if sub.Subscriber == replaceDomain {
sub.Subscriber = apis.HTTP(replacement).URL()
}
if sub.ReplyURI == replaceDomain {
sub.ReplyURI = apis.HTTP(replacement)
if sub.Reply == replaceDomain {
sub.Reply = apis.HTTP(replacement).URL()
}
cc.FanoutConfig.Subscriptions[j] = sub
}
Expand Down
Loading