In [None]:
!pip install spacy

In [1]:
import os
from concurrent.futures import ThreadPoolExecutor
from transformers import pipeline
import sqlite3
import spacy
from PyPDF2 import PdfReader
from dotenv import load_dotenv

# Load environment variables (e.g., API keys from a .env file)
load_dotenv()
my_api_key = os.getenv("API_KEY")

# Import Groq API Client
from groq import Groq


# NLP-Based Query Parser #
class QueryParser:
    """
    Parses user queries using SpaCy to extract actionable tasks based on linguistic dependencies.
    """

    def __init__(self):
        self.nlp = spacy.load("en_core_web_sm")  # Load SpaCy's language model

    def parse(self, query):
        """
        Parses the input query and extracts tasks to analyze based on subject and object dependencies.
        :param query: String containing the user's query.
        :return: List of tasks derived from the query.
        """
        doc = self.nlp(query)
        tasks = [f"Analyze {token.text.lower()}" for token in doc if token.dep_ in ("nsubj", "dobj")]
        return tasks


# Extended Data Retrieval #
class SQLDataSource:
    """
    Handles retrieving data from an SQLite database.
    """

    def __init__(self, db_path, query):
        self.db_path = db_path
        self.query = query

    def fetch_data(self):
        """
        Fetches data by executing the provided SQL query on the database.
        :return: Dictionary containing SQL data.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(self.query)
        data = cursor.fetchall()
        conn.close()
        return {"sql_data": data}


class DocumentParser:
    """
    Handles extracting text data from PDF documents.
    """

    def __init__(self, file_path):
        self.file_path = file_path

    def fetch_data(self):
        """
        Extracts text from the specified file (only supports PDFs).
        :return: Dictionary containing document text.
        """
        if self.file_path.endswith(".pdf"):
            text = self._parse_pdf()
        else:
            raise ValueError("Unsupported file format.")
        return {"document_text": text}

    def _parse_pdf(self):
        """
        Parses the content of a PDF file.
        :return: Extracted text from the PDF.
        """
        reader = PdfReader(self.file_path)
        text = ""
        for page in reader.pages:
            text += page.extract_text()
        return text


class APIDataSource:
    """
    Mock API data retrieval for competitor and market trends data.
    """

    def __init__(self, api_url):
        self.api_url = api_url

    def fetch_data(self):
        """
        Mock method to simulate API responses for different URLs.
        :return: Dictionary containing the simulated API response.
        """
        if "competitors" in self.api_url:
            return {"competitors": "Competitors in Region A offer durable products at competitive prices."}
        elif "market-trends" in self.api_url:
            return {"market_trends": "Growing demand for sustainable and durable products in Region A."}
        else:
            raise ValueError("Invalid API URL")


# Parallel Execution #
class ParallelReasoningPipeline:
    """
    Manages the execution of multiple reasoning tasks in parallel.
    """

    def __init__(self):
        self.steps = []

    def add_step(self, description, data_source, processor):
        """
        Adds a reasoning step to the pipeline.
        :param description: Description of the task.
        :param data_source: Data source object for fetching input data.
        :param processor: Processor object for analyzing the data.
        """
        self.steps.append({"description": description, "data_source": data_source, "processor": processor})

    def run(self):
        """
        Executes all steps in the pipeline in parallel.
        :return: List of results from all executed steps.
        """
        results = []

        def process_step(step):
            print(f"\n{step['description']}")
            data = step["data_source"].fetch_data()
            insights = step["processor"].process(data)
            print(f"Insights: {insights}")
            return insights

        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(process_step, step) for step in self.steps]
            for future in futures:
                results.append(future.result())

        return results


# Enhanced Reasoning Logic #
class SalesProcessor:
    """
    Processes sales data retrieved from the database.
    """

    def process(self, data):
        return f"Sales data: {data['sql_data']}"


class FeedbackProcessor:
    """
    Processes customer feedback data retrieved from the document parser.
    """

    def process(self, data):
        return f"Customer feedback: {data['document_text'][:200]}..."  # Truncate for readability


class CompetitorProcessor:
    """
    Processes competitor insights data retrieved from the API.
    """

    def process(self, data):
        return f"Competitor insights: {data['competitors']}"


class MarketTrendsProcessor:
    """
    Processes market trend insights data retrieved from the API.
    """

    def process(self, data):
        return f"Market trends: {data['market_trends']}"


class SummarizationProcessor:
    """
    Summarizes insights using the Groq API.
    """

    def __init__(self, groq_client):
        self.groq_client = groq_client

    def process(self, insights):
        """
        Summarizes the combined insights using the Groq API.
        :param insights: List of insights to summarize.
        :return: Summarized text.
        """
        combined_text = " ".join(insights)

        # Use Groq API to summarize
        response = self.groq_client.chat.completions.create(
            model="llama3-70b-8192",  # Replace with a valid model name
            messages=[
                {"role": "system", "content": "Summarize the following insights"},
                {"role": "user", "content": combined_text},
            ],
        )

        # Extract summary from response
        if hasattr(response, "choices"):
            summary = response.choices[0].message.content
        else:
            summary = response["choices"][0]["message"]["content"]

        return summary


# Example Implementation #
def main():
    """
    Entry point for the program. Orchestrates query processing, reasoning steps, and summarization.
    """
    print(" Enhanced Multi-Hop Reasoning Agent \n")

    # Initialize Groq Client
    groq_client = Groq(api_key=my_api_key)  # Ensure API key is set in the environment

    # Parse the query
    query = "What are the key factors driving the decline in sales for Product X in the last quarter?"
    query_parser = QueryParser()
    tasks = query_parser.parse(query)
    print(f"Parsed Tasks: {tasks}")

    # Initialize the pipeline
    pipeline = ParallelReasoningPipeline()

    # Add reasoning steps
    pipeline.add_step(
        "Analyze sales data",
        SQLDataSource("sales.db", "SELECT * FROM sales WHERE product='Product X'"),
        SalesProcessor(),
    )
    pipeline.add_step(
        "Analyze customer feedback",
        DocumentParser("feedback.pdf"),
        FeedbackProcessor(),
    )
    pipeline.add_step(
        "Analyze competitor data",
        APIDataSource("mock://competitors"),
        CompetitorProcessor(),
    )
    pipeline.add_step(
        "Analyze market trends",
        APIDataSource("mock://market-trends"),
        MarketTrendsProcessor(),
    )

    # Run the pipeline
    insights = pipeline.run()

    # Summarize results
    summarizer = SummarizationProcessor(groq_client)
    final_summary = summarizer.process(insights)

    print("\nFinal Summary")
    print(final_summary)


# Run the Program #
if __name__ == "__main__":
    main()

  from .autonotebook import tqdm as notebook_tqdm



 Enhanced Multi-Hop Reasoning Agent 

Parsed Tasks: ['Analyze factors', 'Analyze decline']

Analyze sales data
Insights: Sales data: [(1, 'Product X', 'Region A', 100), (2, 'Product X', 'Region B', 200)]

Analyze customer feedback

Analyze competitor data
Insights: Competitor insights: Competitors in Region A offer durable products at competitive prices.

Analyze market trends
Insights: Market trends: Growing demand for sustainable and durable products in Region A.
Insights: Customer feedback: Customer Feedback
Region A: Customers reported durability issues.
Region B: Customers are satisfied with the product....

Final Summary
Here are the summarized insights:

**Product Performance:**

* Product X sells 100 units in Region A and 200 units in Region B.

**Customer Feedback:**

* Region A: Customers unhappy with product durability.
* Region B: Customers satisfied with the product.

**Competitor Insights:**

* Competitors in Region A offer durable products at competitive prices.

**Mark

In [1]:
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import sqlite3
from PyPDF2 import PdfReader
from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel

# Load environment variables
load_dotenv()
my_api_key = os.getenv("API_KEY")

# Import Groq API Client
from groq import Groq

# Initialize console for Rich output
console = Console()

class QueryParser:
    """Parses user queries using LLM to extract actionable tasks"""
    
    def __init__(self, groq_client):
        self.groq_client = groq_client

    def parse(self, query):
        """Extracts key analysis targets from user query"""
        prompt = f"""Analyze the following user query and identify the key factors or entities that need to be analyzed. 
        Return your response as a comma-separated list of terms, each term being a noun or noun phrase. 
        Do not include any additional text.

        Query: {query}
        Response:"""
        
        response = self.groq_client.chat.completions.create(
            model="llama3-70b-8192",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that extracts key analysis targets from user queries."},
                {"role": "user", "content": prompt}
            ],
            temperature=0,
        )
        
        response_text = response.choices[0].message.content
        tasks = [f"Analyze {term.strip()}" for term in response_text.split(",") if term.strip()]
        return tasks


# Data Sources #
class SQLDataSource:
    """Handles retrieving data from an SQLite database."""
    def __init__(self, db_path, query):
        self.db_path = db_path
        self.query = query

    def fetch_data(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(self.query)
        data = cursor.fetchall()
        conn.close()
        return {"sql_data": data}


class DocumentParser:
    """Handles extracting text data from PDF documents."""
    def __init__(self, file_path):
        self.file_path = file_path

    def fetch_data(self):
        if self.file_path.endswith(".pdf"):
            text = self._parse_pdf()
        else:
            raise ValueError("Unsupported file format.")
        return {"document_text": text}

    def _parse_pdf(self):
        reader = PdfReader(self.file_path)
        text = "".join([page.extract_text() for page in reader.pages])
        return text


class APIDataSource:
    """Handles API data retrieval for competitor and market data."""
    def __init__(self, api_url):
        self.api_url = api_url

    def fetch_data(self):
        if "competitors" in self.api_url:
            return {"competitors": "Competitors in Region A offer durable products at competitive prices."}
        elif "market-trends" in self.api_url:
            return {"market_trends": "Growing demand for sustainable and durable products in Region A."}
        else:
            raise ValueError("Invalid API URL")


# Parallel Processing #
class ParallelReasoningPipeline:
    """Manages parallel execution of reasoning tasks with dynamic step numbering."""
    def __init__(self):
        self.steps = []

    def add_step(self, description, data_source, processor, data_type, color):
        """Adds an analysis step to the pipeline without predefined numbering."""
        self.steps.append({
            "description": description,
            "data_source": data_source,
            "processor": processor,
            "data_type": data_type,
            "color": color
        })

    def run(self):
        """Executes all pipeline steps in parallel and numbers them dynamically."""
        results = []
        step_number = 1  # Start numbering dynamically

        with ThreadPoolExecutor() as executor:
            future_to_step = {executor.submit(self.process_step, step): step for step in self.steps}

            for future in as_completed(future_to_step):
                step = future_to_step[future]
                insights = future.result()
                
                # Dynamically number the step
                step_content = (
                    f"[bold]Data Source:[/bold] {step['data_type']}\n"
                    f"[bold]Insights:[/bold] {insights}"
                )
                console.print(Panel(step_content, title=f"[bold]Step {step_number}: {step['description']}[/bold]", border_style=step["color"]))
                
                results.append(insights)
                step_number += 1  # Increment step number for next completed step

        return results

    def process_step(self, step):
        """Fetches data, processes it, and returns insights."""
        data = step["data_source"].fetch_data()
        return step["processor"].process(data)


# Data Processors #
class SalesProcessor:
    """Processes sales data using LLM for real insights."""
    def __init__(self, groq_client):
        self.groq_client = groq_client

    def process(self, data):
        if not data["sql_data"]:
            return "No sales data available."

        # Format data for LLM analysis
        sales_records = "\n".join([str(record) for record in data["sql_data"]])

        # Prompt LLM to analyze sales trends
        response = self.groq_client.chat.completions.create(
            model="llama3-70b-8192",
            messages=[
                {"role": "system", "content": "You are an expert business analyst."},
                {"role": "user", "content": f"Analyze the following sales data and provide insights:\n\n{sales_records}"}
            ],
            temperature=0,
        )
        return response.choices[0].message.content


class FeedbackProcessor:
    """Processes customer feedback using LLM for real insights."""
    def __init__(self, groq_client):
        self.groq_client = groq_client

    def process(self, data):
        feedback_text = data["document_text"]

        if not feedback_text.strip():
            return "No feedback data available."

        # Prompt LLM to analyze customer sentiment
        response = self.groq_client.chat.completions.create(
            model="llama3-70b-8192",
            messages=[
                {"role": "system", "content": "You are an expert in customer sentiment analysis."},
                {"role": "user", "content": f"Analyze the following customer feedback and provide insights:\n\n{feedback_text}"}
            ],
            temperature=0,
        )
        return response.choices[0].message.content


class CompetitorProcessor:
    """Processes competitor data using LLM for real insights."""
    def __init__(self, groq_client):
        self.groq_client = groq_client

    def process(self, data):
        competitor_info = data.get("competitors", "")

        if not competitor_info.strip():
            return "No competitor data available."

        # Prompt LLM to analyze competitor strategy
        response = self.groq_client.chat.completions.create(
            model="llama3-70b-8192",
            messages=[
                {"role": "system", "content": "You are an expert in competitor analysis."},
                {"role": "user", "content": f"Analyze the following competitor data and provide insights:\n\n{competitor_info}"}
            ],
            temperature=0,
        )
        return response.choices[0].message.content


class MarketTrendsProcessor:
    """Processes market trends using LLM for real insights."""
    def __init__(self, groq_client):
        self.groq_client = groq_client

    def process(self, data):
        market_trends_info = data.get("market_trends", "")

        if not market_trends_info.strip():
            return "No market trend data available."

        # Prompt LLM to analyze market trends
        response = self.groq_client.chat.completions.create(
            model="llama3-70b-8192",
            messages=[
                {"role": "system", "content": "You are an expert market analyst."},
                {"role": "user", "content": f"Analyze the following market trend data and provide insights:\n\n{market_trends_info}"}
            ],
            temperature=0,
        )
        return response.choices[0].message.content


class SummarizationProcessor:
    """Summarizes insights using LLM."""
    def __init__(self, groq_client):
        self.groq_client = groq_client

    def process(self, insights):
        """Generates final summary using LLM."""
        combined_text = " ".join(insights)
        
        response = self.groq_client.chat.completions.create(
            model="llama3-70b-8192",
            messages=[
                {"role": "system", "content": "Create a concise, actionable summary of the following business insights."},
                {"role": "user", "content": combined_text}
            ]
        )

        return response.choices[0].message.content


def main():
    console.print("[bold cyan]\nEnhanced Multi-Hop Reasoning Agent[/bold cyan]\n")

    # Initialize Groq Client
    groq_client = Groq(api_key=my_api_key)

    # Parse the query using LLM
    query = "What are the key factors driving the decline in sales for Product X in the last quarter?"
    query_parser = QueryParser(groq_client)
    tasks = query_parser.parse(query)
    console.print(Panel(f"Parsed Tasks: {tasks}", title="Query Parsing", border_style="cyan"))

    # Initialize the pipeline
    pipeline = ParallelReasoningPipeline()

    # ✅ Remove hardcoded step numbers & let dynamic numbering handle it
    pipeline.add_step(
        "Retrieve and analyze sales data for Product X",
        SQLDataSource("sales.db", "SELECT * FROM sales WHERE product='Product X'"),
        SalesProcessor(groq_client),
        "SQL Database", "blue"
    )
    pipeline.add_step(
        "Retrieve and analyze customer feedback for Product X",
        DocumentParser("feedback.pdf"),
        FeedbackProcessor(groq_client),
        "Document", "green"
    )
    pipeline.add_step(
        "Retrieve and analyze competitor data",
        APIDataSource("mock://competitors"),
        CompetitorProcessor(groq_client),
        "API", "yellow"
    )
    pipeline.add_step(
        "Retrieve and analyze market trends",
        APIDataSource("mock://market-trends"),
        MarketTrendsProcessor(groq_client),
        "API", "red"
    )

    # Run the pipeline
    insights = pipeline.run()

    # Summarize results using LLM
    summarizer = SummarizationProcessor(groq_client)
    final_summary = summarizer.process(insights)

    console.print(Panel(final_summary, title="[bold]Final Summary[/bold]", border_style="magenta"))


if __name__ == "__main__":
    main()

