From 00d63ae9bf5dd742197ca1a908e8fab9e288218a Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Fri, 21 Nov 2025 12:12:03 +0100 Subject: [PATCH 01/16] Added streaming support to Chat API & persistence to storage through accumulator callback chaining --- src/agent/src/Toolbox/StreamResult.php | 5 +- src/chat/CHANGELOG.md | 6 + .../Doctrine/DoctrineDbalMessageStore.php | 3 +- src/chat/src/Bridge/Local/CacheStore.php | 3 +- src/chat/src/Bridge/Local/InMemoryStore.php | 3 +- .../src/Bridge/Meilisearch/MessageStore.php | 3 +- .../src/Bridge/Pogocache/MessageStore.php | 3 +- src/chat/src/Bridge/Redis/MessageStore.php | 3 +- .../src/Bridge/SurrealDb/MessageStore.php | 3 +- src/chat/src/Chat.php | 21 +++- src/chat/src/ChatInterface.php | 3 +- .../src/Result/AccumulatingStreamResult.php | 82 ++++++++++++ src/chat/src/StreamableStoreInterface.php | 20 +++ src/chat/tests/ChatTest.php | 117 ++++++++++++++++++ src/platform/src/Result/StreamResult.php | 2 +- .../tests/Result/StreamResultTest.php | 39 +++++- 16 files changed, 295 insertions(+), 21 deletions(-) create mode 100644 src/chat/src/Result/AccumulatingStreamResult.php create mode 100644 src/chat/src/StreamableStoreInterface.php diff --git a/src/agent/src/Toolbox/StreamResult.php b/src/agent/src/Toolbox/StreamResult.php index afc10378a..4b71b14e3 100644 --- a/src/agent/src/Toolbox/StreamResult.php +++ b/src/agent/src/Toolbox/StreamResult.php @@ -33,10 +33,7 @@ public function getContent(): \Generator if ($value instanceof ToolCallResult) { $innerResult = ($this->handleToolCallsCallback)($value, Message::ofAssistant($streamedResult)); - // Propagate metadata from inner result to this result - foreach ($innerResult->getMetadata()->all() as $key => $metadataValue) { - $this->getMetadata()->add($key, $metadataValue); - } + $this->getMetadata()->set($innerResult->getMetadata()->all()); $content = $innerResult->getContent(); // Strings are iterable in PHP but yield from would iterate character-by-character. diff --git a/src/chat/CHANGELOG.md b/src/chat/CHANGELOG.md index 64267a841..eefd0fbb7 100644 --- a/src/chat/CHANGELOG.md +++ b/src/chat/CHANGELOG.md @@ -1,5 +1,11 @@ CHANGELOG ========= +0.2 + * Add streaming support to `ChatInterface::submit()` + - Add `StreamableStoreInterface` which indicates `StoreInterface` implementation can be configured with streaming + - Add `AccumulatingStreamResult` wrapper class which adds accumulation logic & callback chaining to `StreamResult` implementations (can wrap both `Agent` and `Platform` variants) to return the full message once `Generator` is exhausted + - Streamed responses now also create `AssistantMessage` & are added to `Store` in `Chat::submit()` + - Bugfixed loss of metadata in `Chat::submit()` 0.1 --- diff --git a/src/chat/src/Bridge/Doctrine/DoctrineDbalMessageStore.php b/src/chat/src/Bridge/Doctrine/DoctrineDbalMessageStore.php index 6a2ef80fb..31e526253 100644 --- a/src/chat/src/Bridge/Doctrine/DoctrineDbalMessageStore.php +++ b/src/chat/src/Bridge/Doctrine/DoctrineDbalMessageStore.php @@ -24,6 +24,7 @@ use Symfony\AI\Chat\ManagedStoreInterface; use Symfony\AI\Chat\MessageNormalizer; use Symfony\AI\Chat\MessageStoreInterface; +use Symfony\AI\Chat\StreamableStoreInterface; use Symfony\AI\Platform\Message\MessageBag; use Symfony\AI\Platform\Message\MessageInterface; use Symfony\Component\Serializer\Encoder\JsonEncoder; @@ -34,7 +35,7 @@ /** * @author Guillaume Loulier */ -final class DoctrineDbalMessageStore implements ManagedStoreInterface, MessageStoreInterface +final class DoctrineDbalMessageStore implements ManagedStoreInterface, MessageStoreInterface, StreamableStoreInterface { public function __construct( private readonly string $tableName, diff --git a/src/chat/src/Bridge/Local/CacheStore.php b/src/chat/src/Bridge/Local/CacheStore.php index 0b2626fa3..57bec1b05 100644 --- a/src/chat/src/Bridge/Local/CacheStore.php +++ b/src/chat/src/Bridge/Local/CacheStore.php @@ -15,12 +15,13 @@ use Symfony\AI\Agent\Exception\RuntimeException; use Symfony\AI\Chat\ManagedStoreInterface; use Symfony\AI\Chat\MessageStoreInterface; +use Symfony\AI\Chat\StreamableStoreInterface; use Symfony\AI\Platform\Message\MessageBag; /** * @author Christopher Hertel */ -final class CacheStore implements ManagedStoreInterface, MessageStoreInterface +final class CacheStore implements ManagedStoreInterface, MessageStoreInterface, StreamableStoreInterface { public function __construct( private readonly CacheItemPoolInterface $cache, diff --git a/src/chat/src/Bridge/Local/InMemoryStore.php b/src/chat/src/Bridge/Local/InMemoryStore.php index 362a90472..e75f10cbd 100644 --- a/src/chat/src/Bridge/Local/InMemoryStore.php +++ b/src/chat/src/Bridge/Local/InMemoryStore.php @@ -13,12 +13,13 @@ use Symfony\AI\Chat\ManagedStoreInterface; use Symfony\AI\Chat\MessageStoreInterface; +use Symfony\AI\Chat\StreamableStoreInterface; use Symfony\AI\Platform\Message\MessageBag; /** * @author Christopher Hertel */ -final class InMemoryStore implements ManagedStoreInterface, MessageStoreInterface +final class InMemoryStore implements ManagedStoreInterface, MessageStoreInterface, StreamableStoreInterface { /** * @var MessageBag[] diff --git a/src/chat/src/Bridge/Meilisearch/MessageStore.php b/src/chat/src/Bridge/Meilisearch/MessageStore.php index 83b7add7c..b72fba2b5 100644 --- a/src/chat/src/Bridge/Meilisearch/MessageStore.php +++ b/src/chat/src/Bridge/Meilisearch/MessageStore.php @@ -16,6 +16,7 @@ use Symfony\AI\Chat\ManagedStoreInterface; use Symfony\AI\Chat\MessageNormalizer; use Symfony\AI\Chat\MessageStoreInterface; +use Symfony\AI\Chat\StreamableStoreInterface; use Symfony\AI\Platform\Message\MessageBag; use Symfony\AI\Platform\Message\MessageInterface; use Symfony\Component\Clock\ClockInterface; @@ -31,7 +32,7 @@ /** * @author Guillaume Loulier */ -final class MessageStore implements ManagedStoreInterface, MessageStoreInterface +final class MessageStore implements ManagedStoreInterface, MessageStoreInterface, StreamableStoreInterface { public function __construct( private readonly HttpClientInterface $httpClient, diff --git a/src/chat/src/Bridge/Pogocache/MessageStore.php b/src/chat/src/Bridge/Pogocache/MessageStore.php index 2a7a28c6d..c80cec290 100644 --- a/src/chat/src/Bridge/Pogocache/MessageStore.php +++ b/src/chat/src/Bridge/Pogocache/MessageStore.php @@ -15,6 +15,7 @@ use Symfony\AI\Chat\ManagedStoreInterface; use Symfony\AI\Chat\MessageNormalizer; use Symfony\AI\Chat\MessageStoreInterface; +use Symfony\AI\Chat\StreamableStoreInterface; use Symfony\AI\Platform\Message\MessageBag; use Symfony\AI\Platform\Message\MessageInterface; use Symfony\Component\Serializer\Encoder\JsonEncoder; @@ -28,7 +29,7 @@ /** * @author Guillaume Loulier */ -final class MessageStore implements ManagedStoreInterface, MessageStoreInterface +final class MessageStore implements ManagedStoreInterface, MessageStoreInterface, StreamableStoreInterface { public function __construct( private readonly HttpClientInterface $httpClient, diff --git a/src/chat/src/Bridge/Redis/MessageStore.php b/src/chat/src/Bridge/Redis/MessageStore.php index d8d964fa2..0e176764b 100644 --- a/src/chat/src/Bridge/Redis/MessageStore.php +++ b/src/chat/src/Bridge/Redis/MessageStore.php @@ -14,6 +14,7 @@ use Symfony\AI\Chat\ManagedStoreInterface; use Symfony\AI\Chat\MessageNormalizer; use Symfony\AI\Chat\MessageStoreInterface; +use Symfony\AI\Chat\StreamableStoreInterface; use Symfony\AI\Platform\Message\MessageBag; use Symfony\AI\Platform\Message\MessageInterface; use Symfony\Component\Serializer\Encoder\JsonEncoder; @@ -24,7 +25,7 @@ /** * @author Guillaume Loulier */ -final class MessageStore implements ManagedStoreInterface, MessageStoreInterface +final class MessageStore implements ManagedStoreInterface, MessageStoreInterface, StreamableStoreInterface { public function __construct( private readonly \Redis $redis, diff --git a/src/chat/src/Bridge/SurrealDb/MessageStore.php b/src/chat/src/Bridge/SurrealDb/MessageStore.php index 707a4c0be..d77a88187 100644 --- a/src/chat/src/Bridge/SurrealDb/MessageStore.php +++ b/src/chat/src/Bridge/SurrealDb/MessageStore.php @@ -16,6 +16,7 @@ use Symfony\AI\Chat\ManagedStoreInterface; use Symfony\AI\Chat\MessageNormalizer; use Symfony\AI\Chat\MessageStoreInterface; +use Symfony\AI\Chat\StreamableStoreInterface; use Symfony\AI\Platform\Message\MessageBag; use Symfony\AI\Platform\Message\MessageInterface; use Symfony\Component\Serializer\Encoder\JsonEncoder; @@ -29,7 +30,7 @@ /** * @author Guillaume Loulier */ -final class MessageStore implements ManagedStoreInterface, MessageStoreInterface +final class MessageStore implements ManagedStoreInterface, MessageStoreInterface, StreamableStoreInterface { private string $authenticationToken = ''; diff --git a/src/chat/src/Chat.php b/src/chat/src/Chat.php index 153672f02..006ea32c8 100644 --- a/src/chat/src/Chat.php +++ b/src/chat/src/Chat.php @@ -12,10 +12,14 @@ namespace Symfony\AI\Chat; use Symfony\AI\Agent\AgentInterface; +use Symfony\AI\Agent\Exception\RuntimeException; +use Symfony\AI\Agent\Toolbox\StreamResult as ToolboxStreamResult; +use Symfony\AI\Chat\Result\AccumulatingStreamResult; use Symfony\AI\Platform\Message\AssistantMessage; use Symfony\AI\Platform\Message\Message; use Symfony\AI\Platform\Message\MessageBag; use Symfony\AI\Platform\Message\UserMessage; +use Symfony\AI\Platform\Result\StreamResult; use Symfony\AI\Platform\Result\TextResult; /** @@ -35,18 +39,31 @@ public function initiate(MessageBag $messages): void $this->store->save($messages); } - public function submit(UserMessage $message): AssistantMessage + public function submit(UserMessage $message): AssistantMessage|AccumulatingStreamResult { $messages = $this->store->load(); $messages->add($message); $result = $this->agent->call($messages); + if ($result instanceof StreamResult || $result instanceof ToolboxStreamResult) { + if (!$this->store instanceof StreamableStoreInterface) { + throw new RuntimeException($this->store::class . ' does not support streaming.'); + } + + return new AccumulatingStreamResult($result, function (AssistantMessage $assistantMessage) use ($messages) { + $messages->add($assistantMessage); + $this->store->save($messages); + }); + } + \assert($result instanceof TextResult); $assistantMessage = Message::ofAssistant($result->getContent()); - $messages->add($assistantMessage); + $assistantMessage->getMetadata()->set($result->getMetadata()->all()); + + $messages->add($assistantMessage); $this->store->save($messages); return $assistantMessage; diff --git a/src/chat/src/ChatInterface.php b/src/chat/src/ChatInterface.php index 727146131..d9122f983 100644 --- a/src/chat/src/ChatInterface.php +++ b/src/chat/src/ChatInterface.php @@ -12,6 +12,7 @@ namespace Symfony\AI\Chat; use Symfony\AI\Agent\Exception\ExceptionInterface; +use Symfony\AI\Chat\Result\AccumulatingStreamResult; use Symfony\AI\Platform\Message\AssistantMessage; use Symfony\AI\Platform\Message\MessageBag; use Symfony\AI\Platform\Message\UserMessage; @@ -26,5 +27,5 @@ public function initiate(MessageBag $messages): void; /** * @throws ExceptionInterface When the chat submission fails due to agent errors */ - public function submit(UserMessage $message): AssistantMessage; + public function submit(UserMessage $message): AssistantMessage|AccumulatingStreamResult; } diff --git a/src/chat/src/Result/AccumulatingStreamResult.php b/src/chat/src/Result/AccumulatingStreamResult.php new file mode 100644 index 000000000..bee9f5a0d --- /dev/null +++ b/src/chat/src/Result/AccumulatingStreamResult.php @@ -0,0 +1,82 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\Chat\Result; + +use Symfony\AI\Agent\Toolbox\StreamResult as ToolboxStreamResult; +use Symfony\AI\Platform\Message\AssistantMessage; +use Symfony\AI\Platform\Message\Message; +use Symfony\AI\Platform\Metadata\Metadata; +use Symfony\AI\Platform\Result\StreamResult; +use Symfony\AI\Platform\Result\ToolCallResult; + +/** + * @author Marco van Angeren + */ +final class AccumulatingStreamResult +{ + private ?\Closure $onComplete = null; + + public function __construct( + private readonly StreamResult|ToolboxStreamResult $innerResult, + ?\Closure $onComplete = null, + ) { + $this->onComplete = $onComplete; + } + + public function addOnComplete(\Closure $callback): void + { + $existingCallback = $this->onComplete; + + $this->onComplete = $existingCallback + ? function (AssistantMessage $message) use ($existingCallback, $callback) { + $existingCallback($message); + $callback($message); + } + : $callback; + } + + public function getContent(): \Generator + { + $accumulatedContent = ''; + $toolCalls = []; + + try { + foreach ($this->innerResult->getContent() as $value) { + if ($value instanceof ToolCallResult) { + array_push($toolCalls, ...$value->getContent()); + yield $value; + continue; + } + + $accumulatedContent .= $value; + yield $value; + } + } finally { + if ($this->onComplete !== null) { + $assistantMessage = Message::ofAssistant( + $accumulatedContent === '' ? null : $accumulatedContent, + $toolCalls ?: null + ); + + $assistantMessage->getMetadata()->set($this->innerResult->getMetadata()->all()); + + ($this->onComplete)($assistantMessage); + } + } + } + + public function getMetadata(): Metadata + { + return $this->innerResult->getMetadata(); + } +} + diff --git a/src/chat/src/StreamableStoreInterface.php b/src/chat/src/StreamableStoreInterface.php new file mode 100644 index 000000000..3636e66ba --- /dev/null +++ b/src/chat/src/StreamableStoreInterface.php @@ -0,0 +1,20 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\AI\Chat; + +/** + * @author Marco van Angeren + */ +interface StreamableStoreInterface +{ +} + diff --git a/src/chat/tests/ChatTest.php b/src/chat/tests/ChatTest.php index b6ad22d9a..40517ccd7 100644 --- a/src/chat/tests/ChatTest.php +++ b/src/chat/tests/ChatTest.php @@ -16,10 +16,14 @@ use Symfony\AI\Agent\AgentInterface; use Symfony\AI\Chat\Bridge\Local\InMemoryStore; use Symfony\AI\Chat\Chat; +use Symfony\AI\Chat\Result\AccumulatingStreamResult; use Symfony\AI\Platform\Message\AssistantMessage; use Symfony\AI\Platform\Message\Message; use Symfony\AI\Platform\Message\MessageBag; +use Symfony\AI\Platform\Result\StreamResult; use Symfony\AI\Platform\Result\TextResult; +use Symfony\AI\Platform\Result\ToolCall; +use Symfony\AI\Platform\Result\ToolCallResult; final class ChatTest extends TestCase { @@ -113,4 +117,117 @@ public function testItHandlesEmptyMessageStore() $this->assertSame($assistantContent, $result->getContent()); $this->assertCount(2, $this->store->load()); } + + public function testItSupportsStreaming() + { + $userMessage = Message::ofUser('What is your favourite song?'); + $generator = (function () { + yield 'Bitter Sweet'; + yield ' '; + yield 'Symfony'; + })(); + + $streamResult = new StreamResult($generator); + + $this->agent->expects($this->once()) + ->method('call') + ->willReturn($streamResult); + + $result = $this->chat->submit($userMessage); + $this->assertInstanceOf(AccumulatingStreamResult::class, $result); + + $chunks = iterator_to_array($result->getContent()); + $this->assertSame(['Bitter Sweet', ' ', 'Symfony'], $chunks); + + $storedMessages = $this->store->load(); + $this->assertCount(2, $storedMessages); + + $lastMessage = end($storedMessages->getMessages()); + $this->assertInstanceOf(AssistantMessage::class, $lastMessage); + $this->assertSame('Bitter Sweet Symfony', $lastMessage->getContent()); + } + + public function testStreamingPreservesMetadata() + { + $userMessage = Message::ofUser('Hello'); + $generator = (function () { + yield 'Test'; + })(); + + $streamResult = new StreamResult($generator); + $streamResult->getMetadata()->add('key1', 'value1'); + $streamResult->getMetadata()->add('key2', 'value2'); + + $this->agent->expects($this->once()) + ->method('call') + ->willReturn($streamResult); + + $result = $this->chat->submit($userMessage); + + iterator_to_array($result->getContent()); + + $storedMessages = $this->store->load(); + $lastMessage = $storedMessages->getMessages()[1]; + $this->assertTrue($lastMessage->getMetadata()->has('key1')); + $this->assertTrue($lastMessage->getMetadata()->has('key2')); + $this->assertSame('value1', $lastMessage->getMetadata()->get('key1')); + $this->assertSame('value2', $lastMessage->getMetadata()->get('key2')); + } + + public function testStreamingWithToolCalls() + { + $userMessage = Message::ofUser('Hello'); + $toolCall = new ToolCall('call_123', 'test_tool', ['param' => 'value']); + $toolCallResult = new ToolCallResult($toolCall); + + $generator = (function () use ($toolCallResult) { + yield 'Some text'; + yield $toolCallResult; + })(); + + $streamResult = new StreamResult($generator); + + $this->agent->expects($this->once()) + ->method('call') + ->willReturn($streamResult); + + $result = $this->chat->submit($userMessage); + + iterator_to_array($result->getContent()); + + $storedMessages = $this->store->load(); + $lastMessage = $storedMessages->getMessages()[1]; + $this->assertInstanceOf(AssistantMessage::class, $lastMessage); + $this->assertSame('Some text', $lastMessage->getContent()); + $this->assertTrue($lastMessage->hasToolCalls()); + } + + public function testStreamingCallbackFiresEvenIfIterationStopsEarly() + { + $userMessage = Message::ofUser('Hello'); + $generator = (function () { + yield 'Chunk1'; + yield 'Chunk2'; + yield 'Chunk3'; + })(); + + $streamResult = new StreamResult($generator); + + $this->agent->expects($this->once()) + ->method('call') + ->willReturn($streamResult); + + $result = $this->chat->submit($userMessage); + + $content = $result->getContent(); + $content->current(); + $content->next(); + + while ($content->valid()) { + $content->next(); + } + + $storedMessages = $this->store->load(); + $this->assertCount(2, $storedMessages); + } } diff --git a/src/platform/src/Result/StreamResult.php b/src/platform/src/Result/StreamResult.php index ef253ec3c..f20c5158d 100644 --- a/src/platform/src/Result/StreamResult.php +++ b/src/platform/src/Result/StreamResult.php @@ -25,4 +25,4 @@ public function getContent(): \Generator { yield from $this->generator; } -} +} \ No newline at end of file diff --git a/src/platform/tests/Result/StreamResultTest.php b/src/platform/tests/Result/StreamResultTest.php index a0506f255..5a860e33f 100644 --- a/src/platform/tests/Result/StreamResultTest.php +++ b/src/platform/tests/Result/StreamResultTest.php @@ -19,17 +19,44 @@ final class StreamResultTest extends TestCase public function testGetContent() { $generator = (function () { - yield 'data1'; - yield 'data2'; + yield 'Hello'; + yield ' '; + yield 'World'; })(); $result = new StreamResult($generator); - $this->assertInstanceOf(\Generator::class, $result->getContent()); + $content = iterator_to_array($result->getContent()); + + $this->assertSame(['Hello', ' ', 'World'], $content); + } + public function testGetContentWithMultipleChunks() + { + $generator = (function () { + yield 'Chunk'; + yield '1'; + yield 'Chunk'; + yield '2'; + })(); + + $result = new StreamResult($generator); + $content = iterator_to_array($result->getContent()); + + $this->assertSame(['Chunk', '1', 'Chunk', '2'], $content); + } + + public function testGetContentWithEmptyGenerator() + { + $generator = (function () { + // Empty generator + if (false) { + yield; + } + })(); + + $result = new StreamResult($generator); $content = iterator_to_array($result->getContent()); - $this->assertCount(2, $content); - $this->assertSame('data1', $content[0]); - $this->assertSame('data2', $content[1]); + $this->assertSame([], $content); } } From 33e0475b6e3d558447db7a7e609615a457f913a1 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Fri, 21 Nov 2025 12:15:07 +0100 Subject: [PATCH 02/16] added newline --- src/platform/src/Result/StreamResult.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/platform/src/Result/StreamResult.php b/src/platform/src/Result/StreamResult.php index f20c5158d..ef253ec3c 100644 --- a/src/platform/src/Result/StreamResult.php +++ b/src/platform/src/Result/StreamResult.php @@ -25,4 +25,4 @@ public function getContent(): \Generator { yield from $this->generator; } -} \ No newline at end of file +} From d2a3b86aa4f1c12887d4445767dbfe16c6053590 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Fri, 21 Nov 2025 12:30:34 +0100 Subject: [PATCH 03/16] reverted irrelevant test changes --- .../tests/Result/StreamResultTest.php | 39 +++---------------- 1 file changed, 6 insertions(+), 33 deletions(-) diff --git a/src/platform/tests/Result/StreamResultTest.php b/src/platform/tests/Result/StreamResultTest.php index 5a860e33f..a0506f255 100644 --- a/src/platform/tests/Result/StreamResultTest.php +++ b/src/platform/tests/Result/StreamResultTest.php @@ -19,44 +19,17 @@ final class StreamResultTest extends TestCase public function testGetContent() { $generator = (function () { - yield 'Hello'; - yield ' '; - yield 'World'; + yield 'data1'; + yield 'data2'; })(); $result = new StreamResult($generator); - $content = iterator_to_array($result->getContent()); - - $this->assertSame(['Hello', ' ', 'World'], $content); - } + $this->assertInstanceOf(\Generator::class, $result->getContent()); - public function testGetContentWithMultipleChunks() - { - $generator = (function () { - yield 'Chunk'; - yield '1'; - yield 'Chunk'; - yield '2'; - })(); - - $result = new StreamResult($generator); - $content = iterator_to_array($result->getContent()); - - $this->assertSame(['Chunk', '1', 'Chunk', '2'], $content); - } - - public function testGetContentWithEmptyGenerator() - { - $generator = (function () { - // Empty generator - if (false) { - yield; - } - })(); - - $result = new StreamResult($generator); $content = iterator_to_array($result->getContent()); - $this->assertSame([], $content); + $this->assertCount(2, $content); + $this->assertSame('data1', $content[0]); + $this->assertSame('data2', $content[1]); } } From 9f698365de496fe9fb0c2c6a7110c7e22fd676f1 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Fri, 21 Nov 2025 12:35:37 +0100 Subject: [PATCH 04/16] fixed phpstan chat --- src/chat/tests/ChatTest.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/chat/tests/ChatTest.php b/src/chat/tests/ChatTest.php index 40517ccd7..6a1b9bfbe 100644 --- a/src/chat/tests/ChatTest.php +++ b/src/chat/tests/ChatTest.php @@ -142,7 +142,8 @@ public function testItSupportsStreaming() $storedMessages = $this->store->load(); $this->assertCount(2, $storedMessages); - $lastMessage = end($storedMessages->getMessages()); + $messages = $storedMessages->getMessages(); + $lastMessage = end($messages); $this->assertInstanceOf(AssistantMessage::class, $lastMessage); $this->assertSame('Bitter Sweet Symfony', $lastMessage->getContent()); } From 048b25d5ed1db3a859d317af05bd022317419c96 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 08:29:25 +0100 Subject: [PATCH 05/16] Fixed metadata overwriting bug in StreamResult --- src/agent/src/Toolbox/StreamResult.php | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/agent/src/Toolbox/StreamResult.php b/src/agent/src/Toolbox/StreamResult.php index 4b71b14e3..304f7224f 100644 --- a/src/agent/src/Toolbox/StreamResult.php +++ b/src/agent/src/Toolbox/StreamResult.php @@ -33,8 +33,11 @@ public function getContent(): \Generator if ($value instanceof ToolCallResult) { $innerResult = ($this->handleToolCallsCallback)($value, Message::ofAssistant($streamedResult)); - $this->getMetadata()->set($innerResult->getMetadata()->all()); - + // Propagate metadata from inner result to this result + foreach ($innerResult->getMetadata()->all() as $key => $metadataValue) { + $this->getMetadata()->add($key, $metadataValue); + } + $content = $innerResult->getContent(); // Strings are iterable in PHP but yield from would iterate character-by-character. // We need to yield the complete string as a single value to preserve streaming behavior. From 82f480413ec504662d338f812c35f86840de32d6 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 08:30:29 +0100 Subject: [PATCH 06/16] Reverted whitespace change --- src/agent/src/Toolbox/StreamResult.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/agent/src/Toolbox/StreamResult.php b/src/agent/src/Toolbox/StreamResult.php index 304f7224f..adaf619e4 100644 --- a/src/agent/src/Toolbox/StreamResult.php +++ b/src/agent/src/Toolbox/StreamResult.php @@ -37,7 +37,6 @@ public function getContent(): \Generator foreach ($innerResult->getMetadata()->all() as $key => $metadataValue) { $this->getMetadata()->add($key, $metadataValue); } - $content = $innerResult->getContent(); // Strings are iterable in PHP but yield from would iterate character-by-character. // We need to yield the complete string as a single value to preserve streaming behavior. From e40acc75ee9dca465497df7e9ebf55732f283459 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 08:31:23 +0100 Subject: [PATCH 07/16] Reverted whitespace change --- src/agent/src/Toolbox/StreamResult.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/agent/src/Toolbox/StreamResult.php b/src/agent/src/Toolbox/StreamResult.php index adaf619e4..2d2256f0e 100644 --- a/src/agent/src/Toolbox/StreamResult.php +++ b/src/agent/src/Toolbox/StreamResult.php @@ -38,6 +38,7 @@ public function getContent(): \Generator $this->getMetadata()->add($key, $metadataValue); } $content = $innerResult->getContent(); + // Strings are iterable in PHP but yield from would iterate character-by-character. // We need to yield the complete string as a single value to preserve streaming behavior. // null should also be yielded as-is. From 3949dee9366494781d226aa57fd6881fd4e3bc6b Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 08:32:34 +0100 Subject: [PATCH 08/16] Finally reverted whitespace properly --- src/agent/src/Toolbox/StreamResult.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/agent/src/Toolbox/StreamResult.php b/src/agent/src/Toolbox/StreamResult.php index 2d2256f0e..37f521e5f 100644 --- a/src/agent/src/Toolbox/StreamResult.php +++ b/src/agent/src/Toolbox/StreamResult.php @@ -37,8 +37,9 @@ public function getContent(): \Generator foreach ($innerResult->getMetadata()->all() as $key => $metadataValue) { $this->getMetadata()->add($key, $metadataValue); } - $content = $innerResult->getContent(); + $content = $innerResult->getContent(); + // Strings are iterable in PHP but yield from would iterate character-by-character. // We need to yield the complete string as a single value to preserve streaming behavior. // null should also be yielded as-is. From 4aa44e535c078327b4e78b84c5e0f3545ad9d737 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 08:34:39 +0100 Subject: [PATCH 09/16] Revert whole file --- src/agent/src/Toolbox/StreamResult.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/agent/src/Toolbox/StreamResult.php b/src/agent/src/Toolbox/StreamResult.php index 37f521e5f..afc10378a 100644 --- a/src/agent/src/Toolbox/StreamResult.php +++ b/src/agent/src/Toolbox/StreamResult.php @@ -37,9 +37,8 @@ public function getContent(): \Generator foreach ($innerResult->getMetadata()->all() as $key => $metadataValue) { $this->getMetadata()->add($key, $metadataValue); } - - $content = $innerResult->getContent(); + $content = $innerResult->getContent(); // Strings are iterable in PHP but yield from would iterate character-by-character. // We need to yield the complete string as a single value to preserve streaming behavior. // null should also be yielded as-is. From 7efc534dd1a8fb5969ba5ee7e3f9d372ba634a8e Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 08:56:44 +0100 Subject: [PATCH 10/16] Fixed changelog --- src/chat/CHANGELOG.md | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/src/chat/CHANGELOG.md b/src/chat/CHANGELOG.md index eefd0fbb7..7f4542eea 100644 --- a/src/chat/CHANGELOG.md +++ b/src/chat/CHANGELOG.md @@ -1,20 +1,15 @@ CHANGELOG ========= -0.2 +0.1 +--- * Add streaming support to `ChatInterface::submit()` - Add `StreamableStoreInterface` which indicates `StoreInterface` implementation can be configured with streaming - Add `AccumulatingStreamResult` wrapper class which adds accumulation logic & callback chaining to `StreamResult` implementations (can wrap both `Agent` and `Platform` variants) to return the full message once `Generator` is exhausted - Streamed responses now also create `AssistantMessage` & are added to `Store` in `Chat::submit()` - Bugfixed loss of metadata in `Chat::submit()` - -0.1 ---- - - * Introduce the component - * Add support for external message stores: - - Doctrine - - Meilisearch - - MongoDb - - Pogocache - - Redis - - SurrealDb + * Introduce the component + * Add support for external message stores: + - Meilisearch + - Pogocache + - Redis + - SurrealDb From 64f7b9819c45a7606e852d4457c3cbb0149da9f1 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 08:57:34 +0100 Subject: [PATCH 11/16] added whitespace --- src/chat/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/src/chat/CHANGELOG.md b/src/chat/CHANGELOG.md index 7f4542eea..f7f09c9fc 100644 --- a/src/chat/CHANGELOG.md +++ b/src/chat/CHANGELOG.md @@ -2,6 +2,7 @@ CHANGELOG ========= 0.1 --- + * Add streaming support to `ChatInterface::submit()` - Add `StreamableStoreInterface` which indicates `StoreInterface` implementation can be configured with streaming - Add `AccumulatingStreamResult` wrapper class which adds accumulation logic & callback chaining to `StreamResult` implementations (can wrap both `Agent` and `Platform` variants) to return the full message once `Generator` is exhausted From 8a719169089d72cce0a6f7896d517dec290d1086 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 11:37:31 +0100 Subject: [PATCH 12/16] added mongodb streaminablestoreinterface --- src/chat/src/Bridge/MongoDb/MessageStore.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/chat/src/Bridge/MongoDb/MessageStore.php b/src/chat/src/Bridge/MongoDb/MessageStore.php index 3d167e8f1..eb7c2a857 100644 --- a/src/chat/src/Bridge/MongoDb/MessageStore.php +++ b/src/chat/src/Bridge/MongoDb/MessageStore.php @@ -15,6 +15,7 @@ use Symfony\AI\Chat\ManagedStoreInterface; use Symfony\AI\Chat\MessageNormalizer; use Symfony\AI\Chat\MessageStoreInterface; +use Symfony\AI\Chat\StreamableStoreInterface; use Symfony\AI\Platform\Message\MessageBag; use Symfony\AI\Platform\Message\MessageInterface; use Symfony\Component\Serializer\Encoder\JsonEncoder; @@ -27,7 +28,7 @@ /** * @author Guillaume Loulier */ -final class MessageStore implements ManagedStoreInterface, MessageStoreInterface +final class MessageStore implements ManagedStoreInterface, MessageStoreInterface, StreamableStoreInterface { public function __construct( private readonly Client $client, From 4fd4943f142aa5bf17a5348e0684d34b2b25848e Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 11:45:03 +0100 Subject: [PATCH 13/16] Fixed style issues (fabbot) --- src/chat/src/Chat.php | 2 +- src/chat/src/Result/AccumulatingStreamResult.php | 7 +++---- src/chat/src/StreamableStoreInterface.php | 1 - 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/chat/src/Chat.php b/src/chat/src/Chat.php index 006ea32c8..b181b34d7 100644 --- a/src/chat/src/Chat.php +++ b/src/chat/src/Chat.php @@ -48,7 +48,7 @@ public function submit(UserMessage $message): AssistantMessage|AccumulatingStrea if ($result instanceof StreamResult || $result instanceof ToolboxStreamResult) { if (!$this->store instanceof StreamableStoreInterface) { - throw new RuntimeException($this->store::class . ' does not support streaming.'); + throw new RuntimeException($this->store::class.' does not support streaming.'); } return new AccumulatingStreamResult($result, function (AssistantMessage $assistantMessage) use ($messages) { diff --git a/src/chat/src/Result/AccumulatingStreamResult.php b/src/chat/src/Result/AccumulatingStreamResult.php index bee9f5a0d..2b467218a 100644 --- a/src/chat/src/Result/AccumulatingStreamResult.php +++ b/src/chat/src/Result/AccumulatingStreamResult.php @@ -41,7 +41,7 @@ public function addOnComplete(\Closure $callback): void $existingCallback($message); $callback($message); } - : $callback; + : $callback; } public function getContent(): \Generator @@ -61,9 +61,9 @@ public function getContent(): \Generator yield $value; } } finally { - if ($this->onComplete !== null) { + if (null !== $this->onComplete) { $assistantMessage = Message::ofAssistant( - $accumulatedContent === '' ? null : $accumulatedContent, + '' === $accumulatedContent ? null : $accumulatedContent, $toolCalls ?: null ); @@ -79,4 +79,3 @@ public function getMetadata(): Metadata return $this->innerResult->getMetadata(); } } - diff --git a/src/chat/src/StreamableStoreInterface.php b/src/chat/src/StreamableStoreInterface.php index 3636e66ba..45268b0f6 100644 --- a/src/chat/src/StreamableStoreInterface.php +++ b/src/chat/src/StreamableStoreInterface.php @@ -17,4 +17,3 @@ interface StreamableStoreInterface { } - From 780ef0bf1f4e9ff2b5c6bb0d53670f2c678889b7 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 11:56:35 +0100 Subject: [PATCH 14/16] fix indentaiton changelog --- src/chat/CHANGELOG.md | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/chat/CHANGELOG.md b/src/chat/CHANGELOG.md index f7f09c9fc..a44ca58a3 100644 --- a/src/chat/CHANGELOG.md +++ b/src/chat/CHANGELOG.md @@ -3,14 +3,17 @@ CHANGELOG 0.1 --- - * Add streaming support to `ChatInterface::submit()` - - Add `StreamableStoreInterface` which indicates `StoreInterface` implementation can be configured with streaming - - Add `AccumulatingStreamResult` wrapper class which adds accumulation logic & callback chaining to `StreamResult` implementations (can wrap both `Agent` and `Platform` variants) to return the full message once `Generator` is exhausted - - Streamed responses now also create `AssistantMessage` & are added to `Store` in `Chat::submit()` - - Bugfixed loss of metadata in `Chat::submit()` - * Introduce the component - * Add support for external message stores: - - Meilisearch - - Pogocache - - Redis - - SurrealDb + * Add streaming support to `ChatInterface::submit()` + - Add `StreamableStoreInterface` which indicates `StoreInterface` implementation can be configured with streaming + - Add `AccumulatingStreamResult` wrapper class which adds accumulation logic & callback chaining to `StreamResult` implementations (can wrap both `Agent` and `Platform` variants) to return the full message once `Generator` is exhausted + - Streamed responses now also create `AssistantMessage` & are added to `Store` in `Chat::submit()` + - Bugfixed loss of metadata in `Chat::submit()` + * Introduce the component + * Introduce the component + * Add support for external message stores: + - Doctrine + - Meilisearch + - MongoDb + - Pogocache + - Redis + - SurrealDb From abedd41bc176fdc9a78328b89c2f1613bdc7abcd Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 11:59:24 +0100 Subject: [PATCH 15/16] readded whitespace --- src/chat/CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/chat/CHANGELOG.md b/src/chat/CHANGELOG.md index a44ca58a3..e7803ccab 100644 --- a/src/chat/CHANGELOG.md +++ b/src/chat/CHANGELOG.md @@ -1,9 +1,10 @@ CHANGELOG ========= + 0.1 --- - * Add streaming support to `ChatInterface::submit()` +* Add streaming support to `ChatInterface::submit()` - Add `StreamableStoreInterface` which indicates `StoreInterface` implementation can be configured with streaming - Add `AccumulatingStreamResult` wrapper class which adds accumulation logic & callback chaining to `StreamResult` implementations (can wrap both `Agent` and `Platform` variants) to return the full message once `Generator` is exhausted - Streamed responses now also create `AssistantMessage` & are added to `Store` in `Chat::submit()` From 8df829ed054f75e7e4ab55b21605f8f007773740 Mon Sep 17 00:00:00 2001 From: Marco van Angeren Date: Mon, 24 Nov 2025 11:59:57 +0100 Subject: [PATCH 16/16] removed dupe line in changelog --- src/chat/CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/src/chat/CHANGELOG.md b/src/chat/CHANGELOG.md index e7803ccab..de18d90a9 100644 --- a/src/chat/CHANGELOG.md +++ b/src/chat/CHANGELOG.md @@ -10,7 +10,6 @@ CHANGELOG - Streamed responses now also create `AssistantMessage` & are added to `Store` in `Chat::submit()` - Bugfixed loss of metadata in `Chat::submit()` * Introduce the component - * Introduce the component * Add support for external message stores: - Doctrine - Meilisearch