Skip to content

Commit

Permalink
fix(instrumentation-grpc): reduce duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
pichlermarc committed May 23, 2023
1 parent 47f5518 commit 50a3513
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ export function patchedCallback(
return context.bind(context.active(), wrappedFn);
}

export function patchResponseMetadataEvent(
span: Span,
call: GrpcEmitter,
metadataCapture: metadataCaptureType
) {
call.on('metadata', responseMetadata => {
metadataCapture.client.captureResponseMetadata(
span,
responseMetadata
);
});
}

export function patchResponseStreamEvents(span: Span, call: GrpcEmitter) {
// Both error and status events can be emitted
// the first one emitted set spanEnded to true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {
UnaryRequestFunction,
ClientStreamFunction,
ServerStreamRequestFunction,
BidiStreamRequestFunction,
BidiStreamRequestFunction, GrpcEmitter,
} from './types';
import {
context,
Expand All @@ -55,10 +55,15 @@ import {
getMetadata,
setSpanContext,
patchedCallback,
patchResponseStreamEvents,
patchResponseStreamEvents, patchResponseMetadataEvent,
} from './clientUtils';
import { EventEmitter } from 'events';
import {_extractMethodAndService, _methodIsIgnored, metadataCapture, URI_REGEX} from '../utils';
import {
_extractMethodAndService,
_methodIsIgnored,
metadataCapture,
URI_REGEX,
} from '../utils';
import { AttributeValues } from '../enums/AttributeValues';
import { SemanticAttributes } from '@opentelemetry/semantic-conventions';

Expand Down Expand Up @@ -320,10 +325,10 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
options: grpcJs.CallOptions,
callback: grpcJs.requestCallback<any>
): grpcJs.ClientUnaryCall {
const name = `grpc.${method.replace('/', '')}`;
const { service, method: methodAttributeValue } =
_extractMethodAndService(method);
const { name, service, methodAttributeValue } =
instrumentation._splitMethodString(method);

// Do not trace if method is ignored
if (
_methodIsIgnored(
methodAttributeValue,
Expand All @@ -342,25 +347,18 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
);
}

const span = instrumentation.tracer
.startSpan(name, { kind: SpanKind.CLIENT })
.setAttributes({
[SemanticAttributes.RPC_SYSTEM]: 'grpc',
[SemanticAttributes.RPC_METHOD]: methodAttributeValue,
[SemanticAttributes.RPC_SERVICE]: service,
});

instrumentation.extractNetMetadata(this, span);

instrumentation._metadataCapture.client.captureRequestMetadata(
span,
const span = instrumentation.createClientSpan(
name,
methodAttributeValue,
service,
metadata
);
instrumentation.extractNetMetadata(this, span);

return context.with(trace.setSpan(context.active(), span), () => {
setSpanContext(metadata);

const originalRequestResult = originalUnaryRequest.call(
const call = originalUnaryRequest.call(
this,
method,
serialize,
Expand All @@ -370,13 +368,12 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
options,
patchedCallback(span, callback)
);
originalRequestResult.on('metadata', responseMetadata => {
instrumentation._metadataCapture.client.captureResponseMetadata(
span,
responseMetadata
);
});
return originalRequestResult;
patchResponseMetadataEvent(
span,
call,
instrumentation._metadataCapture
);
return call;
});
} as typeof grpcJs.Client.prototype.makeUnaryRequest;
};
Expand All @@ -400,10 +397,10 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
options: grpcJs.CallOptions,
callback: grpcJs.requestCallback<any>
): grpcJs.ClientWritableStream<any> {
const name = `grpc.${method.replace('/', '')}`;
const { service, method: methodAttributeValue } =
_extractMethodAndService(method);
const { name, service, methodAttributeValue } =
instrumentation._splitMethodString(method);

// Do not trace if method is ignored
if (
_methodIsIgnored(
methodAttributeValue,
Expand All @@ -421,25 +418,18 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
);
}

const span = instrumentation.tracer
.startSpan(name, { kind: SpanKind.CLIENT })
.setAttributes({
[SemanticAttributes.RPC_SYSTEM]: 'grpc',
[SemanticAttributes.RPC_METHOD]: methodAttributeValue,
[SemanticAttributes.RPC_SERVICE]: service,
});

instrumentation.extractNetMetadata(this, span);

instrumentation._metadataCapture.client.captureRequestMetadata(
span,
const span = instrumentation.createClientSpan(
name,
methodAttributeValue,
service,
metadata
);
instrumentation.extractNetMetadata(this, span);

return context.with(trace.setSpan(context.active(), span), () => {
setSpanContext(metadata);

const originalRequestResult = originalClientStreamFunction.call(
const call = originalClientStreamFunction.call(
this,
method,
serialize,
Expand All @@ -448,13 +438,12 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
options,
patchedCallback(span, callback)
);
originalRequestResult.on('metadata', responseMetadata => {
instrumentation._metadataCapture.client.captureResponseMetadata(
span,
responseMetadata
);
});
return originalRequestResult;
patchResponseMetadataEvent(
span,
call,
instrumentation._metadataCapture
);
return call;
});
} as typeof grpcJs.Client.prototype.makeClientStreamRequest;
};
Expand All @@ -480,10 +469,10 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
metadata: grpcJs.Metadata,
options: grpcJs.CallOptions
): grpcJs.ClientReadableStream<any> {
const name = `grpc.${method.replace('/', '')}`;
const { service, method: methodAttributeValue } =
_extractMethodAndService(method);
const { name, service, methodAttributeValue } =
instrumentation._splitMethodString(method);

// Do not trace if method is ignored
if (
_methodIsIgnored(
methodAttributeValue,
Expand All @@ -500,26 +489,17 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
options
);
}

const span = instrumentation.tracer
.startSpan(name, { kind: SpanKind.CLIENT })
.setAttributes({
[SemanticAttributes.RPC_SYSTEM]: 'grpc',
[SemanticAttributes.RPC_METHOD]: methodAttributeValue,
[SemanticAttributes.RPC_SERVICE]: service,
});

instrumentation.extractNetMetadata(this, span);

instrumentation._metadataCapture.client.captureRequestMetadata(
span,
const span = instrumentation.createClientSpan(
name,
methodAttributeValue,
service,
metadata
);

instrumentation.extractNetMetadata(this, span);
return context.with(trace.setSpan(context.active(), span), () => {
setSpanContext(metadata);

const originalRequestResult = originalClientServerStreamFunction.call(
const call = originalClientServerStreamFunction.call(
this,
method,
serialize,
Expand All @@ -528,16 +508,15 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
metadata,
options
);
originalRequestResult.on('metadata', responseMetadata => {
instrumentation._metadataCapture.client.captureResponseMetadata(
span,
responseMetadata
);
});

patchResponseStreamEvents(span, originalRequestResult);
patchResponseMetadataEvent(
span,
call,
instrumentation._metadataCapture
);
patchResponseStreamEvents(span, call);

return originalRequestResult;
return call;
});
} as typeof grpcJs.Client.prototype.makeServerStreamRequest;
};
Expand All @@ -560,10 +539,10 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
metadata: grpcJs.Metadata,
options: grpcJs.CallOptions
): grpcJs.ClientDuplexStream<any, any> {
const name = `grpc.${method.replace('/', '')}`;
const { service, method: methodAttributeValue } =
_extractMethodAndService(method);
const { name, service, methodAttributeValue } =
instrumentation._splitMethodString(method);

// Do not trace if method is ignored
if (
_methodIsIgnored(
methodAttributeValue,
Expand All @@ -580,42 +559,34 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
);
}

const span = instrumentation.tracer
.startSpan(name, { kind: SpanKind.CLIENT })
.setAttributes({
[SemanticAttributes.RPC_SYSTEM]: 'grpc',
[SemanticAttributes.RPC_METHOD]: methodAttributeValue,
[SemanticAttributes.RPC_SERVICE]: service,
});

instrumentation.extractNetMetadata(this, span);

instrumentation._metadataCapture.client.captureRequestMetadata(
span,
const span = instrumentation.createClientSpan(
name,
methodAttributeValue,
service,
metadata
);
instrumentation.extractNetMetadata(this, span);

return context.with(trace.setSpan(context.active(), span), () => {
setSpanContext(metadata);

const originalRequestResult = originalBidiStreamFunction.call(
const call = originalBidiStreamFunction.call(
this,
method,
serialize,
deserialize,
metadata,
options
);
originalRequestResult.on('metadata', responseMetadata => {
instrumentation._metadataCapture.client.captureResponseMetadata(
span,
responseMetadata
);
});

patchResponseStreamEvents(span, originalRequestResult);
patchResponseMetadataEvent(
span,
call,
instrumentation._metadataCapture
);
patchResponseStreamEvents(span, call);

return originalRequestResult;
return call;
});
} as typeof grpcJs.Client.prototype.makeBidiStreamRequest;
};
Expand Down Expand Up @@ -721,6 +692,31 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
};
}

private _splitMethodString(method: string) {
const name = `grpc.${method.replace('/', '')}`;
const { service, method: methodAttributeValue } =
_extractMethodAndService(method);
return { name, service, methodAttributeValue };
}

private createClientSpan(
name: string,
methodAttributeValue: string,
service: string,
metadata: grpcJs.Metadata
) {
const span = this.tracer
.startSpan(name, { kind: SpanKind.CLIENT })
.setAttributes({
[SemanticAttributes.RPC_SYSTEM]: 'grpc',
[SemanticAttributes.RPC_METHOD]: methodAttributeValue,
[SemanticAttributes.RPC_SERVICE]: service,
});

this._metadataCapture.client.captureRequestMetadata(span, metadata);
return span;
}

private extractNetMetadata(client: grpcJs.Client, span: Span) {
// set net.peer.* from target (e.g., "dns:otel-productcatalogservice:8080") as a hint to APMs
const parsedUri = URI_REGEX.exec(client.getChannel().getTarget());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ describe('#grpc-protobuf', () => {
});
});
});
describe('should capture metatdata when set up in config', () => {
describe('should capture metadata when set up in config', () => {
beforeEach(async () => {
propagation.setGlobalPropagator(new W3CTraceContextPropagator());
instrumentation.setTracerProvider(provider);
Expand All @@ -773,7 +773,7 @@ describe('#grpc-protobuf', () => {
server = await loadAndStartServer();
});

it('test', async () => {
it('should capture client metadata', async () => {
await client.unaryMethod(
{ num: NO_ERROR },
{
Expand Down

0 comments on commit 50a3513

Please sign in to comment.