From 03abfce666d3c48abd0994c44817bb5cbe8655a9 Mon Sep 17 00:00:00 2001
From: Bob Evans
Date: Thu, 29 Feb 2024 10:58:34 -0500
Subject: [PATCH] feat: Added instrumentation for `chain.stream` for langchain js. (#2052)

---
 lib/instrumentation/langchain/runnable.js     | 228 +++++--
 lib/llm-events/langchain/event.js             |   3 +-
 test/unit/llm-events/langchain/event.test.js  |   7 +
 test/versioned/langchain/common.js            |  13 +-
 test/versioned/langchain/package.json         |   3 +-
 .../langchain/runnables-streaming.tap.js      | 575 ++++++++++++++++++
 6 files changed, 785 insertions(+), 44 deletions(-)
 create mode 100644 test/versioned/langchain/runnables-streaming.tap.js

diff --git a/lib/instrumentation/langchain/runnable.js b/lib/instrumentation/langchain/runnable.js
index bc75ba652..9ac0e2452 100644
--- a/lib/instrumentation/langchain/runnable.js
+++ b/lib/instrumentation/langchain/runnable.js
@@ -6,9 +6,8 @@
 'use strict'
 
 const common = require('./common')
-const {
-  AI: { LANGCHAIN }
-} = require('../../metrics/names')
+const { AI } = require('../../metrics/names')
+const { LANGCHAIN } = AI
 const {
   LangChainCompletionMessage,
   LangChainCompletionSummary
@@ -28,10 +27,33 @@ module.exports = function initialize(shim, langchain) {
     return
   }
 
+  instrumentInvokeChain({ langchain, shim })
+
+  if (agent.config.ai_monitoring.streaming.enabled) {
+    instrumentStream({ langchain, shim })
+  } else {
+    shim.logger.warn(
+      '`ai_monitoring.streaming.enabled` is set to `false`, stream will not be instrumented.'
+    )
+    agent.metrics.getOrCreateMetric(AI.STREAMING_DISABLED).incrementCallCount()
+    agent.metrics
+      .getOrCreateMetric(`${LANGCHAIN.TRACKING_PREFIX}/${pkgVersion}`)
+      .incrementCallCount()
+  }
+}
+
+/**
+ * Instruments and records span and relevant LLM events for `chain.invoke`
+ *
+ * @param {object} params function params
+ * @param {object} params.langchain `@langchain/core/runnables/base` export
+ * @param {Shim} params.shim instance of shim
+ */
+function instrumentInvokeChain({ langchain, shim }) {
   shim.record(
     langchain.RunnableSequence.prototype,
     'invoke',
-    function wrapCall(shim, invoke, fnName, args) {
+    function wrapCall(shim, _invoke, fnName, args) {
       const [request, params] = args
       const metadata = params?.metadata ?? {}
       const tags = params?.tags ?? []
@@ -41,53 +63,183 @@ module.exports = function initialize(shim, langchain) {
         promise: true,
         // eslint-disable-next-line max-params
         after(_shim, _fn, _name, err, output, segment) {
-          segment.end()
-          const completionSummary = new LangChainCompletionSummary({
-            agent,
-            messages: [{ output }],
-            metadata,
-            tags,
+          recordChatCompletionEvents({
             segment,
-            error: err != null,
-            runId: segment[langchainRunId]
-          })
-
-          common.recordEvent({
-            agent,
-            type: 'LlmChatCompletionSummary',
-            pkgVersion,
-            msg: completionSummary
-          })
-
-          // output can be BaseMessage with a content property https://js.langchain.com/docs/modules/model_io/concepts#messages
-          // or an output parser https://js.langchain.com/docs/modules/model_io/concepts#output-parsers
-          recordCompletions({
+            messages: [output],
             events: [request, output],
-            completionSummary,
-            agent,
-            segment,
+            metadata,
+            tags,
+            err,
             shim
           })
+        }
+      })
+    }
+  )
+}
+
+/**
+ * Instruments and records span and relevant LLM events for `chain.stream`
+ *
+ * @param {object} params function params
+ * @param {object} params.langchain `@langchain/core/runnables/base` export
+ * @param {Shim} params.shim instance of shim
+ */
+function instrumentStream({ langchain, shim }) {
+  shim.record(
+    langchain.RunnableSequence.prototype,
+    'stream',
+    function wrapStream(shim, _stream, fnName, args) {
+      const [request, params] = args
+      const metadata = params?.metadata ?? {}
+      const tags = params?.tags ?? []
-          if (err) {
-            agent.errors.add(
-              segment.transaction,
+      return new RecorderSpec({
+        name: `${LANGCHAIN.CHAIN}/${fnName}`,
+        promise: true,
+        // eslint-disable-next-line max-params
+        after(_shim, _fn, _name, err, output, segment) {
+          // If `output` has a `next` method, a stream was created; wrap its
+          // iterator below. Otherwise an input error occurred before the
+          // stream existed, so create Llm events from the data we have.
+          if (output?.next) {
+            wrapNextHandler({ shim, output, segment, request, metadata, tags })
+          } else {
+            recordChatCompletionEvents({
+              segment,
+              messages: [],
+              events: [request],
+              metadata,
+              tags,
               err,
-            new LlmErrorMessage({
-              response: {},
-              cause: err,
-              summary: completionSummary
-            })
-          )
+              shim
+            })
           }
-
-          segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
         }
       })
     }
   )
 }
 
+/**
+ * Wraps the next method on the IterableReadableStream. It will also record the Llm
+ * events when the stream is done processing.
+ *
+ * @param {object} params function params
+ * @param {Shim} params.shim shim instance
+ * @param {TraceSegment} params.segment active segment
+ * @param {object} params.output IterableReadableStream
+ * @param {string} params.request the prompt message
+ * @param {object} params.metadata metadata for the call
+ * @param {Array} params.tags tags for the call
+ */
+function wrapNextHandler({ shim, output, segment, request, metadata, tags }) {
+  shim.wrap(output, 'next', function wrapIterator(shim, orig) {
+    let content = ''
+    return async function wrappedIterator() {
+      try {
+        const result = await orig.apply(this, arguments)
+        // only create Llm events when stream iteration is done
+        if (result?.done) {
+          recordChatCompletionEvents({
+            segment,
+            messages: [content],
+            events: [request, content],
+            metadata,
+            tags,
+            shim
+          })
+        } else {
+          content += result.value
+        }
+        return result
+      } catch (error) {
+        recordChatCompletionEvents({
+          segment,
+          messages: [content],
+          events: [request, content],
+          metadata,
+          tags,
+          err: error,
+          shim
+        })
+        throw error
+      } finally {
+        // update segment duration on every stream iteration to extend
+        // the timer
+        segment.touch()
+      }
+    }
+  })
+}
+
+/**
+ * Ends the active segment, creates the LlmChatCompletionSummary and LlmChatCompletionMessage(s) events, and records any error that occurred
+ *
+ * @param {object} params function params
+ * @param {TraceSegment} params.segment active segment
+ * @param {Array} params.messages response messages
+ * @param {Array} params.events prompt and response messages
+ * @param {object} params.metadata metadata for the call
+ * @param {Array} params.tags tags for the call
+ * @param {Error} params.err error object from call
+ * @param {Shim} params.shim shim instance
+ */
+function recordChatCompletionEvents({ segment, messages, events, metadata, tags, err, shim }) {
+  const { pkgVersion, agent } = shim
+  segment.end()
+  const completionSummary = new LangChainCompletionSummary({
+    agent,
+    messages,
+    metadata,
+    tags,
+    segment,
+    error: err != null,
+    runId: segment[langchainRunId]
+  })
+
+  common.recordEvent({
+    agent,
+    type: 'LlmChatCompletionSummary',
+    pkgVersion,
+    msg: completionSummary
+  })
+
+  // output can be BaseMessage with a content property https://js.langchain.com/docs/modules/model_io/concepts#messages
+  // or an output parser https://js.langchain.com/docs/modules/model_io/concepts#output-parsers
+  recordCompletions({
+    events,
+    completionSummary,
+    agent,
+    segment,
+    shim
+  })
+
+  if (err) {
+    agent.errors.add(
+      segment.transaction,
+      err,
+      new LlmErrorMessage({
+        response: {},
+        cause: err,
+        summary: completionSummary
+      })
+    )
+  }
+
+  segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
+}
+
+/**
+ * Records the LlmChatCompletionMessage(s)
+ *
+ * @param {object} params function params
+ * @param {Array} params.events prompt and response messages
+ * @param {LangChainCompletionSummary} params.completionSummary LlmChatCompletionSummary event
+ * @param {Agent} params.agent instance of agent
+ * @param {TraceSegment} params.segment active segment
+ * @param {Shim} params.shim shim instance
+ */
 function recordCompletions({ events, completionSummary, agent, segment, shim }) {
   for (let i = 0; i < events.length; i += 1) {
     let msg = events[i]
diff --git a/lib/llm-events/langchain/event.js b/lib/llm-events/langchain/event.js
index c842fff9c..52d5ae20f 100644
--- a/lib/llm-events/langchain/event.js
+++ b/lib/llm-events/langchain/event.js
@@ -51,7 +51,6 @@ class 
LangChainEvent extends BaseEvent { ingest_source = 'Node' vendor = 'langchain' virtual_llm = true - error = false constructor(params = defaultParams) { params = Object.assign({}, defaultParams, params) @@ -66,7 +65,7 @@ class LangChainEvent extends BaseEvent { this.langchainMeta = params.metadata this.metadata = agent this.tags = Array.isArray(params.tags) ? params.tags.join(',') : params.tags - this.error = params.error ?? false + this.error = params.error ?? null if (params.virtual !== undefined) { if (params.virtual !== true && params.virtual !== false) { diff --git a/test/unit/llm-events/langchain/event.test.js b/test/unit/llm-events/langchain/event.test.js index 4ac40023d..9c870e135 100644 --- a/test/unit/llm-events/langchain/event.test.js +++ b/test/unit/llm-events/langchain/event.test.js @@ -62,6 +62,7 @@ tap.test('constructs default instance', async (t) => { ['metadata.foo']: 'foo', ingest_source: 'Node', vendor: 'langchain', + error: null, virtual_llm: true }) }) @@ -103,3 +104,9 @@ tap.test('sets tags from string', async (t) => { const msg = new LangChainEvent(t.context) t.equal(msg.tags, 'foo,bar') }) + +tap.test('sets error property', async (t) => { + t.context.error = true + const msg = new LangChainEvent(t.context) + t.equal(msg.error, true) +}) diff --git a/test/versioned/langchain/common.js b/test/versioned/langchain/common.js index 0118cc475..a6ff7e64d 100644 --- a/test/versioned/langchain/common.js +++ b/test/versioned/langchain/common.js @@ -48,7 +48,14 @@ function assertLangChainChatCompletionSummary({ tx, chatSummary, withCallback }) this.match(chatSummary[1], expectedSummary, 'should match chat summary message') } -function assertLangChainChatCompletionMessages({ tx, chatMsgs, chatSummary, withCallback }) { +function assertLangChainChatCompletionMessages({ + tx, + chatMsgs, + chatSummary, + withCallback, + input = '{"topic":"scientist"}', + output = '212 degrees Fahrenheit is equal to 100 degrees Celsius.' +}) { const baseMsg = { id: /[a-f0-9]{36}/, appName: 'New Relic for Node.js tests', @@ -71,11 +78,11 @@ function assertLangChainChatCompletionMessages({ tx, chatMsgs, chatSummary, with const expectedChatMsg = { ...baseMsg } if (msg[1].sequence === 0) { expectedChatMsg.sequence = 0 - expectedChatMsg.content = '{"topic":"scientist"}' + expectedChatMsg.content = input expectedChatMsg.is_response = false } else if (msg[1].sequence === 1) { expectedChatMsg.sequence = 1 - expectedChatMsg.content = '212 degrees Fahrenheit is equal to 100 degrees Celsius.' + expectedChatMsg.content = output expectedChatMsg.is_response = true } diff --git a/test/versioned/langchain/package.json b/test/versioned/langchain/package.json index 1f4e0598e..34ff9d2b1 100644 --- a/test/versioned/langchain/package.json +++ b/test/versioned/langchain/package.json @@ -16,7 +16,8 @@ }, "files": [ "tools.tap.js", - "runnables.tap.js" + "runnables.tap.js", + "runnables-streaming.tap.js" ] } ] diff --git a/test/versioned/langchain/runnables-streaming.tap.js b/test/versioned/langchain/runnables-streaming.tap.js new file mode 100644 index 000000000..d866890ad --- /dev/null +++ b/test/versioned/langchain/runnables-streaming.tap.js @@ -0,0 +1,575 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +const tap = require('tap') +const helper = require('../../lib/agent_helper') +// load the assertSegments assertion +require('../../lib/metrics_helper') +const { filterLangchainEvents, filterLangchainMessages } = require('./common') +const { version: pkgVersion } = require('@langchain/core/package.json') +const createOpenAIMockServer = require('../openai/mock-server') +const mockResponses = require('../openai/mock-responses') +const config = { + ai_monitoring: { + enabled: true, + streaming: { + enabled: true + } + }, + feature_flag: { + langchain_instrumentation: true + } +} + +const { DESTINATIONS } = require('../../../lib/config/attribute-filter') + +async function beforeEach({ enabled, t }) { + const { host, port, server } = await createOpenAIMockServer() + t.context.server = server + t.context.agent = helper.instrumentMockedAgent(config) + t.context.agent.config.ai_monitoring.streaming.enabled = enabled + const { ChatPromptTemplate } = require('@langchain/core/prompts') + const { StringOutputParser } = require('@langchain/core/output_parsers') + const { ChatOpenAI } = require('@langchain/openai') + + t.context.prompt = ChatPromptTemplate.fromMessages([['assistant', '{topic} response']]) + t.context.model = new ChatOpenAI({ + openAIApiKey: 'fake-key', + maxRetries: 0, + configuration: { + baseURL: `http://${host}:${port}` + } + }) + t.context.outputParser = new StringOutputParser() +} + +async function afterEach(t) { + t.context?.server?.close() + helper.unloadAgent(t.context.agent) + // bust the require-cache so it can re-instrument + Object.keys(require.cache).forEach((key) => { + if (key.includes('@langchain/core') || key.includes('openai')) { + delete require.cache[key] + } + }) +} + +tap.test('Langchain instrumentation - chain streaming', (t) => { + t.beforeEach(beforeEach.bind(null, { enabled: true, t })) + + t.afterEach(afterEach.bind(null, t)) + + t.test('should create langchain events for every stream call', (t) => { + const { agent, prompt, outputParser, model } = t.context + + helper.runInTransaction(agent, async (tx) => { + const input = { topic: 'Streamed' } + + const chain = prompt.pipe(model).pipe(outputParser) + const stream = await chain.stream(input) + let content = '' + for await (const chunk of stream) { + content += chunk + } + + const { streamData: expectedContent } = mockResponses.get('Streamed response') + t.equal(content, expectedContent) + const events = agent.customEventAggregator.events.toArray() + t.equal(events.length, 6, 'should create 6 events') + + const langchainEvents = events.filter((event) => { + const [, chainEvent] = event + return chainEvent.vendor === 'langchain' + }) + + t.equal(langchainEvents.length, 3, 'should create 3 langchain events') + + tx.end() + t.end() + }) + }) + + t.test('should increment tracking metric for each langchain chat prompt event', (t) => { + const { agent, prompt, outputParser, model } = t.context + + helper.runInTransaction(agent, async (tx) => { + const input = { topic: 'Streamed' } + + const chain = prompt.pipe(model).pipe(outputParser) + const stream = await chain.stream(input) + for await (const chunk of stream) { + chunk + // no-op + } + + const metrics = agent.metrics.getOrCreateMetric( + `Supportability/Nodejs/ML/Langchain/${pkgVersion}` + ) + t.equal(metrics.callCount > 0, true) + + tx.end() + t.end() + }) + }) + + t.test( + 'should create langchain events for every stream call on chat prompt + model + parser', + (t) => { + const { agent, prompt, 
outputParser, model } = t.context + + helper.runInTransaction(agent, async (tx) => { + const input = { topic: 'Streamed' } + const options = { metadata: { key: 'value', hello: 'world' }, tags: ['tag1', 'tag2'] } + + const chain = prompt.pipe(model).pipe(outputParser) + const stream = await chain.stream(input, options) + let content = '' + for await (const chunk of stream) { + content += chunk + } + + const events = agent.customEventAggregator.events.toArray() + + const langchainEvents = filterLangchainEvents(events) + const langChainMessageEvents = filterLangchainMessages( + langchainEvents, + 'LlmChatCompletionMessage' + ) + const langChainSummaryEvents = filterLangchainMessages( + langchainEvents, + 'LlmChatCompletionSummary' + ) + + t.langchainSummary({ + tx, + chatSummary: langChainSummaryEvents[0] + }) + + t.langchainMessages({ + tx, + chatMsgs: langChainMessageEvents, + chatSummary: langChainSummaryEvents[0][1], + input: '{"topic":"Streamed"}', + output: content + }) + + tx.end() + t.end() + }) + } + ) + + t.test('should create langchain events for every stream call on chat prompt + model', (t) => { + const { agent, prompt, model } = t.context + + helper.runInTransaction(agent, async (tx) => { + const input = { topic: 'Streamed' } + const options = { metadata: { key: 'value', hello: 'world' }, tags: ['tag1', 'tag2'] } + + const chain = prompt.pipe(model) + const stream = await chain.stream(input, options) + let content = '' + for await (const chunk of stream) { + content += chunk + } + + const events = agent.customEventAggregator.events.toArray() + + const langchainEvents = filterLangchainEvents(events) + const langChainMessageEvents = filterLangchainMessages( + langchainEvents, + 'LlmChatCompletionMessage' + ) + const langChainSummaryEvents = filterLangchainMessages( + langchainEvents, + 'LlmChatCompletionSummary' + ) + + t.langchainSummary({ + tx, + chatSummary: langChainSummaryEvents[0] + }) + + t.langchainMessages({ + tx, + chatMsgs: langChainMessageEvents, + chatSummary: langChainSummaryEvents[0][1], + input: '{"topic":"Streamed"}', + output: content + }) + + tx.end() + t.end() + }) + }) + + t.test( + 'should create langchain events for every stream call with parser that returns an array as output', + (t) => { + const { CommaSeparatedListOutputParser } = require('@langchain/core/output_parsers') + const { agent, prompt, model } = t.context + + helper.runInTransaction(agent, async (tx) => { + const parser = new CommaSeparatedListOutputParser() + + const input = { topic: 'Streamed' } + const options = { metadata: { key: 'value', hello: 'world' }, tags: ['tag1', 'tag2'] } + + const chain = prompt.pipe(model).pipe(parser) + const stream = await chain.stream(input, options) + let content = '' + for await (const chunk of stream) { + content += chunk + } + + const events = agent.customEventAggregator.events.toArray() + + const langchainEvents = filterLangchainEvents(events) + const langChainMessageEvents = filterLangchainMessages( + langchainEvents, + 'LlmChatCompletionMessage' + ) + const langChainSummaryEvents = filterLangchainMessages( + langchainEvents, + 'LlmChatCompletionSummary' + ) + + t.langchainSummary({ + tx, + chatSummary: langChainSummaryEvents[0] + }) + + t.langchainMessages({ + tx, + chatMsgs: langChainMessageEvents, + chatSummary: langChainSummaryEvents[0][1], + input: '{"topic":"Streamed"}', + output: content + }) + + tx.end() + t.end() + }) + } + ) + + t.test('should add runId when a callback handler exists', (t) => { + const { BaseCallbackHandler } = 
require('@langchain/core/callbacks/base') + let runId + const cbHandler = BaseCallbackHandler.fromMethods({ + handleChainStart(...args) { + runId = args?.[2] + } + }) + + const { agent, prompt, outputParser, model } = t.context + + helper.runInTransaction(agent, async (tx) => { + const input = { topic: 'Streamed' } + const options = { + metadata: { key: 'value', hello: 'world' }, + callbacks: [cbHandler], + tags: ['tag1', 'tag2'] + } + + const chain = prompt.pipe(model).pipe(outputParser) + const stream = await chain.stream(input, options) + for await (const chunk of stream) { + chunk + // no-op + } + + const events = agent.customEventAggregator.events.toArray() + + const langchainEvents = filterLangchainEvents(events) + t.equal(langchainEvents[0][1].request_id, runId) + + tx.end() + t.end() + }) + }) + + t.test( + 'should create langchain events for every stream call on chat prompt + model + parser with callback', + (t) => { + const { BaseCallbackHandler } = require('@langchain/core/callbacks/base') + const cbHandler = BaseCallbackHandler.fromMethods({ + handleChainStart() {} + }) + + const { agent, prompt, outputParser, model } = t.context + + helper.runInTransaction(agent, async (tx) => { + const input = { topic: 'Streamed' } + const options = { + metadata: { key: 'value', hello: 'world' }, + callbacks: [cbHandler], + tags: ['tag1', 'tag2'] + } + + const chain = prompt.pipe(model).pipe(outputParser) + const stream = await chain.stream(input, options) + + let content = '' + for await (const chunk of stream) { + content += chunk + } + + const events = agent.customEventAggregator.events.toArray() + + const langchainEvents = filterLangchainEvents(events) + const langChainMessageEvents = filterLangchainMessages( + langchainEvents, + 'LlmChatCompletionMessage' + ) + const langChainSummaryEvents = filterLangchainMessages( + langchainEvents, + 'LlmChatCompletionSummary' + ) + t.langchainSummary({ + tx, + chatSummary: langChainSummaryEvents[0], + withCallback: cbHandler + }) + + t.langchainMessages({ + tx, + chatMsgs: langChainMessageEvents, + chatSummary: langChainSummaryEvents[0][1], + withCallback: cbHandler, + input: '{"topic":"Streamed"}', + output: content + }) + + tx.end() + t.end() + }) + } + ) + + t.test('should not create langchain events when not in a transaction', async (t) => { + const { agent, prompt, outputParser, model } = t.context + + const input = { topic: 'Streamed' } + + const chain = prompt.pipe(model).pipe(outputParser) + const stream = await chain.stream(input) + for await (const chunk of stream) { + chunk + // no-op + } + + const events = agent.customEventAggregator.events.toArray() + t.equal(events.length, 0, 'should not create langchain events') + t.end() + }) + + t.test('should add llm attribute to transaction', (t) => { + const { agent, prompt, model } = t.context + + const input = { topic: 'Streamed' } + + helper.runInTransaction(agent, async (tx) => { + const chain = prompt.pipe(model) + const stream = await chain.stream(input) + for await (const chunk of stream) { + chunk + // no-op + } + + const attributes = tx.trace.attributes.get(DESTINATIONS.TRANS_EVENT) + t.equal(attributes.llm, true) + + tx.end() + t.end() + }) + }) + + t.test('should create span on successful runnables create', (t) => { + const { agent, prompt, model } = t.context + + const input = { topic: 'Streamed' } + + helper.runInTransaction(agent, async (tx) => { + const chain = prompt.pipe(model) + const stream = await chain.stream(input) + for await (const chunk of stream) { + chunk + // no-op + } 
+ + t.assertSegments(tx.trace.root, ['Llm/chain/Langchain/stream'], { exact: false }) + + tx.end() + t.end() + }) + }) + + // testing JSON.stringify on request (input) during creation of LangChainCompletionMessage event + t.test( + 'should use empty string for content property on completion message event when invalid input is used - circular reference', + (t) => { + const { agent, prompt, outputParser, model } = t.context + + helper.runInTransaction(agent, async (tx) => { + const input = { topic: 'Streamed' } + input.myself = input + + const chain = prompt.pipe(model).pipe(outputParser) + const stream = await chain.stream(input) + for await (const chunk of stream) { + chunk + // no-op + } + + const events = agent.customEventAggregator.events.toArray() + + const langchainEvents = filterLangchainEvents(events) + const langChainMessageEvents = filterLangchainMessages( + langchainEvents, + 'LlmChatCompletionMessage' + ) + + const msgEventEmptyContent = langChainMessageEvents.filter( + (event) => event[1].content === '' + ) + + t.equal(msgEventEmptyContent.length, 1, 'should have 1 event with empty content property') + + tx.end() + t.end() + }) + } + ) + + t.test('should create error events from input', (t) => { + const { ChatPromptTemplate } = require('@langchain/core/prompts') + const prompt = ChatPromptTemplate.fromMessages([ + ['assistant', 'tell me short joke about {topic}'] + ]) + const { agent, outputParser, model } = t.context + + helper.runInTransaction(agent, async (tx) => { + const chain = prompt.pipe(model).pipe(outputParser) + + try { + await chain.stream('') + } catch (error) { + t.ok(error) + } + + // No openai events as it errors before talking to LLM + const events = agent.customEventAggregator.events.toArray() + t.equal(events.length, 2, 'should create 2 events') + + const summary = events.find((e) => e[0].type === 'LlmChatCompletionSummary')?.[1] + t.equal(summary.error, true) + + // But, we should also get two error events: 1xLLM and 1xLangChain + const exceptions = tx.exceptions + for (const e of exceptions) { + const str = Object.prototype.toString.call(e.customAttributes) + t.equal(str, '[object LlmErrorMessage]') + } + + tx.end() + t.end() + }) + }) + + t.test('should create error events when stream fails', (t) => { + const { ChatPromptTemplate } = require('@langchain/core/prompts') + const prompt = ChatPromptTemplate.fromMessages([['assistant', '{topic} stream']]) + const { agent, model, outputParser } = t.context + + helper.runInTransaction(agent, async (tx) => { + const chain = prompt.pipe(model).pipe(outputParser) + + try { + const stream = await chain.stream({ topic: 'bad' }) + for await (const chunk of stream) { + chunk + // no-op + } + } catch (error) { + t.ok(error) + } + + // We should still get the same 3xLangChain and 3xLLM events as in the + // success case: + const events = agent.customEventAggregator.events.toArray() + t.equal(events.length, 6, 'should create 6 events') + + const langchainEvents = events.filter((event) => { + const [, chainEvent] = event + return chainEvent.vendor === 'langchain' + }) + t.equal(langchainEvents.length, 3, 'should create 3 langchain events') + const summary = langchainEvents.find((e) => e[0].type === 'LlmChatCompletionSummary')?.[1] + t.equal(summary.error, true) + + // But, we should also get two error events: 1xLLM and 1xLangChain + const exceptions = tx.exceptions + for (const e of exceptions) { + const str = Object.prototype.toString.call(e.customAttributes) + t.equal(str, '[object LlmErrorMessage]') + t.match(e, { + 
customAttributes: { + 'error.message': 'Premature close', + 'completion_id': /\w{32}/ + } + }) + } + tx.end() + t.end() + }) + }) + t.end() +}) + +tap.test('Langchain instrumentation - streaming disabled', (t) => { + t.beforeEach(beforeEach.bind(null, { enabled: false, t })) + + t.afterEach(afterEach.bind(null, t)) + + t.test('should not create llm events when `ai_monitoring.streaming.enabled` is false', (t) => { + const { agent, prompt, outputParser, model } = t.context + + helper.runInTransaction(agent, async (tx) => { + const input = { topic: 'Streamed' } + + const chain = prompt.pipe(model).pipe(outputParser) + const stream = await chain.stream(input) + let content = '' + for await (const chunk of stream) { + content += chunk + } + + const { streamData: expectedContent } = mockResponses.get('Streamed response') + t.equal(content, expectedContent) + const events = agent.customEventAggregator.events.toArray() + t.equal(events.length, 0, 'should not create llm events when streaming is disabled') + const metrics = agent.metrics.getOrCreateMetric( + `Supportability/Nodejs/ML/Langchain/${pkgVersion}` + ) + t.equal(metrics.callCount > 0, true) + const attributes = tx.trace.attributes.get(DESTINATIONS.TRANS_EVENT) + t.equal(attributes.llm, true) + const streamingDisabled = agent.metrics.getOrCreateMetric( + 'Supportability/Nodejs/ML/Streaming/Disabled' + ) + t.equal( + streamingDisabled.callCount, + 2, + 'should increment streaming disabled in both langchain and openai' + ) + tx.end() + t.end() + }) + }) + t.end() +})
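
Usage sketch (illustrative; not part of the applied diff): the snippet below shows the application-side code path this patch instruments, mirroring the versioned tests above. It assumes an agent configured with `ai_monitoring.streaming.enabled = true` and `feature_flag.langchain_instrumentation = true`; the prompt text, topic, and environment-variable API key are illustrative only.

'use strict'
require('newrelic') // load the agent first so `@langchain/core` is instrumented

const { ChatPromptTemplate } = require('@langchain/core/prompts')
const { StringOutputParser } = require('@langchain/core/output_parsers')
const { ChatOpenAI } = require('@langchain/openai')

async function main() {
  const prompt = ChatPromptTemplate.fromMessages([['assistant', '{topic} response']])
  const model = new ChatOpenAI({ openAIApiKey: process.env.OPENAI_API_KEY, maxRetries: 0 })
  const chain = prompt.pipe(model).pipe(new StringOutputParser())

  // `chain.stream` is now recorded: each iteration touches the active segment
  // to extend its timer, and when the iterator reports `done` the agent emits
  // one LlmChatCompletionSummary plus LlmChatCompletionMessage events.
  const stream = await chain.stream({ topic: 'observability' })
  let content = ''
  for await (const chunk of stream) {
    content += chunk
  }
  return content
}

main().then(console.log).catch(console.error)

Because the instrumentation wraps the iterator's `next` method, the Llm events are only recorded once the loop drains the stream or the stream errors; abandoning the iterator early leaves no completion events, only the touched segment.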