From e1c334f4231a670bce94bbec936ce0399f30116b Mon Sep 17 00:00:00 2001 From: kirrg001 Date: Wed, 17 Sep 2025 14:46:55 +0200 Subject: [PATCH 1/3] refactor: simplified bull child process initialisation --- packages/collector/src/immediate.js | 18 +++++------------- .../instrumentation/process/childProcess.js | 6 +++--- .../test/tracing/messaging/bull/test.js | 2 +- .../test/tracing/messaging/bull/util.js | 10 ++++++---- packages/core/src/tracing/index.js | 1 + .../tracing/instrumentation/messaging/bull.js | 3 +++ .../tracing/instrumentation/process/process.js | 5 +++++ 7 files changed, 24 insertions(+), 21 deletions(-) diff --git a/packages/collector/src/immediate.js b/packages/collector/src/immediate.js index 9276ef14d8..3923baf958 100644 --- a/packages/collector/src/immediate.js +++ b/packages/collector/src/immediate.js @@ -18,7 +18,6 @@ if (isNodeJsTooOld()) { } const { util: coreUtil } = require('@instana/core'); -const agentOpts = require('./agent/opts'); // This file can be used with NODE_OPTIONS or `node --require` to instrument a Node.js app with Instana without // modifying the source code. See @@ -27,22 +26,15 @@ const agentOpts = require('./agent/opts'); const isExcludedFromInstrumentation = coreUtil.excludedFromInstrumentation && coreUtil.excludedFromInstrumentation(); -// In case this is a child process of an instrumented parent process we might receive the agent uuid from the parent -// process to be able to produce and collect spans immediately without waiting for a connection to the agent in this -// process. -// TODO: This does not work because you would report spans with parent agent uuid and the child process pid - -// this is not compatible. Our codebase does not support this. -const parentProcessAgentUuid = process.env.INSTANA_AGENT_UUID; +// CASE: This process is a forked child process of a bull worker. +const currentProcessIsBullChildProcess = process.env.INSTANA_IS_BULL_CHILD_PROCESS === 'true'; if (!isExcludedFromInstrumentation) { - if (parentProcessAgentUuid) { - // @ts-ignore - Type 'string' is not assignable to type 'undefined' - // Probably because exports.agentUuid is set to undefined and export values were not supposed to be changed - // TODO: This has no effect. Remove! See comment above. - agentOpts.agentUuid = parentProcessAgentUuid; + if (currentProcessIsBullChildProcess) { require('./index')({ tracing: { - activateImmediately: true, + // We ONLY have to activate immediately, because the process instruementation removes the instana headers. + // If we don't ACTIVATE immediately, we miss this event. activateBullProcessInstrumentation: true, forceTransmissionStartingAt: 1 } diff --git a/packages/collector/src/tracing/instrumentation/process/childProcess.js b/packages/collector/src/tracing/instrumentation/process/childProcess.js index 83ddfdd67b..88f21dec67 100644 --- a/packages/collector/src/tracing/instrumentation/process/childProcess.js +++ b/packages/collector/src/tracing/instrumentation/process/childProcess.js @@ -6,8 +6,6 @@ 'use strict'; -const processIdentityProvider = require('../../../pidStore'); - const coreChildProcess = require('child_process'); const { tracing } = require('@instana/core'); const shimmer = tracing.shimmer; @@ -90,7 +88,9 @@ function shimFork(original) { `Detected a child_process.fork of Bull, instrumenting it by adding --require ${selfPath.immediate}.` ); - process.env.INSTANA_AGENT_UUID = processIdentityProvider.getFrom().h; + // NOTE: master.js is forked here! + // https://github.com/OptimalBits/bull/blob/v4.16.5/lib/process/master.js + process.env.INSTANA_IS_BULL_CHILD_PROCESS = 'true'; args.execArgv.unshift(selfPath.immediate); args.execArgv.unshift('--require'); } diff --git a/packages/collector/test/tracing/messaging/bull/test.js b/packages/collector/test/tracing/messaging/bull/test.js index 529fe64b97..ca7773d4dc 100644 --- a/packages/collector/test/tracing/messaging/bull/test.js +++ b/packages/collector/test/tracing/messaging/bull/test.js @@ -38,7 +38,7 @@ if (process.env.BULL_QUEUE_NAME) { const mochaSuiteFn = supportedVersion(process.versions.node) ? describe : describe.skip; const retryTime = 1000; -mochaSuiteFn('tracing/messaging/bull', function () { +mochaSuiteFn.only('tracing/messaging/bull', function () { this.timeout(config.getTestTimeout() * 3); const customAgentControls = new AgentStubControls(); diff --git a/packages/collector/test/tracing/messaging/bull/util.js b/packages/collector/test/tracing/messaging/bull/util.js index 6014723b05..7f7364d7fb 100644 --- a/packages/collector/test/tracing/messaging/bull/util.js +++ b/packages/collector/test/tracing/messaging/bull/util.js @@ -46,11 +46,11 @@ function processJob(job, done, log, info) { return Promise.reject(new Error('Invalid data. Expected data structure is {name: string}')); } } else { - log(`Consuming: ${info || 'no extra info provided'}: ${JSON.stringify(job.data)}`); + log(`Consuming: ${info || 'no extra info provided'}: ${JSON.stringify(job)}`); if (done) { setTimeout(() => { - writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job)).then(() => { + writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job), log).then(() => { fetch(`http://127.0.0.1:${agentPort}`) .then(() => { log('The follow up request after receiving a message has happened.'); @@ -73,7 +73,7 @@ function processJob(job, done, log, info) { }, 100); } else { return delay(100) - .then(() => writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job))) + .then(() => writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job), log)) .then(() => fetch(`http://127.0.0.1:${agentPort}`)) .then(() => { log('The follow up request after receiving a message has happened.'); @@ -155,7 +155,7 @@ function getJobData(job) { }; } -function writeToAFileToProveThatThisParticularJobHasBeenProcessed(jobData) { +function writeToAFileToProveThatThisParticularJobHasBeenProcessed(jobData, log) { let fileCreatedByJob; if (jobData.data.bulkIndex) { @@ -186,6 +186,8 @@ function writeToAFileToProveThatThisParticularJobHasBeenProcessed(jobData) { jobData.pid = process.pid; + log(`Writing file ${fileCreatedByJob} to prove that this job has been processed.`); + return new Promise((resolve, reject) => { fs.writeFile(fileCreatedByJob, JSON.stringify(jobData, null, 2), (err, success) => { if (err) { diff --git a/packages/core/src/tracing/index.js b/packages/core/src/tracing/index.js index 87808237d1..b79d85f48b 100644 --- a/packages/core/src/tracing/index.js +++ b/packages/core/src/tracing/index.js @@ -248,6 +248,7 @@ exports.init = function init(_config, downstreamConnection, _processIdentityProv } } + // TODO: This is not used anymore. Investigate any usages. Potentially remove/deprecate in the next major release. if (config.tracing.activateImmediately) { exports.activate(); } diff --git a/packages/core/src/tracing/instrumentation/messaging/bull.js b/packages/core/src/tracing/instrumentation/messaging/bull.js index 7a0dd12d71..21115672a6 100644 --- a/packages/core/src/tracing/instrumentation/messaging/bull.js +++ b/packages/core/src/tracing/instrumentation/messaging/bull.js @@ -79,6 +79,7 @@ function instrumentedJobCreate(ctx, originalJobCreate, originalArgs, options) { // Job.create args: Queue data, job name or ctx.DEFAULT_JOB_NAME, job data, options // queue name should always be found, as it's required in order to start the whole process + const queueName = (originalArgs[0] && originalArgs[0].name) || 'name not found'; return cls.ns.runAndReturn(() => { @@ -238,6 +239,8 @@ function instrumentedProcessJob(ctx, originalProcessJob, originalArgs) { } // TODO: The entry is CREATED BEFORE the child process is forked. Its created ON THE receiver process. + // Sender process -> bull exit (create jobs in redis) + // Receiver process -> apply for jobs -> bull entry -> process jobs via forked processes from bull // This is not correct. const span = cls.startSpan({ spanName: exports.spanName, diff --git a/packages/core/src/tracing/instrumentation/process/process.js b/packages/core/src/tracing/instrumentation/process/process.js index 0c8bbd89c6..7f2c209577 100644 --- a/packages/core/src/tracing/instrumentation/process/process.js +++ b/packages/core/src/tracing/instrumentation/process/process.js @@ -24,6 +24,10 @@ exports.init = function init(config) { if (config.tracing.activateBullProcessInstrumentation) { shimmer.wrap(process, 'emit', shimProcessEmitForBullChildWorker); shimmer.wrap(process, 'send', shimProcessSendForBullChildWorker); + + // Activate immediately, otherwise we miss the first message event and the instana + // headers would not get removed from the message. + exports.activate(); } }; @@ -32,6 +36,7 @@ function shimProcessEmitForBullChildWorker(originalProcessEmit) { if (!isActive || event !== 'message') { return originalProcessEmit.apply(this, arguments); } + const ipcMessage = arguments[1]; if ( From 1146273e985e06868c48103b4b64b99f0e6906b5 Mon Sep 17 00:00:00 2001 From: kirrg001 Date: Wed, 17 Sep 2025 14:49:04 +0200 Subject: [PATCH 2/3] chore: wording --- packages/collector/src/immediate.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/collector/src/immediate.js b/packages/collector/src/immediate.js index 3923baf958..58c2894c43 100644 --- a/packages/collector/src/immediate.js +++ b/packages/collector/src/immediate.js @@ -33,8 +33,7 @@ if (!isExcludedFromInstrumentation) { if (currentProcessIsBullChildProcess) { require('./index')({ tracing: { - // We ONLY have to activate immediately, because the process instruementation removes the instana headers. - // If we don't ACTIVATE immediately, we miss this event. + // If we don't ACTIVATE the process instrumentation for bull forked processes immediately, we miss this event. activateBullProcessInstrumentation: true, forceTransmissionStartingAt: 1 } From 05fc7a48c8952d21d66932ae0c16e2c02f31b15f Mon Sep 17 00:00:00 2001 From: kirrg001 Date: Wed, 17 Sep 2025 14:49:34 +0200 Subject: [PATCH 3/3] chore: lint --- packages/collector/test/tracing/messaging/bull/test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/collector/test/tracing/messaging/bull/test.js b/packages/collector/test/tracing/messaging/bull/test.js index ca7773d4dc..529fe64b97 100644 --- a/packages/collector/test/tracing/messaging/bull/test.js +++ b/packages/collector/test/tracing/messaging/bull/test.js @@ -38,7 +38,7 @@ if (process.env.BULL_QUEUE_NAME) { const mochaSuiteFn = supportedVersion(process.versions.node) ? describe : describe.skip; const retryTime = 1000; -mochaSuiteFn.only('tracing/messaging/bull', function () { +mochaSuiteFn('tracing/messaging/bull', function () { this.timeout(config.getTestTimeout() * 3); const customAgentControls = new AgentStubControls();