-
Notifications
You must be signed in to change notification settings - Fork 474
/
aggregate_node.go
289 lines (251 loc) · 12.3 KB
/
aggregate_node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
package appender
import (
"fmt"
"time"
"github.com/prometheus/common/model"
"github.com/kiali/kiali/graph"
"github.com/kiali/kiali/graph/telemetry/istio/util"
"github.com/kiali/kiali/log"
"github.com/kiali/kiali/prometheus"
)
const (
AggregateNodeAppenderName = "aggregateNode"
)
// AggregateNodeAppender is responsible for injecting aggregate nodes into the graph to gain
// visibility into traffic aggregations for a user-specfied metric attribute.
// Note: Aggregate Nodes are supported only on Requests traffic (not TCP or gRPC-message traffic)
type AggregateNodeAppender struct {
Aggregate string
AggregateValue string
GraphType string
InjectServiceNodes bool
Namespaces map[string]graph.NamespaceInfo
QueryTime int64 // unix time in seconds
Rates graph.RequestedRates
Service string
}
// Name implements Appender
func (a AggregateNodeAppender) Name() string {
return AggregateNodeAppenderName
}
// IsFinalizer implements Appender
func (a AggregateNodeAppender) IsFinalizer() bool {
return false
}
// AppendGraph implements Appender
func (a AggregateNodeAppender) AppendGraph(trafficMap graph.TrafficMap, globalInfo *graph.AppenderGlobalInfo, namespaceInfo *graph.AppenderNamespaceInfo) {
if len(trafficMap) == 0 {
return
}
// Aggregate Nodes are not applicable to Service Graphs
if a.GraphType == graph.GraphTypeService {
return
}
// Aggregate Nodes are currently supported only on Requests traffic (not TCP or gRPC-message traffic)
if a.Rates.Grpc != graph.RateRequests && a.Rates.Http != graph.RateRequests {
return
}
if globalInfo.PromClient == nil {
var err error
globalInfo.PromClient, err = prometheus.NewClient()
graph.CheckError(err)
}
if a.AggregateValue == "" {
a.appendGraph(trafficMap, namespaceInfo.Namespace, globalInfo.PromClient)
} else {
a.appendNodeGraph(trafficMap, namespaceInfo.Namespace, globalInfo.PromClient)
}
}
func (a AggregateNodeAppender) appendGraph(trafficMap graph.TrafficMap, namespace string, client *prometheus.Client) {
log.Tracef("Resolving request aggregates for namespace=[%s], aggregate=[%s]", namespace, a.Aggregate)
duration := a.Namespaces[namespace].Duration
// query prometheus for aggregate info in two queries (assume aggregation is typically request classification, so use dest telemetry):
// note1: we want to only match the aggregate when it is set and not "unknown". But in Prometheus a negative test on an unset label
// matches everything, so using %s!=unknown means we still have to filter out unset time-series below...
// note2: for now we will filter out aggregates with no traffic on the assumption that users probably don't want to
// see them and it will just increase the graph density. To change that behavior remove the "> 0" conditions.
// 1) query for requests originating from a workload outside the namespace.
groupBy := fmt.Sprintf("source_cluster,source_workload_namespace,source_workload,source_canonical_service,source_canonical_revision,destination_cluster,destination_service_namespace,destination_service,destination_service_name,destination_workload_namespace,destination_workload,destination_canonical_service,destination_canonical_revision,request_protocol,response_code,grpc_response_status,response_flags,%s", a.Aggregate)
httpQuery := fmt.Sprintf(`sum(rate(%s{reporter="destination",source_workload_namespace!="%s",destination_service_namespace="%v",%s!="unknown"}[%vs])) by (%s) > 0`,
"istio_requests_total",
namespace,
namespace,
a.Aggregate,
int(duration.Seconds()), // range duration for the query
groupBy)
query := httpQuery
vector := promQuery(query, time.Unix(a.QueryTime, 0), client.GetContext(), client.API(), a)
a.injectAggregates(trafficMap, &vector)
// 2) query for requests originating from a workload inside of the namespace
httpQuery = fmt.Sprintf(`sum(rate(%s{reporter="destination",source_workload_namespace="%s",%s!="unknown"}[%vs])) by (%s) > 0`,
"istio_requests_total",
namespace,
a.Aggregate,
int(duration.Seconds()), // range duration for the query
groupBy)
query = httpQuery
vector = promQuery(query, time.Unix(a.QueryTime, 0), client.GetContext(), client.API(), a)
a.injectAggregates(trafficMap, &vector)
}
func (a AggregateNodeAppender) appendNodeGraph(trafficMap graph.TrafficMap, namespace string, client *prometheus.Client) {
log.Tracef("Resolving node request aggregates for namespace=[%s], aggregate=[%s=%s]", namespace, a.Aggregate, a.AggregateValue)
duration := a.Namespaces[namespace].Duration
// query prometheus for aggregate info in a single query (assume aggregation is typically request classification, so use dest telemetry):
// note1: for now we will filter out aggregates with no traffic on the assumption that users probably don't want to
// see them and it will just increase the graph density. To change that behavior remove the "> 0" conditions.
serviceFragment := ""
if a.Service != "" {
serviceFragment = fmt.Sprintf(`,destination_service_name="%s"`, a.Service)
}
groupBy := fmt.Sprintf("source_cluster,source_workload_namespace,source_workload,source_canonical_service,source_canonical_revision,destination_cluster,destination_service_namespace,destination_service,destination_service_name,destination_workload_namespace,destination_workload,destination_canonical_service,destination_canonical_revision,request_protocol,response_code,grpc_response_status,response_flags,%s", a.Aggregate)
httpQuery := fmt.Sprintf(`sum(rate(%s{reporter="destination",destination_service_namespace="%s",%s="%s"%s}[%vs])) by (%s) > 0`,
"istio_requests_total",
namespace,
a.Aggregate,
a.AggregateValue,
serviceFragment,
int(duration.Seconds()), // range duration for the query
groupBy)
query := httpQuery
vector := promQuery(query, time.Unix(a.QueryTime, 0), client.GetContext(), client.API(), a)
a.injectAggregates(trafficMap, &vector)
}
func (a AggregateNodeAppender) injectAggregates(trafficMap graph.TrafficMap, vector *model.Vector) {
skipRequestsGrpc := a.Rates.Grpc != graph.RateRequests
skipRequestsHttp := a.Rates.Http != graph.RateRequests
for _, s := range *vector {
m := s.Metric
lSourceCluster, sourceClusterOk := m["source_cluster"]
lSourceWlNs, sourceWlNsOk := m["source_workload_namespace"]
lSourceWl, sourceWlOk := m["source_workload"]
lSourceApp, sourceAppOk := m["source_canonical_service"]
lSourceVer, sourceVerOk := m["source_canonical_revision"]
lDestCluster, destClusterOk := m["destination_cluster"]
lDestSvcNs, destSvcNsOk := m["destination_service_namespace"]
lDestSvc, destSvcOk := m["destination_service"]
lDestSvcName, destSvcNameOk := m["destination_service_name"]
lDestWlNs, destWlNsOk := m["destination_workload_namespace"]
lDestWl, destWlOk := m["destination_workload"]
lDestApp, destAppOk := m["destination_canonical_service"]
lDestVer, destVerOk := m["destination_canonical_revision"]
lCode := m["response_code"]
lGrpc, grpcOk := m["grpc_response_status"] // will be missing for non-GRPC
lFlags, flagsOk := m["response_flags"]
lProtocol, protocolOk := m["request_protocol"] // because currently we only support requests traffic the protocol should be set
lAggregate, aggregateOk := m[model.LabelName(a.Aggregate)] // may be unset, see note above
if !aggregateOk {
continue
}
if !sourceWlNsOk || !sourceWlOk || !sourceAppOk || !sourceVerOk || !destSvcNsOk || !destSvcOk || !destSvcNameOk || !destWlNsOk || !destWlOk || !destAppOk || !destVerOk || !flagsOk || !protocolOk {
log.Warningf("Skipping %v, missing expected labels", m.String())
continue
}
sourceWlNs := string(lSourceWlNs)
sourceWl := string(lSourceWl)
sourceApp := string(lSourceApp)
sourceVer := string(lSourceVer)
destSvc := string(lDestSvc)
code := string(lCode)
protocol := string(lProtocol)
flags := string(lFlags)
aggregate := string(lAggregate)
if (skipRequestsHttp && protocol == graph.HTTP.Name) || (skipRequestsGrpc && protocol == graph.GRPC.Name) {
continue
}
// handle clusters
sourceCluster, destCluster := util.HandleClusters(lSourceCluster, sourceClusterOk, lDestCluster, destClusterOk)
if util.IsBadSourceTelemetry(sourceCluster, sourceClusterOk, sourceWlNs, sourceWl, sourceApp) {
continue
}
if protocolOk {
// set response code in a backward compatible way
code = util.HandleResponseCode(protocol, code, grpcOk, string(lGrpc))
} else {
// because currently we only support requests traffic the protocol should be set
log.Warningf("Skipping %v, missing expected protocol label", m.String())
continue
// protocol = "tcp"
}
// handle unusual destinations
destCluster, destSvcNs, destSvcName, destWlNs, destWl, destApp, destVer, _ := util.HandleDestination(sourceCluster, sourceWlNs, sourceWl, destCluster, string(lDestSvcNs), string(lDestSvc), string(lDestSvcName), string(lDestWlNs), string(lDestWl), string(lDestApp), string(lDestVer))
if util.IsBadDestTelemetry(destCluster, destClusterOk, destSvcNs, destSvc, destSvcName, destWl) {
continue
}
// make code more readable by setting "host" because "destSvc" holds destination.service.host | request.host | "unknown"
host := destSvc
val := float64(s.Value)
// inject aggregate node between source and destination
sourceID, _, _ := graph.Id(sourceCluster, sourceWlNs, "", sourceWlNs, sourceWl, sourceApp, sourceVer, a.GraphType)
sourceNode, sourceFound := trafficMap[sourceID]
if !sourceFound {
log.Debugf("Expected source [%s] node not found in traffic map. Skipping aggregate injection [%s]", sourceID, aggregate)
continue
}
// if service nodes are injected show the service-related aggregation:
// - use the service node as the dest
// - associate aggregate node with the destSvcName and, if set, destApp
// else show the independent aggregation by using the workload/app node as the dest
destID := ""
if a.InjectServiceNodes {
destID, _, _ = graph.Id(destCluster, destSvcNs, destSvcName, "", "", "", "", a.GraphType) // service
} else {
destID, _, _ = graph.Id(destCluster, destSvcNs, destSvcName, destWlNs, destWl, destApp, destVer, a.GraphType) // wl/app
}
destNode, destFound := trafficMap[destID]
if !destFound {
log.Debugf("Expected dest [%s] node not found in traffic map. Skipping aggregate injection [%s]", destID, aggregate)
continue
}
var aggrNode *graph.Node
if a.InjectServiceNodes {
aggrNode, _ = addNode(trafficMap, destCluster, destSvcNs, a.Aggregate, aggregate, destSvcName, destApp)
} else {
aggrNode, _ = addNode(trafficMap, destCluster, destWlNs, a.Aggregate, aggregate, "", "")
}
// replace the non-classified edge (from source to dest) with the classified edges
// - note that if not every request has a classification match the traffic may be lower than actual, I
// think this this OK, and if the user cares they should define a "catch-all" classification match
safeEdges := []*graph.Edge{}
for _, e := range sourceNode.Edges {
if e.Dest.ID != destID {
safeEdges = append(safeEdges, e)
}
}
sourceNode.Edges = safeEdges
addTraffic(val, protocol, code, flags, host, sourceNode, aggrNode)
addTraffic(val, protocol, code, flags, host, aggrNode, destNode)
}
}
func addTraffic(val float64, protocol, code, flags, host string, source, dest *graph.Node) {
var edge *graph.Edge
for _, e := range source.Edges {
if dest.ID == e.Dest.ID && e.Metadata[graph.ProtocolKey] == protocol {
edge = e
break
}
}
if nil == edge {
edge = source.AddEdge(dest)
edge.Metadata[graph.ProtocolKey] = protocol
}
// Only update traffic on the aggregate node and associated edges. Remember that this is an appender and the
// in/out traffic is already set for the non-aggregate nodes.
var sourceMetadata graph.Metadata
var destMetadata graph.Metadata
if source.NodeType == graph.NodeTypeAggregate {
sourceMetadata = source.Metadata
} else {
destMetadata = dest.Metadata
}
graph.AddToMetadata(protocol, val, code, flags, host, sourceMetadata, destMetadata, edge.Metadata)
}
func addNode(trafficMap graph.TrafficMap, cluster, namespace, aggregate, aggregateVal, svcName, app string) (*graph.Node, bool) {
id := graph.AggregateID(cluster, namespace, aggregate, aggregateVal, svcName)
node, found := trafficMap[id]
if !found {
newNode := graph.NewAggregateNodeExplicit(id, cluster, namespace, aggregate, aggregateVal, svcName, app)
node = &newNode
trafficMap[id] = node
}
return node, found
}