Skip to content

Commit

Permalink
only allow starting a source once
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
  • Loading branch information
inteon committed Apr 22, 2024
1 parent 22f4a15 commit 6fcaa42
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 102 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})
watch := source.Channel(source.NewChannelBroadcaster(watchChan), &handler.EnqueueRequestForObject{})
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ var _ = Describe("controller", func() {
}

ins := source.Channel(
ch,
source.NewChannelBroadcaster(ch),
handler.Funcs{
GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Expand All @@ -248,7 +248,7 @@ var _ = Describe("controller", func() {
<-processed
})

It("should error when channel source is not specified", func() {
It("should error when ChannelBroadcaster is not specified", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -257,7 +257,7 @@ var _ = Describe("controller", func() {

e := ctrl.Start(ctx)
Expect(e).To(HaveOccurred())
Expect(e.Error()).To(ContainSubstring("must specify Channel.Source"))
Expect(e.Error()).To(ContainSubstring("must create Channel with a non-nil Broadcaster"))
})

It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/source/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func ExampleChannel() {

err := ctrl.Watch(
source.Channel(
events,
source.NewChannelBroadcaster(events),
&handler.EnqueueRequestForObject{},
),
)
Expand Down
127 changes: 52 additions & 75 deletions pkg/source/internal/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sync"

"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -33,23 +32,18 @@ import (
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
// source (e.g. http handler) to write GenericEvents to the underlying channel.
type Channel[T any] struct {
// once ensures the event distribution goroutine will be performed only once
once sync.Once

// source is the source channel to fetch GenericEvents
Source <-chan event.TypedGenericEvent[T]
// Broadcaster contains the source channel for events.
Broadcaster *ChannelBroadcaster[T]

Handler handler.TypedEventHandler[T]

Predicates []predicate.TypedPredicate[T]

BufferSize *int

// dest is the destination channels of the added event handlers
dest []chan event.TypedGenericEvent[T]
DestBufferSize int

// destLock is to ensure the destination channels are safely added/removed
destLock sync.Mutex
mu sync.Mutex
// isStarted is true if the source has been started. A source can only be started once.
isStarted bool
}

func (cs *Channel[T]) String() string {
Expand All @@ -62,89 +56,72 @@ func (cs *Channel[T]) Start(
queue workqueue.RateLimitingInterface,
) error {
// Source should have been specified by the user.
if cs.Source == nil {
return fmt.Errorf("must specify Channel.Source")
if cs.Broadcaster == nil {
return fmt.Errorf("must create Channel with a non-nil Broadcaster")
}
if cs.Handler == nil {
return errors.New("must specify Channel.Handler")
return errors.New("must create Channel with a non-nil Handler")
}

if cs.BufferSize == nil {
cs.BufferSize = ptr.To(1024)
if cs.DestBufferSize == 0 {
return errors.New("must create Channel with a >0 DestBufferSize")
}

dst := make(chan event.TypedGenericEvent[T], *cs.BufferSize)

cs.destLock.Lock()
cs.dest = append(cs.dest, dst)
cs.destLock.Unlock()
cs.mu.Lock()
defer cs.mu.Unlock()
if cs.isStarted {
return fmt.Errorf("cannot start an already started Channel source")
}
cs.isStarted = true

cs.once.Do(func() {
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
go cs.syncLoop(ctx)
})
// Create a destination channel for the event handler
// and add it to the list of destinations
destination := make(chan event.TypedGenericEvent[T], cs.DestBufferSize)
cs.Broadcaster.AddListener(destination)

go func() {
for evt := range dst {
shouldHandle := true
for _, p := range cs.Predicates {
if !p.Generic(evt) {
shouldHandle = false
break
}
}

if shouldHandle {
func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cs.Handler.Generic(ctx, evt, queue)
}()
}
}
// Remove the listener and wait for the broadcaster
// to stop sending events to the destination channel.
defer cs.Broadcaster.RemoveListener(destination)

cs.processReceivedEvents(
ctx,
destination,
queue,
cs.Handler,
cs.Predicates,
)
}()

return nil
}

func (cs *Channel[T]) doStop() {
cs.destLock.Lock()
defer cs.destLock.Unlock()

for _, dst := range cs.dest {
close(dst)
}
}

func (cs *Channel[T]) distribute(evt event.TypedGenericEvent[T]) {
cs.destLock.Lock()
defer cs.destLock.Unlock()

for _, dst := range cs.dest {
// We cannot make it under goroutine here, or we'll meet the
// race condition of writing message to closed channels.
// To avoid blocking, the dest channels are expected to be of
// proper buffer size. If we still see it blocked, then
// the controller is thought to be in an abnormal state.
dst <- evt
}
}

func (cs *Channel[T]) syncLoop(ctx context.Context) {
func (cs *Channel[T]) processReceivedEvents(
ctx context.Context,
destination <-chan event.TypedGenericEvent[T],
queue workqueue.RateLimitingInterface,
eventHandler handler.TypedEventHandler[T],
predicates []predicate.TypedPredicate[T],
) {
eventloop:
for {
select {
case <-ctx.Done():
// Close destination channels
cs.doStop()
return
case evt, stillOpen := <-cs.Source:
case event, stillOpen := <-destination:
if !stillOpen {
// if the source channel is closed, we're never gonna get
// anything more on it, so stop & bail
cs.doStop()
return
}
cs.distribute(evt)

// Check predicates against the event first
// and continue the outer loop if any of them fail.
for _, p := range predicates {
if !p.Generic(event) {
continue eventloop
}
}

// Call the event handler with the event.
eventHandler.Generic(ctx, event, queue)
}
}
}

0 comments on commit 6fcaa42

Please sign in to comment.