# Week 2 Exercise - with Booking, Translation and Speech-To-Text

In [0]:
# Note: The speech-to-text functionality requires FFmpeg to be installed. Go to the FFmpeg website and download the installer for your operating system.
# !pip install openai-whisper sounddevice scipy numpy

In [0]:
# imports

import os
import json
from dotenv import load_dotenv
from openai import OpenAI
import gradio as gr
from anthropic import Anthropic
import numpy as np
import sounddevice as sd
import scipy.io.wavfile as wav
import tempfile
import whisper

In [0]:
# Initialization

# Tunable settings: chat model name and the speech-recording length in seconds
MODEL = "gpt-4o-mini"
STT_DURATION = 3

# Load variables from .env (overriding any already set) and read the API keys
load_dotenv(override=True)
openai_api_key = os.getenv('OPENAI_API_KEY')
anthropic_api_key = os.getenv('ANTHROPIC_API_KEY')

# API clients: OpenAI is built with no explicit key (presumably it picks up
# OPENAI_API_KEY from the environment); Anthropic is given its key directly
openai = OpenAI()
anthropic = Anthropic(api_key=anthropic_api_key)

In [0]:
# System prompt sent as the first message of every chat completion
system_message = (
    "You are a helpful assistant for an Airline called FlightAI. "
    "Give short, courteous answers, no more than 1 sentence. "
    "Always be accurate. If you don't know the answer, say so."
)

In [0]:
# get ticket price function

# Return-ticket prices keyed by lower-case city name
ticket_prices = {"london": "$799", "paris": "$899", "tokyo": "$1400", "berlin": "$499", "rome": "$699", "bucharest": "$949", "moscow": "$1199"}

def get_ticket_price(destination_city):
    """Look up the ticket price for a destination city.

    The lookup is case-insensitive and ignores surrounding whitespace.

    Args:
        destination_city: City name as supplied by the model's tool call.
            May be None or a non-string if the tool arguments were incomplete.

    Returns:
        The price string from ``ticket_prices`` (e.g. "$799"), or "Unknown"
        when the city is not in the table or is not a string.
    """
    print(f"Tool get_ticket_price called for {destination_city}")
    # Tool arguments come from the model and may be missing or malformed;
    # answer "Unknown" instead of raising AttributeError on None.lower().
    if not isinstance(destination_city, str):
        return "Unknown"
    city = destination_city.strip().lower()
    return ticket_prices.get(city, "Unknown")

# create booking function
import random

def create_booking(destination_city):
    """Create a mock booking and return its reference.

    The reference is the prefix "AI" followed by six random decimal digits.
    A confirmation line is printed as a side effect.
    """
    # Six independent draws of a single digit, concatenated after the prefix
    booking_number = "AI" + "".join(str(random.randint(0, 9)) for _ in range(6))

    print(f"Booking {booking_number} created for the flight to {destination_city}")

    return booking_number

In [0]:
# price function structure:

# Tool schema advertised to the model for looking up a ticket price
price_function = dict(
    name="get_ticket_price",
    description="Get the price of a return ticket to the destination city. Call this whenever you need to know the ticket price, for example when a customer asks 'How much is a ticket to this city'",
    parameters=dict(
        type="object",
        properties=dict(
            destination_city=dict(
                type="string",
                description="The city that the customer wants to travel to",
            ),
        ),
        required=["destination_city"],
        additionalProperties=False,
    ),
)

# booking function structure:
# Tool schema advertised to the model for making a booking
booking_function = dict(
    name="make_booking",
    description="Make a flight booking for the customer. Call this whenever a customer wants to book a flight to a destination.",
    parameters=dict(
        type="object",
        properties=dict(
            destination_city=dict(
                type="string",
                description="The city that the customer wants to travel to",
            ),
        ),
        required=["destination_city"],
        additionalProperties=False,
    ),
)

In [0]:
# List of tools:

# Wrap each function schema in the {"type": "function", ...} envelope
tools = [
    {"type": "function", "function": spec}
    for spec in (price_function, booking_function)
]

In [0]:
# Function handle_tool_call:

def handle_tool_call(message):
    """Execute the first tool call on an assistant message.

    Args:
        message: Assistant message whose ``tool_calls[0]`` exposes ``id``,
            ``function.name`` and ``function.arguments`` (a JSON string).

    Returns:
        A ``(response, city)`` tuple. ``response`` is a ``role="tool"`` message
        dict carrying the JSON-encoded result and the originating
        ``tool_call_id``. ``city`` is the ``destination_city`` argument, or
        None when the call could not be served (unknown tool name or
        arguments that are not a JSON object); in that case the tool message
        carries an ``"error"`` field instead of a result.
    """
    tool_call = message.tool_calls[0]
    function_name = tool_call.function.name

    # Arguments are produced by the model, so they may not be valid JSON.
    try:
        arguments = json.loads(tool_call.function.arguments)
    except (TypeError, ValueError):
        arguments = None

    if not isinstance(arguments, dict):
        city = None
        payload = {"error": "Tool arguments were not a valid JSON object"}
    elif function_name == "get_ticket_price":
        city = arguments.get('destination_city')
        payload = {"destination_city": city, "price": get_ticket_price(city)}
    elif function_name == "make_booking":
        city = arguments.get('destination_city')
        payload = {"destination_city": city, "booking_number": create_booking(city)}
    else:
        # Previously this fell through and returned None, which made the
        # caller's tuple-unpacking raise TypeError.
        city = None
        payload = {"error": f"Unknown tool: {function_name}"}

    response = {
        "role": "tool",
        "content": json.dumps(payload),
        "tool_call_id": tool_call.id
    }
    return response, city

In [0]:
# Image generation

import base64
from io import BytesIO
from PIL import Image

def artist(city, testing_mode=False):
    """Generate an illustrative vacation image for a city with DALL-E 3.

    Args:
        city: City name interpolated into the image prompt.
        testing_mode: When True, skip the API call entirely.

    Returns:
        A PIL image decoded from the base64 response, or None when in
        testing mode or when generation/decoding fails.
    """
    if testing_mode:
        print(f"Image generation skipped for {city} - in testing mode")
        return None

    # Mirror talker(): a failed image request is logged and reported as None
    # so that the text reply of the chat turn is still delivered.
    try:
        image_response = openai.images.generate(
                model="dall-e-3",
                prompt=f"An image representing a vacation in {city}, showing tourist spots and everything unique about {city}, in a realistic style",
                size="1024x1024",
                n=1,
                response_format="b64_json",
            )
        image_base64 = image_response.data[0].b64_json
        image_data = base64.b64decode(image_base64)
        return Image.open(BytesIO(image_data))
    except Exception as e:
        print(f"Error generating image: {e}")
        return None

In [0]:
# Text to speech 

import base64
from io import BytesIO
from PIL import Image
from IPython.display import Audio, display

def talker(message, testing_mode=False):
    """Generate speech from text and return the path to the audio file for Gradio to play.

    Args:
        message: Text to synthesise.
        testing_mode: When True, skip the API call entirely.

    Returns:
        Name of the MP3 file written to the current directory (always
        matching ``output_audio_*.mp3``), or None in testing mode or when
        synthesis fails.
    """
    if testing_mode:
        print("Text-to-speech skipped - in testing mode")
        return None

    try:
        response = openai.audio.speech.create(
            model="tts-1",
            voice="onyx",
            input=message)

        # A whole-second timestamp alone is not unique: two replies within the
        # same second would overwrite each other. Append a random suffix while
        # keeping the output_audio_*.mp3 pattern used for cleanup.
        import time
        import uuid
        timestamp = int(time.time())
        output_filename = f"output_audio_{timestamp}_{uuid.uuid4().hex[:8]}.mp3"

        with open(output_filename, "wb") as f:
            f.write(response.content)

        print(f"Audio saved to {output_filename}")
        return output_filename
    except Exception as e:
        print(f"Error generating speech: {e}")
        return None

In [0]:
# Speech to text function

# Whisper models already loaded in this kernel, keyed by model name
_whisper_models = {}


def _get_whisper_model(name="base"):
    """Return the named Whisper model, loading it only on first use."""
    if name not in _whisper_models:
        _whisper_models[name] = whisper.load_model(name)
    return _whisper_models[name]


def recorder_and_transcriber(duration=STT_DURATION, samplerate=16000, testing_mode=False):
    """Record audio for the specified duration and transcribe it using Whisper.

    Args:
        duration: Recording length in seconds.
        samplerate: Sampling rate passed to sounddevice and the WAV writer.
        testing_mode: When True, skip recording and return a fixed string.

    Returns:
        The transcribed text with surrounding whitespace stripped.
    """
    if testing_mode:
        print("Speech-to-text skipped - in testing mode")
        return "This is a test speech input"

    print(f"Recording for {duration} seconds...")

    # Record mono audio using sounddevice
    recording = sd.rec(int(duration * samplerate), samplerate=samplerate, channels=1, dtype='float32')
    sd.wait()  # Wait until recording is finished

    # Reserve a temporary file name, closing the handle before the WAV writer
    # opens the same path.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
        temp_filename = temp_audio.name

    try:
        wav.write(temp_filename, samplerate, recording)
        # You can use "tiny", "base", "small", "medium", or "large"
        model = _get_whisper_model("base")
        result = model.transcribe(temp_filename)
    finally:
        # Always remove the temporary file, even if writing or transcription fails
        os.unlink(temp_filename)

    return result["text"].strip()

In [0]:
import os
import glob

def cleanup_audio_files():
    """Remove every ``output_audio_*.mp3`` file from the current directory.

    Failures on individual files are printed and skipped; a summary line with
    the number of files removed is printed at the end. Returns None.
    """
    removed = 0
    for path in glob.glob("output_audio_*.mp3"):
        try:
            os.remove(path)
        except Exception as e:
            print(f"Error deleting {path}: {e}")
        else:
            removed += 1

    print(f"Cleaned up {removed} audio files")
    return None

In [0]:
# Translation function

def translate_text(text, target_language):
    """Translate ``text`` into ``target_language`` using Claude.

    Returns an empty string when either argument is falsy. A target language
    outside the supported set falls back to French. If the API call fails,
    the error is printed and a "[Translation failed: ...]" marker is returned.
    """
    if not (text and target_language):
        return ""

    # Languages offered by the UI dropdown; anything else is treated as French
    supported_languages = {"French", "Spanish", "German", "Italian", "Russian", "Romanian"}
    full_language_name = target_language if target_language in supported_languages else "French"

    try:
        request_messages = [
            {
                "role": "user",
                "content": f"Translate the following text to {full_language_name}. Provide only the translation, no explanations: \n\n{text}"
            }
        ]
        response = anthropic.messages.create(
            model="claude-3-haiku-20240307",
            max_tokens=1024,
            messages=request_messages,
        )
        return response.content[0].text
    except Exception as e:
        print(f"Translation error: {e}")
        return f"[Translation failed: {str(e)}]"

In [0]:
def chat(history, image, testing_mode=False):
    """Run one assistant turn, resolving any tool calls the model requests.

    Args:
        history: List of ``{"role", "content"}`` message dicts so far; the
            assistant reply is appended to it.
        image: Image currently shown in the UI, or None.
        testing_mode: When True, no image is generated.

    Returns:
        ``(history, image, reply)`` where ``reply`` is the assistant's text
        (an empty string if the model returned no content) and ``image`` is
        either the incoming image or a newly generated one.
    """
    from types import SimpleNamespace

    messages = [{"role": "system", "content": system_message}] + history
    response = openai.chat.completions.create(model=MODEL, messages=messages, tools=tools)

    if response.choices[0].finish_reason == "tool_calls":
        message = response.choices[0].message
        messages.append(message)

        # The model may request several tool calls in one message, and each
        # one needs its own tool response. handle_tool_call only reads
        # tool_calls[0], so give it one single-call view per request.
        city = None
        for tool_call in message.tool_calls:
            tool_response, tool_city = handle_tool_call(SimpleNamespace(tool_calls=[tool_call]))
            messages.append(tool_response)
            city = city or tool_city

        # Only generate an image when not testing, none is shown yet, and a
        # destination city was actually identified.
        if not testing_mode and image is None and city:
            image = artist(city, testing_mode)

        response = openai.chat.completions.create(model=MODEL, messages=messages)

    # content can be None; keep the history entry a string
    reply = response.choices[0].message.content or ""
    history += [{"role": "assistant", "content": reply}]

    # Return the reply directly - TTS is handled separately by the caller
    return history, image, reply

In [0]:
# Function to translate conversation history
def translate_history(history, target_language):
    """Return a copy of the chat history with every message's content translated.

    Yields an empty list when the history is empty or no target language is
    given. Roles are carried over unchanged; each content value is passed
    through ``translate_text``.
    """
    if not (history and target_language):
        return []

    return [
        {"role": entry["role"], "content": translate_text(entry["content"], target_language)}
        for entry in history
    ]

In [0]:
# Update the Gradio interface to handle audio output properly
def update_gradio_interface():
    """Build and return the Gradio Blocks UI for the airline assistant.

    The layout contains a testing checkbox, the main chatbot, a language
    dropdown with a second chatbot for the translated conversation, an image
    panel, a hidden autoplay audio component, a text entry box, and
    "Speak Command" / "Clear" buttons.

    Both the text-entry submit and the speak button run the same chain:
    do_entry -> chat -> do_tts -> do_translation (the speak button first runs
    do_speech_input to fill the entry box).

    Returns:
        The constructed ``gr.Blocks`` object (not yet launched).
    """
    with gr.Blocks() as ui:
        # `state` is created but not referenced by any handler below.
        # `audio_state` receives the third output of chat() (the reply text)
        # and is then passed to do_tts.
        state = gr.State([])
        audio_state = gr.State(None)
        
        with gr.Row():
            testing_checkbox = gr.Checkbox(label="Testing", info="Turn off multimedia features when checked", value=False)
        
        with gr.Row():
            with gr.Column(scale=2):
                # Main panel with original chat and image
                with gr.Row():
                    # Left column: chat, language selector, translated chat
                    with gr.Column(scale=1):
                        with gr.Row():
                            chatbot = gr.Chatbot(height=300, type="messages")
                        with gr.Row():
                            language_dropdown = gr.Dropdown(
                                choices=["French", "Spanish", "German", "Italian", "Russian", "Romanian"],
                                value="French",
                                label="Translation to"
                            )
                        with gr.Row():
                            translation_output = gr.Chatbot(height=200, type="messages", label="Translated chat")
                    # Right column: generated image and the (hidden) audio player
                    with gr.Column(scale=1):
                        with gr.Row():
                            image_output = gr.Image(height=620)
                        with gr.Row():
                            audio_output = gr.Audio(label="Assistant's Voice", visible=False, autoplay=True, type="filepath")
                                        
        with gr.Row():
            entry = gr.Textbox(label="Chat with our AI Assistant:")
                    
        # Button row: the empty Markdown components appear to serve only as
        # spacers around the two buttons (the `md` name is reassigned and
        # never read).
        with gr.Row():
            with gr.Column(scale=1):
                with gr.Row():
                    md = gr.Markdown()
            with gr.Column(scale=1):
                speak_button = gr.Button(value="🎤 Speak Command", variant="primary")
            with gr.Column(scale=1):
                with gr.Row():
                    md = gr.Markdown()
            with gr.Column(scale=1):            
                with gr.Row():
                    clear = gr.Button(value="Clear", variant="secondary")
            with gr.Column(scale=1):
                with gr.Row():
                    md = gr.Markdown()

        # Function to handle speech input
        def do_speech_input(testing_mode):
            """Record for STT_DURATION seconds and return the transcribed text."""
            # Record and transcribe speech
            speech_text = recorder_and_transcriber(duration=STT_DURATION, testing_mode=testing_mode)
            return speech_text
            
        # Function to handle user input
        def do_entry(message, history, testing_mode):
            """Append the user's message to the history and clear the entry box.

            ``testing_mode`` is accepted but not used here.
            """
            history += [{"role":"user", "content":message}]
            return "", history
        
        # Function to handle translation updates
        def do_translation(history, language):
            """Translate the whole chat history into the selected language."""
            translated = translate_history(history, language)
            return translated
        
        # Function to handle text-to-speech
        def do_tts(reply, testing_mode):
            """Return an audio file path for the reply, or None if empty or in testing mode."""
            if not reply or testing_mode:
                return None
            
            audio_file = talker(reply, testing_mode)
            return audio_file
        
        # Handle user message submission:
        # add the user message, get the assistant reply (stored in audio_state),
        # synthesise it, then refresh the translated chat.
        entry.submit(do_entry, inputs=[entry, chatbot, testing_checkbox], outputs=[entry, chatbot]).then(
            chat, inputs=[chatbot, image_output, testing_checkbox], outputs=[chatbot, image_output, audio_state]
        ).then(
            do_tts, inputs=[audio_state, testing_checkbox], outputs=[audio_output]
        ).then(
            do_translation, inputs=[chatbot, language_dropdown], outputs=[translation_output]
        )
        
        # Speech button: transcribe into the entry box, then run the same
        # chain as a text submission.
        speak_button.click(
            do_speech_input, 
            inputs=[testing_checkbox], 
            outputs=[entry]
        ).then(
            do_entry, 
            inputs=[entry, chatbot, testing_checkbox], 
            outputs=[entry, chatbot]
        ).then(
            chat, 
            inputs=[chatbot, image_output, testing_checkbox], 
            outputs=[chatbot, image_output, audio_state]
        ).then(
            do_tts, inputs=[audio_state, testing_checkbox], outputs=[audio_output]
        ).then(
            do_translation, 
            inputs=[chatbot, language_dropdown], 
            outputs=[translation_output]
        )
        
        # Update translation when language is changed
        language_dropdown.change(do_translation, inputs=[chatbot, language_dropdown], outputs=[translation_output])
        
        # Handle clear button
        def clear_all():
            """Delete generated audio files and reset the four output components.

            Note: audio_state is not among the outputs, so it is left unchanged.
            """
            # Clean up audio files
            cleanup_audio_files()
            # Return None for all outputs to clear the UI
            return None, None, None, None
        
        clear.click(clear_all, inputs=None, outputs=[chatbot, translation_output, image_output, audio_output], queue=False)

    return ui

# Build the Blocks UI and launch it; inbrowser=True is passed to launch()
ui = update_gradio_interface()
ui.launch(inbrowser=True)