In [None]:
import os
import json
import argparse
import yfinance as yf
import mlflow
from dotenv import load_dotenv

from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.output_parsers import PydanticOutputParser
from langchain_google_vertexai import ChatVertexAI
from langchain_community.retrievers import ExaSearchRetriever

In [None]:
from pydantic import BaseModel, Field 
from typing import List, Optional

class SentimentProfile(BaseModel):
    """Structured sentiment profile for a company based on recent news.

    This model is the schema handed to the output parser, so each field's
    ``description`` doubles as the instruction for whatever fills the
    profile in.
    """
    company_name: str = Field(description="The name of the company being analyzed.")
    stock_code: str = Field(description="The stock market ticker symbol for the company.")
    news_summary: str = Field(description="A brief, one-paragraph summary of the key news points.")
    # Kept as a free-form str: the allowed labels are stated in the description
    # only and are not validated by the model.
    sentiment: str = Field(description="Overall market sentiment. Must be one of: 'Positive', 'Negative', 'Neutral'.")
    people_names: list[str] = Field(description="List of key people mentioned in the news (e.g., CEOs, executives).")
    places_names: list[str] = Field(description="List of key geographical locations or places mentioned.")
    other_companies_referred: list[str] = Field(description="List of other companies mentioned in the news.")
    related_industries: list[str] = Field(description="List of industries related to the news content.")
    market_implications: str = Field(description="A brief analysis of the potential market implications of the news.")
    # The 0.0-1.0 range promised by the description is enforced here, so an
    # out-of-range value fails validation instead of flowing into the output.
    confidence_score: float = Field(
        ge=0.0,
        le=1.0,
        description="A confidence score (0.0 to 1.0) in the sentiment analysis.",
    )


In [None]:
# Component A: Get Stock Code
def get_stock_code(company_name: str) -> str:
    """Resolve a company name to a stock ticker symbol using yfinance.

    When the installed yfinance exposes ``yf.Search`` the name is looked up
    through it: the first equity match wins, otherwise the first match of any
    type.  On yfinance versions without ``Search`` the function keeps reading
    ``yf.Ticker(company_name).ticker``.
    NOTE(review): that attribute appears to echo the constructor argument
    rather than perform a lookup, so the fallback is only meaningful when the
    caller already passed a ticker -- confirm against the yfinance docs.

    Args:
        company_name: Company name (or ticker) to look up.

    Returns:
        The resolved ticker symbol, or the sentinel ``"N/A"`` when the input
        is blank, nothing matches, or the lookup raises.
    """
    with mlflow.start_span(name="Stock Code Extraction") as span:
        try:
            span.set_inputs({"company_name": company_name})

            query = (company_name or "").strip()
            if not query:
                raise ValueError("Invalid or not found.")

            search_cls = getattr(yf, "Search", None)
            if search_cls is None:
                # Older yfinance: no name search available, keep the legacy path.
                ticker = yf.Ticker(query).ticker
            else:
                quotes = search_cls(query, max_results=5).quotes or []
                symbols = [q.get("symbol") for q in quotes if q.get("symbol")]
                # Search results can mix in futures, funds, etc.; prefer a stock.
                equities = [
                    q.get("symbol")
                    for q in quotes
                    if q.get("symbol") and q.get("quoteType") == "EQUITY"
                ]
                ticker = next(iter(equities or symbols), None)

            if not ticker or ticker == "-":
                raise ValueError("Invalid or not found.")
            span.set_outputs({"stock_code": ticker})
            print(f"✅ Found stock code for {company_name}: {ticker}")
            return ticker
        except Exception as e:
            # Best-effort lookup: downstream steps accept the "N/A" sentinel.
            print(f"❌ Could not find stock code for {company_name}: {e}")
            span.set_outputs({"error": str(e)})
            return "N/A"

In [None]:
# Component B: Fetch Company News
def get_company_news(search_query: dict) -> str:
    """Fetch recent news for a company through the Exa search retriever.

    Args:
        search_query: Mapping read with ``.get``; the keys used are
            ``"company"`` (company name) and ``"stock_code"`` (ticker, or the
            ``"N/A"`` sentinel when the ticker lookup failed).

    Returns:
        The retrieved documents' ``page_content`` joined into one string, each
        prefixed with ``"Article: "`` and separated by ``---`` rules.  If the
        retriever cannot be created or the search raises, the fixed string
        ``"Could not fetch news."`` is returned instead.
    """
    company_name = search_query.get("company")
    stock_code = search_query.get("stock_code")

    with mlflow.start_span(name="News Fetching") as span:
        span.set_inputs({"company_name": company_name, "stock_code": stock_code})
        print(f"🔎 Fetching news for {company_name} ({stock_code})...")

        # Only mention the ticker when one was actually resolved; putting the
        # "N/A" sentinel into the search text would just add noise to the query.
        if stock_code and stock_code != "N/A":
            subject = f"{company_name} ({stock_code})"
        else:
            subject = f"{company_name}"
        query = f"Recent financial news, market performance, and analysis for {subject}"

        try:
            # Built inside the try so that a construction failure (for example
            # missing credentials) takes the same fallback path as a failed search.
            retriever = ExaSearchRetriever(k=5, highlights=True)
            docs = retriever.invoke(query)
            # Format the retrieved documents into a single context string
            context = "\n\n---\n\n".join(
                f"Article: {doc.page_content}" for doc in docs
            )
            span.set_outputs({"news_context_length": len(context), "num_articles": len(docs)})
            print(f"✅ Fetched {len(docs)} news articles.")
            return context
        except Exception as e:
            print(f"❌ Error fetching news: {e}")
            span.set_outputs({"error": str(e)})
            return "Could not fetch news."

In [None]:
# --- 3. Construct the LangChain Chain using LCEL ---

def build_sentiment_analyzer_chain():
    """Build the LangChain Expression Language (LCEL) sentiment pipeline.

    The chain is invoked with ``{"company": <name>}`` and enriches that dict
    step by step:

    1. ``stock_code``      - result of ``get_stock_code`` for the company.
    2. ``news_context``    - result of ``get_company_news`` for the dict so far.
    3. ``analysis_result`` - prompt -> Gemini -> ``PydanticOutputParser``,
       i.e. a parsed ``SentimentProfile``.

    Returns:
        A runnable whose output is the input dict extended with the three
        keys above.
    """

    # Initialize the LLM (Gemini gemini-2.0-flash-001 via Vertex AI)
    llm = ChatVertexAI(
        model_name="gemini-2.0-flash-001",
        temperature=0.2,
        convert_system_message_to_human=True # Ensures system messages are treated as user prompts
    )

    # Initialize the Pydantic Output Parser
    parser = PydanticOutputParser(pydantic_object=SentimentProfile)

    # Define the final prompt template for sentiment analysis
    prompt_template = ChatPromptTemplate.from_template(
        """
        As a senior financial analyst, your task is to analyze the provided news context about a company and generate a structured sentiment profile in JSON format.
        
        **Company Name:** {company}
        **Stock Code:** {stock_code}

        **Recent News Context:**
        ---
        {news_context}
        ---
        
        Based *only* on the information in the news context provided, perform the following actions:
        1.  Determine the overall market sentiment (Positive, Negative, or Neutral).
        2.  Summarize the key news points concisely.
        3.  Extract all named entities: people, places, and other companies.
        4.  Identify related industries.
        5.  Analyze potential market implications.
        6.  Provide a confidence score for your sentiment analysis.
        
        {format_instructions}
        """
    )

    # Bind the parser's format instructions once, up front. The remaining
    # template variables (company, stock_code, news_context) are then filled
    # from the dict that flows through the chain. Building the pipeline inside
    # a lambda from a dict holding a plain string does not work: the string
    # cannot be coerced into a runnable, and the upstream keys would be lost.
    analysis_prompt = prompt_template.partial(
        format_instructions=parser.get_format_instructions()
    )
    analysis_chain = analysis_prompt | llm | parser

    # Use RunnableLambda to wrap our Python functions for use in the chain
    get_stock_code_runnable = RunnableLambda(lambda x: get_stock_code(x['company']))
    fetch_news_runnable = RunnableLambda(get_company_news)

    # Assemble the chain using LCEL
    # This architecture allows data to flow and be enriched at each step.
    chain = (
        RunnablePassthrough.assign(
            stock_code=get_stock_code_runnable
        )
        .assign(
            news_context=fetch_news_runnable
        )
        .assign(
            analysis_result=analysis_chain
        )
    )

    return chain

In [None]:
# --- 4. Main Execution Block ---

def main(company_name: str):
    """Run the sentiment analysis pipeline for one company inside an MLflow run.

    The company name is logged as a run parameter, the analyzer chain is built
    and invoked under a tracing span, and the resulting profile is printed,
    written to ``sentiment_profile_<name>.json`` in the current working
    directory and logged as an MLflow artifact.  If the chain yields no
    ``analysis_result``, the run gets a ``status=Failed`` parameter instead.

    Args:
        company_name: Name of the company to analyze.
    """
    # Pick up API keys / credentials from a local .env file when one exists.
    # Without this call the imported load_dotenv is never used and the
    # environment is left as-is.
    load_dotenv()

    # Start an MLflow run to log metrics, parameters, and artifacts
    with mlflow.start_run() as run:
        run_id = run.info.run_id
        print(f"🚀 Starting MLflow Run ID: {run_id}")
        mlflow.log_param("company_name", company_name)

        # Build the chain
        analyzer_chain = build_sentiment_analyzer_chain()

        # Invoke the chain with the company name
        print(f"\n🧠 Starting sentiment analysis for '{company_name}'...")
        input_data = {"company": company_name}

        with mlflow.start_span(name="Full Sentiment Analysis Pipeline") as span:
            span.set_inputs(input_data)
            result = analyzer_chain.invoke(input_data)
            span.set_outputs({"final_result": result})

        # Extract the final parsed object
        sentiment_profile = result.get("analysis_result")

        if not sentiment_profile:
            print("\n--- ❌ Analysis Failed ---")
            mlflow.log_param("status", "Failed")
            return

        print("\n--- ✅ Analysis Complete ---")
        # Convert the model to a plain dict for logging and printing. Prefer
        # model_dump() (pydantic v2) and fall back to dict() (pydantic v1).
        dump = getattr(sentiment_profile, "model_dump", None) or sentiment_profile.dict
        output_dict = dump()
        print(json.dumps(output_dict, indent=2))

        # Build a filesystem-safe file name: anything other than letters,
        # digits, '-', '_' and '.' (spaces, path separators, ...) becomes '_'.
        safe_name = "".join(
            ch if ch.isalnum() or ch in "-_." else "_" for ch in company_name
        )
        output_path = f"sentiment_profile_{safe_name}.json"

        # Log the final JSON output as an artifact in MLflow
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(output_dict, f, indent=2)
        mlflow.log_artifact(output_path)
        print(f"\n📄 Saved and logged output to '{output_path}'")
    
if __name__ == "__main__":
    # Command-line entry point: read the required company name from the
    # arguments and hand it to main().
    # NOTE(review): parse_args() reads sys.argv; when this cell is executed by
    # a notebook kernel instead of `python script.py -c NAME`, the required
    # -c/--company option will presumably be missing -- confirm how this is run.
    parser = argparse.ArgumentParser(description="Real-Time Market Sentiment Analyzer")
    parser.add_argument(
        "-c",
        "--company",
        help="The name of the company to analyze (e.g., 'Google', 'Microsoft').",
        required=True,
        type=str,
    )
    args = parser.parse_args()
    main(company_name=args.company)