Skip to content

Feature/issue 34 pipeline#243

Open
Subhra-Nandi wants to merge 12 commits into
sahoo-tech:mainfrom
Subhra-Nandi:feature/issue-34-pipeline
Open

Feature/issue 34 pipeline#243
Subhra-Nandi wants to merge 12 commits into
sahoo-tech:mainfrom
Subhra-Nandi:feature/issue-34-pipeline

Conversation

@Subhra-Nandi
Copy link
Copy Markdown

🔗 Related Issue

Closes #34


📝 Summary of Changes

Implements the complete end-to-end real-time guidance streaming pipeline
as specified in issue #34. Connects every module into a single async loop:

PerceptionBus → ContextEngine → IntelligenceCore → TrustScorer → GuidanceDispatcher → WebSocket

Also fixes the original submission which replaced existing api/main.py and
main.py content entirely instead of integrating the pipeline cleanly alongside
the existing code.


🔍 Type of Change

  • ✨ New feature (non-breaking change that adds functionality)
  • 🐛 Bug fix (non-breaking change that fixes an issue)
  • 📖 Documentation update
  • 🧪 Test addition or improvement

🧪 How Was This Tested?

pytest tests/integration/test_pipeline_e2e.py -v

All 14 integration tests pass. Tests cover:

  • Backpressure (oldest frame evicted, perception never blocked)
  • Deduplication (5 identical frames → 1 WebSocket message)
  • Latency SLA (every frame < 500 ms)
  • 50-frame flood never raises or blocks
  • None guidance → no WebSocket message
  • Clean stop is idempotent
  • Metrics (p50/p95/p99) populated correctly

Test environment:

  • OS: Windows 11
  • Python version: 3.12.5

📸 Screenshots / Recordings (if applicable)

Benchmark results (stub LLM, Windows):

Workers Frames Received Dropped Generated Dispatched p99 ms
1 100 100 0 100 100 < 1 ms
2 100 100 0 100 100 < 1 ms

SLA target: ≤500 ms. Status: ✅ compliant


✅ Pre-Submission Checklist

  • My branch is up to date with upstream/main
  • My code follows the project's coding standards
  • I have written / updated tests for my changes (14 integration tests)
  • All existing tests pass
  • I have added docstrings and type hints to all new public functions
  • I have updated relevant documentation (docs/benchmark_results.md)
  • My PR title follows the Conventional Commits format
  • I have not committed any .env files, secrets, or model weights
  • This PR addresses only the scope of the linked issue

💬 Additional Notes for Reviewer

What changed since the previous submission (closed PR):

  1. api/main.py — previous version replaced the entire file, deleting all
    existing routes (status, mode, actions, context), CORS config using
    settings.CORS_ORIGINS, and the root / endpoint. Fixed by keeping all
    existing content and adding pipeline startup via asynccontextmanager lifespan.

  2. main.py — previous version replaced the entire file, deleting the banner,
    settings, and logging setup. Fixed by keeping all original content and adding
    async def _run() with asyncio.gather(server.serve(), pipeline.run()).

  3. An unrelated commit for issue Build a Hardware Repair Domain Guidance Engine #150 (hardware engine) accidentally landed on
    this branch — it has been cleanly reverted.

Design decisions:

  • Backpressure_ingest_loop evicts oldest frame when processing queue
    is full; perception is never awaited on a full queue.
  • Deduplication — SHA-1 fingerprint of instruction text; same instruction
    never dispatched twice in a row.
  • Latency trackingtime.perf_counter() used over time.monotonic() for
    nanosecond resolution on Windows. p50/p95/p99 exposed via GET /metrics.
  • Clean shutdownstop() cancels all tasks, tears down subsystems in
    reverse-start order, idempotent (safe to call twice).
  • Stub pointsContextEngine._infer_task_type,
    IntelligenceCore._build_instruction, and PerceptionBus._capture_loop
    are clearly marked stubs ready to be swapped with real drivers.

- Add PerceptionBus with backpressure (evict oldest frame when full)
- Add ContextEngine, IntelligenceCore, TrustScorer stubs
- Add GuidanceDispatcher with deduplication (SHA-1 fingerprint)
- Add ActionLogger for undo/replay audit trail
- Add PipelineMetrics with p50/p95/p99 latency tracking
- Target: ≤500 ms frame-to-WebSocket latency
…int (sahoo-tech#34)

- Add asynccontextmanager lifespan that starts/stops ExecraPipeline
- Add GET /health and GET /metrics REST endpoints
- Add POST /frame for synthetic frame injection (testing/demos)
- Add WebSocket /ws/guidance for real-time guidance broadcast
- Pipeline injected from main.py in production; self-hosted in standalone mode
…sahoo-tech#34)

- Parse --domain, --mode, --host, --port, --log-level CLI args
- Inject ExecraPipeline into api.main before uvicorn starts
- Register SIGINT/SIGTERM handlers for clean shutdown
- asyncio.gather(server.serve(), pipeline.run()) shares the event loop
Component tests:
  - PerceptionBus: put/get frame, backpressure eviction
  - GuidanceDispatcher: send to client, deduplication, multi-instruction
  - TrustScorer: high/low confidence threshold

Integration tests (full pipeline, mocked LLM):
  - Single frame produces guidance on WS client
  - All frames processed within 500 ms SLA
  - Backpressure: 50-frame flood never blocks
  - Deduplication: 5 identical frames → 1 WS message
  - None guidance: no WS message sent
  - Clean stop is idempotent
  - Metrics populated correctly after run
…tech#34)

Added Optional from 	yping to core/config.py.

The type annotation REDIS_AUTH: Optional[str] = None on line 54 was
raising NameError: name 'Optional' is not defined at import time,
which blocked pytest from loading conftest.py and prevented all tests
from running.
- benchmark_pipeline.py: measures p50/p95/p99 latency under variable
  worker counts and frame injection rates
- docs/benchmark_results.md: stub-LLM results (p99 < 2 ms),
  production estimates with real LLM (~150-460 ms), SLA compliance
  statement and strategies (streaming, prompt caching, parallel rules)
…features (sahoo-tech#150)

Safety bug fix:
- Replace if/elif/elif chain with independent if blocks so ALL hazards
  in a single frame are reported; old code silently dropped any rule
  after the first matching elif (e.g. power_tool warning invisible when
  circuit_board also present)

New features:
- Implement _check_soldering_proximity(): Euclidean bbox-centre distance;
  fires specific component name when iron within 150px threshold; graceful
  fallback when no bbox data provided
- Implement _select_template(): scores each template by keyword matches
  against detected_classes; returns best match in active_template field
- Add active_template field to GuidanceInstruction dataclass
- Implement _parse_ocr(): extracts repair step numbers and P/N part
  numbers from OCR text; appended as WARNING-level guidance notes
- Document hand_results parameter (reserved for future gesture rules)

Tests: 4 → 24
- Keep all 4 original tests; all still pass unchanged
- Add TestCircuitBoardRule, TestPowerToolRule, TestSolderingIronRule
  positive and negative path tests
- Add TestMultipleSimultaneousHazards: regression tests for the elif bug
- Add TestTemplateSelection: all 4 templates + no-match case
- Add TestOCRIntegration: step number, part number, empty string
- Add TestComponentIdentification: mapping, unknown, deduplication
- Add TestEdgeCases: empty list, uppercase class, missing key, None

Docs:
- Expand docs/physical_domain.md: component dictionary table, step
  templates table, safety rules with trigger conditions and exact
  messages, status codes, OCR integration, GuidanceInstruction fields
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Build the End-to-End Real-Time Guidance Streaming Pipeline

1 participant