In [None]:
import sys
import os
import datetime
import re
import time
import numpy as np
import datetime
from typing import List, Dict, Any, Tuple
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain, SequentialChain, SimpleSequentialChain
from langchain_openai import OpenAI
from langchain.agents import AgentType, initialize_agent, Tool
from langchain.agents import AgentExecutor
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
from langchain.vectorstores import FAISS
from dateutil import parser
from dateutil.relativedelta import relativedelta
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.tools import DuckDuckGoSearchRun


model = ChatGoogleGenerativeAI(
    google_api_key="AIzaSyBRNeqFxx67kRd0L5ZQXS8ZXaP3gENeMGQ",
    model="gemini-2.0-flash",
    temperature=0.0,
)

embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    model_kwargs={'device': 'cpu'}
)

class RAGDocStore:
    def __init__(self, embeddings):
        self.embeddings = embeddings
        self.ticket_store = None
        self.calendar_store = None
        self.all_tickets = []
        self.all_events = []
        
    def add_ticket(self, content):
        self.all_tickets.append(content)
        documents = [Document(page_content=content, metadata={"type": "ticket"})]
        
        if self.ticket_store is None:
            self.ticket_store = FAISS.from_documents(documents, self.embeddings)
        else:
            self.ticket_store.add_documents(documents)
    
    def add_event(self, content):
        self.all_events.append(content)
        documents = [Document(page_content=content, metadata={"type": "event"})]
        
        if self.calendar_store is None:
            self.calendar_store = FAISS.from_documents(documents, self.embeddings)
        else:
            self.calendar_store.add_documents(documents)
    
    def search_tickets(self, query, k=3):
        if not self.ticket_store or not self.all_tickets:
            return []
            
        results = self.ticket_store.similarity_search(query, k=k)
        return [doc.page_content for doc in results]
    
    def search_events(self, query, k=3):
        if not self.calendar_store or not self.all_events:
            return []
            
        results = self.calendar_store.similarity_search(query, k=k)
        return [doc.page_content for doc in results]
        
    def search_all(self, query, k=3):
        ticket_results = self.search_tickets(query, k=k)
        event_results = self.search_events(query, k=k)
        return ticket_results + event_results

    def update_ticket(self, old_ticket, new_ticket):
        try:
            if old_ticket in self.all_tickets:
                index = self.all_tickets.index(old_ticket)
                self.all_tickets[index] = new_ticket
                
                if self.ticket_store:
                    documents = [Document(page_content=ticket, metadata={"type": "ticket"}) 
                                for ticket in self.all_tickets]
                    self.ticket_store = FAISS.from_documents(documents, self.embeddings)
                
                return True
            return False
        except Exception as e:
            print(f"Error updating ticket: {e}")
            return False

def setup_docstore():
    if not os.path.exists("data"):
        os.makedirs("data")
    
    docstore = RAGDocStore(embeddings)
    
    for filename in ["tickets.txt", "calendar.txt"]:
        file_path = os.path.join("data", filename)
        if not os.path.exists(file_path):
            with open(file_path, "w") as f:
                pass
    
    try:
        with open("data/tickets.txt", "r") as f:
            for line in f:
                line = line.strip()
                if line:
                    docstore.add_ticket(line)
    except Exception as e:
        print(f"Error loading tickets: {e}")
    
    try:
        with open("data/calendar.txt", "r") as f:
            for line in f:
                line = line.strip()
                if line:
                    docstore.add_event(line)
    except Exception as e:
        print(f"Error loading calendar: {e}")

    return docstore

docstore = setup_docstore()

class TicketTool:
    def __init__(self, docstore):
        self.docstore = docstore
        self.next_ticket_id = self._get_next_ticket_id()

    def _get_next_ticket_id(self):
        try:
            with open("data/tickets.txt", "r") as f:
                tickets = f.read().splitlines()
            
            max_id = 1000
            
            for ticket in tickets:
                match = re.search(r"Ticket #(\d+):", ticket)
                if match:
                    ticket_id = int(match.group(1))
                    max_id = max(max_id, ticket_id)
            
            return max_id + 1
            
        except FileNotFoundError:
            return 1001
        except Exception as e:
            print(f"Error reading tickets file: {e}")
            return 1001
    
    def create_ticket(self, issue_description):
        issue_description = issue_description.strip()
        if len(issue_description) > 100:
            issue_description = issue_description[:100] + "..."
        
        ticket_id = self.next_ticket_id
        self.next_ticket_id += 1
        status = "New"
        created_date = datetime.datetime.now().strftime("%Y-%m-%d")
        ticket = f"Ticket #{ticket_id}: {issue_description} - Status: {status} - Created: {created_date}"
        
        with open("data/tickets.txt", "a") as f:
            f.write("\n" + ticket)
        
        self.docstore.add_ticket(ticket)
        
        return f"Created ticket #{ticket_id}. Status: {status}"

    def update_ticket_status(self, ticket_info):
        try:
            parts = ticket_info.strip().split(' ', 1)
            
            if len(parts) != 2:
                return "Please provide both ticket ID and new status (e.g., '1001 Resolved')"
                
            ticket_id = parts[0].strip()
            ticket_id = ticket_id.replace("'", "").replace('"', '').replace('#', '').strip()
                
            if not ticket_id.isdigit():
                return f"Invalid ticket ID: {ticket_id}. Please provide a numeric ticket ID."
                
            new_status = parts[1].strip()
            
            ticket_query = f"Ticket #{ticket_id}"
            matching_tickets = self.docstore.search_tickets(ticket_query, k=5)
            
            target_ticket = None
            for ticket in matching_tickets:
                if f"Ticket #{ticket_id}:" in ticket:
                    target_ticket = ticket
                    break
            
            if not target_ticket:
                return f"Ticket #{ticket_id} not found."
            
            current_status_match = re.search(r"Status: ([^-]+)", target_ticket)
            if not current_status_match:
                return f"Could not parse status in ticket: {target_ticket}"
                
            current_status = current_status_match.group(1).strip()
            updated_ticket = target_ticket.replace(f"Status: {current_status}", f"Status: {new_status}")
            
            update_successful = self.docstore.update_ticket(target_ticket, updated_ticket)
            
            if not update_successful:
                return f"Failed to update ticket in document store."
                
            with open("data/tickets.txt", "r") as f:
                tickets = f.read().splitlines()
                
            for i, ticket in enumerate(tickets):
                if f"Ticket #{ticket_id}:" in ticket:
                    tickets[i] = updated_ticket
                    break
                    
            with open("data/tickets.txt", "w") as f:
                f.write("\n".join(tickets))
                
            return f"Updated ticket #{ticket_id} status from '{current_status}' to '{new_status}'."
            
        except Exception as e:
            return f"Error updating ticket status: {str(e)}"
    
    def check_ticket_status(self, ticket_id_or_query):
        results = self.docstore.search_tickets(ticket_id_or_query, k=3)
        
        if not results:
            return "No matching tickets found."
        
        return "Found the following relevant tickets:\n" + "\n".join(results)

class CalendarTool:
    def __init__(self, docstore):
        self.docstore = docstore
    
    def schedule_event(self, event_details):
        event_details = event_details.strip()
    
        if event_details.startswith("Event:") and " - Date:" in event_details and " - Time:" in event_details and " - Location:" in event_details:
            event = event_details
            
            title_match = re.search(r'Event:\s*([^-]+)', event)
            date_match = re.search(r'Date:\s*([^-]+)', event)
            time_match = re.search(r'Time:\s*([^-]+)', event)
            location_match = re.search(r'Location:\s*(.+)$', event)
            
            title = title_match.group(1).strip() if title_match else "Event"
            date_str = date_match.group(1).strip() if date_match else "Unknown Date"
            time_str = time_match.group(1).strip() if time_match else "Unknown Time"
            location = location_match.group(1).strip() if location_match else "Unknown Location"
        else:
            event = event_details
            title = "Event"
            date_str = datetime.datetime.now().strftime("%Y-%m-%d")
            time_str = "12:00"
            location = "See details"
        
        with open("data/calendar.txt", "a") as f:
            f.write("\n" + event)
        
        self.docstore.add_event(event)
        
        return f"Scheduled: {title} for {date_str} at {time_str} in {location}"
    
    def check_calendar(self, query):
        results = self.docstore.search_events(query, k=3)
        
        if not results:
            return "No matching events found."
        
        return "Found the following relevant events:\n" + "\n".join(results)

class ContextTool:
    def __init__(self, docstore):
        self.docstore = docstore
    
    def get_context(self, query):
        results = self.docstore.search_all(query, k=5)
        
        if not results:
            return "No relevant information found."
        
        return "Here's relevant information that might help:\n" + "\n".join(results)

class DateTool:    
    def get_current_date(self, _=""):
        return datetime.datetime.now().strftime("%Y-%m-%d")
        
date_tool = DateTool()
ticket_tool = TicketTool(docstore)
calendar_tool = CalendarTool(docstore)
context_tool = ContextTool(docstore)

tools = [
    Tool(
        name="CheckCurrentDate",
        func=date_tool.get_current_date,
        description="Checks today's date. Returns current date in YYYY-MM-DD format. Input can be empty."
    ),
    Tool(
        name="CreateTicket",
        func=ticket_tool.create_ticket,
        description="Create a new support ticket. Input should be a description of the issue."
    ),
    Tool(
        name="UpdateTicketStatus",
        func=ticket_tool.update_ticket_status,
        description="Update the status of an existing ticket. Input should be ticket ID followed by new status (e.g., '1001 Resolved'). Possible values of new_status are: Resolved, New, In Progress, Pending, Investigating."
    ),
    Tool(
        name="CheckTicketStatus",
        func=ticket_tool.check_ticket_status,
        description="Check the status of existing tickets. Input can be a ticket number or description."
    ),
    Tool(
        name="ScheduleEvent",
        func=calendar_tool.schedule_event,
        description="Schedule a new event. Output exactly should be of format -> `Event: 'event name' - Date: 'the date the event is scheduled for in yyyy-mm-dd format strictly' - Time: 'the time the event is scheduled for, hh-mm format strictly' - Location: 'the location the event is scheduled for'`. Example: 'Event: Swimming - Date: 2025-04-24 - Time: 03:00 - Location: Swimming pool'"
    ),
    Tool(
        name="CheckCalendar",
        func=calendar_tool.check_calendar,
        description="Check for scheduled events. Input should be a date or description."
    ),
    Tool(
        name="GetContext",
        func=context_tool.get_context,
        description="Get relevant context from knowledge base about a topic or query."
    )
]

def classify_intent(query):
    classification_prompt = PromptTemplate(
        input_variables=["query"],
        template="""Classify this service desk query into ONE category:
        'ticket_creation', 'ticket_status', 'ticket_status_update', 'schedule_event', 'check_calendar', 'check_current_date' or 'general_inquiry'.
        
        Query: {query}
        
        Category:"""
    )
    
    classification_chain = LLMChain(llm=model, prompt=classification_prompt)
    result = classification_chain.run(query)
    return result.strip().lower()

def create_agent():
    return initialize_agent(
        tools,
        model,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
        handle_parsing_errors=True,
        max_iterations=5,
        agent_kwargs={
            "prefix": """You are a helpful service desk assistant that can help with tickets and scheduling events. 
            You have access to several tools to help you with your tasks.
            When using tools, use their exact names without parentheses or additional characters.
            For example, use "CheckCurrentDate" instead of "CheckCurrentDate()".
            
            IMPORTANT: For scheduling events, always use the CheckCurrentDate tool first to get today's date,
            then calculate the next day's date manually before using the ScheduleEvent tool.
            
            When scheduling events, make sure to format the event details correctly:
            Event: [name] - Date: [YYYY-MM-DD] - Time: [HH:MM] - Location: [location]
            """
        }
    )

def generate_rag_response(query):
    context = context_tool.get_context(query)
    
    rag_prompt = PromptTemplate(
        input_variables=["query", "context"],
        template="""You are a helpful service desk assistant. Use the following context to help answer the query.
        
        Context: {context}
        
        Query: {query}
        
        Answer:"""
    )
    
    rag_chain = LLMChain(llm=model, prompt=rag_prompt)
    return rag_chain.run(query=query, context=context)

def get_response(query, agent):
    try:
        intent = classify_intent(query)
        print(f"Classified intent: {intent}")
        
        if intent == 'ticket_creation':
            augmented_query = f"I need to create a ticket for the following issue: {query}"
        elif intent == 'ticket_status':
            augmented_query = f"I need to check the status of the following ticket: {query}"
        elif intent == 'ticket_status_update':
            augmented_query = f"I need to update the status of the following ticket: {query}"
        elif intent == 'schedule_event':
            augmented_query = f"I need to schedule an event: {query}"
        elif intent == 'check_calendar':
            augmented_query = f"I need to check my calendar for: {query}"
        elif intent == 'check_current_date':
            augmented_query = "What is today's date?"
        else:
            augmented_query = query
            
        try:
            result = agent.run(augmented_query)
            return result
        except Exception as agent_error:
            print(f"Agent error: {agent_error}")
            return generate_rag_response(query)
        
    except Exception as e:
        print(f"Error: {e}")
        return "I'm sorry, I couldn't process your request. Could you please try again with more details?"

def parse_input(user_input):
    if user_input.lower() in ['quit', 'exit', 'q']:
        return None
        
    return user_input

def run_llm():
    print("Service Desk Smart Assistant")
    print("----------------------------")
    agent = create_agent()
    
    while True:
        try:
            user_input = input("> ")
            query = parse_input(user_input)
            
            if query is None:
                print("Thank you for using the Service Desk Assistant. Goodbye!")
                sys.exit()
            
            if not query.strip():
                print("Please enter a request or type 'quit' to exit.")
                continue
                
            print("Processing your request...")
            response = get_response(query, agent)
            print(f"\n{response}\n")
        except KeyboardInterrupt:
            print("\nThank you for using the Service Desk Assistant. Goodbye!")
            sys.exit()
        except Exception as e:
            print(f"\nAn error occurred: {e}\nPlease try again.\n")

if __name__ == "__main__":
    run_llm()