ConsensusAI

A multi-LLM agent system that queries multiple AI providers simultaneously, evaluates response quality, and detects consensus across responses.

Overview

ConsensusAI provides a framework for:

  • Multi-Provider Queries: Query multiple LLM providers (OpenAI, Ollama, DeepSeek, Gemini, Claude) in parallel
  • Quality Assessment: Evaluate responses across five quality dimensions (relevance, coherence, completeness, accuracy, creativity)
  • Consensus Detection: Analyze agreement and disagreement across provider responses
  • Structured Output: Use the instructor library for type-safe, structured responses
  • Conversation Tracking: Maintain context across multi-turn conversations with trend analysis
  • Batch Processing: Extract lists from LLM responses and process each item in parallel with automatic collation

Features

Quality Scoring

Each response is scored on five dimensions:

Dimension Weight Description
Relevance 25% How well the response addresses the query
Coherence 20% Logical flow and structure
Completeness 20% Thoroughness of coverage
Accuracy 25% Factual correctness (estimated)
Creativity 10% Novel insights and originality
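
The weighted combination above can be sketched as a plain weighted average. This is an illustrative sketch, not the library's internal scorer; the weights mirror the table, and `overall_score` is a hypothetical helper name:

```python
# Weights from the table above; each dimension is scored 0-10.
WEIGHTS = {
    "relevance": 0.25,
    "coherence": 0.20,
    "completeness": 0.20,
    "accuracy": 0.25,
    "creativity": 0.10,
}

def overall_score(scores: dict[str, float]) -> float:
    """Weighted average of the five quality dimensions (0-10)."""
    return sum(scores[dim] * weight for dim, weight in WEIGHTS.items())

# Example: a response strong on relevance and accuracy
print(overall_score({
    "relevance": 9.0, "coherence": 7.0, "completeness": 6.0,
    "accuracy": 8.0, "creativity": 5.0,
}))  # ~7.35
```

Because relevance and accuracy carry 50% of the weight between them, a factually strong, on-topic response dominates a merely creative one.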

Consensus Analysis

  • Calculates semantic similarity between responses using Jaccard similarity
  • Identifies points of agreement and disagreement
  • Tracks consensus levels and trends over time
  • Generates recommendations based on consensus strength
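
Jaccard similarity treats each response as a set of words and measures overlap. A rough sketch of the idea (the real detector may tokenize or normalize differently):

```python
def jaccard_similarity(a: str, b: str) -> float:
    """|intersection| / |union| of the two responses' word sets."""
    words_a, words_b = set(a.lower().split()), set(b.lower().split())
    if not words_a and not words_b:
        return 1.0  # two empty responses are trivially identical
    return len(words_a & words_b) / len(words_a | words_b)

score = jaccard_similarity(
    "solar power reduces emissions",
    "wind power reduces costs",
)
print(score)  # 2 shared words out of 6 distinct -> ~0.33
```

Word-set overlap is cheap but surface-level, which is why the library also offers embedding-based consensus detection (see "Embedding-Based Consensus" below) for semantically equivalent but differently worded responses.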

Multi-Turn Conversations

  • Maintains conversation context across turns
  • Tracks consensus evolution over time
  • Provides performance analytics by provider

Installation

From Source

git clone https://github.com/monch1962/consensus-ai.git
cd consensus-ai
pip install -e .

With Optional Dependencies

# Install with CLI support
pip install -e ".[cli]"

# Install with development tools
pip install -e ".[dev]"

Configuration

Set up your API keys as environment variables:

# Required
export OPENAI_API_KEY="your-openai-api-key"

# Optional
export DEEPSEEK_API_KEY="your-deepseek-api-key"
export GEMINI_API_KEY="your-gemini-api-key"
export CLAUDE_API_KEY="your-claude-api-key"

# Optional: Custom model names
export OPENAI_MODEL="gpt-4-turbo-preview"
export DEEPSEEK_MODEL="deepseek-chat"

# Optional: Custom base URLs (for proxy/alternative endpoints)
export OPENAI_BASE_URL="https://api.openai.com/v1"

# Ollama (local/self-hosted models)
# No API key required - Ollama runs locally
export OLLAMA_BASE_URL="http://localhost:11434/v1"
export OLLAMA_MODEL="llama3.2"  # or mistral, codellama, phi3, etc.

Using Ollama (Local & Self-Hosted Models)

Ollama allows you to run open-source LLMs locally. ConsensusAI has full support for Ollama-hosted models.

Installation

  1. Install Ollama:
# Linux/macOS
curl -fsSL https://ollama.com/install.sh | sh

# Or download from https://ollama.com for Windows
  2. Start the Ollama server:
ollama serve
  3. Pull a model:
ollama pull llama3.2
# Or try: mistral, codellama, phi3, gemma2, qwen2.5

Basic Usage

from consensusai import EnhancedMultiProviderAgent, LLMProvider, StructuredResponse

# Use Ollama (no API key required)
api_keys = {LLMProvider.OLLAMA: "ollama"}  # API key is a placeholder

agent = EnhancedMultiProviderAgent(api_keys)

results = await agent.get_enhanced_responses(
    prompt="Explain quantum computing in simple terms",
    response_model=StructuredResponse,
)

Remote Ollama Server

To use an Ollama server on a remote machine or custom host/port:

from consensusai.clients import InstructorLLMClient

# Create client with custom base URL
client = InstructorLLMClient(
    provider=LLMProvider.OLLAMA,
    api_key="ollama",
    base_url="http://192.168.1.100:11434/v1",
    model="llama3.2",
)

Or using environment variables:

export OLLAMA_BASE_URL="http://192.168.1.100:11434/v1"
export OLLAMA_MODEL="mistral"

Model Discovery

Discover and manage Ollama models:

from consensusai.ollama import OllamaClient

client = OllamaClient()

# Check if server is healthy
if client.is_healthy():
    # List available models
    models = client.list_models()
    for model in models:
        print(f"{model.name}: {model.size / 1e9:.1f}GB")

    # Get recommended models
    recommended = client.get_recommended_models()
    print("Recommended models:", recommended)

    # Get default model
    default_model = client.get_default_model()
    print(f"Default: {default_model}")

Using Multiple Ollama Models

You can use multiple Ollama models simultaneously:

from consensusai.clients import InstructorLLMClient
from consensusai.models import LLMProvider, StructuredResponse

# Create clients for different models
llama_client = InstructorLLMClient(
    provider=LLMProvider.OLLAMA,
    model="llama3.2",
    base_url="http://localhost:11434/v1",
)

mistral_client = InstructorLLMClient(
    provider=LLMProvider.OLLAMA,
    model="mistral",
    base_url="http://localhost:11434/v1",
)

# Query both models
prompt = "What are the benefits of local LLMs?"

llama_response = await llama_client.get_structured_response(prompt, StructuredResponse)
mistral_response = await mistral_client.get_structured_response(prompt, StructuredResponse)

print(f"Llama3.2: {llama_response.answer[:100]}...")
print(f"Mistral: {mistral_response.answer[:100]}...")

Recommended Ollama Models

Model Size Best For
llama3.2 ~4GB General purpose, latest Llama
mistral ~4GB Balanced performance
codellama ~4GB Code-related tasks
phi3 ~2GB Small but capable
gemma2 ~2GB Google's efficient model
qwen2.5 ~5GB Strong reasoning

Server Discovery

Discover Ollama servers on your network:

from consensusai.ollama import discover_ollama_servers

# Discover servers on common local addresses
servers = discover_ollama_servers(
    hosts=["localhost", "127.0.0.1", "192.168.1.100"],
    port=11434,
)

for server in servers:
    print(f"Found: {server.host}:{server.port}")
    print(f"  URL: {server.base_url}")
    print(f"  Healthy: {server.is_healthy}")

Quick Start

Python API

import asyncio
import os
from consensusai import EnhancedMultiProviderAgent, LLMProvider, StructuredResponse

async def main():
    # Configure API keys
    api_keys = {
        LLMProvider.OPENAI: os.getenv("OPENAI_API_KEY"),
    }

    # Initialize the agent
    agent = EnhancedMultiProviderAgent(api_keys)

    # Query with a single prompt
    results = await agent.get_enhanced_responses(
        prompt="What are the main challenges of renewable energy adoption?",
        response_model=StructuredResponse,
    )

    # Access results
    consensus = results["consensus_analysis"]
    print(f"Consensus Level: {consensus['consensus_level']:.2f}")
    print(f"Average Confidence: {consensus['average_confidence']:.2f}")

    for provider, data in results["responses"].items():
        print(f"{provider}: Quality Score = {data['overall_score']:.2f}/10")

asyncio.run(main())

Command Line Interface

# Single query
consensusai query -p "What are the main challenges of renewable energy adoption?"

# With output file
consensusai query -p "Your prompt here" -o results.json

# Multi-turn conversation
consensusai converse -p "Initial question" \
  -f "Follow-up 1" \
  -f "Follow-up 2" \
  -o conversation.json

# Use specific providers only
consensusai query -p "Your prompt" --provider openai --provider deepseek

# Verbose logging
consensusai query -p "Your prompt" --verbose

Usage Examples

Single Query

from consensusai import EnhancedMultiProviderAgent, LLMProvider, StructuredResponse

agent = EnhancedMultiProviderAgent({LLMProvider.OPENAI: "your-api-key"})

results = await agent.get_enhanced_responses(
    prompt="Explain quantum computing in simple terms.",
    response_model=StructuredResponse,
    temperature=0.7,
)

Multi-Turn Conversation

from consensusai import EnhancedMultiProviderAgent, LLMProvider, StructuredResponse

agent = EnhancedMultiProviderAgent({LLMProvider.OPENAI: "your-api-key"})

results = await agent.run_multi_turn_conversation(
    initial_prompt="What are the key factors for SaaS success?",
    follow_up_prompts=[
        "What about pricing strategies?",
        "How do we measure success?",
    ],
    response_model=StructuredResponse,
)

# Access conversation analysis
analysis = results["final_analysis"]
print(f"Consensus trend: {analysis['consensus_trend']}")
print(f"Best provider: {analysis['best_provider']}")

Custom Response Models

from pydantic import BaseModel, Field

class CustomResponse(BaseModel):
    """Custom response model for specific use cases."""
    summary: str = Field(description="Brief summary")
    pros: list[str] = Field(description="List of advantages")
    cons: list[str] = Field(description="List of disadvantages")
    verdict: str = Field(description="Final verdict")

results = await agent.get_enhanced_responses(
    prompt="Compare React vs Vue for frontend development",
    response_model=CustomResponse,
)

Generate Report

# After running queries, generate a comprehensive report
report = agent.generate_report()

print(f"Total conversations: {report['summary']['total_conversations']}")
print(f"Provider performance: {report['provider_performance']}")
print(f"Consensus trends: {report['consensus_trends']}")

Batch Processing

Extract a list from an LLM and process each item with parallel queries:

from consensusai.agents.batch import BatchProcessingAgent

agent = BatchProcessingAgent(
    api_keys={LLMProvider.OPENAI: "your-api-key"},
    batch_size=3,  # Process 3 items at a time
    delay_between_batches=1.0,  # Seconds between batches
)

result = await agent.process_list(
    prompt="List the top 5 competitors in the cloud infrastructure market",
    item_prompt_template=(
        "Analyze {item} focusing on:\n"
        "1. Key strengths and advantages\n"
        "2. Main weaknesses or limitations\n"
        "3. Target market and positioning"
    ),
    batch_size=3,
    max_items=5,
    collation_mode="detailed",  # "detailed", "summary", or "table"
)

# Access collated results
print(f"Processed: {result.summary['successful']}/{result.summary['total_items']}")
print(f"Duration: {result.total_duration_seconds:.1f}s")

# Quality scores by item
for item, score in result.collated_output["quality_by_item"].items():
    print(f"{item}: {score:.2f}/10")

Use cases for batch processing:

  • Competitor analysis across multiple companies
  • Technology assessment for emerging trends
  • Risk evaluation for multiple scenarios
  • Feature prioritization for product development
  • SWOT analysis across multiple entities

Streaming Responses

Stream responses in real-time as they arrive:

from consensusai.agents.streaming import StreamingAgent

agent = StreamingAgent(api_keys)

async for chunk in agent.stream_responses("Explain quantum computing"):
    if chunk["type"] == "delta":
        print(f"{chunk['provider']}: {chunk['delta']}")
    elif chunk["type"] == "consensus_update":
        print(f"Consensus: {chunk['consensus_level']}")
    elif chunk["type"] == "complete":
        print(f"{chunk['provider']} complete!")

Debate Mode

Run structured debates between providers:

from consensusai.agents.debate import DebateAgent

agent = DebateAgent(api_keys)

result = await agent.run_debate(
    topic="Should AI development be regulated?",
    rounds=3,
)

print(f"Winner: {result.winner}")
print(f"Final consensus: {result.final_consensus:.2f}")
print(f"Synthesis: {result.final_synthesis}")

Result Filtering & Ranking

Filter and rank responses by custom criteria:

from consensusai.agents.filtering import ResultFilterer, FilterCriteria, RankingCriteria

filterer = ResultFilterer()

# Filter by quality
filtered = filterer.filter_by_quality(
    results,
    min_score=7.0,
    providers={"openai", "claude"}
)

# Rank by accuracy focus
ranked = filterer.rank_by_weights(
    results,
    weights={"accuracy": 0.5, "relevance": 0.3, "creativity": 0.2}
)

# Get best response
best_provider, best_response = filterer.get_best_response(results)

Response Caching

Enable caching to avoid duplicate API calls:

from consensusai.cache import ResponseCache

cache = ResponseCache(ttl=3600)  # 1 hour cache

# Check the cache before making an API call
key = cache.get_key("Your prompt", model="gpt-4")
result = cache.get(key)

if result is None:
    # Cache miss: query the providers and store the result
    result = await agent.get_enhanced_responses(prompt)
    cache.set(key, result)

Persistent Storage

Store conversation history with SQLite:

from consensusai.storage import ConversationStore

store = ConversationStore("conversations.db")
store.initialize()

# Save a conversation
store.save_conversation(
    conversation_id="conv_123",
    prompt="What is AI?",
    results=results,
)

# Load conversation
data = store.load_conversation("conv_123")

# Get cost breakdown
total_cost = store.get_total_cost()
total_tokens = store.get_total_tokens()

Embedding-Based Consensus

Use semantic embeddings for more accurate consensus detection:

from consensusai.detectors.embedding import SemanticConsensusDetector

detector = SemanticConsensusDetector(
    embedding_model="text-embedding-3-small",
    cache_embeddings=True,
)

# Use with agent
agent.consensus_detector = detector
results = await agent.get_enhanced_responses(prompt)

Project Structure

consensusai/
├── src/
│   └── consensusai/
│       ├── __init__.py         # Package exports
│       ├── cli.py              # Command-line interface
│       ├── agents/             # Agent orchestration
│       │   ├── __init__.py     # Base multi-provider agent
│       │   ├── batch.py        # Batch/list processing
│       │   ├── streaming.py    # Streaming support
│       │   ├── debate.py       # Debate mode
│       │   └── filtering.py    # Result filtering & ranking
│       ├── cache/              # Response caching
│       │   └── __init__.py
│       ├── clients/            # LLM provider clients
│       ├── detectors/          # Consensus detection
│       │   └── embedding.py    # Embedding-based consensus
│       ├── models/             # Pydantic data models
│       ├── scorers/            # Quality scoring
│       ├── storage/            # Persistent storage (SQLite)
│       │   └── __init__.py
│       └── utils/              # Logging, rate limiting, config
│           └── rate_limit.py   # Rate limiting & cost tracking
├── examples/                   # Usage examples
│   ├── basic_usage.py
│   ├── multi_turn.py
│   ├── multi_turn_conversation.py
│   └── batch_processing.py
├── tests/                      # Unit tests
├── pyproject.toml             # Project configuration
└── README.md                  # This file

API Reference

Models

LLMProvider

Enum of supported LLM providers:

  • LLMProvider.OPENAI
  • LLMProvider.OLLAMA
  • LLMProvider.DEEPSEEK
  • LLMProvider.GEMINI
  • LLMProvider.CLAUDE
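
The members above suggest a definition along these lines. This is an illustrative sketch only; the string values are assumptions, and the real class lives in consensusai.models:

```python
from enum import Enum

class LLMProvider(str, Enum):
    """Sketch of the provider enum; values are assumed, not confirmed."""
    OPENAI = "openai"
    OLLAMA = "ollama"
    DEEPSEEK = "deepseek"
    GEMINI = "gemini"
    CLAUDE = "claude"

# Members double as strings, which keeps them convenient as dict keys
# in the api_keys mapping passed to EnhancedMultiProviderAgent.
print(LLMProvider.OPENAI.value)  # openai
```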

QualityMetrics

class QualityMetrics(BaseModel):
    relevance: float      # 0-10, relevance to query
    coherence: float      # 0-10, logical coherence
    completeness: float   # 0-10, thoroughness
    accuracy: float       # 0-10, factual accuracy
    creativity: float     # 0-10, creative insight

StructuredResponse

class StructuredResponse(BaseModel):
    answer: str
    key_points: list[str]
    confidence: float     # 0-1
    quality: QualityMetrics
    assumptions: list[str]
    limitations: list[str]

ConsensusAnalysis

class ConsensusAnalysis(BaseModel):
    consensus_level: float           # 0-1
    agreement_points: list[str]
    disagreement_points: list[str]
    most_confident_provider: str
    average_confidence: float
    recommendations: list[str]

Classes

EnhancedMultiProviderAgent

Main orchestration class.

def __init__(self, api_keys: Dict[LLMProvider, str]) -> None:
    """Initialize with API keys for each provider."""

async def get_enhanced_responses(
    self,
    prompt: str,
    response_model: type = StructuredResponse,
    use_structured: bool = True,
    temperature: float = 0.7,
) -> Dict[str, Any]:
    """Query all providers and return enhanced results."""

async def run_multi_turn_conversation(
    self,
    initial_prompt: str,
    follow_up_prompts: list[str],
    response_model: type = StructuredResponse,
) -> Dict[str, Any]:
    """Run multi-turn conversation with context."""

def generate_report(self) -> Dict[str, Any]:
    """Generate comprehensive report of all conversations."""

Development

Running Tests

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run with coverage
pytest --cov=consensusai

Code Quality

# Format code
black src/ tests/

# Lint code
ruff check src/ tests/

# Type check
mypy src/

License

MIT License - see LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Roadmap

Completed Features ✅

  • Batch processing for list-based workflows
  • Embedding-based semantic similarity for consensus detection
  • Response caching with configurable TTL (in-memory and file-based)
  • Rate limiting and cost tracking per provider
  • Streaming responses for real-time consensus
  • Persistent storage (SQLite) for conversation history
  • Debate mode with argument tracking and synthesis
  • Result filtering and custom ranking strategies

Planned Features 📋

  • Web interface for visualization and analysis
  • PostgreSQL support for persistent storage
  • Fact-checking integration for accuracy scoring
  • Advanced prompt templates and chaining
  • Multi-modal support (images, audio, video)
  • Provider failover and automatic retry
  • A/B testing framework for prompts
  • Export results to various formats (PDF, CSV, Excel)

Acknowledgments

Built with the instructor library for type-safe, structured LLM responses.
