-
Notifications
You must be signed in to change notification settings - Fork 6
/
eventstream.go
360 lines (320 loc) · 9.72 KB
/
eventstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
package eventstream
import (
"context"
"fmt"
"io"
"sync"
"time"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/jannfis/argocd-agent/internal/queue"
"github.com/jannfis/argocd-agent/pkg/api/grpc/eventstreamapi"
"github.com/jannfis/argocd-agent/pkg/types"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
// Server implements the EventStream gRPC service. It moves cloudevents
// between connected agents and their per-agent queue pairs.
type Server struct {
	// The generated Unimplemented server is embedded so that RPCs not
	// implemented in this file still satisfy the service interface.
	eventstreamapi.UnimplementedEventStreamServer

	// options holds the configuration passed to NewServer.
	options *ServerOptions
	// queues provides the send and receive queue for every agent.
	queues queue.QueuePair
}
type (
	// ServerOptions holds the configuration of a Server.
	ServerOptions struct {
		// MaxStreamDuration bounds the lifetime of a single stream's
		// context. A value of zero or less means no expiry is applied.
		MaxStreamDuration time.Duration
	}

	// ServerOption is a functional option that modifies ServerOptions;
	// options are applied in order by NewServer.
	ServerOption func(o *ServerOptions)
)
// client holds the per-connection state of an agent subscribed to the
// event stream.
type client struct {
	// ctx is the connection's context and cancelFn cancels it. It is
	// derived from the stream context by newClientConnection.
	ctx      context.Context
	cancelFn context.CancelFunc
	// logCtx is a logger pre-populated with the method and client name.
	logCtx *logrus.Entry
	// agentName identifies the connected agent; its queue pair is looked
	// up by this name.
	agentName string
	// wg tracks the goroutines serving this connection.
	wg *sync.WaitGroup
	// start and end record when the agent connected and disconnected.
	start, end time.Time
	// lock serializes disconnect handling for this client.
	lock sync.RWMutex
}
// WithMaxStreamDuration returns a ServerOption that limits the lifetime of
// each stream to d. A value of zero or less disables the limit.
func WithMaxStreamDuration(d time.Duration) ServerOption {
	setMaxDuration := func(opts *ServerOptions) {
		opts.MaxStreamDuration = d
	}
	return setMaxDuration
}
// NewServer creates an EventStream server that exchanges events through the
// given queue pair. The options in opts are applied in order, starting from
// zero-valued ServerOptions.
func NewServer(queues queue.QueuePair, opts ...ServerOption) *Server {
	srv := &Server{
		queues:  queues,
		options: &ServerOptions{},
	}
	for i := range opts {
		opts[i](srv.options)
	}
	return srv
}
// newClientConnection creates the per-connection state for an agent that
// subscribed to the event stream.
//
// The agent name is read from ctx. If it is missing, an InvalidArgument
// status error is returned. The client gets its own context derived from
// ctx. When timeout is positive that context expires after timeout.
// Otherwise it ends only when ctx ends or the client's cancelFn is called.
func newClientConnection(ctx context.Context, timeout time.Duration) (*client, error) {
	c := &client{wg: new(sync.WaitGroup)}

	name, err := agentName(ctx)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}
	c.agentName = name
	c.logCtx = logrus.WithFields(logrus.Fields{
		"method": "Subscribe",
		"client": name,
	})

	// The stream can have an optional expiry time.
	if timeout <= 0 {
		c.logCtx.Trace("StreamContext does not expire ")
		c.ctx, c.cancelFn = context.WithCancel(ctx)
	} else {
		c.logCtx.Tracef("StreamContext expires in %v", timeout)
		c.ctx, c.cancelFn = context.WithTimeout(ctx, timeout)
	}

	c.start = time.Now()
	c.logCtx.Info("An agent connected to the subscription stream")
	return c, nil
}
// agentName extracts the agent name stored in ctx under
// types.ContextAgentIdentifier. It returns an error if no string value is
// stored there.
func agentName(ctx context.Context) (string, error) {
	if name, found := ctx.Value(types.ContextAgentIdentifier).(string); found {
		return name, nil
	}
	return "", fmt.Errorf("invalid context: no agent name")
}
// onDisconnect records when client c went away and removes the agent's queue
// pair, if one is registered. Every goroutine that was added to c.wg must
// call this exactly once when it stops, because it also marks that goroutine
// as done on c.wg.
//
// NOTE(review): the meaning of Delete's boolean argument is defined by the
// queue package. Presumably it also shuts the queues down; confirm.
func (s *Server) onDisconnect(c *client) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.end = time.Now()
	name := c.agentName
	if s.queues.HasQueuePair(name) {
		if err := s.queues.Delete(name, true); err != nil {
			log().Warnf("Could not delete agent queue %s: %v", name, err)
		}
	}
	c.wg.Done()
}
// recvFunc blocks until one message arrives from client c on the
// subscription stream subs. It decodes the message and adds the resulting
// cloudevent to the agent's receive queue.
//
// It must not be called concurrently with itself.
//
// On success it returns nil. A receive error is logged according to its kind
// and returned unchanged; io.EOF means the remote end hung up. Decoding
// failures and a missing receive queue are returned as errors.
func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeServer) error {
	rlog := c.logCtx.WithField("direction", "recv")
	rlog.Tracef("Waiting to receive from channel")

	msg, err := subs.Recv()
	if err != nil {
		if err == io.EOF {
			rlog.Tracef("Remote end hung up")
			return err
		}
		// Cancellation is expected during shutdown and a deadline is
		// expected with MaxStreamDuration, so neither is logged as an error.
		st, isStatus := status.FromError(err)
		switch {
		case !isStatus:
			rlog.WithError(err).Error("Unknown error")
		case st.Code() == codes.Canceled:
			rlog.Trace("Context was canceled")
		case st.Code() != codes.DeadlineExceeded:
			rlog.WithError(err).Error("Error receiving application update")
		}
		return err
	}
	if msg == nil || msg.Event == nil {
		return fmt.Errorf("invalid wire transmission")
	}

	cev, err := format.FromProto(msg.Event)
	if err != nil {
		return fmt.Errorf("could not unserialize event from wire: %w", err)
	}
	// The payload is decoded as an Application only to validate it and to
	// log which application the update is for. The queue receives the
	// cloudevent itself.
	var app v1alpha1.Application
	if err := cev.DataAs(&app); err != nil {
		return fmt.Errorf("could not unserialize app data from wire: %w", err)
	}
	rlog.Infof("Received update for application %v", app.QualifiedName())

	rq := s.queues.RecvQ(c.agentName)
	if rq == nil {
		return fmt.Errorf("panic: no recvq for agent %s", c.agentName)
	}
	rq.Add(cev)
	return nil
}
// sendFunc takes the next event from the agent's send queue, converts it to
// wire format and sends it to client c on the stream subs. It blocks until
// the queue yields an item or is shut down.
//
// It must not be called concurrently with itself.
//
// It returns nil once the event was sent. Otherwise it returns an error:
// no send queue, queue shutdown, invalid queue item, serialization failure,
// or any error from Send. Every item obtained from the queue is handed back
// with Done, including on error paths.
func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeServer) error {
	logCtx := c.logCtx.WithField("direction", "send")
	q := s.queues.SendQ(c.agentName)
	if q == nil {
		return fmt.Errorf("panic: no sendq for agent %s", c.agentName)
	}

	// Get() blocks until there is at least one item in the queue, or until
	// the queue is shut down.
	logCtx.Tracef("Waiting to grab an item from the queue")
	item, shutdown := q.Get()
	if shutdown {
		return fmt.Errorf("sendq shutdown in progress")
	}
	// Each item returned by Get() must be handed back via Done(), also when
	// it turns out to be unusable. Previously the error paths below skipped
	// this. Presumably q follows the client-go workqueue contract, where an
	// item that is never marked done stays in the processing set — confirm.
	defer q.Done(item)

	logCtx.Tracef("Grabbed an item")
	if item == nil {
		return fmt.Errorf("panic: nil item in queue")
	}
	ev, ok := item.(*cloudevents.Event)
	if !ok {
		return fmt.Errorf("panic: invalid data in sendqueue: want: %T, have %T", (*cloudevents.Event)(nil), item)
	}
	prEv, err := format.ToProto(ev)
	if err != nil {
		return fmt.Errorf("panic: could not serialize event to wire format: %w", err)
	}

	// Send may block until gRPC flow control allows the message out.
	logCtx.Tracef("Sending an item to the event stream")
	err = subs.Send(&eventstreamapi.Event{Event: prEv})
	if err == nil {
		return nil
	}
	if err == io.EOF {
		return fmt.Errorf("remote hung up while sending data to stream: %w", err)
	}
	if st, isStatus := status.FromError(err); isStatus && st.Code() == codes.Unavailable {
		return fmt.Errorf("remote hung up while sending data to stream: %w", err)
	}
	// gRPC aborts the stream on any send error, so no error is swallowed
	// here. Previously, status errors other than Unavailable returned nil
	// and the event was dropped silently.
	return fmt.Errorf("error sending data to stream: %w", err)
}
// Subscribe implements the bi-directional EventStream used to exchange
// application updates between an agent and the server.
//
// Two goroutines serve each connection. One repeatedly calls recvFunc and
// the other repeatedly calls sendFunc. The first error in either of them is
// logged and cancels the client context, so both loops stop at their next
// iteration. Each goroutine runs onDisconnect when it exits. Subscribe
// returns nil once both have finished.
//
// NOTE(review): the loops check the client context only between calls,
// while recvFunc blocks in subs.Recv() and sendFunc blocks in the queue's
// Get(). Cancelling the client context (including via MaxStreamDuration)
// therefore does not interrupt a call that is already blocked — confirm
// this is acceptable.
func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) error {
	c, err := newClientConnection(subs.Context(), s.options.MaxStreamDuration)
	if err != nil {
		return err
	}

	// pump runs step in a loop until the client context is done. Any error
	// from step is logged with errFmt and tears down the connection by
	// cancelling the client context.
	pump := func(startMsg, stopMsg, errFmt string, step func(*client, eventstreamapi.EventStream_SubscribeServer) error) {
		defer s.onDisconnect(c)
		c.logCtx.Trace(startMsg)
		for {
			select {
			case <-c.ctx.Done():
				c.logCtx.Info(stopMsg)
				return
			default:
			}
			if stepErr := step(c, subs); stepErr != nil {
				c.logCtx.Infof(errFmt, stepErr)
				c.cancelFn()
			}
		}
	}

	// Receiving and sending each run in a dedicated goroutine.
	c.wg.Add(1)
	go pump("Starting event receiver routine", "Stopping event receiver routine", "Receiver disconnected: %v", s.recvFunc)

	c.wg.Add(1)
	go pump("Starting event sender routine", "Stopping event sender routine", "Send: %v", s.sendFunc)

	c.wg.Wait()
	c.logCtx.Info("Closing EventStream")
	return nil
}
// Push implements a client-side stream on which an agent sends updates for
// its Application resources.
//
// Each message is decoded into a cloudevent. Its payload is checked to be an
// Application, and the event is added to the agent's receive queue. Messages
// that cannot be decoded, or that arrive while the agent has no receive
// queue, are logged and skipped. When Recv fails (including io.EOF on a
// normal close), or the stream context is found to be done after a message
// arrives, a PushSummary with the number of accepted events is sent back.
// Push then returns nil.
//
// If no agent name is present in the stream context, an InvalidArgument
// status error is returned.
func (s *Server) Push(pushs eventstreamapi.EventStream_PushServer) error {
	logCtx := log().WithField("method", "Push")

	// The stream may be bounded by MaxStreamDuration. ctx is only checked
	// after each received message; it does not interrupt a blocked Recv().
	var ctx context.Context
	var cancel context.CancelFunc
	if s.options.MaxStreamDuration > 0 {
		logCtx.Debugf("Setting timeout to %v", s.options.MaxStreamDuration)
		ctx, cancel = context.WithTimeout(pushs.Context(), s.options.MaxStreamDuration)
	} else {
		ctx, cancel = context.WithCancel(pushs.Context())
	}
	defer cancel()

	agentName, err := agentName(ctx)
	if err != nil {
		return status.Error(codes.InvalidArgument, err.Error())
	}
	logCtx = logCtx.WithField("client", agentName)
	logCtx.Debug("A new client connected to the event stream")

	summary := &eventstreamapi.PushSummary{}
recvloop:
	for {
		u, err := pushs.Recv()
		if err != nil {
			st, ok := status.FromError(err)
			if ok {
				logCtx.Errorf("Error receiving event: %s", st.String())
			} else if err == io.EOF {
				logCtx.Infof("Client disconnected from stream")
			} else {
				logCtx.WithError(err).Errorf("Unexpected error")
			}
			break recvloop
		}
		select {
		case <-ctx.Done():
			logCtx.Infof("Context canceled")
			break recvloop
		default:
			// In the Push stream, only application updates will be processed.
			// However, depending on configuration, an application update that
			// is observed may result in the creation of this particular app
			// in the server's application backend.

			// Reject messages without a payload before decoding, as recvFunc
			// does. Presumably FromProto does not accept a nil message —
			// confirm.
			if u == nil || u.Event == nil {
				logCtx.Errorf("Received event without payload from wire")
				continue
			}
			ev, err := format.FromProto(u.Event)
			if err != nil {
				logCtx.Errorf("Could not deserialize event from wire: %v", err)
				continue
			}
			app := &v1alpha1.Application{}
			err = ev.DataAs(app)
			if err != nil {
				logCtx.Errorf("Could not deserialize application from event: %v", err)
				continue
			}
			logCtx.Infof("Received update for: %s", app.QualifiedName())

			// RecvQ can return nil when the agent has no queue pair, which
			// recvFunc also guards against. Previously this dereferenced
			// the nil queue and panicked.
			q := s.queues.RecvQ(agentName)
			if q == nil {
				logCtx.Errorf("No recvq for agent %s, dropping update for %s", agentName, app.QualifiedName())
				continue
			}
			q.Add(ev)
			summary.Received++
		}
	}

	logCtx.Infof("Sending summary to agent")
	err = pushs.SendAndClose(summary)
	if err != nil {
		logCtx.Errorf("Error sending summary: %v", err)
	}
	return nil
}
// log returns the package logger, tagged with the module name.
func log() *logrus.Entry {
	moduleFields := logrus.Fields{"module": "grpc.AppStream"}
	return logrus.WithFields(moduleFields)
}