This repository has been archived by the owner on Dec 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 43
/
broker.go
123 lines (104 loc) · 3.18 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package pubsub
import (
"context"
"errors"
"sync"
"github.com/go-logr/logr"
"github.com/leg100/otf/internal/sql"
)
// subBufferSize is the capacity of the buffered channel created for each
// subscription. A subscriber that falls this many events behind is considered
// full and is forcibly unsubscribed by the broker.
const subBufferSize = 100
// ErrSubscriptionTerminated is a sentinel error for use by subscribers to
// indicate that their subscription has been terminated by the broker.
//
// Nothing in this file returns it: the broker signals termination by closing
// the subscription channel, so it is up to the subscriber to translate a
// closed channel into this error. Compare against it with errors.Is.
var ErrSubscriptionTerminated = errors.New("broker terminated the subscription")
// Broker allows clients to subscribe to OTF events.
//
// A Broker is registered against a single database table (see NewBroker).
// For each database event forwarded to it, it retrieves the corresponding
// value of type T via its getter and publishes it to every current
// subscriber. It contains a mutex and must not be copied; use *Broker[T].
type Broker[T any] struct {
// Embedded logger; gives the broker its Error (etc.) logging methods.
logr.Logger
subs map[chan Event[T]]struct{} // set of active subscriptions, keyed by subscriber channel
mu sync.Mutex // guards subs
// getter retrieves the T identified by a database event's id.
getter GetterFunc[T]
// table is the table name the broker was registered for; used in log output.
table string
}
// GetterFunc retrieves the type T using its unique id. It is also passed the
// database action that triggered the retrieval, so an implementation can vary
// its lookup by action. A non-nil error causes the broker to log the failure
// and drop the event.
type GetterFunc[T any] func(ctx context.Context, id string, action sql.Action) (T, error)
// databaseListener is the upstream database events listener. The broker
// depends only on the ability to register a callback for a table.
type databaseListener interface {
// RegisterFunc registers ff as the callback for the named table.
// NOTE(review): presumably ff is invoked once per insert/update/delete on
// that table — confirm against the sql package's listener implementation.
RegisterFunc(table string, ff sql.ForwardFunc)
}
// NewBroker constructs a broker for the given table and registers the
// broker's forward method with listener as that table's callback.
//
// logger is tagged with component=broker. getter is used to turn the id
// carried by each forwarded database event into a value of type T.
func NewBroker[T any](logger logr.Logger, listener databaseListener, table string, getter GetterFunc[T]) *Broker[T] {
	broker := new(Broker[T])
	broker.Logger = logger.WithValues("component", "broker")
	broker.subs = make(map[chan Event[T]]struct{})
	broker.getter = getter
	broker.table = table

	// Hook the broker up to the upstream listener only once it is fully
	// initialised, so forward never observes a half-built broker.
	listener.RegisterFunc(table, broker.forward)

	return broker
}
// Subscribe subscribes the caller to a stream of events. The caller can close
// the subscription by either canceling the context or calling the returned
// unsubscribe function; both are safe to use together and the unsubscribe
// function may be called more than once.
//
// The returned channel is buffered (subBufferSize). It is closed when the
// subscription ends, including when the broker forcibly terminates a
// subscriber whose buffer is full, so callers should range over it or check
// the second receive value.
func (b *Broker[T]) Subscribe(ctx context.Context) (<-chan Event[T], func()) {
	b.mu.Lock()
	defer b.mu.Unlock()

	sub := make(chan Event[T], subBufferSize)
	b.subs[sub] = struct{}{}

	// done is closed the first time the caller invokes the returned
	// unsubscribe function. It releases the watcher goroutine below, which
	// would otherwise block until ctx is canceled — forever, if ctx is never
	// canceled (e.g. context.Background()).
	done := make(chan struct{})
	var closeDone sync.Once

	// When the context is canceled remove the subscriber; when the caller
	// unsubscribes explicitly there is nothing left to watch for, so exit.
	go func() {
		select {
		case <-ctx.Done():
			b.unsubscribe(sub)
		case <-done:
		}
	}()

	return sub, func() {
		closeDone.Do(func() { close(done) })
		b.unsubscribe(sub)
	}
}
// unsubscribe terminates the given subscription: it closes the subscriber's
// channel and removes it from the broker. Calling it for a subscription that
// has already been removed is a no-op, which makes it safe to invoke from
// several paths (context cancellation, explicit unsubscribe, forced removal)
// without closing the channel twice.
func (b *Broker[T]) unsubscribe(sub chan Event[T]) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if _, subscribed := b.subs[sub]; subscribed {
		// Membership in the map is what guarantees the channel is still
		// open, so close and delete happen together under the lock.
		close(sub)
		delete(b.subs, sub)
	}
}
// forward retrieves the type T uniquely identified by id and forwards it onto
// subscribers as an event together with the action.
//
// The getter is consulted first; if it fails the error is logged and nothing
// is published. The database action is then mapped onto an event type
// (insert -> created, update -> updated, delete -> deleted); any other action
// is logged and dropped. Publishing never blocks: a subscriber whose buffer
// is full does not receive the event and is unsubscribed instead.
func (b *Broker[T]) forward(ctx context.Context, id string, action sql.Action) {
	payload, err := b.getter(ctx, id, action)
	if err != nil {
		b.Error(err, "retrieving type for database event", "table", b.table, "id", id, "action", action)
		return
	}

	event := Event[T]{Payload: payload}
	switch action {
	case sql.InsertAction:
		event.Type = CreatedEvent
	case sql.UpdateAction:
		event.Type = UpdatedEvent
	case sql.DeleteAction:
		event.Type = DeletedEvent
	default:
		b.Error(nil, "unknown action", "action", action)
		return
	}

	// Fan the event out under the lock using non-blocking sends. Subscribers
	// that cannot accept it are only collected here; they are removed after
	// the lock is released because unsubscribe takes the same lock.
	var overflowed []chan Event[T]
	b.mu.Lock()
	for ch := range b.subs {
		select {
		case ch <- event:
			// delivered
		default:
			// buffer is full; deal with this subscriber below
			overflowed = append(overflowed, ch)
		}
	}
	b.mu.Unlock()

	// Forcibly unsubscribe full subscribers and leave it to them to
	// re-subscribe.
	for _, ch := range overflowed {
		b.Error(nil, "unsubscribing full subscriber", "sub", ch, "queue_length", subBufferSize)
		b.unsubscribe(ch)
	}
}