Skip to content

Implement stream-level fallback for LLM providers#74

Merged
kienbui1995 merged 3 commits intomainfrom
claude/implement-todo-item-uLa31
Apr 19, 2026
Merged

Implement stream-level fallback for LLM providers#74
kienbui1995 merged 3 commits intomainfrom
claude/implement-todo-item-uLa31

Conversation

@kienbui1995
Copy link
Copy Markdown
Owner

@kienbui1995 kienbui1995 commented Apr 19, 2026

What

Implement proper stream-level fallback logic for the FallbackProvider. When the primary provider's stream encounters a retryable error on the first item, the fallback provider is now used. Non-retryable errors are propagated immediately without attempting fallback.

Why

Previously, the fallback provider was not actually used—the code only attempted the primary provider and ignored the fallback. This PR addresses the TODO comment and implements the intended behavior where transient failures (e.g., 503 Service Unavailable) trigger automatic fallback to a secondary provider.

How

  • Refactored FallbackProvider::stream() to use async_stream::stream! macro for composable async stream handling
  • Added next_item() helper function to safely poll the next item from a stream
  • Implemented fallback logic: check the first item from the primary stream; if it's a retryable error, drop the primary stream and switch to the fallback provider
  • Added comprehensive unit tests covering three scenarios:
    • Fallback is used when primary errors with a retryable error
    • Primary provider is used when it succeeds (fallback not called)
    • Non-retryable errors propagate without attempting fallback
  • Added async-stream and futures-core dependencies to support the implementation

Checklist

  • cargo fmt --all && cargo test --workspace && cargo clippy --workspace --all-targets passes
  • No new warnings
  • Tests added for new functionality (3 comprehensive test cases)

https://claude.ai/code/session_01R2n6wKqFkYPvHkwaip8EnJ

Summary by CodeRabbit

  • Improvements

    • Fallback provider now switches to a fallback stream only when the primary fails immediately with a retryable error; successful or non-retryable first responses continue from the primary.
    • More robust stream handling and clearer error propagation.
  • Chores

    • Added async-stream and futures-core dependencies.
    • Added tests validating fallback and error behaviors.

Previously FallbackProvider::stream delegated to primary and never
used the fallback provider. It now peeks the primary's first item;
if that item is a retryable error (connection failure, 429, 503,
timeout), it switches to the fallback provider's full stream. If
primary yields at least one event, subsequent errors propagate
unchanged so the existing mc-core stream_with_retry can handle them
without corrupting partial consumer state.

Built on async-stream (already a workspace dep) rather than adding
tokio-stream. Includes three unit tests covering fallback-on-error,
primary-success, and non-retryable-error-propagation.

https://claude.ai/code/session_01R2n6wKqFkYPvHkwaip8EnJ
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 19, 2026

Important

Review skipped

Review was skipped due to path filters

⛔ Files ignored due to path filters (1)
  • mc/Cargo.lock is excluded by !**/*.lock

CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters. For example, including **/dist/** will override the default block on the dist directory, by removing the pattern from both the lists.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: cc51ded9-d521-481f-b85c-226fea4ab39c

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Implemented stream-level fallback in FallbackProvider::stream that polls the primary stream's first item and switches to the fallback stream only when that first item is a retryable error; added a next_item helper, three tokio tests, and two Cargo workspace dependencies.

Changes

Cohort / File(s) Summary
Dependency Updates
mc/crates/mc-cli/Cargo.toml
Added workspace-managed dependencies async-stream and futures-core to enable async stream construction and futures utilities.
Provider Stream Implementation & Tests
mc/crates/mc-cli/src/provider.rs
Implemented FallbackProvider::stream with logic to poll the primary stream's first item (via new next_item), switch to fallback on retryable first-item errors, yield primary items otherwise. Added three #[cfg(test)] Tokio tests using MockProvider.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant FallbackProvider
    participant PrimaryStream
    participant FallbackStream

    Caller->>FallbackProvider: stream(request)
    activate FallbackProvider
    FallbackProvider->>PrimaryStream: poll first item (next_item)
    activate PrimaryStream
    PrimaryStream-->>FallbackProvider: Result<Event> (ok | err)
    deactivate PrimaryStream

    alt First item is retryable error
        FallbackProvider->>PrimaryStream: stop consuming / drop
        FallbackProvider->>FallbackStream: consume and yield entire fallback stream
        activate FallbackStream
        FallbackStream-->>Caller: yield items until exhaustion
        deactivate FallbackStream
    else First item is ok or non-retryable error
        FallbackProvider->>Caller: yield first item
        FallbackProvider->>PrimaryStream: continue yielding remaining primary items
        activate PrimaryStream
        PrimaryStream-->>Caller: yield remaining items until exhaustion
        deactivate PrimaryStream
    end
    deactivate FallbackProvider
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐇 I nibble code where streams may fail,
If primary trips, I hop the trail,
Futures hum and async threads play,
Fallback blooms to save the day,
Tests applaud my nimble tail.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 38.46% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately captures the main change: implementing stream-level fallback for LLM providers, which aligns with the core functionality added in FallbackProvider::stream().
Description check ✅ Passed The description comprehensively covers all required template sections (What, Why, How) with specific implementation details, and the checklist confirms all items pass including tests and code quality checks.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch claude/implement-todo-item-uLa31

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements a streaming fallback mechanism for LLM providers in the mc-cli crate. It introduces async-stream and futures-core dependencies to facilitate asynchronous stream handling. The FallbackProvider now attempts to consume the primary stream and, if a retryable error is encountered initially, switches to the fallback provider's stream. Comprehensive unit tests have been added to cover success, retryable failure, and non-retryable failure scenarios. The review feedback suggests adding a warning log when a fallback is triggered to improve production observability.

Comment on lines +28 to +34
Some(Err(e)) if e.is_retryable() => {
drop(primary_stream);
let mut fb_stream = fallback.stream(&req);
while let Some(item) = next_item(&mut fb_stream).await {
yield item;
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It would be beneficial to add a log message when a fallback occurs. This improves observability and helps in debugging transient issues with the primary provider in production environments.

                Some(Err(e)) if e.is_retryable() => {
                    tracing::warn!(error = %e, "Primary provider failed with retryable error, switching to fallback");
                    drop(primary_stream);
                    let mut fb_stream = fallback.stream(&req);
                    while let Some(item) = next_item(&mut fb_stream).await {
                        yield item;
                    }
                }

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
mc/crates/mc-cli/src/provider.rs (1)

363-435: Add one regression test for “retryable error after first primary event.”

Current tests don’t explicitly cover: primary yields one event, then hits a retryable error, and fallback must remain unused. Adding this case would lock in the key partial-stream guarantee.

Suggested test case
+    #[tokio::test]
+    async fn retryable_error_after_first_event_does_not_fallback() {
+        let primary = Arc::new(MockProvider::new(vec![vec![
+            Ok(ProviderEvent::TextDelta("from-primary".into())),
+            Err(retryable_api_err()),
+        ]]));
+        let fallback = Arc::new(MockProvider::new(vec![vec![Ok(ProviderEvent::TextDelta(
+            "should-not-appear".into(),
+        ))]]));
+
+        let fp = FallbackProvider {
+            primary: primary.clone() as Arc<dyn LlmProvider>,
+            fallback: fallback.clone() as Arc<dyn LlmProvider>,
+        };
+
+        let events = collect(fp.stream(&empty_request())).await;
+        assert_eq!(primary.call_count(), 1);
+        assert_eq!(fallback.call_count(), 0);
+        assert!(matches!(events.get(1), Some(Err(e)) if e.is_retryable()));
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@mc/crates/mc-cli/src/provider.rs` around lines 363 - 435, Add a new
tokio::test that constructs a primary MockProvider emitting a sequence like
[Ok(ProviderEvent::TextDelta("from-primary".into())), Err(retryable_api_err())]
and a fallback MockProvider with no expected calls, build a FallbackProvider
using those, call collect(fp.stream(&empty_request())).await, then assert the
collected text deltas include only "from-primary", primary.call_count() == 1,
and fallback.call_count() == 0 to ensure a retryable error after the first
primary event does not trigger the fallback.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@mc/crates/mc-cli/src/provider.rs`:
- Around line 363-435: Add a new tokio::test that constructs a primary
MockProvider emitting a sequence like
[Ok(ProviderEvent::TextDelta("from-primary".into())), Err(retryable_api_err())]
and a fallback MockProvider with no expected calls, build a FallbackProvider
using those, call collect(fp.stream(&empty_request())).await, then assert the
collected text deltas include only "from-primary", primary.call_count() == 1,
and fallback.call_count() == 0 to ensure a retryable error after the first
primary event does not trigger the fallback.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7a5fd7e8-c5f4-4f7d-a527-11919a7bda2a

📥 Commits

Reviewing files that changed from the base of the PR and between 519656c and 17365b4.

⛔ Files ignored due to path filters (1)
  • mc/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • mc/crates/mc-cli/Cargo.toml
  • mc/crates/mc-cli/src/provider.rs

claude added 2 commits April 19, 2026 07:25
When the primary provider fails with a retryable error and
FallbackProvider switches to the secondary, emit a tracing::warn!
so operators can observe transient failures in production.

https://claude.ai/code/session_01R2n6wKqFkYPvHkwaip8EnJ
Pre-existing transitive advisory via reqwest; cargo-deny advisories
check was failing in CI. Minimal patch bump fixes both advisories.

https://claude.ai/code/session_01R2n6wKqFkYPvHkwaip8EnJ
@sonarqubecloud
Copy link
Copy Markdown

@kienbui1995 kienbui1995 merged commit c75f265 into main Apr 19, 2026
16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants