In [None]:
! pip install langchain langchain_aws langgraph python-dotenv datetime pydantic botocore boto3

In [None]:
import os
import uuid
from dotenv import load_dotenv
from datetime import datetime
import boto3
from botocore.config import Config
import urllib3

from langchain_aws import ChatBedrock
from langchain_core.messages import AIMessage, ToolMessage
from langgraph.prebuilt import create_react_agent

from tools import MeetingTool, TodoTool
from memory_handler import ConversationMemory


def get_llm():

    # Disable SSL warnings 
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    # Initialize AWS Bedrock client
    bedrock_client = boto3.client(
        service_name='bedrock-runtime',
        region_name='eu-west-1',
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        verify=False,  # Disable SSL verification
        config=Config(
            proxies={'https': None}
        )
    )

    # Define the LLM model for the agents
    model = ChatBedrock(
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        client=bedrock_client,
        model_kwargs={
            "temperature": 0,
            "max_tokens": 2000,
        }
    )

    return model

In [None]:
import os
import json
from typing import Dict, Any, Optional, Callable
from langgraph.checkpoint.memory import MemorySaver

class ConversationMemory(MemorySaver):
    """Enhanced memory handler for storing and retrieving categorized data."""

    def __init__(self, file_path: str = "data/conversation_memory.json"):
        # Ensure data directory exists
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        self.file_path = file_path
        self._load_memory()    

    def _load_memory(self):
        if os.path.exists(self.file_path):
            with open(self.file_path, 'r') as f:
                self.memory = json.load(f)
        else:
            self.memory = {"messages": [], "entries": []}  
            self.save(self.memory)

    def save(self, state: Dict[str, Any]):
        """Save the current state to memory"""
        self.memory = state
        with open(self.file_path, 'w') as f:
            json.dump(state, f, indent=2)

    def load(self) -> Dict[str, Any]:
        """Load the memory state"""
        return self.memory

    def append_memory(self, entry: Dict):
        """Add a new categorized entry to memory."""
        if "entries" not in self.memory:
            self.memory["entries"] = []
        self.memory["entries"].append(entry)
        self.save(self.memory)

    def get_memory(self, entry_type: str, filters: Optional[Dict] = None) -> list:
        """Retrieve entries of a specific type with optional filters."""
        results = [entry for entry in self.memory.get("entries", []) if entry.get("type") == entry_type]
        if filters:
            for key, value in filters.items():
                results = [entry for entry in results if entry["data"].get(key) == value]

        print("Getting current conversation:")
        print(results)
        print("\n")
        return results

    def delete_memory(self, entry_type: str, condition: Callable[[Dict], bool]):
        """Delete entries of a specific type based on a condition."""
        self.memory["entries"] = [
            entry for entry in self.memory.get("entries", [])
            if entry.get("type") != entry_type or not condition(entry)
        ]
        self.save(self.memory)

    def reset_messages(self):
        """Reset the conversation messages."""
        self.memory["messages"] = []
        self.save(self.memory)


In [None]:
from langchain.tools import BaseTool
from typing import Literal, Optional
from pydantic import BaseModel, Field

# Input schema for documentation-related actions
class TodoInput(BaseModel):
    action: Literal['add', 'list', 'delete'] = Field(description="Action to perform: 'add', 'list', or 'delete'")
    task: Optional[str] = Field(None, description="Task description")
    priority: Optional[str] = Field(None, description="Priority level: high, medium, or low")
    due_date: Optional[str] = Field(None, description="Due date in YYYY-MM-DD format")
    status: Optional[str] = Field(None, description="Status of the task")

class TodoTool(BaseTool):
    def __init__(self, memory_handler: ConversationMemory):
        super().__init__(
            name="todo_tool", 
            description="manage task and todos like add, list, complete and delete. Use this tool to add a new task, list all tasks that are in, complete a task, or delete a task.",
            args_schema=TodoInput
        )
        self._memory_handler = memory_handler

    # Core function for processing actions based on agent input
    def _run(self, action: str, task: Optional[str] = None, 
             priority: Optional[str] = None, due_date: Optional[str] = None, status: Optional[str] = None) -> str:
        print(f"\n[DEBUG] TodoTool received: action={action}, task={task}, priority={priority}, due_date={due_date}, status={status}")

        # Perform the specified action
        if action == "add":
            if not task:
                return "Error: Task description is required."

            todo = {"task": task, "priority": priority, "due_date": due_date, "status": status if status else "pending"}
            self._memory_handler.append_memory({"type": "todo", "data": todo})
            return f"Added task: '{task}'" + (f" with priority {priority}" if priority else "") + \
                (f" and due date {due_date}" if due_date else "")

        elif action == "list":
            todos = self._memory_handler.get_memory("todo")
            if not todos:
                return "No tasks found."

            return "Tasks:\n" + "\n".join(
                [f"- {t['data']['task']} (Priority: {t['data'].get('priority', 'N/A')}, Due: {t['data'].get('due_date', 'N/A')}, Status: {t['data'].get('status', 'unknown')})"
                for t in todos]
            )

        elif action == "delete":
            if not task:
                return "Error: Task description is required for deletion."

            self._memory_handler.delete_memory("todo", lambda t: t["data"]["task"] == task)
            return f"Deleted task: '{task}'"

        return "Invalid action. Please use 'add', 'list', 'complete', or 'delete' for task/todo."


In [None]:
from langchain.tools import BaseTool
from typing import Literal, Optional, List
from pydantic import BaseModel, Field

# Input schema for meeting-related actions
class MeetingInput(BaseModel):
    action: Literal['schedule', 'list', 'cancel'] = Field(description="Action to perform: 'schedule', 'list', or 'cancel'")
    date: str = Field(description="Meeting date in YYYY-MM-DD format")
    time: Optional[str] = Field(None, description="Meeting time in HH:MM format")
    title: Optional[str] = Field(None, description="Meeting title")
    participants: Optional[List[str]] = Field(None, description="List of participants")


# Tool for managing meetings, scheduling, listing, and canceling
class MeetingTool(BaseTool):
    # Define the tool's name, description, and input schema

    def __init__(self, memory_handler: ConversationMemory):
        super().__init__(
            name="meeting_tool", 
            description="Manages meeting, scheduling, listing and canceling meetings. Use it to schedule a new meeting, listing all meetings, or canceling one.",
            args_schema=MeetingInput
        )
        self._memory_handler = memory_handler

    # Core function for processing actions based on agent input
    def _run(
            self, 
            action: str, 
            date: str, 
            time: Optional[str] = None, 
            title: Optional[str] = None, 
            participants: Optional[List[str]] = None
        ) -> str:
        
        print(f"[DEBUG] Calling MeetingTool: action={action}, date={date}, time={time}, title={title}, participants={participants}")

        # Perform the specified action

        # Schedule a meeting
        if action == "schedule":
            if not all([time, title, participants]):
                return "Error: Missing required fields for scheduling a meeting. Please provide time, title, and participants."
            
            meeting = {
                "date": date, 
                "time": time, 
                "title": title, 
                "participants": participants
            }
            self._memory_handler.append_memory({"type": "meeting", "data": meeting})
            return f"Scheduled meeting '{title}' on {date} at {time} with {', '.join(participants)}"

        # List meetings for a specific date
        elif action == "list":
            filters = {}
            if date:
                filters["date"] = date

            meetings = self._memory_handler.get_memory("meeting", filters=filters)

            if not meetings:
                return f"No meetings found."
            
            meeting_infos = []
            for m in meetings:
                meeting_title = m["data"]["title"]
                meeting_time = m["data"]["time"]
                meeting_participants = m["data"]["participants"]

                meeting_info = f"- {meeting_title} at {meeting_time} with {', '.join(meeting_participants)}"
                meeting_infos.append(meeting_info)

            meetings = "Found the following meetings:\n" + "\n".join(meeting_infos)
            print(f"[DEBUG] {meetings}")
            return meetings

        elif action == "cancel":
            if not title:
                return "Error: Title is required for cancellation."

            self._memory_handler.delete_memory(
                "meeting", lambda m: m["data"]["date"] == date and m["data"]["title"] == title
            )
            return f"Cancelled meeting '{title}' scheduled for {date}"

        return "Invalid action. Please use 'schedule', 'list', or 'cancel'."


In [None]:

# Function to create the assistant agent and defined tools
def create_assistants():
    """Initialize the agents and the supervisor"""
    

    model = get_llm()

    # Get the current date
    current_date = datetime.now().strftime("%B %d, %Y")

    print("[DEBUG] Agent initializing...")

    # Create memory saver
    memory = ConversationMemory()

    # Define the assistant prompt
    agent_prompt = f"""
        Sei un assistente gentile e simpatico creato per aiutare l'utente nella gestione di riunioni e attività.
        La data di oggi è {current_date}

        Segui queste linee guida:
        - Rispondi in modo naturale ed evita di ripetere i dettagli delle attività o delle riunioni se non richiesto
        - Converti tutte le date nel formato YYYY-MM-DD usando {current_date} come riferimento
        - Utilizza sempre la cronologia della conversazione per mantenere il contesto della conversazione precedente
        - Usa gli strumenti forniti per gestire efficacemente attività e riunioni
        - Rispondi in modo amichevole e professionale, interagendo con l'utente come un assistente disponibile, cercando di comprendere le necessità, le intenzioni e le preferenze dell'utente
        - Se non sei sicuro della richiesta dell'utente, chiedi chiarimenti o fornisci una risposta generale
        - Rispondi in maniera simpatica, in modo da mettere l'utente a proprio agio

        Per le riunioni:
        - Puoi programmare, elencare e cancellare riunioni
        - Estrai titolo, partecipanti e orario dal contesto
        - Se vengono menzionati più partecipanti, combinali insieme

        Per le attività:
        - Puoi aggiungere, elencare, completare ed eliminare attività
        - Gestisci la priorità delle attività e le date di scadenza
        - Aggiorna lo stato delle attività dal contesto e notifica l'utente

        Per richieste composte (più azioni in una singola richiesta):
        - Elabora ogni azione una sola volta
        - Evita di ripetere azioni o creare duplicati

        Non inventare mai informazioni o fare supposizioni sulle intenzioni dell'utente. Chiedi sempre chiarimenti se necessario.

        Per esempio, se l'utente dice "Programma una riunione", dovresti chiedere la data, l'ora, il titolo e i partecipanti e non presumere mai nessuno di questi dettagli.

        Se l'utente chiede qualcosa come "Cosa c'è nel mio calendario?", dovresti elencare tutte le riunioni programmate per il giorno e anche tutte le attività in sospeso.
    """

    # Create the assistant agent
    agent = create_react_agent(
        model=model, # Add the LLM model
        tools=[MeetingTool(memory), TodoTool(memory)], # Add the defined tools
        state_modifier= agent_prompt, # Add the assistant prompt
    )


    return agent, memory


In [None]:
# Print welcome message and instructions for user
print("\nWelcome!")

# Create the assistant agent and memory saver
agent, memory_saver = create_assistants()

# Main conversation loop
while True:
    try:
        # Load existing conversation state
        state = memory_saver.load()

        # Get user input
        user_input = input("\nUser: ")
        
        # Check if user wants to quit
        if user_input.lower() == 'quit':
            print("\nGoodbye!")
            memory_saver.reset_messages()
            break

        # Update state with new message
        messages = state.get("messages", [])
        messages.append({"role": "user", "content": user_input})
        
        # Prepare input for agent
        inputs = {
            "messages": messages
        }

        ai_response = "" 
        
        # Invoke the assistant agent
        for event in agent.stream(inputs, stream_mode="values"):
            
            message = event["messages"][-1]
            
            if isinstance(message, AIMessage):
                message.pretty_print()
                ai_response += message.content + "\n"

            elif isinstance(message, ToolMessage):
                message.pretty_print()



        # Update state with AI response
        messages.append({
            "role": "assistant",
            "content": ai_response
        })

        # Save updated state
        state["messages"] = messages
        memory_saver.save(state)

    except KeyboardInterrupt:
        print("\nGoodbye!")
        memory_saver.reset_messages()
        break

    except Exception as e:
        print(f"Error occurred: {e}")