/
api_topology.go
166 lines (148 loc) · 4.71 KB
/
api_topology.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package app
import (
"net/http"
"time"
"context"
"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/render/detailed"
"github.com/weaveworks/scope/report"
)
const (
websocketLoop = 1 * time.Second
)
// APITopology is returned by the /api/topology/{name} handler.
type APITopology struct {
Nodes detailed.NodeSummaries `json:"nodes"`
}
// APINode is returned by the /api/topology/{name}/{id} handler.
type APINode struct {
Node detailed.Node `json:"node"`
}
// RenderContextForReporter creates the rendering context for the given reporter.
func RenderContextForReporter(rep Reporter, r report.Report) detailed.RenderContext {
rc := detailed.RenderContext{Report: r}
if wrep, ok := rep.(WebReporter); ok {
rc.MetricsGraphURL = wrep.MetricsGraphURL
}
return rc
}
type rendererHandler func(context.Context, render.Renderer, render.Transformer, detailed.RenderContext, http.ResponseWriter, *http.Request)
// Full topology.
func handleTopology(ctx context.Context, renderer render.Renderer, transformer render.Transformer, rc detailed.RenderContext, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APITopology{
Nodes: detailed.Summaries(ctx, rc, render.Render(ctx, rc.Report, renderer, transformer).Nodes),
})
}
// Individual nodes.
func handleNode(ctx context.Context, renderer render.Renderer, transformer render.Transformer, rc detailed.RenderContext, w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
topologyID = vars["topology"]
nodeID = vars["id"]
)
// We must not lose the node during filtering. We achieve that by
// (1) rendering the report with the base renderer, without
// filtering, which gives us the node (if it exists at all), and
// then (2) applying the filter separately to that result. If the
// node is lost in the second step, we simply put it back.
nodes := renderer.Render(ctx, rc.Report)
node, ok := nodes.Nodes[nodeID]
if !ok {
http.NotFound(w, r)
return
}
nodes = transformer.Transform(nodes)
if filteredNode, ok := nodes.Nodes[nodeID]; ok {
node = filteredNode
} else { // we've lost the node during filtering; put it back
nodes.Nodes[nodeID] = node
nodes.Filtered--
}
respondWith(w, http.StatusOK, APINode{Node: detailed.MakeNode(topologyID, rc, nodes.Nodes, node)})
}
// Websocket for the full topology.
func handleWebsocket(
ctx context.Context,
rep Reporter,
w http.ResponseWriter,
r *http.Request,
) {
if err := r.ParseForm(); err != nil {
respondWith(w, http.StatusInternalServerError, err)
return
}
loop := websocketLoop
if t := r.Form.Get("t"); t != "" {
var err error
if loop, err = time.ParseDuration(t); err != nil {
respondWith(w, http.StatusBadRequest, t)
return
}
}
conn, err := xfer.Upgrade(w, r, nil)
if err != nil {
// log.Info("Upgrade:", err)
return
}
defer conn.Close()
quit := make(chan struct{})
go func(c xfer.Websocket) {
for { // just discard everything the browser sends
if _, _, err := c.ReadMessage(); err != nil {
if !xfer.IsExpectedWSCloseError(err) {
log.Error("err:", err)
}
close(quit)
break
}
}
}(conn)
var (
previousTopo detailed.NodeSummaries
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
topologyID = mux.Vars(r)["topology"]
startReportingAt = deserializeTimestamp(r.Form.Get("timestamp"))
channelOpenedAt = time.Now()
)
rep.WaitOn(ctx, wait)
defer rep.UnWait(ctx, wait)
for {
// We measure how much time has passed since the channel was opened
// and add it to the initial report timestamp to get the timestamp
// of the snapshot we want to report right now.
// NOTE: Multiplying `timestampDelta` by a constant factor here
// would have an effect of fast-forward, which is something we
// might be interested in implementing in the future.
timestampDelta := time.Since(channelOpenedAt)
reportTimestamp := startReportingAt.Add(timestampDelta)
re, err := rep.Report(ctx, reportTimestamp)
if err != nil {
log.Errorf("Error generating report: %v", err)
return
}
renderer, filter, err := topologyRegistry.RendererForTopology(topologyID, r.Form, re)
if err != nil {
log.Errorf("Error generating report: %v", err)
return
}
newTopo := detailed.Summaries(ctx, RenderContextForReporter(rep, re), render.Render(ctx, re, renderer, filter).Nodes)
diff := detailed.TopoDiff(previousTopo, newTopo)
previousTopo = newTopo
if err := conn.WriteJSON(diff); err != nil {
if !xfer.IsExpectedWSCloseError(err) {
log.Errorf("cannot serialize topology diff: %s", err)
}
return
}
select {
case <-wait:
case <-tick:
case <-quit:
return
}
}
}