In [None]:
from google.genai import types
from google.adk.agents.llm_agent import Agent
from google.adk.models.google_llm import Gemini
from google.adk.runners import InMemoryRunner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search, AgentTool, ToolContext
from google.adk.code_executors import BuiltInCodeExecutor

print("âœ… ADK components imported successfully.")

import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
from datetime import datetime, timedelta
from typing import Optional

# Service-account key used to authenticate against the Firebase project.
# NOTE(review): the key file path is hardcoded and relative to the notebook's
# working directory; keep the file out of version control.
cred = credentials.Certificate("lanaagent-firebase-adminsdk-fbsvc-3ed45f99b9.json")

# initialize_app() raises ValueError when the default app already exists, which
# happens every time this cell is re-run in a live kernel. Reuse the existing
# app in that case so the cell is idempotent.
try:
    firebase_admin.get_app()
except ValueError:
    firebase_admin.initialize_app(cred)

# Module-level Firestore client shared by every tool function below.
db=firestore.client()

from dotenv import load_dotenv
import os

# Load variables from a local .env file (if present) into the process environment.
load_dotenv()
# os.getenv returns None when GOOGLE_API_KEY is unset.
# NOTE(review): api_key is not referenced by any other visible cell — presumably
# the ADK client reads the environment variable directly; confirm before removing.
api_key = os.getenv("GOOGLE_API_KEY")

In [18]:
#helpers and tools

# HTTP retry policy handed to the Gemini model wrappers via `retry_options`.
retry_config = types.HttpRetryOptions(
    attempts=3,  # Maximum retry attempts
    exp_base=7,  # Delay multiplier
    initial_delay=1,  # presumably seconds before the first retry — confirm in google.genai docs
    http_status_codes=[429, 500, 503, 504],  # Retry on these HTTP errors
)

def log_persistence(log_entry_data: dict, is_authenticated: bool) -> bool:
    """USAGE: Call only when the user explicitly requests persisting a log entry.
    Do NOT call during casual conversation.

    Saves a structured log entry to the Google Cloud Persistence Store (Firestore).
    Includes a Contact Gate check and an auto-generated timestamp.

    It also controls the sharing of contact information based on authentication status.

    Args:
        log_entry_data (dict): The log entry dictionary with this structure:
            {
                "category": "Structural/Time, Persona, Avoidance, or Project",
                "key": "The specific field name (e.g., 'Preferred Communication Tone')",
                "value": "The current data value",
                "source": "e.g., 'User Input', 'Bujo OCR', 'Agent Check'"
            }
            Any "timestamp" supplied by the caller is overwritten with the
            current UTC time at write time.
        is_authenticated (bool): Boolean flag indicating if the user is logged into
                          their Google Account (True) or anonymous (False).

    Returns:
        bool: True if the log was successfully written; False otherwise
        (invalid payload, Contact Gate refusal, or Firestore error).
    """
    # 0. Payload check: arguments are produced by the model, so a non-dict value
    #    must be rejected here instead of crashing on `.get` below.
    if not isinstance(log_entry_data, dict):
        print("FAILURE: log_entry_data must be a dict.")
        return False

    # 1. Structural Check: The Contact Gate (for ethical protection)
    if log_entry_data.get('key') == 'contact_info' and not is_authenticated:
        print("ACCESS DENIED: Cannot share contact info for anonymous user.")
        # We do not log the entry to prevent accidental data leakage
        return False

    try:
        # 2. Add Required Metadata (Timestamp)
        # Work on a copy so the caller's dictionary is never mutated.
        data_to_log = log_entry_data.copy()
        data_to_log['timestamp'] = datetime.utcnow()

        # 3. Cloud Write: Writing the data to the 'logs' collection in Firestore
        db.collection('logs').add(data_to_log)

        print(f"SUCCESS: Logged data to Firestore: {log_entry_data.get('key')}")
        return True

    except Exception as e:
        # Best-effort tool: report the failure to the agent instead of raising.
        print(f"FAILURE: Could not write to Firestore. Error: {e}")
        return False

In [19]:
#tools part 2
from datetime import datetime, timedelta
from typing import Optional
def query_persistence(task_id: str) -> dict:
    """USAGE: Only call this tool when explicitly asked to inspect persistence for a task.
    Do NOT call during casual conversation.

    Queries the Persistence Store (Firestore) for a specific task's history 
    and checks if the Contradiction Flag should be raised.
    
    Args:
        task_id: The unique identifier (Task Key) of the task to search for.

    Returns:
        A dictionary containing the check result and failure count.
    """
    # TODO: this is still a placeholder. The previous version built a Firestore
    # query object but never executed it (and compared it to None, which is
    # always False), so the result never depended on stored data. The dead code
    # is removed here; replace the constant below with a real count derived from
    # the 'logs' collection once the failure schema is defined.
    consecutive_failure_count = 4  # simulated: 4 failures in a row

    # THE CONTRADICTION FLAG LOGIC: raise after three or more consecutive failures.
    failure_threshold = 3
    flag_raised = consecutive_failure_count >= failure_threshold

    return {
        "task_id": task_id,
        "consecutive_failures": consecutive_failure_count,
        "flag_raised": flag_raised
    }

def _parse_date_token(date_str: str) -> datetime:
    """
    Parse a date string that may be either an ISO date or a relative token.

    Supported formats:
    - ISO date: 'YYYY-MM-DD' (e.g., '2025-11-24')
    - Relative tokens: 'NOW', 'TODAY', 'NOW-7DAYS', 'NOW-30DAYS' (case-insensitive)

    Returns a timezone-naive UTC datetime at the start of the day.
    Raises ValueError for unsupported formats.
    """
    if not isinstance(date_str, str):
        raise ValueError("date_str must be a string")

    token = date_str.strip().upper()
    if token in ("NOW", "TODAY"):
        return datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)

    # Handle patterns like NOW-7DAYS or TODAY-1DAYS
    if (token.startswith("NOW-") or token.startswith("TODAY-")) and "DAYS" in token:
        try:
            # last part expected to be like '7DAYS' -> extract leading integer
            n_part = token.split("-")[-1]
            n = int(''.join(ch for ch in n_part if ch.isdigit()))
            return (datetime.utcnow() - timedelta(days=n)).replace(hour=0, minute=0, second=0, microsecond=0)
        except Exception as e:
            raise ValueError(f"Could not parse relative date token: {date_str}") from e

    # Fallback: try ISO format
    try:
        return datetime.strptime(token, "%Y-%m-%d")
    except Exception as e:
        raise ValueError(f"Unsupported date format: {date_str}. Use 'YYYY-MM-DD' or 'NOW-<NDAYS>'") from e


def make_a_report(start_date_str: str, end_date_str: str) -> dict:
    """USAGE: Only call this tool when the user explicitly requests a time-bound report
    from persistence (e.g., "summarize logs from 2025-11-01 to 2025-11-08").
    Do NOT call during casual conversation.

    Queries the Persistence Store (Firestore) for a date span and returns matching logs.
    Both boundary days are included in full (start 00:00:00 through end 23:59:59.999999).

    Accepted date formats:
    - ISO date: 'YYYY-MM-DD' (e.g., '2025-11-01')
    - Relative tokens: 'NOW', 'TODAY', 'NOW-7DAYS' (case-insensitive)

    Args:
        start_date_str: Start date string or relative token.
        end_date_str: End date string or relative token.

    Returns:
        dict: { 'logs': [ ... ] } — empty when the start date is after the end date.

    Raises:
        ValueError: if either date string cannot be parsed.
    """
    # Parse inputs (accept tokens like NOW-7DAYS)
    start_date = _parse_date_token(start_date_str)

    # Ensure end_date includes the full day
    end_date = _parse_date_token(end_date_str).replace(
        hour=23, minute=59, second=59, microsecond=999999
    )

    # A reversed range can never match anything; skip the Firestore round-trip.
    if start_date > end_date:
        return {"logs": []}

    # Inclusive bounds: with strict '>' / '<' an entry stamped exactly at the
    # start-of-day or end-of-day boundary was silently dropped from the report.
    logs_stream = (
        db.collection('logs')
        .where('timestamp', '>=', start_date)
        .where('timestamp', '<=', end_date)
        .stream()
    )

    # Return the data as a list of dictionaries
    return {
        "logs": [log.to_dict() for log in logs_stream]
    }

def write_project(project_data: dict) -> dict:
    """USAGE: Call only when the user explicitly requests creating or updating a project.
    Do NOT call for regular chat or unrelated questions.

    Write or update a project document to the `projects` collection in Firestore.

    Behavior:
    - If `project_data` contains a `doc_id` key, the function updates that document (merge=True).
      Only the fields supplied are written; no defaults are applied on update.
    - Otherwise it creates a new document (auto-generated ID) and fills in defaults for
      `status` ("active"), `actual_hours` (0) and `created_date` (current UTC time).

    Expected project_data keys (based on SQL schema):
        - project_name (required), project_type (required), description, deadline (YYYY-MM-DD or datetime),
        - status, priority, estimated_hours, actual_hours, category_id, completed_date, notes

    Returns dict: { 'success': bool, 'doc_id': str (if created/updated), 'data': dict, 'error': str (if any) }
    """
    try:
        if not isinstance(project_data, dict):
            raise ValueError("project_data must be a dict")

        # Required fields
        if not project_data.get("project_name") or not project_data.get("project_type"):
            raise ValueError("project_data must include 'project_name' and 'project_type'")

        # Work on a copy so the caller's dict is never mutated.
        data = project_data.copy()

        # Normalize date fields if provided as strings
        for date_field in ("deadline", "created_date", "completed_date"):
            value = data.get(date_field)
            if isinstance(value, str):
                try:
                    data[date_field] = datetime.strptime(value, "%Y-%m-%d")
                except ValueError:
                    # Deliberate best-effort: keep the original string when it is
                    # not a plain YYYY-MM-DD date.
                    pass

        projects = db.collection("projects")
        doc_id = data.pop("doc_id", None)

        if doc_id:
            # Update path. Defaults must NOT be applied here: with merge=True they
            # would overwrite the stored status / actual_hours / created_date of
            # an existing project whenever the caller omitted those fields.
            projects.document(str(doc_id)).set(data, merge=True)
            return {"success": True, "doc_id": doc_id, "data": data}

        # Create path: fill in defaults for a brand-new project.
        data.setdefault("status", "active")
        data.setdefault("actual_hours", 0)
        data.setdefault("created_date", datetime.utcnow())

        # Allocate the reference first so the generated ID is known without
        # depending on the tuple layout returned by `add()`.
        doc_ref = projects.document()
        doc_ref.set(data)
        return {"success": True, "doc_id": doc_ref.id, "data": data}

    except Exception as e:
        # Tools report failures to the agent as data instead of raising.
        return {"success": False, "error": str(e)}


def read_projects(filter_by: Optional[dict] = None, limit: int = 100) -> dict:
    """USAGE: Call only when the user explicitly requests a project list or lookup.
    Do NOT call during casual chat.

    Fetch documents from the `projects` collection, optionally narrowed by
    equality filters.

    Args:
        filter_by (dict): optional field -> value mapping; every pair becomes an
            equality condition, e.g. { 'status': 'active', 'project_type': 'tech' }
        limit (int): upper bound on the number of documents returned.

    Returns:
        dict: { 'success': bool, 'projects': [dict,...], 'error': str (if any) }
        Each project dict carries its Firestore document ID under '_doc_id'.
    """
    try:
        query = db.collection("projects")

        if filter_by:
            if not isinstance(filter_by, dict):
                raise ValueError("filter_by must be a dict mapping field->value")
            # Chain one equality condition per requested field.
            for field, expected in filter_by.items():
                query = query.where(field, "==", expected)

        # The limit caps how many documents a single call can read.
        projects = []
        for snapshot in query.limit(limit).stream():
            record = snapshot.to_dict()
            record["_doc_id"] = snapshot.id
            projects.append(record)
    except Exception as exc:
        return {"success": False, "error": str(exc)}

    return {"success": True, "projects": projects}


def write_category(category_data: dict) -> dict:
    """USAGE: Call only when the user explicitly requests creating or updating a category.
    Do NOT call during casual chat.

    Write or update a category document to the `categories` collection in Firestore.

    Behavior:
    - If `category_data` contains a `doc_id` key, the function updates that document (merge=True).
      Only the fields supplied are written; `is_active` is left untouched when omitted.
    - Otherwise it creates a new document (auto-generated ID); `is_active` defaults to True.

    Expected category_data keys (based on SQL schema):
        - category_name (required), description, color_code, parent_category_id, is_active
          (`is_active` accepts bool, int, or strings such as "true"/"false"/"1"/"0")

    Returns dict: { 'success': bool, 'doc_id': str (if created/updated), 'data': dict, 'error': str (if any) }
    """
    try:
        if not isinstance(category_data, dict):
            raise ValueError("category_data must be a dict")

        if not category_data.get("category_name"):
            raise ValueError("category_data must include 'category_name'")

        # Work on a copy so the caller's dict is never mutated.
        data = category_data.copy()

        # Normalize boolean
        if "is_active" in data:
            raw = data["is_active"]
            if isinstance(raw, str):
                text = raw.strip().lower()
                # Textual booleans must be matched explicitly: int("false") fails,
                # and the old fallback then stored True for the string "false".
                if text in ("true", "yes", "y", "on"):
                    data["is_active"] = True
                elif text in ("false", "no", "n", "off"):
                    data["is_active"] = False
                else:
                    try:
                        data["is_active"] = bool(int(text))
                    except ValueError:
                        # Unrecognised text keeps the historical default.
                        data["is_active"] = True
            else:
                data["is_active"] = bool(raw)

        categories = db.collection("categories")
        doc_id = data.pop("doc_id", None)

        if doc_id:
            # Update path: no default for is_active, otherwise a merge write
            # would silently re-activate a deactivated category.
            categories.document(str(doc_id)).set(data, merge=True)
            return {"success": True, "doc_id": doc_id, "data": data}

        # Create path.
        data.setdefault("is_active", True)

        # `add()` returns (update_time, DocumentReference); indexing [0] yielded
        # the timestamp, so the new ID always came back as None. Allocating the
        # reference up front makes the ID available regardless of client version.
        doc_ref = categories.document()
        doc_ref.set(data)
        return {"success": True, "doc_id": doc_ref.id, "data": data}

    except Exception as e:
        # Tools report failures to the agent as data instead of raising.
        return {"success": False, "error": str(e)}

def read_categories(filter_by: Optional[dict] = None, limit: int = 100) -> dict:
    """USAGE: Call only when the user explicitly requests category lookups.
    Do NOT call during casual chat.

    Fetch documents from the `categories` collection, optionally narrowed by
    equality filters.

    Args:
        filter_by (dict): optional field -> value mapping; every pair becomes an
            equality condition, e.g. { 'is_active': True }
        limit (int): upper bound on the number of documents returned.

    Returns:
        dict: { 'success': bool, 'categories': [dict,...], 'error': str (if any) }
        Each category dict carries its Firestore document ID under '_doc_id'.
    """

    def _with_id(snapshot):
        # Attach the document ID to the snapshot's field dictionary.
        fields = snapshot.to_dict()
        fields["_doc_id"] = snapshot.id
        return fields

    try:
        selection = db.collection("categories")
        if filter_by:
            if not isinstance(filter_by, dict):
                raise ValueError("filter_by must be a dict mapping field->value")
            for name in filter_by:
                selection = selection.where(name, "==", filter_by[name])

        found = [_with_id(snapshot) for snapshot in selection.limit(limit).stream()]
        return {"success": True, "categories": found}
    except Exception as err:
        return {"success": False, "error": str(err)}


def write_task(task_data: dict) -> dict:
    """USAGE: Call only when the user explicitly requests creating or updating a task/milestone.
    Do NOT call during casual chat.

    Write or update a task (milestone) document to the `project_tasks` collection in Firestore.

    Behavior:
    - If `task_data` contains a `doc_id` key, the function updates that document (merge=True).
      Only the fields supplied are written; `status` is left untouched when omitted.
    - Otherwise it creates a new document (auto-generated ID); `status` defaults to "pending".

    Expected task_data keys (based on SQL schema):
        - project_id (required), milestone_name (required), description, due_date (YYYY-MM-DD or datetime),
        - status, completed_date

    Returns dict: { 'success': bool, 'doc_id': str (if created/updated), 'data': dict, 'error': str (if any) }
    """
    try:
        if not isinstance(task_data, dict):
            raise ValueError("task_data must be a dict")

        # Required fields
        if not task_data.get("project_id") or not task_data.get("milestone_name"):
            raise ValueError("task_data must include 'project_id' and 'milestone_name'")

        # Work on a copy so the caller's dict is never mutated.
        data = task_data.copy()

        # Normalize date fields if provided as strings
        for date_field in ("due_date", "completed_date"):
            value = data.get(date_field)
            if isinstance(value, str):
                try:
                    data[date_field] = datetime.strptime(value, "%Y-%m-%d")
                except ValueError:
                    # Deliberate best-effort: keep the original string when it is
                    # not a plain YYYY-MM-DD date.
                    pass

        tasks = db.collection("project_tasks")
        doc_id = data.pop("doc_id", None)

        if doc_id:
            # Update path: do not default `status`, otherwise a merge write
            # would reset a completed task back to "pending".
            tasks.document(str(doc_id)).set(data, merge=True)
            return {"success": True, "doc_id": doc_id, "data": data}

        # Create path.
        data.setdefault("status", "pending")

        # `add()` returns (update_time, DocumentReference); indexing [0] yielded
        # the timestamp, so the new ID always came back as None. Allocating the
        # reference up front makes the ID available regardless of client version.
        doc_ref = tasks.document()
        doc_ref.set(data)
        return {"success": True, "doc_id": doc_ref.id, "data": data}

    except Exception as e:
        # Tools report failures to the agent as data instead of raising.
        return {"success": False, "error": str(e)}


def read_tasks(filter_by: Optional[dict] = None, limit: int = 100) -> dict:
    """USAGE: Call only when the user explicitly requests tasks or milestone lookups.
    Do NOT call during casual chat.

    Fetch task (milestone) documents from the `project_tasks` collection,
    optionally narrowed by equality filters.

    Args:
        filter_by (dict): optional field -> value mapping; every pair becomes an
            equality condition, e.g. { 'project_id': 123, 'status': 'pending' }
        limit (int): upper bound on the number of documents returned.

    Returns:
        dict: { 'success': bool, 'tasks': [dict,...], 'error': str (if any) }
        Each task dict carries its Firestore document ID under '_doc_id'.
    """
    try:
        task_query = db.collection("project_tasks")

        # A non-empty filter that is not a mapping is a caller error.
        if filter_by and not isinstance(filter_by, dict):
            raise ValueError("filter_by must be a dict mapping field->value")

        # An absent or empty filter adds no conditions.
        for field_name, wanted in (filter_by or {}).items():
            task_query = task_query.where(field_name, "==", wanted)

        collected = []
        snapshots = task_query.limit(limit).stream()
        for snap in snapshots:
            entry = snap.to_dict()
            entry["_doc_id"] = snap.id
            collected.append(entry)

        return {"success": True, "tasks": collected}
    except Exception as problem:
        return {"success": False, "error": str(problem)}

In [None]:
# Define the log_analyst_agent (keeps read/report tools isolated from root_agent)
# The instruction text refers to its two tools by function name, so the names in
# the prompt and the entries in `tools` must be kept in sync.
log_analyst_agent = Agent(
    name="log_analyst_agent",
    # Same Gemini model and retry policy as root_agent.
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    instruction="""You are a productivity log analyst and objective structural evaluator.
    Your sole purpose is to analyze the Persistence Store data using your tools and provide objective assessments.
    If asked about commitment status, run the 'query_persistence' tool to check the Contradiction Flag.
    If asked for a summary (daily, weekly, monthly), run the 'make_a_report' tool and provide structural insights based on logged avoidance patterns and corrective actions.
    Your output must be non-judgmental and focused on systemic improvement.
    """,
    # Read-only tools: contradiction-flag lookup and date-range report.
    tools=[query_persistence, make_a_report],
)
print("âœ… log_analyst_agent created with custom function tools")
print("ðŸ”§ Available tools:", [t.__name__ if hasattr(t, '__name__') else str(t) for t in log_analyst_agent.tools])

In [None]:
# Top-level agent. It owns the write tools directly and reaches the read/report
# tools only through the log_analyst_agent AgentTool.
#
# The instruction must name only tools that are registered in `tools` below:
# the previous prompt told the model to run 'query_persistence' and
# 'make_a_report', which exist only on log_analyst_agent, and never mentioned
# the write tools this agent actually exposes.
root_agent = Agent(
    name='root_agent',
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    description='Lanas personal assistant',
    instruction="""You are a productivity assistant and objective structural evaluator.
    Your purpose is to record productivity data in the Persistence Store and to provide objective assessments of it using your tools.

    If asked about commitment status (the Contradiction Flag) or for a summary (daily, weekly, monthly), delegate the request to the 'log_analyst_agent' tool and relay its structural insights based on logged avoidance patterns and corrective actions.
    If the user explicitly asks to save a log entry, use the 'log_persistence' tool.
    If the user explicitly asks to create or update a project, a task/milestone, or a category, use 'write_project', 'write_task', or 'write_category' respectively.
    Your output must be non-judgmental and focused on systemic improvement.
    If user is trivially chatting, do not use any tools and respond casually.
    """,
    tools=[
        log_persistence,
        AgentTool(agent=log_analyst_agent),
        # Expose write-only helpers on root_agent. Read/report tools live on log_analyst_agent and should only be invoked via AgentTool.
        write_project,
        write_task,
        write_category,
    ]
)
print("âœ… Root Agent defined.")

âœ… Root Agent defined.


In [21]:
# In-memory runner that drives root_agent in the interactive test cells.
runner = InMemoryRunner(agent=root_agent)

print("âœ… Runner created.")

âœ… Runner created.


In [23]:
# Smoke test: send a casual greeting to root_agent through the runner.
# Relies on the notebook's top-level `await` support.
response = await runner.run_debug("Hi buddy")


 ### Continue session: debug_session_id

User > Hi buddy
root_agent > Hello! How can I help you today?
root_agent > Hello! How can I help you today?


In [None]:

# Test the log_analyst_agent
# A separate runner is created so the analyst is exercised directly rather than
# through root_agent's AgentTool wrapper.
log_analyst_runner = InMemoryRunner(agent=log_analyst_agent)
# The return value is bound to `_` and not used afterwards.
_ = await log_analyst_runner.run_debug(
    "what was completed today from 2025-11-23 to 2025-11-24"
)



In [None]:
# Duplicate interactive test disabled to avoid accidental double calls
# response = await runner.run_debug("can you provide a summary of my productivity logs for the past week?")
# No-op: the call above is commented out, so this cell does nothing when run.
pass


 ### Continue session: debug_session_id

User > can you provide a summary of my productivity logs for the past week?


ValueError: Default value None of parameter filter_by: dict = None of function read_projects is not compatible with the parameter annotation <class 'dict'>.

In [None]:
# Duplicate interactive test disabled to avoid accidental double calls
# response = await runner.run_debug("can you provide a summary of my productivity logs for the past week?")
# No-op: the call above is commented out, so this cell does nothing when run.
pass