
Migrate ingest_vtt.py to streaming ingestion API #267

Merged
gvanrossum merged 4 commits into microsoft:main from KRRT7:feat/vtt-streaming-ingestion on May 5, 2026

Conversation

KRRT7 (Contributor) commented May 4, 2026

Summary

Streaming migration (commit 1):

  • Replaces the manual add_messages_with_indexing batch loop with add_messages_streaming, matching the pattern already used by ingest_email.py and podcast_ingest.py
  • Lazy async generator yields TranscriptMessage directly from the VTT parsing loop — no intermediate list allocation (see the sketch after this list)
  • Adds source_id per message ({vtt_file}#{index}) for restartability and dedup
  • Adds --batch-size (default 100) and --concurrency CLI arguments
  • on_batch_committed callback reports per-batch chunks, semrefs, and timing
  • Graceful ^C handling — committed batches survive interruption
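For reviewers who want the shape of the change without opening the diff, the new flow looks roughly like the sketch below. It is a simplified illustration, not a copy of the tool: the exact signatures of add_messages_streaming, TranscriptMessage, and the batch-stats object come from the framework, and parse_vtt_cues plus the `conversation` handle are stand-ins.

```python
from collections.abc import AsyncIterator

async def vtt_messages(vtt_file: str) -> AsyncIterator["TranscriptMessage"]:
    """Lazily yield one TranscriptMessage per VTT cue; no intermediate list is built."""
    # TranscriptMessage comes from the framework (import omitted); parse_vtt_cues is illustrative.
    for index, cue in enumerate(parse_vtt_cues(vtt_file)):
        yield TranscriptMessage(
            text=cue.text,
            speaker=cue.speaker,
            source_id=f"{vtt_file}#{index}",  # restartability / dedup key
        )

def on_batch_committed(stats) -> None:
    # Invoked after each batch commits; the field names on `stats` are illustrative.
    print(f"  +{stats.chunks} chunks | +{stats.semrefs} semrefs | {stats.seconds:.1f}s")

async def ingest(vtt_file: str, batch_size: int = 100, concurrency: int = 4) -> None:
    # `conversation` stands in for whatever object exposes add_messages_streaming.
    await conversation.add_messages_streaming(
        vtt_messages(vtt_file),
        batch_size=batch_size,        # --batch-size (default 100)
        concurrency=concurrency,      # --concurrency
        on_batch_committed=on_batch_committed,
    )
```

The key difference from the old add_messages_with_indexing loop is that messages are pulled from the generator as batches are committed, so LLM extraction and DB commits overlap instead of running strictly one after the other.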

Review feedback (commit 2):

  • Replaced msg_count = [0] list trick with nonlocal (per Guido's review)
  • Removed verbose prints from inside the async generator that raced with batch callback output

Dead code cleanup (commit 3):

  • Removes the now-dead messages_skipped counter and its conditional output (the framework stopped populating messages_skipped in #271)

All three ingest tools now use the same streaming pattern.

Stack

This PR is stacked on #271. Merge order: #271 → #267 → #268

| # | PR | Branch | Description |
|---|----|--------|-------------|
| 1 | #271 | fix/remove-filter-ingested | Remove framework dedup |
| 2 | #267 (this) | feat/vtt-streaming-ingestion | VTT streaming migration |
| 3 | #268 | refactor/email-dedup-consolidation | Email bulk dedup |

Reproducible test (Parrot Sketch VTT, ships in repo)

uv run python tools/ingest_vtt.py tests/testdata/Parrot_Sketch.vtt -d /tmp/parrot.db -v --batch-size 20

| Metric | Value |
|--------|-------|
| Messages | 76 (5 speakers) |
| Batches | 4 (batch-size 20) |
| Chunks | 76 |
| Semrefs | 612 |
| Total time | 67.5s |
| Per chunk | 0.89s |

^C interrupt test

uv run python tools/ingest_vtt.py tests/testdata/Parrot_Sketch.vtt -d /tmp/ctrl_c.db -v --batch-size 10
# send SIGINT after 2 batches

Output after ^C:

Ingestion interrupted by user (^C).
  Successfully added 20 messages
  Ingested 20 chunk(s)
  Extracted 188 semantic references

DB verification: 20 messages, 188 semrefs, 20 ingested sources — exactly the 2 committed batches.
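The interrupt behavior follows directly from the batching: every batch that has already committed is in the database, so catching the interrupt around the streaming call and reporting the totals accumulated by the batch callback yields exactly the numbers verified above. A minimal sketch, assuming illustrative names for the callback fields and the `conversation` handle (not the tool's actual code):

```python
import asyncio

async def ingest_with_interrupt_reporting(message_stream, batch_size: int) -> None:
    committed = {"messages": 0, "chunks": 0, "semrefs": 0}

    def on_batch_committed(stats) -> None:
        # Only data that has already been committed to the DB is counted here.
        committed["messages"] += stats.messages
        committed["chunks"] += stats.chunks
        committed["semrefs"] += stats.semrefs

    try:
        await conversation.add_messages_streaming(  # `conversation` is a stand-in
            message_stream,
            batch_size=batch_size,
            on_batch_committed=on_batch_committed,
        )
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Committed batches stay in the database; only the in-flight batch is lost,
        # so the reported counts match what a later DB check will show.
        print("Ingestion interrupted by user (^C).")
        print(f"  Successfully added {committed['messages']} messages")
        print(f"  Ingested {committed['chunks']} chunk(s)")
        print(f"  Extracted {committed['semrefs']} semantic references")
```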

Test plan

  • make format check test passes (696 tests, 0 pyright errors)
  • Manual test: ingest a VTT file, verify progress callback output (see above)
  • Manual test: interrupt with ^C mid-ingestion, verify partial data committed (see above)

KRRT7 force-pushed the feat/vtt-streaming-ingestion branch 3 times, most recently from a7dfb7c to 817df7a on May 4, 2026 07:24
KRRT7 marked this pull request as ready for review on May 4, 2026 07:50
gvanrossum (Collaborator) left a comment

I ran this on tests/testdata/PythonDocumentary.vtt and it did its job in 1m30s, which is pretty good (though I have a hard time coming up with queries it can answer).

However, I noticed some odd interleaving in the verbose output; I assume due to concurrency:

$ time uv run tools/ingest_vtt.py tests/testdata/PythonDocumentary.vtt -d docu-new.db --conc 20 -v
Ingesting 1 VTT file(s):
  - tests/testdata/PythonDocumentary.vtt
Target database: docu-new.db

Analyzing VTT files...
  tests/testdata/PythonDocumentary.vtt:
    Duration: 5037.52 seconds (83.96 minutes)
    Speakers: None detected

Total duration: 5037.52 seconds (83.96 minutes)
All speakers: 0 (None detected)
Loading environment...
Setting up conversation settings...
Settings and storage provider configured

Parsing VTT files and creating messages...
    auto_extract_knowledge = True
    concurrency = 20
  Processing messages in batches of 100 (concurrency=20)...
  Processing tests/testdata/PythonDocumentary.vtt...
    100 messages | +100 chunks | +501 semrefs | 12.4s (0.12s/chunk) | 12.4s elapsed
    200 messages | +100 chunks | +646 semrefs | 9.2s (0.09s/chunk) | 21.6s elapsed
    300 messages | +100 chunks | +620 semrefs | 15.1s (0.15s/chunk) | 36.7s elapsed
    400 messages | +100 chunks | +651 semrefs | 12.0s (0.12s/chunk) | 48.7s elapsed
    500 messages | +100 chunks | +631 semrefs | 10.1s (0.10s/chunk) | 58.8s elapsed
    600 messages | +100 chunks | +631 semrefs | 10.3s (0.10s/chunk) | 69.1s elapsed
    Extracted 769 messages so far
    File time range: 0.00s to 5037.52s (with offset: 0.00s to 5037.52s)
    700 messages | +100 chunks | +593 semrefs | 10.3s (0.10s/chunk) | 79.3s elapsed
    769 messages | +69 chunks | +412 semrefs | 8.6s (0.12s/chunk) | 87.9s elapsed
  Successfully added 769 messages
  Ingested 769 chunk(s)
  Extracted 4685 semantic references
  Total time: 87.9s
  Overall time per chunk: 0.11s/chunk
All indexes built successfully

To query the transcript, use:
  python tools/query.py --database 'docu-new.db' --query 'Your question here'

real    1m29.432s
user    0m6.711s
sys     0m0.567s
$ 

Note how "Extracted 769 messages so far" and "File time range: 0.00s to 5037.52s (with offset: 0.00s to 5037.52s)" are followed by two more per-batch log lines.

Review thread on tools/ingest_vtt.py (outdated):
# Process all VTT files and collect messages
all_messages: list[TranscriptMessage] = []
time_offset = 0.0 # Cumulative time offset for multiple files
msg_count = [0] # mutable counter shared with the generator
gvanrossum (Collaborator):

Since the introduction of 'nonlocal' this is an anti-pattern.

KRRT7 (Contributor, Author):

taking a look, thanks!

KRRT7 (Contributor, Author) commented May 5, 2026

Fixed both in fb29108:

  1. nonlocal — replaced the msg_count = [0] list trick with nonlocal msg_count, thanks for the catch (see the sketch below).

  2. Interleaved output — removed the "Extracted N messages so far" / "File time range" prints from inside the async generator. They raced with on_batch_committed because the generator yields messages that are still being processed concurrently. The per-file analysis section (printed before streaming starts) already shows duration/speakers, and the batch progress lines + summary cover message counts, so no info is lost.
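For readers following along, the counter change looks roughly like this. The surrounding generator is sketched, not copied from the diff, and make_message is a placeholder; the point is only the scoping mechanics. The progress prints were moved out of the generator for the reason stated above: under concurrency the generator is consumed ahead of the batch commits, so anything it prints races with the on_batch_committed output.

```python
# Before: a one-element list used as a writable cell, because plain assignment
# inside the nested generator would otherwise create a new local variable.
def build_stream_old(cues):
    msg_count = [0]

    async def messages():
        for cue in cues:
            msg_count[0] += 1           # mutate the cell instead of rebinding
            yield make_message(cue)     # make_message is illustrative

    return messages()

# After: rebind a plain int via `nonlocal`; same behavior, no container trick.
def build_stream(cues):
    msg_count = 0

    async def messages():
        nonlocal msg_count
        for cue in cues:
            msg_count += 1
            yield make_message(cue)

    return messages()
```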

KRRT7 added 4 commits May 5, 2026 02:24
The framework no longer does per-batch dedup queries inside
add_messages_streaming. Callers are responsible for filtering
duplicates before yielding messages into the stream.

- ingest_email.py already pre-filters via is_source_ingested
- ingest_vtt.py enforces a fresh DB (refuses existing)
- podcast_ingest.py uses unique source_ids by construction

This eliminates ~N unnecessary are_sources_ingested DB round-trips
(one per batch) that always returned empty sets.

Closes microsoft#269

Replace manual add_messages_with_indexing batch loop with
add_messages_streaming, matching the pattern already used by
ingest_email.py and podcast_ingest.py. This pipelines LLM
extraction with DB commits for ~2x throughput on multi-batch
ingestions.

- Add source_id per message for restartability/dedup
- Add --batch-size and --concurrency CLI arguments
- Graceful ^C handling (committed batches survive)
- Replace msg_count = [0] list trick with nonlocal (per Guido's review)
- Remove verbose prints from inside the async generator that raced with
  on_batch_committed callback output during concurrent processing

The framework no longer populates messages_skipped (removed in microsoft#271),
so the skipped counter and conditional output are dead code.
KRRT7 (Contributor, Author) commented May 5, 2026

Heads up — I've stacked these three PRs so they can merge cleanly in sequence:

  1. Remove redundant _filter_ingested from streaming pipeline (#271) removes _filter_ingested from the streaming pipeline entirely; callers own dedup now. This is the base.
  2. Migrate ingest_vtt.py to streaming ingestion API (#267, this PR) rebases on top of #271 and also cleans up the now-dead messages_skipped tracking.
  3. Remove redundant per-file is_source_ingested check from email generator (#268, email bulk dedup) rebases on top of #267.

Each PR is independently green (pyright + 696 tests), and they touch different files so conflicts won't appear if merged in order. If you want to review them independently, #271 is the architectural change — the other two are tool-level improvements that benefit from it.

gvanrossum merged commit fed4b52 into microsoft:main on May 5, 2026
16 checks passed