feat: Added instrumentation for chain.stream for langchain js. #2052

Merged (2 commits, Feb 29, 2024)
228 changes: 190 additions & 38 deletions lib/instrumentation/langchain/runnable.js
@@ -6,9 +6,8 @@
 'use strict'

 const common = require('./common')
-const {
-  AI: { LANGCHAIN }
-} = require('../../metrics/names')
+const { AI } = require('../../metrics/names')
+const { LANGCHAIN } = AI
 const {
   LangChainCompletionMessage,
   LangChainCompletionSummary
@@ -28,10 +27,33 @@ module.exports = function initialize(shim, langchain) {
     return
   }

+  instrumentInvokeChain({ langchain, shim })
+
+  if (agent.config.ai_monitoring.streaming.enabled) {
+    instrumentStream({ langchain, shim })
+  } else {
+    shim.logger.warn(
+      '`ai_monitoring.streaming.enabled` is set to `false`, stream will not be instrumented.'
+    )
+    agent.metrics.getOrCreateMetric(AI.STREAMING_DISABLED).incrementCallCount()
+    agent.metrics
+      .getOrCreateMetric(`${LANGCHAIN.TRACKING_PREFIX}/${pkgVersion}`)
+      .incrementCallCount()
+  }
+}
+
+/**
+ * Instruments and records span and relevant LLM events for `chain.invoke`
+ *
+ * @param {object} params function params
+ * @param {object} params.langchain `@langchain/core/runnables/base` export
+ * @param {Shim} params.shim instance of shim
+ */
+function instrumentInvokeChain({ langchain, shim }) {
   shim.record(
     langchain.RunnableSequence.prototype,
     'invoke',
-    function wrapCall(shim, invoke, fnName, args) {
+    function wrapCall(shim, _invoke, fnName, args) {
       const [request, params] = args
       const metadata = params?.metadata ?? {}
       const tags = params?.tags ?? []
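The streaming gate above is driven by agent configuration. A minimal newrelic.js sketch that would exercise the disabled branch; only the ai_monitoring keys come from this diff, the rest of the file is an assumed skeleton:

'use strict'

// newrelic.js (hypothetical app config, not part of this PR)
exports.config = {
  app_name: ['langchain-streaming-demo'],
  license_key: 'YOUR_LICENSE_KEY',
  ai_monitoring: {
    enabled: true, // required for langchain instrumentation at all
    streaming: {
      enabled: false // triggers the warn + supportability metrics in the else branch above
    }
  }
}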
@@ -41,53 +63,183 @@
         promise: true,
         // eslint-disable-next-line max-params
         after(_shim, _fn, _name, err, output, segment) {
-          segment.end()
-          const completionSummary = new LangChainCompletionSummary({
-            agent,
-            messages: [{ output }],
-            metadata,
-            tags,
+          recordChatCompletionEvents({
             segment,
-            error: err != null,
-            runId: segment[langchainRunId]
-          })
-
-          common.recordEvent({
-            agent,
-            type: 'LlmChatCompletionSummary',
-            pkgVersion,
-            msg: completionSummary
-          })
-
-          // output can be BaseMessage with a content property https://js.langchain.com/docs/modules/model_io/concepts#messages
-          // or an output parser https://js.langchain.com/docs/modules/model_io/concepts#output-parsers
-          recordCompletions({
+            messages: [output],
             events: [request, output],
-            completionSummary,
-            agent,
-            segment,
+            metadata,
+            tags,
+            err,
             shim
           })
+        }
+      })
+    }
+  )
+}
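For context, the `request`, `metadata`, and `tags` destructured in `wrapCall` map onto a standard LangChain call; a hedged usage sketch follows (the chain composition is illustrative, not taken from this PR):

// hypothetical app code: a RunnableSequence built via .pipe()
const { ChatPromptTemplate } = require('@langchain/core/prompts')
const { StringOutputParser } = require('@langchain/core/output_parsers')
const { ChatOpenAI } = require('@langchain/openai')

const prompt = ChatPromptTemplate.fromTemplate('Tell me about {topic}')
const chain = prompt.pipe(new ChatOpenAI()).pipe(new StringOutputParser())

// first argument becomes `request`; the second supplies `metadata` and `tags`
const output = await chain.invoke(
  { topic: 'scientist' },
  { metadata: { conversationId: 'demo' }, tags: ['science'] }
)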

+/**
+ * Instruments and records span and relevant LLM events for `chain.stream`
+ *
+ * @param {object} params function params
+ * @param {object} params.langchain `@langchain/core/runnables/base` export
+ * @param {Shim} params.shim instance of shim
+ */
+function instrumentStream({ langchain, shim }) {
+  shim.record(
+    langchain.RunnableSequence.prototype,
+    'stream',
+    function wrapStream(shim, _stream, fnName, args) {
+      const [request, params] = args
+      const metadata = params?.metadata ?? {}
+      const tags = params?.tags ?? []
+
-          if (err) {
-            agent.errors.add(
-              segment.transaction,
+      return new RecorderSpec({
+        name: `${LANGCHAIN.CHAIN}/${fnName}`,
+        promise: true,
+        // eslint-disable-next-line max-params
+        after(_shim, _fn, _name, err, output, segment) {
+          // Input error occurred which means a stream was not created.
+          // Skip instrumenting streaming and create Llm Events from
+          // the data we have
+          if (output?.next) {
+            wrapNextHandler({ shim, output, segment, request, metadata, tags })
+          } else {
+            recordChatCompletionEvents({
+              segment,
+              messages: [],
+              events: [request],
+              metadata,
+              tags,
               err,
-              new LlmErrorMessage({
-                response: {},
-                cause: err,
-                summary: completionSummary
-              })
-            )
+              shim
+            })
           }
-
-          segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
         }
       })
     }
   )
 }
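The wrapped `stream` method resolves to LangChain's `IterableReadableStream`, whose `next` is what `wrapNextHandler` below intercepts; consuming it looks roughly like this, assuming the chain sketched earlier:

// hypothetical app code: each loop turn calls the wrapped next()
const stream = await chain.stream({ topic: 'scientist' })

let full = ''
for await (const chunk of stream) {
  full += chunk // the wrapper buffers the same chunks into `content`
}
// once the iterator reports done, the wrapper records the completion events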

+/**
+ * Wraps the next method on the IterableReadableStream. It will also record the Llm
+ * events when the stream is done processing.
+ *
+ * @param {object} params function params
+ * @param {Shim} params.shim shim instance
+ * @param {TraceSegment} params.segment active segment
+ * @param {function} params.output IterableReadableStream
+ * @param {string} params.request the prompt message
+ * @param {object} params.metadata metadata for the call
+ * @param {Array} params.tags tags for the call
+ */
+function wrapNextHandler({ shim, output, segment, request, metadata, tags }) {
+  shim.wrap(output, 'next', function wrapIterator(shim, orig) {
+    let content = ''
+    return async function wrappedIterator() {
+      try {
+        const result = await orig.apply(this, arguments)
+        // only create Llm events when stream iteration is done
+        if (result?.done) {
+          recordChatCompletionEvents({
+            segment,
+            messages: [content],
+            events: [request, content],
+            metadata,
+            tags,
+            shim
+          })
+        } else {
+          content += result.value
+        }
+        return result
+      } catch (error) {
+        recordChatCompletionEvents({
+          segment,
+          messages: [content],
+          events: [request, content],
+          metadata,
+          tags,
+          err: error,
+          shim
+        })
+        throw error
+      } finally {
+        // update segment duration on every stream iteration to extend
+        // the timer
+        segment.touch()
+      }
+    }
+  })
+}
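The shape of `wrapNextHandler` is a general async-iterator proxy: forward `next`, accumulate values, and emit bookkeeping exactly once when `done` is reached or an error escapes. A dependency-free sketch of the same pattern, with illustrative names not taken from this PR:

// generic version of the wrap-the-next-method technique used above
function instrumentAsyncIterator(iterable, { onDone, onError, onTick }) {
  const origNext = iterable.next.bind(iterable)
  let buffered = ''
  iterable.next = async function wrappedNext(...args) {
    try {
      const result = await origNext(...args)
      if (result.done) {
        onDone(buffered) // terminal: record once, with everything streamed
      } else {
        buffered += result.value
      }
      return result
    } catch (err) {
      onError(buffered, err) // record the partial content plus the failure
      throw err
    } finally {
      onTick() // e.g. segment.touch() to keep extending the active timer
    }
  }
  return iterable
}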

+/**
+ * Ends active segment, creates LlmChatCompletionSummary and LlmChatCompletionMessage(s), and handles errors if they exist
+ *
+ * @param {object} params function params
+ * @param {TraceSegment} params.segment active segment
+ * @param {Array} params.messages response messages
+ * @param {Array} params.events prompt and response messages
+ * @param {object} params.metadata metadata for the call
+ * @param {Array} params.tags tags for the call
+ * @param {Error} params.err error object from call
+ * @param {Shim} params.shim shim instance
+ */
+function recordChatCompletionEvents({ segment, messages, events, metadata, tags, err, shim }) {
+  const { pkgVersion, agent } = shim
+  segment.end()
+  const completionSummary = new LangChainCompletionSummary({
+    agent,
+    messages,
+    metadata,
+    tags,
+    segment,
+    error: err != null,
+    runId: segment[langchainRunId]
+  })
+
+  common.recordEvent({
+    agent,
+    type: 'LlmChatCompletionSummary',
+    pkgVersion,
+    msg: completionSummary
+  })
+
+  // output can be BaseMessage with a content property https://js.langchain.com/docs/modules/model_io/concepts#messages
+  // or an output parser https://js.langchain.com/docs/modules/model_io/concepts#output-parsers
+  recordCompletions({
+    events,
+    completionSummary,
+    agent,
+    segment,
+    shim
+  })
+
+  if (err) {
+    agent.errors.add(
+      segment.transaction,
+      err,
+      new LlmErrorMessage({
+        response: {},
+        cause: err,
+        summary: completionSummary
+      })
+    )
+  }
+
+  segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
+}

+/**
+ * Records the LlmChatCompletionMessage(s)
+ *
+ * @param {object} params function params
+ * @param {Array} params.events prompt and response messages
+ * @param {LangChainCompletionSummary} params.completionSummary LlmChatCompletionSummary event
+ * @param {Agent} params.agent instance of agent
+ * @param {TraceSegment} params.segment active segment
+ * @param {Shim} params.shim shim instance
+ */
 function recordCompletions({ events, completionSummary, agent, segment, shim }) {
   for (let i = 0; i < events.length; i += 1) {
     let msg = events[i]
3 changes: 1 addition & 2 deletions lib/llm-events/langchain/event.js
@@ -51,7 +51,6 @@ class LangChainEvent extends BaseEvent {
   ingest_source = 'Node'
   vendor = 'langchain'
   virtual_llm = true
-  error = false

   constructor(params = defaultParams) {
     params = Object.assign({}, defaultParams, params)
@@ -66,7 +65,7 @@
     this.langchainMeta = params.metadata
     this.metadata = agent
     this.tags = Array.isArray(params.tags) ? params.tags.join(',') : params.tags
-    this.error = params.error ?? false
+    this.error = params.error ?? null

     if (params.virtual !== undefined) {
       if (params.virtual !== true && params.virtual !== false) {
7 changes: 7 additions & 0 deletions test/unit/llm-events/langchain/event.test.js
@@ -62,6 +62,7 @@ tap.test('constructs default instance', async (t) => {
     ['metadata.foo']: 'foo',
     ingest_source: 'Node',
     vendor: 'langchain',
+    error: null,
     virtual_llm: true
   })
 })
@@ -103,3 +104,9 @@ tap.test('sets tags from string', async (t) => {
   const msg = new LangChainEvent(t.context)
   t.equal(msg.tags, 'foo,bar')
 })
+
+tap.test('sets error property', async (t) => {
+  t.context.error = true
+  const msg = new LangChainEvent(t.context)
+  t.equal(msg.error, true)
+})
13 changes: 10 additions & 3 deletions test/versioned/langchain/common.js
@@ -48,7 +48,14 @@ function assertLangChainChatCompletionSummary({ tx, chatSummary, withCallback })
   this.match(chatSummary[1], expectedSummary, 'should match chat summary message')
 }

-function assertLangChainChatCompletionMessages({ tx, chatMsgs, chatSummary, withCallback }) {
+function assertLangChainChatCompletionMessages({
+  tx,
+  chatMsgs,
+  chatSummary,
+  withCallback,
+  input = '{"topic":"scientist"}',
+  output = '212 degrees Fahrenheit is equal to 100 degrees Celsius.'
+}) {
   const baseMsg = {
     id: /[a-f0-9]{36}/,
     appName: 'New Relic for Node.js tests',
@@ -71,11 +78,11 @@
     const expectedChatMsg = { ...baseMsg }
     if (msg[1].sequence === 0) {
       expectedChatMsg.sequence = 0
-      expectedChatMsg.content = '{"topic":"scientist"}'
+      expectedChatMsg.content = input
       expectedChatMsg.is_response = false
     } else if (msg[1].sequence === 1) {
       expectedChatMsg.sequence = 1
-      expectedChatMsg.content = '212 degrees Fahrenheit is equal to 100 degrees Celsius.'
+      expectedChatMsg.content = output
       expectedChatMsg.is_response = true
     }

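With the new defaulted `input`/`output` parameters, the existing `invoke` tests keep calling the helper unchanged, while the streaming suite can pass the concatenated chunks it observed. A sketch of a call from `runnables-streaming.tap.js` (the actual test body is not shown in this diff; `tx`, `chatMsgs`, `chatSummary`, and `content` are assumed to exist in the test):

// hypothetical assertion in the streaming tests
assertLangChainChatCompletionMessages({
  tx,
  chatMsgs,
  chatSummary,
  input: '{"topic":"scientist"}',
  output: content // chunks the test accumulated while iterating the stream
})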
3 changes: 2 additions & 1 deletion test/versioned/langchain/package.json
@@ -16,7 +16,8 @@
     },
     "files": [
       "tools.tap.js",
-      "runnables.tap.js"
+      "runnables.tap.js",
+      "runnables-streaming.tap.js"
     ]
   }
 ]