<a href="https://colab.research.google.com/github/pastrop/kaggle/blob/master/ThinkingAgent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%capture
!pip install anthropic

In [2]:
import pandas as pd
import json
#from anthropic import Anthropic
import anthropic
from typing import Dict, List, Any, Optional

In [3]:
from google.colab import userdata
#api_key_openAI = userdata.get('OpenAI')
api_key_anthropic = userdata.get('Antropic')
#api_key_gemini = userdata.get('google')

In [None]:
csv_file = 'frog_ferry_meta.csv'

In [None]:
#Dataset to be used:
df = pd.read_csv(csv_file)
#Text cleanup
def text_input(file = 'Mejuri_texts.csv'):
  """Load a CSV and return its usable 'Text' entries as a list of strings.

  Rows whose 'Text' cell is not a ``str`` (for example empty cells, which
  pandas reads as NaN) are dropped, and every tab character in the remaining
  entries is replaced by a single space.

  Args:
      file: Path of the CSV file to read; it must contain a 'Text' column.

  Returns:
      The cleaned texts, in file order.
  """
  frame = pd.read_csv(file)
  text_column = frame['Text']
  # Boolean mask: True only where the cell actually holds a string.
  is_string = text_column.apply(lambda value: isinstance(value, str))
  cleaned = []
  for entry in text_column[is_string]:
    cleaned.append(entry.replace("\t", " "))
  return cleaned

In [None]:
# Build a single text corpus from the dataset file.
# text_input() returns the string-valued 'Text' entries with tabs replaced by spaces.
texts_cleaned = text_input(csv_file)
# Concatenate every entry into one string, separated by single spaces.
corpus = ' '.join(texts_cleaned)
# test1: the first 20,000 whitespace-separated tokens of the corpus, re-joined with single spaces.
test1 = ' '.join(corpus.split()[:20000])

In [None]:
# Transform the dataframe into a list of dicts.
# Keep only the rows whose 'Text' cell is a str (same filter as in text_input).
df_clean = df[df['Text'].apply(lambda x: isinstance(x, str))]
# orient='records' yields one dict per row, keyed by column name.
records = df_clean.to_dict(orient='records')

In [None]:
records[1]

{'Text': 'We have GREATLY appreciated the addition of speed bumps and cross walks in our area by Roosevelt. More traffic calming features and accessible curbs would always be appreciated. A swing set at George park or some other fun addition to the play area there (basketball court, garden, or fenced area for off leash dogs) would be amazing! The FROG FERRY would be SO GREAT for our community, having the option to take a ferry downtown would be so fun for tourists and a great way for locals to spend the day and obviously commuters would benefit so much. I think the addition of a ferry would be ICONIC.',
 'connection': 4,
 'recommend': 5,
 'satisfaction': 6,
 'verified': 'Yes'}

# Thinking Agent

In [4]:
def get_context(self, corpus: Optional[str] = None, question: Optional[str] = None) -> Optional[str]:
    """Stub for the ``get_context`` tool.

    Only the argument check is implemented so far; the actual retrieval step
    has not been written.

    Args:
        self: Unused. The function is defined at module level, but the
            parameter is kept so existing positional callers keep working.
        corpus: Text to search through.
        question: Question or topic to find context for.

    Returns:
        A fixed "no context" message when ``corpus`` or ``question`` is
        ``None``; otherwise ``None``, because retrieval is not implemented.
    """
    # Identity test instead of `== None`: equality can be overridden by the
    # argument's type and then give a wrong answer or raise.
    if corpus is None or question is None:
        return "No context available - empty corpus or question provided."

    # Retrieval is not implemented yet; make the fall-through explicit.
    return None

In [30]:
class ThinkingModule:
    """Tool-aware wrapper around the Anthropic Messages API for reflective analysis.

    Each call to :meth:`analyze` sends one user message together with three
    tool definitions (``get_context``, ``filter``, ``key_words``). Tool calls
    requested by the model are only printed for inspection; they are not
    executed here.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Create the Anthropic client and select the model.

        Args:
            api_key: Anthropic API key, forwarded unchanged to
                ``anthropic.Anthropic`` (``None`` is forwarded as well).
        """
        self.api_key = api_key
        self.client = anthropic.Anthropic(api_key=self.api_key)
        self.model = "claude-3-7-sonnet-20250219"  # Using Claude 3.7 Sonnet

    def analyze(self, task: str, context: str, reflection_depth: int = 1) -> str:
        """Ask Claude to reason about ``task`` and return its raw text answer.

        Args:
            task: The specific thinking task to perform.
            context: Relevant context for the thinking task.
            reflection_depth: Number of reflection levels requested in the
                prompt.

        Returns:
            The text of the first text block in the response, unparsed (the
            prompt asks for JSON, but no parsing happens here). An empty
            string is returned when the response has no text block, e.g. when
            the model answers with a tool call only.
        """
        # Build the prompt for Claude
        prompt = f"""<thinking>
Task: {task}

Context:
{context}

Please think through this step-by-step with {reflection_depth} level(s) of reflection.
You have an access to a specialized tool called get_context.
The get_context tool allows you to get a summary a corpus of text to find relevant information based on a question. Use this tool when:
- The user asks a question that might be answered by searching through available text
- You need to find specific information within a large body of text
- The user's query would benefit from contextual information retrieval
Provide your analysis in the VALID JSON FORMAT including the following fields:
- reasoning: Your step-by-step reasoning process
- conclusion: A concise summary of your conclusion
- confidence: A number from 0-1 indicating your confidence
- use_metadata: Set this field to 'True' if metadata analysis is required and 'False' otherwise
- tools: List tools to use
</thinking>"""

        # Tool definitions advertised to the model
        tools = [
            {
                "name": "get_context",
                "description": "Search through a corpus of text to find relevant context based on a question",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "corpus": {
                            "type": "string",
                            "description": "The text corpus to search through"
                        },
                        "question": {
                            "type": "string",
                            "description": "The question or topic to find relevant context for"
                        }
                    },
                    "required": ["corpus", "question"]
                }
            },
            {
                "name": "filter",
                "description": "Filter the dataset by given criteria",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "data_set": {
                            # NOTE(review): "pandas dataframe" is not a JSON Schema
                            # type; the API may reject this schema - confirm and
                            # replace with a valid type.
                            "type": "pandas dataframe",
                            "description": "The dataframe to search through inluding text column and metadata columns"
                        },
                        "filter_criteria": {
                            "type": "string",
                            "description": "The cirteria to filter by"
                        }
                    },
                    "required": ["data_set", "filter_criteria"]
                }
            },
            {
                "name": "key_words",
                "description": "Get keywords from the text",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "text_corpus": {
                            "type": "string",
                            "description": "The text corpus to extract keywards from"
                        }
                    },
                    "required": ["text_corpus"]
                }
            }
        ]

        # Call Claude API
        response = self.client.messages.create(
            model=self.model,
            max_tokens=2000,
            temperature=0.2,  # Low temperature for more deterministic thinking
            system="You are an expert analytical assistant. When asked to think about a problem, you break it down methodically and provide clear, structured analysis. Your output should always be valid JSON",
            tools=tools,
            messages=[
                {"role": "user", "content": prompt}
            ]
        )
        print(f'ANALYZE FUNCTION-RESPONSE: {response}')

        # With tools enabled, the first content block may be a tool_use block
        # (which has no .text) and the content list may be empty, so pick the
        # first text block and do not index position 0 blindly.
        content = next(
            (
                block.text
                for block in response.content
                if getattr(block, "type", None) == "text"
            ),
            "",
        )

        print(f"@@@@@@@@@@@@@@@@Claude's response - analyze function: {content}")

        print(f"@@@@@@response.stop_reason - analyze function: {response.stop_reason}")
        # Tool requests are collected for debugging output only.
        tool_calls = [
            block for block in response.content
            if getattr(block, "type", None) == "tool_use"
        ]
        print(f"$$$$$$$$$$$$$tool_calls - analyze function: { tool_calls}")

        return content




In [32]:
#alternative thinking module
class ThinkingModuleAlt:
    """Tool-free variant of the thinking module.

    Sends a single user message to the Anthropic Messages API (no tool
    definitions) and asks the model to say, in JSON, whether metadata
    analysis is needed and which tools could help.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Create the Anthropic client and select the model.

        Args:
            api_key: Anthropic API key, forwarded unchanged to
                ``anthropic.Anthropic`` (``None`` is forwarded as well).
        """
        self.api_key = api_key
        self.client = anthropic.Anthropic(api_key=self.api_key)
        self.model = "claude-3-7-sonnet-20250219"  # Using Claude 3.7 Sonnet

    def analyze(self, task: str, context: str, reflection_depth: int = 1) -> str:
        """Ask Claude to reason about ``task`` and return its raw text answer.

        Args:
            task: The specific thinking task to perform.
            context: Relevant context for the thinking task.
            reflection_depth: Number of reflection levels requested in the
                prompt.

        Returns:
            The text of the first text block in the response, unparsed (the
            prompt asks for JSON, but no parsing happens here). An empty
            string is returned when the response has no text block.
        """
        # Build the prompt for Claude
        prompt = f"""<thinking>
Task: {task}

Context:
{context}

Please think through this step-by-step with {reflection_depth} level(s) of reflection.
Provide your analysis in the VALID JSON FORMAT including the following fields:
- reasoning: Your step-by-step reasoning process
- conclusion: A concise summary of your conclusion
- confidence: A number from 0-1 indicating your confidence
- use_metadata: Set this field to 'True' if metadata analysis is required and 'False' otherwise
- metadata_tool: Suggest a list of additional tools that can be used for metadata analysis if the use_ metadata field is set to 'True'
</thinking>"""

        # Call Claude API
        response = self.client.messages.create(
            model=self.model,
            max_tokens=2000,
            temperature=0.2,  # Low temperature for more deterministic thinking
            system="You are an expert analytical assistant. When asked to think about a problem, you break it down methodically and provide clear, structured analysis. Your output should always be valid JSON",
            messages=[
                {"role": "user", "content": prompt}
            ]
        )
        print(f'ANALYZE FUNCTION-RESPONSE: {response}')

        # Pick the first text block; indexing position 0 blindly would raise
        # on an empty content list or on a non-text first block.
        content = next(
            (
                block.text
                for block in response.content
                if getattr(block, "type", None) == "text"
            ),
            "",
        )

        print(f"@@@@@@@@@@@@@@@@Claude's response - analyze function: {content}")

        print(f"@@@@@@response.stop_reason - analyze function: {response.stop_reason}")
        # Collected for debugging output only; no tools are sent in this variant.
        tool_calls = [
            block for block in response.content
            if getattr(block, "type", None) == "tool_use"
        ]
        print(f"$$$$$$$$$$$$$tool_calls - analyze function: { tool_calls}")

        return content

In [None]:
            # Extract JSON part (assuming it's properly formatted)
            # NOTE(review): stand-alone fragment - `content` is not assigned in this
            # cell and must already exist when it runs.
            # Default: treat the whole string as the JSON payload.
            json_str = content
            if "```json" in content:
                # Keep only the text between the first ```json fence and the next ``` fence.
                json_str = content.split("```json")[1].split("```")[0].strip()
            elif "```" in content:
                # Untagged fence: keep the text between the first pair of ``` fences.
                json_str = content.split("```")[1].split("```")[0].strip()

In [None]:
'''
class MetadataAnalysisTool:
    """Stub for the metadata analysis tool."""

    def __init__(self):
        # Initialization for metadata tool would go here
        pass

    def analyze(self, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Analyze reviews based on metadata parameters.

        Args:
            params: Parameters for filtering and analyzing metadata

        Returns:
            Filtered list of reviews
        """
        # This is just a stub implementation
        print(f"Metadata tool called with parameters: {params}")
        # In a real implementation, this would filter the actual reviews
        return [{"id": 1, "text": "Example filtered review", "rating": 5}]
'''


In [33]:
class ReviewAnalysisAgent:
    """Entry point that turns a natural-language question about reviews into an analysis."""

    def __init__(self, review_corpus: List[Dict], api_key: Optional[str] = None):
        """
        Store the corpus and build both thinking modules.

        Args:
            review_corpus: Customer reviews with their metadata
            api_key: Anthropic API key handed to both thinking modules
        """
        self.review_corpus = review_corpus
        # Both variants are created up front; process_query picks one of them.
        self.thinking_module = ThinkingModule(api_key=api_key)
        self.thinking_module_alt = ThinkingModuleAlt(api_key=api_key)

    def process_query(self, query: str) -> str:
        """
        Run the query-understanding step and return its result unchanged.

        Args:
            query: Natural language question about the review data

        Returns:
            Whatever the selected thinking module's analyze() returns
        """
        # Step 1: Understand the query through the thinking module
        print('#####################step 1: Understand the query through the thinking module')

        analysis_task = (
            "Analyze the user query to extract: (1) primary information need, "
            "(2)type of analysis requested, "
            "(3) whether numerical/metadata analysis is likely needed"
        )

        # Switch the thinking module type here.
        analyzer = self.thinking_module_alt
        return analyzer.analyze(
            task=analysis_task,
            context=f"User query: {query}",
            reflection_depth=2,
        )

'''
        # Step 2: Decide whether to use metadata tool
        print(f'#####################step 2: Decide whether to use metadata tool')


        tool_decision = self.thinking_module.analyze(
            task="Determine if metadata analysis is required or beneficial for this query",
            context=f"Query analysis: {query_analysis}\n"
                   f"Available tools: text corpus analysis, metadata analysis tool for numerical data",
            reflection_depth=2
        )

        print(f'+++++++++++++++++++++++++++++tool_decision call results: {tool_decision}')

        # Step 3: Execute appropriate analysis
        print(f'#####################calling thinking module form the process_query step 3')

        #next line has a bug, needs work
        #tools_decision_keys_to_extract = ['additional_fields', 'use_metadata_tool', 'd']
        #use_metadata = tool_decision.get("additional_fields", {}).get("use_metadata_tool", False)
        additional_fields = tool_decision.get("additional_fields", {})
        use_metadata = tool_decision.get("use_metadata", False)
        print(f'use_metadata call results: {use_metadata}')
        print(f'additional_fields call results: {additional_fields}')
        #######################################################################################
        print('Execution terminated')
        raise RuntimeError("Stopping execution intentionally.")
        #######################################################################################
        if use_metadata:
            # Define parameters for metadata tool
            metadata_params = self.thinking_module.analyze(
                task="Determine optimal parameters for metadata tool based on the query",
                context=f"Query analysis: {query_analysis}\n"
                       f"Metadata tool capabilities: filter by ratings, aggregate statistics, etc.",
                reflection_depth=1
            )

            # Use metadata tool to get filtered set of reviews
            tool_params = metadata_params.get("additional_fields", {}).get("tool_parameters", {})
            print(f'@@@@@@@@@@@@@@@@@@@@@@@@@@@@tool_params call results: {tool_params}')
            #test: making sure that the metada tool is properly activated
            #test block ends
            filtered_reviews = self.metadata_tool.analyze(tool_params)
            text_analysis = self._analyze_text_corpus(filtered_reviews, query_analysis)
        else:
            # Just analyze the full text corpus
            text_analysis = self._analyze_text_corpus(self.review_corpus, query_analysis)

        # Step 4: Generate final response

        print(f'#####################calling thinking module form the process_query step 4')

        response = self.thinking_module.analyze(
            task="Synthesize findings into a comprehensive response to the user query",
            context=f"Query: {query}\n"
                   f"Analysis results: {text_analysis}\n"
                   f"Was metadata used: {'Yes' if use_metadata else 'No'}\n"
                   f"Thinking process: {query_analysis.get('reasoning', '')}\n"
                   f"Tool decision reasoning: {tool_decision.get('reasoning', '')}",
            reflection_depth=2
        )

        # Return the final response text
        return response.get("conclusion", "I couldn't generate a proper response.")

    def _analyze_text_corpus(self, reviews: List[Dict], query_analysis: Dict) -> Dict[str, Any]:
        """
        Analyze the text content of reviews.

        Args:
            reviews: List of review objects to analyze
            query_analysis: Analysis of the user query to guide text analysis

        Returns:
            Results of the text analysis
        """
        # In a real implementation, this would use NLP techniques
        # appropriate for the query type (sentiment analysis, topic modeling, etc.)

        # Stub implementation
        review_texts = [review.get("Text", "") for review in reviews]

        # Use the thinking module to analyze the reviews based on the query

        print(f'!!!!!!!!!!!!!!!!!!!!!!!!!!calling thinking module from inside the analyze_text_corpus')

        analysis_result = self.thinking_module.analyze(
            task="Analyze review texts to answer the user query",
            context=f"Query analysis: {query_analysis}\n"
                   f"Reviews to analyze: {review_texts} (showing first 5 only)",
            reflection_depth=2
        )

        return analysis_result

'''

'\n        # Step 2: Decide whether to use metadata tool\n        print(f\'#####################step 2: Decide whether to use metadata tool\')\n\n\n        tool_decision = self.thinking_module.analyze(\n            task="Determine if metadata analysis is required or beneficial for this query",\n            context=f"Query analysis: {query_analysis}\n"\n                   f"Available tools: text corpus analysis, metadata analysis tool for numerical data",\n            reflection_depth=2\n        )\n\n        print(f\'+++++++++++++++++++++++++++++tool_decision call results: {tool_decision}\')\n\n        # Step 3: Execute appropriate analysis\n        print(f\'#####################calling thinking module form the process_query step 3\')\n\n        #next line has a bug, needs work\n        #tools_decision_keys_to_extract = [\'additional_fields\', \'use_metadata_tool\', \'d\']\n        #use_metadata = tool_decision.get("additional_fields", {}).get("use_metadata_tool", False)\n        additi

Queries (St. Johns):
Where are pedestrian safety improvements needed?
What can police be doing to make the neighborhood safer?
What can city council prioritize to help St Johns?
What new businesses are needed in St Johns and where?
What issues would Frog Ferry solve?

In [None]:
len(records_stjohn[1000:5400])

4400

In [34]:
# Example usage: build the agent and run one query against the Anthropic API.

# Set your API key
api_key = api_key_anthropic
# NOTE(review): this rebinds `records` to an empty dict, replacing the list of
# row dicts built from the dataframe in an earlier cell (if that cell was run).
# The agent below therefore receives an empty corpus - confirm this is intended.
records = {}
# Sample review corpus (in a real scenario, this would be much larger)
# NOTE(review): `sample_reviews` is not used anywhere in this cell.
sample_reviews = [
    {"id": 1, "text": "Love this product! Battery life is amazing.", "rating": 5, "verified": True},
    {"id": 2, "text": "Decent product but overpriced for what you get.", "rating": 3, "verified": True},
    {"id": 3, "text": "Terrible quality, broke after one week.", "rating": 1, "verified": True},
    # In reality, you'd have thousands more reviews here
]

# Initialize the agent

agent = ReviewAnalysisAgent(review_corpus=records, api_key=api_key)

# Example queries
# NOTE(review): `queries` is not used in this cell.
queries = [
    "What do customers think about the battery life? assume you have a dataset of the customer reviews about a produc"
    #"Are verified purchasers happier with the product than non-verified ones?",
    #"What are the most common complaints in 1-star reviews?"
]

# Longer battery-life query describing the assumed dataset; not passed to the agent in this cell.
query_battery = '''
What do customers think about the battery life? assume you have a dataset of the customer reviews about a product
that includes a battery, reviews discuss the product and may include information and opinions about the battery.
every review comes with the metadata including review rating, overall sentiment, geographic region the review comes from
'''

# Query that is actually sent; it asks for a filter on the 'verified' and 'satisfaction' metadata.
# NOTE(review): the query text ends with a stray double quote.
query_stjohn = '''
What issues would 'Frog Ferry' ferry service solve? What customers are saying about the service? Pls only consider 'verified' reviews with 'satisfaction' rating above 5"
'''
# process_query returns the result of the agent's query-understanding step.
response = agent.process_query(query_stjohn)
print(f"#####Response: {response}")

# The batch loop below is disabled by keeping it inside a string literal.
'''
queries_stjohn = [
    #"Where are pedestrian safety improvements needed?",
    #"What can police be doing to make the neighborhood safer?",
    #"What can city council prioritize to help St Johns?",
    #"What new businesses are needed in St Johns and where?",
    #"What issues would Frog Ferry solve?",
    "What issues would 'Frog Ferry' ferry service solve? pls only consider 'verified' reviews with 'satisfaction' rating above 5 "
]

# Process each query
for query in queries_stjohn:
    print(f"\nQuery: {query}")
    response = agent.process_query(query)
    print(f"Response: {response}")
'''

#####################step 1: Understand the query through the thinking module
ANALYZE FUNCTION-RESPONSE: Message(id='msg_01UppMsbWRC5M41yujHmcQHc', content=[TextBlock(citations=None, text='```json\n{\n  "reasoning": "Let me analyze this query step by step:\\n\\n1. The user is asking about \'Frog Ferry\' ferry service, specifically:\\n   - What issues the service would solve\\n   - What customers are saying about the service\\n\\n2. There are specific filtering requirements:\\n   - Only consider \'verified\' reviews\\n   - Only include reviews with \'satisfaction\' rating above 5\\n\\n3. This suggests the user wants:\\n   - Information about the purpose/benefits of the Frog Ferry service\\n   - A filtered analysis of customer reviews based on specific metadata attributes\\n\\n4. The query explicitly mentions filtering by:\\n   - A verification status (likely a boolean or categorical field)\\n   - A numerical satisfaction rating (with a threshold of >5)\\n\\n5. To properly answer this qu

'\nqueries_stjohn = [\n    #"Where are pedestrian safety improvements needed?",\n    #"What can police be doing to make the neighborhood safer?",\n    #"What can city council prioritize to help St Johns?",\n    #"What new businesses are needed in St Johns and where?",\n    #"What issues would Frog Ferry solve?",\n    "What issues would \'Frog Ferry\' ferry service solve? pls only consider \'verified\' reviews with \'satisfaction\' rating above 5 "\n]\n\n# Process each query\nfor query in queries_stjohn:\n    print(f"\nQuery: {query}")\n    response = agent.process_query(query)\n    print(f"Response: {response}")\n'

# **Future features**

In [None]:
# Sketch: include CSV data in the prompt for the thinking module.
# NOTE(review): `csv_data`, `task`, `context` and `reflection_depth` are not
# assigned in this cell; they must exist before it can run.
# Keep only rows whose 'verified' value is exactly 'Yes' and whose
# 'satisfaction' value, converted to int, is greater than 5.
filtered_data_json = [
    row for row in csv_data
    if row['verified'] == 'Yes' and int(row['satisfaction']) > 5
]

# The filtered rows are embedded in the prompt as indented JSON.
# "[rest of your prompt...]" is literal placeholder text inside the string.
thinking_prompt = f"""<thinking>
Task: {task}

Context: {context}

Data:
{json.dumps(filtered_data_json, indent=2)}

Please think through this step-by-step with {reflection_depth} level(s) of reflection.
[rest of your prompt...]
</thinking>"""

In [None]:
# Persistent API calls using tenacity for retries
!pip install tenacity

import json
import anthropic
from typing import Dict, List, Any, Optional
from tenacity import retry, stop_after_attempt, wait_exponential

class ThinkingModule:
    """Custom module that leverages Claude's capabilities for reflective thinking."""

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the thinking module with the Anthropic API client."""
        self.api_key = api_key  # or os.environ.get("ANTHROPIC_API_KEY")
        self.client = anthropic.Anthropic(api_key=self.api_key)
        self.model = "claude-3-7-sonnet-20250219"  # Using Claude 3.7 Sonnet

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    def analyze(self, task: str, context: str, reflection_depth: int = 1) -> Dict[str, Any]:
        """
        Perform reflective thinking using Claude with retries for OverloadedError.

        Args:
            task: The specific thinking task to perform
            context: Relevant context for the thinking task
            reflection_depth: How many levels of reflection to perform (1-3)

        Returns:
            Dict containing the analysis results
        """
        # Build the prompt for Claude
        prompt = f"""<thinking>
Task: {task}

Context:
{context}

Please think through this step-by-step with {reflection_depth} level(s) of reflection.
Provide your analysis in JSON format with these fields:
- reasoning: Your step-by-step reasoning process
- conclusion: A concise summary of your conclusion
- confidence: A number from 0-1 indicating your confidence
- additional_fields: Any task-specific outputs needed
</thinking>"""

        # Call Claude API
        response = self.client.messages.create(
            model=self.model,
            max_tokens=2000,
            temperature=0.2,  # Low temperature for more deterministic thinking
            system="You are an expert analytical assistant. When asked to think about a problem, you break it down methodically and provide clear, structured analysis. Your output should always be valid JSON when requested.",
            messages=[
                {"role": "user", "content": prompt}
            ]
        )

        # Extract and parse JSON response
        try:
            # Find JSON in the response content
            content = response.content[0].text

            print(f"Claude's response - analyze function: {content}")

            # Extract JSON part (assuming it's properly formatted)
            json_str = content
            if "