Skip to content

Completely rewrite add_messages_streaming#277

Merged
bmerkle merged 43 commits into
microsoft:mainfrom
gvanrossum:pipeline
May 16, 2026
Merged

Completely rewrite add_messages_streaming#277
bmerkle merged 43 commits into
microsoft:mainfrom
gvanrossum:pipeline

Conversation

@gvanrossum
Copy link
Copy Markdown
Collaborator

The throughput is now much higher -- e.g. with concurrency 10 and batch size 10, the Adrian podcast ingest in 40 seconds, compared to 90 on main (with the previous pipelining implementation).

A consequence of the new design is that the message index is now populated at the time messages are added -- the secondary index building no longer needs to do this.

gvanrossum-ms and others added 22 commits May 9, 2026 13:02
Split embedding strategy (uncached chunk, cached related terms).
Add precomputed-embedding write paths for message and related-term
indexes, introducing explicit *_with_embeddings methods in interfaces
and both memory/SQLite implementations. Refactor existing add methods
to compute embeddings once and delegate, enabling pipeline commit paths
to reuse worker-generated embeddings without recomputation.
Previously typechat.Failure from the extractor was a soft error: the
message was still committed (with no knowledge) and the failure recorded.
Since LLM responses are non-deterministic, a Failure is just as
unreliable as a raised exception, so both now stop the pipeline at the
failing message and propagate the error.

- Remove extraction_failure_msg from ChunkProcessingResult and
  _ChunkCommitResult; simplify _commit_batch_from_chunk_results
- Keep stop_state.exception in sync with stop_at_message_id so it
  always reflects the lowest-ordinal failing message
- Update tests accordingly

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace nested try/except* handling with ExceptionGroup handling.
- Preserve producer_state and stop_state exceptions and raise a combined
  ExceptionGroup when multiple distinct failures occur.
- Complete ChunkProcessingResult docstring with all class fields and
  clarify success semantics.
@gvanrossum gvanrossum requested a review from bmerkle May 13, 2026 20:26
@gvanrossum
Copy link
Copy Markdown
Collaborator Author

@KRRT7 If you still care about typeagent-py I'd appreciate your review!

@KRRT7
Copy link
Copy Markdown
Contributor

KRRT7 commented May 13, 2026

I'll review it in the morning

Comment thread tools/ingest_email.py
Comment thread src/typeagent/knowpro/add_messages.py Outdated
Comment thread src/typeagent/knowpro/add_messages.py
Comment thread tests/test_add_messages_pipeline.py
Comment thread src/typeagent/knowpro/add_messages.py Outdated

# Step 2: Generate embeddings (only if extraction succeeded)
try:
result.chunk_embedding = await embedding_model.get_embedding_nocache(chunk_text)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chunk_embedding is computed but never written to the message text index

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a deep one! Need to dig more for this.

Comment thread src/typeagent/knowpro/add_messages.py
Copy link
Copy Markdown
Collaborator Author

@gvanrossum gvanrossum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bmerkle It's ready for your re-review. I fixed the chunk embeddings issue. Also had added one more PR since your review came in.

Comment thread src/typeagent/knowpro/interfaces_storage.py Outdated
Comment thread src/typeagent/knowpro/conversation_base.py Outdated
Comment thread src/typeagent/knowpro/add_messages.py Outdated
Comment thread src/typeagent/podcasts/podcast.py
Comment thread src/typeagent/transcripts/transcript.py
bmerkle and others added 5 commits May 14, 2026 23:57
…remental_with_embeddings

Chunk embeddings are already consumed by messages.extend(); passing them
through to this function served no purpose.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
After the success-check early return, extracted_knowledge is guaranteed
non-None; an assert communicates this to pyright and catches programmer
errors at runtime.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ing]

Both the IMessageCollection protocol and the MemoryMessageCollection
concrete class now use the specific type.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add index_messages=False flag to IMessageCollection.extend() so callers
can skip message text index population. Deserialization uses it because
message_index.deserialize() will replace the index from the sidecar file
anyway. Also change generate_embeddings(cache=True) to cache=False since
indexing embeddings should not be cached.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@gvanrossum
Copy link
Copy Markdown
Collaborator Author

@bmerkle Claude and I addressed all your feedback. Please re-review.

@KRRT7
Copy link
Copy Markdown
Contributor

KRRT7 commented May 15, 2026

Review findings

All 734 tests pass on the branch. The prior-round issues (type narrowing, dead param, type tightening, deserialization wasted work) look good. The following are new observations not covered in earlier threads.


Correctness

list[typing.Any] still in SQLite backend (sqlite/collections.py:192)

cffb4c3 fixed interfaces_storage.py and memory/collections.py but left the SQLite extend() signature as list[typing.Any]. The Protocol and the concrete SQLite class are now inconsistent — Pyright won't catch type misuse on the SQLite-backed path.

Silent if embedding is not None in _commit_batch_from_chunk_results (conversation_base.py:310–312)

embedding = chunk_embedding_map.get((msg_ord, chunk_ord))
if embedding is not None:
    chunk_embeddings.append(embedding)

The validation loop above this already raises ValueError for any None chunk embedding, so this guard is dead for the non-zero-chunk case. A missing entry in the map means a programmer error (the reassembler staged a message whose chunks weren't all processed), but the silent skip surfaces as a cryptic length-mismatch error one level down in add_messages_starting_at_with_embeddings. Prefer an explicit assertion.

_commit_if_needed staged state not cleared before commit (add_messages.py:401–409)

If commit_batch raises, staged_messages is not cleared. The reassembler's finally block calls _drain_consecutive_complete(force=True), which calls _commit_if_needed(force=True), which retries the same staged messages. For the common failure modes (validation error before the transaction, or a DB error that rolls back) this is harmless. But commit_batch calls _accumulate, which calls the user-provided on_batch_committed. If that callback raises after a successful DB write, the retry double-commits the same messages. Fix: clear staged_messages/staged_results/staged_chunks before await commit_batch(...), not after.

Missing comment on replace-not-append semantics (podcast.py:81, transcript.py:82)

b2fa6ce correctly adds index_messages=False so no indexing happens during extend(), making the old index_size == 0 assertion redundant. But the question from the prior review — does MessageTextIndex.deserialize() replace or append? — is answered by the code (TextToTextLocationIndex.deserialize calls .clear() then reassigns; VectorBase.deserialize does self._vectors = data), not by any comment. Worth a one-liner explaining why the assertion is safe to remove.


Performance

Unnecessary list allocations in hot path (add_messages.py:260)

_collect_related_terms_for_fuzzy_index is called for every chunk in every worker:

for action in list(knowledge.actions) + list(knowledge.inverse_actions):

This allocates three lists (two coercions + one concatenation) to iterate once. Use itertools.chain(knowledge.actions, knowledge.inverse_actions).

Extraction and chunk embedding are sequential (add_messages.py:307–329)

get_embedding_nocache(chunk_text) runs after extraction completes, but it only needs chunk_text — it's independent of the extraction result. Related-term embeddings do depend on extraction, so they must remain after. Using asyncio.gather for the first two:

knowledge_result, chunk_embedding = await asyncio.gather(
    knowledge_extractor.extract(chunk_text),
    embedding_model.get_embedding_nocache(chunk_text),
)

At concurrency=10, each worker saves ~embedding-latency (~20 ms) per chunk off the critical path. On extraction failure the embedding runs unnecessarily, but at low error rates that's acceptable.

[e for e in numpy_array] in three places

result.related_term_embeddings = [e for e in rel_embeddings] (add_messages.py:329), and the same pattern in memory/reltermsindex.py:295 and sqlite/reltermsindex.py:219. All three are just list(rel_embeddings) / list(embeddings).

Semaphore held through result_queue.put (add_messages.py:196–198)

    try:
        ...compute result...
        await result_queue.put(result)   # can block when queue full
    finally:
        sem.release()                    # held until put() completes

When the reassembler is blocked on a commit, result_queue fills to concurrency * 2. Workers that have already finished their LLM/embedding work still hold semaphore slots waiting for a queue slot. The dispatcher stalls, the chunk queue fills, the producer stalls. Moving sem.release() to before result_queue.put() lets the dispatcher start the next chunk immediately after the expensive work finishes. The only risk: on TaskGroup cancellation, a result put-in-progress is silently dropped — but the reassembler task is cancelled too at that point, so it won't be waiting for that result.

@gvanrossum
Copy link
Copy Markdown
Collaborator Author

@KRRT7, I think Claude and I have addressed your concerns in the most recent 8 commits (one per issue).

@KRRT7
Copy link
Copy Markdown
Contributor

KRRT7 commented May 16, 2026

  • Redundant Callback Path: In _reassembler_task, the on_batch_committed parameter is currently unused because the orchestrator passes None and handles the callback via the _accumulate wrapper instead. It might be cleaner to remove this parameter from the reassembler if we intend to keep the current orchestration pattern.
  • Logging Verbosity: When skip_failed_messages is enabled, failures are logged in both the _dispatcher_task (per chunk) and _reassembler_task (per message). For messages with many chunks, this could result in quite a bit of noise in the logs. We might want to consolidate this to only log once at the message level in the reassembler.

Aside from these, the PR looks solid and ready to go.

@gvanrossum
Copy link
Copy Markdown
Collaborator Author

Thanks! Fixed.

@KRRT7
Copy link
Copy Markdown
Contributor

KRRT7 commented May 16, 2026

LGTM now

@bmerkle
Copy link
Copy Markdown
Collaborator

bmerkle commented May 16, 2026

I have one failing tests, however not really stable fail. output appended below
on a second, third and following runs, test_mcp_server.py ran fine. All other tests passed sucessfully.

should be proceed with this PR and file a new bug ?

------------------------------------------------- Captured stderr call -------------------------------------------------
[05/16/26 11:02:28] INFO     Processing request of type            server.py:727
                             ListToolsRequest                                   
                    INFO     Processing request of type            server.py:727
                             CallToolRequest                                    
[05/16/26 11:02:31] INFO     HTTP Request: POST                  _client.py:1740
                             https://xyz.net/opena                
                             i/openai/deployments/text-embedding                
                             -3-small/embeddings?api-version=202                
                             5-01-01-preview "HTTP/1.1 200 OK"                  
                    INFO     HTTP Request: POST                  _client.py:1740
                             https://xyz.net/opena                
                             i/openai/deployments/text-embedding                
                             -3-small/embeddings?api-version=202                
                             5-01-01-preview "HTTP/1.1 200 OK"                  
=============================================== short test summary info ================================================
FAILED tests/test_mcp_server.py::test_mcp_server_query_conversation_slow - AssertionError("{'answer': 'The provided [ANSWER CONTEXT] mentions Kevin Scott briefly but does not include detaile...
============================================ 1 failed, 744 passed in 22.57s ============================================
make: *** [test] Error 1

typeagent-py on  pipeline [$] is 📦 v0.4.0.dev via 🐍 v3.13.12 (typeagent) took 36s 

@bmerkle
Copy link
Copy Markdown
Collaborator

bmerkle commented May 16, 2026

additional failing test...

tests/test_textlocindex.py .........                                                                                                 [ 89%]
tests/test_timestampindex.py .                                                                                                       [ 89%]
tests/test_transcripts.py ......F..........                                                                                          [ 92%]
tests/test_universal_message.py ..                                                                                                   [ 92%]
tests/test_utils.py ...................................                                                                              [ 97%]
tests/test_vectorbase.py ......................                                                                                      [100%]

================================================================= FAILURES =================================================================
________________________________________________ test_transcript_knowledge_extraction_slow _________________________________________________

really_needs_auth = None, embedding_model = <typeagent.aitools.embeddings.CachingEmbeddingModel object at 0x10f5998b0>

    @pytest.mark.asyncio
    async def test_transcript_knowledge_extraction_slow(
        really_needs_auth: None, embedding_model: IEmbeddingModel
    ):
        """
        Test that knowledge extraction works during transcript ingestion.
    
        This test verifies the complete ingestion pipeline:
        1. Parses first 5 messages from Parrot Sketch VTT file
        2. Creates transcript with in-memory storage (fast)
        3. Runs build_index() with auto_extract_knowledge=True
        4. Verifies both mechanical extraction (entities/actions from metadata)
           and LLM extraction (topics from content) work correctly
        """
        # Use in-memory storage for speed
        settings = ConversationSettings(embedding_model)
    
        # Parse first 5 captions from Parrot Sketch
        vtt_file = PARROT_SKETCH_VTT
        if not os.path.exists(vtt_file):
            pytest.skip(f"Test file {vtt_file} not found")
    
        vtt = webvtt.read(vtt_file)
    
        # Create messages from first 5 captions
        messages_list = []
        # vtt is indexable but not iterable
        for i in range(min(len(vtt), 5)):
            caption = vtt[i]
            if not caption.text.strip():
                continue
    
            speaker = getattr(caption, "voice", None)
            text = caption.text.strip()
    
            # Calculate timestamp from WebVTT start time
            offset_seconds = webvtt_timestamp_to_seconds(caption.start)
            timestamp = format_timestamp_utc(UNIX_EPOCH + timedelta(seconds=offset_seconds))
    
            metadata = TranscriptMessageMeta(
                speaker=speaker,
                recipients=[],
            )
            message = TranscriptMessage(
                text_chunks=[text], metadata=metadata, timestamp=timestamp
            )
            messages_list.append(message)
    
        # Create in-memory collections
        msg_coll = MemoryMessageCollection[TranscriptMessage]()
        await msg_coll.extend(messages_list)
    
        _semref_coll = MemorySemanticRefCollection()
        _semref_index = TermToSemanticRefIndex()
    
        # Create transcript with in-memory storage
        transcript = await Transcript.create(
            settings,
            name="Parrot-Test",
            tags=["test", "parrot"],
        )
    
        # Verify we have messages
        assert len(messages_list) >= 3, "Need at least 3 messages for testing"
    
        # Enable knowledge extraction
        settings.semantic_ref_index_settings.auto_extract_knowledge = True
        settings.semantic_ref_index_settings.concurrency = 10
    
        # Add messages with indexing (this should extract knowledge)
>       result = await transcript.add_messages_with_indexing(messages_list)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

tests/test_transcripts.py:311: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
src/typeagent/knowpro/conversation_base.py:193: in add_messages_with_indexing
    await self._add_llm_knowledge_incremental(
src/typeagent/knowpro/conversation_base.py:420: in _add_llm_knowledge_incremental
    await semrefindex.add_batch_to_semantic_ref_index_from_list(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

conversation = Transcript(settings=<typeagent.knowpro.convsettings.ConversationSettings object at 0x10f07b6e0>, storage_provider=<typ....knowpro.secindex.ConversationSecondaryIndexes object at 0x11087cc50>, _query_translator=None, _answer_translator=None)
messages = [ConversationMessage(text_chunks=['[ANIMATION] including dancing Botticelli Venus, which links to pet shop: Mr. Pralin...nversationMessageMeta(speaker='Shopkeeper', recipients=[]), tags=[], timestamp='1970-01-01T00:00:17Z', source_id=None)]
batch = [TextLocation(0, 0), TextLocation(1, 0), TextLocation(2, 0), TextLocation(3, 0), TextLocation(4, 0)]
knowledge_extractor = KnowledgeExtractor(model=<typeagent.aitools.model_adapters.PydanticAIChatModel object at 0x10f0b1e60>, max_chars_per_c... merge_action_knowledge=False, translator=<typechat._internal.translator.TypeChatJsonTranslator object at 0x10f2078a0>)
concurrency = 10

    async def add_batch_to_semantic_ref_index_from_list[
        TMessage: IMessage, TTermToSemanticRefIndex: ITermToSemanticRefIndex
    ](
        conversation: IConversation[TMessage, TTermToSemanticRefIndex],
        messages: list[TMessage],
        batch: list[TextLocation],
        knowledge_extractor: IKnowledgeExtractor,
        concurrency: int = 4,
    ) -> None:
        """Extract knowledge from messages and bulk-add to the semantic ref index."""
        if not batch:
            return
        start_ordinal = batch[0].message_ordinal
    
        text_batch = []
        for tl in batch:
            list_index = tl.message_ordinal - start_ordinal
            if list_index < 0 or list_index >= len(messages):
                raise IndexError(
                    f"Message ordinal {tl.message_ordinal} out of range "
                    f"for list starting at {start_ordinal}"
                )
            text_batch.append(messages[list_index].text_chunks[tl.chunk_ordinal].strip())
    
        knowledge_results = await extract_knowledge_from_text_batch(
            knowledge_extractor,
            text_batch,
            concurrency,
        )
        bulk_items: list[tuple[int, int, kplib.KnowledgeResponse]] = []
        for i, knowledge_result in enumerate(knowledge_results):
            if isinstance(knowledge_result, Failure):
>               raise RuntimeError(
                    f"Knowledge extraction failed: {knowledge_result.message:.150}"
                )
E               RuntimeError: Knowledge extraction failed: Several possible issues may have occurred with the given data.
E               
E               Validation path `actions.0.params.0.str` failed for value `{"name": "item_carried", "v

src/typeagent/storage/memory/semrefindex.py:114: RuntimeError
------------------------------------------------------------ Captured log call -------------------------------------------------------------
ERROR    asyncio:base_events.py:1879 Task exception was never retrieved
future: <Task finished name='Task-1602' coro=<AsyncClient.aclose() done, defined at /Users/merkle/work/microsoft/typeagent-py/.venv/lib/python3.13/site-packages/httpx/_client.py:1978> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/Users/merkle/work/microsoft/typeagent-py/.venv/lib/python3.13/site-packages/httpx/_client.py", line 1985, in aclose
    await self._transport.aclose()
  File "/Users/merkle/work/microsoft/typeagent-py/.venv/lib/python3.13/site-packages/httpx/_transports/default.py", line 406, in aclose
    await self._pool.aclose()
  File "/Users/merkle/work/microsoft/typeagent-py/.venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 353, in aclose
    await self._close_connections(closing_connections)
  File "/Users/merkle/work/microsoft/typeagent-py/.venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 345, in _close_connections
    await connection.aclose()
  File "/Users/merkle/work/microsoft/typeagent-py/.venv/lib/python3.13/site-packages/httpcore/_async/connection.py", line 173, in aclose
    await self._connection.aclose()
  File "/Users/merkle/work/microsoft/typeagent-py/.venv/lib/python3.13/site-packages/httpcore/_async/http11.py", line 258, in aclose
    await self._network_stream.aclose()
  File "/Users/merkle/work/microsoft/typeagent-py/.venv/lib/python3.13/site-packages/httpcore/_backends/anyio.py", line 53, in aclose
    await self._stream.aclose()
  File "/Users/merkle/work/microsoft/typeagent-py/.venv/lib/python3.13/site-packages/anyio/streams/tls.py", line 236, in aclose
    await self.transport_stream.aclose()
  File "/Users/merkle/work/microsoft/typeagent-py/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 1344, in aclose
    self._transport.close()
    ~~~~~~~~~~~~~~~~~~~~~^^
  File "/Users/merkle/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/asyncio/selector_events.py", line 1216, in close
    super().close()
    ~~~~~~~~~~~~~^^
  File "/Users/merkle/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/asyncio/selector_events.py", line 869, in close
    self._loop.call_soon(self._call_connection_lost, None)
    ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/merkle/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/asyncio/base_events.py", line 833, in call_soon
    self._check_closed()
    ~~~~~~~~~~~~~~~~~~^^
  File "/Users/merkle/.local/share/uv/python/cpython-3.13.12-macos-aarch64-none/lib/python3.13/asyncio/base_events.py", line 556, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
========================================================= short test summary info ==========================================================
FAILED tests/test_transcripts.py::test_transcript_knowledge_extraction_slow - RuntimeError: Knowledge extraction failed: Several possible issues may have occurred with the given data.
====================================================== 1 failed, 744 passed in 20.32s ======================================================
make: *** [test] Error 1

typeagent-py on  pipeline [$] is 📦 v0.4.0.dev via 🐍 v3.13.12 (typeagent) took 21s 

@gvanrossum
Copy link
Copy Markdown
Collaborator Author

I think those tests are inherently flaky -- they depend on the LLM service being up, speedy, and returning the right answer. You can file a bug for that and in the meantime merge this PR.

@bmerkle bmerkle merged commit c3b1091 into microsoft:main May 16, 2026
16 checks passed
@gvanrossum gvanrossum deleted the pipeline branch May 16, 2026 16:51
@gvanrossum
Copy link
Copy Markdown
Collaborator Author

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants