# Using multiple LLMs for data cleaning

[![Open in GitHub Codespaces](https://github.com/codespaces/badge.svg)](https://codespaces.new/plugboard-dev/plugboard)

This model demonstrates how you can use multiple LLMs to clean and organise input data. We're going to use a dataset of hotel reviews, but this approach could easily be applied to other data-structuring tasks.

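Running a costly LLM on every record gets expensive. Instead, each review is first processed by two cheaper models (`gpt-4o-mini` and `gemini-2.0-flash-lite`). When they agree, their answer is saved directly. Only when they disagree is the review escalated to the more expensive `gpt-4o` model.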

To run this model you will need to set environment variables for [`OPENAI_API_KEY`](https://platform.openai.com/settings/) and [`GOOGLE_API_KEY`](https://aistudio.google.com/app/apikey). If either is missing, the notebook will prompt you for it.

In [None]:
from abc import ABC
import json
import os
from getpass import getpass
import typing as _t

import pandas as pd
from pydantic import BaseModel

from plugboard.component import Component, IOController as IO
from plugboard.connector import AsyncioConnector
from plugboard.connector import AsyncioConnector, ConnectorBuilder
from plugboard.events import Event, EventConnectorBuilder, StopEvent
from plugboard.schemas import ConnectorSpec
from plugboard.process import LocalProcess
from plugboard.library import FileReader, FileWriter, LLMChat

In [None]:
# Prompt for any API key that is not already set in the environment.
# Each environment variable is paired with its provider name, so the prompt
# text and the variable being written cannot get out of sync (previously the
# Google key was stored in OPENAI_API_KEY by mistake).
_API_KEY_PROVIDERS = {"OPENAI_API_KEY": "OpenAI", "GOOGLE_API_KEY": "Google"}
for _env_var, _provider in _API_KEY_PROVIDERS.items():
    if _env_var not in os.environ:
        os.environ[_env_var] = getpass(f"Enter your {_provider} API key: ")

In [None]:
class EventHandlingComponent(Component, ABC):
    """Adapter that embeds an instance of ``base_class`` inside this component.

    The wrapper is registered under the caller-supplied ``name``. The embedded
    component receives every constructor keyword argument, except that its name
    is prefixed with ``_event_``. The ``init`` and ``destroy`` lifecycle calls
    are forwarded to the embedded component; ``step`` does nothing unless a
    subclass overrides it.
    """

    # Concrete component class to embed; each subclass sets this.
    base_class: type[Component]

    def __init__(self, **kwargs):
        public_name = kwargs["name"]
        super().__init__(name=public_name)
        # The embedded component gets the same arguments under a distinct name.
        inner_kwargs = {**kwargs, "name": f"_event_{public_name}"}
        self._base = self.base_class(**inner_kwargs)

    async def init(self):
        # NOTE(review): only the embedded component is initialised here;
        # confirm Component.init does not need calling on the wrapper itself.
        await self._base.init()

    async def step(self):
        pass

    async def destroy(self):
        # Tear down the embedded component before the wrapper itself.
        await self._base.destroy()
        return await super().destroy()

In [None]:
class StoppingFileReader(EventHandlingComponent):
    """Wrapped ``FileReader`` that sends a ``StopEvent`` once its IO is closed.

    Each step advances the embedded reader and copies every one of its output
    fields onto this component. If the reader's IO reports closed afterwards, a
    ``StopEvent`` is sent.
    """

    io = IO(outputs=["review_id", "review_text"])
    base_class = FileReader

    async def step(self):
        reader = self._base
        await reader.step()
        # Mirror the embedded reader's outputs onto the wrapper.
        for output_name in reader.io.outputs:
            setattr(self, output_name, getattr(reader, output_name))
        if not reader.io.is_closed:
            return
        # NOTE(review): other components in this notebook emit events via
        # self.io.queue_event(...); confirm self.send(...) is the intended API.
        await self.send(StopEvent())


file_reader = StoppingFileReader(
    name="file-reader",
    path="hotel-reviews.csv",
    field_names=["review_id", "review_text"],
)

In [None]:
# The three sentiment labels an individual review aspect can receive.
Rating = _t.Literal["positive", "negative", "neutral"]


# Structured LLM output: one Rating per review aspect.
# Documented with comments rather than a docstring: pydantic puts a class
# docstring into the model's JSON schema as its description, and that schema
# may be passed to the LLMs via response_model. Field order also fixes the key
# order produced by model_dump_json().
class HotelReview(BaseModel):
    facilities: Rating
    cleanliness: Rating
    location: Rating
    price: Rating
    staff: Rating


# System prompt given to every LLM; the aspects it lists match the fields of
# HotelReview above.
system_prompt = """
You are going to receive hotel reviews. For each one try to identify if the following aspects are positive, negative or neutral:
facilities, cleanliness, location, price, staff.
If the review doesn't mention a particular aspect, respond with 'neutral'.
"""

In [None]:
# Two inexpensive models share the same system prompt and structured response
# model; only the provider and model name differ.
_shared_chat_kwargs = {"system_prompt": system_prompt, "response_model": HotelReview}

open_ai_mini = LLMChat(
    name="openai-mini",
    llm_kwargs={"model": "gpt-4o-mini"},
    **_shared_chat_kwargs,
)
gemini_mini = LLMChat(
    name="gemini-lite",
    llm="llama_index.llms.gemini.Gemini",
    llm_kwargs={"model": "models/gemini-2.0-flash-lite"},
    **_shared_chat_kwargs,
)

In [None]:
class RawReview(BaseModel):
    """Unprocessed review text to be forwarded to the expensive LLM."""

    review_id: str
    review_text: str


class RequestReviewEvent(Event):
    """Asks a further LLM to analyse a raw review."""

    # Identifier for this event type.
    type: _t.ClassVar[str] = "request_review"
    data: RawReview


class SaveReview(BaseModel):
    """Review id paired with its structured ``HotelReview``, ready to be stored."""

    review_id: str
    review: HotelReview


class SaveReviewEvent(Event):
    """Asks for a structured review to be stored."""

    # Identifier for this event type.
    type: _t.ClassVar[str] = "save_review"
    data: SaveReview

In [None]:
class CompareLLMs(Component):
    """Checks whether the two cheap LLMs produced the same structured review.

    Both JSON responses are parsed into ``HotelReview`` objects and compared as
    models rather than as raw strings. This means differences in whitespace or
    key order are not mistaken for disagreement.

    - Agreement: a ``SaveReviewEvent`` is queued with the agreed review.
    - Disagreement, or a response that is not a valid ``HotelReview``: a
      ``RequestReviewEvent`` is queued so the expensive model reviews the raw text.
    """

    io = IO(
        inputs=["review_id", "review_text", "model1", "model2"],
        output_events=[SaveReviewEvent, RequestReviewEvent],
    )

    @staticmethod
    def _parse(raw: _t.Any) -> _t.Optional[HotelReview]:
        """Parse an LLM JSON response, returning ``None`` if it is not a valid ``HotelReview``."""
        try:
            return HotelReview.model_validate_json(raw)
        except (ValueError, TypeError):
            # pydantic's ValidationError subclasses ValueError; TypeError guards
            # against non-string inputs.
            return None

    async def step(self) -> None:
        review1 = self._parse(self.model1)
        review2 = self._parse(self.model2)
        if review1 is not None and review1 == review2:
            # Both light models agree, save the review
            self.io.queue_event(
                SaveReviewEvent(
                    source=self.name, data=SaveReview(review_id=self.review_id, review=review1)
                )
            )
            return
        # Disagreement (or an unparseable answer): escalate to the expensive model
        self._logger.warning(
            "Models disagree, requesting review from expensive model", review_id=self.review_id
        )
        self.io.queue_event(
            RequestReviewEvent(
                source=self.name,
                data=RawReview(review_id=self.review_id, review_text=self.review_text),
            )
        )


compare = CompareLLMs(name="compare")

In [None]:
class LLMChatOnEvent(EventHandlingComponent):
    """Event-driven wrapper around ``LLMChat``.

    For every ``RequestReviewEvent`` the embedded chat component is prompted
    with the review text. Its JSON response is parsed into a ``HotelReview``
    and emitted as a ``SaveReviewEvent``.
    """

    io = IO(input_events=[RequestReviewEvent], output_events=[SaveReviewEvent])
    base_class = LLMChat

    @RequestReviewEvent.handler
    async def call_llm(self, event: RequestReviewEvent) -> None:
        request = event.data
        chat = self._base
        chat.prompt = request.review_text
        await chat.step()
        parsed = HotelReview.model_validate_json(chat.response)
        self.io.queue_event(
            SaveReviewEvent(
                source=self.name,
                data=SaveReview(review_id=request.review_id, review=parsed),
            )
        )


# The expensive model, only consulted when the two cheap models disagree.
open_ai_expensive = LLMChatOnEvent(
    name="openai-expensive",
    system_prompt=system_prompt,
    llm_kwargs={"model": "gpt-4o"},
    response_model=HotelReview,
)

In [None]:
class CaptureData(Component):
    """Collects ``SaveReviewEvent`` payloads and writes them to a CSV on shutdown.

    Each saved review becomes one row with two columns:

    - ``source``: the name of the component that emitted the event.
    - ``review``: the JSON-serialised ``SaveReview`` payload (review id plus
      structured review).

    The file is written to ``path`` in ``destroy``.
    """

    # Event-only component: no data connectors target it and step() reads no
    # inputs, so the previously declared "source"/"review" inputs were dropped.
    io = IO(input_events=[SaveReviewEvent])

    def __init__(self, path: str, **kwargs):
        super().__init__(**kwargs)
        self._path = path
        self._data: list[dict[str, str]] = []

    async def step(self) -> None:
        # All work happens in the event handler.
        pass

    @SaveReviewEvent.handler
    async def save_review(self, event: SaveReviewEvent) -> None:
        """Record one saved review as a CSV row."""
        self._data.append({"source": event.source, "review": event.data.model_dump_json()})

    async def destroy(self):
        # Explicit columns keep the header row even if no review was captured.
        pd.DataFrame(self._data, columns=["source", "review"]).to_csv(self._path, index=False)
        return await super().destroy()


file_writer = CaptureData(name="file-writer", path="processed-reviews.csv")

In [None]:
# Every component taking part in the process.
components = [
    file_reader,
    open_ai_mini,
    gemini_mini,
    compare,
    open_ai_expensive,
    file_writer,
]


def connect(in_: str, out_: str) -> AsyncioConnector:
    """Create an in-memory connector from ``in_`` (source) to ``out_`` (target)."""
    return AsyncioConnector(spec=ConnectorSpec(source=in_, target=out_))


# Data connectors: the reader feeds both cheap LLMs and the comparator, and each
# cheap LLM's response is passed to the comparator.
connectors = [
    connect("file-reader.review_text", "openai-mini.prompt"),
    connect("file-reader.review_text", "gemini-lite.prompt"),
    connect("file-reader.review_id", "compare.review_id"),
    connect("file-reader.review_text", "compare.review_text"),
    connect("openai-mini.response", "compare.model1"),
    connect("gemini-lite.response", "compare.model2"),
]
# Event connectors are built from the components list (presumably one per event
# type declared in the components' IO — confirm against EventConnectorBuilder).
connector_builder = ConnectorBuilder(connector_cls=AsyncioConnector)
event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder)
event_connectors = list(event_connector_builder.build(components).values())

In [None]:
process = LocalProcess(
    components=components,
    connectors=connectors + event_connectors,
)
async with process:
    await process.run()