This repository has been archived by the owner on Apr 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 169
/
trace_query.go
361 lines (332 loc) · 11.7 KB
/
trace_query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.
package store
import (
"fmt"
"strings"
"time"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
const (
// subqueryFormat is the fmt template for the trace-ID search subquery.
// It takes two indexed arguments:
//   - %[1]d: the positional-parameter number of the interval that is both
//     subtracted from and added to the newest span start time of each trace
//     to produce time_low and time_high;
//   - %[2]s: the WHERE clause applied to _ps_trace.span.
//
// The result has one row per trace_id, newest trace first.
subqueryFormat = `
SELECT
trace_sub.trace_id, start_time_max - $%[1]d::interval as time_low, start_time_max + $%[1]d::interval as time_high
FROM (
SELECT
s.trace_id,
max(start_time) as start_time_max
FROM _ps_trace.span s
WHERE
%[2]s
GROUP BY s.trace_id
) as trace_sub
ORDER BY trace_sub.start_time_max DESC
`
// subqueryTimeRangeForTraceID yields the same three columns as
// subqueryFormat (trace_id, time_low, time_high) for a single, known trace.
// $1 is the interval applied around the start time of one span of the trace
// (LIMIT 1) and $2 is the trace ID.
subqueryTimeRangeForTraceID = `
SELECT
s.trace_id,
s.start_time - $1::interval as time_low,
s.start_time + $1::interval as time_high
FROM _ps_trace.span s
WHERE
s.trace_id = $2
LIMIT 1
`
// findTraceSQLFormat is the fmt template for the complete-trace query. Its
// single %s verb is replaced by the subquery that populates the trace_ids
// CTE; that subquery must expose trace_id, time_low and time_high.
//
// PostgreSQL badly overestimates the number of rows returned if the complete trace query
// uses an IN clause on trace_id, but gives good estimates for equality conditions. So, leverage an INNER
// JOIN LATERAL to provide an equality condition on the complete trace.
//
// A lateral join is used for the `link` and `event` tables instead of
// calling `array_agg` directly in the main SELECT clause, to avoid returning
// duplicated values when the cartesian product of event x link is greater
// than 1.
//
// The GROUP BY clause of the query could be removed because we are not doing
// any kind of aggregation that would require it, but the tests we did showed
// that with the GROUP BY the resulting query plan is better. In the
// no-group-by case the join between the trace_ids WITH clause and the
// main _ps_trace.span table happens as a hash join where both sides scan
// the span table.
//
// Query plans:
//
// - With GROUP BY https://explain.dalibo.com/plan/e6c74995bc36begd
// - Without GROUP BY https://explain.dalibo.com/plan/f09259cd21g57dh3
findTraceSQLFormat = `
WITH trace_ids AS (
%s
)
SELECT
complete_trace.*
FROM
trace_ids
INNER JOIN LATERAL (
SELECT
s.trace_id,
s.span_id,
s.parent_span_id,
s.start_time start_times,
s.end_time end_times,
o.span_kind,
s.dropped_tags_count dropped_tags_counts,
s.dropped_events_count dropped_events_counts,
s.dropped_link_count dropped_link_counts,
s.status_code status_code_string,
s.status_message,
s.trace_state trace_states,
s_url.url schema_urls,
o.span_name span_names,
_ps_trace.tag_map_denormalize(s.resource_tags) resource_tags,
_ps_trace.tag_map_denormalize(s.span_tags) span_tags,
event.event_names,
event.event_times,
event.event_dropped_tags_count,
event.event_tags,
inst_lib.name library_name,
inst_lib.version library_version,
inst_lib_url.url library_schema_url,
links_linked_trace_ids,
links_linked_span_ids,
links_trace_states,
links_dropped_tags_count,
links_tags
FROM
_ps_trace.span s
INNER JOIN
_ps_trace.operation o ON (s.operation_id = o.id)
LEFT JOIN
_ps_trace.schema_url s_url ON s.resource_schema_url_id = s_url.id
LEFT JOIN
_ps_trace.instrumentation_lib inst_lib ON s.instrumentation_lib_id = inst_lib.id
LEFT JOIN
_ps_trace.schema_url inst_lib_url ON inst_lib_url.id = inst_lib.schema_url_id
LEFT JOIN LATERAL (
SELECT
array_agg(e.name ORDER BY e.event_nbr) event_names,
array_agg(e.time ORDER BY e.event_nbr) event_times,
array_agg(e.dropped_tags_count ORDER BY e.event_nbr) event_dropped_tags_count,
array_agg(_ps_trace.tag_map_denormalize(e.tags) ORDER BY e.event_nbr) event_tags
FROM _ps_trace.event as e
WHERE e.trace_id = s.trace_id AND e.span_id = s.span_id
AND e.time > trace_ids.time_low AND e.time < trace_ids.time_high
) as event ON (TRUE)
LEFT JOIN LATERAL (
SELECT
array_agg(lk.linked_trace_id ORDER BY lk.link_nbr) links_linked_trace_ids,
array_agg(lk.linked_span_id ORDER BY lk.link_nbr) links_linked_span_ids,
array_agg(lk.trace_state ORDER BY lk.link_nbr) links_trace_states,
array_agg(lk.dropped_tags_count ORDER BY lk.link_nbr) links_dropped_tags_count,
array_agg(_ps_trace.tag_map_denormalize(lk.tags) ORDER BY lk.link_nbr) links_tags
FROM _ps_trace.link as lk
WHERE lk.trace_id = s.trace_id AND lk.span_id = s.span_id
AND lk.span_start_time > trace_ids.time_low AND lk.span_start_time < trace_ids.time_high
) as link ON (TRUE)
WHERE
s.trace_id = trace_ids.trace_id
AND s.start_time > trace_ids.time_low AND s.start_time < trace_ids.time_high
GROUP BY
s.trace_id,
s.span_id,
s.parent_span_id,
s.start_time,
s.end_time,
s.resource_tags,
s.span_tags,
o.span_name,
o.span_kind,
s_url.url,
inst_lib.name,
inst_lib.version,
inst_lib_url.url,
event_names,
event_times,
event_dropped_tags_count,
event_tags,
links_linked_trace_ids,
links_linked_span_ids,
links_trace_states,
links_dropped_tags_count,
links_tags
) AS complete_trace ON (TRUE)
`
// Keys used to represent OTLP constructs from Jaeger tags which are then dropped from the tag map.
TagError = "error"
TagHostname = "hostname"
TagJaegerVersion = "jaeger.version"
TagSpanKind = "span.kind"
TagW3CTraceState = "w3c.tracestate"
TagEventName = "event"
)
// Builder assembles the SQL text and positional parameters for the trace
// queries defined in this file.
type Builder struct {
// cfg supplies MaxTraceDuration, which the query methods pass to the
// database as the width of the time window around each trace.
cfg *Config
}
// NewBuilder returns a Builder that reads its settings from cfg.
func NewBuilder(cfg *Config) *Builder {
	return &Builder{cfg: cfg}
}
// findTracesQuery returns the SQL and positional arguments that fetch the
// complete traces matching q. The trace-ID subquery is spliced into
// findTraceSQLFormat, where it becomes the trace_ids CTE.
func (b *Builder) findTracesQuery(q *spanstore.TraceQueryParameters, tInfo *tagsInfo) (string, []interface{}) {
	idSubquery, args := b.BuildTraceIDSubquery(q, tInfo)
	sql := fmt.Sprintf(findTraceSQLFormat, idSubquery)
	return sql, args
}
// findTraceIDsQuery returns the SQL and positional arguments that list the
// trace IDs matching q; it is the trace-ID subquery used on its own.
func (b *Builder) findTraceIDsQuery(q *spanstore.TraceQueryParameters, tInfo *tagsInfo) (string, []interface{}) {
	sql, args := b.BuildTraceIDSubquery(q, tInfo)
	return sql, args
}
// getUUIDFromTraceID converts a Jaeger trace ID into a valid pgtype.UUID by
// marshaling it into a 16-byte buffer.
//
// It returns an error if marshaling fails or if the number of bytes written
// is not exactly 16. On error the returned UUID is the zero value.
func getUUIDFromTraceID(traceID model.TraceID) (pgtype.UUID, error) {
	var buf [16]byte
	n, err := traceID.MarshalTo(buf[:])
	if err != nil {
		return pgtype.UUID{}, fmt.Errorf("marshaling TraceID: %w", err)
	}
	// A short write without an error must be reported separately: wrapping a
	// nil error with %w would produce a malformed message.
	if n != len(buf) {
		return pgtype.UUID{}, fmt.Errorf("marshaling TraceID: wrote %d bytes, want %d", n, len(buf))
	}
	return pgtype.UUID{Bytes: buf, Valid: true}, nil
}
// getTraceQuery returns the SQL and positional arguments that fetch the
// complete trace identified by traceID. It fails only when the trace ID
// cannot be converted to a UUID.
func (b *Builder) getTraceQuery(traceID model.TraceID) (string, []interface{}, error) {
	id, err := getUUIDFromTraceID(traceID)
	if err != nil {
		return "", nil, fmt.Errorf("TraceID to UUID conversion: %w", err)
	}

	// Even though the trace ID is already known, a trace_ids subquery is
	// still built: it supplies the time range of the trace that the rest of
	// the query filters on.
	rangeSubquery, args := b.BuildTraceTimeRangeSubqueryForTraceID(id)
	sql := fmt.Sprintf(findTraceSQLFormat, rangeSubquery)
	return sql, args, nil
}
// buildOperationSubquery builds a subquery selecting the ids of the rows in
// _ps_trace.operation that satisfy tInfo.operationClauses and, when set in q,
// the service name and operation name filters.
//
// Any value bound by the subquery is appended to params, and the placeholders
// in the SQL refer to the resulting positions. The (possibly extended) params
// slice is always returned. The returned SQL is empty when there is nothing
// to filter on.
func (b *Builder) buildOperationSubquery(q *spanstore.TraceQueryParameters, tInfo *tagsInfo, params []interface{}) (string, []interface{}) {
	// Work on a copy: appending directly to tInfo.operationClauses could
	// write into its backing array and leak clauses between calls.
	operationClauses := make([]string, 0, len(tInfo.operationClauses)+2)
	operationClauses = append(operationClauses, tInfo.operationClauses...)

	if len(q.ServiceName) > 0 {
		params = append(params, q.ServiceName)
		qual := fmt.Sprintf(`
service_name_id = (
SELECT id
FROM _ps_trace.tag
WHERE key = 'service.name'
AND key_id = 1
AND _prom_ext.jsonb_digest(value) = _prom_ext.jsonb_digest(to_jsonb($%d::text))
)`, len(params))
		operationClauses = append(operationClauses, qual)
	}
	if len(q.OperationName) > 0 {
		params = append(params, q.OperationName)
		qual := fmt.Sprintf(`span_name = $%d`, len(params))
		operationClauses = append(operationClauses, qual)
	}

	if len(operationClauses) == 0 {
		return "", params
	}
	subquery := fmt.Sprintf(`
SELECT
id
FROM _ps_trace.operation op
WHERE
%s
`, strings.Join(operationClauses, " AND "))
	return subquery, params
}
// buildEventSubquery builds a `SELECT 1` subquery over _ps_trace.event that is
// correlated with the outer span alias `s` on trace_id and span_id, and that
// additionally requires every condition in clauses.
//
// When q carries a start-time bound, the event time is bounded as well, with
// the window widened by cfg.MaxTraceDuration on each side; the bound values
// are appended to params and the extended slice is returned.
//
// The clauses slice passed by the caller is never modified.
func (b *Builder) buildEventSubquery(q *spanstore.TraceQueryParameters, clauses []string, params []interface{}) (string, []interface{}) {
	// Copy the caller's clauses so the appends below cannot write into the
	// backing array of a slice the caller still owns.
	conds := make([]string, 0, len(clauses)+2)
	conds = append(conds, clauses...)

	// IsZero is used rather than comparing against time.Time{} so that a
	// zero instant is treated as unset regardless of its location.
	if !q.StartTimeMin.IsZero() {
		params = append(params, q.StartTimeMin.Add(-b.cfg.MaxTraceDuration))
		conds = append(conds, fmt.Sprintf(`e.time >= $%d`, len(params)))
	}
	if !q.StartTimeMax.IsZero() {
		params = append(params, q.StartTimeMax.Add(b.cfg.MaxTraceDuration))
		conds = append(conds, fmt.Sprintf(`e.time <= $%d`, len(params)))
	}

	subquery := `SELECT 1
FROM _ps_trace.event e
WHERE s.trace_id = e.trace_id AND s.span_id = e.span_id`
	// Only add the connective when there is something to connect; otherwise
	// the statement would end in a dangling AND.
	if len(conds) > 0 {
		subquery += " AND " + strings.Join(conds, " AND ")
	}
	return subquery, params
}
// buildTagClauses turns tInfo.generalTags into one parenthesised SQL condition.
//
// Each tag binds its jsonbPairArray as one new parameter and yields a group
// of alternatives joined with OR: one per place the tag may live (span tags,
// resource tags, event tags). The per-tag groups are joined with AND. The
// extended params slice is returned next to the SQL.
func (b *Builder) buildTagClauses(q *spanstore.TraceQueryParameters, tInfo *tagsInfo, params []interface{}) (string, []interface{}) {
	perTag := make([]string, 0, len(tInfo.generalTags))
	for _, tag := range tInfo.generalTags {
		params = append(params, tag.jsonbPairArray)
		// All alternatives of this tag refer to the parameter just bound.
		placeholder := len(params)

		alternatives := make([]string, 0, 3)
		if tag.isSpan {
			alternatives = append(alternatives, fmt.Sprintf("(s.span_tags @> ANY($%d::jsonb[]))", placeholder))
		}
		if tag.isResource {
			alternatives = append(alternatives, fmt.Sprintf("(s.resource_tags @> ANY($%d::jsonb[]))", placeholder))
		}
		if tag.isEvent {
			eventCond := fmt.Sprintf("(e.tags @> ANY($%d::jsonb[]))", placeholder)
			var eventSubquery string
			eventSubquery, params = b.buildEventSubquery(q, []string{eventCond}, params)
			alternatives = append(alternatives, "EXISTS("+eventSubquery+")")
		}

		perTag = append(perTag, "("+strings.Join(alternatives, " OR ")+")")
	}
	combined := strings.Join(perTag, " AND ")
	return "(" + combined + ")", params
}
// BuildTraceTimeRangeSubqueryForTraceID returns subqueryTimeRangeForTraceID
// together with its two positional arguments, in order: cfg.MaxTraceDuration
// ($1) and traceID ($2).
func (b *Builder) BuildTraceTimeRangeSubqueryForTraceID(traceID pgtype.UUID) (string, []interface{}) {
	return subqueryTimeRangeForTraceID, []interface{}{b.cfg.MaxTraceDuration, traceID}
}
// BuildTraceIDSubquery builds the subquery that selects the trace IDs matching
// q, plus the time window (time_low, time_high) of each trace.
//
// The WHERE clause is the conjunction of, in order: tInfo.spanClauses, an
// EXISTS over events for tInfo.eventClauses, the operation filter, the
// general tag filter, the start-time bounds and the duration bounds. When no
// filter applies the clause is TRUE. A LIMIT is added when q.NumTraces is
// positive.
//
// The returned parameters start with a copy of tInfo.params, followed by the
// values bound here; the last one is always cfg.MaxTraceDuration. tInfo is
// not modified.
func (b *Builder) BuildTraceIDSubquery(q *spanstore.TraceQueryParameters, tInfo *tagsInfo) (string, []interface{}) {
	clauses := make([]string, 0, 15)
	// Copy tInfo.params: appending to it directly could write into its
	// backing array and corrupt the parameters of a later call.
	params := make([]interface{}, 0, len(tInfo.params)+8)
	params = append(params, tInfo.params...)

	clauses = append(clauses, tInfo.spanClauses...)

	if len(tInfo.eventClauses) > 0 {
		var subquery string
		subquery, params = b.buildEventSubquery(q, tInfo.eventClauses, params)
		clauses = append(clauses, fmt.Sprintf("EXISTS(%s)", subquery))
	}

	var operationSubquery string
	operationSubquery, params = b.buildOperationSubquery(q, tInfo, params)
	if len(operationSubquery) > 0 {
		qual := fmt.Sprintf(`
s.operation_id IN (
%s
)
`, operationSubquery)
		clauses = append(clauses, qual)
	}

	if len(tInfo.generalTags) > 0 {
		var tagClause string
		tagClause, params = b.buildTagClauses(q, tInfo, params)
		clauses = append(clauses, tagClause)
	}

	// TODO: check the inclusive semantics here.
	// IsZero is used rather than comparing against time.Time{} so that a
	// zero instant is treated as unset regardless of its location.
	if !q.StartTimeMin.IsZero() {
		params = append(params, q.StartTimeMin)
		clauses = append(clauses, fmt.Sprintf(`s.start_time >= $%d`, len(params)))
	}
	if !q.StartTimeMax.IsZero() {
		params = append(params, q.StartTimeMax)
		clauses = append(clauses, fmt.Sprintf(`s.start_time <= $%d`, len(params)))
	}

	// Durations that are a whole number of milliseconds are compared against
	// the duration_ms column; anything finer is compared against
	// end_time - start_time.
	if q.DurationMin != 0 {
		if q.DurationMin%time.Millisecond == 0 {
			params = append(params, q.DurationMin.Milliseconds())
			clauses = append(clauses, fmt.Sprintf(`duration_ms >= $%d`, len(params)))
		} else {
			params = append(params, q.DurationMin)
			clauses = append(clauses, fmt.Sprintf(`(end_time - start_time ) >= $%d`, len(params)))
		}
	}
	if q.DurationMax != 0 {
		if q.DurationMax%time.Millisecond == 0 {
			params = append(params, q.DurationMax.Milliseconds())
			clauses = append(clauses, fmt.Sprintf(`duration_ms <= $%d`, len(params)))
		} else {
			params = append(params, q.DurationMax)
			clauses = append(clauses, fmt.Sprintf(`(end_time - start_time ) <= $%d`, len(params)))
		}
	}

	clauseString := "TRUE"
	if len(clauses) > 0 {
		clauseString = strings.Join(clauses, " AND ")
	}

	params = append(params, b.cfg.MaxTraceDuration)
	// Note: the parameter number for b.cfg.MaxTraceDuration is used in two
	// places ($%[1]d in subqueryFormat) to both add and subtract from
	// start_time_max.
	query := fmt.Sprintf(subqueryFormat, len(params), clauseString)

	// Only a positive limit is emitted; a negative value would produce a
	// LIMIT clause that the database rejects.
	if q.NumTraces > 0 {
		query += fmt.Sprintf(" LIMIT %d", q.NumTraces)
	}
	return query, params
}