In [None]:
!pip install transformers torch arxiv sentence-transformers

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import arxiv
import re
from typing import Dict, List, Optional
from enum import Enum
import warnings
warnings.filterwarnings('ignore')

In [None]:
class AgentType(Enum):
    """Identifiers for the agents that can take part in a handoff chain.

    Members are used as keys of an agent's handoff registry, and each
    member's string value is what the handoff log messages print.
    """
    ANALYSIS = "analysis_agent"        # used by RequestAnalysisAgent
    SEARCH = "search_agent"            # used by LiteratureSearchAgent
    GENERATION = "generation_agent"    # used by TextGenerationAgent
    INTERACTIVE = "interactive_agent"  # used by InteractiveAgent

In [None]:
class HandoffDecision:
    """Outcome of an agent's routing decision.

    Bundles the agent that should take over next, a human-readable
    justification for the transfer, and the payload to pass along.
    """

    def __init__(self, target_agent: AgentType, reason: str, data: Dict):
        # Plain attribute storage: nothing is validated or copied.
        self.target_agent, self.reason, self.data = target_agent, reason, data

In [None]:
class BaseAgent:
    """Common foundation for agents that can pass work to one another."""

    def __init__(self, agent_type: AgentType):
        self.agent_type = agent_type
        # Maps a target AgentType to the agent instance that handles it.
        self.handoff_registry = {}

    def register_handoff(self, agent_type: AgentType, agent_instance):
        """Make ``agent_instance`` reachable from this agent under ``agent_type``."""
        self.handoff_registry[agent_type] = agent_instance
        print(f"\n {self.agent_type.value} → Registered handoff to {agent_type.value}")

    def handoff_to(self, target: AgentType, data: Dict, reason: str = "") -> Dict:
        """Transfer ``data`` to the agent registered for ``target``.

        Returns whatever the receiving agent's ``receive_handoff`` returns.
        If the target is not registered, or the receiver raises, an error
        dict with ``status``, ``message`` and ``agent`` keys is returned
        instead of propagating the failure.
        """
        own_name = self.agent_type.value
        print(f"\nHandoff: {own_name} → {target.value}")
        print(f"Reason: {reason}")

        if target in self.handoff_registry:
            try:
                recipient = self.handoff_registry[target]
                return recipient.receive_handoff(self.agent_type, data)
            except Exception as exc:
                failure = f"Error during handoff: {str(exc)}"
                print(failure)
                return {"status": "error", "message": failure, "agent": own_name}

        # Unknown target: report it rather than raising.
        missing = f"Agent {target.value} isn't registered"
        print(f"Error: {missing}")
        return {"status": "error", "message": missing, "agent": own_name}

    def receive_handoff(self, from_agent: AgentType, data: Dict) -> Dict:
        """Default receiver: acknowledge the work and echo the payload back."""
        print(f"\n {self.agent_type.value} receives work from {from_agent.value}")
        acknowledgement = {"status": "received", "data": data}
        return acknowledgement

    def decide_next_agent(self, current_data: Dict) -> Optional[HandoffDecision]:
        """Routing hook for subclasses; the base agent never hands off."""
        return None

In [None]:
#Agent 1 - input analysis
class RequestAnalysisAgent(BaseAgent):
    """First agent in the chain: validates the request and extracts keywords.

    A language model is loaded at construction time, but the analysis done
    in ``_analyze`` is purely rule-based and does not call it.
    """

    # Function words and request verbs that say nothing about the topic.
    # They are dropped because the search agent builds its arXiv query from
    # only the first few keywords, so leading filler would crowd out the
    # terms that actually describe the subject.
    _STOP_WORDS = frozenset({
        "an", "the", "of", "on", "in", "at", "to", "for", "and", "or", "is",
        "are", "be", "by", "as", "with", "from", "about", "into", "that",
        "this", "these", "those", "its", "me", "my", "we", "our", "you",
        "your", "do", "does", "can", "could", "would", "should", "please",
        "what", "which", "how", "find", "show", "tell", "give", "write",
    })
    _MAX_KEYWORDS = 7
    _MAX_TOPIC_LENGTH = 80

    def __init__(self):
        super().__init__(AgentType.ANALYSIS)
        print("Loading analysis agent")

        # Define both attributes up front so neither is missing when loading
        # fails part-way through.
        self.tokenizer = None
        self.model = None

        try:
            model_name = "Qwen/Qwen2.5-0.5B-Instruct"
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto"
            )
            print("Model loaded")
        except Exception as e:
            self.tokenizer = None
            self.model = None
            print(f"Model isn't available: {str(e)}")
            print("Using a rule-based approach")

    def analyze_and_handoff(self, user_input: str) -> Dict:
        """Analyze ``user_input`` and, when keywords were found, hand off to search.

        Returns the final result of the handoff chain, the bare analysis
        result when no handoff is made, or an error dict (``status``,
        ``message``, ``agent``) for empty or unusable input.
        """
        print("\n")
        print("Analysis agent is processing the request")

        # Input validation: reject None, empty and whitespace-only requests.
        if not user_input or not user_input.strip():
            print("Error: The entry is empty")
            return {
                "status": "error",
                "message": "Entry cannot be empty",
                "agent": self.agent_type.value
            }

        try:
            analysis_result = self._analyze(user_input)

            if analysis_result.get("status") == "error":
                return analysis_result

            next_step = self.decide_next_agent(analysis_result)

            if next_step:
                return self.handoff_to(
                    next_step.target_agent,
                    next_step.data,
                    next_step.reason
                )

            return analysis_result

        except Exception as e:
            print(f"Unexpected error during analysis: {str(e)}")
            return {
                "status": "error",
                "message": "Request parsing error",
                "agent": self.agent_type.value
            }

    def decide_next_agent(self, current_data: Dict) -> Optional[HandoffDecision]:
        """Route a successful analysis with keywords to the search agent."""
        if current_data.get("status") == "success" and current_data.get("keywords"):
            return HandoffDecision(
                target_agent=AgentType.SEARCH,
                reason="Keywords found, literature search required",
                data=current_data
            )
        return None

    def _analyze(self, user_input: str) -> Dict:
        """Extract the main topic and up to ``_MAX_KEYWORDS`` keywords.

        Keywords are lower-cased word tokens of at least two characters,
        with stop words and duplicates removed while preserving order. If
        every token is a stop word, the unfiltered tokens are used so that a
        terse request still produces a search.
        """
        user_input_clean = user_input.strip()

        if len(user_input_clean) < 2:
            return {
                "status": "error",
                "message": "Query too short. Please provide at least 2 characters.",
                "agent": self.agent_type.value
            }

        tokens = [w for w in re.findall(r'\w+', user_input_clean.lower()) if len(w) >= 2]
        meaningful = [w for w in tokens if w not in self._STOP_WORDS]

        # dict.fromkeys de-duplicates while keeping first-seen order.
        keywords = list(dict.fromkeys(meaningful or tokens))[:self._MAX_KEYWORDS]

        if not keywords:
            return {
                "status": "error",
                "message": "No valid keywords found. Please provide more specific search terms.",
                "agent": self.agent_type.value
            }

        return {
            # Slicing already returns the whole string when it is short enough.
            "main_topic": user_input_clean[:self._MAX_TOPIC_LENGTH],
            "keywords": keywords,
            "status": "success",
            "agent": self.agent_type.value
        }

In [None]:
#Agent 2 - search literature
class LiteratureSearchAgent(BaseAgent):
    """Second agent in the chain: queries arXiv and routes the results onward."""

    # Top-level flags an earlier, unsuccessful pass through the chain may
    # have left in the payload. They must not leak into a fresh search.
    _STALE_FLAGS = ("error", "no_results")

    def __init__(self):
        super().__init__(AgentType.SEARCH)
        print("Search agent ready")

    def receive_handoff(self, from_agent: AgentType, data: Dict) -> Dict:
        """Run the literature search for ``data`` and hand the results onward.

        Uses ``data["keywords"]``, falling back to ``data["main_topic"]``.
        The search is retried with fewer keywords after a timeout and
        broadened to a single keyword when nothing is found. The interactive
        agent is consulted when there is nothing to search for, or when the
        broadened search is also empty; otherwise the results go to the
        generation agent. Returns the final result of that handoff chain.
        """
        print(f"Search agent recieved work from {from_agent.value}")

        try:
            # A follow-up search reuses the previous result as its payload.
            # Drop flags from that earlier pass so a successful search is not
            # later reported as "no results".
            data = {key: value for key, value in data.items()
                    if key not in self._STALE_FLAGS}

            keywords = data.get("keywords", [])
            # A bare string would otherwise be sliced character by character.
            if isinstance(keywords, str):
                keywords = [keywords]

            # No keywords: fall back to the topic, or ask the user for help.
            if not keywords:
                print("No keywords, trying with main_topic")
                main_topic = data.get("main_topic", "")
                if main_topic:
                    keywords = [main_topic]
                else:
                    print("No keywords and no main_topic")
                    return self.handoff_to(
                        AgentType.INTERACTIVE,
                        {**data, "error": "no_keywords"},
                        "No keywords, I'm looking for user help"
                    )

            search_result = self._search_arxiv(keywords)

            if "error" in search_result:
                error_type = search_result.get("error")
                print(f"Search error: {error_type}")

                # Timeout: retry once with a smaller, cheaper query.
                if error_type == "timeout" and len(keywords) > 1:
                    print("I'm trying with fewer keywords...")
                    retry_result = self._search_arxiv(keywords[:2], max_results=3)
                    if retry_result.get("papers_count", 0) > 0:
                        search_result = retry_result
                        print("Retry succeded!")

                # Network failure: carry on with an empty result set.
                elif error_type == "network":
                    print("Network error, continuing without results")
                    search_result = {"papers": [], "papers_count": 0}

            papers_count = search_result.get("papers_count", 0)
            if papers_count == 0:
                print("No results available")

                # Broaden to the first keyword only. With two or fewer
                # keywords there is nothing to broaden, so the empty result
                # continues to the generation agent.
                if len(keywords) > 2:
                    print("Trying to expand the search...")
                    broader_search = self._search_arxiv(keywords[:1], max_results=10)
                    if broader_search.get("papers_count", 0) > 0:
                        search_result = broader_search
                        print("Broader search succeded")
                    else:
                        return self.handoff_to(
                            AgentType.INTERACTIVE,
                            {**data, "search_results": search_result, "no_results": True},
                            "No results, looking for alternative terms"
                        )

            combined_data = {**data, "search_results": search_result}

            next_step = self.decide_next_agent(combined_data)

            if next_step:
                return self.handoff_to(
                    next_step.target_agent,
                    next_step.data,
                    next_step.reason
                )

            return combined_data

        except Exception as e:
            print(f"Critical error in the search agent: {str(e)}")
            # Fallback: let the generation agent proceed without papers.
            return self.handoff_to(
                AgentType.GENERATION,
                {**data, "search_results": {"papers": [], "papers_count": 0}},
                "Critical error, continuing with no results"
            )

    def decide_next_agent(self, current_data: Dict) -> Optional[HandoffDecision]:
        """Always route to the generation agent, reporting the paper count."""
        papers_count = current_data.get("search_results", {}).get("papers_count", 0)

        return HandoffDecision(
            target_agent=AgentType.GENERATION,
            reason=f"\n {papers_count} papers found, transfering to writing",
            data=current_data
        )

    def _search_arxiv(self, keywords: List[str], max_results: int = 5) -> Dict:
        """Query arXiv with the first three keywords.

        Returns a dict with ``papers`` (list of title/authors/year/summary/url
        dicts) and ``papers_count``. On failure the dict also carries an
        ``error`` key: ``no_keywords``, ``invalid_type``, ``timeout``,
        ``network`` or ``unexpected``. This method never raises.
        """
        if not keywords:
            return {"papers": [], "papers_count": 0, "error": "no_keywords"}

        if not isinstance(keywords, list):
            print(f"Keywords isn't a list, converting: {type(keywords)}")
            if isinstance(keywords, str):
                keywords = [keywords]
            else:
                return {"papers": [], "papers_count": 0, "error": "invalid_type"}

        try:
            query = " ".join(keywords[:3])
            print(f"Searching arXiv: {query}")

            search = arxiv.Search(
                query=query,
                max_results=max_results,
                sort_by=arxiv.SortCriterion.Relevance
            )

            # Client.results() replaces the deprecated Search.results(). The
            # number of results is already capped by max_results, so no
            # manual counter is needed.
            papers = [
                {
                    "title": result.title,
                    "authors": [a.name for a in result.authors][:3],
                    "year": result.published.year,
                    "summary": result.summary[:400],
                    "url": result.entry_id
                }
                for result in arxiv.Client().results(search)
            ]

            print(f"\n {len(papers)} papers found")
            return {"papers": papers, "papers_count": len(papers)}

        except TimeoutError:
            print("arXiv API timeout")
            return {"papers": [], "papers_count": 0, "error": "timeout"}

        except OSError:
            # Covers the builtin ConnectionError as well as the HTTP layer's
            # own connection exceptions, which derive from OSError rather
            # than from the builtin ConnectionError.
            print("Network error")
            return {"papers": [], "papers_count": 0, "error": "network"}

        except Exception as e:
            print(f"Search error: {str(e)}")
            return {"papers": [], "papers_count": 0, "error": "unexpected"}

In [None]:
#Agent 3 - text generation
class TextGenerationAgent(BaseAgent):
    """Third agent in the chain: turns search results into a written review."""

    def __init__(self):
        super().__init__(AgentType.GENERATION)
        print("Loading Text Generation Agent")

        # Define both attributes up front so neither is missing when loading
        # fails part-way through.
        self.tokenizer = None
        self.model = None

        try:
            model_name = "Qwen/Qwen2.5-0.5B-Instruct"
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto"
            )
            print("Generation agent loaded successfully")
        except Exception as e:
            self.tokenizer = None
            self.model = None
            print(f"Generation agent failed to load: {e}")

    def receive_handoff(self, from_agent: AgentType, data: Dict) -> Dict:
        """Synthesize ``data["search_results"]`` into a report and pass it on.

        The generated text is added to the payload under ``generated_text``
        and handed to the agent chosen by ``decide_next_agent``. Returns that
        handoff's result, or an error dict (``status``, ``message``,
        ``agent``) when ``main_topic`` is missing or generation fails.
        """
        print(f"Writing agent is synthesizing results from {from_agent.value}...")

        try:
            if "main_topic" not in data:
                return {
                    "status": "error",
                    "message": "Missing main_topic",
                    "agent": self.agent_type.value
                }

            generated = self._generate_text(data)

            final_data = {**data, "generated_text": generated}
            decision = self.decide_next_agent(final_data)
            return self.handoff_to(
                decision.target_agent,
                final_data,
                "Synthesis complete."
            )

        except Exception as e:
            print(f"Generating error: {str(e)}")
            # Same error shape as the other agents, so callers can tell
            # which agent failed.
            return {
                "status": "error",
                "message": str(e),
                "agent": self.agent_type.value
            }

    def _generate_text(self, data: Dict) -> str:
        """Build the review prompt from ``data`` and return the generated text.

        When no model is loaded, the raw paper context is returned behind a
        fixed notice instead.
        """
        topic = data.get("main_topic", "Research")
        papers = data.get("search_results", {}).get("papers", [])

        # One context entry per paper. Missing fields get placeholders so a
        # single malformed entry does not abort the whole report.
        context_block = "".join(
            f"Paper {i}: {p.get('title', 'Untitled')} ({p.get('year', 'n.d.')})\n"
            f"Summary: {p.get('summary', '')}\n\n"
            for i, p in enumerate(papers, 1)
        )

        prompt = f"""
        You are an expert scientific writer. Write a professional literature review about: {topic}.

        I will provide you with summaries of relevant research papers.
        Use them to write:
        1. An Introduction.
        2. A Synthesized Literature Review (grouping ideas, not just listing papers).
        3. A Conclusion on current trends.

        RESEARCH DATA:
        {context_block if context_block else "No specific papers found. Provide a general overview."}

        FORMATTING:
        - Use Markdown headers (##).
        - Use bold text for key concepts.
        - Cite papers as [Paper 1], [Paper 2], etc.
        """

        if self.model:
            return self._call_llm(prompt)
        return "Model not available. Manual fallback: " + context_block

    def _call_llm(self, prompt: str) -> str:
        """Run ``prompt`` through the chat model and return only the new text."""
        messages = [
            {"role": "system", "content": "You are a professional academic writer."},
            {"role": "user", "content": prompt}
        ]
        text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)

        generated_ids = self.model.generate(
            **model_inputs,
            max_new_tokens=1024,
            do_sample=True,
            temperature=0.7
        )

        # Strip the prompt tokens so only the newly generated ones are decoded.
        generated_ids = [
            output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]
        return self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

    def decide_next_agent(self, current_data: Dict) -> HandoffDecision:
        """Always route the finished report to the interactive agent."""
        return HandoffDecision(
            target_agent=AgentType.INTERACTIVE,
            reason="Report generated.",
            data=current_data
        )

In [None]:
#Agent 4 - interactive
class InteractiveAgent(BaseAgent):
    """Final agent in the chain: presents results and answers follow-ups."""

    def __init__(self):
        super().__init__(AgentType.INTERACTIVE)
        print("Loading Interactive Agent")

        # Define both attributes up front so neither is missing when loading
        # fails part-way through.
        self.tokenizer = None
        self.model = None

        try:
            model_name = "Qwen/Qwen2.5-0.5B-Instruct"
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto"
            )
            print("Interactive agent loaded successfully")
        except Exception as e:
            self.tokenizer = None
            self.model = None
            print(f"Interactive agent failed to load: {e}")

    def receive_handoff(self, from_agent: AgentType, data: Dict) -> Dict:
        """Show warnings and the generated report, then await user interaction.

        Returns ``data`` extended with ``status`` set to
        ``"awaiting_interaction"`` and a ``message`` for the user.
        """
        print(f"\n[System]: Interaction phase started.")

        if "error" in data or "no_results" in data:
            self._handle_error_interaction(data)

        if "generated_text" in data:
            print("\n" + "="*50)
            print("GENERATED RESEARCH SUMMARY")
            print("="*50)
            print(data["generated_text"])
            print("="*50 + "\n")

        return {
            **data,
            "status": "awaiting_interaction",
            "message": "I have analyzed the research. You can ask me questions about it or ask for more details."
        }

    def handle_question(self, question: str, current_data: Dict) -> Dict:
        """Answer ``question`` from the current report, or trigger a new search.

        If the model's reply contains ``SEARCH_REQUIRED``, keywords are
        extracted from the question and the work is handed to the search
        agent; that handoff's result is returned. Otherwise the answer is
        returned as ``current_data`` extended with ``message`` and ``status``
        set to ``"clarification"``.
        """
        print(f"\nAgent is thinking...")

        context = current_data.get("generated_text", "No context available.")

        prompt = f"""
        You are a research assistant. Based on the context below, answer the user's question.

        CONTEXT:
        {context[:1500]}

        USER QUESTION:
        {question}

        INSTRUCTION:
        1. If the user wants 'more' information, 'new' papers, or topics NOT in the context, start your response with the word 'SEARCH_REQUIRED'.
        2. Otherwise, answer the question naturally based on the context.
        """

        if self.model:
            response_text = self._call_llm(prompt)
        else:
            response_text = "Model not loaded. I cannot process this request."

        if "SEARCH_REQUIRED" in response_text:
            print("LLM decided a new search is necessary.")
            clean_response = response_text.replace("SEARCH_REQUIRED", "").strip()
            print(f"Reason: {clean_response}")

            new_keywords = self._extract_keywords(question)

            return self.handoff_to(
                AgentType.SEARCH,
                {**current_data, "keywords": new_keywords},
                f"User requested more info: {question}"
            )

        print(f"\n[Agent]: {response_text}")
        # Carry the existing payload forward. The caller stores this return
        # value as the context for the next follow-up, so returning only the
        # answer would discard the report after a single question.
        return {**current_data, "message": response_text, "status": "clarification"}

    def _call_llm(self, prompt: str) -> str:
        """Run ``prompt`` through the chat model and return only the new text."""
        messages = [
            {"role": "system", "content": "You are a helpful research assistant."},
            {"role": "user", "content": prompt}
        ]
        text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)

        generated_ids = self.model.generate(
            **model_inputs,
            max_new_tokens=256,
            do_sample=True,
            temperature=0.7
        )

        # Strip the prompt tokens so only the newly generated ones are decoded.
        generated_ids = [
            output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]

        return self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

    def _handle_error_interaction(self, data: Dict):
        """Print a warning for the ``no_results`` and ``no_keywords`` conditions."""
        if "no_results" in data:
            print("\n[!] Warning: No specific papers were found for those keywords.")
        elif data.get("error") == "no_keywords":
            print("\n[!] Error: I couldn't identify specific topics to search for.")

    def _extract_keywords(self, text: str) -> List[str]:
        """Return up to five lower-cased words longer than three characters.

        Words in a small stop-word set of request verbs and fillers are
        skipped.
        """
        words = re.findall(r'\w+', text.lower())

        stop_words = {'the', 'about', 'more', 'find', 'search', 'research', 'show', 'tell'}
        return [w for w in words if len(w) > 3 and w not in stop_words][:5]

In [None]:
class ResearchOrchestrator:
    """Builds the four agents, wires their handoffs and exposes the entry points."""

    def __init__(self):
        for banner_line in ("Literature review system", "\n"):
            print(banner_line)

        # Instantiate the agents in pipeline order.
        self.analysis_agent = RequestAnalysisAgent()
        self.search_agent = LiteratureSearchAgent()
        self.generation_agent = TextGenerationAgent()
        self.interactive_agent = InteractiveAgent()

        # Connect them into the handoff network.
        self._setup_handoffs()

        print("\nThe system is ready\n")

    def _setup_handoffs(self):
        """Register every allowed (source, target) handoff route."""
        routes = (
            (self.analysis_agent, AgentType.SEARCH, self.search_agent),
            (self.search_agent, AgentType.GENERATION, self.generation_agent),
            (self.search_agent, AgentType.INTERACTIVE, self.interactive_agent),
            (self.generation_agent, AgentType.INTERACTIVE, self.interactive_agent),
            (self.interactive_agent, AgentType.SEARCH, self.search_agent),
        )
        for source, target_type, target in routes:
            source.register_handoff(target_type, target)

    def process_request(self, user_input: str) -> Dict:
        """Run ``user_input`` through the handoff chain, starting at analysis.

        Returns the chain's final result, or a generic error dict if an
        exception escapes the chain.
        """
        for header_line in ("\n", f"New request: {user_input}", "\n"):
            print(header_line)

        try:
            outcome = self.analysis_agent.analyze_and_handoff(user_input)

            # Surface errors reported by the agents themselves.
            failed = outcome.get("status") == "error"
            if failed:
                print(f" The system returned an error.: {outcome.get('message')}")

            return outcome

        except Exception as exc:
            print(f" Critical error: {str(exc)}")
            return dict(status="error", message="Critical system error")

    def ask_followup(self, question: str, previous_result: Dict) -> Dict:
        """Forward a follow-up ``question`` to the interactive agent.

        Returns the agent's result, or a generic error dict if it raises.
        """
        try:
            answer = self.interactive_agent.handle_question(question, previous_result)
        except Exception as exc:
            print(f" Error while processing a question: {str(exc)}")
            return dict(status="error", message="Error while processing a question")
        return answer

In [None]:
# Build the agent network once. Without it nothing below can run.
try:
    system = ResearchOrchestrator()
except Exception as e:
    print(f"Critical error: The system could not be initialized: {str(e)}")
    # exit() is a helper injected by the `site` module for interactive use
    # and is not guaranteed to exist; raising SystemExit is the portable form.
    raise SystemExit(1)

print("\n")
print("Interactive mode:")
print("\n")
print("\nHandovers will be shown in real time.")
print("For exit: 'exit'\n")

# Last successful result; it is the context for follow-up questions.
current_result = None

while True:
    try:
        user_input = input("\nYour input: ").strip()

        if user_input.lower() in ['exit', 'quit']:
            print("\nSystem finished")
            break

        if not user_input:
            continue

        # Inputs asking to expand on an existing result are treated as
        # follow-ups; everything else starts a new request.
        if current_result and any(kw in user_input.lower() for kw in ['expand', ' more']):
            result = system.ask_followup(user_input, current_result)
        else:
            result = system.process_request(user_input)

        # Keep the previous result when this round failed, so one bad input
        # does not wipe out the context for later follow-ups.
        if result.get("status") != "error":
            current_result = result

        print("\nHandover completed")

    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        break
    except EOFError:
        # A closed stdin makes input() raise on every call; without this
        # branch the generic handler below would loop forever.
        print("\n\nInput stream closed")
        break
    except Exception as e:
        print(f"\n Error: {str(e)}")
        print("Try again.")

print("\nSystem is closing.")