diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index d2c2e77fd9..6b73e66704 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -18,12 +18,10 @@ package dispatcher import ( "context" "encoding/json" - "errors" "fmt" nethttp "net/http" "strings" "sync" - "sync/atomic" "github.com/Shopify/sarama" protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" @@ -33,16 +31,18 @@ import ( "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/pkg/kmeta" + "knative.dev/pkg/logging" + + eventingchannels "knative.dev/eventing/pkg/channel" + "knative.dev/eventing/pkg/channel/fanout" + "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" "knative.dev/eventing-kafka/pkg/channel/distributed/common/env" "knative.dev/eventing-kafka/pkg/common/client" "knative.dev/eventing-kafka/pkg/common/consumer" "knative.dev/eventing-kafka/pkg/common/tracing" - eventingchannels "knative.dev/eventing/pkg/channel" - "knative.dev/eventing/pkg/channel/fanout" - "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/pkg/kmeta" ) const ( @@ -50,19 +50,20 @@ const ( ) type KafkaDispatcher struct { - hostToChannelMap atomic.Value - // hostToChannelMapLock is used to update hostToChannelMap - hostToChannelMapLock sync.Mutex - receiver *eventingchannels.MessageReceiver dispatcher *eventingchannels.MessageDispatcherImpl - kafkaSyncProducer sarama.SyncProducer - channelSubscriptions map[eventingchannels.ChannelReference]*KafkaSubscription + // Receiver data structures + // map[string]eventingchannels.ChannelReference + hostToChannelMap sync.Map + kafkaSyncProducer sarama.SyncProducer + + // Dispatcher data structures + // consumerUpdateLock must be used to update all the below maps + consumerUpdateLock sync.Mutex + channelSubscriptions map[types.NamespacedName]*KafkaSubscription subsConsumerGroups map[types.UID]sarama.ConsumerGroup subscriptions map[types.UID]Subscription - // consumerUpdateLock must be used to update kafkaConsumers - consumerUpdateLock sync.Mutex kafkaConsumerFactory consumer.KafkaConsumerGroupFactory topicFunc TopicFunc @@ -106,7 +107,7 @@ func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request return } channelRefNamespace, channelRefName := uriSplit[1], uriSplit[2] - channelRef := eventingchannels.ChannelReference{ + channelRef := types.NamespacedName{ Name: channelRefName, Namespace: channelRefNamespace, } @@ -156,13 +157,13 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat }) dispatcher := &KafkaDispatcher{ - dispatcher: eventingchannels.NewMessageDispatcher(args.Logger.Desugar()), + dispatcher: eventingchannels.NewMessageDispatcher(logging.FromContext(ctx).Desugar()), kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, conf), - channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription), + channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription), subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), subscriptions: make(map[types.UID]Subscription), kafkaSyncProducer: producer, - logger: args.Logger, + logger: logging.FromContext(ctx), topicFunc: args.TopicFunc, } @@ -170,11 +171,11 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat dispatcher.logger.Fatal(nethttp.ListenAndServe(":8081", dispatcher)) }() - podName, 
err := env.GetRequiredConfigValue(args.Logger.Desugar(), env.PodNameEnvVarKey) + podName, err := env.GetRequiredConfigValue(logging.FromContext(ctx).Desugar(), env.PodNameEnvVarKey) if err != nil { return nil, err } - containerName, err := env.GetRequiredConfigValue(args.Logger.Desugar(), env.ContainerNameEnvVarKey) + containerName, err := env.GetRequiredConfigValue(logging.FromContext(ctx).Desugar(), env.ContainerNameEnvVarKey) if err != nil { return nil, err } @@ -203,7 +204,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat return err }, - args.Logger.Desugar(), + logging.FromContext(ctx).Desugar(), reporter, eventingchannels.ResolveMessageChannelFromHostHeader(dispatcher.getChannelReferenceFromHost)) if err != nil { @@ -211,7 +212,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat } dispatcher.receiver = receiverFunc - dispatcher.setHostToChannelMap(map[string]eventingchannels.ChannelReference{}) return dispatcher, nil } @@ -223,12 +223,6 @@ type KafkaDispatcherArgs struct { Brokers []string KafkaAuthConfig *client.KafkaAuthConfig TopicFunc TopicFunc - Logger *zap.SugaredLogger -} - -type Config struct { - // The configuration of each channel in this handler. - ChannelConfigs []ChannelConfig } type ChannelConfig struct { @@ -238,120 +232,138 @@ type ChannelConfig struct { Subscriptions []Subscription } -// UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller. -func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]error, error) { - if config == nil { - return nil, fmt.Errorf("nil config") +func (cc ChannelConfig) SubscriptionsUIDs() []string { + res := make([]string, 0, len(cc.Subscriptions)) + for _, s := range cc.Subscriptions { + res = append(res, string(s.UID)) } + return res +} + +// UpdateError is the error returned from the ReconcileConsumers method, with the details of which +// subscriptions failed to subscribe to. +type UpdateError map[types.UID]error + +func (k UpdateError) Error() string { + errs := make([]string, 0, len(k)) + for uid, err := range k { + errs = append(errs, fmt.Sprintf("subscription %s: %v", uid, err)) + } + return strings.Join(errs, ",") +} + +// ReconcileConsumers will be called by new CRD based kafka channel dispatcher controller. 
+func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error { + if config == nil { + return fmt.Errorf("nil config") + } + + channelNamespacedName := types.NamespacedName{ + Namespace: config.Namespace, + Name: config.Name, + } + + // Aux data structures to reconcile + toAddSubs := make(map[types.UID]Subscription) + toRemoveSubs := sets.NewString() d.consumerUpdateLock.Lock() defer d.consumerUpdateLock.Unlock() - var newSubs []types.UID - failedToSubscribe := make(map[types.UID]error) - for _, cc := range config.ChannelConfigs { - channelRef := eventingchannels.ChannelReference{ - Name: cc.Name, - Namespace: cc.Namespace, - } - for _, subSpec := range cc.Subscriptions { - newSubs = append(newSubs, subSpec.UID) - - // Check if sub already exists - exists := false - if _, ok := d.channelSubscriptions[channelRef]; ok { - for _, s := range d.channelSubscriptions[channelRef].subs { - if s == subSpec.UID { - exists = true - } - } - } else { //ensure the pointer is populated or things go boom - d.channelSubscriptions[channelRef] = &KafkaSubscription{ - logger: d.logger, - subs: []types.UID{}, - channelReadySubscriptions: map[string]sets.Int32{}, - } - } + // Fill toAddSubs and toRemoveSubs by diffing the new subscriptions against the existing ones for this channel + thisChannelKafkaSubscriptions := d.channelSubscriptions[channelNamespacedName] - if !exists { - // only subscribe when not exists in channel-subscriptions map - // do not need to resubscribe every time channel fanout config is updated - if err := d.subscribe(channelRef, subSpec); err != nil { - failedToSubscribe[subSpec.UID] = err - } - } - } + var existingSubsForThisChannel sets.String + if thisChannelKafkaSubscriptions != nil { + existingSubsForThisChannel = thisChannelKafkaSubscriptions.subs + } else { + existingSubsForThisChannel = sets.NewString() } - d.logger.Debug("Number of new subs", zap.Any("subs", len(newSubs))) - d.logger.Debug("Number of subs failed to subscribe", zap.Any("subs", len(failedToSubscribe))) + newSubsForThisChannel := sets.NewString(config.SubscriptionsUIDs()...)
- // Unsubscribe and close consumer for any deleted subscriptions - subsToRemove := make(map[eventingchannels.ChannelReference][]types.UID) - for channelRef, actualSubs := range d.channelSubscriptions { - subsToRemove[channelRef] = uidSetDifference(actualSubs.subs, newSubs) - } + // toRemoveSubs += existing subs of this channel - new subs of this channel + thisChannelToRemoveSubs := existingSubsForThisChannel.Difference(newSubsForThisChannel).UnsortedList() + toRemoveSubs.Insert( + thisChannelToRemoveSubs..., + ) - for channelRef, subs := range subsToRemove { - for _, s := range subs { - if err := d.unsubscribe(channelRef, d.subscriptions[s]); err != nil { - return nil, err - } + // toAddSubs += new subs of this channel - existing subs of this channel + thisChannelToAddSubs := newSubsForThisChannel.Difference(existingSubsForThisChannel) + for _, subSpec := range config.Subscriptions { + if thisChannelToAddSubs.Has(string(subSpec.UID)) { + toAddSubs[subSpec.UID] = subSpec } - d.channelSubscriptions[channelRef].subs = newSubs } - return failedToSubscribe, nil -} + d.logger.Debug("Number of new subs", zap.Any("subs", len(toAddSubs))) + d.logger.Debug("Number of old subs", zap.Any("subs", len(toRemoveSubs))) + + failedToSubscribe := make(UpdateError) + for subUid, subSpec := range toAddSubs { + if err := d.subscribe(channelNamespacedName, subSpec); err != nil { + failedToSubscribe[subUid] = err + } + } + d.logger.Debug("Number of subs failed to subscribe", zap.Any("subs", len(failedToSubscribe))) -func uidSetDifference(a, b []types.UID) (diff []types.UID) { - m := make(map[types.UID]bool) + for _, subUid := range toRemoveSubs.UnsortedList() { + // Unsubscribe failures are logged but not returned to the caller + if err := d.unsubscribe(channelNamespacedName, d.subscriptions[types.UID(subUid)]); err != nil { + d.logger.Warnw("Error while unsubscribing", zap.Error(err)) + } + } - for _, item := range b { - m[item] = true + if len(failedToSubscribe) == 0 { + return nil } + return failedToSubscribe +} - for _, item := range a { - if _, ok := m[item]; !ok { - diff = append(diff, item) +// RegisterChannelHost adds a new channel to the host-channel mapping. +func (d *KafkaDispatcher) RegisterChannelHost(channelConfig *ChannelConfig) error { + old, ok := d.hostToChannelMap.LoadOrStore(channelConfig.HostName, eventingchannels.ChannelReference{ + Name: channelConfig.Name, + Namespace: channelConfig.Namespace, + }) + if ok { + oldChannelRef := old.(eventingchannels.ChannelReference) + if !(oldChannelRef.Namespace == channelConfig.Namespace && oldChannelRef.Name == channelConfig.Name) { + // If something is already there, but it's not the same channel, then fail + return fmt.Errorf( + "duplicate hostName found. Each channel must have a unique host header. HostName:%s, channel:%s.%s, channel:%s.%s", + channelConfig.HostName, + oldChannelRef.Namespace, + oldChannelRef.Name, + channelConfig.Namespace, + channelConfig.Name, + ) } } - return + return nil } -// UpdateHostToChannelMap will be called by new CRD based kafka channel dispatcher controller.
-func (d *KafkaDispatcher) UpdateHostToChannelMap(config *Config) error { - if config == nil { - return errors.New("nil config") +func (d *KafkaDispatcher) CleanupChannel(name, namespace, hostname string) error { + channelRef := types.NamespacedName{ + Name: name, + Namespace: namespace, } - d.hostToChannelMapLock.Lock() - defer d.hostToChannelMapLock.Unlock() + // Remove from the hostToChannel map the mapping with this channel + d.hostToChannelMap.Delete(hostname) - hcMap, err := createHostToChannelMap(config) - if err != nil { - return err - } + // Remove all subs + d.consumerUpdateLock.Lock() + defer d.consumerUpdateLock.Unlock() - d.setHostToChannelMap(hcMap) - return nil -} + if d.channelSubscriptions[channelRef] == nil { + // No subs to remove + return nil + } -func createHostToChannelMap(config *Config) (map[string]eventingchannels.ChannelReference, error) { - hcMap := make(map[string]eventingchannels.ChannelReference, len(config.ChannelConfigs)) - for _, cConfig := range config.ChannelConfigs { - if cr, ok := hcMap[cConfig.HostName]; ok { - return nil, fmt.Errorf( - "duplicate hostName found. Each channel must have a unique host header. HostName:%s, channel:%s.%s, channel:%s.%s", - cConfig.HostName, - cConfig.Namespace, - cConfig.Name, - cr.Namespace, - cr.Name) + for _, s := range d.channelSubscriptions[channelRef].subs.UnsortedList() { + if err := d.unsubscribe(channelRef, d.subscriptions[types.UID(s)]); err != nil { + return err } - hcMap[cConfig.HostName] = eventingchannels.ChannelReference{Name: cConfig.Name, Namespace: cConfig.Namespace} } - return hcMap, nil + + return nil } // Start starts the kafka dispatcher's message processing. @@ -365,15 +377,24 @@ func (d *KafkaDispatcher) Start(ctx context.Context) error { // subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine. // subscribe must be called under updateLock. -func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference, sub Subscription) error { +func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscription) error { d.logger.Infow("Subscribing to Kafka Channel", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID)) + topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name) groupID := fmt.Sprintf("kafka.%s.%s.%s", channelRef.Namespace, channelRef.Name, string(sub.UID)) + + // Get or create the channel kafka subscription + kafkaSubscription, ok := d.channelSubscriptions[channelRef] + if !ok { + kafkaSubscription = NewKafkaSubscription(d.logger) + d.channelSubscriptions[channelRef] = kafkaSubscription + } + handler := &consumerMessageHandler{ d.logger, sub, d.dispatcher, - d.channelSubscriptions[channelRef], + kafkaSubscription, groupID, } d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef), @@ -394,7 +415,8 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference } }() - d.channelSubscriptions[channelRef].subs = append(d.channelSubscriptions[channelRef].subs, sub.UID) + // Update the data structures that holds the reconciliation data + kafkaSubscription.subs.Insert(string(sub.UID)) d.subscriptions[sub.UID] = sub d.subsConsumerGroups[sub.UID] = consumerGroup @@ -403,42 +425,37 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference // unsubscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine. // unsubscribe must be called under updateLock. 
-func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub Subscription) error { - d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channel), zap.Any("subscription", sub.UID)) +func (d *KafkaDispatcher) unsubscribe(channelRef types.NamespacedName, sub Subscription) error { + d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channelRef), zap.Any("subscription", sub.UID)) + + // Remove the sub spec delete(d.subscriptions, sub.UID) - if _, ok := d.channelSubscriptions[channel]; !ok { + + // Remove the sub from the channel + kafkaSubscription, ok := d.channelSubscriptions[channelRef] + if !ok { + // If this happens, then there's a bug somewhere... return nil } - if subsSlice := d.channelSubscriptions[channel].subs; subsSlice != nil { - var newSlice []types.UID - for _, oldSub := range subsSlice { - if oldSub != sub.UID { - newSlice = append(newSlice, oldSub) - } - } - d.channelSubscriptions[channel].subs = newSlice + kafkaSubscription.subs.Delete(string(sub.UID)) + if kafkaSubscription.subs.Len() == 0 { + // No subscriptions left for this channel, drop its entry + delete(d.channelSubscriptions, channelRef) } - if consumer, ok := d.subsConsumerGroups[sub.UID]; ok { + + // Delete the consumer group + if consumerGroup, ok := d.subsConsumerGroups[sub.UID]; ok { delete(d.subsConsumerGroups, sub.UID) - d.logger.Debugw("Closing cached consumer group", zap.Any("consumer group", consumer)) - return consumer.Close() + d.logger.Debugw("Closing cached consumer group", zap.Any("consumer group", consumerGroup)) + return consumerGroup.Close() } return nil } -func (d *KafkaDispatcher) getHostToChannelMap() map[string]eventingchannels.ChannelReference { - return d.hostToChannelMap.Load().(map[string]eventingchannels.ChannelReference) -} - -func (d *KafkaDispatcher) setHostToChannelMap(hcMap map[string]eventingchannels.ChannelReference) { - d.hostToChannelMap.Store(hcMap) -} - func (d *KafkaDispatcher) getChannelReferenceFromHost(host string) (eventingchannels.ChannelReference, error) { - chMap := d.getHostToChannelMap() - cr, ok := chMap[host] + cr, ok := d.hostToChannelMap.Load(host) if !ok { - return cr, eventingchannels.UnknownHostError(host) + return eventingchannels.ChannelReference{}, eventingchannels.UnknownHostError(host) } - return cr, nil + return cr.(eventingchannels.ChannelReference), nil } diff --git a/pkg/channel/consolidated/dispatcher/dispatcher_it_test.go b/pkg/channel/consolidated/dispatcher/dispatcher_it_test.go index 7304d231f8..0c7de46915 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher_it_test.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher_it_test.go @@ -27,6 +27,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/transformer" protocolhttp "github.com/cloudevents/sdk-go/v2/protocol/http" @@ -69,7 +71,6 @@ func TestDispatcher(t *testing.T) { ClientID: "testing", Brokers: []string{"localhost:9092"}, TopicFunc: utils.TopicName, - Logger: logger.Sugar(), } // Create the dispatcher.
At this point, if Kafka is not up, this thing fails @@ -149,58 +150,46 @@ func TestDispatcher(t *testing.T) { ) // send -> channela -> sub with transformationServer and reply to channelb -> channelb -> sub with receiver -> receiver - config := Config{ - ChannelConfigs: []ChannelConfig{ + channelAConfig := &ChannelConfig{ + Namespace: "default", + Name: "channela", + HostName: "channela.svc", + Subscriptions: []Subscription{ { - Namespace: "default", - Name: "channela", - HostName: "channela.svc", - Subscriptions: []Subscription{ - { - UID: "aaaa", - Subscription: fanout.Subscription{ - Subscriber: mustParseUrl(t, transformationsServer.URL), - Reply: mustParseUrl(t, channelBProxy.URL), - }, - }, - { - UID: "cccc", - Subscription: fanout.Subscription{ - Subscriber: mustParseUrl(t, transformationsFailureServer.URL), - Reply: mustParseUrl(t, channelBProxy.URL), - DeadLetter: mustParseUrl(t, deadLetterServer.URL), - }, - }, + UID: "aaaa", + Subscription: fanout.Subscription{ + Subscriber: mustParseUrl(t, transformationsServer.URL), + Reply: mustParseUrl(t, channelBProxy.URL), }, }, { - Namespace: "default", - Name: "channelb", - HostName: "channelb.svc", - Subscriptions: []Subscription{ - { - UID: "bbbb", - Subscription: fanout.Subscription{ - Subscriber: mustParseUrl(t, receiverServer.URL), - }, - }, + UID: "cccc", + Subscription: fanout.Subscription{ + Subscriber: mustParseUrl(t, transformationsFailureServer.URL), + Reply: mustParseUrl(t, channelBProxy.URL), + DeadLetter: mustParseUrl(t, deadLetterServer.URL), }, }, }, } + require.NoError(t, dispatcher.RegisterChannelHost(channelAConfig)) + require.NoError(t, dispatcher.ReconcileConsumers(channelAConfig)) - err = dispatcher.UpdateHostToChannelMap(&config) - if err != nil { - t.Fatal(err) - } - - failed, err := dispatcher.UpdateKafkaConsumers(&config) - if err != nil { - t.Fatal(err) - } - if len(failed) != 0 { - t.Fatal(err) + channelBConfig := &ChannelConfig{ + Namespace: "default", + Name: "channelb", + HostName: "channelb.svc", + Subscriptions: []Subscription{ + { + UID: "bbbb", + Subscription: fanout.Subscription{ + Subscriber: mustParseUrl(t, receiverServer.URL), + }, + }, + }, } + require.NoError(t, dispatcher.RegisterChannelHost(channelBConfig)) + require.NoError(t, dispatcher.ReconcileConsumers(channelBConfig)) time.Sleep(5 * time.Second) @@ -233,18 +222,8 @@ func TestDispatcher(t *testing.T) { receiverWg.Wait() // Try to close consumer groups - err = dispatcher.UpdateHostToChannelMap(&Config{}) - if err != nil { - t.Fatal(err) - } - - failed, err = dispatcher.UpdateKafkaConsumers(&Config{}) - if err != nil { - t.Fatal(err) - } - if len(failed) != 0 { - t.Fatal(err) - } + require.NoError(t, dispatcher.CleanupChannel("channela", "default", "channela.svc")) + require.NoError(t, dispatcher.CleanupChannel("channelb", "default", "channelb.svc")) } func createReverseProxy(t *testing.T, host string) *httputil.ReverseProxy { diff --git a/pkg/channel/consolidated/dispatcher/dispatcher_test.go b/pkg/channel/consolidated/dispatcher/dispatcher_test.go index f968bba865..773a0a1a3d 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher_test.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher_test.go @@ -24,11 +24,14 @@ import ( "net/http" "net/http/httptest" "net/url" + "sync" "testing" "github.com/Shopify/sarama" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest" "k8s.io/apimachinery/pkg/types" @@ 
-77,30 +80,69 @@ var _ sarama.ConsumerGroup = (*mockConsumerGroup)(nil) // ----- Tests -// test util for various config checks -func (d *KafkaDispatcher) checkConfigAndUpdate(config *Config) error { - if config == nil { - return errors.New("nil config") +func (d *KafkaDispatcher) getHostToChannelMap() map[string]eventingchannels.ChannelReference { + m := make(map[string]eventingchannels.ChannelReference) + d.hostToChannelMap.Range(func(key, value interface{}) bool { + m[key.(string)] = value.(eventingchannels.ChannelReference) + return true + }) + return m +} + +func TestKafkaDispatcher_RegisterChannelHost(t *testing.T) { + firstChannelConfig := &ChannelConfig{ + Namespace: "default", + Name: "test-channel-1", + HostName: "a.b.c.d", + } + secondChannelConfig := &ChannelConfig{ + Namespace: "default", + Name: "test-channel-2", + HostName: "a.b.c.d", } - if _, err := d.UpdateKafkaConsumers(config); err != nil { - // failed to update dispatchers consumers - return err + d := &KafkaDispatcher{ + kafkaConsumerFactory: &mockKafkaConsumerFactory{}, + channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription), + subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), + subscriptions: make(map[types.UID]Subscription), + topicFunc: utils.TopicName, + logger: zaptest.NewLogger(t).Sugar(), } - if err := d.UpdateHostToChannelMap(config); err != nil { - return err + + require.NoError(t, d.RegisterChannelHost(firstChannelConfig)) + require.Error(t, d.RegisterChannelHost(secondChannelConfig)) +} + +func TestKafkaDispatcher_RegisterSameChannelTwiceShouldNotFail(t *testing.T) { + channelConfig := &ChannelConfig{ + Namespace: "default", + Name: "test-channel-1", + HostName: "a.b.c.d", } - return nil + d := &KafkaDispatcher{ + kafkaConsumerFactory: &mockKafkaConsumerFactory{}, + channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription), + subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), + subscriptions: make(map[types.UID]Subscription), + topicFunc: utils.TopicName, + logger: zaptest.NewLogger(t).Sugar(), + } + + require.NoError(t, d.RegisterChannelHost(channelConfig)) + require.Contains(t, d.getHostToChannelMap(), "a.b.c.d") + require.NoError(t, d.RegisterChannelHost(channelConfig)) + require.Contains(t, d.getHostToChannelMap(), "a.b.c.d") } -func TestDispatcher_UpdateConfig(t *testing.T) { +func TestDispatcher_UpdateConsumers(t *testing.T) { subscriber, _ := url.Parse("http://test/subscriber") testCases := []struct { name string - oldConfig *Config - newConfig *Config + oldConfig *ChannelConfig + newConfig *ChannelConfig subscribes []string unsubscribes []string createErr string @@ -109,7 +151,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { }{ { name: "nil config", - oldConfig: &Config{}, + oldConfig: &ChannelConfig{}, newConfig: nil, createErr: "nil config", oldHostToChanMap: map[string]eventingchannels.ChannelReference{}, @@ -117,22 +159,18 @@ func TestDispatcher_UpdateConfig(t *testing.T) { }, { name: "same config", - oldConfig: &Config{}, - newConfig: &Config{}, + oldConfig: &ChannelConfig{}, + newConfig: &ChannelConfig{}, oldHostToChanMap: map[string]eventingchannels.ChannelReference{}, newHostToChanMap: map[string]eventingchannels.ChannelReference{}, }, { name: "config with no subscription", - oldConfig: &Config{}, - newConfig: &Config{ - ChannelConfigs: []ChannelConfig{ - { - Namespace: "default", - Name: "test-channel", - HostName: "a.b.c.d", - }, - }, + oldConfig: &ChannelConfig{}, + newConfig: &ChannelConfig{ + Namespace: "default", + Name: 
"test-channel", + HostName: "a.b.c.d", }, oldHostToChanMap: map[string]eventingchannels.ChannelReference{}, newHostToChanMap: map[string]eventingchannels.ChannelReference{ @@ -141,26 +179,22 @@ func TestDispatcher_UpdateConfig(t *testing.T) { }, { name: "single channel w/ new subscriptions", - oldConfig: &Config{}, - newConfig: &Config{ - ChannelConfigs: []ChannelConfig{ + oldConfig: &ChannelConfig{}, + newConfig: &ChannelConfig{ + Namespace: "default", + Name: "test-channel", + HostName: "a.b.c.d", + Subscriptions: []Subscription{ + { + UID: "subscription-1", + Subscription: fanout.Subscription{ + Subscriber: subscriber, + }, + }, { - Namespace: "default", - Name: "test-channel", - HostName: "a.b.c.d", - Subscriptions: []Subscription{ - { - UID: "subscription-1", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, - { - UID: "subscription-2", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, + UID: "subscription-2", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, }, @@ -173,173 +207,72 @@ func TestDispatcher_UpdateConfig(t *testing.T) { }, { name: "single channel w/ existing subscriptions", - oldConfig: &Config{ - ChannelConfigs: []ChannelConfig{ - { - Namespace: "default", - Name: "test-channel", - HostName: "a.b.c.d", - Subscriptions: []Subscription{ - { - UID: "subscription-1", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, - { - UID: "subscription-2", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, - }, - }, - }, - }, - newConfig: &Config{ - ChannelConfigs: []ChannelConfig{ + oldConfig: &ChannelConfig{ + Namespace: "default", + Name: "test-channel", + HostName: "a.b.c.d", + Subscriptions: []Subscription{ { - Namespace: "default", - Name: "test-channel", - HostName: "a.b.c.d", - Subscriptions: []Subscription{ - { - UID: "subscription-2", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, - { - UID: "subscription-3", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, + UID: "subscription-1", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, - }, - }, - subscribes: []string{"subscription-2", "subscription-3"}, - unsubscribes: []string{"subscription-1"}, - oldHostToChanMap: map[string]eventingchannels.ChannelReference{ - "a.b.c.d": {Name: "test-channel", Namespace: "default"}, - }, - newHostToChanMap: map[string]eventingchannels.ChannelReference{ - "a.b.c.d": {Name: "test-channel", Namespace: "default"}, - }, - }, - { - name: "multi channel w/old and new subscriptions", - oldConfig: &Config{ - ChannelConfigs: []ChannelConfig{ { - Namespace: "default", - Name: "test-channel-1", - HostName: "a.b.c.d", - Subscriptions: []Subscription{ - { - UID: "subscription-1", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, - { - UID: "subscription-2", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, + UID: "subscription-2", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, }, }, - newConfig: &Config{ - ChannelConfigs: []ChannelConfig{ + newConfig: &ChannelConfig{ + Namespace: "default", + Name: "test-channel", + HostName: "a.b.c.d", + Subscriptions: []Subscription{ { - Namespace: "default", - Name: "test-channel-1", - HostName: "a.b.c.d", - Subscriptions: []Subscription{ - { - UID: "subscription-1", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, + UID: "subscription-2", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, 
}, { - Namespace: "default", - Name: "test-channel-2", - HostName: "e.f.g.h", - Subscriptions: []Subscription{ - { - UID: "subscription-3", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, - { - UID: "subscription-4", - Subscription: fanout.Subscription{ - Subscriber: subscriber, - }, - }, + UID: "subscription-3", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, }, }, - subscribes: []string{"subscription-1", "subscription-3", "subscription-4"}, - unsubscribes: []string{"subscription-2"}, + subscribes: []string{"subscription-2", "subscription-3"}, + unsubscribes: []string{"subscription-1"}, oldHostToChanMap: map[string]eventingchannels.ChannelReference{ - "a.b.c.d": {Name: "test-channel-1", Namespace: "default"}, + "a.b.c.d": {Name: "test-channel", Namespace: "default"}, }, newHostToChanMap: map[string]eventingchannels.ChannelReference{ - "a.b.c.d": {Name: "test-channel-1", Namespace: "default"}, - "e.f.g.h": {Name: "test-channel-2", Namespace: "default"}, - }, - }, - { - name: "Duplicate hostnames", - oldConfig: &Config{}, - newConfig: &Config{ - ChannelConfigs: []ChannelConfig{ - { - Namespace: "default", - Name: "test-channel-1", - HostName: "a.b.c.d", - }, - { - Namespace: "default", - Name: "test-channel-2", - HostName: "a.b.c.d", - }, - }, + "a.b.c.d": {Name: "test-channel", Namespace: "default"}, }, - createErr: "duplicate hostName found. Each channel must have a unique host header. HostName:a.b.c.d, channel:default.test-channel-2, channel:default.test-channel-1", - oldHostToChanMap: map[string]eventingchannels.ChannelReference{}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + t.Parallel() t.Logf("Running %s", t.Name()) d := &KafkaDispatcher{ kafkaConsumerFactory: &mockKafkaConsumerFactory{}, - channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription), + channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription), subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), subscriptions: make(map[types.UID]Subscription), topicFunc: utils.TopicName, logger: zaptest.NewLogger(t).Sugar(), } - d.setHostToChannelMap(map[string]eventingchannels.ChannelReference{}) // Initialize using oldConfig - err := d.checkConfigAndUpdate(tc.oldConfig) - if err != nil { + require.NoError(t, d.RegisterChannelHost(tc.oldConfig)) + require.NoError(t, d.ReconcileConsumers(tc.oldConfig)) - t.Errorf("unexpected error: %v", err) - } oldSubscribers := sets.NewString() for _, sub := range d.subscriptions { oldSubscribers.Insert(string(sub.UID)) @@ -352,7 +285,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { } // Update with new config - err = d.checkConfigAndUpdate(tc.newConfig) + err := d.ReconcileConsumers(tc.newConfig) if tc.createErr != "" { if err == nil { t.Errorf("Expected UpdateConfig error: '%v'. 
Actual nil", tc.createErr) @@ -380,15 +313,157 @@ func TestDispatcher_UpdateConfig(t *testing.T) { } } +func TestDispatcher_MultipleChannelsInParallel(t *testing.T) { + subscriber, _ := url.Parse("http://test/subscriber") + + configs := []*ChannelConfig{ + { + Namespace: "default", + Name: "test-channel", + HostName: "a.b.c.d", + }, + { + Namespace: "default", + Name: "test-channel-1", + HostName: "x.y.w.z", + Subscriptions: []Subscription{ + { + UID: "subscription-1", + Subscription: fanout.Subscription{ + Subscriber: subscriber, + }, + }, + }, + }, + { + Namespace: "default", + Name: "test-channel-2", + HostName: "e.f.g.h", + Subscriptions: []Subscription{ + { + UID: "subscription-3", + Subscription: fanout.Subscription{ + Subscriber: subscriber, + }, + }, + { + UID: "subscription-4", + Subscription: fanout.Subscription{ + Subscriber: subscriber, + }, + }, + }, + }, + } + + d := &KafkaDispatcher{ + kafkaConsumerFactory: &mockKafkaConsumerFactory{}, + channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription), + subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), + subscriptions: make(map[types.UID]Subscription), + topicFunc: utils.TopicName, + logger: zaptest.NewLogger(t).Sugar(), + } + + // Let's register channel configs first + for _, c := range configs { + require.NoError(t, d.RegisterChannelHost(c)) + } + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { // Let's reiterate several times to check everything is fine + for _, c := range configs { + wg.Add(1) + go func(c *ChannelConfig) { + defer wg.Done() + assert.NoError(t, d.ReconcileConsumers(c)) + }(c) + } + } + wg.Wait() + + // Assert the state is the final one we want + require.Contains(t, d.getHostToChannelMap(), "a.b.c.d") + require.Contains(t, d.getHostToChannelMap(), "x.y.w.z") + require.Contains(t, d.getHostToChannelMap(), "e.f.g.h") + + require.Contains(t, d.subscriptions, types.UID("subscription-1")) + require.Contains(t, d.subscriptions, types.UID("subscription-3")) + require.Contains(t, d.subscriptions, types.UID("subscription-4")) + + // Now let's remove all of them + wg = sync.WaitGroup{} + for _, c := range configs { + wg.Add(1) + go func(c *ChannelConfig) { + defer wg.Done() + assert.NoError(t, d.CleanupChannel(c.Name, c.Namespace, c.HostName)) + }(c) + } + wg.Wait() + + require.Empty(t, d.getHostToChannelMap()) + require.Empty(t, d.subscriptions) + require.Empty(t, d.channelSubscriptions) + require.Empty(t, d.subsConsumerGroups) +} + +func TestKafkaDispatcher_CleanupChannel(t *testing.T) { + subscriber, _ := url.Parse("http://test/subscriber") + + d := &KafkaDispatcher{ + kafkaConsumerFactory: &mockKafkaConsumerFactory{}, + channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription), + subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), + subscriptions: make(map[types.UID]Subscription), + topicFunc: utils.TopicName, + logger: zaptest.NewLogger(t).Sugar(), + } + + channelConfig := &ChannelConfig{ + Namespace: "default", + Name: "test-channel", + HostName: "a.b.c.d", + Subscriptions: []Subscription{ + { + UID: "subscription-1", + Subscription: fanout.Subscription{ + Subscriber: subscriber, + }, + }, + { + UID: "subscription-2", + Subscription: fanout.Subscription{ + Subscriber: subscriber, + }, + }, + }, + } + require.NoError(t, d.RegisterChannelHost(channelConfig)) + require.NoError(t, d.ReconcileConsumers(channelConfig)) + + require.NoError(t, d.CleanupChannel(channelConfig.Name, channelConfig.Namespace, channelConfig.HostName)) + require.NotContains(t, d.subscriptions, 
"subscription-1") + require.NotContains(t, d.subscriptions, "subscription-2") + require.NotContains(t, d.channelSubscriptions, eventingchannels.ChannelReference{ + Namespace: "default", + Name: "test-channel", + }) + require.NotContains(t, d.subsConsumerGroups, "subscription-1") + require.NotContains(t, d.subsConsumerGroups, "subscription-2") +} + func TestSubscribeError(t *testing.T) { cf := &mockKafkaConsumerFactory{createErr: true} d := &KafkaDispatcher{ kafkaConsumerFactory: cf, logger: zap.NewNop().Sugar(), topicFunc: utils.TopicName, + subscriptions: map[types.UID]Subscription{}, + channelSubscriptions: map[types.NamespacedName]*KafkaSubscription{}, } - channelRef := eventingchannels.ChannelReference{ + channelRef := types.NamespacedName{ Name: "test-channel", Namespace: "test-ns", } @@ -410,7 +485,7 @@ func TestUnsubscribeUnknownSub(t *testing.T) { logger: zap.NewNop().Sugar(), } - channelRef := eventingchannels.ChannelReference{ + channelRef := types.NamespacedName{ Name: "test-channel", Namespace: "test-ns", } @@ -437,7 +512,6 @@ func TestNewDispatcher(t *testing.T) { ClientID: "kafka-ch-dispatcher", Brokers: []string{"localhost:10000"}, TopicFunc: utils.TopicName, - Logger: nil, } _, err := NewDispatcher(context.TODO(), args) if err == nil { @@ -464,7 +538,7 @@ func TestSetReady(t *testing.T) { channelReadySubscriptions: map[string]sets.Int32{"bar": sets.NewInt32(0)}, }, desiredKafkaSub: &KafkaSubscription{ - subs: []types.UID{}, + subs: sets.NewString(), channelReadySubscriptions: map[string]sets.Int32{ "bar": sets.NewInt32(0), "foo": sets.NewInt32(0), @@ -483,7 +557,7 @@ func TestSetReady(t *testing.T) { }, }, desiredKafkaSub: &KafkaSubscription{ - subs: []types.UID{}, + subs: sets.NewString(), channelReadySubscriptions: map[string]sets.Int32{ "bar": sets.NewInt32(0), "foo": sets.NewInt32(0, 1), @@ -577,7 +651,7 @@ func TestServeHTTP(t *testing.T) { name string responseReturnCode int desiredJson []byte - channelSubs map[eventingchannels.ChannelReference]*KafkaSubscription + channelSubs map[types.NamespacedName]*KafkaSubscription requestURI string httpMethod string }{ @@ -598,9 +672,9 @@ func TestServeHTTP(t *testing.T) { httpMethod: httpGet, responseReturnCode: http.StatusOK, desiredJson: []byte(`{}`), - channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + channelSubs: map[types.NamespacedName]*KafkaSubscription{ {Name: "foo", Namespace: "bar"}: { - subs: []types.UID{}, + subs: sets.NewString(), channelReadySubscriptions: map[string]sets.Int32{}, }, }, @@ -610,9 +684,9 @@ func TestServeHTTP(t *testing.T) { httpMethod: httpGet, desiredJson: []byte{}, responseReturnCode: http.StatusNotFound, - channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + channelSubs: map[types.NamespacedName]*KafkaSubscription{ {Name: "foo", Namespace: "baz"}: { - subs: []types.UID{"a", "b"}, + subs: sets.NewString("a", "b"), channelReadySubscriptions: map[string]sets.Int32{ "a": sets.NewInt32(0), "b": sets.NewInt32(0), @@ -625,9 +699,9 @@ func TestServeHTTP(t *testing.T) { httpMethod: httpGet, desiredJson: []byte(`{"a":[0],"b":[0,2,5]}`), responseReturnCode: http.StatusOK, - channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + channelSubs: map[types.NamespacedName]*KafkaSubscription{ {Name: "foo", Namespace: "bar"}: { - subs: []types.UID{"a", "b"}, + subs: sets.NewString("a", "b"), channelReadySubscriptions: map[string]sets.Int32{ "a": sets.NewInt32(0), "b": sets.NewInt32(0, 2, 5), @@ -640,15 +714,15 @@ func TestServeHTTP(t *testing.T) { 
httpMethod: httpGet, desiredJson: []byte(`{"a":[0],"b":[0,2,5]}`), responseReturnCode: http.StatusOK, - channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + channelSubs: map[types.NamespacedName]*KafkaSubscription{ {Name: "table", Namespace: "flip"}: { - subs: []types.UID{"c", "d"}, + subs: sets.NewString("c", "d"), channelReadySubscriptions: map[string]sets.Int32{ "c": sets.NewInt32(0), "d": sets.NewInt32(0), }}, {Name: "foo", Namespace: "bar"}: { - subs: []types.UID{"a", "b"}, + subs: sets.NewString("a", "b"), channelReadySubscriptions: map[string]sets.Int32{ "a": sets.NewInt32(0), "b": sets.NewInt32(0, 2, 5), @@ -670,7 +744,7 @@ func TestServeHTTP(t *testing.T) { }, } d := &KafkaDispatcher{ - channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription), + channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription), logger: klogtesting.TestLogger(t), } ts := httptest.NewServer(d) diff --git a/pkg/channel/consolidated/dispatcher/kafka_subscription.go b/pkg/channel/consolidated/dispatcher/kafka_subscription.go index 8f54c4a77c..6cd088a968 100644 --- a/pkg/channel/consolidated/dispatcher/kafka_subscription.go +++ b/pkg/channel/consolidated/dispatcher/kafka_subscription.go @@ -27,12 +27,20 @@ import ( type KafkaSubscription struct { logger *zap.SugaredLogger - subs []types.UID + subs sets.String // readySubscriptionsLock must be used to synchronize access to channelReadySubscriptions readySubscriptionsLock sync.RWMutex channelReadySubscriptions map[string]sets.Int32 } +func NewKafkaSubscription(logger *zap.SugaredLogger) *KafkaSubscription { + return &KafkaSubscription{ + logger: logger, + subs: sets.NewString(), + channelReadySubscriptions: map[string]sets.Int32{}, + } +} + // SetReady will mark the subid in the KafkaSubscription and call any registered callbacks func (ks *KafkaSubscription) SetReady(subID types.UID, partition int32, ready bool) { ks.logger.Debugw("Setting subscription readiness", zap.Any("subscription", subID), zap.Bool("ready", ready)) diff --git a/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go b/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go index dcf7acf71e..0eb2198028 100644 --- a/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go +++ b/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go @@ -21,7 +21,6 @@ import ( "fmt" "go.uber.org/zap" - "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" @@ -63,7 +62,6 @@ type Reconciler struct { impl *controller.Impl } -// Check that our Reconciler implements controller.Reconciler. var _ kafkachannelreconciler.Interface = (*Reconciler)(nil) // NewController initializes the controller and is called by the generated code. 
@@ -100,7 +98,6 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl Brokers: kafkaConfig.Brokers, KafkaAuthConfig: kafkaAuthCfg, TopicFunc: utils.TopicName, - Logger: logger, } kafkaDispatcher, err := dispatcher.NewDispatcher(ctx, args) if err != nil { @@ -125,7 +122,16 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl kafkaChannelInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: filterWithAnnotation(injection.HasNamespaceScope(ctx)), - Handler: controller.HandleAll(r.impl.Enqueue), + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: r.impl.Enqueue, + UpdateFunc: controller.PassNew(r.impl.Enqueue), + DeleteFunc: func(obj interface{}) { + // TODO when finalize kind is fixed, we'll need to handle that error properly + if err := r.CleanupChannel(obj.(*v1beta1.KafkaChannel)); err != nil { + logger.Warnw("Unable to remove kafka channel", zap.Any("kafkachannel", obj), zap.Error(err)) + } + }, + }, }) logger.Info("Starting dispatcher.") @@ -147,50 +153,43 @@ func filterWithAnnotation(namespaced bool) func(obj interface{}) bool { func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { logging.FromContext(ctx).Debugw("ReconcileKind for channel", zap.String("channel", kc.Name)) - return r.syncDispatcher(ctx) + return r.syncChannel(ctx, kc) } func (r *Reconciler) ObserveKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { logging.FromContext(ctx).Debugw("ObserveKind for channel", zap.String("channel", kc.Name)) - return r.syncDispatcher(ctx) + return r.syncChannel(ctx, kc) } -func (r *Reconciler) syncDispatcher(ctx context.Context) pkgreconciler.Event { - channels, err := r.kafkachannelLister.List(labels.Everything()) - if err != nil { - logging.FromContext(ctx).Error("Error listing kafka channels") - return err +func (r *Reconciler) syncChannel(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { + if !kc.Status.IsReady() { + logging.FromContext(ctx).Debugw("KafkaChannel still not ready, short-circuiting the reconciler", zap.String("channel", kc.Name)) + return nil } - // TODO: revisit this code. Instead of reading all channels and updating consumers and hostToChannel map for all - // why not just reconcile the current channel. With this the UpdateKafkaConsumers can now return SubscribableStatus - // for the subscriptions on the channel that is being reconciled. 
- kafkaChannels := make([]*v1beta1.KafkaChannel, 0) - for _, channel := range channels { - if channel.Status.IsReady() { - kafkaChannels = append(kafkaChannels, channel) - } - } - config := r.newConfigFromKafkaChannels(kafkaChannels) - if err := r.kafkaDispatcher.UpdateHostToChannelMap(config); err != nil { + config := r.newConfigFromKafkaChannel(kc) + + // Update receiver side + if err := r.kafkaDispatcher.RegisterChannelHost(config); err != nil { logging.FromContext(ctx).Error("Error updating host to channel map in dispatcher") return err } - failedSubscriptions, err := r.kafkaDispatcher.UpdateKafkaConsumers(config) + // Update dispatcher side + err := r.kafkaDispatcher.ReconcileConsumers(config) if err != nil { - logging.FromContext(ctx).Error("Error updating kafka consumers in dispatcher") - return err - } - if len(failedSubscriptions) > 0 { - logging.FromContext(ctx).Error("Some kafka subscriptions failed to subscribe") - return fmt.Errorf("Some kafka subscriptions failed to subscribe") + logging.FromContext(ctx).Errorw("Some kafka subscriptions failed to subscribe", zap.Error(err)) + return fmt.Errorf("some kafka subscriptions failed to subscribe: %v", err) } return nil } -// newConfigFromKafkaChannels creates a new Config from the list of kafka channels. -func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1beta1.KafkaChannel) *dispatcher.ChannelConfig { +func (r *Reconciler) CleanupChannel(kc *v1beta1.KafkaChannel) pkgreconciler.Event { + return r.kafkaDispatcher.CleanupChannel(kc.Name, kc.Namespace, kc.Status.Address.URL.Host) +} + +// newConfigFromKafkaChannel creates a new ChannelConfig from a single KafkaChannel. +func (r *Reconciler) newConfigFromKafkaChannel(c *v1beta1.KafkaChannel) *dispatcher.ChannelConfig { channelConfig := dispatcher.ChannelConfig{ Namespace: c.Namespace, Name: c.Name, @@ -211,15 +210,3 @@ func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1beta1.KafkaChannel) * return &channelConfig } - -// newConfigFromKafkaChannels creates a new Config from the list of kafka channels. -func (r *Reconciler) newConfigFromKafkaChannels(channels []*v1beta1.KafkaChannel) *dispatcher.Config { - cc := make([]dispatcher.ChannelConfig, 0) - for _, c := range channels { - channelConfig := r.newChannelConfigFromKafkaChannel(c) - cc = append(cc, *channelConfig) - } - return &dispatcher.Config{ - ChannelConfigs: cc, - } -}