-
Notifications
You must be signed in to change notification settings - Fork 567
/
api_server.go
115 lines (99 loc) · 2.65 KB
/
api_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package server
import (
"github.com/pachyderm/pachyderm/v2/src/internal/collection"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/pachyderm/pachyderm/v2/src/internal/uuid"
"github.com/pachyderm/pachyderm/v2/src/proxy"
"go.uber.org/zap"
)
// Env is the set of dependencies required for APIServer.
type Env struct {
// Listener is the listener that Listen registers its per-stream notifier
// with, and unregisters it from when the stream ends.
Listener collection.PostgresListener
}
// APIServer is the server type whose Listen method streams notifications
// to clients.
type APIServer struct {
// Embedded so APIServer carries the methods of proxy.UnimplementedAPIServer;
// presumably these are the generated default implementations of the API
// service — confirm against the proxy package.
proxy.UnimplementedAPIServer
// env holds the dependencies passed to NewAPIServer.
env Env
}
// NewAPIServer returns an APIServer that uses env as its set of dependencies.
func NewAPIServer(env Env) *APIServer {
	srv := new(APIServer)
	srv.env = env
	return srv
}
// Listen streams database events.
// It signals that it is internally set up by sending an initial empty ListenResponse.
//
// A notifier for request.Channel is registered with the environment's
// listener. Once registration has succeeded, the notifier is unregistered on
// every return path, including a failure to send the initial response.
// Listen then blocks until the notifier reports an error on its errChan and
// returns that error.
func (a *APIServer) Listen(request *proxy.ListenRequest, server proxy.API_ListenServer) (retErr error) {
	listener := a.env.Listener
	notifier := newNotifier(server, request.Channel)
	if err := listener.Register(notifier); err != nil {
		return errors.EnsureStack(err)
	}
	// Install the cleanup immediately after a successful Register so that an
	// early return below cannot leave the notifier registered.
	defer func() {
		if err := listener.Unregister(notifier); err != nil {
			log.Error(pctx.TODO(), "errored while unregistering notifier", zap.Error(err))
		}
	}()
	// send initial empty event to indicate to client that the listener has been registered
	if err := server.Send(&proxy.ListenResponse{}); err != nil {
		return errors.EnsureStack(err)
	}
	// The forwarding goroutine is started only after the initial Send has
	// returned, so the two never call server.Send at the same time.
	go notifier.send()
	return <-notifier.errChan
}
// notifier carries notifications for one Listen stream: Notify queues them on
// bufChan, send forwards them to server, and the first error encountered is
// handed to Listen through errChan.
type notifier struct {
// server is the stream that responses are sent on; its Context is used to
// detect that the stream has ended.
server proxy.API_ListenServer
// id is the value returned by ID; newNotifier fills it from uuid.NewWithoutDashes.
id string
// channel is the value returned by Channel; newNotifier sets it from the
// channel named in the ListenRequest.
channel string
// bufChan queues notifications between Notify (producer) and send (consumer).
bufChan chan *collection.Notification
// errChan delivers an error to Listen, which returns the first value it receives.
errChan chan error
}
func newNotifier(server proxy.API_ListenServer, channel string) *notifier {
return ¬ifier{
server: server,
id: uuid.NewWithoutDashes(),
channel: channel,
bufChan: make(chan *collection.Notification, collection.ChannelBufferSize),
errChan: make(chan error, 1),
}
}
// ID returns the identifier assigned to this notifier when it was created.
func (n *notifier) ID() string { return n.id }
// Channel returns the channel name this notifier was created with.
func (n *notifier) Channel() string { return n.channel }
// Notify queues notification on the notifier's buffer without ever blocking.
//
// If the stream's context is already done, the context's error may be
// reported instead. If neither the buffer nor the context is ready, the
// buffer is full and an error is reported rather than waiting for space.
func (n *notifier) Notify(notification *collection.Notification) {
	ctx := n.server.Context()
	select {
	case <-ctx.Done():
		n.sendError(ctx.Err())
	case n.bufChan <- notification:
		// Queued; send will forward it to the stream.
	default:
		n.sendError(errors.New("listener buffer is full, aborting listen"))
	}
}
// send forwards queued notifications to the stream until it has to stop.
//
// Each notification taken from bufChan is sent as a ListenResponse carrying
// the notification's Extra field. The loop ends, after reporting the error
// through sendError, when a Send fails or when the stream's context is done.
func (n *notifier) send() {
	for {
		ctx := n.server.Context()
		select {
		case <-ctx.Done():
			n.sendError(ctx.Err())
			return
		case notification := <-n.bufChan:
			resp := &proxy.ListenResponse{Extra: notification.Extra}
			if err := n.server.Send(resp); err != nil {
				n.sendError(err)
				return
			}
		}
	}
}
// Error hands err to sendError, which places it on errChan if there is room.
func (n *notifier) Error(err error) { n.sendError(err) }
// sendError offers err to errChan without blocking.
// If errChan cannot accept a value at that moment, err is dropped.
func (n *notifier) sendError(err error) {
select {
case n.errChan <- err:
default:
// errChan is not ready to take another value; drop err rather than
// block the caller.
}
}