In [None]:
!pip install langgraph
!pip install pydantic
!pip install openai
!pip install langchain
!pip install langchain_openai

In [None]:
import asyncio
import os
from typing import Dict, List, Optional, Any, TypedDict, Annotated
from datetime import date
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
from langgraph.graph import StateGraph, END
import json
from langchain_openai import ChatOpenAI

# Only fill in placeholders when the variables are not already set, so a real
# key exported in the environment is not clobbered by the dummy value below.
# Replace "your-openrouterkey" (or export OPENROUTER_API_KEY) before running.
os.environ.setdefault("OPENROUTER_API_KEY", "your-openrouterkey")
os.environ.setdefault("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")

# Shared chat model (routed through OpenRouter) used by PlannerAgent for its
# structured-output planning calls.
llm = ChatOpenAI(
    model="openai/gpt-4.1-mini",
    openai_api_base=os.getenv("OPENROUTER_BASE_URL"),
    openai_api_key=os.getenv("OPENROUTER_API_KEY"),
    temperature=0.3
)

# Today's date as YYYY-MM-DD, captured once when this cell runs and injected
# into the prompts.
TODAY_DATE = date.today().strftime("%Y-%m-%d")

# One research task produced by the planner (nested inside ResearchTaskSet).
# The Field descriptions are runtime data: they are part of the schema the LLM
# receives via with_structured_output, so treat them as prompt text.
class ResearchTask(BaseModel):
    # Short label; also used as "Research Topic" in SearchAgent's query.
    task_name: str = Field(description="SPECIFIC name for this research task")
    # What the task should find out; sent to the search model as "Objective".
    objective: str = Field(description="Clear statement focused on a SPECIFIC aspect of the research")
    # Joined with ", " into the search query's "Search Terms" line.
    search_terms: List[str] = Field(description="5 HIGHLY SPECIFIC search terms")
    # Not included in the query SearchAgent builds.
    output_format: str = Field(description="Specify the required format: table, bullet points, etc.")
    # Joined with ", " into the search query's "Focus Areas" line.
    required_data_points: List[str] = Field(description="List 5-7 SPECIFIC data points to collect")
    # Not included in the query SearchAgent builds.
    template: str = Field(description="Provide a detailed template with placeholders")
    # When None, the search query says "Date Range: Recent".
    date_range: Optional[str] = Field(default=None, description="Optional date range for the research")

# Structured output of PlannerAgent.create_research_tasks: the overall purpose
# plus the individual tasks the searcher will execute (one query per task).
# Field descriptions are sent to the LLM as schema text.
class ResearchTaskSet(BaseModel):
    # Not read by the graph nodes; only `tasks` is stored in the state.
    purpose: str = Field(description="Clear, concise statement focused EXACTLY on the user's research request")
    # Each entry is model_dump()-ed into ResearchState["research_tasks"].
    tasks: List[ResearchTask] = Field(description="List of specific research tasks")

# Structured output of PlannerAgent.analyze_topic: how the user's request is
# classified and which TEMPLATES entries should guide task planning.
# Field descriptions are part of the schema the LLM sees, so edit them as
# prompt text rather than as documentation.
class TopicAnalysis(BaseModel):
    # One of the labels listed in the description (not validated here).
    research_type: str = Field(description="Classification of research type: company_analysis, market_research, investment_opportunities, general")
    # Inserted into the planning prompt as "Industry".
    industry: str = Field(default="", description="Industry or sector of focus")
    # Inserted into the planning prompt as "Research Focus".
    primary_focus: str = Field(description="Primary focus of the research")
    # Not read by the visible code.
    estimated_entity_count: int = Field(default=5, description="Estimated number of entities to research")
    # Interpolated into the planning prompt ("Create N research tasks ...").
    # NOTE(review): the description says 3-5 while analyze_topic's prompt asks
    # for 7 — confirm which is intended.
    recommended_task_count: int = Field(default=5, description="Recommended number of research tasks (3-5)")
    # Inserted into the planning prompt as "Time Frame".
    time_frame: str = Field(default="current", description="Time frame relevance")
    # Keys into TEMPLATES; keys with no matching template are skipped when the
    # template examples are assembled.
    template_keys: List[str] = Field(description="Most relevant template keys from available templates")

# Outcome of one search query. On failure SearchAgent stores a message that
# starts with "Search failed:" in `result` and leaves `sources` empty.
class SearchResult(BaseModel):
    # The full prompt text that was sent to the search model.
    query: str
    # The model's answer (or the failure message).
    result: str
    # Citation-like lines pulled from `result`; pydantic copies this default
    # per instance, so the [] literal is not shared between results.
    sources: List[str] = []

class ResearchState(TypedDict):
    """State dict passed between the planner, searcher and writer graph nodes.

    Nested objects are stored as plain dicts (pydantic ``model_dump()``
    output) and rebuilt into models by the node that consumes them.
    """
    # The user's original research request.
    user_query: str
    # TopicAnalysis.model_dump(), {"error": ...} when planning raised, or None.
    topic_analysis: Optional[Dict[str, Any]]
    # ResearchTask.model_dump() entries written by planner_node.
    research_tasks: List[Dict[str, Any]]
    # SearchResult.model_dump() entries written by searcher_node.
    search_results: List[Dict[str, Any]]
    # Markdown article (or an error/placeholder message) written by writer_node.
    final_article: str
    # Progress marker, e.g. "initialized", "planned", "researched", "completed",
    # or a "*_failed" value set by the node that failed.
    status: str

# Data-point checklists keyed by template name. PlannerAgent lists these keys in
# the topic-analysis prompt and pastes the selected templates' text into the
# task-planning prompt as examples. The strings themselves are prompt text.
TEMPLATES = {
    "investment_thesis": """
    COMPANY INVESTMENT PROFILE - Data Points to Collect:

    Financial Metrics:
    - Current market cap, enterprise value, share price
    - Revenue (TTM), revenue growth (1Y, 3Y CAGR)
    - Gross margin, EBITDA margin, net margin percentages
    - Free cash flow generation and cash conversion cycle
    - Debt-to-equity ratio, net cash position

    Valuation Multiples:
    - P/E ratio, EV/Revenue, EV/EBITDA (current and forward)
    - PEG ratio and price-to-book value
    - Dividend yield and payout ratio

    Business Fundamentals:
    - Primary revenue streams and business model
    - Market share in core segments
    - Customer concentration (top 10 customers as % of revenue)
    - Geographic revenue breakdown
    - Recurring revenue percentage

    Recent Developments:
    - Latest quarterly earnings results and guidance
    - Recent analyst upgrades/downgrades and price targets
    - Management changes and strategic initiatives
    - Upcoming catalysts (product launches, regulatory decisions)
    """,

    # Sector-level view: market size, performance, capital flows, regulation.
    "market_intelligence": """
    SECTOR & MARKET ANALYSIS - Data Points to Collect:

    Market Size & Growth:
    - Total addressable market (TAM) size and growth rate
    - Market segments and fastest-growing subsectors
    - Geographic market distribution and emerging regions

    Industry Performance:
    - Sector stock performance (YTD, 1Y, 3Y)
    - Average sector valuation multiples vs historical ranges
    - Top 5 companies by market cap and their market shares
    - Recent IPOs and their post-listing performance

    Capital Flows:
    - Total VC/PE investment volume in sector (YTD vs prior year)
    - Number and value of M&A transactions (last 12 months)
    - Average deal multiples for recent transactions
    - Major fundraising rounds and valuations

    Regulatory & Trends:
    - Recent regulatory changes or proposed legislation
    - Government spending or policy support for sector
    - Key technology trends disrupting the industry
    - ESG considerations and climate-related impacts
    """,

    # Private-market targets. NOTE(review): "Average deal multiples for recent
    # transactions" duplicates a market_intelligence line and overlaps with the
    # "Recent private market transaction multiples" line just above it.
    "deal_sourcing": """
    PRIVATE MARKET OPPORTUNITIES - Data Points to Collect:

    Target Company Profile:
    - Annual revenue, revenue growth rate (3Y CAGR)
    - EBITDA margin and cash flow generation
    - Employee count and geographic footprint
    - Founded date and current ownership structure
    - Key products/services and customer base

    Investment Metrics:
    - Current valuation or last funding round details
    - Comparable public company trading multiples
    - Recent private market transaction multiples in sector
    - Average deal multiples for recent transactions
    - Management ownership percentage and rollover intentions

    Growth Drivers:
    - Addressable market size and company's current penetration
    - Product pipeline and R&D spending
    - Geographic expansion opportunities
    - Potential add-on acquisition targets

    Competitive Landscape:
    - Main competitors and their recent valuations/exits
    - Market share rankings and differentiation factors
    - Recent competitor fundraising or strategic moves
    - Barriers to entry and switching costs
    """
}

# System prompt for the task-planning call. It is rendered with str.format
# (today_date=..., templates=...), so the placeholders below are filled in at
# call time and any literal braces added later must be doubled ({{ }}).
PLANNER_SYSTEM_PROMPT = """
You are a research planning expert responsible for creating HIGHLY SPECIFIC and IMPACTFUL research tasks.

Today's date: {today_date}
Available templates: {templates}

CRITICAL INSTRUCTIONS FOR CREATING IMPACTFUL RESEARCH TASKS:
1. BE EXTREMELY LITERAL about what the user has requested - focus precisely on their stated needs
2. CREATE SPECIFIC tasks that directly address the primary focus
3. BREAK DOWN broad requests into concrete, actionable research objectives
4. TARGET specific sub-categories rather than creating general overview tasks
5. USE PRECISE search terms that will find specific information, not general trends
6. REQUIRE CONCRETE data points that deliver actionable insights
7. ENSURE all tasks align with the time frame
8. VERY IMPORTANT: You don't give advice, you just look for information

BEST PRACTICES:
- To make research specific and useful, use sub-industries instead of main industry
- Use 3-5 sub-industries per task
- Always add numbers in the search queries.
    e.g: WRONG: Recent startups news in industry
         CORRECT: 20 startups that 'some action' in 'sub industry'
- Try to understand any hidden intention behind the user query

"""

# System prompt for WriterAgent. Rendered with str.format(today_date=...), so
# any literal braces added to this text must be doubled ({{ }}).
WRITER_SYSTEM_PROMPT = """
You are an expert investment research writer. Create comprehensive, professional research articles.

GUIDELINES:
- Write in a professional, analytical tone
- Include executive summary, detailed analysis, and conclusions
- Cite sources and provide evidence for claims
- Structure content logically with clear sections
- Focus on investment implications and actionable insights
- Length: 1500-2500 words

Today's date: {today_date}
"""

In [None]:
class PlannerAgent:
    """Turn a free-text research request into a structured research plan.

    Planning uses two calls to the shared ``llm``. ``analyze_topic`` classifies
    the request into a ``TopicAnalysis``, and ``create_research_tasks`` expands
    that analysis into a ``ResearchTaskSet``. Both use function-calling
    structured output and fall back to a small hard-coded result on failure,
    so the graph can keep running.
    """

    def __init__(self):
        self.templates = TEMPLATES
        self.today_date = TODAY_DATE
        self.llm = llm

    async def analyze_topic(self, topic: str) -> TopicAnalysis:
        """Analyze the user topic to extract research parameters.

        Args:
            topic: The user's research request.

        Returns:
            The model's ``TopicAnalysis``. If the structured call raises or
            returns no result, a generic fallback analysis is returned instead.
        """
        # NOTE(review): this prompt asks for 7 tasks while TopicAnalysis
        # describes recommended_task_count as 3-5 — confirm which is intended.
        analysis_prompt = f"""
        Analyze this investment research request: "{topic}"
        Today's date: {self.today_date}

        Classify the research type as one of:
        - company_analysis: specific company research
        - market_research: sector/market analysis
        - investment_opportunities: finding investments
        - general: other research

        Extract:
        - Industry/sector
        - Primary focus
        - Recommended number of research tasks 7
        - Time frame relevance
        - Most relevant template keys from: {list(self.templates.keys())}

        Return the analysis in JSON format.
        """

        try:
            structured_llm = self.llm.with_structured_output(
                TopicAnalysis,
                method="function_calling"
            )
            analysis_data = await structured_llm.ainvoke(analysis_prompt)
            if analysis_data is None:
                # Presumably the model replied without calling the schema tool;
                # treat it like any other failure rather than returning None.
                raise ValueError("model returned no structured topic analysis")
            return analysis_data

        except Exception as e:
            print(f"Warning: Topic analysis failed with structured output: {e}. Using fallback.")
            return self._fallback_analysis(topic)

    async def create_research_tasks(self, topic: str, analysis: TopicAnalysis) -> ResearchTaskSet:
        """Generate specific research tasks based on ``analysis``.

        Args:
            topic: The user's research request.
            analysis: Output of ``analyze_topic``. Its ``template_keys`` select
                which ``TEMPLATES`` entries are shown to the model as examples.

        Returns:
            The model's ``ResearchTaskSet``. If the structured call raises or
            yields no tasks, a set with one generic fallback task is returned.
        """
        template_examples = self._format_template_examples(analysis.template_keys)
        # Never ask the model for zero or a negative number of tasks.
        task_count = max(1, analysis.recommended_task_count)

        planning_prompt = f"""
        Create {task_count} research tasks for: "{topic}"

        Research Focus: {analysis.primary_focus}
        Industry: {analysis.industry}
        Time Frame: {analysis.time_frame}

        Template Examples:
        {template_examples}

        Each task should:
        - Have a specific, focused objective
        - Include 3-5 relevant search terms
        - Specify output format
        - List required data points
        - Use appropriate template.

        Return the tasks in JSON format.
        """

        # PLANNER_SYSTEM_PROMPT is a str.format template, so it must contain no
        # literal braces other than its placeholders.
        system_prompt = PLANNER_SYSTEM_PROMPT.format(
            today_date=self.today_date,
            templates=list(self.templates.keys())
        )

        try:
            structured_llm = self.llm.with_structured_output(
                ResearchTaskSet,
                method="function_calling"
            )
            tasks_data = await structured_llm.ainvoke(f"{system_prompt}\n\n{planning_prompt}")
            if tasks_data is None or not tasks_data.tasks:
                # An empty plan would make the searcher do nothing; use the fallback.
                raise ValueError("model returned no research tasks")
            return tasks_data
        except Exception as e:
            print(f"Error generating research tasks with structured output: {e}")
            return self._fallback_task_set(topic)

    def _format_template_examples(self, template_keys: List[str]) -> str:
        """Join the text of each known template in ``template_keys``.

        Keys missing from ``self.templates``, or mapped to empty text, are skipped.
        """
        sections = []
        for key in template_keys:
            content = self.templates.get(key, "")
            if content:
                sections.append(f"Template {key}:\n{content}")
        return "\n".join(sections)

    @staticmethod
    def _fallback_analysis(topic: str) -> TopicAnalysis:
        """Build the generic analysis used when the topic-analysis call fails."""
        return TopicAnalysis(
            research_type="general",
            industry="Investment",
            primary_focus=topic,
            recommended_task_count=3,
            template_keys=["market_intelligence"]
        )

    @staticmethod
    def _fallback_task_set(topic: str) -> ResearchTaskSet:
        """Build the single-task plan used when task generation fails."""
        return ResearchTaskSet(
            purpose=f"Research tasks for {topic}",
            tasks=[
                ResearchTask(
                    task_name="General Research",
                    objective=f"Research information about {topic}",
                    search_terms=[topic, "investment", "analysis"],
                    output_format="structured report",
                    required_data_points=["key metrics", "market data", "recent developments"],
                    # NOTE: "basic_research" is not a TEMPLATES key; the
                    # searcher does not read this field.
                    template="basic_research"
                )
            ]
        )

class SearchAgent:
    """Answer research queries with a web-enabled model served through OpenRouter."""

    def __init__(self):
        self.client = AsyncOpenAI(
            base_url=os.getenv("OPENROUTER_BASE_URL"),
            api_key=os.getenv("OPENROUTER_API_KEY"),
        )

    @staticmethod
    def _extract_sources(text: str, limit: int = 5) -> List[str]:
        """Return up to ``limit`` distinct, stripped citation-like lines of ``text``.

        A line counts as a citation when it contains ``http`` or ``source:``
        (case-insensitive). Lines are kept in order of first appearance, and
        duplicates are dropped.
        """
        if limit <= 0:
            return []
        sources: List[str] = []
        seen = set()
        for line in text.split('\n'):
            lowered = line.lower()
            if 'http' not in lowered and 'source:' not in lowered:
                continue
            cleaned = line.strip()
            if cleaned in seen:
                continue
            seen.add(cleaned)
            sources.append(cleaned)
            if len(sources) >= limit:
                break
        return sources

    async def search_single_query(self, query: str) -> SearchResult:
        """Execute a single search query with web access.

        Any ``Exception`` raised while querying the model, including an empty
        reply, is reported as a ``SearchResult`` whose ``result`` starts with
        ``"Search failed:"`` and whose ``sources`` is empty.
        """
        try:
            response = await self.client.chat.completions.create(
                # NOTE(review): confirm this model id is still served by OpenRouter.
                model="x-ai/grok-beta:online",
                messages=[
                    {"role": "system", "content": f"You are a research assistant with web access. Today is {TODAY_DATE}. Provide comprehensive, current information with sources."},
                    {"role": "user", "content": query}
                ],
                # Sampling temperature is a top-level chat-completions field.
                # It is not one of the search_parameters options, so it used to
                # have no effect on generation when nested there.
                temperature=0.2,
                extra_body={
                    # NOTE(review): the ":online" suffix already enables web
                    # search on OpenRouter; confirm these provider options are
                    # still honoured for this model.
                    "search_parameters": {
                        "mode": "on",
                        "return_citations": True,
                        "plugins": [{"id": "web"}]
                    }
                }
            )

            result_content = response.choices[0].message.content
            if not result_content:
                # content may be None; report it as a failure instead of
                # crashing on .split() with an unhelpful message.
                raise ValueError("search model returned an empty response")

            return SearchResult(
                query=query,
                result=result_content,
                sources=self._extract_sources(result_content)
            )

        except Exception as e:
            return SearchResult(
                query=query,
                result=f"Search failed: {str(e)}",
                sources=[]
            )

    async def execute_research_tasks(self, tasks: List[ResearchTask]) -> List[SearchResult]:
        """Execute all research tasks concurrently, one search query per task.

        Results are returned in the same order as ``tasks``. Failed searches
        appear as "Search failed: ..." results rather than raising.
        """
        search_queries = []

        for task in tasks:
            query = f"""
            Research Topic: {task.task_name}
            Objective: {task.objective}
            Focus Areas: {', '.join(task.required_data_points)}
            Search Terms: {', '.join(task.search_terms)}
            Date Range: {task.date_range or 'Recent'}

            Please provide detailed information covering all the required data points.
            """
            search_queries.append(query)

        search_tasks = [self.search_single_query(query) for query in search_queries]
        results = await asyncio.gather(*search_tasks)

        return results

class WriterAgent:
    """Write the final investment research article from collected search results."""

    def __init__(self):
        self.client = AsyncOpenAI(
            base_url=os.getenv("OPENROUTER_BASE_URL"),
            api_key=os.getenv("OPENROUTER_API_KEY"),
        )

    @staticmethod
    def _format_research(search_results: List[SearchResult]) -> str:
        """Render results as numbered "Research Finding" sections for the prompt."""
        parts: List[str] = []
        for i, result in enumerate(search_results, 1):
            parts.append(f"\n\n--- Research Finding {i} ---\n")
            parts.append(f"Query: {result.query}\n")
            parts.append(f"Results: {result.result}\n")
            if result.sources:
                parts.append(f"Sources: {'; '.join(result.sources)}\n")
        return "".join(parts)

    async def generate_article(self, user_query: str, search_results: List[SearchResult]) -> str:
        """Generate a comprehensive investment research article.

        Args:
            user_query: The user's original research request.
            search_results: Findings to base the article on.

        Returns:
            The article text produced by the model.

        Raises:
            ValueError: If the model returns no article text.
            Errors from the OpenAI client propagate unchanged.
        """
        research_content = self._format_research(search_results)

        writing_prompt = f"""
        Write a comprehensive investment research article answering this query: "{user_query}"

        Research Data:
        {research_content}

        Article Requirements:
        1. Executive Summary (2-3 paragraphs)
        2. Market Analysis & Context
        3. Key Findings & Insights
        4. Investment Implications
        5. Risk Assessment
        6. Conclusions & Recommendations
        7. Sources & References

        Style: Professional, analytical, fact-based
        Length: 1500-2500 words
        Focus: Investment decision-making insights
        """

        system_prompt = WRITER_SYSTEM_PROMPT.format(today_date=TODAY_DATE)

        response = await self.client.chat.completions.create(
            model="openai/gpt-4.1-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": writing_prompt}
            ],
            temperature=0.6,
            max_tokens=4000
        )

        article = response.choices[0].message.content
        if not article:
            # content may be None; returning it would violate the -> str
            # contract and break callers that measure or write the article.
            raise ValueError("writer model returned an empty article")
        return article


In [None]:

# One shared instance of each agent, used by the graph node functions below.
planner, searcher, writer = PlannerAgent(), SearchAgent(), WriterAgent()

async def planner_node(state: ResearchState) -> ResearchState:
    """Graph node: analyze the user's query and store a list of task dicts.

    On success this sets ``topic_analysis``, ``research_tasks`` and
    ``status="planned"``. If anything raises, it records the error in
    ``topic_analysis``, clears the tasks and sets ``status="planning_failed"``.
    The mutated state is returned in both cases.
    """
    user_query = state["user_query"]
    print(f"🔍 Planning research for: {user_query}")

    try:
        analysis = await planner.analyze_topic(user_query)

        task_set = await planner.create_research_tasks(user_query, analysis)

        if analysis:
            state["topic_analysis"] = analysis.model_dump()
        else:
            state["topic_analysis"] = None

        task_dicts = []
        if task_set and task_set.tasks:
            task_dicts = [task.model_dump() for task in task_set.tasks]
        state["research_tasks"] = task_dicts
        state["status"] = "planned"

        print(f"✅ Created {len(task_dicts)} research tasks")

    except Exception as exc:
        error_text = str(exc)
        print(f"❌ Planning failed: {error_text}")
        state.update(
            topic_analysis={"error": error_text},
            research_tasks=[],
            status="planning_failed",
        )

    return state

async def searcher_node(state: ResearchState) -> ResearchState:
    """Graph node: run every planned research task and store the results.

    Sets ``search_results`` to one dumped SearchResult per task and
    ``status="researched"``. When there are no tasks, it stores no results and
    keeps an existing ``*_failed`` status (otherwise it sets ``"no_tasks"``),
    so an upstream failure is not masked. If execution raises, it stores a
    single error entry and sets ``status="search_failed"``.
    """
    task_dicts = state["research_tasks"]
    print(f"🔎 Executing {len(task_dicts)} research tasks...")

    if not task_dicts:
        # Nothing to search (planning failed or produced no tasks). Don't
        # overwrite the planner's failure status with "researched".
        print("⚠️ No research tasks to execute; skipping search")
        state["search_results"] = []
        if not state.get("status", "").endswith("_failed"):
            state["status"] = "no_tasks"
        return state

    try:
        research_tasks_models = [ResearchTask(**task_dict) for task_dict in task_dicts]

        search_results_models = await searcher.execute_research_tasks(research_tasks_models)

        state["search_results"] = [result_model.model_dump() for result_model in search_results_models]
        state["status"] = "researched"

        print(f"✅ Completed research with {len(state['search_results'])} results")

    except Exception as e:
        print(f"❌ Search failed: {str(e)}")
        # The "error" key marks this entry so writer_node can skip it.
        state["search_results"] = [{"error": str(e), "query": "failed", "result": "", "sources": []}]
        state["status"] = "search_failed"

    return state

async def writer_node(state: ResearchState) -> ResearchState:
    """Graph node: turn the gathered search results into the final article.

    Reads ``user_query`` and ``search_results`` and sets ``final_article`` and
    ``status`` as follows:

    * ``"completed"`` when at least one usable result was written up.
    * The existing ``*_failed`` status, or ``"no_results"``, when nothing
      usable was available. A placeholder report is stored as the article.
    * ``"writing_failed"`` when article generation raises or returns nothing.
    """
    print("📝 Generating comprehensive research article...")

    try:
        # Skip searcher_node's error placeholder ("error" key) and per-query
        # failures reported by SearchAgent ("Search failed: ..."), so error text
        # is never presented to the writer as research data.
        search_results_models = [
            SearchResult(**result_dict)
            for result_dict in state["search_results"]
            if "error" not in result_dict
            and not str(result_dict.get("result", "")).startswith("Search failed:")
        ]

        if search_results_models:
            article = await writer.generate_article(
                state["user_query"],
                search_results_models
            )
            if not article:
                raise ValueError("writer returned an empty article")
            state["final_article"] = article
            state["status"] = "completed"
            print("✅ Article generation completed")
        else:
            state["final_article"] = f"# Research Report: {state['user_query']}\n\nResearch could not be completed due to technical issues. Please try again."
            # Don't report success for a run that produced no research. Keep an
            # upstream failure status so the caller can see where it broke.
            if not state.get("status", "").endswith("_failed"):
                state["status"] = "no_results"
            print("⚠️ No usable research results; stored a placeholder report")

    except Exception as e:
        print(f"❌ Writing failed: {str(e)}")
        state["final_article"] = f"Article generation failed: {str(e)}"
        state["status"] = "writing_failed"

    return state

# Build a linear LangGraph pipeline: planner -> searcher -> writer -> END.
# There are no conditional edges, so every node runs even after an earlier
# node has recorded a failure status in the state.
workflow = StateGraph(ResearchState)
workflow.add_node("planner", planner_node)
workflow.add_node("searcher", searcher_node)
workflow.add_node("writer", writer_node)
workflow.add_edge("planner", "searcher")
workflow.add_edge("searcher", "writer")
workflow.add_edge("writer", END)
# "planner" is the first node executed when the compiled graph is invoked.
workflow.set_entry_point("planner")
# Runnable graph awaited via app.ainvoke in research_investment_topic.
app = workflow.compile()

In [None]:
async def research_investment_topic(query: str) -> Dict[str, Any]:
    """
    Main function to execute the complete investment research workflow.

    Args:
        query: Investment research question/topic
    Returns:
        A summary dict with ``query``, ``status``, ``tasks_created``,
        ``research_results``, ``article`` and ``full_state``. If the graph
        itself raises, the dict instead has ``status="failed"``, ``error`` and
        an empty ``article``. If the graph finishes with any status other than
        ``"completed"``, an ``error`` entry describing that status is added.
    """
    print(f"🚀 Starting investment research: {query}")
    print("=" * 60)

    initial_state = ResearchState(
        user_query=query,
        topic_analysis=None,
        research_tasks=[],
        search_results=[],
        final_article="",
        status="initialized"
    )

    try:
        final_state = await app.ainvoke(initial_state)
    except Exception as e:
        print(f"❌ Research failed: {str(e)}")
        return {
            "query": query,
            "status": "failed",
            "error": str(e),
            "article": ""
        }

    status = final_state.get("status", "unknown")
    print("=" * 60)
    if status == "completed":
        print("🎉 Research completed successfully!")
    else:
        # Nodes record failures in the state instead of raising, so they must
        # be reported here rather than announcing success.
        print(f"⚠️ Research finished with status: {status}")

    summary = {
        "query": query,
        "status": status,
        "tasks_created": len(final_state.get("research_tasks") or []),
        "research_results": len(final_state.get("search_results") or []),
        "article": final_state.get("final_article") or "",
        "full_state": final_state
    }
    if status != "completed":
        summary["error"] = f"Research workflow ended with status '{status}'"
    return summary




def display_research_summary(result: Dict[str, Any]):
    """Print a formatted summary of a research result dict.

    For ``status == "completed"`` it prints task/result counts, the article
    length and the article itself. Otherwise it prints the ``error`` entry (or
    "Unknown error"). A missing or None article is treated as an empty string.
    """
    article = result.get('article') or ''

    print("📊 INVESTMENT RESEARCH SUMMARY")
    print("=" * 50)
    print(f"Query: {result.get('query', 'N/A')}")
    print(f"Status: {result.get('status', 'unknown')}")

    if result.get('status') == 'completed':
        print(f"Research Tasks: {result.get('tasks_created', 'N/A')}")
        print(f"Search Results: {result.get('research_results', 'N/A')}")
        print(f"Article Length: {len(article)} characters")
        print("\n" + "=" * 50)
        print("📄 FINAL ARTICLE")
        print("=" * 50)
        print(article)
    else:
        print(f"Error: {result.get('error', 'Unknown error')}")

def save_article_to_file(result: Dict[str, Any], filename: Optional[str] = None):
    """Save the research article to a markdown file"""
    if result['status'] != 'completed':
        print("❌ Cannot save: Research not completed successfully")
        return

    if not filename:
        safe_query = "".join(c for c in result['query'] if c.isalnum() or c in (' ', '-', '_')).rstrip()
        filename = f"research_{safe_query.replace(' ', '_')[:50]}.md"

    try:
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(f"# Investment Research Report\n\n")
            f.write(f"**Query:** {result['query']}\n\n")
            f.write(f"**Generated:** {TODAY_DATE}\n\n")
            f.write(f"**Research Tasks:** {result.get('tasks_created', 'N/A')}\n\n")
            f.write("---\n\n")
            f.write(result.get('article', ''))

        print(f"✅ Article saved to: {filename}")

    except Exception as e:
        print(f"❌ Failed to save article: {str(e)}")


async def main():
    """Main execution function"""
    result = await research_investment_topic("Find future unicorns in fintech")
    display_research_summary(result)
    # save_article_to_file(result)
    return result

result = await main()