# AI Agent with Knowledge Base

### Installing required libraries and importing them

In [None]:
#Colab-specific pip installs
!pip install groq
!pip install tavily-python
!pip install google.colab

Collecting groq
  Downloading groq-0.25.0-py3-none-any.whl.metadata (15 kB)
Downloading groq-0.25.0-py3-none-any.whl (129 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/129.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m129.4/129.4 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: groq
Successfully installed groq-0.25.0
Collecting tavily-python
  Downloading tavily_python-0.7.3-py3-none-any.whl.metadata (7.0 kB)
Downloading tavily_python-0.7.3-py3-none-any.whl (15 kB)
Installing collected packages: tavily-python
Successfully installed tavily-python-0.7.3
Collecting jedi>=0.16 (from ipython==7.34.0->google.colab)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m17.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected pack

In [None]:
# Standard library imports
import datetime  # for handling dates and times
import requests  # for HTTP requests to external APIs
from zoneinfo import ZoneInfo  # for timezone-aware datetime operations
from abc import ABC, abstractmethod  # for defining abstract base classes
import json  # for JSON serialization/deserialization
import ast  # for safe literal evaluation of Python expressions
import operator  # for mapping numeric operations to functions

from groq import Groq # Groq LLM client
from google.colab import userdata

# Retrieve API key for Groq from Colab userdata storage
groq_api_key = userdata.get("GROQ_API_KEY")

### Defining the tools

In [None]:
# Abstract class to define any tool
class Tool(ABC):
    @abstractmethod
    def name(self) -> str:
      # Return the unique name of the tool
        pass

    @abstractmethod
    def description(self) -> str:
      # Return a brief description of the tool's functionality
        pass

    @abstractmethod
    def use(self, **kwargs):
      # Execute the tool with given keyword arguments and return the result
        pass

# Tool gives the current time of a given timezone
class TimeTool(Tool):
    def name(self):
        return "Time Tool"

    def description(self):
        return ("Provides the current time for a given city's timezone "
                "(e.g., Asia/Kolkata, America/New_York). If no timezone is provided, returns local time.")

    def use(self, **kwargs):
        fmt = "%Y-%m-%d %H:%M:%S %Z%z"
        current_time = datetime.datetime.now()
        timezone = kwargs.get("timezone", "")
        if timezone:
            try:
                current_time = current_time.astimezone(ZoneInfo(timezone))
            except Exception as e:
                return f"Invalid timezone provided: {timezone}. Error: {str(e)}"
        return f"The current time is {current_time.strftime(fmt)}."


# Tool: Fetches weather information using OpenWeatherMap API
class WeatherTool(Tool):
    def name(self):
        return "Weather Tool"

    def description(self):
        return "Provides weather information for a given location."

    def use(self, **kwargs):
      # Extract and validate location parameter
        location = kwargs.get("location", "")
        if not location:
            return "Error: Please provide a location."

        # Retrieve OpenWeather API key
        weather_api_key = userdata.get("OPEN_WEATHER_KEY")
        # Make HTTP request to OpenWeatherMap
        try:
            url = f"http://api.openweathermap.org/data/2.5/weather?q={location}&appid={weather_api_key}&units=metric"
            response = requests.get(url)
            # Parse weather data
            data = response.json()
            if data.get("cod") == 200:
                temp = data["main"]["temp"]
                description = data["weather"][0]["description"]
                return f"The weather in {location} is currently {description} with a temperature of {temp}°C."
            else:
                return f"Sorry, I couldn't find weather information for {location}."
        except:
            # Handle missing or malformed data
            return f"Sorry, I couldn't find weather information for {location}."

# Tool: Performs basic arithmetic and comparison operations
class CalculatorTool(Tool):
    def name(self):
        return "Calculator Tool"

    def description(self):
        # List supported operations for users
        return ("Performs numeric operations given a JSON string with keys 'num1', 'num2', and 'operation'. "
                "Supported operations: add, subtract, multiply, divide, floor_divide, modulus, power, "
                "lt, le, eq, ne, ge, gt.")

    def use(self, **kwargs):
        # Read JSON input specifying numbers and operation
        input_str = kwargs.get("input", "")
        if not input_str:
            return "Error: No input provided. Please provide a JSON string with 'num1', 'num2', and 'operation'."
        try:
            # Safely parse JSON string
            input_str_clean = input_str.replace("'", "\"").strip().strip("\"")
            input_dict = json.loads(input_str_clean)
            num1 = input_dict['num1']
            num2 = input_dict['num2']
            operation = input_dict['operation']
        except json.JSONDecodeError as e:
            return f"Invalid JSON format: {str(e)}"
        except KeyError as e:
            return f"Missing required field: {e}"
        except Exception as e:
            return f"Input error: {str(e)}"

        # Supported numeric operations mapping
        operations = {
            'add': operator.add,
            'subtract': operator.sub,
            'multiply': operator.mul,
            'divide': operator.truediv,
            'floor_divide': operator.floordiv,
            'modulus': operator.mod,
            'power': operator.pow,
            'lt': operator.lt,
            'le': operator.le,
            'eq': operator.eq,
            'ne': operator.ne,
            'ge': operator.ge,
            'gt': operator.gt
        }

        if operation not in operations:
            return f"Unsupported operation. Valid operations: {', '.join(operations.keys())}"

        try:
            # Perform the selected operation
            result = operations[operation](num1, num2)
            return f"The result of {operation} on {num1} and {num2} is {result}."
        except Exception as e:
             # Handle potential math errors (e.g., division by zero)
            return f"Calculation error: {str(e)}"

# Tool helps to solve transistor problems
class TransistorTool(Tool):
    def name(self):
        return "Transistor Tool"

    def description(self):
        return (
            "Solves basic transistor problems. Accepts input with base current (Ib) and current gain (beta). "
            "Ib can be in mA or A. Calculates collector current (Ic) using Ic = beta * Ib."
        )

    def use(self, **kwargs):
        try:
            # Parse base current and beta from kwargs
            ib_raw = kwargs.get("Ib", None)
            beta = kwargs.get("beta", None)

            if ib_raw is None or beta is None:
                return "Error: Please provide both 'Ib' (base current) and 'beta' (current gain)."

            # Convert Ib to amperes if user specified mA
            if isinstance(ib_raw, str):
                if "mA" in ib_raw:
                    ib = float(ib_raw.replace("mA", "").strip()) / 1000  # convert to A
                elif "A" in ib_raw:
                    ib = float(ib_raw.replace("A", "").strip())
                else:
                    ib = float(ib_raw)
            else:
                ib = float(ib_raw)

            beta = float(beta)
            ic = beta * ib  # Ic in A
            ic_mA = ic * 1000  # convert to mA for output

            return (
                f"For Ib = {ib*1000:.3f} mA and β = {beta:.2f}, the collector current Ic = {ic_mA:.3f} mA "
                f"(or {ic:.6f} A)."
            )
        except Exception as e:
            return f"Error in transistor calculation: {str(e)}"

# Tool that searches the internet for a given input query
# Tool: Performs web searches using Tavily API
class WebSearchTool(Tool):
    def name(self):
        return "Web search tool"

    def description(self):
        return ("Searches the web for results for a given query. Uses the Tavily API. "
                "Perform a web search using the Tavily API based on the input query. "
                "Parameters: query (str): The search query. Example: 'latest news on AI'. "
                "Returns: str: A concise answer to the query or an error message.")

    def use(self, *args, **kwargs):
        from tavily import TavilyClient  # Import Tavily client
        # Extract query parameter or positional argument
        query = kwargs.get("query", "") or (args[0] if args else "")
        tavily_api_key = userdata.get("TAVILY_API_KEY")
        if not tavily_api_key:
            return "Error: Tavily API key is missing. Please set it using userdata.set()."

        try:
            # Initialize Tavily client with API key from environment
            client = TavilyClient(api_key=tavily_api_key)

            # Perform the web search
            response = client.search(query=query, include_answer=True)

            # Extract and return results
            if not response.get('answer'):
                return f"No results found for the query: {query}."

            return f"Search results for '{query}':\n{response['answer']}"

        except Exception as e:
            return f"Search failed: {str(e)}"

### Defining the Knowledge Base

In [None]:
# Knowledge base for various types of tasks
kb = {
    "math": {
        "general_strategy": "Break the problem into smaller parts and use basic arithmetic rules.",
        "alternative_methods": ["Use number lines", "Decompose numbers into smaller parts"]
    },
    "cricket_captains": {
        "source": "https://www.espncricinfo.com/",
        "strategy": "Look for structured sports databases instead of random searches."
    },
    "transistors": {
        "general_strategy": "Identify the transistor type (NPN or PNP), use the standard formulas like Ic = β * Ib, and apply Kirchhoff's laws where needed.",
        "source": "https://www.electronics-tutorials.ws/transistor/tran_1.html"
    }
}

# Defining methods to access the knowledge base
class Knowledge:
    def __init__(self):
        # Load initial KB data defined elsewhere in this script
        self.data = kb

    def get_strategy(self, topic):
        # Retrieve general strategy for a topic
        return self.data.get(topic, {}).get("general_strategy", "No strategy found.")

    def get_source(self, topic):
        # Retrieve source/reference for a topic
        return self.data.get(topic, {}).get("source", "No source found.")


### Defining the Agent

In [None]:
# Main agent class that orchestrates tools and LLM planning
class Agent:
    def __init__(self):
        # Initialize available tools, memory, and KB
        self.tools = []
        self.memory = []  # Conversation memory buffer
        self.max_memory = 10  # Max messages to retain
        self.knowledge = Knowledge()
        self.client = Groq(api_key=groq_api_key) # Groq LLM client

    def add_tool(self, tool: Tool):
        # Register a new tool for agent use
        self.tools.append(tool)

     # Parses JSON strings returned by the LLM and dispatches to the correct tool
    def json_parser(self, input_string):
        try:
            python_dict = ast.literal_eval(input_string)
            json_string = json.dumps(python_dict)
            json_dict = json.loads(json_string)
            if isinstance(json_dict, dict):
                return json_dict
            else:
                raise ValueError("Parsed object is not a dictionary")
        except (ValueError, SyntaxError, json.JSONDecodeError) as e:
            raise ValueError(f"Invalid JSON response: {e}")

    def build_functions_spec(self):
        functions = []
        for tool in self.tools:
            func_spec = {
                "name": tool.name(),
                "description": tool.description(),
                "parameters": {
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            }
            if isinstance(tool, TimeTool):
                func_spec["parameters"]["properties"]["timezone"] = {
                    "type": "string",
                    "description": "Timezone in IANA format (e.g., 'Asia/Kolkata'). Provide empty string for local time."
                }
                func_spec["parameters"]["required"] = ["timezone"]
            elif isinstance(tool, WeatherTool):
                func_spec["parameters"]["properties"]["location"] = {
                    "type": "string",
                    "description": "Location for which to fetch weather information."
                }
                func_spec["parameters"]["required"] = ["location"]
            elif isinstance(tool, CalculatorTool):
                func_spec["parameters"]["properties"]["input"] = {
                    "type": "string",
                    "description": ("A JSON string with keys 'num1', 'num2', and 'operation'. "
                                    "Example: '{\"num1\": 4, \"num2\": 6, \"operation\": \"add\"}'")
                }
                func_spec["parameters"]["required"] = ["input"]
            elif isinstance(tool, WebSearchTool):
                func_spec["parameters"]["properties"]["query"] = {
                    "type": "string",
                    "description": "The search query string (e.g., 'latest AI news')."
                }
                func_spec["parameters"]["required"] = ["query"]
            elif isinstance(tool, TransistorTool):
                func_spec["parameters"]["properties"]["Ib"] = {
                    "type": "number",
                    "description": "Base current (Ib) in milliamperes (mA)."
                }
                func_spec["parameters"]["properties"]["beta"] = {
                    "type": "number",
                    "description": "Current gain (β) of the transistor."
                }
                func_spec["parameters"]["required"] = ["Ib", "beta"]

            functions.append(func_spec)
        return functions

    # Send queries and recieve answer from llm
    def query_llm(self, prompt):
        model_to_use = "llama-3.3-70b-versatile"  # Change as needed
        # ... build messages with memory and KB strategies ...
        # Send to Groq client
        functions = self.build_functions_spec()
        completion = self.client.chat.completions.create(
            model=model_to_use,
            messages=[{"role": "user", "content": prompt}],
            functions=functions,
            function_call="auto"
        )
        message = completion.choices[0].message

        # Check if the LLM decided to call a function using attributes
        if hasattr(message, "function_call") and message.function_call:
            func_call = message.function_call
            function_name = func_call.name
            try:
                arguments = json.loads(func_call.arguments)
            except Exception:
                arguments = {}
            return {"action": function_name, "args": arguments}
        else:
            try:
                response_dict = self.json_parser(message.content)
                return response_dict
            except Exception as e:
                return {"action": "respond_to_user", "args": message.content}

    # Top-level user input handler
    # ... update memory, call LLM, parse function call, delegate to tools ...
    def process_input(self, user_input):
        self.memory.append(f"User: {user_input}")
        self.memory = self.memory[-self.max_memory:]
        context = "\n".join(self.memory)
        tool_descriptions = "\n".join([f"- {tool.name()}: {tool.description()}" for tool in self.tools])
        kb_str = json.dumps(self.knowledge.data)
        response_format = {"action": "", "args": ""}

        prompt = f"""Context:
            {context}

            Available tools:
            {tool_descriptions}

            Knowledge base:
            {kb_str}

            Based on the user's input and context, decide if you should use a tool or respond directly.
            If you choose to use a tool, output a JSON object with keys "action" (the tool name) and "args" (the arguments for the tool).
            If you decide to respond directly to the user, set "action" to "respond_to_user" and put your answer in "args".
            Response Format:
            {json.dumps(response_format)}
        """
        response_dict = self.query_llm(prompt)
        self.memory.append(f"Agent: {response_dict}")

        if response_dict["action"].lower() == "respond_to_user":
            return response_dict["args"]
        else:
            for tool in self.tools:
                if tool.name().lower() == response_dict["action"].lower():
                    if isinstance(response_dict["args"], str):
                      return tool.use(query=response_dict["args"])
                    else:
                      return tool.use(**response_dict["args"])

            return f"Error: No matching tool found for action '{response_dict['action']}'."

    # Control starts here
    def run(self):
         # Entry point for interactive loop
        print("LLM Agent: Hello! How can I assist you today?")
        user_input = input("You: ")
        while True:
            if user_input.lower() in ["exit", "bye", "close"]:
                print("See you later!")
                break
            response = self.process_input(user_input)
            print("Response from Agent:", response)
            user_input = input("You: ")


### Running the Agent

In [None]:
# Main function to bootstrap the agent and register tools
def main():
    agent = Agent()
    # Add available tools
    agent.add_tool(TimeTool())
    agent.add_tool(WeatherTool())
    agent.add_tool(CalculatorTool())
    agent.add_tool(TransistorTool())
    agent.run()

if __name__ == "__main__":
    main()

LLM Agent: Hello! How can I assist you today?
You: If the collector current changes from 2 mA to 3mA in a transistor when collector-emitter voltage is increased from 2V to 10V, what is the output resistance ?
Response from Agent: The result of divide on 8 and 0.001 is 8000.0.
You: exit
See you later!
