-
Notifications
You must be signed in to change notification settings - Fork 585
feat: add Bedrock InvokeModelWithResponseStream instrumentation #2845
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: add Bedrock InvokeModelWithResponseStream instrumentation #2845
Conversation
09f9777
to
eeb1e84
Compare
@@ -102,6 +102,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { | |||
return this.requestPreSpanHookConverse(request, config, diag, true); | |||
case 'InvokeModel': | |||
return this.requestPreSpanHookInvokeModel(request, config, diag); | |||
case 'InvokeModelWithResponseStream': | |||
return this.requestPreSpanHookInvokeModelWithResponseStream( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to not re-use requestPreSpanHookInvokeModel
: add a 4th isStream
argument and pass in false
for 'InvokeModel', true
for 'InvokeModelWithResponseStream', and then make the minor update to the implementation? This is how it was done for 'Converse' and 'ConverseStream'.
It looks to me like the requestPreSpanHookInvokeModel
and requestPreSpanHookInvokeModelWithResponseStream
functions are almost identical ... except that the latter doesn't have blocks for 'meta.llama', 'cohere.*', and 'mistral'.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion, @trentm ! You are absolutely right! I've updated the code to consolidate requestPreSpanHookInvokeModel
and requestPreSpanHookInvokeModelWithResponseStream
into a single method using isStream
parameter as you suggested.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2845 +/- ##
==========================================
+ Coverage 89.76% 89.81% +0.04%
==========================================
Files 187 187
Lines 9149 9208 +59
Branches 1885 1903 +18
==========================================
+ Hits 8213 8270 +57
- Misses 936 938 +2
🚀 New features to boost your workflow:
|
eeb1e84
to
0d197f0
Compare
): Promise<any> { | ||
const stream = response.data?.body; | ||
const modelId = response.request.commandInput?.modelId; | ||
if (!stream || !span.isRecording()) return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!stream || !span.isRecording()) return; | |
if (!stream) return; |
!span.isRecording()
is already checked before responseHookInvokeModelWithResponseStream()
is called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, @jj22ee! I removed this unnecessary check
return response.data; | ||
|
||
// Tap into the stream at the chunk level without modifying the chunk itself. | ||
function instrumentAsyncIterable<T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be more efficient to declare these following functions outside of this member function scope?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, @jj22ee , for the review and good catches! Yes, these helper functions could be declared outside the method, at the class level, instead of being re-created every time the method is called.
Changed!
fe8416b
to
ff1cb2d
Compare
Update this to pull in the main branch and I can take another look over. Thanks! |
parsedChunk, | ||
span | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The non-stream responseHookInvokeModel()
is adding some attributes for a few more models: 'meta.llama'
, 'cohere.command-r'
, etc.
Should this function also add relevant attributes for those models? Or is it possible streaming is not supported for those models?
Also, perhaps the recordNovaAttributes()
, recordClaudeAttributes()
, etc. methods could be used by both responseHookInvokeModel
and responseHookInvokeModelWithResponseStream
. Or perhaps all the record*Attribute()
methods could be moved to one setInvokeModelResponseAttributes()
that is used by both responseHookInvokeModel*()
methods.
// while OpenTelemetry can record span attributes from streamed data. | ||
response.data.body = (async function* () { | ||
try { | ||
for await (const item of wrappedStream) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, the result here is that two async iterators are created. Could the chunk handling above be moved into this for await ...
and then not bother having a wrappedStream
at all? I haven't tried this.
const str = Buffer.from(bytes).toString('utf-8'); | ||
return JSON.parse(str); | ||
} catch (err) { | ||
console.warn('Failed to parse streamed chunk', err); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't use console
in instrumentation code. Instead use the DiagLogger that every intsrumentation instance has.
console.warn('Failed to parse streamed chunk', err); | |
this._diag.warn('Failed to parse streamed chunk', err); |
This will mean you'll need to not use static
for this method.
Which problem is this PR solving?
Adds instrumentation of the InvokeModelWithResponseStreamCommand in the AWS Bedrock SDK.
Short description of the changes
instrumentAsyncIterable
is used to inspect streamed chunks in real time and extract relevant telemetry.