Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions packages/collector/src/immediate.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ if (isNodeJsTooOld()) {
}

const { util: coreUtil } = require('@instana/core');
const agentOpts = require('./agent/opts');

// This file can be used with NODE_OPTIONS or `node --require` to instrument a Node.js app with Instana without
// modifying the source code. See
Expand All @@ -27,22 +26,14 @@ const agentOpts = require('./agent/opts');

const isExcludedFromInstrumentation = coreUtil.excludedFromInstrumentation && coreUtil.excludedFromInstrumentation();

// In case this is a child process of an instrumented parent process we might receive the agent uuid from the parent
// process to be able to produce and collect spans immediately without waiting for a connection to the agent in this
// process.
// TODO: This does not work because you would report spans with parent agent uuid and the child process pid -
// this is not compatible. Our codebase does not support this.
const parentProcessAgentUuid = process.env.INSTANA_AGENT_UUID;
// CASE: This process is a forked child process of a bull worker.
const currentProcessIsBullChildProcess = process.env.INSTANA_IS_BULL_CHILD_PROCESS === 'true';
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed from INSTANA_AGENT_UUID to exact env INSTANA_IS_BULL_CHILD_PROCESS.

This env is not documented INSTANA_AGENT_UUID.


if (!isExcludedFromInstrumentation) {
if (parentProcessAgentUuid) {
// @ts-ignore - Type 'string' is not assignable to type 'undefined'
// Probably because exports.agentUuid is set to undefined and export values were not supposed to be changed
// TODO: This has no effect. Remove! See comment above.
agentOpts.agentUuid = parentProcessAgentUuid;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally getting rid of this dirty code. It was not working anyway

if (currentProcessIsBullChildProcess) {
require('./index')({
tracing: {
activateImmediately: true,
// If we don't ACTIVATE the process instrumentation for bull forked processes immediately, we miss this event.
activateBullProcessInstrumentation: true,
forceTransmissionStartingAt: 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

'use strict';

const processIdentityProvider = require('../../../pidStore');

const coreChildProcess = require('child_process');
const { tracing } = require('@instana/core');
const shimmer = tracing.shimmer;
Expand Down Expand Up @@ -90,7 +88,9 @@ function shimFork(original) {
`Detected a child_process.fork of Bull, instrumenting it by adding --require ${selfPath.immediate}.`
);

process.env.INSTANA_AGENT_UUID = processIdentityProvider.getFrom().h;
// NOTE: master.js is forked here!
// https://github.com/OptimalBits/bull/blob/v4.16.5/lib/process/master.js
process.env.INSTANA_IS_BULL_CHILD_PROCESS = 'true';
args.execArgv.unshift(selfPath.immediate);
args.execArgv.unshift('--require');
}
Expand Down
10 changes: 6 additions & 4 deletions packages/collector/test/tracing/messaging/bull/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ function processJob(job, done, log, info) {
return Promise.reject(new Error('Invalid data. Expected data structure is {name: string}'));
}
} else {
log(`Consuming: ${info || 'no extra info provided'}: ${JSON.stringify(job.data)}`);
log(`Consuming: ${info || 'no extra info provided'}: ${JSON.stringify(job)}`);

if (done) {
setTimeout(() => {
writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job)).then(() => {
writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job), log).then(() => {
fetch(`http://127.0.0.1:${agentPort}`)
.then(() => {
log('The follow up request after receiving a message has happened.');
Expand All @@ -73,7 +73,7 @@ function processJob(job, done, log, info) {
}, 100);
} else {
return delay(100)
.then(() => writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job)))
.then(() => writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job), log))
.then(() => fetch(`http://127.0.0.1:${agentPort}`))
.then(() => {
log('The follow up request after receiving a message has happened.');
Expand Down Expand Up @@ -155,7 +155,7 @@ function getJobData(job) {
};
}

function writeToAFileToProveThatThisParticularJobHasBeenProcessed(jobData) {
function writeToAFileToProveThatThisParticularJobHasBeenProcessed(jobData, log) {
let fileCreatedByJob;

if (jobData.data.bulkIndex) {
Expand Down Expand Up @@ -186,6 +186,8 @@ function writeToAFileToProveThatThisParticularJobHasBeenProcessed(jobData) {

jobData.pid = process.pid;

log(`Writing file ${fileCreatedByJob} to prove that this job has been processed.`);

return new Promise((resolve, reject) => {
fs.writeFile(fileCreatedByJob, JSON.stringify(jobData, null, 2), (err, success) => {
if (err) {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/tracing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ exports.init = function init(_config, downstreamConnection, _processIdentityProv
}
}

// TODO: This is not used anymore. Investigate any usages. Potentially remove/deprecate in the next major release.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its super nice that we might be able to remove activateImmediately functionality

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this configuration option isn’t documented, but we can look into it later. It’s really nice to discover such untouched or unknown parts of our codebase.

if (config.tracing.activateImmediately) {
exports.activate();
}
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/tracing/instrumentation/messaging/bull.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ function instrumentedJobCreate(ctx, originalJobCreate, originalArgs, options) {
// Job.create args: Queue data, job name or ctx.DEFAULT_JOB_NAME, job data, options

// queue name should always be found, as it's required in order to start the whole process

const queueName = (originalArgs[0] && originalArgs[0].name) || 'name not found';

return cls.ns.runAndReturn(() => {
Expand Down Expand Up @@ -238,6 +239,8 @@ function instrumentedProcessJob(ctx, originalProcessJob, originalArgs) {
}

// TODO: The entry is CREATED BEFORE the child process is forked. Its created ON THE receiver process.
// Sender process -> bull exit (create jobs in redis)
// Receiver process -> apply for jobs -> bull entry -> process jobs via forked processes from bull
// This is not correct.
const span = cls.startSpan({
spanName: exports.spanName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ exports.init = function init(config) {
if (config.tracing.activateBullProcessInstrumentation) {
shimmer.wrap(process, 'emit', shimProcessEmitForBullChildWorker);
shimmer.wrap(process, 'send', shimProcessSendForBullChildWorker);

// Activate immediately, otherwise we miss the first message event and the instana
// headers would not get removed from the message.
exports.activate();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of activating everything, we just activate what we need. The config even says it

activateBullProcessInstrumentation

}
};

Expand All @@ -32,6 +36,7 @@ function shimProcessEmitForBullChildWorker(originalProcessEmit) {
if (!isActive || event !== 'message') {
return originalProcessEmit.apply(this, arguments);
}

const ipcMessage = arguments[1];

if (
Expand Down