Skip to content
This repository has been archived by the owner on Jul 10, 2023. It is now read-only.

Commit

Permalink
Change ES integration for support '@elastic/elasticsearch >= 7.0.0' (#…
Browse files Browse the repository at this point in the history
…325)

* Change ES integration for support '@elastic/elasticsearch >= 7.0.0'

* Add AlreadyTracedHeader check for http span creation && add flag to option for twice span creation (client < 7.0.0)
  • Loading branch information
bcaglayan committed Jan 29, 2022
1 parent e6089d6 commit 4adde72
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 124 deletions.
104 changes: 17 additions & 87 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"@babel/polyfill": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"@babel/runtime-corejs2": "^7.0.0",
"@elastic/elasticsearch": "^6.2.4",
"@elastic/elasticsearch": "^7.0.0",
"@google-cloud/pubsub": "^2.18.5",
"@google-cloud/bigquery": "^5.10.0",
"@hapi/hapi": "^20.1.5",
Expand Down
2 changes: 2 additions & 0 deletions src/Constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ export const TriggerHeaderTags = {
RESOURCE_NAME: 'x-thundra-resource-name',
};

export const AlreadyTracedHeader = 'x-thundra-already-traced';

export const SpanTypes = {
REDIS: 'Redis',
ELASTIC: 'Elastic',
Expand Down
60 changes: 40 additions & 20 deletions src/integrations/ESIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
ESTags,
ClassNames,
INTEGRATIONS,
AlreadyTracedHeader,
} from '../Constants';
import ThundraLogger from '../ThundraLogger';
import ThundraSpan from '../opentracing/Span';
Expand Down Expand Up @@ -52,38 +53,48 @@ class ESIntegration implements Integration {
*/
wrap(lib: any, config: any) {
ThundraLogger.debug('<ESIntegration> Wrap');

function wrapRequest(request: any) {

return function requestWithTrace(params: any, options: any, cb: any) {
if (!params || (params.method === 'GET' && params.path === '/')) {
/**
* avoid span creation for client version >= 7.0.0 health check request
*/
return request.apply(this, arguments);
}

let span: ThundraSpan;

try {
ThundraLogger.debug('<ESIntegration> Tracing Elasticsearch request:', params);

const { tracer } = ExecutionContextManager.get();

if (!tracer) {
ThundraLogger.debug('<ESIntegration> Skipped tracing request as no tracer is available');
return request.call(this, params, options, cb);
}

const originalCallback = request.length === 2 || typeof options === 'function' ?
options : cb;

if (typeof originalCallback !== 'function') {
if (!options) {
options = {};
}

if (options[AlreadyTracedHeader]) {
/**
* avoid twice span creation for client version < 7.0.0
*/
return request.apply(this, arguments);
}

options[AlreadyTracedHeader] = true;

const lastIndex = arguments.length - 1;
cb = arguments[lastIndex];
const currentInstace = this;
const parentSpan = tracer.getActiveSpan();

const normalizedPath = Utils.getNormalizedPath(params.path, config.esPathDepth);
if (this.headers && !this.headers[AlreadyTracedHeader]) {
this.headers[AlreadyTracedHeader] = true;
}

ThundraLogger.debug(`<ESIntegration> Starting Elasticsearch span with name ${normalizedPath}`);

span = tracer._startSpan(normalizedPath, {
childOf: parentSpan,
domainName: DomainNames.DB,
Expand Down Expand Up @@ -121,9 +132,7 @@ class ESIntegration implements Integration {
}

span._initialized();

const wrappedCallback = (err: any, res: any) => {

if (err) {
span.setErrorTag(err);
}
Expand All @@ -139,14 +148,29 @@ class ESIntegration implements Integration {

ThundraLogger.debug(
`<ESIntegration> Closing Elasticsearch span with name ${span.getOperationName()}`);

span.closeWithCallback(currentInstace, originalCallback, [err, res]);
span.closeWithCallback(currentInstace, cb, [err, res]);
};

return request.call(this, params, options, wrappedCallback);
if (typeof cb === 'function') {
return request.call(this, params, options, wrappedCallback);
} else {
const result = request.apply(this, arguments);
if (result && typeof result.then === 'function') {
result.then((res: any) => {
wrappedCallback(null, res);
}).catch((err: any) => {
wrappedCallback(err, null);
});
} else {
if (span) {
span.close();
}
}

return result;
}
} catch (error) {
ThundraLogger.error('<ESIntegration> Error occurred while tracing Elasticsearch request:', error);

if (span) {
ThundraLogger.debug(
`<ESIntegration> Because of error, closing Elasticsearch span with name
Expand All @@ -165,7 +189,6 @@ class ESIntegration implements Integration {

if (has(lib, 'Transport.prototype.request')) {
ThundraLogger.debug('<ESIntegration> Wrapping "elasticsearch.request"');

shimmer.wrap(lib.Transport.prototype, 'request', wrapRequest);
}
}
Expand All @@ -176,9 +199,7 @@ class ESIntegration implements Integration {
*/
doUnwrap(lib: any) {
ThundraLogger.debug('<ESIntegration> Do unwrap');

ThundraLogger.debug('<ESIntegration> Unwrapping "elasticsearch.request"');

shimmer.unwrap(lib.Transport.prototype, 'request');
}

Expand All @@ -187,7 +208,6 @@ class ESIntegration implements Integration {
*/
unwrap() {
ThundraLogger.debug('<ESIntegration> Unwrap');

if (this.instrumentContext.uninstrument) {
this.instrumentContext.uninstrument();
}
Expand Down
15 changes: 12 additions & 3 deletions src/utils/HTTPUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
ClassNames,
TraceHeaderTags,
TriggerHeaderTags,
AlreadyTracedHeader,
} from '../Constants';

/**
Expand Down Expand Up @@ -87,11 +88,19 @@ class HTTPUtils {
* {@code false} otherwise
*/
static wasAlreadyTraced(headers: any): boolean {
if (headers && headers[TraceHeaderTags.TRACE_ID]) {
return true;
if (!headers) {
return false;
}

return false;
let result = false;
if (headers[TraceHeaderTags.TRACE_ID]) {
result = true;
} else if (headers[AlreadyTracedHeader]) {
delete headers[AlreadyTracedHeader];
result = true;
}

return result;
}

/**
Expand Down
20 changes: 10 additions & 10 deletions test/integrations/es.integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ describe('ESIntegration integration', () => {
node: 'http://localhost:9200'
});

return ES.query(client).then((data) => {
return ES.newQuery(client).then((data) => {

const span = tracer.getRecorder().spanList[0];

expect(span.operationName).toBe('/twitter/tweets');
expect(span.operationName).toBe('/twitter/_search');
expect(span.className).toBe('ELASTICSEARCH');
expect(span.domainName).toBe('DB');

Expand All @@ -44,8 +44,8 @@ describe('ESIntegration integration', () => {

expect(span.tags['topology.vertex']).toEqual(true);

expect(span.tags['elasticsearch.uri']).toEqual('/twitter/tweets/_search');
expect(span.tags['elasticsearch.normalized_uri']).toEqual('/twitter/tweets');
expect(span.tags['elasticsearch.uri']).toEqual('/twitter/_search');
expect(span.tags['elasticsearch.normalized_uri']).toEqual('/twitter/_search');
expect(span.tags['elasticsearch.method']).toEqual('POST');
expect(span.tags['elasticsearch.params']).toEqual('{}');
expect(span.tags['elasticsearch.body']).toEqual('{"query":{"match":{"body":"elasticsearch"}}}');
Expand All @@ -65,7 +65,7 @@ describe('ESIntegration integration', () => {
nodes: ['http://localhost:9200', 'http://test.elastic.io:9200', 'http://test.elastic.io:9201']
});

return ES.queryWithMultipleHost(client).then((data) => {
return ES.newQueryWithMultipleHost(client).then((data) => {

const span = tracer.getRecorder().spanList[0];

Expand All @@ -80,7 +80,7 @@ describe('ESIntegration integration', () => {

expect(span.tags['topology.vertex']).toEqual(true);

expect(span.tags['elasticsearch.uri']).toEqual('/twitter/tweets/_search');
expect(span.tags['elasticsearch.uri']).toEqual('/twitter/_search');
expect(span.tags['elasticsearch.normalized_uri']).toEqual('/twitter');
expect(span.tags['elasticsearch.method']).toEqual('POST');
expect(span.tags['elasticsearch.params']).toEqual('{}');
Expand All @@ -100,23 +100,23 @@ describe('ESIntegration integration', () => {
node: 'http://localhost:9200'
});

return ES.query(client).then((data) => {
return ES.newQuery(client).then((data) => {

const span = tracer.getRecorder().spanList[0];

expect(span.tags['elasticsearch.params']).not.toBeTruthy();
expect(span.tags['elasticsearch.body']).not.toBeTruthy();

expect(span.operationName).toBe('/twitter/tweets');
expect(span.operationName).toBe('/twitter/_search');
expect(span.className).toBe('ELASTICSEARCH');
expect(span.domainName).toBe('DB');
expect(span.tags['operation.type']).toBe('POST');
expect(span.tags['db.host']).toBe('localhost');
expect(span.tags['db.port']).toBe(9200);
expect(span.tags['db.type']).toBe('elasticsearch');
expect(span.tags['topology.vertex']).toEqual(true);
expect(span.tags['elasticsearch.uri']).toEqual('/twitter/tweets/_search');
expect(span.tags['elasticsearch.normalized_uri']).toEqual('/twitter/tweets');
expect(span.tags['elasticsearch.uri']).toEqual('/twitter/_search');
expect(span.tags['elasticsearch.normalized_uri']).toEqual('/twitter/_search');
expect(span.tags['elasticsearch.method']).toEqual('POST');
});
});
Expand Down

0 comments on commit 4adde72

Please sign in to comment.