Skip to content

[FEATURE REQ] Streaming Events: Push-based API #720

@Licantrop0

Description

@Licantrop0

Do you remember AssistantEventHandler in the Python SDK?
That was a very convenient way to handle streaming responses because it was hiding the complexity of handling each Streaming Update type individually.

I'm proposing to add a similar event-wrapper utility class to the C# SDK for the Responses API.

Why?

  1. StreamingResponseUpdate subclasses are confusing and somewhat duplicated (why is there both a StreamingResponseFunctionCallArgumentsDoneUpdate and a StreamingResponseOutputItemDoneUpdate whose done.Item is a FunctionCallResponseItem?)
  2. It's quite hard to implement a StreamingResponseUpdate loop correctly, taking care of Function Calls, Function Arguments accumulation, MCP Tool Calls, MCP Tools approval, reasoning output, full response token counts, etc.
  3. It would be much easier to handle individual events that are triggered by the right StreamingResponseUpdate types, with useful parameters already parsed.
  4. PreviousResponseId is handled automatically; if you want a new thread, just create a new ResponseEvents instance.

Here is a sample usage of this wrapper class:

// Options reused for every request issued by the wrapper created from them.
responseOptions = new ResponseCreationOptions()
{
    Tools = { Tool1, Tool2 },
    ToolChoice = ResponseToolChoice.CreateAutoChoice(),
    ParallelToolCallsEnabled = true,
    TruncationMode = ResponseTruncationMode.Auto
};

// Wrap the client once; disposing the wrapper clears its event subscriptions.
using var responseEvents = _aiService.ResponseClient.ToEvents(responseOptions);

// Subscribe only to the events you care about.
responseEvents.ReasoningTextDelta += Console.WriteLine;
responseEvents.TextDelta += Console.Write;
// Resolve every pending function call concurrently and hand the outputs back to the wrapper.
responseEvents.GetFunctionOutput += async functionCalls => await functionCalls.WhenAll(ResolveToolOutput);
responseEvents.FunctionCallAdded += (name, arguments) => Console.WriteLine($"\n{name}: {arguments}");
responseEvents.FunctionCallDone += (name, output) => Console.WriteLine($"\n{name}: {output}");

// AddItems takes a CancellationToken; pass a real token here to make the stream cancellable.
await responseEvents.AddItems([ResponseItem.CreateUserMessageItem("Hello, what's the weather today?")], CancellationToken.None);

// Executes the tool requested by a single function call and wraps its result
// in the output item that must be sent back for that call id.
// Throws InvalidOperationException when the requested function name is not a known tool.
async Task<FunctionCallOutputResponseItem> ResolveToolOutput(FunctionCallResponseItem fc)
{
    string result;
    using var arguments = JsonDocument.Parse(fc.FunctionArguments);

    switch (fc.FunctionName)
    {
        case "Tool1":
            {
                var parameter = arguments.RootElement.GetProperty("parameterName").GetString()!;
                result = await Tool1.Execute(parameter);
                // C# does not allow falling through to the next case label.
                break;
            }
        case "Tool2":
            {
                var parameter = arguments.RootElement.GetProperty("parameterName").GetString()!;
                result = await Tool2.Execute(parameter);
                break;
            }
        default:
            throw new InvalidOperationException($"Unknown tool {fc.FunctionName}");
    }

    return ResponseItem.CreateFunctionCallOutputItem(fc.CallId, result);
}

Notice how easy it is to handle parallel tool calls and their outputs with the extension methods; it's just one line.

Implementation

/// <summary>
/// Push-based wrapper around <c>CreateResponseStreamingAsync</c>: it consumes the stream of
/// streaming updates and raises events with already-extracted values. Follow-up requests
/// (function call outputs, MCP approvals) are sent automatically, and
/// <c>PreviousResponseId</c> on the supplied options is updated after every completed
/// response so that later calls continue the same thread.
/// </summary>
/// <remarks>
/// MCP tool approval requests are approved automatically; a real application should ask the user.
/// Calls to <see cref="AddItems"/> are serialized through an internal semaphore.
/// </remarks>
public class ResponseEvents(OpenAIResponseClient responseClient, ResponseCreationOptions responseOptions) : IDisposable
{
    /// <summary>Raised for every <c>StreamingResponseCreatedUpdate</c> (once per request sent).</summary>
    public event Action? Created;

    /// <summary>Raised once per <see cref="AddItems"/> call, right before the first <see cref="TextDelta"/>.</summary>
    public event Action? FirstToken;

    /// <summary>Raised with each output text delta.</summary>
    public event Action<string>? TextDelta;

    /// <summary>Raised with the text of each reasoning summary part once a reasoning item is done.</summary>
    public event Action<string>? ReasoningTextDelta;

    /// <summary>Raised with (server label, tool name) for each tool of a completed MCP tool definition list.</summary>
    public event Action<string, string>? McpToolAdded;

    /// <summary>Raised with (name, arguments) when a function call item is done or when MCP call arguments are done.</summary>
    public event Action<string, string>? FunctionCallAdded;

    /// <summary>Raised with (name, output) when an MCP tool call is done or a function call output has been produced.</summary>
    public event Action<string, string>? FunctionCallDone;

    /// <summary>
    /// Invoked with the function calls of a completed response; must return their outputs.
    /// Required whenever the model issues function calls.
    /// </summary>
    public event Func<IEnumerable<FunctionCallResponseItem>, Task<IEnumerable<FunctionCallOutputResponseItem>>>? GetFunctionOutput;

    /// <summary>Raised with the completed response when no follow-up request is needed.</summary>
    public event Action<OpenAIResponse>? LastToken;

    private readonly SemaphoreSlim _gate = new(1, 1);

    // Token counters, summed over every request made by the most recent AddItems call.
    public int ResponseCachedTokens { get; private set; } = 0;
    public int ResponseInputTokens { get; private set; } = 0;
    public int ResponseReasoningTokens { get; private set; } = 0;
    public int ResponseOutputTokens { get; private set; } = 0;
    public int ResponseTotalTokens { get; private set; } = 0;

    /// <summary>
    /// Sends <paramref name="responseItems"/> and processes the streamed updates, raising events
    /// as they arrive. Keeps issuing follow-up requests while a completed response leaves
    /// function calls or MCP approval requests to answer.
    /// </summary>
    /// <param name="responseItems">Input items for the first request.</param>
    /// <param name="ct">Token used to cancel waiting for the gate and the streaming calls.</param>
    /// <exception cref="ArgumentNullException">
    /// The model issued function calls but <see cref="GetFunctionOutput"/> has no subscriber.
    /// </exception>
    public async Task AddItems(IEnumerable<ResponseItem> responseItems, CancellationToken ct = default)
    {
        await _gate.WaitAsync(ct).ConfigureAwait(false);

        try
        {
            bool isFirstToken = true;
            ResponseCachedTokens = 0;
            ResponseInputTokens = 0;
            ResponseReasoningTokens = 0;
            ResponseOutputTokens = 0;
            ResponseTotalTokens = 0;

            List<ResponseItem> pendingApprovals = [];
            Dictionary<string, FunctionCallResponseItem> pendingFunctionCalls = [];
            Dictionary<string, string> mcpToolCalls = [];

            // Set only when a completed response produced follow-up input. A stream that ends
            // without a completed update (failure, error) therefore ends the loop instead of
            // re-sending the same input forever.
            bool hasFollowUp;

            do
            {
                hasFollowUp = false;
                pendingApprovals.Clear();
                pendingFunctionCalls.Clear();

                await foreach (var update in responseClient.CreateResponseStreamingAsync(responseItems, responseOptions, ct))
                {
                    switch (update)
                    {
                        case StreamingResponseCreatedUpdate:
                            Created?.Invoke();
                            break;

                        case StreamingResponseOutputTextDeltaUpdate text:
                            if (isFirstToken)
                            {
                                FirstToken?.Invoke();
                                isFirstToken = false;
                            }

                            TextDelta?.Invoke(text.Delta);
                            break;

                        #region McpToolDefinitionList

                        case StreamingResponseOutputItemAddedUpdate added when added.Item is McpToolDefinitionListItem mcpTools:
                            Debug.WriteLine($"McpToolDefinition {mcpTools.ServerLabel} added");
                            break;

                        case StreamingResponseMcpListToolsInProgressUpdate:
                            break;

                        case StreamingResponseMcpListToolsCompletedUpdate:
                            break;

                        case StreamingResponseOutputItemDoneUpdate done when done.Item is McpToolDefinitionListItem mcpTools:
                            foreach (var mcpTool in mcpTools.ToolDefinitions)
                            {
                                McpToolAdded?.Invoke(mcpTools.ServerLabel, mcpTool.Name);
                            }
                            break;

                        #endregion

                        #region McpToolCallApprovalRequest

                        case StreamingResponseOutputItemAddedUpdate added when added.Item is McpToolCallApprovalRequestItem mcpApprovalRequestItem:
                            Debug.WriteLine($"[MCP Tool needs approval '{mcpApprovalRequestItem.ToolName}']");
                            // Approve MCP Tool (need to show UI to user in real app).
                            // Every request is answered, not just the last one seen.
                            pendingApprovals.Add(new McpToolCallApprovalResponseItem(mcpApprovalRequestItem.Id, approved: true));
                            break;

                        case StreamingResponseOutputItemDoneUpdate done when done.Item is McpToolCallApprovalRequestItem:
                            break;

                        #endregion

                        #region McpToolCall

                        case StreamingResponseOutputItemAddedUpdate itemAdded when itemAdded.Item is McpToolCallItem mcpToolCallItem:
                            // Remember the tool name; the arguments-done update only carries the item id.
                            mcpToolCalls.Add(mcpToolCallItem.Id, mcpToolCallItem.ToolName);
                            break;

                        case StreamingResponseMcpCallInProgressUpdate:
                            break;

                        case StreamingResponseMcpCallArgumentsDeltaUpdate:
                            break;

                        case StreamingResponseMcpCallArgumentsDoneUpdate mcpCallArgumentsDone:
                        {
                            var toolName = mcpToolCalls[mcpCallArgumentsDone.ItemId];
                            FunctionCallAdded?.Invoke(toolName, mcpCallArgumentsDone.ToolArguments.ToString());
                            break;
                        }

                        case StreamingResponseMcpCallCompletedUpdate:
                            break;

                        case StreamingResponseOutputItemDoneUpdate done when done.Item is McpToolCallItem mcpToolCallItem:
                            FunctionCallDone?.Invoke(mcpToolCallItem.ToolName, mcpToolCallItem.ToolOutput);
                            Debug.WriteLine($"{mcpToolCallItem.ToolName}:\n{mcpToolCallItem.ToolOutput}\n");
                            break;

                        #endregion

                        #region FunctionCallResponse

                        case StreamingResponseOutputItemAddedUpdate added when added.Item is FunctionCallResponseItem:
                            break;

                        case StreamingResponseFunctionCallArgumentsDeltaUpdate:
                            break;

                        case StreamingResponseFunctionCallArgumentsDoneUpdate:
                            break;

                        case StreamingResponseOutputItemDoneUpdate done when done.Item is FunctionCallResponseItem functionCallItem:
                            // Collect function calls; they are resolved together once the response completes.
                            pendingFunctionCalls.Add(functionCallItem.CallId, functionCallItem);
                            FunctionCallAdded?.Invoke(functionCallItem.FunctionName, functionCallItem.FunctionArguments.ToString());
                            break;

                        #endregion

                        #region ReasoningResponse

                        case StreamingResponseOutputItemAddedUpdate added when added.Item is ReasoningResponseItem:
                            break;

                        case StreamingResponseOutputItemDoneUpdate done when done.Item is ReasoningResponseItem reasoningItem:
                            foreach (var summaryPart in reasoningItem.SummaryParts.OfType<ReasoningSummaryTextPart>())
                            {
                                ReasoningTextDelta?.Invoke(summaryPart.Text);
                            }
                            break;

                        #endregion

                        #region MessageResponse

                        case StreamingResponseOutputItemAddedUpdate added when added.Item is MessageResponseItem:
                            break;

                        case StreamingResponseOutputItemDoneUpdate done when done.Item is MessageResponseItem:
                            // The message's content parts expose OutputTextAnnotations (container file
                            // citation, file citation, URI citation, file path); no event surfaces them yet.
                            break;

                        #endregion

                        case StreamingResponseCompletedUpdate completed:
                        {
                            // Set PreviousResponseId to continue the same thread
                            responseOptions.PreviousResponseId = completed.Response.Id;

                            AccumulateTokenUsage(completed.Response.Usage);

                            // Copy, because pendingApprovals is cleared at the start of the next round.
                            List<ResponseItem> nextInput = [.. pendingApprovals];

                            if (pendingFunctionCalls.Count != 0)
                            {
                                ArgumentNullException.ThrowIfNull(GetFunctionOutput);

                                // Materialize once: the outputs are enumerated here and again when sent,
                                // so a lazy sequence would otherwise execute the tools twice.
                                var functionCallOutputs = (await GetFunctionOutput.Invoke(pendingFunctionCalls.Values)).ToList();
                                foreach (var functionCallOutput in functionCallOutputs)
                                {
                                    var functionName = pendingFunctionCalls[functionCallOutput.CallId].FunctionName;
                                    FunctionCallDone?.Invoke(functionName, functionCallOutput.FunctionOutput);
                                    Debug.WriteLine($"{functionName}:\n{functionCallOutput.FunctionOutput}\n");
                                }

                                nextInput.AddRange(functionCallOutputs);
                            }

                            if (nextInput.Count != 0)
                            {
                                responseItems = nextInput;
                                hasFollowUp = true;
                            }
                            else // Streaming Completed & nothing left to answer
                            {
                                LastToken?.Invoke(completed.Response);
                            }
                            break;
                        }

                        #region Errors

                        case StreamingResponseErrorUpdate error:
                            Debug.WriteLine($"Error: {error.Message}");
                            break;

                        case StreamingResponseFailedUpdate failed:
                            Debug.WriteLine($"Failed: {failed.Response.Error?.Message}");
                            break;

                        #endregion

                        case StreamingResponseInProgressUpdate:
                            break;

                        case StreamingResponseOutputTextDoneUpdate:
                            break;

                        case StreamingResponseContentPartAddedUpdate partAdded:
                            if (partAdded.Part.Kind != ResponseContentPartKind.OutputText)
                            {
                                Debug.WriteLine($"Content Part Added {partAdded.Part.GetType().Name}: {partAdded.Part.Kind}");
                            }
                            break;

                        case StreamingResponseContentPartDoneUpdate partDone:
                            if (partDone.Part.Kind != ResponseContentPartKind.OutputText)
                            {
                                Debug.WriteLine($"Content Part Done {partDone.Part.GetType().Name}: {partDone.Part.Kind}");
                            }
                            break;

                        // Catch-all for item kinds without a dedicated case above; must stay after them.
                        case StreamingResponseOutputItemAddedUpdate itemAdded:
                            if (!itemAdded.Item.GetType().Name.StartsWith("Internal", StringComparison.Ordinal))
                            {
                                Debug.WriteLine($"Unknown Item Added: {itemAdded.Item.GetType().Name}");
                            }
                            break;

                        case StreamingResponseOutputItemDoneUpdate itemDone:
                            if (!itemDone.Item.GetType().Name.StartsWith("Internal", StringComparison.Ordinal))
                            {
                                Debug.WriteLine($"Unknown Item Done: {itemDone.Item.GetType().Name}");
                            }
                            break;

                        default:
                            if (!update.GetType().Name.StartsWith("Internal", StringComparison.Ordinal))
                            {
                                Debug.WriteLine($"Unknown Update: {update.GetType().Name}");
                            }
                            break;
                    }
                }
            } while (hasFollowUp);
        }
        finally
        {
            _gate.Release();
        }
    }

    /// <summary>Adds the usage of one completed response to the running counters; ignores missing usage.</summary>
    private void AccumulateTokenUsage(ResponseTokenUsage? tokenUsage)
    {
        if (tokenUsage is null)
        {
            return;
        }

        ResponseCachedTokens += tokenUsage.InputTokenDetails?.CachedTokenCount ?? 0;
        ResponseInputTokens += tokenUsage.InputTokenCount;
        ResponseReasoningTokens += tokenUsage.OutputTokenDetails?.ReasoningTokenCount ?? 0;
        ResponseOutputTokens += tokenUsage.OutputTokenCount;
        // Summed like the other counters so the total stays consistent with them when one
        // AddItems call spans several requests.
        ResponseTotalTokens += tokenUsage.TotalTokenCount;
    }

    /// <summary>Clears all event subscriptions and releases the internal semaphore.</summary>
    public void Dispose()
    {
        // Clear event handlers so subscribers can be GC'd
        Created = null;
        FirstToken = null;
        TextDelta = null;
        ReasoningTextDelta = null;
        FunctionCallAdded = null;
        FunctionCallDone = null;
        McpToolAdded = null;
        GetFunctionOutput = null;
        LastToken = null;

        _gate.Dispose();

        GC.SuppressFinalize(this);
    }
}

/// <summary>
/// Convenience extensions: creating a <see cref="ResponseEvents"/> wrapper from a client, and
/// awaiting a sequence of tasks (or a sequence projected to tasks) with <c>Task.WhenAll</c>.
/// </summary>
public static class OpenAIResponseClientExtensions
{
    /// <summary>Creates a <see cref="ResponseEvents"/> bound to <paramref name="client"/> and <paramref name="options"/>.</summary>
    public static ResponseEvents ToEvents(this OpenAIResponseClient client, ResponseCreationOptions options)
    {
        return new ResponseEvents(client, options);
    }

    /// <summary>Returns a task that completes with all results once every task in <paramref name="tasks"/> has completed.</summary>
    public static Task<T[]> WhenAll<T>(this IEnumerable<Task<T>> tasks)
    {
        return Task.WhenAll(tasks);
    }

    /// <summary>
    /// Applies <paramref name="selector"/> to each element of <paramref name="source"/> and returns
    /// a task that completes with all results once every produced task has completed.
    /// </summary>
    public static Task<TResult[]> WhenAll<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        IEnumerable<Task<TResult>> projected = source.Select(selector);
        return Task.WhenAll(projected);
    }
}

Metadata

Metadata

Assignees

Labels

area: responses (This item is related to Responses) · design-discussion (An area of design currently under discussion and open to team and community feedback.) · feature-request (Category: A new feature or enhancement to an existing feature is being requested.)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions