In [16]:
from dotenv import load_dotenv
load_dotenv("./.env", override=True)

True

In [17]:
from pydantic_settings import BaseSettings

class LLMConfig(BaseSettings):
    hf_token: str
    model_name: str = "microsoft/DialoGPT-medium"
    provider: str = "huggingface_hub"
    temperature: float = 0.1

class EmbeddingConfig(BaseSettings):
    embedding_model_name: str
    gemini_api_key: str

In [18]:
from langchain_huggingface.chat_models import ChatHuggingFace
from langchain_google_genai.embeddings import GoogleGenerativeAIEmbeddings
from langchain_huggingface import HuggingFaceEndpoint
from langchain_chroma.vectorstores import Chroma

In [19]:
class Embeddings(GoogleGenerativeAIEmbeddings):
    def __init__(self, config: EmbeddingConfig):
        super().__init__(
            google_api_key=config.gemini_api_key,
            model=config.embedding_model_name
        )

class LLM(ChatHuggingFace):
    def __init__(self, config: LLMConfig):
        endpoint = HuggingFaceEndpoint(
            repo_id=config.model_name,
            huggingfacehub_api_token=config.hf_token,
            temperature=config.temperature,
            provider=config.provider
        )
        super().__init__(llm=endpoint)

class VectorStore(Chroma):
    def __init__(self, embedding: Embeddings, persist_directory: str, **kwargs):
        super().__init__(embedding_function=embedding, persist_directory=persist_directory, **kwargs)

"""
STATE DEFINITION
"""

In [20]:
from pydantic import BaseModel, Field
from typing import List, Any, Dict, Optional

class MarketResearchAgentState(BaseModel):
    """State Object that flow through the entire chain"""

    # Input
    file_path: Optional[str] = Field(..., description="Path of the file to be processed")

    # OCR / File Reading
    min_text_length: Optional[int] = Field(None, description="Minimum text length to consider a page valid")

    # Processing
    raw_text: Optional[str] = Field(None, description="Raw text extracted from the file")
    cleaned_text: Optional[str] = Field(None, description="Cleaned text after preprocessing")
    chunks: Optional[List[str]] = Field(None, description="List of text chunks for processing")
    embeddings: Optional[List[List[float]]] = Field(None, description="Embeddings for each text chunk")

    # retieval
    retriever: Optional[Any] = Field(None, description="Retriever object for vector store")

    # Retrieval & Analysis
    query: Optional[str] = Field(None, description="User query for market research")
    query_results: Optional[str] = Field(None, description="Results retrieved from vector store")
    competitors: Optional[List[str]] = Field(None, description="List of identified competitors")
    profiles: Optional[List[Dict[str, Any]]] = Field(None, description="Detailed profiles of competitors")
    scores: Optional[Dict[str, float]] = Field(None, description="Scores for each competitor based on analysis")

    # Output
    executive_summary: Optional[str] = Field(None, description="Executive summary of the market research")

    # Error Handling
    error: Optional[str] = Field(None, description="Error message if any step fails")


"""INGESTION NODES"""

In [21]:
import pymupdf
import pytesseract
from pdf2image import convert_from_path
import re

def ingest_file(state: MarketResearchAgentState) -> MarketResearchAgentState:
    """
    Ingest the file and update the state with raw text.
    """
    try:
        # Open the PDF document
        doc = pymupdf.open(state.file_path)
        text = ""
        for page in doc:
            text += page.get_text()
        doc.close()
        
        # If text extraction failed or text is too short, use OCR
        if not text or (state.min_text_length and len(text) < state.min_text_length):
            images = convert_from_path(state.file_path)
            text = ""
            for image in images:
                text += pytesseract.image_to_string(image)
                
    except Exception as e:
        # Fallback to OCR if PDF reading fails
        images = convert_from_path(state.file_path)
        text = ""
        for image in images:
            text += pytesseract.image_to_string(image)

    state.raw_text = text
    return state

def clean_text(state: MarketResearchAgentState) -> MarketResearchAgentState:
    """
    Clean the raw text and update the state with cleaned text.
    """
    text = state.raw_text
    text = re.sub(r'\s+', ' ', text)  # Remove extra whitespace
    text = re.sub(r'http\S+', '', text)  # Remove URLs
    state.cleaned_text = text.strip()
    return state

"""Chunk & Embedding"""

In [22]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
def chunk_text(state: MarketResearchAgentState, chunk_size=1000, chunk_overlap=200) -> MarketResearchAgentState:
    """
    Chunk the cleaned text and update the state with text chunks.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    state.chunks = text_splitter.split_text(state.cleaned_text)
    return state

def store_embeddings(state: MarketResearchAgentState) -> MarketResearchAgentState:
    """
    Generate embeddings for the text chunks and store them in a vector store.
    """
    embedding_config = EmbeddingConfig()
    embeddings = Embeddings(embedding_config)
    vector_store = VectorStore(embedding=embeddings, persist_directory="./vector_store")
    vector_store.add_texts(state.chunks)
    state.retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 20})
    return state

"""LLM Analysis Node"""

In [23]:
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate, SystemMessagePromptTemplate
from langchain_core.retrievers import BaseRetriever

In [24]:
def retrieve_query_results(state: MarketResearchAgentState) -> MarketResearchAgentState:
    """
    Retrieve relevant documents from the vector store based on the user query.
    """
    if state.retriever is None:
        raise ValueError("Retriever is not initialized. Please run the embedding storage step first.")
    
    docs = state.retriever.invoke(state.query)
    state.query_results = "\n".join([doc.page_content for doc in docs])
    return state

def competitor_detection(query_results: str) -> List[str]:
    """
    Detect competitors from the query results and update the state.
    """
    llm_config = LLMConfig()
    llm = LLM(llm_config)

    system_prompt = SystemMessagePromptTemplate.from_template(
        "You are an expert market research analyst. Your task is to identify competitors from the provided information."
    )

    human_prompt = HumanMessagePromptTemplate.from_template(
        """Based on the following information, identify and list the competitors:\n\n{query_results}\n\nList the competitors in a comma-separated format.
        Output format:
        ["Competitor1", "Competitor2", ...]
        {error}
        """
    )

    prompt = ChatPromptTemplate.from_messages([system_prompt, human_prompt])

    LCEL = (prompt | llm)

    error = ""
    max_retries = 3
    retries = 0

    while True:
        try:
            response = LCEL.invoke({"query_results": query_results, "error": error})
            competitors = eval(response.content)
            if isinstance(competitors, list):
                return competitors
            else:
                raise ValueError("Response is not a list")
        except Exception as e:
            error = str(e)
            retries += 1
            if retries >= max_retries:
                raise e
            continue

def competitor_detection_node(state: MarketResearchAgentState) -> MarketResearchAgentState:
    """
    Detect competitors from the query results and update the state.
    """
    state.competitors = competitor_detection(state.query_results)
    return state

"""Testing Compititor Detection"""

In [25]:
query_results = """Competitor A is a leading provider of AI solutions, offering a range of products from chatbots to data analytics. Competitor B specializes in cloud computing services, providing scalable infrastructure for businesses of all sizes. Competitor C focuses on cybersecurity, delivering advanced threat detection and prevention systems. Competitor D is known for its innovative software development tools that enhance productivity and collaboration among teams."""

competitor = competitor_detection(query_results)

print("Competitor Detection Test")
print(competitor)

Competitor Detection Test
['Competitor A', 'Competitor B', 'Competitor C', 'Competitor D']


In [36]:
def competitor_profile_extraction(state: MarketResearchAgentState) -> MarketResearchAgentState:
    """
    Extract detailed profiles for each competitor and update the state.
    """
    llm_config = LLMConfig()
    llm = LLM(llm_config)

    system_prompt = SystemMessagePromptTemplate.from_template(
        "You are an expert market research analyst. Your task is to create detailed profiles for each competitor based on the provided information."
    )

    human_prompt = HumanMessagePromptTemplate.from_template(
        """Based on the following information, create a detailed profile for each competitor:\n\n{query_results}\n\nCompetitors: {competitors}\n\nFor each competitor, provide a profile including their strengths, weaknesses, market position, and any other relevant details.
        remember it's not json
        {error}
        Output format:
        [
            {{
                "name": "Competitor1",
                "profile": "Detailed profile of Competitor1"
            }},
            {{
                "name": "Competitor2",
                "profile": "Detailed profile of Competitor2"
            }},
            ...
        ]
        """
    )

    prompt = ChatPromptTemplate.from_messages([system_prompt, human_prompt])

    LCEL = (prompt | llm)

    error = ""
    max_retries = 3
    retries = 0

    while True:
        try:
            response = LCEL.invoke({"query_results": state.query_results, "competitors": state.competitors, "error": error})
            print(response.content)
            profiles = eval(response.content)
            if isinstance(profiles, list):
                state.profiles = profiles
                return state
            else:
                raise ValueError("Response is not a list")
        except Exception as e:
            error = str(e)
            retries += 1
            if retries >= max_retries:
                raise e
            continue

def score_competitors(state: MarketResearchAgentState) -> MarketResearchAgentState:
    """
    Score each competitor based on their profiles and update the state.
    """
    llm_config = LLMConfig()
    llm = LLM(llm_config)

    system_prompt = SystemMessagePromptTemplate.from_template(
        "You are an expert market research analyst. Your task is to score each competitor based on their profiles."
    )

    human_prompt = HumanMessagePromptTemplate.from_template(
        """Based on the following competitor profiles, assign a score to each competitor on a scale of 1 to 10, where 10 indicates the strongest market position:\n\n{profiles}\n\nProvide the scores in a dictionary format where keys are competitor names and values are their respective scores.
        remember it's not json
        Output format:
        {{
            "Competitor1": score1,
            "Competitor2": score2,
            ...
        }}
        {error}
        """
    )

    prompt = ChatPromptTemplate.from_messages([system_prompt, human_prompt])

    LCEL = (prompt | llm)

    error = ""
    max_retries = 3
    retries = 0

    while True:
        try:
            response = LCEL.invoke({"profiles": state.profiles, "error": error})
            scores = eval(response.content)
            if isinstance(scores, dict):
                state.scores = scores
                return state
            else:
                raise ValueError("Response is not a dictionary")
        except Exception as e:
            error = str(e)
            retries += 1
            if retries >= max_retries:
                raise e
            continue

def generate_executive_summary(state: MarketResearchAgentState) -> MarketResearchAgentState:
    """
    Generate an executive summary based on the competitor profiles and scores, and update the state.
    """
    llm_config = LLMConfig()
    llm = LLM(llm_config)

    system_prompt = SystemMessagePromptTemplate.from_template(
        "You are an expert market research analyst. Your task is to generate an executive summary based on the competitor profiles and scores."
    )

    human_prompt = HumanMessagePromptTemplate.from_template(
        """Based on the following competitor profiles and their scores, generate a concise executive summary highlighting key insights and recommendations:\n\nProfiles: {profiles}\n\nScores: {scores}\n\nThe summary should be clear, informative, and actionable.
        {error}
        """
    )

    prompt = ChatPromptTemplate.from_messages([system_prompt, human_prompt])

    LCEL = (prompt | llm)

    error = ""
    max_retries = 3
    retries = 0

    while True:
        try:
            response = LCEL.invoke({"profiles": state.profiles, "scores": state.scores, "error": error})
            state.executive_summary = response.content
            return state
        except Exception as e:
            error = str(e)
            retries += 1
            if retries >= max_retries:
                raise e
            continue

In [37]:
from langgraph.graph import StateGraph, START, END

In [38]:
def build_market_research_agent_graph() -> StateGraph:
    graph = StateGraph(MarketResearchAgentState)

    graph.add_node("IngestFile", action=ingest_file)
    graph.add_node("CleanText", action=clean_text)
    graph.add_node("ChunkText", action=chunk_text)
    graph.add_node("StoreEmbeddings", action=store_embeddings)
    graph.add_node("RetrieveQueryResults", action=retrieve_query_results)
    graph.add_node("CompetitorDetection", action=competitor_detection_node)
    graph.add_node("CompetitorProfileExtraction", action=competitor_profile_extraction)
    graph.add_node("ScoreCompetitors", action=score_competitors)
    graph.add_node("GenerateExecutiveSummary", action=generate_executive_summary)

    graph.add_edge(START, "IngestFile")
    graph.add_edge("IngestFile", "CleanText")
    graph.add_edge("CleanText", "ChunkText")
    graph.add_edge("ChunkText", "StoreEmbeddings")
    graph.add_edge("StoreEmbeddings", "RetrieveQueryResults")
    graph.add_edge("RetrieveQueryResults", "CompetitorDetection")
    graph.add_edge("CompetitorDetection", "CompetitorProfileExtraction")
    graph.add_edge("CompetitorProfileExtraction", "ScoreCompetitors")
    graph.add_edge("ScoreCompetitors", "GenerateExecutiveSummary")
    graph.add_edge("GenerateExecutiveSummary", END)

    return graph.compile()

In [41]:
def main():
    
    initial_state = MarketResearchAgentState(
        file_path="/home/selva/Documents/langchain-projects/Market_Research_Agent/industry_report_detailed.pdf",
        min_text_length=1000,
        query="Identify key competitors in the EV market"
    )
    graph = build_market_research_agent_graph().invoke(initial_state)
    print("Executive Summary:")
    print(graph['executive_summary'])

In [42]:
main()

[
    {
        "name": "CATL",
        "profile": "CATL (Contemporary Amperex Technology Co. Limited) is the global market leader in EV batteries with a dominant 35% market share. Headquartered in China, the company benefits from strong manufacturing scale and cost advantages. Its key strengths include being the world's largest battery supplier, established production presence in Europe, and extensive R&D capabilities. However, CATL faces weaknesses such as geopolitical risks due to its China-centric operations and potential supply chain vulnerabilities. The company maintains its market position through technological innovation and strategic partnerships with major automakers worldwide."
    },
    {
        "name": "LG Energy Solution",
        "profile": "LG Energy Solution holds the second-largest market position with 20% global market share. The South Korean company specializes in high-nickel battery chemistries, offering superior energy density for premium electric vehicles. Its 