#### Install Required Packages

In [None]:
# Install all required packages
!pip install openai python-dotenv flask nest_asyncio requests beautifulsoup4 duckduckgo-search psutil pillow numpy pydantic typing-extensions flask-cors

Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Collecting duckduckgo-search
  Downloading duckduckgo_search-8.1.1-py3-none-any.whl.metadata (16 kB)
Collecting flask-cors
  Downloading flask_cors-6.0.1-py3-none-any.whl.metadata (5.3 kB)
Collecting primp>=0.15.0 (from duckduckgo-search)
  Downloading primp-0.15.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Downloading python_dotenv-1.1.1-py3-none-any.whl (20 kB)
Downloading duckduckgo_search-8.1.1-py3-none-any.whl (18 kB)
Downloading flask_cors-6.0.1-py3-none-any.whl (13 kB)
Downloading primp-0.15.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m25.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: python-dotenv, primp, duckduckgo-search, flask-cors
Successfully installed duckduckgo-search-8.1.1 flask-cors-6.0.1 primp-0.15.0 python-dotenv-1.1.1


#### Importing all Libraries

In [None]:
# Import required libraries
import os
import sys
import json
import asyncio
import logging
import time
import subprocess
import tempfile
import shutil
from datetime import datetime
from typing import Any, Dict, List, Optional, Set
from abc import ABC, abstractmethod
from functools import wraps
import threading
import io
import contextlib

# Install nest_asyncio to handle async in Colab
import nest_asyncio
nest_asyncio.apply()

print("✅ All dependencies installed successfully!")

✅ All dependencies installed successfully!


#### Config and Setting

In [None]:
import getpass
from dotenv import load_dotenv

# Secure API key input
if 'OPENAI_API_KEY' not in os.environ:
    api_key = getpass.getpass("🔑 Enter your OpenAI API Key: ")
    os.environ["OPENAI_API_KEY"] = api_key

class Settings:
    """Configuration settings for Colab environment"""

    # API Configuration
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    OPENAI_MODEL = "gpt-3.5-turbo"  # Using 3.5-turbo for better rate limits in Colab

    # Application Configuration
    DEBUG = True
    LOG_LEVEL = "INFO"
    HOST = "0.0.0.0"  # Colab compatibility
    PORT = 5000

    # Tool Configuration
    ENABLE_FILE_SYSTEM_TOOL = True
    ENABLE_WEB_SEARCH_TOOL = True
    ENABLE_SCRIPT_EXECUTOR = True
    ENABLE_COLAB_TOOLS = True  # Special Colab-specific tools

    # Security Settings
    MAX_REQUESTS_PER_MINUTE = 30  # Lower for Colab
    MAX_FILE_SIZE = 5 * 1024 * 1024  # 5MB for Colab

    ALLOWED_FILE_EXTENSIONS = {
        '.txt', '.json', '.csv', '.py', '.md', '.yaml', '.yml',
        '.xml', '.html', '.css', '.js', '.log', '.ipynb'
    }

    # Colab-specific directories
    BASE_DIR = "/content"
    TEMP_DIR = "/content/temp_files"
    DATA_DIR = "/content/data"
    LOGS_DIR = "/content/logs"

    # Script execution settings
    SCRIPT_TIMEOUT = 30

    @classmethod
    def setup_directories(cls):
        """Create required directories"""
        directories = [cls.TEMP_DIR, cls.DATA_DIR, cls.LOGS_DIR]
        for directory in directories:
            os.makedirs(directory, exist_ok=True)
        print(f"✅ Created directories: {directories}")

    @classmethod
    def mount_drive(cls):
        """Mount Google Drive for persistent storage"""
        try:
            from google.colab import drive
            drive.mount('/content/drive', force_remount=True)
            cls.DRIVE_DIR = "/content/drive/My Drive/ChatbotData"
            os.makedirs(cls.DRIVE_DIR, exist_ok=True)
            print("✅ Google Drive mounted successfully!")
            return True
        except Exception as e:
            print(f"⚠️ Could not mount Google Drive: {e}")
            return False

# Initialize settings
settings = Settings()
settings.setup_directories()

# Try to mount Google Drive for persistence
drive_mounted = settings.mount_drive()

print("⚙️ Configuration completed!")


🔑 Enter your OpenAI API Key: ··········
✅ Created directories: ['/content/temp_files', '/content/data', '/content/logs']
Mounted at /content/drive
✅ Google Drive mounted successfully!
⚙️ Configuration completed!


#### Utility and Function Helper

In [None]:
# Custom Exceptions
class ChatbotError(Exception):
    """Base exception for chatbot errors"""
    pass

class ToolError(ChatbotError):
    """Base exception for tool-related errors"""
    pass

class FileSystemError(ToolError):
    """File system operation errors"""
    pass

class WebSearchError(ToolError):
    """Web search errors"""
    pass

class ScriptExecutionError(ToolError):
    """Script execution errors"""
    pass

# Helper Functions
def ensure_directory_exists(directory_path: str) -> None:
    """Ensure directory exists"""
    os.makedirs(directory_path, exist_ok=True)

def is_safe_path(base_path: str, user_path: str) -> bool:
    """Check if path is safe"""
    try:
        base_path = os.path.abspath(base_path)
        user_path = os.path.abspath(os.path.join(base_path, user_path))
        return user_path.startswith(base_path)
    except Exception:
        return False

def sanitize_filename(filename: str) -> str:
    """Sanitize filename"""
    dangerous_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
    sanitized = filename

    for char in dangerous_chars:
        sanitized = sanitized.replace(char, '_')

    sanitized = sanitized.strip(' .')

    if not sanitized:
        sanitized = f"file_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    return sanitized

def format_file_size(size_bytes: int) -> str:
    """Format file size in human readable format"""
    if size_bytes == 0:
        return "0 B"

    size_names = ["B", "KB", "MB", "GB", "TB"]
    import math
    i = int(math.floor(math.log(size_bytes, 1024)))
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return f"{s} {size_names[i]}"

class ResponseFormatter:
    """Format responses consistently"""

    @staticmethod
    def success(message: str, data: Optional[Any] = None) -> Dict:
        response = {
            'status': 'success',
            'message': message,
            'timestamp': datetime.now().isoformat()
        }
        if data is not None:
            response['data'] = data
        return response

    @staticmethod
    def error(message: str, error_code: Optional[str] = None) -> Dict:
        response = {
            'status': 'error',
            'message': message,
            'timestamp': datetime.now().isoformat()
        }
        if error_code:
            response['error_code'] = error_code
        return response

print("🛠️ Utility functions loaded!")

🛠️ Utility functions loaded!


#### Base Tool Framework

In [None]:
class BaseTool(ABC):
    """Abstract base class for all tools"""

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.is_enabled = True
        self.usage_count = 0

        print(f"🔧 Initialized tool: {name}")

    @abstractmethod
    async def execute(self, **kwargs) -> Dict[str, Any]:
        """Execute the tool with given parameters"""
        pass

    @abstractmethod
    def get_parameters(self) -> Dict[str, Any]:
        """Get OpenAI function calling parameters schema"""
        pass

    def to_openai_tool(self) -> Dict[str, Any]:
        """Convert to OpenAI tool format"""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.get_parameters()
            }
        }

    async def safe_execute(self, **kwargs) -> Dict[str, Any]:
        """Safely execute tool with error handling"""
        if not self.is_enabled:
            return ResponseFormatter.error(f"Tool '{self.name}' is disabled")

        try:
            self.usage_count += 1
            print(f"⚡ Executing tool '{self.name}' (usage #{self.usage_count})")

            result = await self.execute(**kwargs)
            return result

        except Exception as e:
            print(f"❌ Tool '{self.name}' execution failed: {e}")
            return ResponseFormatter.error(f"Tool execution failed: {str(e)}")

print("🏗️ Base tool framework ready!")

🏗️ Base tool framework ready!


#### File System Tool Implementation

In [None]:
# Cell 5: File System Tool (Colab Optimized)

class FileSystemTool(BaseTool):
    """File system operations tool optimized for Colab"""

    def __init__(self):
        super().__init__(
            name="file_system",
            description="Manage files and directories in Colab environment with Google Drive support"
        )

    def get_parameters(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "description": "File system action",
                    "enum": [
                        "create_file", "read_file", "write_file", "delete_file",
                        "list_directory", "create_directory", "copy_file", "move_file",
                        "get_file_info", "search_files", "save_to_drive", "load_from_drive"
                    ]
                },
                "path": {"type": "string", "description": "File or directory path"},
                "content": {"type": "string", "description": "File content"},
                "destination": {"type": "string", "description": "Destination path"},
                "pattern": {"type": "string", "description": "Search pattern"},
                "drive_path": {"type": "string", "description": "Google Drive path"}
            },
            "required": ["action"]
        }

    async def execute(self, **kwargs) -> Dict[str, Any]:
        action = kwargs.get('action')

        try:
            if action == "create_file":
                return await self._create_file(kwargs.get('path', ''), kwargs.get('content', ''))
            elif action == "read_file":
                return await self._read_file(kwargs.get('path', ''))
            elif action == "write_file":
                return await self._write_file(kwargs.get('path', ''), kwargs.get('content', ''))
            elif action == "delete_file":
                return await self._delete_file(kwargs.get('path', ''))
            elif action == "list_directory":
                return await self._list_directory(kwargs.get('path', '.'))
            elif action == "create_directory":
                return await self._create_directory(kwargs.get('path', ''))
            elif action == "copy_file":
                return await self._copy_file(kwargs.get('path', ''), kwargs.get('destination', ''))
            elif action == "move_file":
                return await self._move_file(kwargs.get('path', ''), kwargs.get('destination', ''))
            elif action == "get_file_info":
                return await self._get_file_info(kwargs.get('path', ''))
            elif action == "search_files":
                return await self._search_files(kwargs.get('path', '.'), kwargs.get('pattern', '*'))
            elif action == "save_to_drive":
                return await self._save_to_drive(kwargs.get('path', ''), kwargs.get('drive_path', ''))
            elif action == "load_from_drive":
                return await self._load_from_drive(kwargs.get('drive_path', ''), kwargs.get('path', ''))
            else:
                return ResponseFormatter.error(f"Unknown action: {action}")

        except Exception as e:
            raise FileSystemError(f"File system operation failed: {e}")

    async def _create_file(self, path: str, content: str = '') -> Dict[str, Any]:
        """Create a new file"""
        try:
            if not path.startswith('/content/'):
                path = os.path.join('/content', path)

            path = sanitize_filename(os.path.basename(path))
            full_path = os.path.join('/content', path)

            os.makedirs(os.path.dirname(full_path), exist_ok=True)

            with open(full_path, 'w', encoding='utf-8') as f:
                f.write(content)

            file_size = os.path.getsize(full_path)

            return ResponseFormatter.success(
                f"File created: {path}",
                {
                    "path": full_path,
                    "size": format_file_size(file_size),
                    "content_length": len(content)
                }
            )

        except Exception as e:
            return ResponseFormatter.error(f"Failed to create file: {e}")

    async def _read_file(self, path: str) -> Dict[str, Any]:
        """Read file contents"""
        try:
            if not path.startswith('/content/'):
                path = os.path.join('/content', path)

            if not os.path.exists(path):
                return ResponseFormatter.error(f"File not found: {path}")

            file_size = os.path.getsize(path)
            if file_size > settings.MAX_FILE_SIZE:
                return ResponseFormatter.error(f"File too large: {format_file_size(file_size)}")

            with open(path, 'r', encoding='utf-8') as f:
                content = f.read()

            return ResponseFormatter.success(
                f"File read: {os.path.basename(path)}",
                {
                    "path": path,
                    "content": content,
                    "size": format_file_size(file_size),
                    "lines": len(content.splitlines())
                }
            )

        except Exception as e:
            return ResponseFormatter.error(f"Failed to read file: {e}")

    async def _write_file(self, path: str, content: str) -> Dict[str, Any]:
        """Write content to file"""
        try:
            if not path.startswith('/content/'):
                path = os.path.join('/content', path)

            if len(content.encode('utf-8')) > settings.MAX_FILE_SIZE:
                return ResponseFormatter.error("Content too large")

            os.makedirs(os.path.dirname(path), exist_ok=True)

            with open(path, 'w', encoding='utf-8') as f:
                f.write(content)

            file_size = os.path.getsize(path)

            return ResponseFormatter.success(
                f"File written: {os.path.basename(path)}",
                {
                    "path": path,
                    "size": format_file_size(file_size),
                    "content_length": len(content)
                }
            )

        except Exception as e:
            return ResponseFormatter.error(f"Failed to write file: {e}")

    async def _list_directory(self, path: str) -> Dict[str, Any]:
        """List directory contents"""
        try:
            if not path.startswith('/content/'):
                path = os.path.join('/content', path)

            if not os.path.exists(path):
                return ResponseFormatter.error(f"Directory not found: {path}")

            items = []
            total_size = 0

            for item in sorted(os.listdir(path)):
                item_path = os.path.join(path, item)

                try:
                    stat = os.stat(item_path)
                    is_dir = os.path.isdir(item_path)
                    size = 0 if is_dir else stat.st_size
                    total_size += size

                    items.append({
                        "name": item,
                        "type": "directory" if is_dir else "file",
                        "size": format_file_size(size),
                        "modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
                    })
                except (OSError, IOError):
                    continue

            return ResponseFormatter.success(
                f"Directory listing for: {path}",
                {
                    "path": path,
                    "items": items,
                    "total_items": len(items),
                    "total_size": format_file_size(total_size)
                }
            )

        except Exception as e:
            return ResponseFormatter.error(f"Failed to list directory: {e}")

    async def _save_to_drive(self, local_path: str, drive_path: str) -> Dict[str, Any]:
        """Save file to Google Drive"""
        if not drive_mounted:
            return ResponseFormatter.error("Google Drive not mounted")

        try:
            if not local_path.startswith('/content/'):
                local_path = os.path.join('/content', local_path)

            drive_full_path = os.path.join(settings.DRIVE_DIR, drive_path)
            os.makedirs(os.path.dirname(drive_full_path), exist_ok=True)

            shutil.copy2(local_path, drive_full_path)

            return ResponseFormatter.success(
                f"File saved to Drive: {drive_path}",
                {"local_path": local_path, "drive_path": drive_full_path}
            )

        except Exception as e:
            return ResponseFormatter.error(f"Failed to save to Drive: {e}")

    async def _load_from_drive(self, drive_path: str, local_path: str) -> Dict[str, Any]:
        """Load file from Google Drive"""
        if not drive_mounted:
            return ResponseFormatter.error("Google Drive not mounted")

        try:
            drive_full_path = os.path.join(settings.DRIVE_DIR, drive_path)

            if not local_path.startswith('/content/'):
                local_path = os.path.join('/content', local_path)

            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            shutil.copy2(drive_full_path, local_path)

            return ResponseFormatter.success(
                f"File loaded from Drive: {drive_path}",
                {"drive_path": drive_full_path, "local_path": local_path}
            )

        except Exception as e:
            return ResponseFormatter.error(f"Failed to load from Drive: {e}")

print("📁 File System Tool ready!")

📁 File System Tool ready!


#### Web Search Tool Implementation

In [None]:
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS

class WebSearchTool(BaseTool):
    """Web search tool using DuckDuckGo"""

    def __init__(self):
        super().__init__(
            name="web_search",
            description="Search the web and fetch content from URLs using DuckDuckGo"
        )

    def get_parameters(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "description": "Search action",
                    "enum": ["search", "fetch_url", "search_news"]
                },
                "query": {"type": "string", "description": "Search query"},
                "url": {"type": "string", "description": "URL to fetch"},
                "max_results": {"type": "integer", "description": "Maximum results (1-10)"}
            },
            "required": ["action"]
        }

    async def execute(self, **kwargs) -> Dict[str, Any]:
        action = kwargs.get('action')

        try:
            if action == "search":
                return await self._search_web(
                    kwargs.get('query', ''),
                    kwargs.get('max_results', 5)
                )
            elif action == "fetch_url":
                return await self._fetch_url(kwargs.get('url', ''))
            elif action == "search_news":
                return await self._search_news(
                    kwargs.get('query', ''),
                    kwargs.get('max_results', 3)
                )
            else:
                return ResponseFormatter.error(f"Unknown action: {action}")

        except Exception as e:
            raise WebSearchError(f"Web search failed: {e}")

    async def _search_web(self, query: str, max_results: int) -> Dict[str, Any]:
        """Search the web using DuckDuckGo"""
        try:
            if not query:
                return ResponseFormatter.error("Search query is required")

            results = []

            with DDGS() as ddgs:
                search_results = ddgs.text(query, max_results=max_results)

                for result in search_results:
                    results.append({
                        "title": result.get('title', ''),
                        "url": result.get('href', ''),
                        "snippet": result.get('body', '')[:200] + "..." if len(result.get('body', '')) > 200 else result.get('body', ''),
                        "published": result.get('published', '')
                    })

            return ResponseFormatter.success(
                f"Found {len(results)} search results for '{query}'",
                {"query": query, "results": results}
            )

        except Exception as e:
            return ResponseFormatter.error(f"Web search failed: {str(e)}")

    async def _fetch_url(self, url: str) -> Dict[str, Any]:
        """Fetch content from a URL"""
        try:
            if not url or not url.startswith(('http://', 'https://')):
                return ResponseFormatter.error("Invalid URL provided")

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }

            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            content_type = response.headers.get('content-type', '').lower()

            if 'html' in content_type:
                soup = BeautifulSoup(response.content, 'html.parser')

                # Remove script and style elements
                for script in soup(["script", "style", "nav", "footer"]):
                    script.decompose()

                title = soup.find('title')
                title_text = title.get_text().strip() if title else "No title"

                # Extract main content
                body = soup.find('body')
                if body:
                    text = body.get_text()
                    lines = (line.strip() for line in text.splitlines())
                    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
                    clean_text = ' '.join(chunk for chunk in chunks if chunk)
                else:
                    clean_text = soup.get_text()

                # Limit content length
                if len(clean_text) > 2000:
                    clean_text = clean_text[:2000] + "..."

                return ResponseFormatter.success(
                    f"Content fetched from {url}",
                    {
                        "url": url,
                        "title": title_text,
                        "content": clean_text,
                        "content_type": content_type,
                        "word_count": len(clean_text.split())
                    }
                )

            elif 'json' in content_type:
                return ResponseFormatter.success(
                    f"JSON content from {url}",
                    {"url": url, "content": response.json(), "content_type": content_type}
                )

            else:
                content = response.text[:2000]
                return ResponseFormatter.success(
                    f"Content from {url}",
                    {"url": url, "content": content, "content_type": content_type}
                )

        except Exception as e:
            return ResponseFormatter.error(f"Failed to fetch URL: {str(e)}")

    async def _search_news(self, query: str, max_results: int) -> Dict[str, Any]:
        """Search for news using DuckDuckGo"""
        try:
            results = []

            with DDGS() as ddgs:
                news_results = ddgs.news(query, max_results=max_results)

                for result in news_results:
                    results.append({
                        "title": result.get('title', ''),
                        "url": result.get('url', ''),
                        "snippet": result.get('body', ''),
                        "published": result.get('date', ''),
                        "source": result.get('source', '')
                    })

            return ResponseFormatter.success(
                f"Found {len(results)} news results for '{query}'",
                {"query": query, "results": results, "type": "news"}
            )

        except Exception as e:
            return ResponseFormatter.error(f"News search failed: {str(e)}")

print("🌐 Web Search Tool ready!")

🌐 Web Search Tool ready!


#### Script Executor Tool Implementation

In [None]:
class ScriptExecutor(BaseTool):
    """Safe script execution tool for Colab"""

    def __init__(self):
        super().__init__(
            name="script_executor",
            description="Execute Python code and safe system commands in Colab environment"
        )

    def get_parameters(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "description": "Execution action",
                    "enum": ["run_python", "run_shell", "install_package", "get_system_info"]
                },
                "code": {"type": "string", "description": "Python code to execute"},
                "command": {"type": "string", "description": "Shell command to execute"},
                "package": {"type": "string", "description": "Package name to install"},
                "timeout": {"type": "integer", "description": "Execution timeout"}
            },
            "required": ["action"]
        }

    async def execute(self, **kwargs) -> Dict[str, Any]:
        action = kwargs.get('action')

        try:
            if action == "run_python":
                return await self._run_python(
                    kwargs.get('code', ''),
                    kwargs.get('timeout', settings.SCRIPT_TIMEOUT)
                )
            elif action == "run_shell":
                return await self._run_shell(
                    kwargs.get('command', ''),
                    kwargs.get('timeout', settings.SCRIPT_TIMEOUT)
                )
            elif action == "install_package":
                return await self._install_package(kwargs.get('package', ''))
            elif action == "get_system_info":
                return await self._get_system_info()
            else:
                return ResponseFormatter.error(f"Unknown action: {action}")

        except Exception as e:
            raise ScriptExecutionError(f"Script execution failed: {e}")

    async def _run_python(self, code: str, timeout: int) -> Dict[str, Any]:
        """Execute Python code safely"""
        try:
            if not code.strip():
                return ResponseFormatter.error("No Python code provided")

            # Security check - block dangerous imports and operations
            dangerous_patterns = [
                'import subprocess', 'import os', '__import__',
                'exec(', 'eval(', 'compile(', 'open(',
                'file(', 'input(', 'raw_input('
            ]

            code_lower = code.lower()
            for pattern in dangerous_patterns:
                if pattern in code_lower:
                    return ResponseFormatter.error(f"Security restriction: '{pattern}' not allowed")

            # Capture output
            old_stdout = sys.stdout
            old_stderr = sys.stderr
            stdout_capture = io.StringIO()
            stderr_capture = io.StringIO()

            try:
                sys.stdout = stdout_capture
                sys.stderr = stderr_capture

                # Create a restricted execution environment
                restricted_globals = {
                    '__builtins__': {
                        'print': print,
                        'len': len,
                        'range': range,
                        'str': str,
                        'int': int,
                        'float': float,
                        'list': list,
                        'dict': dict,
                        'tuple': tuple,
                        'set': set,
                        'abs': abs,
                        'min': min,
                        'max': max,
                        'sum': sum,
                        'sorted': sorted,
                        'reversed': reversed,
                        'enumerate': enumerate,
                        'zip': zip,
                        'map': map,
                        'filter': filter,
                    },
                    'math': __import__('math'),
                    'json': __import__('json'),
                    'datetime': __import__('datetime'),
                    'random': __import__('random'),
                }

                exec(code, restricted_globals, {})

                stdout_output = stdout_capture.getvalue()
                stderr_output = stderr_capture.getvalue()

                return ResponseFormatter.success(
                    "Python code executed successfully",
                    {
                        "stdout": stdout_output if stdout_output else "No output",
                        "stderr": stderr_output if stderr_output else None,
                        "code": code[:200] + "..." if len(code) > 200 else code
                    }
                )

            finally:
                sys.stdout = old_stdout
                sys.stderr = old_stderr

        except Exception as e:
            return ResponseFormatter.error(f"Python execution error: {str(e)}")

    async def _run_shell(self, command: str, timeout: int) -> Dict[str, Any]:
        """Execute safe shell commands"""
        try:
            if not command.strip():
                return ResponseFormatter.error("No command provided")

            # Whitelist of safe commands for Colab
            safe_commands = [
                'ls', 'pwd', 'whoami', 'date', 'df', 'free', 'ps',
                'cat', 'head', 'tail', 'wc', 'grep', 'find',
                'pip list', 'pip show', 'python --version',
                'nvidia-smi', 'lscpu', 'lsb_release'
            ]

            command_start = command.split()[0] if command.split() else ""

            # Check if command is in whitelist or is a pip command
            is_safe = any(command.startswith(safe_cmd) for safe_cmd in safe_commands)
            is_safe = is_safe or command.startswith('pip ') or command.startswith('!pip ')

            if not is_safe:
                return ResponseFormatter.error(f"Command '{command_start}' not allowed for security reasons")

            # Execute command
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=timeout
            )

            return ResponseFormatter.success(
                "Command executed successfully",
                {
                    "command": command,
                    "stdout": result.stdout,
                    "stderr": result.stderr if result.stderr else None,
                    "return_code": result.returncode
                }
            )

        except subprocess.TimeoutExpired:
            return ResponseFormatter.error(f"Command timed out after {timeout} seconds")
        except Exception as e:
            return ResponseFormatter.error(f"Command execution failed: {str(e)}")

    async def _install_package(self, package: str) -> Dict[str, Any]:
        """Install Python package using pip"""
        try:
            if not package:
                return ResponseFormatter.error("Package name required")

            # Security check - only allow alphanumeric, hyphens, underscores, and dots
            import re
            if not re.match(r'^[a-zA-Z0-9\-_.]+$', package):
                return ResponseFormatter.error("Invalid package name")

            command = f"pip install {package}"
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=120  # Longer timeout for installations
            )

            if result.returncode == 0:
                return ResponseFormatter.success(
                    f"Package '{package}' installed successfully",
                    {
                        "package": package,
                        "output": result.stdout,
                        "command": command
                    }
                )
            else:
                return ResponseFormatter.error(
                    f"Package installation failed: {result.stderr}"
                )

        except Exception as e:
            return ResponseFormatter.error(f"Installation failed: {str(e)}")

    async def _get_system_info(self) -> Dict[str, Any]:
        """Get Colab system information"""
        try:
            import platform
            import psutil

            # Get system information
            info = {
                "platform": platform.platform(),
                "python_version": platform.python_version(),
                "cpu_count": psutil.cpu_count(),
                "memory_total": format_file_size(psutil.virtual_memory().total),
                "memory_available": format_file_size(psutil.virtual_memory().available),
                "disk_usage": {
                    "total": format_file_size(psutil.disk_usage('/').total),
                    "used": format_file_size(psutil.disk_usage('/').used),
                    "free": format_file_size(psutil.disk_usage('/').free)
                },
                "environment": "Google Colab"
            }

            # Try to get GPU information
            try:
                gpu_result = subprocess.run(
                    ["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader"],
                    capture_output=True,
                    text=True,
                    timeout=10
                )
                if gpu_result.returncode == 0:
                    info["gpu"] = gpu_result.stdout.strip()
                else:
                    info["gpu"] = "No GPU detected"
            except:
                info["gpu"] = "GPU information unavailable"

            return ResponseFormatter.success(
                "System information retrieved",
                info
            )

        except Exception as e:
            return ResponseFormatter.error(f"Failed to get system info: {str(e)}")

print("⚡ Script Executor Tool ready!")

⚡ Script Executor Tool ready!


#### Colab-Specific Tool

In [None]:
class ColabTool(BaseTool):
    """Special tools for Google Colab environment"""

    def __init__(self):
        super().__init__(
            name="colab_tools",
            description="Google Colab specific operations: file upload/download, form widgets, etc."
        )

    def get_parameters(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "description": "Colab action",
                    "enum": [
                        "upload_files", "download_file", "display_image",
                        "create_form", "show_progress", "clear_output"
                    ]
                },
                "filepath": {"type": "string", "description": "File path"},
                "title": {"type": "string", "description": "Form title"},
                "fields": {"type": "array", "description": "Form fields"},
                "message": {"type": "string", "description": "Progress message"}
            },
            "required": ["action"]
        }

    async def execute(self, **kwargs) -> Dict[str, Any]:
        action = kwargs.get('action')

        try:
            if action == "upload_files":
                return await self._upload_files()
            elif action == "download_file":
                return await self._download_file(kwargs.get('filepath', ''))
            elif action == "display_image":
                return await self._display_image(kwargs.get('filepath', ''))
            elif action == "create_form":
                return await self._create_form(
                    kwargs.get('title', 'Form'),
                    kwargs.get('fields', [])
                )
            elif action == "clear_output":
                return await self._clear_output()
            else:
                return ResponseFormatter.error(f"Unknown Colab action: {action}")

        except Exception as e:
            return ResponseFormatter.error(f"Colab operation failed: {str(e)}")

    async def _upload_files(self) -> Dict[str, Any]:
        """Upload files to Colab"""
        try:
            from google.colab import files
            uploaded = files.upload()

            file_info = []
            for filename, data in uploaded.items():
                file_size = len(data)
                file_info.append({
                    "filename": filename,
                    "size": format_file_size(file_size),
                    "path": f"/content/{filename}"
                })

            return ResponseFormatter.success(
                f"Uploaded {len(uploaded)} file(s)",
                {"files": file_info}
            )

        except Exception as e:
            return ResponseFormatter.error(f"Upload failed: {str(e)}")

    async def _download_file(self, filepath: str) -> Dict[str, Any]:
        """Download file from Colab"""
        try:
            if not filepath:
                return ResponseFormatter.error("File path required")

            if not os.path.exists(filepath):
                return ResponseFormatter.error(f"File not found: {filepath}")

            from google.colab import files
            files.download(filepath)

            return ResponseFormatter.success(
                f"File download initiated: {os.path.basename(filepath)}",
                {"filepath": filepath}
            )

        except Exception as e:
            return ResponseFormatter.error(f"Download failed: {str(e)}")

    async def _display_image(self, filepath: str) -> Dict[str, Any]:
        """Display image in Colab"""
        try:
            if not filepath:
                return ResponseFormatter.error("Image path required")

            if not os.path.exists(filepath):
                return ResponseFormatter.error(f"Image not found: {filepath}")

            from IPython.display import Image, display
            display(Image(filepath))

            return ResponseFormatter.success(
                f"Image displayed: {os.path.basename(filepath)}",
                {"filepath": filepath}
            )

        except Exception as e:
            return ResponseFormatter.error(f"Image display failed: {str(e)}")

    async def _clear_output(self) -> Dict[str, Any]:
        """Clear Colab cell output"""
        try:
            from IPython.display import clear_output
            clear_output(wait=True)

            return ResponseFormatter.success("Output cleared", {})

        except Exception as e:
            return ResponseFormatter.error(f"Clear output failed: {str(e)}")

print("📱 Colab Tools ready!")

📱 Colab Tools ready!


#### Agent Implementation

In [None]:
# AI Agent with OpenAI Integration
import openai

class ChatbotAgent:
    """Main chatbot agent with tool integration"""

    def __init__(self):
        self.name = "Colab AI Assistant"
        self.client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)

        # Initialize tools
        self.tools = []

        if settings.ENABLE_FILE_SYSTEM_TOOL:
            self.tools.append(FileSystemTool())

        if settings.ENABLE_WEB_SEARCH_TOOL:
            self.tools.append(WebSearchTool())

        if settings.ENABLE_SCRIPT_EXECUTOR:
            self.tools.append(ScriptExecutor())

        if settings.ENABLE_COLAB_TOOLS:
            self.tools.append(ColabTool())

        # System instructions
        self.instructions = f"""
You are an intelligent AI assistant running in Google Colab with access to powerful tools.

🔧 AVAILABLE CAPABILITIES:
• File Management: Create, read, write, delete files in Colab and Google Drive
• Web Search: Search the internet and fetch content from URLs
• Code Execution: Run Python scripts safely with restricted imports
• System Commands: Execute safe shell commands
• Package Management: Install Python packages with pip
• Colab Integration: Upload/download files, display images, create forms

📝 COLAB ENVIRONMENT NOTES:
• Files are temporary unless saved to Google Drive
• You have access to GPU resources if enabled
• Python packages can be installed dynamically
• System has Ubuntu Linux environment

🛡️ SAFETY GUIDELINES:
• File operations are restricted to /content/ directory
• Only safe shell commands are allowed
• Python execution has security restrictions
• Always explain actions before performing them
• Ask for confirmation for potentially destructive operations

🎯 INTERACTION STYLE:
• Be helpful and explain what you're doing
• Provide clear feedback on operation results
• Suggest alternatives if something isn't possible
• Use tools effectively to accomplish user tasks

Current tools available: {', '.join(tool.name for tool in self.tools)}
"""

        # Conversation history
        self.conversation_history = []

        print(f"🤖 {self.name} initialized with {len(self.tools)} tools")

    async def process_message(self, message: str) -> Dict[str, Any]:
        """Process user message and return response"""
        try:
            print(f"💭 Processing: {message[:50]}...")

            # Prepare messages
            messages = [
                {"role": "system", "content": self.instructions}
            ]

            # Add conversation history (last 10 messages)
            messages.extend(self.conversation_history[-10:])

            # Add current message
            messages.append({"role": "user", "content": message})

            # Get OpenAI response
            response = self.client.chat.completions.create(
                model=settings.OPENAI_MODEL,
                messages=messages,
                tools=[tool.to_openai_tool() for tool in self.tools],
                tool_choice="auto",
                temperature=0.7,
                max_tokens=2000
            )

            message_obj = response.choices[0].message

            # Handle tool calls
            if message_obj.tool_calls:
                return await self._handle_tool_calls(message_obj, messages)
            else:
                # Regular response
                content = message_obj.content
                self._add_to_history("user", message)
                self._add_to_history("assistant", content)

                return ResponseFormatter.success(content)

        except Exception as e:
            print(f"❌ Error processing message: {e}")
            return ResponseFormatter.error(f"I encountered an error: {str(e)}")

    async def _handle_tool_calls(self, message_obj, messages) -> Dict[str, Any]:
        """Handle OpenAI tool calls"""
        try:
            tool_results = []

            # Execute each tool call
            for tool_call in message_obj.tool_calls:
                function_name = tool_call.function.name
                function_args = json.loads(tool_call.function.arguments)

                print(f"🔧 Executing tool: {function_name}")

                # Find and execute tool
                tool = next((t for t in self.tools if t.name == function_name), None)
                if tool:
                    result = await tool.safe_execute(**function_args)
                    tool_results.append(result)

                    # Add tool result to messages
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": json.dumps(result)
                    })
                else:
                    error_result = ResponseFormatter.error(f"Tool '{function_name}' not found")
                    tool_results.append(error_result)

                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": json.dumps(error_result)
                    })

            # Get final response from OpenAI
            final_response = self.client.chat.completions.create(
                model=settings.OPENAI_MODEL,
                messages=messages,
                temperature=0.7,
                max_tokens=2000
            )

            final_content = final_response.choices[0].message.content

            # Add to conversation history
            original_message = next(msg["content"] for msg in messages if msg["role"] == "user")
            self._add_to_history("user", original_message)
            self._add_to_history("assistant", final_content)

            return ResponseFormatter.success(
                final_content,
                {"tool_results": tool_results}
            )

        except Exception as e:
            print(f"❌ Error in tool execution: {e}")
            return ResponseFormatter.error(f"Tool execution failed: {str(e)}")

    def _add_to_history(self, role: str, content: str):
        """Add to conversation history"""
        self.conversation_history.append({
            "role": role,
            "content": content
        })

        # Keep history manageable
        if len(self.conversation_history) > 20:
            self.conversation_history = self.conversation_history[-20:]

    def clear_history(self):
        """Clear conversation history"""
        self.conversation_history.clear()
        print("🧹 Conversation history cleared")

    def get_status(self) -> Dict[str, Any]:
        """Get agent status"""
        return {
            "name": self.name,
            "tools_count": len(self.tools),
            "tools": [
                {
                    "name": tool.name,
                    "description": tool.description,
                    "usage_count": tool.usage_count
                }
                for tool in self.tools
            ],
            "conversation_length": len(self.conversation_history)
        }

# Initialize the agent
agent = ChatbotAgent()
print("✅ Agent ready for interaction!")

🔧 Initialized tool: file_system
🔧 Initialized tool: web_search
🔧 Initialized tool: script_executor
🔧 Initialized tool: colab_tools
🤖 Colab AI Assistant initialized with 4 tools
✅ Agent ready for interaction!


#### Interactive Chat Interface

In [None]:
# Interactive Chat Interface

import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
import asyncio

class ColabChatInterface:
    """Interactive chat interface for Colab"""

    def __init__(self, agent):
        self.agent = agent
        self.chat_history = []

        # Create widgets
        self.chat_output = widgets.Output(layout={'height': '400px', 'overflow': 'auto'})
        self.message_input = widgets.Textarea(
            placeholder='Type your message here...',
            layout={'width': '70%', 'height': '60px'}
        )
        self.send_button = widgets.Button(
            description='Send',
            button_style='primary',
            layout={'width': '15%', 'height': '60px'}
        )
        self.clear_button = widgets.Button(
            description='Clear',
            button_style='warning',
            layout={'width': '15%', 'height': '60px'}
        )

        # Bind events
        self.send_button.on_click(self._on_send_click)
        self.clear_button.on_click(self._on_clear_click)
        self.message_input.observe(self._on_key_press, names='value')

        # Create layout
        input_box = widgets.HBox([
            self.message_input,
            self.send_button,
            self.clear_button
        ])

        self.interface = widgets.VBox([
            widgets.HTML('<h3>🤖 Colab AI Assistant Chat</h3>'),
            self.chat_output,
            input_box
        ])

        # Add welcome message
        self._add_message("Assistant", "👋 Hello! I'm your Colab AI Assistant. I can help you with:\n\n• File management and Google Drive operations\n• Web search and content fetching\n• Python code execution\n• Package installation\n• System information\n• Colab-specific operations\n\nWhat would you like me to help you with?", "assistant")

    def display(self):
        """Display the chat interface"""
        display(self.interface)

    def _on_key_press(self, change):
        """Handle Enter key press"""
        # Note: This is a simplified version - full Ctrl+Enter detection would need more complex handling
        pass

    def _on_send_click(self, button):
        """Handle send button click"""
        message = self.message_input.value.strip()
        if message:
            self.message_input.value = ""
            asyncio.create_task(self._process_message(message))

    def _on_clear_click(self, button):
        """Handle clear button click"""
        self.agent.clear_history()
        self.chat_history = []
        with self.chat_output:
            clear_output()
        self._add_message("System", "🧹 Chat history cleared!", "system")

    async def _process_message(self, message):
        """Process user message"""
        # Add user message
        self._add_message("You", message, "user")

        # Show processing indicator
        self._add_message("Assistant", "🤔 Thinking...", "processing")

        try:
            # Process with agent
            response = await self.agent.process_message(message)

            # Remove processing indicator
            self._remove_last_message()

            # Add response
            if response['status'] == 'success':
                response_text = response['message']

                # Add tool results if available
                if 'data' in response and 'tool_results' in response['data']:
                    tool_results = response['data']['tool_results']
                    success_count = sum(1 for result in tool_results if result['status'] == 'success')
                    if len(tool_results) > 0:
                        response_text += f"\n\n📊 Executed {success_count}/{len(tool_results)} tools successfully"

                self._add_message("Assistant", response_text, "assistant")
            else:
                self._add_message("Assistant", f"❌ Error: {response['message']}", "error")

        except Exception as e:
            self._remove_last_message()
            self._add_message("Assistant", f"❌ Unexpected error: {str(e)}", "error")

    def _add_message(self, sender, content, msg_type):
        """Add message to chat"""
        timestamp = datetime.now().strftime("%H:%M:%S")

        # Style based on message type
        if msg_type == "user":
            style = "background: #e3f2fd; border-left: 4px solid #2196f3; margin: 5px 0; padding: 10px;"
            icon = "👤"
        elif msg_type == "assistant":
            style = "background: #f3e5f5; border-left: 4px solid #9c27b0; margin: 5px 0; padding: 10px;"
            icon = "🤖"
        elif msg_type == "error":
            style = "background: #ffebee; border-left: 4px solid #f44336; margin: 5px 0; padding: 10px;"
            icon = "❌"
        elif msg_type == "system":
            style = "background: #e8f5e8; border-left: 4px solid #4caf50; margin: 5px 0; padding: 10px;"
            icon = "💬"
        else:  # processing
            style = "background: #fff3e0; border-left: 4px solid #ff9800; margin: 5px 0; padding: 10px;"
            icon = "⏳"

        # Format content for HTML display
        content_html = content.replace('\n', '<br>')

        message_html = f"""
        <div style="{style}">
            <strong>{icon} {sender}</strong> <small style="color: gray;">{timestamp}</small><br>
            <div style="margin-top: 5px;">{content_html}</div>
        </div>
        """

        with self.chat_output:
            display(HTML(message_html))

        # Scroll to bottom
        self.chat_output.layout.height = f"{min(600, len(self.chat_history) * 80 + 100)}px"

        self.chat_history.append({
            'sender': sender,
            'content': content,
            'type': msg_type,
            'timestamp': timestamp
        })

    def _remove_last_message(self):
        """Remove the last message (used to remove processing indicator)"""
        if self.chat_history:
            self.chat_history.pop()

        # Redraw all messages except the last one
        with self.chat_output:
            clear_output()
            for msg in self.chat_history:
                self._redraw_message(msg)

    def _redraw_message(self, msg):
        """Redraw a single message"""
        #

#### Chatting with the AI Assistant

In [None]:
# Simple REPL to chat with your Colab AI Assistant
import asyncio

async def chat_loop():
    print("🤖 Colab AI Assistant is ready to chat!")
    print("📝 Type your message and press Enter. Type 'exit' or 'quit' to end.\n")
    while True:
        user_input = input("👤 You: ").strip()
        if not user_input:
            continue
        if user_input.lower() in ("exit", "quit", "bye"):
            print("👋 Goodbye!")
            break

        # Show a thinking indicator
        print("🤔 Thinking...\n")
        # Process message with the agent
        response = await agent.process_message(user_input)

        # Display the assistant’s reply
        if response["status"] == "success":
            print(f"🤖 Assistant: {response['message']}\n")
        else:
            print(f"❌ Error: {response['message']}\n")

# Run the chat loop
await chat_loop()

🤖 Colab AI Assistant is ready to chat!
📝 Type your message and press Enter. Type 'exit' or 'quit' to end.

🤔 Thinking...

💭 Processing: go to the pictures folder and create a subfolder n...
❌ Error processing message: Error code: 429 - {'error': {'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors.', 'type': 'insufficient_quota', 'param': None, 'code': 'insufficient_quota'}}
❌ Error: I encountered an error: Error code: 429 - {'error': {'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors.', 'type': 'insufficient_quota', 'param': None, 'code': 'insufficient_quota'}}

🤔 Thinking...

💭 Processing: go to the pictures folder and create a subfolder n...
❌ Error processing message: Error code: 429