-
Notifications
You must be signed in to change notification settings - Fork 0
/
alerts.go
121 lines (103 loc) · 3.59 KB
/
alerts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package web
import (
"net/http"
"path/filepath"
"strings"
"github.com/uptrace/opentelemetry-go-extra/otelzap"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/net/websocket"
)
// alerts serves the alerts view for the dashboard named by the last element
// of the request path.
//
// Requests whose Accept header is exactly "text/vnd.turbo-stream.html" get the
// partial rendering of "alerts.gohtml"; every other request gets the base
// rendering. Both are written with status 200.
func (h *webHandler) alerts(w http.ResponseWriter, req *http.Request) {
	// filepath.Base yields "/" for a root path; trimming it leaves an empty name.
	name := strings.TrimPrefix(filepath.Base(req.URL.Path), "/")
	content := webContent{Content: h.aggregator.Alerts(name)}

	if req.Header.Get("Accept") == "text/vnd.turbo-stream.html" {
		render := h.partialRenderer(req, name, "alerts.gohtml")
		render(w, http.StatusOK, content)
		return
	}

	render := h.baseRenderer(req, name, "alerts.gohtml")
	render(w, http.StatusOK, content)
}
// wsalerts pushes re-rendered alerts for one dashboard over a websocket.
//
// The handler registers with the aggregator under the request's trace ID and
// renders "alerts.gohtml" every time the aggregator signals an update. It
// returns when the update channel is closed or the request context is done.
// Once registered, the registration is removed on return; the connection is
// closed on every exit path, and a panic while serving is recovered and logged.
func (h *webHandler) wsalerts(s *websocket.Conn) {
	defer func() {
		if err := recover(); err != nil {
			switch err := err.(type) {
			case error:
				otelzap.Ctx(s.Request().Context()).Info("panic serving", zap.Error(err))
			default:
				otelzap.Ctx(s.Request().Context()).Info("panic serving", zap.Any("error", err))
			}
		}
		// Best-effort close: the handler is done, so a close error is not actionable here.
		_ = s.Close()
	}()

	ctx := s.Request().Context()

	// get dashboard name from /ws/alerts/... directly from path, as information gets lost from the websocket upgrade.
	// NOTE(review): TrimPrefix also strips a leading "ws" from a real dashboard
	// name (e.g. "wsfoo" becomes "foo") — confirm this is only meant to map a
	// bare ".../ws" path to the empty name.
	dashboardName := filepath.Base(s.Request().URL.Path)
	dashboardName = strings.TrimPrefix(dashboardName, "ws")
	dashboardName = strings.TrimPrefix(dashboardName, "/")

	renderer := h.wsRenderer(s, "alerts.gohtml")

	// The trace ID doubles as the client key for Register/Unregister.
	tr := trace.SpanFromContext(ctx).SpanContext().TraceID().String()
	otelzap.Ctx(ctx).Info("Registering websocket connection",
		zap.String("client", tr),
		zap.String("dashboard", dashboardName))

	update := h.aggregator.Register(tr)
	defer h.aggregator.Unregister(tr)

	for {
		select {
		case _, ok := <-update:
			if !ok {
				// A closed channel delivers no further updates. Return instead
				// of falling through, so no extra frame is rendered after the
				// "stop sending" log line and the deferred cleanup runs now.
				otelzap.Ctx(ctx).Debug("stop sending to websocket client, update channel closed",
					zap.String("client", tr))
				return
			}
			otelzap.Ctx(ctx).Debug("sending to websocket client",
				zap.String("client", tr))
			aggregate := h.aggregator.Alerts(dashboardName)
			renderer(webContent{Content: aggregate})
		case <-ctx.Done():
			otelzap.Ctx(ctx).Debug("stop sending to websocket client, req ctx done",
				zap.String("client", tr))
			return
		}
	}
}
// ssealerts streams re-rendered alerts for one dashboard as server-sent events.
//
// The dashboard name is taken from request field 0. The handler registers with
// the aggregator under the request's trace ID and renders "alerts.gohtml" on
// each update signal. It returns when the update channel is closed, when
// rendering reports an error, or when the request context is done. A panic
// while serving is recovered and logged.
func (h *webHandler) ssealerts(w http.ResponseWriter, req *http.Request) {
	ctx := req.Context()

	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		if asErr, isErr := rec.(error); isErr {
			otelzap.Ctx(ctx).Info("panic serving", zap.Error(asErr))
			return
		}
		otelzap.Ctx(ctx).Info("panic serving", zap.Any("error", rec))
	}()

	dashboardName := getField(req, 0)

	render, stop := h.sseRenderer(w, req, "alerts.gohtml")
	defer stop()

	// The trace ID doubles as the client key for Register/Unregister.
	clientID := trace.SpanFromContext(ctx).SpanContext().TraceID().String()
	otelzap.Ctx(ctx).Info("Registering sse connection", zap.String("client", clientID))

	updates := h.aggregator.Register(clientID)
	defer h.aggregator.Unregister(clientID)

	for {
		select {
		case <-ctx.Done():
			otelzap.Ctx(ctx).Info("stop sending to sse client", zap.String("client", clientID))
			return

		case _, open := <-updates:
			if !open {
				// Channel closed: no further updates will arrive.
				otelzap.Ctx(ctx).Debug("stop sending to sse client", zap.String("client", clientID))
				return
			}

			otelzap.Ctx(ctx).Debug("sending to sse client", zap.String("client", clientID))
			content := webContent{Content: h.aggregator.Alerts(dashboardName)}
			err := render(content)
			if err != nil {
				otelzap.Ctx(ctx).Debug("stop sending to sse client",
					zap.String("client", clientID),
					zap.Error(err))
				return
			}
		}
	}
}