In [1]:
import pyaudio
import wave
import whisper
import sys

In [2]:
import os
from typing import List

from langchain_google_genai import GoogleGenerativeAI, ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferWindowMemory, ChatMessageHistory
from langchain.prompts import PromptTemplate
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.callbacks.manager import CallbackManager
from langchain.document_loaders.csv_loader import CSVLoader
from langchain.docstore.document import Document
from langchain_chroma import Chroma

from dotenv import load_dotenv

In [3]:
os.environ["GOOGLE_API_KEY"] = "AIzaSyDq8c66o0deacxO67PMouAiZlBFOT_Ao4c"

In [4]:
# from dotenv import load_dotenv
# load_dotenv()  # Charge la clé depuis un fichier .env

In [5]:
def chunk_embedder(model: str = "models/embedding-001", temperature: float = 0.5) -> GoogleGenerativeAIEmbeddings:
    """Initialise le modèle d'embeddings Google."""
    return GoogleGenerativeAIEmbeddings(model=model, temperature=temperature)


In [6]:
def load_llm(temperature: float = 0) -> ChatGoogleGenerativeAI:
    """Charge le modèle de langage ChatGoogleGenerativeAI."""
    callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])
    return ChatGoogleGenerativeAI(model="gemini-pro", temperature=temperature)


In [7]:
import shutil
import os

def clear_persist_directory(persist_directory: str):
    """
    Supprime tout le contenu du répertoire de persistance pour ChromaDB.
    """
    try:
        if os.path.exists(persist_directory):
            shutil.rmtree(persist_directory)
            print(f"Le répertoire '{persist_directory}' a été supprimé avec succès.")
        else:
            print(f"Le répertoire '{persist_directory}' est déjà vide ou n'existe pas.")
    except Exception as e:
        print(f"Erreur lors de la suppression du répertoire '{persist_directory}' : {e}")


In [8]:
def init_chroma_vectorstore(embeddings: GoogleGenerativeAIEmbeddings,
                            csv_path: str,
                            persist_directory: str,
                            reset_db: bool = False) -> Chroma:
    """
    Initialise ou charge un vectorstore ChromaDB, avec option de réinitialisation.
    """
    try:
        # Réinitialiser la base si demandé
        if reset_db:
            clear_persist_directory(persist_directory)

        # S'assurer que le répertoire de persistance existe
        os.makedirs(persist_directory, exist_ok=True)

        # Charger les documents
        print(f"Chargement du fichier CSV : {csv_path}")
        loader = CSVLoader(csv_path, encoding="utf-8")
        content = loader.load()

        # Diviser les documents
        print("Division des documents...")
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=20)
        docs = text_splitter.split_documents(documents=content)

        # Vérifier si une base de données existe déjà
        chroma_db_path = os.path.join(persist_directory, "chroma.sqlite3")
        if os.path.exists(chroma_db_path):
            print("Chargement de la base existante...")
            vector_db = Chroma(
                persist_directory=persist_directory,
                embedding_function=embeddings
            )
        else:
            print("Création d'une nouvelle base vectorielle...")
            vector_db = Chroma.from_documents(
                documents=docs,
                embedding=embeddings,
                persist_directory=persist_directory
            )

        print("Base vectorielle initialisée avec succès.")
        return vector_db

    except Exception as e:
        print(f"Erreur lors de l'initialisation de la base vectorielle : {e}")
        import traceback
        traceback.print_exc()
        raise


In [9]:
def init_chat() -> ConversationalRetrievalChain:
    """
    Initialise une chaîne de conversation avec ChromaDB comme backend.
    """
    try:
        print("Chargement du modèle LLM et des embeddings...")
        llm = load_llm()
        embedding = chunk_embedder()

        # Chemins pour les données
        csv_path = "/home/leonel/Assistant-AI/Service_Client_Cotonou.csv"
        vector_db_path = "/home/leonel/Assistant-AI/vector_db"

        if not os.path.exists(csv_path):
            raise FileNotFoundError(f"Le fichier CSV '{csv_path}' est introuvable.")

        # Initialisation de la base vectorielle
        print("Initialisation de la base vectorielle...")
        vectorstore = init_chroma_vectorstore(
            embeddings=embedding,
            csv_path=csv_path,
            persist_directory=vector_db_path,
            reset_db=True  # Modifiez selon vos besoins
        )

        # Template de prompt
        template = """
        Tu es un assistant de service client intelligent.

        Historique de la conversation (si pertinent) :
        {chat_history}

        Contexte pertinent des documents (si disponible) :
        {context}

        Question actuelle : {question}

        Réponse :
        """
        prompt = PromptTemplate(template=template, input_variables=["context", "question", "chat_history"])

        # Mémoire de conversation
        chat_memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            chat_memory=ChatMessageHistory(),
            output_key="answer",
            k=5,
            return_messages=True
        )

        # Chaîne de conversation
        retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})
        conversation = ConversationalRetrievalChain.from_llm(
            llm=llm,
            retriever=retriever,
            memory=chat_memory,
            combine_docs_chain_kwargs={
                "prompt": prompt,
                "document_variable_name": "context",
            },
            return_source_documents=True,
            output_key="answer"
        )

        print("Chaîne de conversation initialisée avec succès.")
        return conversation

    except Exception as e:
        print(f"Erreur lors de l'initialisation de la chaîne de conversation : {e}")
        import traceback
        traceback.print_exc()
        raise


In [10]:
def Talker(query: str, conversation: ConversationalRetrievalChain) -> str:
    """
    Gère une interaction utilisateur avec le modèle, intégrant la gestion de l'historique
    pour les questions ambiguës et le traitement des questions multiples.
    """
    try:
        # Priorité : Vérification des questions liées à l'historique
        if "que t'ai" in query.lower() or "dernière question" in query.lower():
            print("Priorité donnée à l'historique pour cette question.")
            messages = conversation.memory.chat_memory.messages
            if len(messages) >= 2:
                # Retourner le dernier message utilisateur avant la question actuelle
                previous_query = messages[-2].content
                return f"Votre dernière question était : '{previous_query}'"
            else:
                return "Il n'y a pas encore assez d'historique pour répondre à cette question."

        # Gestion des questions multiples dans une seule requête
        questions = [q.strip() for q in query.split(" et ")]
        if len(questions) > 1:
            print("Détection de plusieurs questions dans une seule requête. Traitement individuel.")
            responses = []
            for q in questions:
                # Traitement de chaque sous-question indépendamment
                output = conversation({"question": q})
                response = output.get('answer')
                responses.append(f"Pour '{q}': {response}")

                # Afficher les documents récupérés pour chaque question (pour diagnostic)
                """
                source_documents = output.get('source_documents', [])
                print("\nDocuments récupérés pour le contexte (pour la sous-question) :")
                for i, doc in enumerate(source_documents):
                    print(f"Document {i+1} :\n{doc.page_content}")
                """

            # Combiner les réponses pour chaque sous-question
            return "\n".join(responses)

        # Si une seule question, traitement standard
        output = conversation({"question": query})
        response = output.get('answer')

        # Afficher les documents récupérés pour diagnostic
        """
        source_documents = output.get('source_documents', [])
        print("\nDocuments récupérés pour le contexte :")
        for i, doc in enumerate(source_documents):
            print(f"Document {i+1} :\n{doc.page_content}")
        """

        # Afficher l'historique actuel pour diagnostic
        print("\nHistorique actuel :", conversation.memory.chat_memory.messages)

        return response

    except Exception as e:
        print(f"Erreur lors du traitement de la requête : {e}")
        import traceback
        traceback.print_exc()
        return "Désolé, une erreur s'est produite lors du traitement de votre demande."


In [11]:
def upload_audio_file():
    """
    Importer un fichier audio depuis l'ordinateur de l'utilisateur
    en utilisant une boîte de dialogue de sélection de fichier
    """
    import tkinter as tk
    from tkinter import filedialog
    
    # Créer une fenêtre racine Tkinter (elle sera cachée)
    root = tk.Tk()
    root.withdraw()  # Cache la fenêtre principale
    
    print("Sélectionnez votre fichier audio (.wav ou .mp3)")
    
    # Ouvrir la boîte de dialogue de sélection de fichier
    filetypes = [
        ('Fichiers audio', '*.wav;*.mp3'),
        ('Fichiers WAV', '*.wav'),
        ('Fichiers MP3', '*.mp3'),
        ('Tous les fichiers', '*.*')
    ]
    
    audio_file = filedialog.askopenfilename(
        title="Sélectionnez un fichier audio",
        filetypes=filetypes
    )
    
    if not audio_file:
        print("Aucun fichier sélectionné.")
        return None
    
    print(f"Fichier sélectionné : {audio_file}")
    return audio_file

In [12]:
def speech_to_text(audio_file: str) -> str:
    """
    Convertit un fichier audio en texte en utilisant le modèle Whisper.
    """
    try:
        import whisper
        model = whisper.load_model("base")

        print("Transcription en cours...")
        result = model.transcribe(audio_file)

        return result["text"]

    except Exception as e:
        print(f"Erreur lors de la transcription : {e}")
        return ""

In [None]:
def play_text_to_speech_colab(text, language='fr', slow=False):
    """
    Convertit du texte en audio et le lit dans Colab.
    """
    try:
        from gtts import gTTS
        from IPython.display import Audio
        import os

        # Convertir le texte en audio
        tts = gTTS(text=text, lang=language, slow=slow)
        temp_audio_file = "/home/leonel/Assistant-AI/temp_audio.mp3"
        tts.save(temp_audio_file)

        # Lecture de l'audio
        display(Audio(temp_audio_file, autoplay=True))

        # Nettoyage
        os.remove(temp_audio_file)
    except Exception as e:
        print(f"Erreur lors de la lecture audio : {e}")


In [14]:
def main():
    """
    Fonction principale pour tester la conversation.
    """
    try:
        print("Initialisation de la conversation...")
        conversation = init_chat()

        print("\nAssistant prêt. Téléchargez un fichier audio contenant votre question ou tapez 'quit' pour sortir.\n")
        while True:
            print("\nOptions :")
            print("1. Télécharger un fichier audio pour poser une question.")
            print("2. Quitter.")
            choix = input("Votre choix : ").strip()

            if choix == '2' or choix.lower() == 'quit':
                print("Au revoir !")
                break

            elif choix == '1':
                # Étape 1 : Télécharger un fichier audio
                audio_filename = upload_audio_file()
                if not audio_filename:
                    print("Aucun fichier n'a été téléchargé. Réessayez.")
                    continue

                # Étape 2 : Transcrire l'audio en texte
                query = speech_to_text(audio_filename)
                if not query.strip():
                    print("Impossible de transcrire l'audio. Réessayez.")
                    continue

                print(f"Question transcrite : {query}")

                # Étape 3 : Obtenir la réponse
                response = Talker(query, conversation)
                print("Réponse :", response)

                # Étape 4 : Lecture de la réponse en audio
                play_text_to_speech_colab(response, language='fr')

            else:
                print("Choix invalide. Veuillez sélectionner une option valide.")

    except Exception as e:
        print(f"Erreur dans la fonction principale : {e}")


In [15]:
if __name__ == "__main__":
    main()


Initialisation de la conversation...
Chargement du modèle LLM et des embeddings...
Initialisation de la base vectorielle...
Le répertoire '/home/leonel/Assistant-AI/vector_db' est déjà vide ou n'existe pas.
Chargement du fichier CSV : /home/leonel/Assistant-AI/Service_Client_Cotonou.csv
Division des documents...
Création d'une nouvelle base vectorielle...


  chat_memory = ConversationBufferWindowMemory(


Base vectorielle initialisée avec succès.
Chaîne de conversation initialisée avec succès.

Assistant prêt. Téléchargez un fichier audio contenant votre question ou tapez 'quit' pour sortir.


Options :
1. Télécharger un fichier audio pour poser une question.
2. Quitter.
Sélectionnez votre fichier audio (.wav ou .mp3)
Fichier sélectionné : /home/leonel/Assistant-AI/ContacterServiceClient .mp3


100%|███████████████████████████████████████| 139M/139M [24:39<00:00, 98.2kiB/s]
  checkpoint = torch.load(fp, map_location=device)


Transcription en cours...




Question transcrite :  Comment puis-je contacter le service client?


  output = conversation({"question": query})



Historique actuel : [HumanMessage(content=' Comment puis-je contacter le service client?', additional_kwargs={}, response_metadata={}), AIMessage(content='Vous pouvez contacter le service client par téléphone ou par e-mail.', additional_kwargs={}, response_metadata={})]
Réponse : Vous pouvez contacter le service client par téléphone ou par e-mail.
Erreur lors de la lecture audio : [Errno 2] No such file or directory: '/content/temp_audio.mp3'

Options :
1. Télécharger un fichier audio pour poser une question.
2. Quitter.
Au revoir !


**gTTS (Google Text-to-Speech)** repose sur des modèles de **deep learning** pour générer la parole à partir du texte. Plus précisément, il utilise des **réseaux neuronaux profonds** pour effectuer la synthèse vocale. Ces modèles sont entraînés pour produire une voix naturelle et fluide.

### Voici comment cela fonctionne dans les grandes lignes :

1. **Prétraitement du texte** :
   - Le texte que vous fournissez est d'abord analysé et transformé en une forme qui peut être interprétée par les réseaux neuronaux. Cela inclut des étapes comme la normalisation du texte (conversion en minuscules, gestion des ponctuations, etc.) et parfois l'analyse phonétique.

2. **Modèle phonétique et prosodique** :
   - Le modèle de deep learning va transformer le texte en **phonèmes**, qui sont les unités sonores fondamentales du langage (par exemple, "cat" se décompose en /k/, /æ/, /t/).
   - Le modèle apprend également la **prosodie**, c'est-à-dire l'intonation, le rythme et la cadence du discours, ce qui est essentiel pour rendre la voix naturelle et agréable.

3. **Synthèse vocale avec un modèle génératif** :
   - gTTS utilise un réseau neuronal pour générer l'audio à partir des phonèmes et de la prosodie.
   - Cette étape repose sur des architectures de modèles génératifs comme les **WaveNet** ou d'autres types de réseaux de neurones récurrents, qui sont capables de synthétiser des sons réalistes à partir des phonèmes fournis.
     - **WaveNet**, par exemple, est un modèle développé par DeepMind, une filiale de Google, qui génère des formes d'onde audio très réalistes en prédisant chaque échantillon sonore à partir des précédents.
     - D'autres modèles modernes de synthèse vocale, comme ceux basés sur des **transformers**, peuvent aussi être utilisés pour améliorer la qualité de la voix générée.

4. **Génération du fichier audio** :
   - Une fois le texte transformé en audio, celui-ci est ensuite sauvegardé dans un fichier audio, généralement au format **MP3**.
   - Le fichier audio est renvoyé à l'utilisateur via l'API de gTTS, que l'on peut télécharger et lire.

### Conclusion :
gTTS utilise des **modèles de deep learning**, en particulier des réseaux neuronaux profonds et des modèles génératifs, pour produire une parole de qualité à partir du texte. Ces modèles sont entraînés sur de grandes quantités de données vocales pour apprendre à reproduire des voix humaines naturelles, avec la prosodie et les nuances appropriées.