Skip to content

Problem with using Flows in a bot with a parallel pipeline and multiple LLMs #170

@chadbailey59

Description

@chadbailey59

I'm trying to use a prerelease version of our new parallel voicemail detection architecture, but also use Pipecat Flows in the human side of the conversation. My pipeline looks like this:

pipeline = Pipeline(
        [
            transport.input(),
            rtvi,
            ParallelPipeline(
                # Voicemail detection branch
                [
                    detection_context_aggregator.user(),
                    detection_llm,
                    # fl0,
                    voicemail_tts,
                    FunctionFilter(voicemail_filter),
                ],
                # Human conversation branch
                [
                    context_aggregator.user(),
                    # bot_mute_processor,
                    llm,
                    human_gate,
                    tts,
                    FunctionFilter(human_filter),
                    context_aggregator.assistant(),
                ],
            ),
            transport.output(),
        ]
    )

If I run flow_manager.initialize() early in the call lifecycle, the FlowManager will update the context of both llm (the human LLM) and detection_llm (the voicemail detection LLM). Here's Claude's explanation of why, and a fix that worked for me:

When the FlowManager initializes a new node with flow_manager.initialize(create_initial_node()), it calls _set_node which then calls _update_llm_context. This method sends frames to the task queue:

await self.task.queue_frames(
    [frame_type(messages=messages), LLMSetToolsFrame(tools=functions)]
)

The key issue is that these frames (LLMMessagesUpdateFrame/LLMMessagesAppendFrame and LLMSetToolsFrame) are being sent to the shared task queue, but they're not specifically targeted to the context manager that was provided to the FlowManager.

In your bot.py, both context managers (the main one and the detection one) are connected to the same task pipeline. When the FlowManager sends these frames to the task, they're processed by all context managers in the pipeline, not just the one that was provided to the FlowManager.

Here's how to fix the issue: Add a filter processor that only allows specific frames to reach the detection context manager.
Here's a suggested fix for your bot.py:

# Create a filter function that blocks LLMMessagesUpdateFrame, LLMMessagesAppendFrame, and LLMSetToolsFrame
# from reaching the detection context manager
async def flow_manager_frame_filter(frame):
    from pipecat.frames.frames import LLMMessagesUpdateFrame, LLMMessagesAppendFrame, LLMSetToolsFrame
    # Return False (block) for frames sent by FlowManager
    return not isinstance(frame, (LLMMessagesUpdateFrame, LLMMessagesAppendFrame, LLMSetToolsFrame))

# Then in your pipeline setup, add this filter before the detection_context_aggregator
pipeline = Pipeline(
    [
        transport.input(),
        rtvi,
        # stt,
        ParallelPipeline(
            # Voicemail detection branch
            [
                # audio_collector,
                FunctionFilter(flow_manager_frame_filter),  # Add this filter
                detection_context_aggregator.user(),
                detection_llm,
                # fl0,
                voicemail_tts,
                FunctionFilter(voicemail_filter),
            ],
            # Human conversation branch
            [
                context_aggregator.user(),
                # bot_mute_processor,
                llm,
                human_gate,
                tts,
                FunctionFilter(human_filter),
                context_aggregator.assistant(),
            ],
        ),
        transport.output(),
    ]
)

This filter will prevent the frames sent by the FlowManager from affecting the detection context manager, while still allowing them to reach the main context manager.

The issue occurs because the FlowManager is designed to work with a single context manager and doesn't have a way to target specific context managers when sending frames. By adding a filter, you can ensure that the frames only affect the intended context manager.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions