# Call center agent - based on LangMem memory extractor

The call center agent supports voice interactions using:
- [OpenAI's Whisper](https://platform.openai.com/docs/guides/speech-to-text) for speech-to-text
- [Kokoro](https://huggingface.co/spaces/hexgrad/Kokoro-TTS) for text-to-speech

The LangMem memory manager is used to collect and track the information given by the user during the call.

![call_center_agent_flow.png](call_center_agent_flow.png)

### Install dependencies

Ensure you have `ffmpeg` installed. 

On Linux, you can install it with: \
`sudo apt install ffmpeg`

In [None]:
#%pip install -U numpy==2.1.0 scipy openai kokoro langchain-core langgraph langchain-openai langmem openai-whisper pydantic email-validator sounddevice

### Global Stuff

#### Imports

In [None]:
from typing import TypedDict, Literal, Optional, List
import io
import threading
import numpy as np
from scipy.io.wavfile import write
from IPython.display import Image, display, Audio
from openai import OpenAI
from kokoro import KPipeline
from pydantic import BaseModel, Field, EmailStr
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import merge_message_runs, HumanMessage, SystemMessage, AIMessage
from langgraph.graph import StateGraph, MessagesState, END, START
from langchain_openai import ChatOpenAI
from langmem import create_memory_manager
from langmem.knowledge.extraction import MemoryManager
from email.message import EmailMessage
import time
import whisper
import sounddevice as sd
#import smtplib

### Set environment variables

* Set your `OPENAI_API_KEY`

In [None]:
import os, getpass

def _set_env(var: str):
    """Ensure environment variable *var* is set for the current process.

    An existing non-empty value is kept (and written back unchanged);
    otherwise the user is prompted for the value with hidden input.
    """
    os.environ[var] = os.environ.get(var) or getpass.getpass(f"{var}: ")

# Prompt for the OpenAI API key unless it is already present in the environment
_set_env("OPENAI_API_KEY")

#### Models initialization

In [None]:
# Initialize chat model (shared by the graph nodes and the LangMem memory manager)
chat_model = ChatOpenAI(model="gpt-4o", temperature=0)

# Initialize kokoro TTS pipeline (used by AudioUtils.play_audio)
# NOTE(review): lang_code='a' presumably selects American English - confirm against the Kokoro docs
kpipeline = KPipeline(lang_code='a')

# Initialize whisper STT model (used by get_user_reply to transcribe the recorded audio)
# NOTE(review): "medium.en" is presumably the English-only medium checkpoint - confirm against the Whisper docs
whisper_model = whisper.load_model("medium.en") 

#### Chatbot instructions and messages

In [None]:
# Prompt templates and constants used by the graph nodes.
# Every "{request_form}" placeholder is filled in with str.format() by the node that uses the template.

# System prompt for filling and tracking the user request form (used by update_form)
FORM_FILL_SYSTEM_MESSAGE = """You are a helpful call center agent. 

Your task is to fill a user request form by asking the user to provide necessary information.

Here is the current user request form (may be empty if no information has been provided yet):
<user_request_form>
{request_form}
</user_request_form>

Ask for one form detail at each iteration.

If the request form was completed then reply "Got it!".
"""

# System prompt for checking whether the form is completed (used by check_form_completion)
FORM_COMPLETION_STATUS_SYSTEM_MESSAGE = """You are a helpful call center agent. 

Your task is to decide if a user request form is completed.

Here is the current user request form (may be empty if no information has been provided yet):
<user_request_form>
{request_form}
</user_request_form>
"""

# System prompt for locating a problematic form field (used by fix_form)
FORM_FIX_SYSTEM_MESSAGE = """You are a helpful call center agent. 

Your task is to detect in the following user request form in which field there might be a problem:

Here is the current user request form (may be empty if no information has been provided yet):
<user_request_form>
{request_form}
</user_request_form>
"""

# A message asking the user to approve form details (used by get_user_confirmation)
APPROVE_MESSAGE = """Please reply 'yes' to confirm the details below, or 'no' if you want to make changes:
**********************
{request_form}
**********************
"""

# Welcome message (spoken by welcome_user at the start of the call)
WELCOME_MESSAGE = "Welcome to automated call center!"

# Respond when user asks to change something in the form (spoken by change_form)
CHANGE_MESSAGE = "What changes would you like to make?"

# Temp file for audio recording: written by AudioUtils.record_audio_until_stop, read by get_user_reply
AUDIO_PATH = './audio.wav'

#### Audio utility functions

In [None]:
class AudioUtils:
    """Microphone capture and text-to-speech playback helpers.

    Creating an instance samples the microphone once (see ``calibrate_mic``)
    to derive ``silence_mean_threshold``, the mean absolute amplitude above
    which a chunk of recorded audio is treated as speech.
    """

    SAMPLE_RATE = 16000      # Hz, rate used for calibration and recording
    TTS_SAMPLE_RATE = 24000  # Hz, rate handed to the notebook audio player
    CHUNK_FRAMES = 1024      # frames read from the input stream per chunk
    SILENCE_WINDOW = 10      # trailing chunks that must all be quiet to stop
    POLL_INTERVAL = 0.05     # seconds between two silence checks

    def __init__(self):
        self.calibrate_mic()

    @staticmethod
    def _chunk_level(audio_chunk):
        """Return the mean absolute amplitude of one int16 audio chunk."""
        # Widen before abs(): abs(-32768) does not fit in int16 and would wrap.
        return float(np.mean(np.abs(np.asarray(audio_chunk).astype(np.int32))))

    def calibrate_mic(self):
        """Sample the microphone and set ``silence_mean_threshold``.

        Reads 50 chunks, discards the first 10 readings and sets the
        threshold to 1.5 times the mean level of the remaining ones.
        """
        levels = []
        with sd.InputStream(samplerate=self.SAMPLE_RATE, channels=1, dtype='int16') as stream:
            for _ in range(50):
                audio_chunk, _ = stream.read(self.CHUNK_FRAMES)  # Read audio data in chunks
                levels.append(self._chunk_level(audio_chunk))

        # NOTE(review): the first readings are presumably skipped to let the
        # input stream settle - confirm.
        self.silence_mean_threshold = 1.5 * np.mean(levels[10:])

    def play_audio(self, response: str):
        """Speak *response* with Kokoro, blocking until playback should be over.

        The text is split on newlines by the pipeline; every resulting
        segment is played in order (previously only the first segment was
        played, so multi-line responses were cut short).
        """
        generator = kpipeline(
            response, voice='af_heart',  # <= change voice here
            speed=1, split_pattern=r'\n+'
        )

        for _graphemes, _phonemes, audio in generator:
            display(Audio(data=audio, rate=self.TTS_SAMPLE_RATE, autoplay=True))
            # display() returns immediately; wait for the segment's duration so
            # segments do not overlap and recording does not start too early.
            time.sleep(len(audio) / self.TTS_SAMPLE_RATE)

    def _wait_for_end_of_speech(self, chunks, recorder):
        """Block until the speaker has talked and then gone quiet.

        Returns once speech has been detected at least once and the last
        ``SILENCE_WINDOW`` chunks are all below the silence threshold, or as
        soon as the recorder thread is no longer alive.
        """
        started_talking = False
        while recorder.is_alive():
            if len(chunks) > self.SILENCE_WINDOW:
                recent = chunks[-self.SILENCE_WINDOW:]
                if any(self._chunk_level(c) > self.silence_mean_threshold for c in recent):
                    started_talking = True
                elif started_talking:
                    return
            # Sleep instead of spinning so the recorder thread is not starved.
            time.sleep(self.POLL_INTERVAL)

    def record_audio_until_stop(self):
        """Record from the microphone until the speaker stops talking.

        Recording ends once speech was detected and is followed by a run of
        quiet chunks. The audio is saved as a WAV file at ``AUDIO_PATH``.

        Returns:
            The recorded samples as a single NumPy array.

        Raises:
            RuntimeError: if the recorder stopped without capturing any audio.
        """
        chunks = []  # Audio chunks appended by the recorder thread
        stop_event = threading.Event()

        def record_audio():
            """Continuously records audio until the stop event is set."""
            with sd.InputStream(samplerate=self.SAMPLE_RATE, channels=1, dtype='int16') as stream:
                print("Recording...")
                while not stop_event.is_set():
                    audio_chunk, _ = stream.read(self.CHUNK_FRAMES)  # Read audio data in chunks
                    chunks.append(audio_chunk)

        # Record in a separate thread while this thread watches for silence
        recorder = threading.Thread(target=record_audio)
        recorder.start()
        try:
            self._wait_for_end_of_speech(chunks, recorder)
        finally:
            # Always release the input stream, even if the wait is interrupted
            stop_event.set()
            recorder.join()

        if not chunks:
            raise RuntimeError("No audio was recorded from the microphone")

        # Stack all audio chunks into a single NumPy array and write it to file
        audio_data = np.concatenate(chunks, axis=0)
        write(AUDIO_PATH, self.SAMPLE_RATE, audio_data)

        return audio_data

# Shared AudioUtils instance; constructing it runs the microphone calibration
audioUtils = AudioUtils()
print(f'audioUtils.silence_mean_threshold: {audioUtils.silence_mean_threshold}')

def get_user_reply():
    """Record the user's spoken reply and return its transcription.

    The recording is saved to ``AUDIO_PATH`` by the shared ``audioUtils``
    instance and then transcribed with the Whisper model.

    Returns:
        The transcribed text of the reply.
    """
    audioUtils.record_audio_until_stop() #input()
    transcription = whisper_model.transcribe(AUDIO_PATH)
    user_text = transcription["text"]
    # Bind the text first: reusing the delimiter quote inside an f-string
    # expression is a SyntaxError before Python 3.12.
    print(f"user: {user_text}")
    return user_text

def respond_to_user(txt: str) -> None:
    """Print the assistant's message and speak it through the shared audioUtils instance."""
    print(f'assistant: {txt}')
    audioUtils.play_audio(txt)

### Graph definition

![call_center_agent_graph.png](call_center_agent_graph.png)

#### Structures definitions

In [None]:
#Form template - every field is optional and starts as None until the user provides it
class RequestForm(BaseModel): 
    """User request form template.

    Used as the extraction schema for the memory manager and as the
    ``request_form`` value kept in the graph state.
    """
    name: Optional[str] = Field(description="The user's name", default=None)
    # EmailStr makes pydantic validate the value as an email address
    email: Optional[EmailStr] = Field(description="The user's email address", default=None)
    request: Optional[str] = Field(description="The user's request", default=None)    

#LangGraph graph state
class RequestState(MessagesState): 
    """Graph state containing chat messages (inherited from MessagesState) and user request form management variables"""
    # NOTE(review): if MessagesState is a TypedDict, these Field(...) defaults are presumably
    # not applied at runtime - welcome_user sets both keys explicitly. Confirm.
    request_manager: MemoryManager = Field(description="Request form details extractor", default=None)
    request_form: RequestForm = Field(description="Request form details", default=None)

#Chatbot structured output for form completion status response
class RequestFormStatus(TypedDict): 
    """Decide if the user request form was completed"""
    # check_form_completion routes to confirmation only when this is 'completed'
    status: Literal['completed', 'not completed']

#### Node definitions

In [None]:
#start node - greets the caller and seeds the graph state
def welcome_user(state: RequestState):
    """Speak the welcome message and return the initial state.

    The returned update records the greeting in the chat history and adds a
    memory manager that extracts RequestForm details plus an empty form.
    """
    respond_to_user(WELCOME_MESSAGE)

    greeting = AIMessage(content=WELCOME_MESSAGE)
    # enable_inserts=False: the single form profile is updated in place
    form_extractor = create_memory_manager(chat_model, schemas=[RequestForm], enable_inserts=False)

    return {
        "messages": [greeting],
        "request_manager": form_extractor,
        "request_form": RequestForm(),
    }


#form update node (loops until form completion)
def update_form(state: RequestState):
    """Collect the user's next reply, refresh the request form and ask for more details.

    On the first pass (only the welcome message in the history) no reply is
    recorded; the chat model just asks its first question.

    Returns:
        A state update with the new messages (the user's reply, if any, and
        the assistant's response) and the current request form.
    """
    new_messages = []
    user_messages = [message for message in state["messages"] if isinstance(message, HumanMessage)]

    #get user reply for previous assistant message
    if len(state["messages"]) > 1:
        user_reply = HumanMessage(get_user_reply())
        user_messages.append(user_reply)
        new_messages.append(user_reply)

    #update user request form with new user provided data
    request_form = state["request_form"]
    if user_messages:
        extracted = state["request_manager"].invoke({"messages": user_messages})
        # Keep the previous form when nothing was extracted instead of
        # failing with an IndexError on an empty result.
        if extracted:
            request_form = extracted[0].content
        print(f'request_form:\n{request_form}')

    #ask user for more details if necessary
    system_msg = FORM_FILL_SYSTEM_MESSAGE.format(request_form=request_form)
    response = chat_model.invoke([SystemMessage(content=system_msg)] + state["messages"] + new_messages)
    respond_to_user(response.content)

    new_messages.append(response)

    return {"messages": new_messages, "request_form": request_form}


#change form details node (entered when the user rejects the form at confirmation)
def change_form(state: RequestState):
    """Ask the user what to change and update the request form from the reply.

    Returns:
        A state update with the assistant's prompt and the user's reply as
        chat messages, plus the current request form.
    """
    respond_to_user(CHANGE_MESSAGE)

    user_reply = HumanMessage(get_user_reply())

    user_messages = [message for message in state["messages"] if isinstance(message, HumanMessage)]
    user_messages.append(user_reply)

    #update user request form with new user provided data
    request_form = state["request_form"]
    extracted = state["request_manager"].invoke({"messages": user_messages})
    # Keep the previous form when nothing was extracted instead of failing
    # with an IndexError on an empty result.
    if extracted:
        request_form = extracted[0].content
    print(f'request_form:\n{request_form}')

    # Record both turns as typed messages, like the other nodes do, so the
    # history shows which question the user was answering.
    return {"messages": [AIMessage(content=CHANGE_MESSAGE), user_reply], "request_form": request_form}


def fix_form(state: RequestState):
    """Ask the user about a problematic form field and update the form from the reply.

    The chat model is prompted with the current form to point out the field
    that may be wrong; its response is spoken to the user, whose reply is
    then fed to the form extractor.

    Returns:
        A state update with the assistant's question and the user's reply,
        plus the current request form.
    """
    request_form = state["request_form"]

    #ask user to fix form details if necessary
    system_msg = FORM_FIX_SYSTEM_MESSAGE.format(request_form=request_form)
    response = chat_model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    respond_to_user(response.content)

    #update user request form
    user_reply = HumanMessage(get_user_reply())
    user_messages = [message for message in state["messages"] if isinstance(message, HumanMessage)]
    user_messages.append(user_reply)

    extracted = state["request_manager"].invoke({"messages": user_messages})
    # Keep the previous form when nothing was extracted instead of failing
    # with an IndexError on an empty result.
    if extracted:
        request_form = extracted[0].content
    print(f'request_form:\n{request_form}')

    return {"messages": [response, user_reply], "request_form": request_form}
    

#form completion router
def check_form_completion(state: RequestState):
    """Pick the next step: keep filling the form, fix it, or ask for confirmation.

    Returns "update_form" while any form field is still None. Otherwise the
    chat model judges the form: "get_user_confirmation" when it reports
    'completed', "fix_form" for any other status.
    """
    form = state["request_form"]

    required_values = (form.name, form.email, form.request)
    if any(value is None for value in required_values):
        return "update_form"

    #use chat model to decide if form was completed or further corrections are needed
    prompt = FORM_COMPLETION_STATUS_SYSTEM_MESSAGE.format(request_form=form)
    status_model = chat_model.with_structured_output(RequestFormStatus)
    verdict = status_model.invoke([SystemMessage(content=prompt)] + state["messages"])

    if verdict['status'] == 'completed':
        return "get_user_confirmation"

    print(f"check_form_completion: chatbot thinks form is missing details: response['status']: {verdict['status']}\n{form}")
    return "fix_form"


#ask user to approve form details
def get_user_confirmation(state: RequestState):
    """Read the form details back to the user and record their yes/no reply.

    Returns:
        A state update with the assistant's approval prompt followed by the
        user's reply.
    """
    request_form = state["request_form"]

    prompt = APPROVE_MESSAGE.format(request_form=request_form)
    respond_to_user(prompt)

    user_reply = get_user_reply()

    # The prompt must be an explicit AIMessage: a bare string in the messages
    # update is stored as a human message, which would make the assistant's
    # own prompt count as user input for later form extraction.
    return {"messages": [AIMessage(content=prompt), HumanMessage(user_reply)]}


#Check if user approved form details and route accordingly
def check_form_confirmation(state: RequestState):
    """Route on the user's answer to the confirmation prompt.

    Returns "open_user_request" when the last message contains the word
    'yes', "change_form" when it contains the word 'no', and
    "get_user_confirmation" (ask again) otherwise. 'yes' wins when both
    words are present.
    """
    user_reply = state["messages"][-1].content.lower()

    print(f'user_reply: {user_reply}')

    # Match whole words, not substrings: "i don't know" or "not now" must not
    # count as 'no', and "yesterday" must not count as 'yes'. Punctuation
    # from the transcription is treated as a word separator.
    words = set("".join(ch if ch.isalnum() else " " for ch in user_reply).split())

    if 'yes' in words: #user approved form details by replying 'yes'
        return "open_user_request"
    if 'no' in words:
        return "change_form"
    return "get_user_confirmation"


#Continue with opening the user request and sending a notification
def open_user_request(state: RequestState):
    """Build the notification email for the completed form and tell the user.

    The email is only printed; actually sending it over SMTP is left
    commented out. Nothing is returned, so the graph state is not updated.
    """
    form = state["request_form"]

    notification = EmailMessage()
    notification.set_content(form.request)

    headers = {
        'Subject': f'Call Center - request by {form.name} is being processed',
        'From': 'localhost', #your SMTP server address
        'To': form.email,
    }
    for header_name, header_value in headers.items():
        notification[header_name] = header_value

    # Send the message via our own SMTP server.
    print(f'\n***********\nNotification Email:\n{notification}***********') #s = smtplib.SMTP('localhost'); s.send_message(msg); s.quit()

    respond_to_user('A request was opened for you. A notification was sent to your email. Have a good day!')
    

# Create the graph + all nodes
builder = StateGraph(RequestState)

# check_form_completion and check_form_confirmation are routing functions: they
# return the name of the next node rather than a state update, so they are
# attached as conditional edges below and not registered as nodes.
builder.add_node("welcome_user", welcome_user)
builder.add_node("update_form", update_form)
builder.add_node("get_user_confirmation", get_user_confirmation)
builder.add_node("change_form", change_form)
builder.add_node("fix_form", fix_form)
builder.add_node("open_user_request", open_user_request)

builder.add_edge(START, "welcome_user")
builder.add_edge("welcome_user", "update_form")

# Listing the possible targets keeps the compiled graph limited to real transitions
form_completion_targets = ["update_form", "fix_form", "get_user_confirmation"]
builder.add_conditional_edges("update_form", check_form_completion, form_completion_targets)
builder.add_conditional_edges("change_form", check_form_completion, form_completion_targets)
builder.add_conditional_edges("fix_form", check_form_completion, form_completion_targets)

form_confirmation_targets = ["open_user_request", "change_form", "get_user_confirmation"]
builder.add_conditional_edges("get_user_confirmation", check_form_confirmation, form_confirmation_targets)
builder.add_edge("open_user_request", END)

# Compile the graph (no checkpointer or store is configured)
call_center_graph = builder.compile()

In [None]:
# Every conversational turn is one graph step, so raise the step limit above
# the default to keep a long call (many corrections or misheard replies) from
# being aborted with a recursion error.
result = call_center_graph.invoke({"messages": []}, config={"recursion_limit": 100})