In [1]:
from dotenv import load_dotenv
from openai import AsyncOpenAI
from agents import OpenAIChatCompletionsModel
from typing import Dict
import os
from pydantic import BaseModel
import mimetypes
from pathlib import Path




In [2]:
load_dotenv(override=True)

True

In [3]:
openai_api_key = os.getenv('OPENAI_API_KEY')
groq_api_key = os.getenv('GROQ_API_KEY')

In [4]:
GROQ_BASE_URL = "https://api.groq.com/openai/v1"
groq_client = AsyncOpenAI(base_url=GROQ_BASE_URL, api_key=groq_api_key)
llama = OpenAIChatCompletionsModel(model="llama-3.3-70b-versatile", openai_client=groq_client)

In [5]:
from agents import Agent, function_tool, Runner
from pathlib import Path
import os
import json
import shutil
from datetime import datetime
import pytz

STORAGE_ROOT = Path(os.getenv("STORAGE_ROOT", "../../storage")).resolve()
METADATA_FILE = STORAGE_ROOT / os.getenv("METADATA_FILE", "file_metadata.json")

STORAGE_ROOT.mkdir(parents=True, exist_ok=True)
if not METADATA_FILE.exists():
    with open(METADATA_FILE, "w") as f:
        json.dump([], f)


def load_metadata():
    if METADATA_FILE.exists():
        with open(METADATA_FILE, "r") as f:
            try:
                data = json.load(f)
                return data if isinstance(data, list) else []
            except json.JSONDecodeError:
                return []
    return []


def save_metadata(data):
    with open(METADATA_FILE, "w") as f:
        json.dump(data, f, indent=4)


def get_current_time():
    tz = pytz.timezone("Asia/Kolkata")
    return datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")


def update_last_accessed(files):
    """Update last accessed time for the given metadata entries."""
    metadata = load_metadata()
    updated = False
    for entry in metadata:
        for f in files:
            if entry["path"] == f["path"]:
                entry["last_accessed"] = get_current_time()
                updated = True
    if updated:
        save_metadata(metadata)


# -----------------------------
# Tools (Wrapped for the Agent)
# -----------------------------

@function_tool
def update_file_tool(
    file_name: str,
    new_category: str = None,
    new_tag: str = None,
):
    """
    Updates file metadata and optionally moves file between categories.

    Arguments:
    - file_name: existing stored file name
    - new_category: if provided, moves file to this category folder
    - new_tag: if provided, updates the file's tag

    Notes:
    - If only new_tag is given → updates tag only.
    - If only new_category is given → moves file but keeps tag.
    - If both given → moves file and updates tag.
    """

    metadata = load_metadata()
    entry = next((m for m in metadata if m["name"] == file_name), None)

    if not entry:
        return {"error": f"File '{file_name}' not found in metadata."}

    src = Path(entry["path"])
    old_category_dir = src.parent  

    if new_category:
        new_category_dir = STORAGE_ROOT / new_category
        new_category_dir.mkdir(parents=True, exist_ok=True)

        dst = new_category_dir / src.name
        if dst.exists():
            return {"error": f"File already exists in destination: {dst}"}

        shutil.move(str(src), dst)
        entry["path"] = str(dst)
        entry["category"] = new_category

        try:
            if old_category_dir.exists() and not any(old_category_dir.iterdir()):
                old_category_dir.rmdir()
        except Exception as e:
            print(f"Warning: Could not delete empty folder {old_category_dir}: {e}")

    # --- Handle tag update ---
    if new_tag is not None:
        entry["tag"] = new_tag


    entry["last_accessed"] = get_current_time()


    save_metadata(metadata)

    return {
        "success": True,
        "file": entry["name"],
        "new_category": entry["category"],
        "new_tag": entry["tag"],
        "path": entry["path"]
    }


@function_tool
def add_file_tool(
    file_path: str,
    category: str,
    tag: str = None,
    name: str = None,
    overwrite: bool = False,
    description: str = None
):
    """
    Uploads a file into STORAGE_ROOT/category/tag/ (if tag provided)
    and records metadata.

    Arguments:
    - file_path: local path of the uploaded file
    - category: folder name inside STORAGE_ROOT
    - tag: optional label -> will also create subfolder inside category
    - name: optional new name for the file
    - overwrite: whether to overwrite if a file with same name exists
    - description: short summary of the file contents
    """
    src = Path(file_path).resolve()
    if not src.exists():
        return {"error": f"File not found: {src}"}

    # If tag provided, put inside category/tag
    if tag:
        target_dir = STORAGE_ROOT / category / tag
    else:
        target_dir = STORAGE_ROOT / category

    target_dir.mkdir(parents=True, exist_ok=True)

    if name:
        name = Path(name).stem
        final_name = f"{name}{src.suffix}"
    else:
        final_name = src.name

    dst = target_dir / final_name

    if dst.exists() and not overwrite:
        base, ext = dst.stem, dst.suffix
        i = 1
        while True:
            new_name = f"{base}_{i}{ext}"
            new_dst = target_dir / new_name
            if not new_dst.exists():
                dst = new_dst
                final_name = new_name
                break
            i += 1

    shutil.copy2(src, dst)

    metadata = load_metadata()
    entry = {
        "name": final_name,
        "path": str(dst),
        "category": category,
        "tag": tag,
        "description": description if description else "No description provided",
        "last_accessed": get_current_time(),
    }
    metadata.append(entry)
    save_metadata(metadata)

    return {
        "success": True,
        "stored_file": final_name,
        "category": category,
        "tag": tag,
        "description": entry["description"]
    }




@function_tool
def get_files_tool(name: str = None, category: str = None, tag: str = None):
    """
    Fetches a file asked by the user and sends it back to him
    
    """
    metadata = load_metadata()
    results = []
    for entry in metadata:
        if name and entry["name"] != name:
            continue
        if category and entry["category"] != category:
            continue
        if tag and entry["tag"] != tag:
            continue
        results.append(entry)

    if results:
        update_last_accessed(results)

    if not results:
        return {"files": []}

    return {
    "files": [
        {
            "name": f["name"],
            "category": f["category"],
            "tag": f.get("tag"),
            "path": f["path"]   
        }
        for f in results
    ]
}


@function_tool
def delete_files_tool(name: str = None, category: str = None, tag: str = None):
    """
    Deletes files and updates metadata based on filters:
    - category only
    - tag only
    - name only
    - name + category + tag
    - tag + category

    If a folder becomes empty after deletion, it is also removed.
    """
    metadata = load_metadata()
    updated_metadata = []
    deleted = []

    for entry in metadata:
        match = True
        if name and entry["name"] != name:
            match = False
        if category and entry["category"] != category:
            match = False
        if tag and entry["tag"] != tag:
            match = False

        if match:
            file_path = Path(entry["path"])
            if file_path.exists():
                try:
                    file_path.unlink()
                except Exception as e:
                    return {"error": f"Could not delete {file_path}: {e}"}
            deleted.append(entry)
        else:
            updated_metadata.append(entry)

    save_metadata(updated_metadata)

    affected_folders = set()
    for d in deleted:
        folder = Path(d["path"]).parent
        affected_folders.add(folder)

    for folder in affected_folders:
        if folder.exists() and not any(folder.iterdir()):  
            try:
                folder.rmdir()
            except Exception:
                pass  

    return {
        "deleted": [d["name"] for d in deleted],
        "remaining_files": [f["name"] for f in updated_metadata]
    }

@function_tool
def read_contents(file_path: str, max_pages: int = 3, max_chars: int = 1000):
    """
    Reads the contents of a given file and returns a text preview.

    Arguments:
    - file_path: absolute path to the file
    - max_pages: number of pages/slides/images to read (default: 3)
    - max_chars: limit the number of characters returned (default: 1000)
    """

    path = Path(file_path)
    if not path.exists():
        return {"error": f"File not found: {file_path}"}

    mime_type, _ = mimetypes.guess_type(path)
    text_content = ""

    try:
        # --- PDF ---
        if path.suffix.lower() == ".pdf":
            reader = PdfReader(str(path))
            for i, page in enumerate(reader.pages[:max_pages]):
                text_content += f"\n--- Page {i+1} ---\n"
                text_content += page.extract_text() or ""

        # --- PowerPoint ---
        elif path.suffix.lower() in [".pptx", ".ppt"]:
            prs = Presentation(str(path))
            for i, slide in enumerate(prs.slides[:max_pages]):
                text_content += f"\n--- Slide {i+1} ---\n"
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text_content += shape.text + "\n"

        # --- Text/JSON/CSV/Markdown ---
        elif path.suffix.lower() in [".txt", ".json", ".md", ".csv"]:
            text_content = path.read_text(errors="ignore")[:max_chars]

        # --- Images (OCR) ---
        elif path.suffix.lower() in [".jpg", ".jpeg", ".png"]:
            image = Image.open(path)
            text_content = pytesseract.image_to_string(image)

        else:
            return {
                "info": "Unsupported file type. Returning metadata only.",
                "file_name": path.name,
                "size_bytes": path.stat().st_size,
                "path": str(path)
            }

        # Truncate by chars for safety
        if len(text_content) > max_chars:
            text_content = text_content[:max_chars] + "... [truncated]"

        return {
            "file_name": path.name,
            "path": str(path),
            "content_preview": text_content.strip() if text_content else "[No readable content]"
        }

    except Exception as e:
        return {"error": f"Could not read file: {e}"}

# -----------------------------
# The Agent
# -----------------------------

file_agent = Agent(
    name="FileManagerAgent",
    instructions=(
        "You are a helpful file manager agent. "
        "You strictly follow the documented tools provided. "
        "Do not invent features outside these tools. "
        "Always use metadata fields (name, category, tag, last_accessed, path,description) exactly as defined. "
        "Ground your reasoning in the function docstrings only."
    ),
    tools=[add_file_tool, get_files_tool, delete_files_tool,update_file_tool],
    model="gpt-4o-mini"
)



In [None]:
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import JSONResponse, FileResponse, HTMLResponse
from pathlib import Path
import shutil
import asyncio
import json
import nest_asyncio
import uvicorn
from agents.items import ToolCallOutputItem
from mimetypes import guess_type

nest_asyncio.apply()
app = FastAPI(title="File Manager Agent")

# -----------------------------
# STORAGE SETUP
# -----------------------------
STORAGE_ROOT = Path("../../storage").resolve()
STORAGE_ROOT.mkdir(parents=True, exist_ok=True)
UPLOADS_DIR = STORAGE_ROOT / "uploads"
UPLOADS_DIR.mkdir(exist_ok=True)

METADATA_FILE = STORAGE_ROOT / "file_metadata.json"
if not METADATA_FILE.exists():
    METADATA_FILE.write_text("[]")  # initialize empty metadata

# -----------------------------
# Helper functions
# -----------------------------
def load_metadata():
    try:
        with open(METADATA_FILE, "r") as f:
            data = json.load(f)
            return data if isinstance(data, list) else []
    except json.JSONDecodeError:
        return []

def get_files_tool(name: str = None, category: str = None, tag: str = None):
    """
    Fetches a file asked by the user and sends it back to him
    """
    metadata = load_metadata()
    results = []
    for entry in metadata:
        if name and entry["name"] != name:
            continue
        if category and entry["category"] != category:
            continue
        if tag and entry["tag"] != tag:
            continue
        results.append(entry)

    if not results:
        return {"files": []}

    return {
        "files": [
            {
                "name": f["name"],
                "category": f["category"],
                "tag": f.get("tag"),
                "path": f["path"]
            }
            for f in results
        ]
    }

# -----------------------------
# Agent runner (async-safe)
# -----------------------------
def run_agent_sync(user_input: str):
    """Run the agent in a sync context safely"""
    from agents import Runner

    try:
        loop = asyncio.get_running_loop()
        result = loop.run_until_complete(Runner.run(file_agent, user_input))
    except RuntimeError:
        result = asyncio.run(Runner.run(file_agent, user_input))

    files_to_show = []
    for item in result.new_items:
        if isinstance(item, ToolCallOutputItem) and isinstance(item.output, dict):
            if "files" in item.output:
                files_to_show.extend([Path(f["path"]) for f in item.output["files"] if "path" in f])

    return result.final_output, files_to_show

# -----------------------------
# Chat endpoint
# -----------------------------
@app.post("/chat")
async def chat_endpoint(
    message: str = Form(...),
    uploaded_files: list[UploadFile] = File(None)
):
    file_paths = []

    # Save uploaded files
    if uploaded_files:
        for uploaded_file in uploaded_files:
            file_path = UPLOADS_DIR / Path(uploaded_file.filename).name
            with open(file_path, "wb") as f:
                shutil.copyfileobj(uploaded_file.file, f)
            file_paths.append(file_path)

    # Load metadata and include in prompt
    metadata = load_metadata()
    metadata_json = json.dumps(metadata, indent=2)

    user_input = (
        f"Here is the current file metadata (name, category, tag, description, path):\n"
        f"{metadata_json}\n\n"
        f"User message: {message}"
    )
    if file_paths:
        user_input += "\nFiles uploaded:\n" + "\n".join(str(p) for p in file_paths)

    # Run agent
    reply, files_to_show = run_agent_sync(user_input)

    # Prepare clickable URLs
    files_info = []
    for f in files_to_show:
        meta = next((m for m in metadata if Path(m["path"]) == f), None)
        if meta:
            category = meta.get("category")
            tag = meta.get("tag")
            files_info.append({
                "name": meta["name"],
                "category": category,
                "tag": tag,
                "url": f"/download/{meta['name']}?category={category}&tag={tag}"
            })

    return JSONResponse({
        "chat_reply": reply,
        "files": files_info
    })

# -----------------------------
# Download endpoint
# -----------------------------
@app.get("/download/{file_name}")
async def download_file(file_name: str, category: str = None, tag: str = None):
    files = get_files_tool(name=file_name, category=category, tag=tag).get("files", [])
    if not files:
        return JSONResponse({"error": "File not found"}, status_code=404)
    
    file_path = Path(files[0]["path"])
    if not file_path.exists():
        return JSONResponse({"error": "File missing on disk"}, status_code=404)

    # Guess the correct MIME type
    mime_type, _ = guess_type(file_path)
    if not mime_type:
        mime_type = "application/octet-stream"

    # Inline = try to render in browser instead of download
    return FileResponse(
        path=file_path,
        media_type=mime_type,
        headers={"Content-Disposition": f"inline; filename={file_path.name}"}
    )

# -----------------------------
# Front-end
# -----------------------------
@app.get("/", response_class=HTMLResponse)
async def home():
    return """
    <!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>File Manager Agent</title>
<style>
  body { font-family: Arial, sans-serif; padding: 20px; }
  .chat { margin-bottom: 10px; }
  .file-links { margin-top: 10px; }
  .uploaded-list { margin-top: 10px; color: green; }
</style>
</head>
<body>

<h2>🤖 File Manager Agent</h2>

<form id="chatForm">
  <div class="chat">
    <label>Message:</label><br>
    <input type="text" id="message" size="50" required>
  </div>
  <div class="chat">
    <label>Upload Files (optional):</label><br>
    <input type="file" id="file" multiple>
  </div>
  <button type="submit">Send</button>
</form>

<h3>Uploaded Files:</h3>
<div id="uploadedFiles" class="uploaded-list"></div>

<h3>Reply:</h3>
<div id="reply"></div>

<h3>Files:</h3>
<div id="files" class="file-links"></div>

<script>
const form = document.getElementById('chatForm');
const fileInput = document.getElementById('file');
const uploadedFilesDiv = document.getElementById('uploadedFiles');

fileInput.addEventListener('change', () => {
  uploadedFilesDiv.innerHTML = '';
  if (fileInput.files.length > 0) {
    uploadedFilesDiv.innerHTML = `<strong>${fileInput.files.length} file(s) selected:</strong><br>`;
    for (let i = 0; i < fileInput.files.length; i++) {
      uploadedFilesDiv.innerHTML += `• ${fileInput.files[i].name}<br>`;
    }
  } else {
    uploadedFilesDiv.innerHTML = 'No files selected';
  }
});

form.addEventListener('submit', async (e) => {
  e.preventDefault();

  const formData = new FormData();
  formData.append('message', document.getElementById('message').value);

  if (fileInput.files.length > 0) {
    for (let i = 0; i < fileInput.files.length; i++) {
      formData.append('uploaded_files', fileInput.files[i]);
    }
  }

  const response = await fetch('/chat', {
    method: 'POST',
    body: formData
  });

  const data = await response.json();
  document.getElementById('reply').innerText = data.chat_reply;

  const filesDiv = document.getElementById('files');
  filesDiv.innerHTML = '';
  if (data.files && data.files.length > 0) {
    data.files.forEach(f => {
      const link = document.createElement('a');
      link.href = f.url;
      link.innerText = f.name;
      link.target = '_blank';
      filesDiv.appendChild(link);
      filesDiv.appendChild(document.createElement('br'));
    });
  }
});
</script>

</body>
</html>

    """

# -----------------------------
# Run server
# -----------------------------
uvicorn.run(app, host="0.0.0.0", port=8000)


INFO:     Started server process [3012]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
