/
kafka-recorder.js
81 lines (71 loc) · 2.58 KB
/
kafka-recorder.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// Copyright 2020 The OpenZipkin Authors; licensed to You under the Apache License, Version 2.0.
const {
TraceId, option: {fromNullable}, Annotation, HttpHeaders
} = require('zipkin');
function bufferToAscii(maybeBuffer) { // TODO: backfill tests for this
return Buffer.isBuffer(maybeBuffer) ? maybeBuffer.asciiSlice(0) : maybeBuffer;
}
const recordConsumeStart = (tracer, name, remoteServiceName, {topic, partition, message}) => {
const traceId = message.headers[HttpHeaders.TraceId];
const spanId = message.headers[HttpHeaders.SpanId];
let id;
if (traceId && spanId) {
const parentId = message.headers[HttpHeaders.ParentSpanId];
const sampled = message.headers[HttpHeaders.Sampled];
const flags = message.headers[HttpHeaders.Flags];
id = tracer.createChildId(new TraceId({
traceId: bufferToAscii(traceId),
parentId: fromNullable(parentId).map(bufferToAscii),
spanId: bufferToAscii(spanId),
sampled: fromNullable(sampled).map(bufferToAscii),
debug: flags ? parseInt(flags) === 1 : false
}));
} else {
id = tracer.createRootId();
}
tracer.setId(id);
tracer.recordServiceName(tracer.localEndpoint.serviceName);
tracer.recordRpc(name);
tracer.recordBinary('kafka.topic', topic);
tracer.recordBinary('kafka.partition', partition);
if (typeof remoteServiceName !== 'undefined') {
tracer.recordAnnotation(new Annotation.ServerAddr({serviceName: remoteServiceName}));
}
tracer.recordAnnotation(new Annotation.ConsumerStart());
return id;
};
const recordConsumeStop = (tracer, id, error) => {
tracer.letId(id, () => {
if (typeof error !== 'undefined') {
tracer.recordBinary('error', error.toString());
}
tracer.recordAnnotation(new Annotation.ConsumerStop());
});
};
const recordProducerStart = (tracer, name, remoteServiceName, {topic}) => {
tracer.setId(tracer.createChildId());
const traceId = tracer.id;
tracer.recordServiceName(tracer.localEndpoint.serviceName);
tracer.recordRpc(name);
tracer.recordBinary('kafka.topic', topic);
if (typeof remoteServiceName !== 'undefined') {
tracer.recordAnnotation(new Annotation.ServerAddr({serviceName: remoteServiceName}));
}
tracer.recordAnnotation(new Annotation.ProducerStart());
return traceId;
};
const recordProducerStop = (tracer, id, error) => {
tracer.letId(id, () => {
if (error) {
tracer.recordBinary('error', error.toString());
}
tracer.recordAnnotation(new Annotation.ProducerStop());
});
};
module.exports = {
recordConsumeStart,
recordConsumeStop,
recordProducerStart,
recordProducerStop,
bufferToAscii
};