Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.Net: [WIP] OTel model diagnostics: streaming APIs #6242

Merged
114 changes: 93 additions & 21 deletions dotnet/samples/Demos/TelemetryWithAppInsights/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,24 @@ public static async Task Main()
Console.WriteLine();

Console.WriteLine("Write a poem about John Doe and translate it to Italian.");
await RunAzureOpenAIChatAsync(kernel);
using (var _ = s_activitySource.StartActivity("Chat"))
{
await RunAzureOpenAIChatAsync(kernel);
Console.WriteLine();
await RunGoogleAIChatAsync(kernel);
Console.WriteLine();
await RunHuggingFaceChatAsync(kernel);
}

Console.WriteLine();
await RunGoogleAIChatAsync(kernel);
Console.WriteLine();
await RunHuggingFaceChatAsync(kernel);

Console.WriteLine("Get weather.");
using (var _ = s_activitySource.StartActivity("ToolCalls"))
{
await RunAzureOpenAIToolCallsAsync(kernel);
Console.WriteLine();
}
}

#region Private
Expand All @@ -99,16 +112,17 @@ public static async Task Main()
/// </summary>
private static readonly ActivitySource s_activitySource = new("Telemetry.Example");

private const string AzureOpenAIChatServiceKey = "AzureOpenAIChat";
private const string GoogleAIGeminiChatServiceKey = "GoogleAIGeminiChat";
private const string HuggingFaceChatServiceKey = "HuggingFaceChat";
private const string AzureOpenAIServiceKey = "AzureOpenAI";
private const string GoogleAIGeminiServiceKey = "GoogleAIGemini";
private const string HuggingFaceServiceKey = "HuggingFace";
TaoChenOSU marked this conversation as resolved.
Show resolved Hide resolved

#region chat completion
private static async Task RunAzureOpenAIChatAsync(Kernel kernel)
{
Console.WriteLine("============= Azure OpenAI Chat Completion =============");

using var activity = s_activitySource.StartActivity(AzureOpenAIChatServiceKey);
SetTargetService(kernel, AzureOpenAIChatServiceKey);
using var activity = s_activitySource.StartActivity(AzureOpenAIServiceKey);
SetTargetService(kernel, AzureOpenAIServiceKey);
try
{
await RunChatAsync(kernel);
Expand All @@ -124,8 +138,8 @@ private static async Task RunGoogleAIChatAsync(Kernel kernel)
{
Console.WriteLine("============= Google Gemini Chat Completion =============");

using var activity = s_activitySource.StartActivity(GoogleAIGeminiChatServiceKey);
SetTargetService(kernel, GoogleAIGeminiChatServiceKey);
using var activity = s_activitySource.StartActivity(GoogleAIGeminiServiceKey);
SetTargetService(kernel, GoogleAIGeminiServiceKey);

try
{
Expand All @@ -142,8 +156,8 @@ private static async Task RunHuggingFaceChatAsync(Kernel kernel)
{
Console.WriteLine("============= HuggingFace Chat Completion =============");

using var activity = s_activitySource.StartActivity(HuggingFaceChatServiceKey);
SetTargetService(kernel, HuggingFaceChatServiceKey);
using var activity = s_activitySource.StartActivity(HuggingFaceServiceKey);
SetTargetService(kernel, HuggingFaceServiceKey);

try
{
Expand All @@ -158,21 +172,54 @@ private static async Task RunHuggingFaceChatAsync(Kernel kernel)

private static async Task RunChatAsync(Kernel kernel)
{
// Using non-streaming to get the poem.
var poem = await kernel.InvokeAsync<string>(
"WriterPlugin",
"ShortPoem",
new KernelArguments { ["input"] = "Write a poem about John Doe." });
var translatedPoem = await kernel.InvokeAsync<string>(
Console.WriteLine($"Poem:\n{poem}\n");

// Use streaming to translate the poem.
Console.WriteLine("Translated Poem:");
await foreach (var update in kernel.InvokeStreamingAsync<string>(
"WriterPlugin",
"Translate",
new KernelArguments
{
["input"] = poem,
["language"] = "Italian"
});
}))
{
Console.Write(update);
}
}
#endregion

#region tool calls
/// <summary>
/// Demonstrates automatic tool calling against the Azure OpenAI chat service.
/// The run is wrapped in a tracing activity; failures are recorded on the
/// activity and echoed to the console instead of propagating.
/// </summary>
private static async Task RunAzureOpenAIToolCallsAsync(Kernel kernel)
{
    Console.WriteLine("============= Azure OpenAI ToolCalls =============");

    using Activity? toolCallActivity = s_activitySource.StartActivity(AzureOpenAIServiceKey);
    SetTargetService(kernel, AzureOpenAIServiceKey);
    try
    {
        await RunAutoToolCallAsync(kernel);
    }
    catch (Exception ex)
    {
        // Mark the activity as failed so the error surfaces in telemetry.
        toolCallActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        Console.WriteLine($"Error: {ex.Message}");
    }
}

Console.WriteLine($"Poem:\n{poem}\n\nTranslated Poem:\n{translatedPoem}");
/// <summary>
/// Sends a prompt that requires the model to resolve the user's location and
/// the weather via plugin tool calls, then prints the final answer.
/// </summary>
private static async Task RunAutoToolCallAsync(Kernel kernel)
{
    // Auto tool invocation is enabled via the execution settings selected
    // by the service selector, so a plain prompt is sufficient here.
    const string Prompt = "What is the weather like in my location?";
    var result = await kernel.InvokePromptAsync(Prompt);
    Console.WriteLine(result);
}
#endregion

private static Kernel GetKernel(ILoggerFactory loggerFactory)
{
Expand All @@ -187,19 +234,21 @@ private static Kernel GetKernel(ILoggerFactory loggerFactory)
modelId: TestConfiguration.AzureOpenAI.ChatModelId,
endpoint: TestConfiguration.AzureOpenAI.Endpoint,
apiKey: TestConfiguration.AzureOpenAI.ApiKey,
serviceId: AzureOpenAIChatServiceKey)
serviceId: AzureOpenAIServiceKey)
.AddGoogleAIGeminiChatCompletion(
modelId: TestConfiguration.GoogleAI.Gemini.ModelId,
apiKey: TestConfiguration.GoogleAI.ApiKey,
serviceId: GoogleAIGeminiChatServiceKey)
serviceId: GoogleAIGeminiServiceKey)
.AddHuggingFaceChatCompletion(
model: TestConfiguration.HuggingFace.ModelId,
endpoint: new Uri("https://api-inference.huggingface.co"),
apiKey: TestConfiguration.HuggingFace.ApiKey,
serviceId: HuggingFaceChatServiceKey);
serviceId: HuggingFaceServiceKey);

builder.Services.AddSingleton<IAIServiceSelector>(new AIServiceSelector());
builder.Plugins.AddFromPromptDirectory(Path.Combine(folder, "WriterPlugin"));
builder.Plugins.AddFromType<WeatherPlugin>();
builder.Plugins.AddFromType<LocationPlugin>();

return builder.Build();
}
Expand Down Expand Up @@ -240,9 +289,13 @@ private sealed class AIServiceSelector : IAIServiceSelector
service = targetService;
serviceSettings = targetServiceKey switch
{
AzureOpenAIChatServiceKey => new OpenAIPromptExecutionSettings(),
GoogleAIGeminiChatServiceKey => new GeminiPromptExecutionSettings(),
HuggingFaceChatServiceKey => new HuggingFacePromptExecutionSettings(),
AzureOpenAIServiceKey => new OpenAIPromptExecutionSettings()
{
Temperature = 0,
ToolCallBehavior = ToolCallBehavior.AutoInvokeKernelFunctions
},
GoogleAIGeminiServiceKey => new GeminiPromptExecutionSettings(),
HuggingFaceServiceKey => new HuggingFacePromptExecutionSettings(),
_ => null,
};

Expand All @@ -256,4 +309,23 @@ private sealed class AIServiceSelector : IAIServiceSelector
}
}
#endregion

#region Plugins

/// <summary>
/// Sample plugin returning a canned weather report for a given location,
/// used to demonstrate automatic tool calling.
/// </summary>
public sealed class WeatherPlugin
{
    /// <summary>Returns a fixed weather string for <paramref name="location"/>.</summary>
    [KernelFunction]
    // Fixed mojibake in the returned text: "70掳F" (GBK mis-decode) -> "70°F".
    public string GetWeather(string location) => $"Weather in {location} is 70°F.";
}

/// <summary>
/// Sample plugin that supplies the user's current location, used to
/// demonstrate automatic tool calling.
/// </summary>
public sealed class LocationPlugin
{
    /// <summary>Gets the (hard-coded) current location of the user.</summary>
    [KernelFunction]
    public string GetCurrentLocation() => "Seattle";
}

#endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<ImplicitUsings>disable</ImplicitUsings>
<IsPackable>false</IsPackable>
<!-- Suppress: "Declare types in namespaces", "Require ConfigureAwait" -->
<NoWarn>$(NoWarn);CA1050;CA1707;CA2007;CS1591;VSTHRD111,SKEXP0050,SKEXP0060,SKEXP0070</NoWarn>
<NoWarn>$(NoWarn);CA1024;CA1050;CA1707;CA2007;CS1591;VSTHRD111,SKEXP0050,SKEXP0060,SKEXP0070</NoWarn>
<UserSecretsId>5ee045b0-aea3-4f08-8d31-32d1a6f8fed0</UserSecretsId>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,56 @@ internal sealed class GeminiChatCompletionClient : ClientBase

for (state.Iteration = 1; ; state.Iteration++)
{
using var httpRequestMessage = await this.CreateHttpRequestAsync(state.GeminiRequest, this._chatStreamingEndpoint).ConfigureAwait(false);
using var response = await this.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken)
.ConfigureAwait(false);
using var responseStream = await response.Content.ReadAsStreamAndTranslateExceptionAsync()
.ConfigureAwait(false);

await foreach (var messageContent in this.GetStreamingChatMessageContentsOrPopulateStateForToolCallingAsync(state, responseStream, cancellationToken).ConfigureAwait(false))
using (var activity = ModelDiagnostics.StartCompletionActivity(
this._chatGenerationEndpoint, this._modelId, ModelProvider, chatHistory, executionSettings))
{
yield return messageContent;
HttpResponseMessage? httpResponseMessage = null;
Stream? responseStream = null;
try
{
using var httpRequestMessage = await this.CreateHttpRequestAsync(state.GeminiRequest, this._chatStreamingEndpoint).ConfigureAwait(false);
httpResponseMessage = await this.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken).ConfigureAwait(false);
responseStream = await httpResponseMessage.Content.ReadAsStreamAndTranslateExceptionAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
activity?.SetError(ex);
httpResponseMessage?.Dispose();
responseStream?.Dispose();
throw;
}
TaoChenOSU marked this conversation as resolved.
Show resolved Hide resolved

var responseEnumerator = this.GetStreamingChatMessageContentsOrPopulateStateForToolCallingAsync(state, responseStream, cancellationToken)
.GetAsyncEnumerator(cancellationToken);
List<StreamingChatMessageContent>? streamedContents = activity is not null ? [] : null;
try
{
while (true)
{
try
{
if (!await responseEnumerator.MoveNextAsync().ConfigureAwait(false))
{
break;
}
}
catch (Exception ex)
{
activity?.SetError(ex);
throw;
}
stephentoub marked this conversation as resolved.
Show resolved Hide resolved

streamedContents?.Add(responseEnumerator.Current);
yield return responseEnumerator.Current;
}
}
finally
{
TaoChenOSU marked this conversation as resolved.
Show resolved Hide resolved
activity?.EndStreaming(streamedContents);
httpResponseMessage?.Dispose();
responseStream?.Dispose();
await responseEnumerator.DisposeAsync().ConfigureAwait(false);
}
}

if (!state.AutoInvoke)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,53 @@ internal HttpRequestMessage CreatePost(object requestData, Uri endpoint, string?
var request = this.CreateTextRequest(prompt, executionSettings);
request.Stream = true;

using var httpRequestMessage = this.CreatePost(request, endpoint, this.ApiKey);

using var response = await this.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken)
.ConfigureAwait(false);

using var responseStream = await response.Content.ReadAsStreamAndTranslateExceptionAsync()
.ConfigureAwait(false);
using var activity = ModelDiagnostics.StartCompletionActivity(endpoint, modelId, this.ModelProvider, prompt, executionSettings);
HttpResponseMessage? httpResponseMessage = null;
Stream? responseStream = null;
try
{
using var httpRequestMessage = this.CreatePost(request, endpoint, this.ApiKey);
httpResponseMessage = await this.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken).ConfigureAwait(false);
responseStream = await httpResponseMessage.Content.ReadAsStreamAndTranslateExceptionAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
activity?.SetError(ex);
httpResponseMessage?.Dispose();
responseStream?.Dispose();
throw;
}

await foreach (var streamingTextContent in this.ProcessTextResponseStreamAsync(responseStream, modelId, cancellationToken).ConfigureAwait(false))
var responseEnumerator = this.ProcessTextResponseStreamAsync(responseStream, modelId, cancellationToken)
TaoChenOSU marked this conversation as resolved.
Show resolved Hide resolved
.GetAsyncEnumerator(cancellationToken);
List<StreamingTextContent>? streamedContents = activity is not null ? [] : null;
try
{
while (true)
{
try
{
if (!await responseEnumerator.MoveNextAsync().ConfigureAwait(false))
{
break;
}
}
catch (Exception ex)
{
activity?.SetError(ex);
throw;
}

streamedContents?.Add(responseEnumerator.Current);
yield return responseEnumerator.Current;
}
}
finally
{
yield return streamingTextContent;
activity?.EndStreaming(streamedContents);
httpResponseMessage?.Dispose();
responseStream?.Dispose();
await responseEnumerator.DisposeAsync().ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,53 @@ internal sealed class HuggingFaceMessageApiClient
var request = this.CreateChatRequest(chatHistory, executionSettings);
request.Stream = true;

using var httpRequestMessage = this._clientCore.CreatePost(request, endpoint, this._clientCore.ApiKey);

using var response = await this._clientCore.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken)
.ConfigureAwait(false);

using var responseStream = await response.Content.ReadAsStreamAndTranslateExceptionAsync()
.ConfigureAwait(false);
using var activity = ModelDiagnostics.StartCompletionActivity(endpoint, modelId, this._clientCore.ModelProvider, chatHistory, executionSettings);
HttpResponseMessage? httpResponseMessage = null;
Stream? responseStream = null;
try
{
using var httpRequestMessage = this._clientCore.CreatePost(request, endpoint, this._clientCore.ApiKey);
httpResponseMessage = await this._clientCore.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken).ConfigureAwait(false);
responseStream = await httpResponseMessage.Content.ReadAsStreamAndTranslateExceptionAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
activity?.SetError(ex);
httpResponseMessage?.Dispose();
responseStream?.Dispose();
throw;
}

await foreach (var streamingChatContent in this.ProcessChatResponseStreamAsync(responseStream, modelId, cancellationToken).ConfigureAwait(false))
var responseEnumerator = this.ProcessChatResponseStreamAsync(responseStream, modelId, cancellationToken)
.GetAsyncEnumerator(cancellationToken);
List<StreamingChatMessageContent>? streamedContents = activity is not null ? [] : null;
try
{
while (true)
{
try
{
if (!await responseEnumerator.MoveNextAsync().ConfigureAwait(false))
{
break;
}
}
catch (Exception ex)
{
activity?.SetError(ex);
throw;
}

streamedContents?.Add(responseEnumerator.Current);
yield return responseEnumerator.Current;
}
}
finally
{
yield return streamingChatContent;
activity?.EndStreaming(streamedContents);
httpResponseMessage?.Dispose();
responseStream?.Dispose();
await responseEnumerator.DisposeAsync().ConfigureAwait(false);
}
}

Expand Down
Loading
Loading