Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(instrumentation-grpc): instrument @grpc/grpc-js Client methods #3804

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ All notable changes to experimental packages in this project will be documented

* fix(sdk-node): use resource interface instead of concrete class [#3803](https://github.com/open-telemetry/opentelemetry-js/pull/3803) @blumamir
* fix(sdk-logs): remove includeTraceContext configuration and use LogRecord context when available [#3817](https://github.com/open-telemetry/opentelemetry-js/pull/3817) @hectorhdzg
* fix(instrumentation-grpc): instrument @grpc/grpc-js Client methods [#3804](https://github.com/open-telemetry/opentelemetry-js/pull/3804) @pichlermarc

## 0.39.1

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
build
test/proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"prepublishOnly": "npm run compile",
"compile": "tsc --build",
"clean": "tsc --build --clean",
"test": "nyc ts-mocha -p tsconfig.json test/**/*.test.ts",
"test": "npm run protos:generate && nyc ts-mocha -p tsconfig.json test/**/*.test.ts",
"tdd": "npm run test -- --watch-extensions ts --watch",
"lint": "eslint . --ext .ts",
"lint:fix": "eslint . --ext .ts --fix",
Expand All @@ -18,7 +18,8 @@
"watch": "tsc --build --watch",
"precompile": "cross-var lerna run version --scope $npm_package_name --include-dependencies",
"prewatch": "node ../../../scripts/version-update.js",
"peer-api-check": "node ../../../scripts/peer-api-check.js"
"peer-api-check": "node ../../../scripts/peer-api-check.js",
"protos:generate": "cd test/fixtures && buf generate"
},
"keywords": [
"opentelemetry",
Expand All @@ -45,6 +46,10 @@
"access": "public"
},
"devDependencies": {
"@bufbuild/buf": "1.21.0-1",
"@protobuf-ts/grpc-transport": "2.9.0",
Flarna marked this conversation as resolved.
Show resolved Hide resolved
"@protobuf-ts/runtime-rpc": "2.9.0",
"@protobuf-ts/runtime": "2.9.0",
"@grpc/grpc-js": "^1.7.1",
"@grpc/proto-loader": "^0.7.3",
"@opentelemetry/api": "1.4.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,19 @@
*/

import { GrpcJsInstrumentation } from './';
import type { GrpcClientFunc, SendUnaryDataCallback } from './types';
import {
Span,
SpanStatusCode,
SpanStatus,
propagation,
context,
} from '@opentelemetry/api';
import type {
GrpcClientFunc,
GrpcEmitter,
SendUnaryDataCallback,
} from './types';
import { context, propagation, Span, SpanStatus } from '@opentelemetry/api';
import type * as grpcJs from '@grpc/grpc-js';
import {
_grpcStatusCodeToSpanStatus,
_grpcStatusCodeToOpenTelemetryStatusCode,
_grpcStatusCodeToSpanStatus,
_methodIsIgnored,
} from '../utils';
import { CALL_SPAN_ENDED } from './serverUtils';
import { EventEmitter } from 'events';
import { errorMonitor, EventEmitter } from 'events';
import { AttributeNames } from '../enums/AttributeNames';
import { SemanticAttributes } from '@opentelemetry/semantic-conventions';
import { metadataCaptureType } from '../internal-types';
Expand Down Expand Up @@ -66,6 +63,91 @@ export function getMethodsToWrap(
return methodList;
}

/**
* Patches a callback so that the current span for this trace is also ended
* when the callback is invoked.
*/
export function patchedCallback(
span: Span,
callback: SendUnaryDataCallback<ResponseType>
) {
const wrappedFn: SendUnaryDataCallback<ResponseType> = (
err: grpcJs.ServiceError | null,
res?: ResponseType
) => {
if (err) {
Flarna marked this conversation as resolved.
Show resolved Hide resolved
if (err.code) {
span.setStatus(_grpcStatusCodeToSpanStatus(err.code));
span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, err.code);
}
span.setAttributes({
[AttributeNames.GRPC_ERROR_NAME]: err.name,
[AttributeNames.GRPC_ERROR_MESSAGE]: err.message,
});
} else {
span.setAttribute(
SemanticAttributes.RPC_GRPC_STATUS_CODE,
GRPC_STATUS_CODE_OK
);
}

span.end();
callback(err, res);
Flarna marked this conversation as resolved.
Show resolved Hide resolved
};
return context.bind(context.active(), wrappedFn);
}

export function patchResponseMetadataEvent(
span: Span,
call: GrpcEmitter,
metadataCapture: metadataCaptureType
) {
call.on('metadata', (responseMetadata: any) => {
metadataCapture.client.captureResponseMetadata(span, responseMetadata);
});
}

export function patchResponseStreamEvents(span: Span, call: GrpcEmitter) {
// Both error and status events can be emitted
// the first one emitted set spanEnded to true
let spanEnded = false;
const endSpan = () => {
if (!spanEnded) {
span.end();
spanEnded = true;
}
};
context.bind(context.active(), call);
call.on(errorMonitor, (err: grpcJs.ServiceError) => {
if (spanEnded) {
return;
}

span.setStatus({
code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code),
message: err.message,
});
span.setAttributes({
[AttributeNames.GRPC_ERROR_NAME]: err.name,
[AttributeNames.GRPC_ERROR_MESSAGE]: err.message,
[SemanticAttributes.RPC_GRPC_STATUS_CODE]: err.code,
});

endSpan();
});

call.on('status', (status: SpanStatus) => {
if (spanEnded) {
return;
}

span.setStatus(_grpcStatusCodeToSpanStatus(status.code));
span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, status.code);

endSpan();
});
}

/**
* Execute grpc client call. Apply completitionspan properties and end the
* span on callback or receiving an emitted event.
Expand All @@ -77,41 +159,6 @@ export function makeGrpcClientRemoteCall(
metadata: grpcJs.Metadata,
self: grpcJs.Client
): (span: Span) => EventEmitter {
/**
* Patches a callback so that the current span for this trace is also ended
* when the callback is invoked.
*/
function patchedCallback(
span: Span,
callback: SendUnaryDataCallback<ResponseType>
) {
const wrappedFn: SendUnaryDataCallback<ResponseType> = (
err: grpcJs.ServiceError | null,
res?: ResponseType
) => {
if (err) {
if (err.code) {
span.setStatus(_grpcStatusCodeToSpanStatus(err.code));
span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, err.code);
}
span.setAttributes({
[AttributeNames.GRPC_ERROR_NAME]: err.name,
[AttributeNames.GRPC_ERROR_MESSAGE]: err.message,
});
} else {
span.setStatus({ code: SpanStatusCode.UNSET });
span.setAttribute(
SemanticAttributes.RPC_GRPC_STATUS_CODE,
GRPC_STATUS_CODE_OK
);
}

span.end();
callback(err, res);
};
return context.bind(context.active(), wrappedFn);
}

return (span: Span) => {
// if unary or clientStream
if (!original.responseStream) {
Expand All @@ -135,90 +182,68 @@ export function makeGrpcClientRemoteCall(

// if server stream or bidi
if (original.responseStream) {
// Both error and status events can be emitted
// the first one emitted set spanEnded to true
let spanEnded = false;
const endSpan = () => {
if (!spanEnded) {
span.end();
spanEnded = true;
}
};
context.bind(context.active(), call);
call.on('error', (err: grpcJs.ServiceError) => {
if (call[CALL_SPAN_ENDED]) {
return;
}
call[CALL_SPAN_ENDED] = true;

span.setStatus({
code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code),
message: err.message,
});
span.setAttributes({
[AttributeNames.GRPC_ERROR_NAME]: err.name,
[AttributeNames.GRPC_ERROR_MESSAGE]: err.message,
[SemanticAttributes.RPC_GRPC_STATUS_CODE]: err.code,
});

endSpan();
});

call.on('status', (status: SpanStatus) => {
if (call[CALL_SPAN_ENDED]) {
return;
}
call[CALL_SPAN_ENDED] = true;

span.setStatus(_grpcStatusCodeToSpanStatus(status.code));
span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, status.code);

endSpan();
});
patchResponseStreamEvents(span, call);
}
return call;
};
}

/**
* Returns the metadata argument from user provided arguments (`args`)
*/
export function getMetadata(
this: GrpcJsInstrumentation,
grpcClient: typeof grpcJs,
original: GrpcClientFunc,
export function getMetadataIndex(
args: Array<unknown | grpcJs.Metadata>
): grpcJs.Metadata {
let metadata: grpcJs.Metadata;

): number {
// This finds an instance of Metadata among the arguments.
// A possible issue that could occur is if the 'options' parameter from
// the user contains an '_internal_repr' as well as a 'getMap' function,
// but this is an extremely rare case.
let metadataIndex = args.findIndex((arg: unknown | grpcJs.Metadata) => {
return args.findIndex((arg: unknown | grpcJs.Metadata) => {
return (
arg &&
typeof arg === 'object' &&
(arg as grpcJs.Metadata)['internalRepr'] && // changed from _internal_repr in grpc --> @grpc/grpc-js https://github.com/grpc/grpc-node/blob/95289edcaf36979cccf12797cc27335da8d01f03/packages/grpc-js/src/metadata.ts#L88
typeof (arg as grpcJs.Metadata).getMap === 'function'
);
});
}

/**
* Returns the metadata argument from user provided arguments (`args`)
* If no metadata is provided in `args`: adds empty metadata to `args` and returns that empty metadata
*/
export function extractMetadataOrSplice(
grpcLib: typeof grpcJs,
args: Array<unknown | grpcJs.Metadata>,
spliceIndex: number
) {
let metadata: grpcJs.Metadata;
const metadataIndex = getMetadataIndex(args);
if (metadataIndex === -1) {
metadata = new grpcClient.Metadata();
if (!original.requestStream) {
// unary or server stream
metadataIndex = 1;
} else {
// client stream or bidi
metadataIndex = 0;
}
args.splice(metadataIndex, 0, metadata);
// Create metadata if it does not exist
metadata = new grpcLib.Metadata();
args.splice(spliceIndex, 0, metadata);
} else {
metadata = args[metadataIndex] as grpcJs.Metadata;
}
return metadata;
}

/**
* Returns the metadata argument from user provided arguments (`args`)
* Adds empty metadata to arguments if the default is used.
*/
export function extractMetadataOrSpliceDefault(
grpcClient: typeof grpcJs,
original: GrpcClientFunc,
args: Array<unknown | grpcJs.Metadata>
): grpcJs.Metadata {
if (!original.requestStream) {
// unary or server stream
return extractMetadataOrSplice(grpcClient, args, 1);
pichlermarc marked this conversation as resolved.
Show resolved Hide resolved
} else {
// client stream or bidi
return extractMetadataOrSplice(grpcClient, args, 0);
}
}

/**
* Inject opentelemetry trace context into `metadata` for use by another
* grpc receiver
Expand Down
Loading