<a href="https://colab.research.google.com/github/quarup/afterlife-ai/blob/main/afterlife_ai.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

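# Afterlife AI: end-to-end voice chat

This notebook runs the whole pipeline in Colab: Whisper transcribes speech, a 4-bit quantized DeepSeek chat model writes the reply, Coqui TTS speaks it (optionally in a cloned voice), and a Gradio app ties everything together. Voice samples, spoken responses and saved conversations are stored on Google Drive.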

In [None]:
#@title install packages
# Python dependencies used below: Whisper (speech-to-text), Gradio (web UI),
# transformers + accelerate + bitsandbytes (4-bit LLM loading) and Coqui TTS (speech synthesis).
# NOTE(review): versions are unpinned, so a fresh runtime may pull releases that
# do not work together — consider pinning once a known-good set is found.
!pip install openai-whisper gradio transformers accelerate bitsandbytes TTS

In [None]:
# System package espeak-ng.
# NOTE(review): presumably the phonemizer backend some Coqui TTS models rely on — confirm.
!apt-get update && apt-get install -y espeak-ng

In [None]:
#@title imports
import whisper
import gradio as gr
import torch
import time
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import os
import numpy as np
from TTS.api import TTS
import soundfile as sf
from google.colab import drive
import datetime

In [None]:
#@title set up directories

# Prefer the GPU whenever torch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# Everything the app persists lives on Google Drive, so mount it first.
drive.mount('/content/drive')

# Folder layout on Drive: one base folder with three sub-folders.
drive_base_dir = "/content/drive/MyDrive/AI_Chat_App"
voice_samples_dir, responses_dir, conversations_dir = (
    os.path.join(drive_base_dir, subfolder)
    for subfolder in ("voice_samples", "ai_responses", "conversations")
)

# Create whichever folders are still missing and report each new one.
for folder in (drive_base_dir, voice_samples_dir, responses_dir, conversations_dir):
    if os.path.exists(folder):
        continue
    os.makedirs(folder)
    print(f"Created directory: {folder}")

print(f"Using Google Drive directories at: {drive_base_dir}")

In [None]:
#@title load models
# This cell creates the module-level objects the chat functions use later:
# `whisper_model` (speech-to-text) and `tokenizer` / `model` (the chat LLM).

# Load Whisper model
print("Loading Whisper model...")
whisper_model = whisper.load_model("base")  # You can use "tiny", "base", "small", "medium", or "large"
print("Whisper model loaded!")

# Load DeepSeek model with quantization to reduce memory usage
print("Loading DeepSeek model...")
model_name = "deepseek-ai/deepseek-llm-7b-chat"

# Quantization configuration for memory efficiency:
# 4-bit weights using the "nf4" quantization type with double quantization,
# while computations run in float16.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)

# Tokenizer and model come from the same checkpoint name.
# device_map="auto" leaves the placement of the weights to the library
# instead of moving the model to `device` by hand.
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map="auto",
    torch_dtype=torch.float16,
)
print("DeepSeek model loaded!")


In [None]:
#@title TTS models
# This cell creates three module-level TTS handles used by text_to_speech():
#   tts_models          - dict of language code -> dedicated single-language model
#   english_clone_model - YourTTS, or None when loading it failed
#   fallback_model      - XTTS-v2, loaded without a guard (a failure here stops the cell)

# Load the dedicated (non-cloning) per-language models
print("Loading language-specific voice cloning models...")

# Dictionary to hold our language-specific models
tts_models = {}

# Try to load models with better error handling
try:
    # First try a reliable English model
    tts_models["en"] = TTS("tts_models/en/ljspeech/tacotron2-DDC")
    print("Loaded English TTS model")
except Exception as e:
    print(f"Error loading English model: {str(e)}")
    # We'll use the fallback model for English
    # (tts_models simply has no "en" entry in that case)

# Disabled: dedicated Portuguese model. While this stays commented out,
# tts_models never gets a "pt" entry.
# try:
#     # For Portuguese
#     tts_models["pt"] = TTS("tts_models/pt/cv/vits")
#     print("Loaded Portuguese TTS model")
# except Exception as e:
#     print(f"Error loading Portuguese model: {str(e)}")
#     # We'll use the fallback model for Portuguese

# Disabled: dedicated German model. While this stays commented out,
# tts_models never gets a "de" entry.
# try:
#     # For German
#     tts_models["de"] = TTS("tts_models/de/thorsten/tacotron2-DDC")
#     print("Loaded German TTS model")
# except Exception as e:
#     print(f"Error loading German model: {str(e)}")
#     # We'll use the fallback model for German

# Load the model used for English voice cloning
print("Loading English-focused voice cloning model...")

try:
    # YourTTS is better at maintaining English speech patterns during cloning
    english_clone_model = TTS("tts_models/multilingual/multi-dataset/your_tts")
    print("Loaded YourTTS model for English voice cloning")
except Exception as e:
    print(f"Error loading YourTTS model: {str(e)}")
    print("Will fall back to XTTS-v2 for English voice cloning")
    # None tells text_to_speech() to skip the YourTTS path
    english_clone_model = None

# Load the XTTS-v2 model which we know works for voice cloning
print("Loading XTTS-v2 model for voice cloning...")
fallback_model = TTS("tts_models/multilingual/multi-dataset/xtts_v2")
print("XTTS-v2 model loaded for voice cloning!")

# Get available models to ensure we're using valid ones
# NOTE(review): this builds one more TTS() object only to print the model list;
# what list_models() returns may depend on the installed TTS version — confirm.
print("\nListing available TTS models for reference:")
print(TTS().list_models())

In [None]:
#@title run chat
# Variables to store user voice sample
# Only start from the default sample on Drive when that file really exists.
# Otherwise keep None: the chat handlers treat None as "no sample yet", and
# pointing at a missing file would make every voice-cloning call fail.
_default_voice_sample = os.path.join(voice_samples_dir, "default_voice.wav")
user_voice_sample = _default_voice_sample if os.path.isfile(_default_voice_sample) else None
current_language = "en"

def transcribe_audio(audio_file):
    """Run Whisper on an audio file and return the recognised text.

    Args:
        audio_file: Path of the audio file; it is handed to
            ``whisper_model.transcribe`` unchanged.

    Returns:
        The value stored under the ``"text"`` key of Whisper's result.
    """
    return whisper_model.transcribe(audio_file)["text"]

def text_to_speech(text, voice_sample=None):
    """Synthesize ``text`` to a WAV file in ``responses_dir``.

    Engines are tried in priority order, and a failure of one engine falls
    through to the next one instead of aborting the whole chain:

    1. English without a voice sample: the dedicated English model, if loaded.
    2. English with a voice sample: YourTTS, if loaded.
    3. ``fallback_model`` with the voice sample (any language).
    4. ``fallback_model`` without a voice sample (last resort).

    Args:
        text: Text to speak. It is cleaned with ``preprocess_text_for_language``.
        voice_sample: Optional path to a speaker WAV used for voice cloning.
            A path that does not point to an existing file is ignored.

    Returns:
        Path of the written WAV file, or ``None`` when there is nothing to
        speak or every engine failed.
    """
    # Snapshot the global once so the file name, the preprocessing and the
    # engine calls all agree even if the language is switched meanwhile.
    language = current_language

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = os.path.join(responses_dir, f"response_{language}_{timestamp}.wav")

    # Preprocess text for the specific language
    processed_text = preprocess_text_for_language(text, language)
    if not processed_text:
        # English preprocessing can strip a reply down to nothing.
        print("Error in TTS processing: no speakable text left after preprocessing")
        return None

    # Cloning from a file that is not there can only fail, so drop it up front.
    if voice_sample is not None and not os.path.isfile(voice_sample):
        print(f"Voice sample not found, continuing without voice cloning: {voice_sample}")
        voice_sample = None

    cloning_kwargs = {"speaker_wav": voice_sample, "language": language}
    plain_kwargs = {"language": language}

    # Ordered list of (engine, extra keyword arguments, success message).
    attempts = []
    if language == "en":
        if voice_sample is None and "en" in tts_models:
            attempts.append((
                tts_models["en"],
                {},
                "Used dedicated English model without voice cloning",
            ))
        elif voice_sample is not None and english_clone_model is not None:
            attempts.append((
                english_clone_model,
                cloning_kwargs,
                "Used YourTTS for English voice cloning",
            ))

    if voice_sample is not None:
        attempts.append((
            fallback_model,
            cloning_kwargs,
            f"Used fallback_model with voice cloning for {language}",
        ))
        attempts.append((
            fallback_model,
            plain_kwargs,
            f"Used emergency fallback for {language}",
        ))
    else:
        attempts.append((
            fallback_model,
            plain_kwargs,
            f"Used fallback_model without voice cloning for {language}",
        ))

    last_error = None
    for engine, extra_kwargs, success_message in attempts:
        try:
            engine.tts_to_file(text=processed_text, file_path=output_path, **extra_kwargs)
        except Exception as e:
            # Deliberately broad: any engine error should move on to the next engine.
            print(f"Error in TTS processing: {str(e)}")
            last_error = e
            continue
        print(success_message)
        return output_path

    print(f"All TTS methods failed: {str(last_error)}")
    return None

def preprocess_text_for_language(text, language_code):
    """Apply language-specific preprocessing to improve TTS quality.

    Steps, in order: drop control characters, map typographic punctuation to
    plain ASCII, apply the per-language rule, collapse whitespace.

    Args:
        text: Raw text to clean. ``None`` or an empty string yields ``""``.
        language_code: ``"en"``, ``"pt"`` or ``"de"``. For ``"en"`` the result
            is pure ASCII; every other code keeps its non-ASCII letters.

    Returns:
        The cleaned, single-spaced text.
    """
    import re
    import unicodedata

    if not text:
        return ""

    # Common preprocessing for all languages: remove control characters
    # (tab, newline and carriage return are kept and normalized further down)
    text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)

    # Replace problematic characters. The keys are written as escapes so that
    # the mapping cannot be damaged when the file is re-encoded.
    replacements = {
        "\u2026": "...",  # horizontal ellipsis
        "\u2013": "-",    # en dash
        "\u2014": "-",    # em dash
        "\u201c": '"',    # left double quotation mark
        "\u201d": '"',    # right double quotation mark
        "\u2018": "'",    # left single quotation mark
        "\u2019": "'",    # right single quotation mark / apostrophe
        "\u00a0": " ",    # no-break space
    }

    for original, replacement in replacements.items():
        text = text.replace(original, replacement)

    # Language-specific preprocessing
    if language_code == "en":
        # English output must be ASCII only. Decompose accented letters first
        # so that their base letter survives ("café" -> "cafe"), then drop
        # whatever is still outside ASCII.
        text = unicodedata.normalize("NFD", text)
        text = text.encode("ascii", "ignore").decode("ascii")

    # Portuguese ("pt") and German ("de") need their accented characters for
    # proper pronunciation, so nothing more is done for them.

    # Normalize whitespace
    text = ' '.join(text.split())

    return text

def get_friend_name(language_code):
    """Return the assistant's name for a language.

    Portuguese (``"pt"``) uses "Quarup"; every other code, including English
    and German, uses "Q".
    """
    return "Quarup" if language_code == "pt" else "Q"

def generate_response(user_input, history):
    """Generate the assistant's reply with the DeepSeek model.

    The prompt is a system instruction in the current language, followed by
    the formatted history and the new user turn.

    Args:
        user_input: The user's latest message.
        history: Earlier turns as ``(user_message, assistant_message)`` pairs.

    Returns:
        The generated reply, stripped and cut at the first stop sequence.
    """
    # Determine which language to use for the response
    response_language = current_language

    # Get the appropriate friend name for this language
    friend_name = get_friend_name(response_language)

    # System instruction that encourages warm replies in the right language.
    system_instructions = {
        "en": f"You are Assistant {friend_name}, having a friendly, warm conversation in English. Respond in English using conversational language, express empathy, and occasionally use friendly phrases or light humor when appropriate.",
        "pt": f"Você é o Assistente {friend_name}, tendo uma conversa amigável e calorosa em português. Responda em português usando linguagem conversacional, expresse empatia e ocasionalmente use frases amigáveis ou humor leve quando apropriado.",
        "de": f"Sie sind Assistent {friend_name}, führen ein freundliches, herzliches Gespräch auf Deutsch. Antworten Sie auf Deutsch in einer Unterhaltungssprache, zeigen Sie Empathie und verwenden Sie gelegentlich freundliche Ausdrücke oder leichten Humor, wenn es angebracht ist.",
    }
    # Unknown language codes default to English
    system_instruction = system_instructions.get(response_language, system_instructions["en"])

    # Format the conversation history with appropriate name
    conversation = format_chat_history(history, response_language)

    # Add the current user input with the system instruction and friendly framing
    prompt = f"{system_instruction}\n\n{conversation}User: {user_input}\nAssistant {friend_name}:"

    # Put the inputs on the device the model actually lives on.
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_length = inputs.input_ids.shape[1]

    # Text that marks the start of a new turn. These are matched as whole
    # strings: using only the first token of each as an EOS id would stop on
    # fragments of them and would replace the model's own end-of-sequence token.
    stop_sequences = ["User:", "\nUser", f"Assistant {friend_name}:"]

    with torch.no_grad():
        outputs = model.generate(
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=512,
            temperature=0.75,
            top_p=0.92,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
            stop_strings=stop_sequences,
            tokenizer=tokenizer,  # required by generate() when stop_strings is given
        )

    # Decode only the newly generated tokens and clean them up
    response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()

    # The matched stop sequence is part of the output, so cut the reply there
    for stop_seq in stop_sequences:
        if stop_seq in response:
            response = response.split(stop_seq)[0].strip()

    # Verify response language matches expected language (warning only)
    check_response_language = detect_language_with_deepseek(response)
    if check_response_language != response_language:
        print(f"Warning: Response language ({check_response_language}) doesn't match expected language ({response_language})")

    return response

def format_chat_history(messages, language=None):
    """Render earlier turns as prompt text for the DeepSeek model.

    Args:
        messages: Iterable of ``(user_message, assistant_message)`` pairs.
        language: Language code that selects the assistant's name; when
            ``None`` the module-level ``current_language`` is used.

    Returns:
        One ``User: ... / Assistant <name>: ...`` section per turn, each ending
        in a blank line; an empty string when there are no turns.
    """
    effective_language = current_language if language is None else language
    assistant_label = get_friend_name(effective_language)

    turns = [
        f"User: {user_msg}\nAssistant {assistant_label}: {friend_msg}\n\n"
        for user_msg, friend_msg in messages
    ]
    return "".join(turns)

def save_voice_sample(audio_file):
    """Copy a voice recording to Google Drive and make it the active sample.

    Args:
        audio_file: Path of the recording to copy.

    Returns:
        Path of the timestamped copy inside ``voice_samples_dir``; the same
        path is stored in the module-level ``user_voice_sample``.
    """
    global user_voice_sample
    import shutil

    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    drive_path = os.path.join(voice_samples_dir, f"voice_sample_{stamp}.wav")

    # Persist the recording on Drive, then point the app at the copy.
    shutil.copy(audio_file, drive_path)
    user_voice_sample = drive_path

    print(f"Saved voice sample to: {drive_path}")
    return drive_path

def set_voice_sample(audio_file):
    """UI handler: store the given recording as the cloning sample.

    Returns a status message: a prompt to record first when ``audio_file`` is
    empty, otherwise the Drive path the sample was saved to.
    """
    if audio_file:
        saved_to = save_voice_sample(audio_file)
        return f"Voice sample saved to Google Drive: {saved_to}"
    return "Please record or upload a voice sample first."

def detect_language_with_deepseek(text):
    """Ask the DeepSeek model which language ``text`` is written in.

    Args:
        text: Text to classify. ``None``, empty or whitespace-only text is
            treated as English without calling the model.

    Returns:
        ``"en"``, ``"pt"`` or ``"de"``; ``"en"`` when the model's answer
        cannot be parsed.
    """
    import re

    if not text or not text.strip():
        return "en"  # Default to English for empty text

    # Create a simple prompt asking DeepSeek to identify the language
    prompt = f"""Please identify which language this text is written in. Only respond with the language code:
- "en" for English
- "pt" for Portuguese
- "de" for German

Text: "{text}"

Language code:"""

    # Put the inputs on the device the model actually lives on.
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_length = inputs.input_ids.shape[1]

    with torch.no_grad():
        outputs = model.generate(
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=5,  # We only need a short response
            do_sample=False,   # Greedy decoding: the same text always gets the same answer
            pad_token_id=tokenizer.eos_token_id
        )

    # Decode and clean the response
    response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip().lower()

    # Prefer a stand-alone language code. A plain substring test would, for
    # example, find "en" inside "written" and report English.
    code_match = re.search(r"\b(en|pt|de)\b", response)
    if code_match:
        return code_match.group(1)

    # The model sometimes answers with the language name instead of the code.
    for name_fragment, code in (
        ("english", "en"),
        ("portugu", "pt"),
        ("german", "de"),
        ("deutsch", "de"),
    ):
        if name_fragment in response:
            return code

    print(f"Unexpected language detection response: {response}")
    return "en"  # Default to English if we can't parse the response

def enhanced_chat_with_text(text, history):
    """Handle one typed message: detect its language, reply and speak the reply.

    Args:
        text: The user's message. ``None`` or blank text is rejected.
        history: List of ``(user_message, assistant_message)`` pairs; the new
            turn is appended to it in place.

    Returns:
        ``(history, history, status, speech_file)``: the history twice (chat
        display and state), a status line with timings and any language
        switch, and the path of the spoken reply or ``None``.
    """
    global user_voice_sample, current_language

    if not text or not text.strip():
        return history, history, "Please enter a message.", None

    # Detect language of input text using DeepSeek
    detected_language = detect_language_with_deepseek(text)

    # Always update the language - we trust DeepSeek's detection
    previous_language = current_language
    current_language = detected_language

    language_info = ""
    if previous_language != current_language:
        language_info = f"Language changed from {current_language_name(previous_language)} to {current_language_name(current_language)}. "
    print(f"DeepSeek detected language: {current_language}")

    # Get response from DeepSeek
    llm_start_time = time.time()
    response = generate_response(text, history)
    llm_time = time.time() - llm_start_time

    # Build the status line; a language switch is reported first so that the
    # user actually sees it.
    speech_file = None
    tts_info = f"{language_info}LLM: {llm_time:.2f}s | Using language: {current_language}"

    # Convert response to speech only when a voice sample is available
    if user_voice_sample is not None:
        tts_start_time = time.time()
        speech_file = text_to_speech(response, user_voice_sample)
        tts_time = time.time() - tts_start_time
        tts_info += f" | TTS: {tts_time:.2f}s"
    else:
        tts_info += " | No voice sample set for TTS"

    # Update history
    history.append((text, response))

    return history, history, tts_info, speech_file

def enhanced_chat_with_voice(audio_file, history):
    """Handle one spoken message: transcribe it, reply and speak the reply.

    Args:
        audio_file: Path of the recorded or uploaded audio; empty means no audio.
        history: List of ``(user_message, assistant_message)`` pairs; the new
            turn is appended to it in place.

    Returns:
        ``(history, history, status, speech_file)``: the history twice (chat
        display and state), a status line with timings and any language
        switch, and the path of the spoken reply or ``None``.
    """
    global user_voice_sample, current_language

    if not audio_file:
        return history, history, "No audio detected. Please record your message again.", None

    # Process audio to text
    start_time = time.time()
    transcription = transcribe_audio(audio_file)
    transcription_time = time.time() - start_time

    # Silence or noise transcribes to nothing; do not run the LLM and TTS on it.
    if not transcription or not transcription.strip():
        return history, history, "No speech recognized in the audio. Please record your message again.", None

    # Detect language from transcription using DeepSeek
    previous_language = current_language
    detected_language = detect_language_with_deepseek(transcription)
    current_language = detected_language

    language_info = ""
    if previous_language != current_language:
        language_info = f"Language changed from {current_language_name(previous_language)} to {current_language_name(current_language)}. "
    print(f"DeepSeek detected language: {current_language}")

    # If this is the first voice message, use it as voice sample if none exists
    if user_voice_sample is None:
        drive_path = save_voice_sample(audio_file)
        voice_sample_message = f"Voice sample automatically set from your first message and saved to: {drive_path}"
    else:
        voice_sample_message = ""

    # Get response from DeepSeek
    llm_start_time = time.time()
    response = generate_response(transcription, history)
    llm_time = time.time() - llm_start_time

    # Convert response to speech using language-specific model
    tts_start_time = time.time()
    speech_file = text_to_speech(response, user_voice_sample)
    tts_time = time.time() - tts_start_time

    # Update history
    history.append((transcription, response))

    # A language switch is reported first so that the user actually sees it.
    status = f"{language_info}Transcription: {transcription_time:.2f}s | LLM: {llm_time:.2f}s | TTS: {tts_time:.2f}s | Language: {current_language}"
    if voice_sample_message:
        status += f"\n{voice_sample_message}"

    return history, history, status, speech_file

def set_language(lang_code):
    """Switch the conversation language by hand.

    Only ``"en"``, ``"pt"`` and ``"de"`` are accepted; any other code leaves
    the current language untouched. Returns a status message either way.
    """
    global current_language

    if lang_code not in ("en", "pt", "de"):
        return f"Invalid language code: {lang_code}. Supported codes: en, pt, de"

    current_language = lang_code
    return f"Language manually set to: {current_language_name(lang_code)} ({lang_code})"

def current_language_name(lang_code):
    """Translate a language code into its English display name.

    Codes other than ``"en"``, ``"pt"`` and ``"de"`` give ``"Unknown"``.
    """
    display_names = dict(en="English", pt="Portuguese", de="German")
    try:
        return display_names[lang_code]
    except KeyError:
        return "Unknown"

def save_conversation(history):
    """Write the conversation to a timestamped text file on Google Drive.

    Args:
        history: List of ``(user_message, assistant_message)`` pairs.

    Returns:
        A status message: either that there is nothing to save, or the path of
        the file that was written to ``conversations_dir``.
    """
    if not history:
        return "No conversation to save."

    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filepath = os.path.join(conversations_dir, f"conversation_{stamp}.txt")

    # The assistant is labelled with the name that belongs to the current language.
    friend_name = get_friend_name(current_language)
    separator = "-" * 50 + "\n\n"

    with open(filepath, "w", encoding="utf-8") as f:
        f.write(f"Conversation with Assistant {friend_name} ({current_language_name(current_language)})\n\n")
        for turn_number, (user_msg, friend_msg) in enumerate(history, start=1):
            f.write(
                f"Turn {turn_number}:\n"
                f"User: {user_msg}\n\n"
                f"Assistant {friend_name}: {friend_msg}\n\n"
                + separator
            )

    return f"Conversation with Assistant {friend_name} saved to Google Drive: {filepath}"

# Create Gradio interface
# Layout: the left column holds the chat, the spoken reply and the inputs;
# the right column holds voice-cloning, language and save controls.
with gr.Blocks() as demo:
    gr.Markdown("# Multilingual Voice Chat with Language-Specific Models")

    with gr.Row():
        with gr.Column(scale=3):
            # Display chat history
            chatbot = gr.Chatbot(label="Chat History")

            # Audio output for TTS responses
            audio_output = gr.Audio(label="AI Voice Response", autoplay=True)

            # Status message and current language
            status_msg = gr.Markdown("")
            current_lang_indicator = gr.Markdown(f"**Current Language**: English (en) | **Assistant**: {get_friend_name('en')}")

            # Text input option
            with gr.Row():
                text_input = gr.Textbox(placeholder="Type your message here...", label="Text Input")
                text_submit = gr.Button("Send Text")

            # Audio input option
            with gr.Row():
                audio_input = gr.Audio(
                    label="Voice Input",
                    type="filepath",
                    sources=["microphone", "upload"]
                )
                audio_submit = gr.Button("Send Voice")

        with gr.Column(scale=1):
            gr.Markdown("## Voice Cloning Settings")
            voice_sample_input = gr.Audio(
                label="Record Voice Sample for Cloning",
                type="filepath",
                sources=["microphone", "upload"]
            )
            voice_sample_button = gr.Button("Set Voice Sample")
            voice_sample_status = gr.Markdown("")

            gr.Markdown("## Language Settings")
            with gr.Row():
                en_button = gr.Button("English")
                pt_button = gr.Button("Portuguese")
                de_button = gr.Button("German")
            language_override_status = gr.Markdown("")

            gr.Markdown("## Save Conversation")
            save_conversation_button = gr.Button("Save Conversation to Drive")
            save_status = gr.Markdown("")

            gr.Markdown("## Storage Information")
            gr.Markdown(f"Voice samples: {voice_samples_dir}")
            gr.Markdown(f"AI responses: {responses_dir}")
            gr.Markdown(f"Conversations: {conversations_dir}")

    # Store chat history state
    state = gr.State([])

    def _language_indicator_text():
        """Markdown shown in the "current language" line, built from the global state."""
        friend_name = get_friend_name(current_language)
        return f"**Current Language**: {current_language_name(current_language)} ({current_language}) | **Assistant**: {friend_name}"

    # Set up event handlers for language buttons
    def language_button_handler(lang_code):
        """Set the global language and return the status message for the override line."""
        global current_language
        current_language = lang_code
        friend_name = get_friend_name(lang_code)
        return f"Language manually set to: {current_language_name(lang_code)} ({lang_code}) | Assistant: {friend_name}"

    def _make_language_click(lang_code):
        """Build the zero-argument click callback for one language button."""
        def _on_click():
            # Also refresh the indicator, which would otherwise keep showing
            # the previous language until the next message.
            return language_button_handler(lang_code), _language_indicator_text()
        return _on_click

    for lang_button, lang_button_code in ((en_button, "en"), (pt_button, "pt"), (de_button, "de")):
        lang_button.click(
            _make_language_click(lang_button_code),
            inputs=[],
            outputs=[language_override_status, current_lang_indicator]
        )

    # Wrap the chat functions to update the language display
    def wrapped_chat_with_text(text, history):
        """Run a text turn; the last returned value clears the text box."""
        history, new_state, status, speech_file = enhanced_chat_with_text(text, history)
        # Assigning to text_input.value does not reach the browser; the box is
        # cleared by returning "" for the text_input output instead.
        return history, new_state, status, speech_file, _language_indicator_text(), ""

    def wrapped_chat_with_voice(audio_file, history):
        """Run a voice turn and refresh the language indicator."""
        history, new_state, status, speech_file = enhanced_chat_with_voice(audio_file, history)
        return history, new_state, status, speech_file, _language_indicator_text()

    # Set up event handlers for chat
    text_chat_outputs = [chatbot, state, status_msg, audio_output, current_lang_indicator, text_input]

    text_submit.click(
        wrapped_chat_with_text,
        inputs=[text_input, state],
        outputs=text_chat_outputs
    )

    text_input.submit(
        wrapped_chat_with_text,
        inputs=[text_input, state],
        outputs=text_chat_outputs
    )

    audio_submit.click(
        wrapped_chat_with_voice,
        inputs=[audio_input, state],
        outputs=[chatbot, state, status_msg, audio_output, current_lang_indicator]
    )

    # Other event handlers
    voice_sample_button.click(
        set_voice_sample,
        inputs=[voice_sample_input],
        outputs=[voice_sample_status]
    )

    save_conversation_button.click(
        save_conversation,
        inputs=[state],
        outputs=[save_status]
    )

# Launch the app
demo.launch(debug=True, share=True)