-
Notifications
You must be signed in to change notification settings - Fork 35
/
running_events.go
64 lines (56 loc) · 1.53 KB
/
running_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package handlers
import (
"context"
"net/http"
visualizer "github.com/jenkins-x/jx-pipelines-visualizer"
"github.com/rs/xid"
"github.com/sirupsen/logrus"
sse "github.com/subchord/go-sse"
)
type RunningEventsHandler struct {
RunningPipelines *visualizer.RunningPipelines
Broker *sse.Broker
Logger *logrus.Logger
}
func (h *RunningEventsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
clientID := xid.New().String()
clientConnection, err := h.Broker.Connect(clientID, w, r)
if err != nil {
// streaming unsupported. http.Error() already used in broker.Connect()
return
}
watcher := visualizer.Watcher{
Name: clientID,
Added: make(chan visualizer.RunningPipeline),
Deleted: make(chan visualizer.RunningPipeline),
}
h.RunningPipelines.Register(watcher)
for {
select {
case running := <-watcher.Added:
h.send(r.Context(), clientConnection, "added", running.JSON())
case running := <-watcher.Deleted:
h.send(r.Context(), clientConnection, "deleted", running.JSON())
case <-clientConnection.Done():
h.RunningPipelines.UnRegister(watcher)
return
case <-r.Context().Done():
h.RunningPipelines.UnRegister(watcher)
return
}
}
}
func (h *RunningEventsHandler) send(ctx context.Context, clientConnection *sse.ClientConnection, eventType, eventData string) {
select {
case <-clientConnection.Done():
return
case <-ctx.Done():
return
default:
clientConnection.Send(sse.StringEvent{
Id: xid.New().String(),
Event: eventType,
Data: eventData,
})
}
}