/
SpanSingle.js
117 lines (101 loc) · 2.37 KB
/
SpanSingle.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import {Single, IFutureSubscriber} from 'rsocket-flowable/build/Single';
import {Tracer, Span, SpanContext, FORMAT_TEXT_MAP} from 'opentracing';
/**
 * Wraps `single` in a new `Single` whose subscriptions are traced.
 *
 * Nothing happens until the returned `Single` is subscribed. On each
 * subscription a `SpanSingleSubscriber` is created around the downstream
 * subscriber (its constructor starts the span) and the original `single` is
 * subscribed with that wrapper, so every subscription gets its own span.
 *
 * @param single   The source `Single` to subscribe to.
 * @param tracer   Tracer used to start the span.
 * @param name     Operation name given to the span.
 * @param context  Optional parent span or span context.
 * @param metadata Optional carrier object handed to the tracer's `inject`.
 * @param tags     Zero or more tag objects; they are merged into the span's
 *                 tags by `SpanSingleSubscriber`.
 * @returns A `Single` that emits whatever `single` emits.
 */
export function createSpanSingle<T>(
  single: Single<T>,
  tracer: Tracer,
  name: string,
  context?: SpanContext | Span,
  metadata?: Object,
  // A rest parameter is always an array, so it is typed as one.
  ...tags: Array<Object>
): Single<T> {
  return new Single(subscriber => {
    const spanSubscriber = new SpanSingleSubscriber(
      subscriber,
      tracer,
      name,
      context,
      metadata,
      ...tags,
    );
    single.subscribe(spanSubscriber);
  });
}
/**
 * Subscriber wrapper that records the lifecycle of a `Single` subscription on
 * a tracing span.
 *
 * The span is started in the constructor. Each signal (`onSubscribe`,
 * `cancel`, `onError`, `onComplete`) is logged on the span and then forwarded
 * to the wrapped subscriber. The span is finished on the first of `cancel`,
 * `onError` or `onComplete`; later signals are still forwarded but no longer
 * touch the span.
 */
class SpanSingleSubscriber<T> implements IFutureSubscriber<T> {
  _span: Span;
  _subscriber: IFutureSubscriber<T>;
  _tracer: Tracer;
  _cancel: () => void;
  // True once the span has been finished; guards against double finish.
  _finished: boolean;

  /**
   * @param subscriber Downstream subscriber that receives the forwarded signals.
   * @param tracer     Tracer used to start the span and inject its context.
   * @param name       Operation name of the span.
   * @param context    Optional parent; set as `childOf` when truthy.
   * @param metadata   Optional carrier passed to `tracer.inject`.
   * @param tags       Zero or more tag objects, merged left to right (later
   *                   objects win on duplicate keys).
   */
  constructor(
    subscriber: IFutureSubscriber<T>,
    tracer: Tracer,
    name: string,
    context?: SpanContext | Span,
    metadata?: Object,
    ...tags: Array<Object>
  ) {
    this._subscriber = subscriber;
    this._tracer = tracer;
    this._finished = false;

    const options: {[key: string]: any} = {};
    if (context) {
      options.childOf = context;
    }
    // `tags` is a rest parameter and therefore always an array, so no
    // existence check is needed. Object.assign skips null/undefined entries
    // instead of throwing on them.
    options.tags = Object.assign({}, ...tags);

    // `references` are not supported at this time.

    // NOTE(review): this is a microsecond value, while the
    // opentracing-javascript docs describe `startTime` and log timestamps in
    // milliseconds. Left unchanged; confirm against the tracer in use.
    options.startTime = timeInMicros();

    this._span = this._tracer.startSpan(name, options);
    // When no carrier is supplied the context is injected into a throwaway
    // object, so `inject` is still called but its output is discarded.
    this._tracer.inject(
      this._span.context(),
      FORMAT_TEXT_MAP,
      metadata == null ? {} : metadata,
    );
  }

  /**
   * Finishes the span. Safe to call more than once: only the first call
   * reaches `span.finish()`.
   */
  cleanup() {
    if (this._finished) {
      return;
    }
    this._finished = true;
    this._span.finish();
  }

  /**
   * Logs a lifecycle event on the span as an `{event: name}` record, which is
   * the key/value shape `Span.log` takes. Skipped once the span is finished.
   */
  _logEvent(event: string) {
    if (this._finished) {
      return;
    }
    this._span.log({event}, timeInMicros());
  }

  /**
   * Stores the upstream cancel callback (may be undefined), logs the event and
   * hands the downstream subscriber a cancel handle that routes through
   * `cancel()` so cancellation is traced too.
   */
  onSubscribe(cancel?: () => void) {
    this._cancel = cancel;
    this._logEvent('onSubscribe');
    this._subscriber.onSubscribe(() => {
      this.cancel();
    });
  }

  /**
   * Logs the cancellation, invokes the upstream cancel callback if one was
   * provided, and finishes the span even if either step throws.
   */
  cancel() {
    try {
      this._logEvent('cancel');
      if (this._cancel) {
        this._cancel();
      }
    } finally {
      this.cleanup();
    }
  }

  /**
   * Logs the error signal, forwards `error` downstream, and finishes the span
   * even if the downstream handler throws.
   */
  onError(error: Error) {
    try {
      this._logEvent('onError');
      this._subscriber.onError(error);
    } finally {
      this.cleanup();
    }
  }

  /**
   * Logs the completion signal, forwards `value` downstream, and finishes the
   * span even if the downstream handler throws.
   */
  onComplete(value: T) {
    try {
      this._logEvent('onComplete');
      this._subscriber.onComplete(value);
    } finally {
      this.cleanup();
    }
  }
}
/**
 * Returns the current `Date.now()` reading scaled from milliseconds to
 * microseconds. The underlying clock has millisecond resolution, so the
 * result is always a multiple of 1000.
 */
function timeInMicros() {
  const MICROS_PER_MILLI = 1000;
  const nowMillis = Date.now();
  return nowMillis * MICROS_PER_MILLI;
}