Skip to content

Commit

Permalink
OpenTelemetry in Assistants (#712)
Browse files Browse the repository at this point in the history
* OpenTelemetry in Assistants

* OpenTelemetry in Assistants
  • Loading branch information
javipacheco committed Apr 8, 2024
1 parent b2b45e5 commit 1a33f73
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package com.xebia.functional.xef.llm

import com.xebia.functional.openai.generated.model.CreateChatCompletionResponse
import com.xebia.functional.openai.generated.model.RunObject
import com.xebia.functional.openai.generated.model.RunStepObject
import com.xebia.functional.xef.conversation.Conversation
import com.xebia.functional.xef.llm.assistants.RunDelta
import com.xebia.functional.xef.metrics.Metric
import com.xebia.functional.xef.prompt.Prompt
import com.xebia.functional.xef.prompt.completionRole
Expand All @@ -20,6 +22,18 @@ suspend fun CreateChatCompletionResponse.addMetrics(
)
conversation.metric.parameter("openai.chat_completion.token.count", "${it.totalTokens}")
}
choices.forEach { choice ->
choice.message.content?.let {
conversation.metric.parameter("openai.chat_completion.choice.${choice.index}.content", it)
}
choice.message.toolCalls?.zip(choice.message.toolCalls!!.indices)?.forEach {
(toolCall, toolCallIndex) ->
conversation.metric.parameter(
"openai.chat_completion.choice.${choice.index}.tool_call.$toolCallIndex",
toolCall.function.arguments
)
}
}
return this
}

Expand Down Expand Up @@ -50,3 +64,30 @@ suspend fun RunObject.addMetrics(metric: Metric): RunObject {
metric.assistantCreateRun(this)
return this
}

suspend fun RunStepObject.addMetrics(metric: Metric): RunStepObject {
metric.assistantCreateRunStep(this)
return this
}

suspend fun RunDelta.addMetrics(metric: Metric): RunDelta {
when (this) {
is RunDelta.RunCancelled -> run.addMetrics(metric)
is RunDelta.RunCancelling -> run.addMetrics(metric)
is RunDelta.RunCompleted -> run.addMetrics(metric)
is RunDelta.RunCreated -> run.addMetrics(metric)
is RunDelta.RunExpired -> run.addMetrics(metric)
is RunDelta.RunFailed -> run.addMetrics(metric)
is RunDelta.RunInProgress -> run.addMetrics(metric)
is RunDelta.RunQueued -> run.addMetrics(metric)
is RunDelta.RunRequiresAction -> run.addMetrics(metric)
is RunDelta.RunStepCancelled -> runStep.addMetrics(metric)
is RunDelta.RunStepCompleted -> runStep.addMetrics(metric)
is RunDelta.RunStepCreated -> runStep.addMetrics(metric)
is RunDelta.RunStepExpired -> runStep.addMetrics(metric)
is RunDelta.RunStepFailed -> runStep.addMetrics(metric)
is RunDelta.RunStepInProgress -> runStep.addMetrics(metric)
else -> {} // ignore other cases
}
return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,17 @@ class AssistantThread(
)
.data

suspend fun createRun(assistant: Assistant): RunObject =
createRun(CreateRunRequest(assistantId = assistant.assistantId))

suspend fun createRun(request: CreateRunRequest): RunObject =
api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric)

fun createRunStream(assistant: Assistant, request: CreateRunRequest): Flow<RunDelta> = flow {
api
.createRunStream(threadId, request, configure = ::defaultConfig)
.map { RunDelta.fromServerSentEvent(it) }
.map { it.addMetrics(metric) }
.collect { event ->
when (event) {
// submit tool outputs and join streams
Expand Down Expand Up @@ -153,9 +157,6 @@ class AssistantThread(
suspend fun getRun(runId: String): RunObject =
api.getRun(threadId, runId, configure = ::defaultConfig)

suspend fun createRun(assistant: Assistant): RunObject =
createRun(CreateRunRequest(assistantId = assistant.assistantId))

fun run(assistant: Assistant): Flow<RunDelta> =
createRunStream(assistant, CreateRunRequest(assistantId = assistant.assistantId))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,21 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {
return output
}

override suspend fun assistantCreateRunStep(runObject: RunStepObject) {
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${runObject.threadId}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${runObject.id}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- Status: ${runObject.status.name}"
}
}

override suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ interface Metric {

suspend fun assistantCreateRun(runId: String, block: suspend Metric.() -> RunObject): RunObject

suspend fun assistantCreateRunStep(runObject: RunStepObject)

suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
Expand Down Expand Up @@ -51,6 +53,8 @@ interface Metric {
block: suspend Metric.() -> RunObject
): RunObject = block()

override suspend fun assistantCreateRunStep(runObject: RunStepObject) {}

override suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.xebia.functional.xef.assistants

import com.xebia.functional.openai.generated.model.*
import com.xebia.functional.xef.OpenAI
import com.xebia.functional.xef.llm.assistants.Assistant
import com.xebia.functional.xef.llm.assistants.AssistantThread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) {
}
}

fun runStepSpan(runObject: RunStepObject) {

val parentOrRoot: Context = runObject.runId.getOrCreateContext()

val currentSpan =
tracer
.spanBuilder("step ${runObject.status.name} ${runObject.id}")
.setParent(parentOrRoot)
.setSpanKind(SpanKind.CLIENT)
.startSpan()

try {
currentSpan.makeCurrent().use { runObject.setParameters(currentSpan) }
} finally {
currentSpan.end()
}
}

suspend fun runStepSpan(runId: String, block: suspend () -> RunStepObject): RunStepObject {

val parentOrRoot: Context = runId.getOrCreateContext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class OpenTelemetryMetric(
block: suspend Metric.() -> RunObject
): RunObject = assistantState.runSpan(runId) { block() }

override suspend fun assistantCreateRunStep(runObject: RunStepObject) =
assistantState.runStepSpan(runObject)

override suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
Expand Down

0 comments on commit 1a33f73

Please sign in to comment.