feat: Added ai_monitoring.streaming.enabled. When set to false, the agent will not instrument chat completion streams and thus will not create the relevant Llm events. #2021

Merged — 2 commits, Feb 15, 2024
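For context, a minimal sketch of how an application would opt out of stream instrumentation in its newrelic.js, based on the config keys added in this diff (app_name and license_key are illustrative placeholders, not part of the PR):

// newrelic.js — hypothetical agent configuration using the new flag
exports.config = {
  app_name: ['my-app'],
  license_key: 'your-license-key',
  ai_monitoring: {
    // Llm events require ai_monitoring to be enabled (defaults to false)
    enabled: true,
    streaming: {
      // New in this PR; defaults to true. When false, chat completion
      // streams are not instrumented and produce no Llm events.
      enabled: false
    }
  }
}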
Changes from all commits
10 changes: 10 additions & 0 deletions lib/config/default.js
@@ -1358,6 +1358,16 @@ defaultConfig.definition = () => ({
enabled: {
formatter: boolean,
default: false
},
/**
* Toggles the capturing of Llm events when using streaming-based
* methods in AIM-supported libraries (e.g. openai, AWS Bedrock, LangChain)
*/
streaming: {
enabled: {
formatter: boolean,
default: true
}
}
}
})
34 changes: 26 additions & 8 deletions lib/instrumentation/openai.js
@@ -16,9 +16,8 @@ const { RecorderSpec } = require('../../lib/shim/specs')

const MIN_VERSION = '4.0.0'
const MIN_STREAM_VERSION = '4.12.2'
const {
AI: { OPENAI }
} = require('../../lib/metrics/names')
const { AI } = require('../../lib/metrics/names')
const { OPENAI } = AI
const semver = require('semver')
const { DESTINATIONS } = require('../config/attribute-filter')

@@ -77,10 +76,21 @@ function decorateSegment({ shim, result, apiKey }) {
* @param {object} params.msg LLM event
*/
function recordEvent({ agent, type, msg }) {
agent.metrics.getOrCreateMetric(TRACKING_METRIC).incrementCallCount()
agent.customEventAggregator.add([{ type, timestamp: Date.now() }, msg])
}

/**
* Increments the tracking metric and sets the llm attribute on transactions
*
* @param {object} params input params
* @param {Agent} params.agent NR agent instance
* @param {TraceSegment} params.segment active segment
*/
function addLlmMeta({ agent, segment }) {
agent.metrics.getOrCreateMetric(TRACKING_METRIC).incrementCallCount()
segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
}

/**
* Assigns requestId, conversationId and messageIds for a given
* chat completion response on the active transaction.
@@ -173,7 +183,15 @@ function recordChatCompletionMessages({ agent, segment, request, response, err }
* messages
*
*/
function instrumentStream({ shim, request, response, segment }) {
function instrumentStream({ agent, shim, request, response, segment }) {
if (!agent.config.ai_monitoring.streaming.enabled) {
shim.logger.warn(
'`ai_monitoring.streaming.enabled` is set to `false`, stream will not be instrumented.'
)
agent.metrics.getOrCreateMetric(AI.STREAMING_DISABLED).incrementCallCount()
return
}

shim.wrap(response, 'iterator', function wrapIterator(shim, orig) {
return async function* wrappedIterator() {
let content = ''
@@ -272,7 +290,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) {
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, response, segment) {
if (request.stream) {
instrumentStream({ shim, request, response, segment })
instrumentStream({ agent, shim, request, response, segment })
} else {
recordChatCompletionMessages({
agent,
Expand All @@ -283,7 +301,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) {
})
}

segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
addLlmMeta({ agent, segment })
}
})
}
@@ -303,7 +321,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) {
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, response, segment) {
segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
addLlmMeta({ agent, segment })

if (!response) {
// If we get an error, it is possible that `response = null`.
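To illustrate the runtime effect of the change above (a sketch, not part of this diff; assumes the official openai package >= 4.12.2 and a fully configured agent):

// Hypothetical consumer code — behaves the same whether the flag is on or off.
const OpenAI = require('openai')

async function streamedChat() {
  const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })
  const stream = await client.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: 'You are a mathematician.' }],
    stream: true
  })

  let text = ''
  for await (const chunk of stream) {
    text += chunk.choices[0]?.delta?.content ?? ''
  }
  // With ai_monitoring.streaming.enabled = false the agent skips wrapping the
  // iterator, so no Llm chat completion events are recorded for this call and
  // the Supportability/Nodejs/ML/Streaming/Disabled metric is incremented.
  return text
}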
3 changes: 2 additions & 1 deletion lib/metrics/names.js
@@ -165,7 +165,8 @@ const EXPRESS = {
}

const AI = {
TRACKING_PREFIX: 'Supportability/Nodejs/ML',
TRACKING_PREFIX: `${SUPPORTABILITY.NODEJS}/ML`,
STREAMING_DISABLED: `${SUPPORTABILITY.NODEJS}/ML/Streaming/Disabled`,
EMBEDDING: 'Llm/embedding',
COMPLETION: 'Llm/completion'
}
3 changes: 2 additions & 1 deletion test/unit/config/config-defaults.test.js
@@ -309,8 +309,9 @@ tap.test('with default properties', (t) => {
t.end()
})

t.test('should default ai_monitoring.enabled to false', (t) => {
t.test('ai_monitoring defaults', (t) => {
t.equal(configuration.ai_monitoring.enabled, false)
t.equal(configuration.ai_monitoring.streaming.enabled, true)
t.end()
})
})
26 changes: 23 additions & 3 deletions test/unit/instrumentation/openai.test.js
@@ -11,12 +11,10 @@ const GenericShim = require('../../../lib/shim/shim')
const sinon = require('sinon')

test('openai unit tests', (t) => {
t.autoend()

t.beforeEach(function (t) {
const sandbox = sinon.createSandbox()
const agent = helper.loadMockedAgent()
agent.config.ai_monitoring = { enabled: true }
agent.config.ai_monitoring = { enabled: true, streaming: { enabled: true } }
const shim = new GenericShim(agent, 'openai')
shim.pkgVersion = '4.0.0'
sandbox.stub(shim.logger, 'debug')
@@ -54,6 +52,27 @@ test('openai unit tests', (t) => {
t.end()
})

t.test(
'should not instrument chat completion streams if ai_monitoring.streaming.enabled is false',
(t) => {
const { shim, agent, initialize } = t.context
agent.config.ai_monitoring.streaming.enabled = false
shim.pkgVersion = '4.12.3'
const MockOpenAi = getMockModule()
initialize(agent, MockOpenAi, 'openai', shim)
const completions = new MockOpenAi.Chat.Completions()

helper.runInTransaction(agent, async () => {
await completions.create({ stream: true })
t.equal(
shim.logger.warn.args[0][0],
'`ai_monitoring.streaming.enabled` is set to `false`, stream will not be instrumented.'
)
t.end()
})
}
)

t.test('should not instrument chat completion streams if < 4.12.2', async (t) => {
const { shim, agent, initialize } = t.context
shim.pkgVersion = '4.12.0'
@@ -100,4 +119,5 @@ test('openai unit tests', (t) => {
t.equal(isWrapped, false, 'should not wrap chat completions create')
t.end()
})
t.end()
})
43 changes: 40 additions & 3 deletions test/versioned/openai/chat-completions.tap.js
@@ -26,6 +26,7 @@ const { version: pkgVersion } = JSON.parse(
fs.readFileSync(`${__dirname}/node_modules/openai/package.json`)
)
const { DESTINATIONS } = require('../../../lib/config/attribute-filter')
const TRACKING_METRIC = `Supportability/Nodejs/ML/OpenAI/${pkgVersion}`

tap.test('OpenAI instrumentation - chat completions', (t) => {
t.autoend()
@@ -63,9 +64,7 @@ tap.test('OpenAI instrumentation - chat completions', (t) => {
messages: [{ role: 'user', content: 'You are a mathematician.' }]
})

const metrics = agent.metrics.getOrCreateMetric(
`Supportability/Nodejs/ML/OpenAI/${pkgVersion}`
)
const metrics = agent.metrics.getOrCreateMetric(TRACKING_METRIC)
t.equal(metrics.callCount > 0, true)

tx.end()
@@ -233,6 +232,44 @@
}
})
})

t.test('should not create llm events when ai_monitoring.streaming.enabled is false', (test) => {
const { client, agent } = t.context
agent.config.ai_monitoring.streaming.enabled = false
helper.runInTransaction(agent, async (tx) => {
const content = 'Streamed response'
const model = 'gpt-4'
const stream = await client.chat.completions.create({
max_tokens: 100,
temperature: 0.5,
model,
messages: [{ role: 'user', content }],
stream: true
})

let res = ''
let chunk = {}

for await (chunk of stream) {
res += chunk.choices[0]?.delta?.content
}
const expectedRes = responses.get(content)
test.equal(res, expectedRes.streamData)

const events = agent.customEventAggregator.events.toArray()
test.equal(events.length, 0, 'should not create llm events when streaming is disabled')
const metrics = agent.metrics.getOrCreateMetric(TRACKING_METRIC)
test.equal(metrics.callCount > 0, true)
const attributes = tx.trace.attributes.get(DESTINATIONS.TRANS_EVENT)
test.equal(attributes.llm, true)
const streamingDisabled = agent.metrics.getOrCreateMetric(
'Supportability/Nodejs/ML/Streaming/Disabled'
)
test.equal(streamingDisabled.callCount > 0, true)
tx.end()
test.end()
})
})
} else {
t.test('should not instrument streams when openai < 4.12.2', (test) => {
const { client, agent, host, port } = t.context
3 changes: 3 additions & 0 deletions test/versioned/openai/common.js
@@ -11,6 +11,9 @@ const helper = require('../../lib/agent_helper')
const config = {
ai_monitoring: {
enabled: true,
streaming: {
enabled: true
}
}
}
