Skip to content

Commit e438513

Browse files
authored
[DEV-792] Support tracing in multi stream append (#430)
1 parent 6fa620a commit e438513

File tree

8 files changed

+377
-51
lines changed

8 files changed

+377
-51
lines changed

packages/opentelemetry/src/attributes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export const KurrentAttributes = {
1212
SERVER_PORT: `${server}.port`,
1313

1414
STREAM_APPEND: `${streams}.append`,
15+
STREAM_MULTI_APPEND: `${streams}.multi-append`,
1516
STREAM_SUBSCRIBE: `${streams}.subscribe`,
1617

1718
KURRENT_DB_STREAM: `${kurrentdb}.stream`,

packages/opentelemetry/src/instrumentation.ts

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
Span,
99
SpanKind,
1010
SpanStatusCode,
11+
TimeInput,
1112
trace,
1213
TraceFlags,
1314
Tracer,
@@ -24,6 +25,7 @@ import type {
2425
EventData,
2526
EventType,
2627
JSONEventType,
28+
MultiAppendResult,
2729
ResolvedEvent,
2830
SubscribeToAllOptions,
2931
SubscribeToPersistentSubscriptionToAllOptions,
@@ -38,6 +40,7 @@ import type { Subscription } from "@kurrent/kurrentdb-client/src/streams/utils/S
3840
import { INSTRUMENTATION_NAME, INSTRUMENTATION_VERSION } from "./version";
3941
import type {
4042
AppendToStreamParams,
43+
MultiStreamAppendParams,
4144
PersistentSubscribeParameters,
4245
SubscribeParameters,
4346
} from "./types";
@@ -67,6 +70,11 @@ export class Instrumentation extends InstrumentationBase {
6770
"appendToStream",
6871
this._patchAppendToStream()
6972
);
73+
this.wrap(
74+
moduleExports.KurrentDBClient.prototype,
75+
"multiStreamAppend",
76+
this._patchMultiStreamAppend()
77+
);
7078
this.wrap(
7179
moduleExports.KurrentDBClient.prototype,
7280
"subscribeToStream",
@@ -106,6 +114,10 @@ export class Instrumentation extends InstrumentationBase {
106114
this._diag.debug("un-patching");
107115

108116
this._unwrap(moduleExports.KurrentDBClient.prototype, "appendToStream");
117+
this._unwrap(
118+
moduleExports.KurrentDBClient.prototype,
119+
"multiStreamAppend"
120+
);
109121
this._unwrap(
110122
moduleExports.KurrentDBClient.prototype,
111123
"subscribeToStream"
@@ -196,6 +208,118 @@ export class Instrumentation extends InstrumentationBase {
196208
};
197209
}
198210

211+
private _patchMultiStreamAppend(): (
212+
original: Function,
213+
operation: keyof kurrentdb.KurrentDBClient
214+
) => (...args: MultiStreamAppendParams) => Promise<MultiAppendResult> {
215+
const instrumentation = this;
216+
const tracer = instrumentation.tracer;
217+
218+
return function multiStreamAppend(
219+
original: Function,
220+
operation: keyof kurrentdb.KurrentDBClient
221+
) {
222+
return async function (
223+
this: kurrentdb.KurrentDBClient,
224+
...args: MultiStreamAppendParams
225+
): Promise<MultiAppendResult> {
226+
const [requests] = [...args];
227+
228+
const uri = await this.resolveUri();
229+
const { hostname, port } = Instrumentation.getServerAddress(uri);
230+
231+
const requestStartTime: TimeInput = Date.now();
232+
233+
const span = tracer.startSpan(KurrentAttributes.STREAM_MULTI_APPEND, {
234+
kind: SpanKind.CLIENT,
235+
startTime: requestStartTime,
236+
attributes: {
237+
[KurrentAttributes.SERVER_ADDRESS]: hostname,
238+
[KurrentAttributes.SERVER_PORT]: port,
239+
[KurrentAttributes.DATABASE_SYSTEM]: INSTRUMENTATION_NAME,
240+
[KurrentAttributes.DATABASE_OPERATION]: operation,
241+
},
242+
});
243+
244+
requests.forEach((request) => {
245+
const traceId = span.spanContext().traceId;
246+
const spanId = span.spanContext().spanId;
247+
248+
request.events.forEach((event) => {
249+
const metadata = (event.metadata = event.metadata || {});
250+
if (isJSONEventData(event) && typeof metadata === "object") {
251+
event.metadata = {
252+
...metadata,
253+
[TRACE_ID]: traceId,
254+
[SPAN_ID]: spanId,
255+
};
256+
}
257+
});
258+
});
259+
260+
try {
261+
const result = await original.apply(this, [requests]);
262+
263+
const requestEndTime: TimeInput = Date.now();
264+
265+
if (!result.success) {
266+
const failures: kurrentdb.AppendStreamFailure[] = result.output;
267+
268+
span.setStatus({
269+
code: SpanStatusCode.ERROR,
270+
});
271+
272+
failures.forEach((failure) => {
273+
switch (failure.details.type) {
274+
case "wrong_expected_revision":
275+
span.addEvent("exception", {
276+
"exception.type": "wrong_expected_revision",
277+
"exception.revision":
278+
failure.details.revision.toLocaleString(),
279+
});
280+
break;
281+
282+
case "access_denied":
283+
span.addEvent("exception", {
284+
"exception.type": failure.details.type,
285+
"exception.message": failure.details.reason,
286+
});
287+
break;
288+
289+
case "stream_deleted":
290+
span.addEvent("exception", {
291+
"exception.type": failure.details.type,
292+
});
293+
break;
294+
295+
case "transaction_max_size_exceeded":
296+
span.addEvent("exception", {
297+
"exception.type": failure.details.type,
298+
"exception.max_size":
299+
failure.details.maxSize.toLocaleString(),
300+
});
301+
break;
302+
303+
case "unknown":
304+
span.addEvent("exception", {
305+
"exception.type": "unknown",
306+
});
307+
break;
308+
}
309+
});
310+
}
311+
312+
span.end(requestEndTime);
313+
314+
return result;
315+
} catch (error) {
316+
Instrumentation.handleError(error, span);
317+
throw error;
318+
}
319+
};
320+
};
321+
}
322+
199323
static applySubscriptionInstrumentation<KnownEventType>(
200324
spanName: string,
201325
subscription:

packages/opentelemetry/src/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,7 @@ export type PersistentSubscribeParameters =
2323
export type AppendToStreamParams = Parameters<
2424
kdb.KurrentDBClient["appendToStream"]
2525
>;
26+
27+
export type MultiStreamAppendParams = Parameters<
28+
kdb.KurrentDBClient["multiStreamAppend"]
29+
>;

packages/test/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"@opentelemetry/api": "^1.9.0",
3838
"@opentelemetry/exporter-trace-otlp-grpc": "^0.51.1",
3939
"@opentelemetry/instrumentation": "^0.56.0",
40+
"@opentelemetry/resources": "^2.0.1",
4041
"@opentelemetry/sdk-trace-node": "^1.30.0",
4142
"@opentelemetry/semantic-conventions": "^1.28.0",
4243
"@types/debug": "^4.1.12",

packages/test/src/opentelemetry/instrumentation.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
/** @jest-environment ./src/utils/enableVersionCheck.ts */
2+
13
import { createTestNode, Defer, delay, jsonTestEvents } from "@test-utils";
24
import {
35
NodeTracerProvider,

0 commit comments

Comments
 (0)