From e0ad35762564f2e9599300871286a994c461e00a Mon Sep 17 00:00:00 2001 From: Oscar Esteve Date: Tue, 7 Oct 2025 21:50:54 +0100 Subject: [PATCH] Add stream usage support for OpenAI GPT --- src/agent/src/Toolbox/StreamResult.php | 8 +- src/agent/tests/Toolbox/StreamResultTest.php | 132 +++++++++++++++++ .../src/Bridge/OpenAi/Gpt/ResultConverter.php | 11 ++ .../OpenAi/Gpt/ResultConverterStreamTest.php | 139 ++++++++++++++++++ 4 files changed, 289 insertions(+), 1 deletion(-) create mode 100644 src/agent/tests/Toolbox/StreamResultTest.php create mode 100644 src/platform/tests/Bridge/OpenAi/Gpt/ResultConverterStreamTest.php diff --git a/src/agent/src/Toolbox/StreamResult.php b/src/agent/src/Toolbox/StreamResult.php index 63efcaa93..6c1d9c84e 100644 --- a/src/agent/src/Toolbox/StreamResult.php +++ b/src/agent/src/Toolbox/StreamResult.php @@ -33,7 +33,13 @@ public function getContent(): \Generator if ($value instanceof ToolCallResult) { yield from ($this->handleToolCallsCallback)($value, Message::ofAssistant($streamedResult))->getContent(); - break; + continue; + } + + if (!\is_string($value)) { + yield $value; + + continue; } $streamedResult .= $value; diff --git a/src/agent/tests/Toolbox/StreamResultTest.php b/src/agent/tests/Toolbox/StreamResultTest.php new file mode 100644 index 000000000..e717c3ed6 --- /dev/null +++ b/src/agent/tests/Toolbox/StreamResultTest.php @@ -0,0 +1,132 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\Agent\Tests\Toolbox; + +use PHPUnit\Framework\TestCase; +use Symfony\AI\Agent\Toolbox\StreamResult as ToolboxStreamResult; +use Symfony\AI\Platform\Message\AssistantMessage; +use Symfony\AI\Platform\Metadata\TokenUsage; +use Symfony\AI\Platform\Result\BaseResult; +use Symfony\AI\Platform\Result\ToolCall; +use Symfony\AI\Platform\Result\ToolCallResult; + +final class StreamResultTest extends TestCase +{ + public function testStreamsPlainChunksWithoutToolCall() + { + $chunks = ['He', 'llo']; + $generator = (function () use ($chunks) { + foreach ($chunks as $c) { + yield $c; + } + })(); + + $callbackCalled = false; + $callback = function () use (&$callbackCalled) { + $callbackCalled = true; + + // Return any result, won't be used in this test + return new class extends BaseResult { + public function getContent(): iterable + { + yield 'ignored'; + } + }; + }; + + $stream = new ToolboxStreamResult($generator, $callback); + $received = []; + foreach ($stream->getContent() as $value) { + $received[] = $value; + } + + $this->assertSame($chunks, $received); + $this->assertFalse($callbackCalled, 'Callback should not be called when no ToolCallResult appears.'); + } + + public function testInvokesCallbackOnToolCallAndYieldsItsContent() + { + $toolCallResult = new ToolCallResult(new ToolCall('id1', 'tool1', ['arg' => 'value'])); + + $generator = (function () use ($toolCallResult) { + yield 'He'; + yield 'llo'; + yield $toolCallResult; + yield 'AFTER'; + })(); + + $receivedAssistantMessage = null; + $receivedToolCallResult = null; + + $callback = function (ToolCallResult $result, AssistantMessage $assistantMessage) use (&$receivedAssistantMessage, &$receivedToolCallResult) { + $receivedToolCallResult = $result; + $receivedAssistantMessage = $assistantMessage; + + // Return a result that itself yields more chunks + return new class extends BaseResult { + public function getContent(): iterable + { + yield ' world'; + yield '!'; + } + }; + }; + + $stream = new ToolboxStreamResult($generator, $callback); + + $received = []; + foreach ($stream->getContent() as $value) { + $received[] = $value; + } + + $this->assertSame(['He', 'llo', ' world', '!', 'AFTER'], $received); + $this->assertInstanceOf(ToolCallResult::class, $receivedToolCallResult); + $this->assertInstanceOf(AssistantMessage::class, $receivedAssistantMessage); + $this->assertSame('Hello', $receivedAssistantMessage->content); + } + + public function testStreamsPlainChunksWithTokenUsage() + { + $chunks = [ + 'He', + 'llo', + new TokenUsage(), + ]; + $generator = (function () use ($chunks) { + foreach ($chunks as $c) { + yield $c; + } + })(); + + $callbackCalled = false; + $callback = function () use (&$callbackCalled) { + $callbackCalled = true; + + // Return any result, won't be used in this test + return new class extends BaseResult { + public function getContent(): iterable + { + yield 'ignored'; + } + }; + }; + + $stream = new ToolboxStreamResult($generator, $callback); + $received = []; + foreach ($stream->getContent() as $value) { + $received[] = $value; + } + + $this->assertSame($chunks, $received); + $this->assertFalse($callbackCalled, 'Callback should not be called when no ToolCallResult appears.'); + } +} diff --git a/src/platform/src/Bridge/OpenAi/Gpt/ResultConverter.php b/src/platform/src/Bridge/OpenAi/Gpt/ResultConverter.php index 02c6b611e..4bd292294 100644 --- a/src/platform/src/Bridge/OpenAi/Gpt/ResultConverter.php +++ b/src/platform/src/Bridge/OpenAi/Gpt/ResultConverter.php @@ -17,6 +17,7 @@ use Symfony\AI\Platform\Exception\ContentFilterException; use Symfony\AI\Platform\Exception\RateLimitExceededException; use Symfony\AI\Platform\Exception\RuntimeException; +use Symfony\AI\Platform\Metadata\TokenUsage; use Symfony\AI\Platform\Model; use Symfony\AI\Platform\Result\ChoiceResult; use Symfony\AI\Platform\Result\RawHttpResult; @@ -106,6 +107,16 @@ private function convertStream(HttpResponse $result): \Generator yield new ToolCallResult(...array_map($this->convertToolCall(...), $toolCalls)); } + if ($usage = $data['usage'] ?? null) { + yield new TokenUsage( + promptTokens: $usage['prompt_tokens'] ?? null, + completionTokens: $usage['completion_tokens'] ?? null, + thinkingTokens: $usage['completion_tokens_details']['reasoning_tokens'] ?? null, + cachedTokens: $usage['prompt_tokens_details']['cached_tokens'] ?? null, + totalTokens: $usage['total_tokens'] ?? null, + ); + } + if (!isset($data['choices'][0]['delta']['content'])) { continue; } diff --git a/src/platform/tests/Bridge/OpenAi/Gpt/ResultConverterStreamTest.php b/src/platform/tests/Bridge/OpenAi/Gpt/ResultConverterStreamTest.php new file mode 100644 index 000000000..0530fdbc3 --- /dev/null +++ b/src/platform/tests/Bridge/OpenAi/Gpt/ResultConverterStreamTest.php @@ -0,0 +1,139 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\Platform\Tests\Bridge\OpenAi\Gpt; + +use PHPUnit\Framework\TestCase; +use Symfony\AI\Platform\Bridge\OpenAi\Gpt\ResultConverter; +use Symfony\AI\Platform\Metadata\TokenUsage; +use Symfony\AI\Platform\Result\RawHttpResult; +use Symfony\AI\Platform\Result\StreamResult; +use Symfony\AI\Platform\Result\ToolCallResult; +use Symfony\Component\HttpClient\EventSourceHttpClient; +use Symfony\Component\HttpClient\MockHttpClient; +use Symfony\Component\HttpClient\Response\MockResponse; + +final class ResultConverterStreamTest extends TestCase +{ + public function testStreamTextDeltas() + { + $sseBody = '' + ."data: {\"choices\":[{\"delta\":{\"role\":\"assistant\"},\"index\":0}]}\n\n" + ."data: {\"choices\":[{\"delta\":{\"content\":\"Hello \"},\"index\":0}]}\n\n" + ."data: {\"choices\":[{\"delta\":{\"content\":\"world\"},\"index\":0}]}\n\n" + ."data: {\"choices\":[{\"delta\":{},\"index\":0,\"finish_reason\":\"stop\"}]}\n\n" + ."data: [DONE]\n\n"; + + $mockClient = new MockHttpClient([ + new MockResponse($sseBody, [ + 'http_code' => 200, + 'response_headers' => [ + 'content-type' => 'text/event-stream', + ], + ]), + ]); + $esClient = new EventSourceHttpClient($mockClient); + $asyncResponse = $esClient->request('GET', 'http://localhost/stream'); + + $converter = new ResultConverter(); + $result = $converter->convert(new RawHttpResult($asyncResponse), ['stream' => true]); + + $this->assertInstanceOf(StreamResult::class, $result); + $chunks = []; + foreach ($result->getContent() as $delta) { + $chunks[] = $delta; + } + + // Only text deltas are yielded; role and finish chunks are ignored + $this->assertSame(['Hello ', 'world'], $chunks); + } + + public function testStreamToolCallsAreAssembledAndYielded() + { + // Simulate a tool call that is streamed in multiple argument parts + $sseBody = '' + ."data: {\"choices\":[{\"delta\":{\"role\":\"assistant\"},\"index\":0}]}\n\n" + ."data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"id\":\"call_123\",\"type\":\"function\",\"function\":{\"name\":\"test_function\",\"arguments\":\"{\\\"arg1\\\": \\\"value1\\\"}\"}}]},\"index\":0}]}\n\n" + ."data: {\"choices\":[{\"delta\":{},\"index\":0,\"finish_reason\":\"tool_calls\"}]}\n\n" + ."data: [DONE]\n\n"; + + $mockClient = new MockHttpClient([ + new MockResponse($sseBody, [ + 'http_code' => 200, + 'response_headers' => [ + 'content-type' => 'text/event-stream', + ], + ]), + ]); + $esClient = new EventSourceHttpClient($mockClient); + $asyncResponse = $esClient->request('GET', 'http://localhost/stream'); + + $converter = new ResultConverter(); + $result = $converter->convert(new RawHttpResult($asyncResponse), ['stream' => true]); + + $this->assertInstanceOf(StreamResult::class, $result); + + $yielded = []; + foreach ($result->getContent() as $delta) { + $yielded[] = $delta; + } + + // Expect only one yielded item and it should be a ToolCallResult + $this->assertCount(1, $yielded); + $this->assertInstanceOf(ToolCallResult::class, $yielded[0]); + /** @var ToolCallResult $toolCallResult */ + $toolCallResult = $yielded[0]; + $toolCalls = $toolCallResult->getContent(); + + $this->assertCount(1, $toolCalls); + $this->assertSame('call_123', $toolCalls[0]->getId()); + $this->assertSame('test_function', $toolCalls[0]->getName()); + $this->assertSame(['arg1' => 'value1'], $toolCalls[0]->getArguments()); + } + + public function testStreamTokenUsage() + { + $sseBody = '' + ."data: {\"choices\":[{\"delta\":{\"role\":\"assistant\"},\"index\":0}]}\n\n" + ."data: {\"choices\":[{\"delta\":{\"content\":\"Hello \"},\"index\":0}]}\n\n" + ."data: {\"choices\":[{\"delta\":{\"content\":\"world\"},\"index\":0}]}\n\n" + ."data: {\"choices\":[{\"delta\":{},\"index\":0,\"finish_reason\":\"stop\"}]}\n\n" + ."data: {\"usage\":{\"prompt_tokens\":1039,\"completion_tokens\":10,\"total_tokens\":1049,\"prompt_tokens_details\":{\"cached_tokens\":0,\"audio_tokens\":0},\"completion_tokens_details\":{\"reasoning_tokens\":0,\"audio_tokens\":0,\"accepted_prediction_tokens\":0,\"rejected_prediction_tokens\":0}}}\n\n" + ."data: [DONE]\n\n"; + + $mockClient = new MockHttpClient([ + new MockResponse($sseBody, [ + 'http_code' => 200, + 'response_headers' => [ + 'content-type' => 'text/event-stream', + ], + ]), + ]); + $esClient = new EventSourceHttpClient($mockClient); + $asyncResponse = $esClient->request('GET', 'http://localhost/stream'); + + $converter = new ResultConverter(); + $result = $converter->convert(new RawHttpResult($asyncResponse), ['stream' => true]); + + $this->assertInstanceOf(StreamResult::class, $result); + + $yielded = []; + foreach ($result->getContent() as $delta) { + $yielded[] = $delta; + } + $this->assertCount(3, $yielded); + $this->assertInstanceOf(TokenUsage::class, $yielded[2]); + $this->assertSame(1039, $yielded[2]->promptTokens); + $this->assertSame(10, $yielded[2]->completionTokens); + $this->assertSame(1049, $yielded[2]->totalTokens); + $this->assertSame(0, $yielded[2]->cachedTokens); + } +}