Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenAI Comment Improvements #913

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions v3/integrations/nropenai/nropenai.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
errAIMonitoringDisabled = errors.New("AI Monitoring is set to disabled or High Security Mode is enabled. Please enable AI Monitoring and ensure High Security Mode is disabled")
)

// OpenAIClient is any type that can invoke OpenAI model with a request.
type OpenAIClient interface {
CreateChatCompletion(ctx context.Context, request openai.ChatCompletionRequest) (response openai.ChatCompletionResponse, err error)
CreateChatCompletionStream(ctx context.Context, request openai.ChatCompletionRequest) (stream *openai.ChatCompletionStream, err error)
Expand Down Expand Up @@ -157,7 +158,10 @@ func GetInput(any interface{}) any {

}

// Wrapper for Recv() method that calls the underlying stream's Recv() method
// Wrapper for OpenAI Streaming Recv() method
// Captures the response messages as they are received in the wrapper
// Once the stream is closed, the Close() method is called and sends the captured
// data to New Relic
func (w *ChatCompletionStreamWrapper) Recv() (openai.ChatCompletionStreamResponse, error) {
response, err := w.stream.Recv()
if err != nil {
Expand All @@ -184,6 +188,7 @@ func (w *ChatCompletionStreamWrapper) Recv() (openai.ChatCompletionStreamRespons

}

// Close the stream and send the event to New Relic
func (w *ChatCompletionStreamWrapper) Close() {
w.StreamingData["response.model"] = w.model
NRCreateChatCompletionMessageStream(w.app, uuid.MustParse(w.uuid), w, w.cw, w.sequence)
Expand All @@ -200,8 +205,11 @@ func (w *ChatCompletionStreamWrapper) Close() {
w.stream.Close()
}

// NRCreateChatCompletionSummary captures the request and response data for a chat completion request and records a custom event in New Relic. It also captures the completion messages
// With a call to NRCreateChatCompletionMessage
// NRCreateChatCompletionSummary captures the request data for a chat completion request
// A new segment is created for the chat completion request, and the response data is timed and captured
// Custom attributes are added to the event if they exist from client.AddCustomAttributes()
// After closing out the custom event for the chat completion summary, the function then calls
// NRCreateChatCompletionMessageInput/NRCreateChatCompletionMessage to capture the request messages
func NRCreateChatCompletionSummary(txn *newrelic.Transaction, app *newrelic.Application, cw *ClientWrapper, req openai.ChatCompletionRequest) ChatCompletionResponseWrapper {
// Start span
txn.AddAttribute("llm", true)
Expand Down Expand Up @@ -311,6 +319,9 @@ func NRCreateChatCompletionSummary(txn *newrelic.Transaction, app *newrelic.Appl
}

// Captures initial request messages and records a custom event in New Relic for each message
// similarly to NRCreateChatCompletionMessage, but only for the request messages
// Returns the sequence of the messages sent in the request
// which is used to calculate the sequence in the response messages
func NRCreateChatCompletionMessageInput(txn *newrelic.Transaction, app *newrelic.Application, req openai.ChatCompletionRequest, inputuuid uuid.UUID, cw *ClientWrapper) int {
sequence := 0
for i, message := range req.Messages {
Expand Down Expand Up @@ -358,7 +369,13 @@ func NRCreateChatCompletionMessageInput(txn *newrelic.Transaction, app *newrelic

}

// NRCreateChatCompletionMessage captures the completion response messages and records a custom event in New Relic for each message
// NRCreateChatCompletionMessage captures the completion response messages and records a custom event
// in New Relic for each message. The completion response messages are the responses from the model
// after the request messages have been sent and logged in NRCreateChatCompletionMessageInput.
// The sequence of the messages is calculated by logging each of the request messages first, then
// incrementing the sequence for each response message.
// The token count is calculated for each message and added to the custom event if the token count callback is set
// If not, no token count is added to the custom event
func NRCreateChatCompletionMessage(txn *newrelic.Transaction, app *newrelic.Application, resp openai.ChatCompletionResponse, uuid uuid.UUID, cw *ClientWrapper, sequence int, req openai.ChatCompletionRequest) {
spanID := txn.GetTraceMetadata().SpanID
traceID := txn.GetTraceMetadata().TraceID
Expand Down Expand Up @@ -412,6 +429,8 @@ func NRCreateChatCompletionMessage(txn *newrelic.Transaction, app *newrelic.Appl
}
}

// NRCreateChatCompletionMessageStream is identical to NRCreateChatCompletionMessage, but for streaming responses.
// Gets invoked only when the stream is closed
func NRCreateChatCompletionMessageStream(app *newrelic.Application, uuid uuid.UUID, sw *ChatCompletionStreamWrapper, cw *ClientWrapper, sequence int) {

spanID := sw.txn.GetTraceMetadata().SpanID
Expand Down Expand Up @@ -476,6 +495,32 @@ func TokenCountingHelper(app *newrelic.Application, message openai.ChatCompletio
return numTokens, (contentCounted && roleCounted)
}

// Similar to NRCreateChatCompletionSummary, but for streaming responses
// Returns a custom wrapper with a stream that can be used to receive messages
// Example Usage:
/*
ctx := context.Background()
stream, err := nropenai.NRCreateChatCompletionStream(client, ctx, req, app)
if err != nil {
panic(err)
}
for {
var response openai.ChatCompletionStreamResponse
response, err = stream.Recv()
if errors.Is(err, io.EOF) {
fmt.Println("\nStream finished")
break
}
if err != nil {
fmt.Printf("\nStream error: %v\n", err)
return
}
fmt.Printf(response.Choices[0].Delta.Content)
}
stream.Close()
*/
// It is important to call stream.Close() after the stream has been used, as it will close the stream and send the event to New Relic.
// Additionally, custom attributes can be added to the client using client.AddCustomAttributes(map[string]interface{}) just like in NRCreateChatCompletionSummary
func NRCreateChatCompletionStream(cw *ClientWrapper, ctx context.Context, req openai.ChatCompletionRequest, app *newrelic.Application) (*ChatCompletionStreamWrapper, error) {
txn := app.StartTransaction("OpenAIChatCompletionStream")

Expand Down Expand Up @@ -546,7 +591,13 @@ func NRCreateChatCompletionStream(cw *ClientWrapper, ctx context.Context, req op
}

// NRCreateChatCompletion is a wrapper for the OpenAI CreateChatCompletion method.
// If AI Monitoring is disabled, the wrapped function will still call the OpenAI CreateChatCompletion method and return the response with no New Relic instrumentation
// If AI Monitoring is disabled, the wrapped function will still call the OpenAI CreateChatCompletion method
// and return the response with no New Relic instrumentation
// Calls NRCreateChatCompletionSummary to capture the request data and response data
// Returns a ChatCompletionResponseWrapper with the response and the TraceID of the transaction
// The trace ID is used to link the chat response with its feedback, with a call to SendFeedback()
// Otherwise, the response is the same as the OpenAI CreateChatCompletion method. It can be accessed
// by calling resp.ChatCompletionResponse
func NRCreateChatCompletion(cw *ClientWrapper, req openai.ChatCompletionRequest, app *newrelic.Application) (ChatCompletionResponseWrapper, error) {
config, _ := app.Config()

Expand Down
Loading