Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

Commit

Permalink
- added @flow and flow types to tchannel_bridge.js
Browse files Browse the repository at this point in the history
- Pass through per process context
  • Loading branch information
Onwukike Ibe committed Dec 8, 2016
1 parent 778fc9b commit 6c7741c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 22 deletions.
3 changes: 2 additions & 1 deletion crossdock/src/tchannel_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ export default class TChannelServer {
});
}

handleTChannelRequest(context: any, req: any, head: any, body: any, callback: Function) {
handleTChannelRequest(perProcessOptions: any, req: any, head: any, body: any, callback: Function) {
let isStartRequest: boolean = false;
let traceRequest = body.request;
let context = req.context;
Helpers.log('TChannel', traceRequest.serverRole, 'received joinTrace request', Helpers.json2str(traceRequest));

let promise = this._helpers.handleRequest(
Expand Down
35 changes: 18 additions & 17 deletions src/tchannel_bridge.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// @flow
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
Expand Down Expand Up @@ -34,7 +35,7 @@ export default class TChannelBridge {
_codec: TextMapCodec;
_contextFactory: Function;

constructor(tracer: Tracer, contextFactory: Function) {
constructor(tracer: Tracer, contextFactory: ?Function) {
this._tracer = tracer;
this._codec = new TextMapCodec({
urlEncoding: false,
Expand All @@ -44,7 +45,7 @@ export default class TChannelBridge {
this._contextFactory = contextFactory || function() { return new DefaultContext(); };
}

_tchannelCallbackWrapper(span, callback, err, res) {
_tchannelCallbackWrapper(span: Span, callback: Function, err: any, res: any) {
if (err) {
span.setTag(opentracing.Tags.ERROR, true);
span.log('error_msg', err);
Expand All @@ -63,15 +64,15 @@ export default class TChannelBridge {
* @returns {Function} - a function that wrapps the handler in order to automatically populate
* a the handler's context with a span.
**/
tracedHandler(handlerFunc: any, options: startSpanArgs = {}): Function {
tracedHandler(handlerFunc: any, options: any = {}): Function {
return (perProcessOptions, request, headers, body, callback) => {
let context: Context = this._contextFactory();
let operationName = options.operationName || request.arg1;
let span = this._extractSpan(operationName, headers);
let operationName: string = options.operationName || request.arg1;
let span: Span = this._extractSpan(operationName, headers);

// set tags
span.setTag(opentracing.Tags.PEER_SERVICE, request.callerName);
let hostPort = request.remoteAddr.split(':');
let hostPort: Array<string> = request.remoteAddr.split(':');
if (hostPort.length == 2) {
span.setTag(opentracing.Tags.PEER_HOST_IPV4, Utils.ipToInt(hostPort[0]));
span.setTag(opentracing.Tags.PEER_PORT, parseInt(hostPort[1]));
Expand All @@ -83,24 +84,24 @@ export default class TChannelBridge {
context.setSpan(span);

// remove headers prefixed with $tracing$
let headerKeys = Object.keys(headers);
let headerKeys: Array<string> = Object.keys(headers);
for (let i = 0; i < headerKeys.length; i++) {
let key = headerKeys[i];
if (headers.hasOwnProperty(key) && Utils.startsWith(key, TCHANNEL_TRACING_PREFIX)) {
delete headers[key];
}
}

let wrappingCallback = this._tchannelCallbackWrapper.bind(null, span, callback);
let wrappingCallback: Function = this._tchannelCallbackWrapper.bind(null, span, callback);
request.context = context;
handlerFunc(context, request, headers, body, wrappingCallback);
handlerFunc(perProcessOptions, request, headers, body, wrappingCallback);
};
}

_wrapTChannelSend(wrappedSend, channel, req, endpoint, headers, body, callback) {
_wrapTChannelSend(wrappedSend: Function, channel: any, req: any, endpoint: string, headers: any, body: any, callback: Function) {
headers = headers || {};
let context: Context = req.context || this._contextFactory();
let childOf = context.getSpan();
let childOf: Span = context.getSpan();
let clientSpan = this._tracer.startSpan(endpoint, {
childOf: childOf // ok if null, will start a new trace
});
Expand All @@ -109,18 +110,18 @@ export default class TChannelBridge {
this._codec.inject(clientSpan.context(), headers);

// wrap callback so that span can be finished as soon as the response is received
let wrappingCallback = this._tchannelCallbackWrapper.bind(null, clientSpan, callback);
let wrappingCallback: Function = this._tchannelCallbackWrapper.bind(null, clientSpan, callback);

return wrappedSend.call(channel, req, endpoint, headers, body, wrappingCallback);
}

_wrapTChannelRequest(channel, wrappedRequestMethod, requestOptions) {
_wrapTChannelRequest(channel: any, wrappedRequestMethod: any, requestOptions: any) {
// We set the parent to a span with trace_id zero, so that tchannel's
// outgoing tracing frame also has a trace id of zero.
// This forces other tchannel implementations to rely on the headers for the trace context.
requestOptions.parent = { span: TChannelBridge.makeFakeTChannelParentSpan() };

let tchannelRequest = wrappedRequestMethod.call(channel, requestOptions);
let tchannelRequest: any = wrappedRequestMethod.call(channel, requestOptions);
tchannelRequest.context = requestOptions.context;
return tchannelRequest;
}
Expand Down Expand Up @@ -154,13 +155,13 @@ export default class TChannelBridge {

_extractSpan(operationName: string, headers: any): Span {
let traceContext: ?SpanContext = this._codec.extract(headers);
let tags = {};
let tags: any = {};
tags[opentracing.Tags.SPAN_KIND] = opentracing.Tags.SPAN_KIND_RPC_SERVER;
let options = {
let options: any = {
childOf: traceContext,
tags: tags
}
let span = this._tracer.startSpan(operationName, options);
let span: Span = this._tracer.startSpan(operationName, options);
return span;
}
}
7 changes: 3 additions & 4 deletions test/tchannel_bridge.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,16 @@ describe ('test tchannel span bridge', () => {
entryPoint: path.join(__dirname, 'thrift', 'echo.thrift') // ignored in json case
});

// We pass 'null' as the context because the handler will provide a context
// that gets created on every request.
encodedChannel.register(server, 'Echo::echo', null, bridge.tracedHandler(handleServerReq));
let options: any = {};
encodedChannel.register(server, 'Echo::echo', options, bridge.tracedHandler(handleServerReq));
function handleServerReq(context, req, head, body, callback) {
// headers should not contain $tracing$ prefixed keys, which should be the
// only headers used for this test.
assert.equal(Object.keys(head).length, 0);

// assert that the serverSpan is a child of the original span, if context exists
// assert that the serverSpan is NOT a child of the original span, if contexts is null
assert.equal(originalSpan.context().traceIdStr === context.getSpan().context().traceIdStr, !!o.context);
assert.equal(originalSpan.context().traceIdStr === req.context.getSpan().context().traceIdStr, !!o.context);
callback(null, { ok: true, body: { value: 'some-string' }});
}

Expand Down

0 comments on commit 6c7741c

Please sign in to comment.