Skip to content

Commit

Permalink
feat: Added instrumentation for chain.stream for langchain js. (#2052)
Browse files Browse the repository at this point in the history
  • Loading branch information
bizob2828 committed Feb 29, 2024
1 parent 4830ea3 commit 03abfce
Show file tree
Hide file tree
Showing 6 changed files with 785 additions and 44 deletions.
228 changes: 190 additions & 38 deletions lib/instrumentation/langchain/runnable.js
Expand Up @@ -6,9 +6,8 @@
'use strict'

const common = require('./common')
const {
AI: { LANGCHAIN }
} = require('../../metrics/names')
const { AI } = require('../../metrics/names')
const { LANGCHAIN } = AI
const {
LangChainCompletionMessage,
LangChainCompletionSummary
Expand All @@ -28,10 +27,33 @@ module.exports = function initialize(shim, langchain) {
return
}

instrumentInvokeChain({ langchain, shim })

if (agent.config.ai_monitoring.streaming.enabled) {
instrumentStream({ langchain, shim })
} else {
shim.logger.warn(
'`ai_monitoring.streaming.enabled` is set to `false`, stream will not be instrumented.'
)
agent.metrics.getOrCreateMetric(AI.STREAMING_DISABLED).incrementCallCount()
agent.metrics
.getOrCreateMetric(`${LANGCHAIN.TRACKING_PREFIX}/${pkgVersion}`)
.incrementCallCount()
}
}

/**
* Instruments and records span and relevant LLM events for `chain.invoke`
*
* @param {object} params function params
* @param {object} params.langchain `@langchain/core/runnables/base` export
* @param {Shim} params.shim instace of shim
*/
function instrumentInvokeChain({ langchain, shim }) {
shim.record(
langchain.RunnableSequence.prototype,
'invoke',
function wrapCall(shim, invoke, fnName, args) {
function wrapCall(shim, _invoke, fnName, args) {
const [request, params] = args
const metadata = params?.metadata ?? {}
const tags = params?.tags ?? []
Expand All @@ -41,53 +63,183 @@ module.exports = function initialize(shim, langchain) {
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, output, segment) {
segment.end()
const completionSummary = new LangChainCompletionSummary({
agent,
messages: [{ output }],
metadata,
tags,
recordChatCompletionEvents({
segment,
error: err != null,
runId: segment[langchainRunId]
})

common.recordEvent({
agent,
type: 'LlmChatCompletionSummary',
pkgVersion,
msg: completionSummary
})

// output can be BaseMessage with a content property https://js.langchain.com/docs/modules/model_io/concepts#messages
// or an output parser https://js.langchain.com/docs/modules/model_io/concepts#output-parsers
recordCompletions({
messages: [output],
events: [request, output],
completionSummary,
agent,
segment,
metadata,
tags,
err,
shim
})
}
})
}
)
}

/**
* Instruments and records span and relevant LLM events for `chain.stream`
*
* @param {object} params function params
* @param {object} params.langchain `@langchain/core/runnables/base` export
* @param {Shim} params.shim instace of shim
*/
function instrumentStream({ langchain, shim }) {
shim.record(
langchain.RunnableSequence.prototype,
'stream',
function wrapStream(shim, _stream, fnName, args) {
const [request, params] = args
const metadata = params?.metadata ?? {}
const tags = params?.tags ?? []

if (err) {
agent.errors.add(
segment.transaction,
return new RecorderSpec({
name: `${LANGCHAIN.CHAIN}/${fnName}`,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, output, segment) {
// Input error occurred which means a stream was not created.
// Skip instrumenting streaming and create Llm Events from
// the data we have
if (output?.next) {
wrapNextHandler({ shim, output, segment, request, metadata, tags })
} else {
recordChatCompletionEvents({
segment,
messages: [],
events: [request],
metadata,
tags,
err,
new LlmErrorMessage({
response: {},
cause: err,
summary: completionSummary
})
)
shim
})
}

segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
}
})
}
)
}

/**
* Wraps the next method on the IterableReadableStream. It will also record the Llm
* events when the stream is done processing.
*
* @param {object} params function params
* @param {Shim} params.shim shim instance
* @param {TraceSegment} params.segment active segment
* @param {function} params.output IterableReadableStream
* @param {string} params.request the prompt message
* @param {object} params.metadata metadata for the call
* @param {Array} params.tags tags for the call
*/
function wrapNextHandler({ shim, output, segment, request, metadata, tags }) {
shim.wrap(output, 'next', function wrapIterator(shim, orig) {
let content = ''
return async function wrappedIterator() {
try {
const result = await orig.apply(this, arguments)
// only create Llm events when stream iteration is done
if (result?.done) {
recordChatCompletionEvents({
segment,
messages: [content],
events: [request, content],
metadata,
tags,
shim
})
} else {
content += result.value
}
return result
} catch (error) {
recordChatCompletionEvents({
segment,
messages: [content],
events: [request, content],
metadata,
tags,
err: error,
shim
})
throw error
} finally {
// update segment duration on every stream iteration to extend
// the timer
segment.touch()
}
}
})
}

/**
* Ends active segment, creates LlmChatCompletionSummary, and LlmChatCompletionMessage(s), and handles errors if they exists
*
* @param {object} params function params
* @param {TraceSegment} params.segment active segment
* @param {Array} params.messages response messages
* @param {Array} params.events prompt and response messages
* @param {object} params.metadata metadata for the call
* @param {Array} params.tags tags for the call
* @param {Error} params.err error object from call
* @param {Shim} params.shim shim instance
*/
function recordChatCompletionEvents({ segment, messages, events, metadata, tags, err, shim }) {
const { pkgVersion, agent } = shim
segment.end()
const completionSummary = new LangChainCompletionSummary({
agent,
messages,
metadata,
tags,
segment,
error: err != null,
runId: segment[langchainRunId]
})

common.recordEvent({
agent,
type: 'LlmChatCompletionSummary',
pkgVersion,
msg: completionSummary
})

// output can be BaseMessage with a content property https://js.langchain.com/docs/modules/model_io/concepts#messages
// or an output parser https://js.langchain.com/docs/modules/model_io/concepts#output-parsers
recordCompletions({
events,
completionSummary,
agent,
segment,
shim
})

if (err) {
agent.errors.add(
segment.transaction,
err,
new LlmErrorMessage({
response: {},
cause: err,
summary: completionSummary
})
)
}

segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
}

/**
* Records the LlmChatCompletionMessage(s)
*
* @param {object} params function params
* @param {Array} params.events prompt and response messages
* @param {LangChainCompletionSummary} params.completionSummary LlmChatCompletionSummary event
* @param {Agent} params.agent instance of agent
* @param {TraceSegment} params.segment active segment
* @param {Shim} params.shim shim instance
*/
function recordCompletions({ events, completionSummary, agent, segment, shim }) {
for (let i = 0; i < events.length; i += 1) {
let msg = events[i]
Expand Down
3 changes: 1 addition & 2 deletions lib/llm-events/langchain/event.js
Expand Up @@ -51,7 +51,6 @@ class LangChainEvent extends BaseEvent {
ingest_source = 'Node'
vendor = 'langchain'
virtual_llm = true
error = false

constructor(params = defaultParams) {
params = Object.assign({}, defaultParams, params)
Expand All @@ -66,7 +65,7 @@ class LangChainEvent extends BaseEvent {
this.langchainMeta = params.metadata
this.metadata = agent
this.tags = Array.isArray(params.tags) ? params.tags.join(',') : params.tags
this.error = params.error ?? false
this.error = params.error ?? null

if (params.virtual !== undefined) {
if (params.virtual !== true && params.virtual !== false) {
Expand Down
7 changes: 7 additions & 0 deletions test/unit/llm-events/langchain/event.test.js
Expand Up @@ -62,6 +62,7 @@ tap.test('constructs default instance', async (t) => {
['metadata.foo']: 'foo',
ingest_source: 'Node',
vendor: 'langchain',
error: null,
virtual_llm: true
})
})
Expand Down Expand Up @@ -103,3 +104,9 @@ tap.test('sets tags from string', async (t) => {
const msg = new LangChainEvent(t.context)
t.equal(msg.tags, 'foo,bar')
})

tap.test('sets error property', async (t) => {
t.context.error = true
const msg = new LangChainEvent(t.context)
t.equal(msg.error, true)
})
13 changes: 10 additions & 3 deletions test/versioned/langchain/common.js
Expand Up @@ -48,7 +48,14 @@ function assertLangChainChatCompletionSummary({ tx, chatSummary, withCallback })
this.match(chatSummary[1], expectedSummary, 'should match chat summary message')
}

function assertLangChainChatCompletionMessages({ tx, chatMsgs, chatSummary, withCallback }) {
function assertLangChainChatCompletionMessages({
tx,
chatMsgs,
chatSummary,
withCallback,
input = '{"topic":"scientist"}',
output = '212 degrees Fahrenheit is equal to 100 degrees Celsius.'
}) {
const baseMsg = {
id: /[a-f0-9]{36}/,
appName: 'New Relic for Node.js tests',
Expand All @@ -71,11 +78,11 @@ function assertLangChainChatCompletionMessages({ tx, chatMsgs, chatSummary, with
const expectedChatMsg = { ...baseMsg }
if (msg[1].sequence === 0) {
expectedChatMsg.sequence = 0
expectedChatMsg.content = '{"topic":"scientist"}'
expectedChatMsg.content = input
expectedChatMsg.is_response = false
} else if (msg[1].sequence === 1) {
expectedChatMsg.sequence = 1
expectedChatMsg.content = '212 degrees Fahrenheit is equal to 100 degrees Celsius.'
expectedChatMsg.content = output
expectedChatMsg.is_response = true
}

Expand Down
3 changes: 2 additions & 1 deletion test/versioned/langchain/package.json
Expand Up @@ -16,7 +16,8 @@
},
"files": [
"tools.tap.js",
"runnables.tap.js"
"runnables.tap.js",
"runnables-streaming.tap.js"
]
}
]
Expand Down

0 comments on commit 03abfce

Please sign in to comment.