<a href="https://colab.research.google.com/github/shivansh193/LLMProjs/blob/main/chainedThought.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: SOME KAGGLE DATA SOURCES ARE PRIVATE
# RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES.
import kagglehub
kagglehub.login()


In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

gemini_long_context_path = kagglehub.competition_download('gemini-long-context')

print('Data source import complete.')


# chainedThought
---
Our project introduces an approach to complex query analysis that combines distributed multi-agent processing with an iterative chain-of-thought methodology. It pairs concurrently scheduled, domain-specialized agents with a refinement loop in which the model criticizes and revises its own answers.

At its core, the system uses a hierarchical structure in which a primary Large Language Model (Gemini) acts as both coordinator and initial processor. When a query arrives, instead of answering it directly with a potentially unsatisfactory response, the system combines two mechanisms: domain decomposition and chain-of-thought analysis.

The domain decomposition step asks Gemini for 3 to 5 relevant domains, such as economic, geopolitical, or social dimensions, together with a domain-specific question for each, and then instantiates one specialized sub-agent per domain. The agents are scheduled concurrently, and each produces an initial domain-specific response that its own chain-of-thought loop then refines.

This refinement process is methodical. The current response first goes through a criticism phase that identifies potential weaknesses, biases, or gaps in reasoning. Each criticism then triggers a defense step, in which the system judges whether the criticism is valid and justifies its verdict, and the response is rewritten to address the criticisms that hold up. A fitness function governs this dialectical process: the model rates each revised response from 0 to 100 on how well it addresses the valid criticisms and on its overall strength and conciseness, and an agent stops once this confidence reaches 80 or after 8 iterations, whichever comes first.
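To make the loop concrete, here is a minimal sketch of criticize, defend, improve, and score, stopping at a target score of 80 or a cap of 8 iterations (the notebook's `DomainAgent` defaults). It assumes deterministic toy functions in place of Gemini: `criticize`, `is_valid`, `improve`, and `fitness` are hypothetical stand-ins, and "fitness" here is simply the share of required topics the answer mentions. Unlike the notebook, which spends its first iteration generating the initial answer, the sketch starts from a given answer.

```python
from typing import List, Tuple

# Hypothetical stand-ins for the notebook's Gemini prompts: deterministic toy
# functions, so the control flow can be run and checked offline.
REQUIRED_TOPICS = ["supply", "demand", "weather"]


def criticize(answer: str) -> List[str]:
    """Criticism phase: flag each required topic the answer omits, plus a nitpick."""
    flaws = [f"missing {topic}" for topic in REQUIRED_TOPICS if topic not in answer]
    return flaws + ["tone is too dry"]


def is_valid(criticism: str) -> bool:
    """Defense phase: accept substantive criticisms and rebut stylistic ones."""
    return criticism.startswith("missing ")


def improve(answer: str, valid: List[str]) -> str:
    """Address one valid criticism per iteration to mimic gradual refinement."""
    if not valid:
        return answer
    topic = valid[0][len("missing "):]
    return f"{answer} {topic}"


def fitness(answer: str) -> float:
    """Toy fitness function: percentage of required topics the answer covers."""
    covered = sum(topic in answer for topic in REQUIRED_TOPICS)
    return 100.0 * covered / len(REQUIRED_TOPICS)


def refine(answer: str, max_iterations: int = 8,
           target_confidence: float = 80.0) -> Tuple[str, float, int]:
    """Criticize, defend, improve, and score until the target or the cap is reached."""
    confidence, iterations = 0.0, 0
    while iterations < max_iterations and confidence < target_confidence:
        valid = [c for c in criticize(answer) if is_valid(c)]
        answer = improve(answer, valid)
        confidence = fitness(answer)
        iterations += 1
    return answer, confidence, iterations


print(refine("coffee prices depend on supply"))
# ('coffee prices depend on supply demand weather', 100.0, 2)
```

Here the stylistic criticism is always rebutted, so only the substantive ones drive the rewrites, and the answer reaches the target after two iterations, well before the cap.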

The final stage is synthesis. The Final Compilation module combines the refined analyses from all domain agents, along with their confidence scores, into one response that covers key agreements across domains, critical interactions between factors, an overall confidence assessment, and major uncertainties. This integration aims to give a final response that draws on expertise from multiple domains and has also withstood repeated logical scrutiny.

The result is an analysis system that combines the breadth of multi-agent processing with the depth of iterative reasoning. This dual approach is designed so that complex queries receive responses that are both researched across domains and refined through logical examination.

Through this architecture, our project aims to address the limitations of both simple query-response systems and single-agent processors, providing a framework for complex, nuanced questions that need both depth and breadth. We believe its adaptability and scalability make it well suited to applications ranging from academic research to strategic business analysis.

*Note: This solution takes full advantage of Gemini's long context window; it would not have worked nearly as efficiently with a RAG-based alternative or other methods.*

---

![logic diagram](https://i.ibb.co/J37ZF7c/image.png)

A logic diagram of how our solution works.

## Setting up all imports for our submission

Here, we set up our **imports**, load the three Gemini API keys from Kaggle secrets, and **configure logging** with a timestamped format that helps with debugging.

In [None]:
import asyncio
import json
import time
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from itertools import cycle
from typing import List, Dict, Optional, Tuple

import google.generativeai as genai

from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
key1 = user_secrets.get_secret("key1")
key2 = user_secrets.get_secret("key2")
key3 = user_secrets.get_secret("key3")


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

## Setting up classes to be used across the project

Here we set up the main classes used across the code.
The first class, `APIKeyManager`, shards requests across several API keys: it switches to the next key after a fixed number of requests (6 by default), so that no single key bears the rate limits and credit usage of every domain and iteration.
Next, the abstract class `AIModel` declares the two methods every model wrapper must implement: `generate_response` and `identify_domains`.
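The rotation policy can be sketched synchronously as below. This is a simplified, hypothetical class (`RoundRobinKeys`, with placeholder key strings), not the notebook's `APIKeyManager`; it rotates before serving a request, so each key handles exactly `requests_per_key` consecutive requests (2 here, to keep the output short).

```python
from itertools import cycle
from typing import List


class RoundRobinKeys:
    """Hypothetical, synchronous sketch of round-robin API key rotation."""

    def __init__(self, api_keys: List[str], requests_per_key: int = 6):
        if not api_keys:
            raise ValueError("At least one API key must be provided")
        self._keys = cycle(api_keys)
        self._current = next(self._keys)
        self._used = 0  # Requests served by the current key so far
        self._limit = requests_per_key

    def next_key(self) -> str:
        # Rotate before serving once the current key has used up its quota
        if self._used >= self._limit:
            self._current = next(self._keys)
            self._used = 0
        self._used += 1
        return self._current


rotator = RoundRobinKeys(["key1", "key2", "key3"], requests_per_key=2)
print([rotator.next_key() for _ in range(7)])
# ['key1', 'key1', 'key2', 'key2', 'key3', 'key3', 'key1']
```

Because `itertools.cycle` wraps around, the first key comes back into use once every key has served its quota.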

Next, `GeminiAPI` implements `AIModel` for Gemini (`gemini-1.5-flash`). It sends each prompt with the requested temperature, returns the generated text, and, in `identify_domains`, parses the model's JSON reply into matching `domains` and `questions` arrays.
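`identify_domains` trims whatever text surrounds the JSON object before parsing it. A minimal sketch of that extraction and validation, assuming the reply contains a single JSON object, might look like this; `parse_domain_reply` is a hypothetical helper name, and the sample reply is made up.

```python
import json
from typing import Dict, List


def parse_domain_reply(reply: str) -> Dict[str, List[str]]:
    """Extract the JSON object from a model reply and validate its shape."""
    start, end = reply.find("{"), reply.rfind("}")
    if start == -1 or end < start:
        raise ValueError("No JSON object found in reply")
    # json.JSONDecodeError is a subclass of ValueError, so callers can catch one type
    parsed = json.loads(reply[start:end + 1])
    domains, questions = parsed.get("domains"), parsed.get("questions")
    if not isinstance(domains, list) or not isinstance(questions, list):
        raise ValueError("Reply must contain 'domains' and 'questions' lists")
    if len(domains) != len(questions):
        raise ValueError("Mismatch between domains and questions count")
    return {"domains": domains, "questions": questions}


reply = ('Here is the JSON:\n'
         '{"domains": ["economic", "climate"], '
         '"questions": ["How will demand shift?", "How will droughts affect yields?"]}\n'
         'Hope this helps!')
print(parse_domain_reply(reply)["domains"])
# ['economic', 'climate']
```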

Finally, the `DomainAnalysis` dataclass stores each domain's analysis state: the domain, its question, the current response, the confidence score, and the iteration count.

In [None]:
@dataclass
class APIKeyManager:
    """Manages rotation of API keys to prevent rate limiting."""
    api_keys: List[str]
    requests_per_key: int = 6

    def __post_init__(self):
        if not self.api_keys:
            raise ValueError("At least one API key must be provided")
        self.current_request_count = 0
        self.key_cycle = cycle(self.api_keys)
        self.current_key = next(self.key_cycle)
        # asyncio.Lock serializes rotation between coroutines (it is not a thread lock)
        self._lock = asyncio.Lock()

    def get_current_key(self) -> str:
        return self.current_key

    async def next_key(self) -> str:
        """Return the key for the next request, rotating after `requests_per_key` requests."""
        async with self._lock:  # Only one coroutine rotates keys at a time
            # Rotate before serving, so every key handles exactly requests_per_key requests
            if self.current_request_count >= self.requests_per_key:
                self.current_key = next(self.key_cycle)
                self.current_request_count = 0
                logger.info(f"Switching to new API key: {self.current_key[:5]}...")

            self.current_request_count += 1
            return self.current_key

class AIModel(ABC):
    """Abstract base class for AI model implementations."""

    @abstractmethod
    async def generate_response(self, prompt: str, temperature: float = 0.3) -> str:
        pass

    @abstractmethod
    async def identify_domains(self, question: str) -> Dict[str, List[str]]:
        pass

class GeminiAPI(AIModel):

    def __init__(self, api_keys: List[str]):
        self.key_manager = APIKeyManager(api_keys)

    async def generate_response(self, prompt: str, temperature: float = 0.3) -> str:
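        api_key = await self.key_manager.next_key()  # Rotates keys under an asyncio lock

        try:
            genai.configure(api_key=api_key)
            model = genai.GenerativeModel('gemini-1.5-flash')
            start = time.time()
            # generate_content is synchronous: it blocks the event loop until Gemini replies,
            # so requests from concurrently scheduled agents run one at a time (which also
            # keeps the global genai.configure call from racing with other coroutines)
            response = model.generate_content(prompt, generation_config={"temperature": temperature})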
            end = time.time()
            logger.info(f"Generated response in {end - start:.2f} seconds")
            return response.text
        except Exception as e:
            logger.error(f"Error generating response with key {api_key[:5]}...: {e}")
            raise

    async def identify_domains(self, question: str) -> Dict[str, List[str]]:
        """Identify relevant domains and generate specific questions for analysis."""
        prompt = f"""
        Analyze this question and identify the relevant domains that would affect the answer:
        "{question}"

        Provide your response in this EXACT JSON format, with NO additional text:
        {{
            "domains": ["domain1", "domain2"],
            "questions": ["specific question 1", "specific question 2"]
        }}

        Requirements:
        1. Include 3-5 relevant domains
        2. Each domain must be a single word
        3. Each question must specifically relate to its domain
        4. Questions should be analytical and specific
        """

        try:
            response = await self.generate_response(prompt, temperature=0.3)

            # Clean the response - remove any non-JSON content
            response = response.strip()
            if not response.startswith('{'):
                response = response[response.find('{'):]
            if not response.endswith('}'):
                response = response[:response.rfind('}')+1]

            parsed = json.loads(response)

            # Validate the response structure
            if not isinstance(parsed, dict) or 'domains' not in parsed or 'questions' not in parsed:
                raise ValueError("Invalid response structure")
            if len(parsed['domains']) != len(parsed['questions']):
                raise ValueError("Mismatch between domains and questions count")

            return parsed
        except (json.JSONDecodeError, ValueError) as e:
            logger.error(f"Error parsing domains: {e}, Response: {response}")
            raise  # Re-raise to handle in the coordinator


@dataclass
class DomainAnalysis:
    """Stores analysis results for a specific domain."""
    domain: str
    question: str
    current_response: Optional[str] = None
    confidence_score: float = 0.0
    iteration_count: int = 0

## Setting up prompts and fitness function
Next we set up our prompts and fitness function.
The `DomainAgent` class holds the domain it analyzes, its question, the model, and two stopping conditions: a maximum of 8 iterations and a target confidence of 80. The agent stops at whichever comes first.

`calculate_temperature` then sets the sampling temperature for each iteration, raising it linearly from 0.3 on the first iteration to 1.0 on the last.
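For iteration i out of N = 8, the notebook's schedule is min(1.0, 0.3 + 0.7 * i / (N - 1)), a step of 0.1 per iteration. The function below restates that formula outside the class, with the notebook's constants as default parameters; the standalone signature is our own.

```python
def calculate_temperature(iteration: int, max_iterations: int = 8,
                          base_temp: float = 0.3, temp_range: float = 0.7) -> float:
    """Linear schedule from base_temp at iteration 0 up to base_temp + temp_range."""
    temp_step = temp_range / (max_iterations - 1)  # About 0.1 with the defaults
    return min(1.0, base_temp + temp_step * iteration)  # Capped at 1.0


print([round(calculate_temperature(i), 2) for i in range(8)])
# [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
```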

Each later iteration then has the model argue with itself: an extended exchange of criticism and rebuttal that makes the best use of Gemini's long context.

After the initial response, the model lists the main weaknesses and oversights, judges whether each criticism is valid (a YES or NO verdict with a short justification), rewrites the analysis to address the valid ones, and finally rates the result from 0 to 100 as its fitness score.
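The criticism step turns the model's reply into a list of criticisms, and `_parse_criticisms` keeps every non-empty line, so a preamble such as "Here are the main weaknesses:" also becomes a criticism and costs an extra validity check. A stricter parser, sketched here under the assumption that the model formats its criticisms as a bulleted or numbered list, keeps only list items and falls back to the line-based behavior otherwise; `parse_criticisms` and the sample reply are hypothetical.

```python
import re
from typing import List

# A list item starts with "-", "*", a bullet character, or a number followed by "." or ")"
LIST_MARKER = re.compile(r"^\s*(?:[-*\u2022]|\d+[.)])\s+")


def parse_criticisms(reply: str) -> List[str]:
    """Keep only list items, with markers removed; otherwise every non-empty line."""
    criticisms = []
    for line in reply.splitlines():
        match = LIST_MARKER.match(line)
        if match and line[match.end():].strip():
            criticisms.append(line[match.end():].strip())
    if not criticisms:
        # Fall back to the notebook's behavior when the reply has no list markers
        criticisms = [line.strip() for line in reply.splitlines() if line.strip()]
    return criticisms


reply = ("Here are the main weaknesses:\n"
         "1. Ignores weather shocks in producing countries.\n"
         "2. Gives no demand-side data.\n\n"
         "Overall, the analysis is reasonable.")
print(parse_criticisms(reply))
# ['Ignores weather shocks in producing countries.', 'Gives no demand-side data.']
```

Fewer spurious criticisms also means fewer validity-check prompts per iteration, since the defense step issues one request per criticism.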

In [None]:
class DomainAgent:
    def __init__(self, domain: str, question: str, ai_model: AIModel):
        self.domain = domain
        self.question = question
        self.ai_model = ai_model
        self.analysis = DomainAnalysis(domain=domain, question=question)
        self.MAX_ITERATIONS = 8
        self.TARGET_CONFIDENCE = 80
        self._iteration_lock = asyncio.Lock()

    def calculate_temperature(self, iteration: int) -> float:
        """Calculate temperature based on iteration number."""
        base_temp = 0.3
        temp_range = 0.7
        temp_step = temp_range / (self.MAX_ITERATIONS - 1)
        return min(1.0, base_temp + (temp_step * iteration))

    async def analyze_iteration(self, iteration: int) -> None:
        """Execute a single iteration of analysis."""
        if iteration >= self.MAX_ITERATIONS or self.analysis.confidence_score >= self.TARGET_CONFIDENCE:
            return

        async with self._iteration_lock:  # Ensure sequential iterations
            current_temp = self.calculate_temperature(iteration)
            logger.info(f"Domain: {self.domain} - Iteration {iteration} - Temperature: {current_temp:.2f}")

            try:
                # For first iteration, get initial response
                if iteration == 0:
                    self.analysis.current_response = await self._get_initial_response(current_temp)
                    self.analysis.iteration_count = 1
                    logger.info(f"Initial response for {self.domain}: {self.analysis.current_response[:100]}...")
                    return

                # For subsequent iterations
                if self.analysis.current_response:
                    criticisms = await self._get_criticisms(current_temp)
                    defense = await self._defend_against_criticisms(criticisms, current_temp)
                    new_response = await self._improve_response(defense, current_temp)
                    confidence = await self._calculate_confidence(new_response, criticisms, defense)

                    self.analysis.current_response = new_response
                    self.analysis.confidence_score = confidence
                    self.analysis.iteration_count = iteration + 1

                    logger.info(f"Domain: {self.domain} - Iteration {iteration} - "
                              f"Confidence: {self.analysis.confidence_score}%")

            except Exception as e:
                logger.error(f"Error in analysis iteration {iteration} for domain {self.domain}: {e}")
                raise

    async def _get_initial_response(self, temperature: float) -> str:
        prompt = f"""Analyze the following question from the perspective of {self.domain}:
        Question: {self.question}
        Provide a detailed and CONCISE analysis and prediction."""
        return await self.ai_model.generate_response(prompt, temperature)

    async def _get_criticisms(self, temperature: float) -> List[str]:
        prompt = f"""Review the CORE ARGUMENTS of this analysis critically:
        Analysis: {self.analysis.current_response}
        List the main weaknesses, oversights, or alternative viewpoints.  Be concise and target only the most important flaws.
        Make sure to keep criticisms in 100-300 characters max."""
        criticism_response = await self.ai_model.generate_response(prompt, temperature)
        return self._parse_criticisms(criticism_response)

    def _parse_criticisms(self, criticism_response: str) -> List[str]:
        criticisms = [c.strip() for c in criticism_response.split('\n') if c.strip()]
        return criticisms

    async def _defend_against_criticisms(self, criticisms: List[str], temperature: float) -> Dict[str, str]:
        defenses = {}
        for criticism in criticisms:
            prompt = f"""Is this a valid criticism of the CORE ARGUMENT? Reply with "YES" or "NO", followed by a concise justification.
            Analysis: {self.analysis.current_response}
            Criticism: {criticism}
            Make sure to keep justifications in 100-300 characters max."""
            defense = await self.ai_model.generate_response(prompt, temperature)
            defenses[criticism] = defense
        return defenses

    async def _improve_response(self, defenses: Dict[str, str], temperature: float) -> str:
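        # Accept verdicts wrapped in markdown or quotes, such as **YES** or "Yes"
        valid_criticisms = [
            crit for crit, defense in defenses.items()
            if defense.strip().lstrip('*"\'`').lower().startswith("yes")
        ]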

        if not valid_criticisms:
            return self.analysis.current_response

        try:
            # The reply replaces current_response, so ask for the full revised analysis
            prompt = f"""Improve the CORE ARGUMENTS of this analysis based on the VALID criticisms and your rebuttals:
            Current analysis: {self.analysis.current_response}
            Valid criticisms and rebuttals: { {crit: defenses[crit] for crit in valid_criticisms} }
            Return the complete revised analysis, changing only what is needed to address these criticisms concisely."""

            improved_response = await self.ai_model.generate_response(prompt, temperature)
            return improved_response
        except Exception as e:
            logger.error(f"Error in _improve_response: {e}")
            return self.analysis.current_response

    async def _calculate_confidence(self, response: str, criticisms: List[str], defenses: Dict[str, str]) -> float:
        prompt = f"""Given the original criticisms and the rebuttals, what is the confidence level (0-100) of the improved analysis?
        Improved Analysis: {response}
        Criticisms and Rebuttals: {defenses}
        Consider:
        1. How well the CORE ARGUMENTS address the valid criticisms.
        2. The overall strength and conciseness of the analysis.
        Return only a number."""

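        try:
            confidence_str = await self.ai_model.generate_response(prompt)
            # Tolerate replies such as "85%" or "85." and clamp to the 0-100 scale
            confidence = float(confidence_str.strip().rstrip('%.').strip())
            return max(0.0, min(100.0, confidence))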
        except (ValueError, TypeError):
            logger.warning("Could not parse confidence score, defaulting to 0.0")
            return 0.0

## Compiling final response
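Finally, the `MultiAgentCoordinator` class compiles the final response. It identifies the domains, creates one `DomainAgent` per domain, and runs the agents round by round: each round executes one iteration for every active agent concurrently, and an agent is retired once it reaches its target confidence or its last iteration. The main agent then synthesizes all domain analyses into a single, deeply considered response.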
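The round-based scheduling can be sketched with toy agents whose confidence rises by a fixed step each iteration. `ToyAgent` and `run_rounds` are hypothetical stand-ins for `DomainAgent` and `MultiAgentCoordinator.analyze_question`; the stopping rule (target confidence 80 or the last of 8 iterations) matches the notebook. The sketch uses `asyncio.run`; inside a Jupyter or Kaggle notebook, which already runs an event loop, use a top-level `await run_rounds(...)` instead.

```python
import asyncio
from typing import Dict, List


class ToyAgent:
    """Hypothetical stand-in for DomainAgent: confidence grows by a fixed step."""

    def __init__(self, name: str, step: float,
                 max_iterations: int = 8, target: float = 80.0):
        self.name = name
        self.step = step
        self.max_iterations = max_iterations
        self.target = target
        self.confidence = 0.0
        self.iterations = 0

    async def analyze_iteration(self, iteration: int) -> None:
        await asyncio.sleep(0)  # Yield to the event loop, as an awaited API call would
        self.confidence = min(100.0, self.confidence + self.step)
        self.iterations = iteration + 1


async def run_rounds(agents: List[ToyAgent]) -> Dict[str, int]:
    """Run one iteration per active agent per round; retire agents that finish."""
    active = {agent.name: agent for agent in agents}
    finished: Dict[str, int] = {}
    rounds = max((agent.max_iterations for agent in agents), default=0)
    for iteration in range(rounds):
        await asyncio.gather(*(agent.analyze_iteration(iteration) for agent in active.values()))
        for name, agent in list(active.items()):
            if agent.confidence >= agent.target or iteration >= agent.max_iterations - 1:
                finished[name] = agent.iterations  # Iterations used before stopping
                del active[name]
        if not active:
            break
    return finished


result = asyncio.run(run_rounds([
    ToyAgent("economic", step=40.0),
    ToyAgent("climate", step=20.0),
    ToyAgent("social", step=5.0),
]))
print(result)
# {'economic': 2, 'climate': 4, 'social': 8}
```

Faster-converging agents drop out early, while the slowest one keeps running until it hits the iteration cap.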

In [None]:
class MultiAgentCoordinator:
    def __init__(self, ai_model: AIModel):
        self.ai_model = ai_model

    async def analyze_question(self, question: str) -> Tuple[Dict[str, DomainAnalysis], str]:
        """Analyze a question using multiple domain agents and synthesize results."""
        try:
            # Identify domains and questions
            domain_info = await self.ai_model.identify_domains(question)

            if not domain_info or 'domains' not in domain_info:
                raise ValueError("Failed to identify domains properly")

            domains = domain_info['domains']
            questions = domain_info['questions']

            logger.info(f"Identified domains: {domains}")

            # Create agents for each domain
            agents = {
                domain: DomainAgent(domain, sub_question, self.ai_model)
                for domain, sub_question in zip(domains, questions)
            }

            # Track results and active agents
            results: Dict[str, DomainAnalysis] = {}
            active_agents = agents.copy()

            # Run iterations across all domains concurrently
            for iteration in range(max((agent.MAX_ITERATIONS for agent in agents.values()), default=0)):
                logger.info(f"Starting iteration {iteration} across all domains")

                # Run one iteration for all active agents concurrently
                await asyncio.gather(
                    *(agent.analyze_iteration(iteration) for agent in active_agents.values())
                )

                # Check for completed domains
                completed_domains = set()
                for domain, agent in active_agents.items():
                    if (agent.analysis.confidence_score >= agent.TARGET_CONFIDENCE or
                        iteration >= agent.MAX_ITERATIONS - 1):
                        results[domain] = agent.analysis
                        completed_domains.add(domain)

                # Remove completed domains
                for domain in completed_domains:
                    active_agents.pop(domain, None)

                # If all domains complete, break
                if not active_agents:
                    break

            # Add any remaining active agents to results
            for domain, agent in active_agents.items():
                results[domain] = agent.analysis

            if not results:
                raise ValueError("No valid analyses were produced")

            # Synthesize final response
            final_response = await self._synthesize_final_response(results)

            return results, final_response

        except Exception as e:
            logger.error(f"Error in question analysis: {e}")
            raise

    async def _synthesize_final_response(self, domain_analyses: Dict[str, DomainAnalysis]) -> str:
        """Synthesize domain-specific analyses into a final response."""
        analyses_text = "\n\n".join([
            f"Domain: {domain}\n"
            f"Confidence: {analysis.confidence_score}%\n"
            f"Analysis: {analysis.current_response}"
            for domain, analysis in domain_analyses.items()
        ])

        prompt = f"""Synthesize these domain-specific analyses into a comprehensive final response:
        {analyses_text}

        Provide a concise, well-reasoned final prediction that integrates all domain perspectives.
        Include:
        1. Key agreements across domains
        2. Critical interactions between factors
        3. Overall confidence assessment
        4. Major uncertainties

        Keep the response focused and actionable."""
        #         5. Important values and figures with interpretations.
        # 6. Sources, if credible.

        return await self.ai_model.generate_response(prompt)


async def analyze_question(api_keys: List[str], question: str) -> Tuple[Dict[str, DomainAnalysis], str]:
    """Main entry point for question analysis."""
    ai_model = GeminiAPI(api_keys)
    coordinator = MultiAgentCoordinator(ai_model)
    return await coordinator.analyze_question(question)

## An example cell using our code
Here we try the code on an example question: "What are the predicted prices of coffee in 2027?" In our run, the output contained analyses spanning four domains, along with a comprehensive, well-rounded final analysis.

In [None]:
# Example usage
if __name__ == "__main__":
    # Example with multiple API keys
    api_keys = [key1, key2, key3]  # Keys loaded from Kaggle secrets in the imports cell

    question = "What are the predicted prices of coffee in 2027?"

    async def main():
        try:
            analyses, final_response = await analyze_question(api_keys, question)

            # Print results
            for domain, analysis in analyses.items():
                print(f"\n{'-'*50}")
                print(f"Domain: {domain}")
                print(f"Confidence: {analysis.confidence_score}%")
                print(f"Analysis: {analysis.current_response}")

            print(f"\n{'-'*50}\nFinal Analysis:")
            print(final_response)

        except Exception as e:
            logger.error(f"Analysis failed: {e}")
            raise

    # Top-level await works in Jupyter/Kaggle notebooks; in a plain script, use asyncio.run(main())
    await main()