In [1]:
import os
import time
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import PyPDFLoader
from PyPDF2 import PdfReader
from langchain_groq import ChatGroq
from langchain.chains import RetrievalQA
from faster_whisper import WhisperModel
from gtts import gTTS
from groq import Groq
from playsound import playsound 
import pyglet
import io

class RAGVoiceBot:
    def __init__(self, vector_db_path, knowldge_path, groq_token_path, whisper_size='base', model_name='llama-3.1-70b-versatile'):

        self.load_groq_token(groq_token_path)
        self.grok_client = Groq()
        
        self.embedding_model = HuggingFaceEmbeddings(model_name='sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
        self.whisper_model = self.initialize_STT_model(whisper_size)
        self.llm = self.initialize_llm(model_name)
        self.qa_pipeline = self.initialize_qa_pipeline(vector_db_path, knowldge_path, k=3)

    def load_groq_token(self, groq_token_path):
        with open(groq_token_path, 'r') as f:
            os.environ["GROQ_API_KEY"] = f.readline().strip()

    def initialize_STT_model(self, whisper_size):
        num_cores = os.cpu_count() // 2
        return WhisperModel(
            whisper_size,
            device="cpu",
            # compute_type="int8",
            cpu_threads=num_cores
        )
        

    def initialize_llm(self, model_name):
        return ChatGroq(
            model=model_name
        )

    def initialize_qa_pipeline(self, vector_db_path, knowldge_path, k):
        if os.path.exists(vector_db_path):
            print('Loading exisiting vector database')
            vec_db = FAISS.load_local(vector_db_path, self.embedding_model, allow_dangerous_deserialization=True)
        else:
            print('Creating new vector database')
            vec_db = self.create_vector_database_texts(knowldge_path)
            vec_db.save_local(vector_db_path)

        # The RAG pipeline
        return RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type='stuff',
            retriever=vec_db.as_retriever(search_kwargs={'k': 5}),
            # return_source_documents=True
        )

    def create_vector_database_docs_pdfs(self, knowldge_path):
        loader = PyPDFLoader(knowldge_path)
        documents = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=50)
        docs = text_splitter.split_documents(documents)
        return FAISS.from_documents(docs, self.embedding_model)
    
    def create_vector_database_texts(self, knowldge_path):
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=50)
        all_chunks = []
        
        for filename in os.listdir(knowldge_path):
            if filename.endswith('.txt'):
                file_path = os.path.join(knowldge_path, filename)
                with open(file_path, 'r', encoding='utf-8') as file:
                    text_content = file.read()
                    
                    chunks = text_splitter.split_text(text_content)
                    all_chunks.extend(chunks)
                    
        return FAISS.from_texts(all_chunks, self.embedding_model)
    
    def speach_to_text(self, wav_audio):
        # print('we now opened the speech_to_text method .........')
        # # segments, _ = self.whisper_model.transcribe(audio_path, language='ar')
        # # return ''.join(segment.text for segment in segments)
        # # with open(audio_path, "rb") as file:
        # # audio_file 
        # audio_file = ("audio.wav", wav_audio)
        # transcription = self.grok_client.audio.transcriptions.create(
        #     file=audio_file, 
        #     model="whisper-large-v3-turbo",
        #     prompt="Specify context or spelling",
        #     response_format="json",
        #     # language='ar',
        #     temperature=0.0,
        # )

        with open(filename, "rb") as file:
            # Create a transcription of the audio file
            transcription = self.grok_client.audio.transcriptions.create(
              file=(filename, file.read()), # Required audio file
              model="whisper-large-v3-turbo", # Required model to use for transcription
              prompt="Specify context or spelling",  # Optional
              response_format="json",  # Optional
              # language="ar",  # Optional
              temperature=0.0  # Optional
            )
            # Print th
        
        return transcription.text

    def generate_response(self, prompt):
        response_data = self.qa_pipeline.invoke(prompt)
        print('Final prompt to LLM after RAG:\n')
        print(response_data['query'])
        return response_data['result']

    def text_to_speech(self, text, output_path=r'D:\GitHub projects\Mic_Server_Test\Backend\output_voices\speech.mp3'):
        start = time.time()
        tts = gTTS(text, lang='ar', slow=False)
        
        tts.save(output_path)
        print(f'Text to sound time: {time.time() - start}')

        # output_path = os.path.join(os.getcwd(), output_path)
        
        # music = pyglet.media.load("D:\GitHub projects\Mic_Server_Test\Backend\output_voices\speech.mp3", streaming=False)
        # music.play()
        # os.remove(output_path)
        return tts
                
    def process_audio_file(self, audio_path):
        start = time.time()
        transcription = self.speach_to_text(audio_path)
        print(f'Sound to text time: {time.time() - start}')

        start = time.time()
        
        arabic_instruction = "يرجى تقديم الإجابة باللغة العربية فقط  واجعل الأجابة مختصرة في سطر قدر الامكان الامكان: "
        # arabic_instruction = "response in just 2 lines: "
        prompt = f"{arabic_instruction}\n{transcription}"
        print(prompt)
        response = self.generate_response(prompt)
        print(f'Model response time: {time.time() - start}')
        
        
        tts_output = self.text_to_speech(response)
        return transcription, response, tts_output



if __name__ == '__main__':
    audio_path = r"C:\Users\zmlka\Documents\Sound Recordings\Recording (3).m4a"

    voice_bot = RAGVoiceBot(
        knowldge_path='./knowledge_base',
        groq_token_path='groq_token.txt',
        vector_db_path='vector_db'
    )

    transcription, response = voice_bot.process_audio_file(audio_path)

    print('='*50)

    print("USER:", transcription)
    print("Assistant:", response)

  self.embedding_model = HuggingFaceEmbeddings(model_name='sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')


Loading exisiting vector database


NameError: name 'filename' is not defined

In [None]:
import os
from groq import Groq

# Initialize the Groq client
client = Groq(api_key=)

# Specify the path to the audio file
filename = r"C:\Users\zmlka\Documents\Sound Recordings\Recording (3).m4a" # Replace with your audio file!

# Open the audio file
with open(filename, "rb") as file:
    # Create a transcription of the audio file
    transcription = client.audio.transcriptions.create(
      file=(filename, file.read()), # Required audio file
      model="whisper-large-v3-turbo", # Required model to use for transcription
      prompt="Specify context or spelling",  # Optional
      response_format="json",  # Optional
      # language="ar",  # Optional
      temperature=0.0  # Optional
    )
    # Print the transcription text
    print(transcription.text)

 السلام عليكم كيف الحال


In [4]:
import os
import time
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import PyPDFLoader
from PyPDF2 import PdfReader
from langchain_groq import ChatGroq
from langchain.chains import RetrievalQA
from faster_whisper import WhisperModel
from gtts import gTTS
from groq import Groq
from playsound import playsound 
import pyglet
import io
from elevenlabs import save, play
from elevenlabs.client import ElevenLabs


class RAGVoiceBot:
    def __init__(self, vector_db_path, knowldge_path, groq_token_path, whisper_size='base', model_name='llama-3.1-70b-versatile'):

        self.load_groq_token(groq_token_path)
        self.grok_client = Groq()

        self.elevenlabs_client = ElevenLabs(api_key='sk_4b8af34b2615328298ba8718fc90797eafbcc39d08382917')
        self.embedding_model = HuggingFaceEmbeddings(model_name='sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
        self.whisper_model = self.initialize_STT_model(whisper_size)
        self.llm = self.initialize_llm(model_name)
        self.vector_db = self.intialize_vectore_db(vector_db_path, knowldge_path)
        # self.qa_pipeline = self.initialize_qa_pipeline(vector_db_path, knowldge_path, k=3)

    def load_groq_token(self, groq_token_path):
        with open(groq_token_path, 'r') as f:
            os.environ["GROQ_API_KEY"] = f.readline().strip()

    def initialize_STT_model(self, whisper_size):
        num_cores = os.cpu_count() // 2
        return WhisperModel(
            whisper_size,
            device="cpu",
            # compute_type="int8",
            cpu_threads=num_cores
        )
        

    def initialize_llm(self, model_name):
        return ChatGroq(
            model=model_name
        )

    def intialize_vectore_db(self, vector_db_path, knowldge_path):
        if os.path.exists(vector_db_path):
            print('Loading exisiting vector database')
            vec_db = FAISS.load_local(vector_db_path, self.embedding_model, allow_dangerous_deserialization=True)
        else:
            print('Creating new vector database')
            vec_db = self.create_vector_database_texts(knowldge_path)
            vec_db.save_local(vector_db_path)

        return vec_db
    
    def initialize_qa_pipeline(self, vector_db_path, knowldge_path, k):
        if os.path.exists(vector_db_path):
            print('Loading exisiting vector database')
            vec_db = FAISS.load_local(vector_db_path, self.embedding_model, allow_dangerous_deserialization=True)
        else:
            print('Creating new vector database')
            vec_db = self.create_vector_database_texts(knowldge_path)
            vec_db.save_local(vector_db_path)

        # The RAG pipeline
        return RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type='stuff',
            retriever=vec_db.as_retriever(search_kwargs={'k': 5}),
            # return_source_documents=True
        )

    
    def get_relevant_chunks(self, query, k=5):
        docs = self.vector_db.similarity_search(query, k=k)
        return [doc.page_content for doc in docs]

    def construct_prompt(self, chunks, query):
        context = "\n".join([f"- {chunk}" for chunk in chunks])
    
        rag_template = f"""You are given a user query, some textual context and rules, all inside xml tags. You have to answer the query based on the context while respecting the rules.

<context>
 {context}
</context>

<rules>
- If you don't know, just say so.
- If you are not sure, ask for clarification.
- Answer in the same language as the user query.
- If the context appears unreadable or of poor quality, tell the user then answer as best as you can.
- If the answer is not in the context but you think you know the answer, explain that to the user then answer with your own knowledge.
- Answer directly and without using xml tags.
</rules>

<user_query>
{query}
</user_query>
"""


        return rag_template

    def get_model_response(self, full_prompt, model_name='llama-3.1-70b-versatile'):
        response = self.grok_client.chat.completions.create(
            model="llama-3.1-70b-versatile",
            messages=[
                {"role": "system", "content": "You are a professional conference assistant fluent in Arabic and English. Respond concisely and professionally ONLY IN ARABIC"},
                {"role": "user", "content": full_prompt}
            ],
            max_tokens=250,
            temperature=0.4,
        )
    
        return response.choices[0].message.content


    def process_user_message(self, query):
        relevant_chunks = self.get_relevant_chunks(query)
        full_prompt_after_rag = self.construct_prompt(relevant_chunks, query)
        response = self.get_model_response(full_prompt_after_rag, model_name='llama-3.1-70b-versatile')
    
        return response
    
    def create_vector_database_docs_pdfs(self, knowldge_path):
        loader = PyPDFLoader(knowldge_path)
        documents = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=50)
        docs = text_splitter.split_documents(documents)
        return FAISS.from_documents(docs, self.embedding_model)
    
    def create_vector_database_texts(self, knowldge_path):
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=50)
        all_chunks = []
        
        for filename in os.listdir(knowldge_path):
            if filename.endswith('.txt'):
                file_path = os.path.join(knowldge_path, filename)
                with open(file_path, 'r', encoding='utf-8') as file:
                    text_content = file.read()
                    
                    chunks = text_splitter.split_text(text_content)
                    all_chunks.extend(chunks)
                    
        return FAISS.from_texts(all_chunks, self.embedding_model)
    
    def speach_to_text(self, wav_audio):
        # print('we now opened the speech_to_text method .........')
        # # segments, _ = self.whisper_model.transcribe(audio_path, language='ar')
        # # return ''.join(segment.text for segment in segments)
        # # with open(audio_path, "rb") as file:
        # # audio_file 
        # audio_file = ("audio.wav", wav_audio)
        # transcription = self.grok_client.audio.transcriptions.create(
        #     file=audio_file, 
        #     model="whisper-large-v3-turbo",
        #     prompt="Specify context or spelling",
        #     response_format="json",
        #     # language='ar',
        #     temperature=0.0,
        # )

        with open(wav_audio, "rb") as file:
            # Create a transcription of the audio file
            transcription = self.grok_client.audio.transcriptions.create(
              file=(wav_audio, file.read()), # Required audio file
              model="whisper-large-v3", # Required model to use for transcription
              prompt="Specify context or spelling",  # Optional
              response_format="json",  # Optional
              # language="ar",  # Optional
              temperature=0.0  # Optional
            )
            # Print th
        
        return transcription.text.strip()

    def generate_response(self, prompt):
        response_data = self.qa_pipeline.invoke(prompt)
        print('Final prompt to LLM after RAG:\n')
        print(response_data['query'])
        return response_data['result']

    def text_to_speech(self, text, output_path=r'D:\GitHub projects\Mic_Server_Test\Backend\output_voices\speech.mp3'):
        start = time.time()
        tts = gTTS(text, lang='ar', slow=False)
        
        tts.save(output_path)
        print(f'Text to sound time: {time.time() - start}')

        # output_path = os.path.join(os.getcwd(), output_path)
        
        # music = pyglet.media.load("D:\GitHub projects\Mic_Server_Test\Backend\output_voices\speech.mp3", streaming=False)
        # music.play()
        # os.remove(output_path)
        return tts

    def text_to_sound(self, text):
        audio = self.elevenlabs_client.generate(
            text=text,
            voice="IK7YYZcSpmlkjKrQxbSn",
            model="eleven_multilingual_v2"
        )
        
        save(audio, "output.mp3")
        # play(audio)
        
    def process_audio_file(self, audio_path):
        # start = time.time()
        # transcription = self.speach_to_text(audio_path)
        # print(f'Sound to text time: {time.time() - start}')

        # start = time.time()
        
        # arabic_instruction = "يرجى تقديم الإجابة باللغة العربية فقط  واجعل الأجابة مختصرة في سطر قدر الامكان الامكان: "
        # # arabic_instruction = "response in just 2 lines: "
        # prompt = f"{arabic_instruction}\n{transcription}"
        # print(prompt)
        # response = self.generate_response(prompt)
        # print(f'Model response time: {time.time() - start}')
        
        
        # tts_output = self.text_to_speech(response)
        # return transcription, response, tts_output

        start = time.time()
        transcription = self.speach_to_text(audio_path)
        print(f'Sound to text time: {time.time() - start}')
        print(f'Transcritption: {transcription}')
        print('='*50)
        
        start = time.time()
        response = self.process_user_message(transcription)
        
        print(f'Model response: {response}')
        print(f'Model response time: {time.time() - start}')
        print('='*50)
        
        tts_output = self.text_to_sound(response)
        return transcription, response, tts_output

if __name__ == '__main__':
    audio_path = r"C:\Users\zmlka\Documents\Sound Recordings\Recording (3).m4a"

    print('Initializing all things')
    voice_bot = RAGVoiceBot(
        knowldge_path='./knowledge_base',
        groq_token_path='groq_token.txt',
        vector_db_path='vector_db'
    )

    print('='*50)
    transcription, response, tts_output = voice_bot.process_audio_file(audio_path)

    print('='*50)

    # print("USER:", transcription)
    # print("Assistant:", response)

Initializing all things
Loading exisiting vector database
Sound to text time: 1.2540624141693115
Transcritption: السلام عليكم كيف الحال؟
Model response: السلام عليكم ورحمة الله وبركاته، الحمد لله الحال جيد، كيف يمكنني مساعدتك؟
Model response time: 0.617142915725708


In [5]:
audio_path = r"C:\Users\zmlka\Documents\Sound Recordings\Recording (16).m4a"
transcription, response, tts_output = voice_bot.process_audio_file(audio_path)

Sound to text time: 0.6377434730529785
Transcritption: ماذا تعرف عن شركة Future Platform؟
Model response: منصة المستقبل (Future Platform) هي شركة ذكاء اصطناعي تأسست في المملكة العربية السعودية عام 2018 استجابةً للطلب الكبير في البلاد على التكنولوجيا الغامرة الجديدة. نحن متخصصون في حلول الذكاء الاصطناعي.
Model response time: 0.7272927761077881


In [5]:
from elevenlabs import ElevenLabs
import io

# Initialize ElevenLabs client with your API key
elevenlabs_client = ElevenLabs(api_key="sk_4b8af34b2615328298ba8718fc90797eafbcc39d08382917")

# Define the text to generate audio for
text = "Hello, this is a test of ElevenLabs streaming audio."

In [6]:
def test_elevenlabs_streaming(text):
    # Generate audio using ElevenLabs in streaming mode
    audio_stream = elevenlabs_client.generate(
        text=text,
        voice="IK7YYZcSpmlkjKrQxbSn",
        model="eleven_multilingual_v2",
        stream=True
    )

    # Collect chunks into a bytearray
    audio_data = bytearray()
    print("Streaming audio chunks...")
    for chunk in audio_stream:
        print(f"Received chunk of size: {len(chunk)} bytes")
        audio_data.extend(chunk)  # Append each chunk to the bytearray

    # Convert bytearray to bytes
    audio_bytes = bytes(audio_data)
    print(f"Total audio size: {len(audio_bytes)} bytes")

    # Return audio bytes for further processing
    return audio_bytes

# Test streaming
audio_bytes = test_elevenlabs_streaming(text)

Streaming audio chunks...


ApiError: status_code: 401, body: {'detail': {'status': 'quota_exceeded', 'message': 'This request exceeds your  quota of 10000. You have 2 credits remaining, while 52 credits are required for this request.'}}

In [3]:
with open("test_audio.mp3", "wb") as f:
    f.write(audio_bytes)
print("Audio saved to test_audio.mp3")


Audio saved to test_audio.mp3


In [4]:
from IPython.display import Audio

# Play the audio
Audio(audio_bytes, rate=44100)


In [5]:
from transformers import VitsModel, AutoTokenizer
import torch

model = VitsModel.from_pretrained("facebook/mms-tts-ara")
tokenizer = AutoTokenizer.from_pretrained("facebook/mms-tts-ara")

text = " مصر ام الدنيا كيف الحال"
inputs = tokenizer(text, return_tensors="pt")

with torch.no_grad():
    output = model(**inputs).waveform


Some weights of the model checkpoint at facebook/mms-tts-ara were not used when initializing VitsModel: ['flow.flows.0.wavenet.in_layers.0.weight_g', 'flow.flows.0.wavenet.in_layers.0.weight_v', 'flow.flows.0.wavenet.in_layers.1.weight_g', 'flow.flows.0.wavenet.in_layers.1.weight_v', 'flow.flows.0.wavenet.in_layers.2.weight_g', 'flow.flows.0.wavenet.in_layers.2.weight_v', 'flow.flows.0.wavenet.in_layers.3.weight_g', 'flow.flows.0.wavenet.in_layers.3.weight_v', 'flow.flows.0.wavenet.res_skip_layers.0.weight_g', 'flow.flows.0.wavenet.res_skip_layers.0.weight_v', 'flow.flows.0.wavenet.res_skip_layers.1.weight_g', 'flow.flows.0.wavenet.res_skip_layers.1.weight_v', 'flow.flows.0.wavenet.res_skip_layers.2.weight_g', 'flow.flows.0.wavenet.res_skip_layers.2.weight_v', 'flow.flows.0.wavenet.res_skip_layers.3.weight_g', 'flow.flows.0.wavenet.res_skip_layers.3.weight_v', 'flow.flows.1.wavenet.in_layers.0.weight_g', 'flow.flows.1.wavenet.in_layers.0.weight_v', 'flow.flows.1.wavenet.in_layers.1.wei

In [6]:
from IPython.display import Audio

Audio(output, rate=model.config.sampling_rate)


In [11]:
import os
import time
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import PyPDFLoader
from PyPDF2 import PdfReader
from langchain_groq import ChatGroq
from langchain.chains import RetrievalQA
from faster_whisper import WhisperModel
from gtts import gTTS
from groq import Groq
from playsound import playsound 
import pyglet
import io
from elevenlabs import save, play
from elevenlabs.client import ElevenLabs

In [16]:
class RAGVoiceBot_v2:
    def __init__(self, vector_db_path, knowldge_path, groq_token_path, whisper_size='base', model_name='llama-3.1-70b-versatile'):

        self.load_groq_token(groq_token_path)
        self.grok_client = Groq()

        self.elevenlabs_client = ElevenLabs(api_key='sk_4b8af34b2615328298ba8718fc90797eafbcc39d08382917')
        self.embedding_model = HuggingFaceEmbeddings(model_name='sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
        self.whisper_model = self.initialize_STT_model(whisper_size)
        self.llm = self.initialize_llm(model_name)
        self.vector_db = self.intialize_vectore_db(vector_db_path, knowldge_path)
        # self.qa_pipeline = self.initialize_qa_pipeline(vector_db_path, knowldge_path, k=3)

    def load_groq_token(self, groq_token_path):
        with open(groq_token_path, 'r') as f:
            os.environ["GROQ_API_KEY"] = f.readline().strip()

    def initialize_STT_model(self, whisper_size):
        num_cores = os.cpu_count() // 2
        return WhisperModel(
            whisper_size,
            device="cpu",
            # compute_type="int8",
            cpu_threads=num_cores
        )
        

    def initialize_llm(self, model_name):
        return ChatGroq(
            model=model_name
        )

    def intialize_vectore_db(self, vector_db_path, knowldge_path):
        if os.path.exists(vector_db_path):
            print('Loading exisiting vector database')
            vec_db = FAISS.load_local(vector_db_path, self.embedding_model, allow_dangerous_deserialization=True)
        else:
            print('Creating new vector database')
            vec_db = self.create_vector_database_texts(knowldge_path)
            vec_db.save_local(vector_db_path)

        return vec_db
    
    def initialize_qa_pipeline(self, vector_db_path, knowldge_path, k):
        if os.path.exists(vector_db_path):
            print('Loading exisiting vector database')
            vec_db = FAISS.load_local(vector_db_path, self.embedding_model, allow_dangerous_deserialization=True)
        else:
            print('Creating new vector database')
            vec_db = self.create_vector_database_texts(knowldge_path)
            vec_db.save_local(vector_db_path)

        # The RAG pipeline
        return RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type='stuff',
            retriever=vec_db.as_retriever(search_kwargs={'k': k}),
            # return_source_documents=True
        )

    
    def get_relevant_chunks(self, query, k=5):
        docs = self.vector_db.similarity_search(query, k=k)
        return [doc.page_content for doc in docs]

    def construct_prompt(self, chunks, query):
        context = "\n".join([f"- {chunk}" for chunk in chunks])
    
        rag_template = f"""You are given a user query, some textual context and rules, all inside xml tags. You have to answer the query based on the context while respecting the rules.

<context>
 {context}
</context>

<rules>
- If you don't know, just say so.
- If you are not sure, ask for clarification.
- Answer in the same language as the user query.
- If the context appears unreadable or of poor quality, tell the user then answer as best as you can.
- If the answer is not in the context but you think you know the answer, explain that to the user then answer with your own knowledge.
- Answer directly and without using xml tags.
</rules>

<user_query>
{query}
</user_query>
"""


        return rag_template

    def get_model_response(self, full_prompt, model_name='llama-3.1-70b-versatile'):
        response = self.grok_client.chat.completions.create(
            model="llama-3.1-70b-versatile",
            messages=[
                {"role": "system", "content": "You are a professional conference assistant fluent in Arabic and English. Respond concisely and professionally ONLY IN ARABIC"},
                {"role": "user", "content": full_prompt}
            ],
            max_tokens=250,
            temperature=0.4,
        )
    
        return response.choices[0].message.content


    def process_user_message(self, query):
        relevant_chunks = self.get_relevant_chunks(query)
        full_prompt_after_rag = self.construct_prompt(relevant_chunks, query)
        response = self.get_model_response(full_prompt_after_rag, model_name='llama-3.1-70b-versatile')
    
        return response
    
    def create_vector_database_docs_pdfs(self, knowldge_path):
        loader = PyPDFLoader(knowldge_path)
        documents = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=50)
        docs = text_splitter.split_documents(documents)
        return FAISS.from_documents(docs, self.embedding_model)
    
    def create_vector_database_texts(self, knowldge_path):
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=50)
        all_chunks = []
        
        for filename in os.listdir(knowldge_path):
            if filename.endswith('.txt'):
                file_path = os.path.join(knowldge_path, filename)
                with open(file_path, 'r', encoding='utf-8') as file:
                    text_content = file.read()
                    
                    chunks = text_splitter.split_text(text_content)
                    all_chunks.extend(chunks)
                    
        return FAISS.from_texts(all_chunks, self.embedding_model)
    
    def speach_to_text(self, wav_audio):
        print('we now opened the speech_to_text method .........')
        # segments, _ = self.whisper_model.transcribe(audio_path, language='ar')
        # return ''.join(segment.text for segment in segments)
        # with open(audio_path, "rb") as file:
        # audio_file 
        # audio_file = ("audio.wav", wav_audio)
        # transcription = self.grok_client.audio.transcriptions.create(
        #     file=audio_file, 
        #     model="whisper-large-v3-turbo",
        #     prompt="Specify context or spelling",
        #     response_format="json",
        #     # language='ar',
        #     temperature=0.0,
        # )

        with open(wav_audio, "rb") as file:
            # Create a transcription of the audio file
            transcription = self.grok_client.audio.transcriptions.create(
              file=(wav_audio, file.read()), # Required audio file
              model="whisper-large-v3", # Required model to use for transcription
              prompt="Specify context or spelling",  # Optional
              response_format="json",  # Optional
              # language="ar",  # Optional
              temperature=0.0  # Optional
            )
        
        return transcription.text.strip()

    def generate_response(self, prompt):
        response_data = self.qa_pipeline.invoke(prompt)
        print('Final prompt to LLM after RAG:\n')
        print(response_data['query'])
        return response_data['result']

    def text_to_speech(self, text, output_path="output_voices\speech.mp3"):
        print('***********'*100)
        start = time.time()
        tts = gTTS(text, lang='ar', slow=False)
        
        tts.save(output_path)
        print(f'Text to sound time: {time.time() - start}')

        # output_path = os.path.join(os.getcwd(), output_path)
        
        # music = pyglet.media.load("D:\GitHub projects\Mic_Server_Test\Backend\output_voices\speech.mp3", streaming=False)
        # music.play()
        # os.remove(output_path)
        return tts

    def text_to_sound(self, text):
        audio = self.elevenlabs_client.generate(
            text=text,
            voice="IK7YYZcSpmlkjKrQxbSn",
            model="eleven_multilingual_v2",
            stream=True
        )
        
        start = time.time()
        audio_data = bytearray()
        for chunk in audio:
            audio_data.extend(chunk)
        print(f"time of conversion to bytes: {time.time() - start}")

        return bytes(audio_data)
        # save(audio, "output_voices\speech.mp3")
        # return audio
        # play(audio)
        
    def process_audio_file(self, audio_path):
        # start = time.time()
        # transcription = self.speach_to_text(audio_path)
        # print(f'Sound to text time: {time.time() - start}')

        # start = time.time()
        
        # arabic_instruction = "يرجى تقديم الإجابة باللغة العربية فقط  واجعل الأجابة مختصرة في سطر قدر الامكان الامكان: "
        # # arabic_instruction = "response in just 2 lines: "
        # prompt = f"{arabic_instruction}\n{transcription}"
        # print(prompt)
        # response = self.generate_response(prompt)
        # print(f'Model response time: {time.time() - start}')
        
        
        # tts_output = self.text_to_speech(response)
        # return transcription, response, tts_output

        start = time.time()
        transcription = self.speach_to_text(audio_path)
        print(f'Sound to text time: {time.time() - start}')
        print(f'Transcritption: {transcription}')
        print('='*50)
        
        start = time.time()
        response = self.process_user_message(transcription)
        
        print(f'Model response: {response}')
        print(f'Model response time: {time.time() - start}')
        print('='*50)
        
        start = time.time()
        tts_output = self.text_to_sound(response)
        print(f"time of TTS: {time.time() - start}")

        return transcription, response, tts_output

if __name__ == '__main__':
    audio_path = r"C:\Users\zmlka\Documents\Sound Recordings\Recording (3).m4a"

    print('Initializing all things')
    voice_bot = RAGVoiceBot_v2(
        knowldge_path='./knowledge_base',
        groq_token_path='groq_token.txt',
        vector_db_path='vector_db'
    )

    print('='*50)
    transcription, response, tts_output = voice_bot.process_audio_file(audio_path)

    print('='*50)

    # print("USER:", transcription)
    # print("Assistant:", response)

  def text_to_speech(self, text, output_path="output_voices\speech.mp3"):


Initializing all things
Loading exisiting vector database
we now opened the speech_to_text method .........
Sound to text time: 1.3685686588287354
Transcritption: السلام عليكم كيف الحال؟


  attn_output = torch.nn.functional.scaled_dot_product_attention(


Model response: السلام عليكم، الحمد لله الحال جيد، وكيف يمكنني مساعدتك اليوم؟
Model response time: 9.344614505767822


  def text_to_speech(self, text, output_path="output_voices\speech.mp3"):


ApiError: status_code: 401, body: {'detail': {'status': 'quota_exceeded', 'message': 'This request exceeds your  quota of 10000. You have 2 credits remaining, while 61 credits are required for this request.'}}

In [4]:
import os
import time
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import PyPDFLoader
from PyPDF2 import PdfReader
from langchain_groq import ChatGroq
from langchain.chains import RetrievalQA
from faster_whisper import WhisperModel
from gtts import gTTS
from groq import Groq
from playsound import playsound 
import pyglet
import io
from elevenlabs import save, play
from elevenlabs.client import ElevenLabs
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline



In [5]:
class SpeechToTextProcessor:
    def __init__(self, mode='local', whisper_model_size='large', groq_token_path=None):

        self.mode = mode

        if mode == 'local':
            print('Your sound-to-text (STT) model running locally')
            self.whisper_model = WhisperModel(whisper_model_size, device="cpu", cpu_threads=os.cpu_count() // 2)

        elif mode == 'groq' and groq_token_path:
            print('Your sound-to-text (STT) model running online')

            with open(groq_token_path, 'r') as f:
                os.environ['GROQ_API_KEY'] = f.readline().strip()
                
            self.grok_client = Groq()

        elif mode=='hugging_face':
            device = "cuda:0" if torch.cuda.is_available() else "cpu"
            torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
            
            model_id = "openai/whisper-large-v3-turbo"
            
            model = AutoModelForSpeechSeq2Seq.from_pretrained(
                model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True
            )
            model.to(device)
            
            processor = AutoProcessor.from_pretrained(model_id)
            
            self.pipe = pipeline(
                "automatic-speech-recognition",
                model=model,
                tokenizer=processor.tokenizer,
                feature_extractor=processor.feature_extractor,
                chunk_length_s=30,
                batch_size=16,  # batch size for inference - set based on your device
                torch_dtype=torch_dtype,
                device=device,
            )

        else:
            raise ValueError("Invalid mode for SpeechToTextProcessor. Use 'local' or 'groq'. Or enter the groq_token_path")


    def transcribe(self, audio_data):
        if self.mode == 'local':
            segments, _ = self.whisper_model.transcribe(audio_data, language="ar")
            return ''.join(segment.text for segment in segments)

        elif self.mode == 'groq':

            # if audio_data == bytes
            # transcription = self.client.audio.transcriptions.create(
            #     file=audio_data.getvalue(),
            #     model="whisper-large-v3-turbo",
            #     response_format="json",
            #     temperature=0.0,
            # )
            
            # if audio_data == 'wav'
            with open(audio_data, "rb") as file:
                transcription = self.grok_client.audio.transcriptions.create(
                  file=(audio_data, file.read()),
                  model="whisper-large-v3",
                  prompt="Specify context or spelling",
                  response_format="json", 
                  # language="ar", 
                  temperature=0.0 
                )

            return transcription.text.strip()

        
        elif self.mode == 'hugging_face':
            return self.pipe('./output_voices/speech.mp3')['text'].strip()

In [None]:
stt = SpeechToTextProcessor(mode='groq', groq_token_path='./groq_token.txt')

Your sound-to-text (STT) model running online


In [None]:
stt.transcribe(r"C:\Users\zmlka\Documents\Sound Recordings\Recording (3).m4a")

'السلام عليكم كيف الحال؟'

In [22]:
class TextToSpeechProcessor:
    def __init__(self, mode='gtts', elevenlabs_api_path=None):
        self.mode = mode

        if mode == 'elevenlabs' and elevenlabs_api_path:
            with open(elevenlabs_api_path, 'r') as f:
                eleven_labs_key = f.readline().strip()
                
            self.client = ElevenLabs(api_key=eleven_labs_key)

        elif mode != 'gtts':
            raise ValueError("Invalid mode for TextToSpeechProcessor. Use 'gtts' or 'elevenlabs' or enter the api token of elevenlabs.")

    
    def synthesize(self, text, output_path=None, voice_id='IK7YYZcSpmlkjKrQxbSn'):
        if self.mode == 'gtts':
            print('Your text-to-sound (TTS) model running locally')
            tts = gTTS(text, lang='ar', slow=False)
            if output_path:
                tts.save(output_path)
            return tts

        elif self.mode == 'elevenlabs':
            print('Your text-to-sound (TTS) model running online on ElevenLabs')
            audio = self.client.generate(
                text=text,
                voice=voice_id,
                model="eleven_multilingual_v2",
                # stream=True,
            )
            # audio_data = bytearray()
            # for chunk in audio:
            #     audio_data.extend(chunk)
            # return bytes(audio_data)


            save(audio, "output_voices\speech.mp3")
            # play(audio)
            return audio
            

  save(audio, "output_voices\speech.mp3")


In [23]:
tts = TextToSpeechProcessor('elevenlabs', elevenlabs_api_path='./elevenlabs_token.txt')
audio = tts.synthesize(text='منصة المستقبل', output_path='./hello.mp3')

Your text-to-sound (TTS) model running online on ElevenLabs


Loading exisiting vector database ...


<langchain_community.vectorstores.faiss.FAISS at 0x19bbb499340>

In [63]:
class VectoreDatabaseManager:
    def __init__(self,
                vector_db_path="data/vector_db",
                knowledge_base_path="data/knowledge",
                metadata_path="data/metadata.json",
                embedding_model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
        
        self.vector_db_path = vector_db_path
        self.knowldge_base_path = knowledge_base_path
        self.metadata_path = metadata_path
        self.embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)
        
    def _load_metadata(self):
        if os.path.exists(self.metadata_path):
            with open(self.metadata_path, 'r') as f:
                return json.load(f)
        return {}

    def _save_metadata(self, metadata):
        with open(self.metadata_path, 'w') as f:
            json.dump(metadata, f)
    
    def _get_current_metadata(self):
        metadata = {}
        for root, _, files in os.walk(self.knowldge_base_path):
            for filename in files:
                file_path = os.path.join(root, filename)
                last_modified = os.path.getmtime(file_path)
                metadata[file_path] = last_modified
        return metadata
    
    def _create_vector_database_texts(self):
        """Create a new vector database from knowledge files."""
        print('Creating new vector database ...')
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=50)
        all_chunks = []

        for filename in os.listdir(self.knowldge_base_path):
            file_path = os.path.join(self.knowldge_base_path, filename)
            
            if filename.endswith('.txt'):
                with open(file_path, 'r', encoding='utf-8') as file:
                    text_content = file.read()
                    chunks = text_splitter.split_text(text_content)
                    all_chunks.extend(chunks)
                    
            elif filename.endswith('.pdf'):
                loader = PyPDFLoader(file_path)
                documents = loader.load()
                chunks = text_splitter.split_documents(documents)
                all_chunks.extend(chunks)

        return FAISS.from_texts(all_chunks, self.embedding_model)
    
    
    def get_vector_databaase(self):
        old_metadata = self._load_metadata()
        current_metadata = self._get_current_metadata()
        
        if old_metadata != current_metadata: # if there are files changed or added or removed
            vector_db = self._create_vector_database_texts()
            vector_db.save_local(self.vector_db_path)
            self._save_metadata(current_metadata)
        
        elif os.path.exists(self.vector_db_path):
            print('Loading exisiting vector database ...')
            vector_db = FAISS.load_local(self.vector_db_path, self.embedding_model, allow_dangerous_deserialization=True)
        
        else: # if there is no vector database
            vector_db = self._create_vector_database_texts()
            vector_db.save_local(self.vector_db_path)
            self._save_metadata(current_metadata)
            
        return vector_db

In [47]:
model = text_to_text_RAG(vector_db_path='vector_db', knowldge_path='./knowledge_base')

Loading exisiting vector database


In [49]:
model.process_user_message("هل تعرف شركة future plateform, and what about mohamed hassan")

'لا أعرف شركة future platform، ولا أعرف أي معلومات عن شخص يُدعى محمد حسن. لم أجد أي معلومات عن هذين الموضوعين في السياق المُقدم.'

In [87]:
llm = GroqProvider(model_name='llama-3.1-70b-versatile')


In [88]:
llm.get_response('hi')

'مرحبا، كيف يمكنني مساعدتك؟'

In [102]:
class BaseModelProvider:
    def __init__(self, model_name, max_tokens=250, temperature=0.4):
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        
    def get_response(self, prompt):
        raise NotImplementedError("Subclass must implement abstract method")
    

class GroqProvider(BaseModelProvider):
    def __init__(self, model_name=None, max_tokens=None, temperature=None):
        print("Using groq as the model provider ... ")
        from groq import Groq
        super().__init__(model_name, max_tokens, temperature) # inhirit from the parent class
        
        os.environ['GROQ_API_KEY'] = os.getenv('GROQ_API_KEY')
        self.client = Groq()
    

    def get_response(self, prompt):
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=[
                {"role": "system", "content": "You are a professional conference assistant fluent in Arabic and English. Respond concisely and professionally ONLY IN ARABIC"},
                {"role": "user", "content": prompt}
            ],
            max_tokens=self.max_tokens, # comming from the parent class
            temperature=self.temperature, # comming from the parent class
        )
        
        return response.choices[0].message.content
    
    
class HuggingFaceProvider(BaseModelProvider):
    print("Using HuggingFace as the model provider ... ")
    def __init__(self, model_name=None, max_tokens=None, temperature=None):
        super().__init__(model_name, max_tokens, temperature)
        from transformers import pipeline
        
        device = 0 if torch.cuda.is_available() else -1
        self.pipe = pipeline("text2text-generation", model=self.model_name, device=device)
        
        
    def get_response(self, prompt):
        response = self.pipe(prompt, max_length=self.max_tokens, temperature=self.temperature)
        return response
    

class OpenAIProvider(BaseModelProvider):
    print("Using OpenAI as the model provider ... ")
    def __init__(self, model_name=None, max_tokens=None, temperature=None):
        super().__init__(model_name, temperature, max_tokens)
        import openai  # Import only when needed
        openai.api_key = os.getenv('OPENAI_API_KEY')

    def get_response(self, prompt):
        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt,
            max_tokens=self.max_tokens,
            temperature=self.temperature
        )
        return response['choices'][0]['text'].strip()


Using HuggingFace as the model provider ... 
Using OpenAI as the model provider ... 


In [104]:
agent = text_to_text_with_RAG()
agent

Loading exisiting vector database ...
Using groq as the model provider ... 


<__main__.text_to_text_with_RAG at 0x19cd1cee390>

In [105]:
agent.process_user_message("Hello, what do you know about Future Plateform?")

'مرحبا، منصة المستقبل (Future Platform) هي شركة ذكاء اصطناعي تأسست في المملكة العربية السعودية عام 2018، وهي متخصصة في حلول الذكاء الاصطناعي، وتهدف إلى تمكين الشركات والأفراد من خلال تقنيات الذكاء الاصطناعي والتحول الرقمي.'

In [103]:
class text_to_text_with_RAG:
    def __init__(self, 
                vector_db_path='vector_db',
                knowldge_base_path='./knowledge_base',
                metadata_path='metabase.json',
                embedding_model_name='sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2',
                llm_provider='groq',
                model_name='llama-3.3-70b-versatile',
                max_tokens=250,
                temperature=0.4):
        """
        Initialize the RAG application.

        Args:
            vector_db_path (str): Path to the vector database.
            knowledge_path (str): Path to the knowledge files.
            metadata_path (str): Path to the metadata file.
            llm_provider (str): Name of the model provider (e.g., "groq", "huggingface", "openai").
            model_name (str): Name of the model to use.
            embedding_model (str): Name of the embedding model for vector database.
            max_tokens (int): Maximum tokens for the response.
            temperature (float): Sampling temperature for response generation.
        """
        
        # Initialize the vector database manager
        vdb_manager = VectoreDatabaseManager(
            vector_db_path=vector_db_path,
            knowldge_base_path=knowldge_base_path,
            metadata_path=metadata_path,
            embedding_model_name=embedding_model_name)
        
        self.vector_db = vdb_manager.get_vector_databaase()
        
        
        # Initialize the model provider
        if llm_provider == "groq":
            self.model = GroqProvider(model_name, max_tokens, temperature)
        elif llm_provider == "huggingface":
            self.model =  HuggingFaceProvider(model_name, max_tokens, temperature)
        elif llm_provider == "openai":
            self.model = OpenAIProvider(model_name, max_tokens, temperature)
        else:
            raise ValueError(f"Unsupported provider: {llm_provider}")


    def _get_relevant_chunks(self, query, k=5):
        
        docs = self.vector_db.similarity_search(query, k=k)
        return [doc.page_content for doc in docs]

    def _construct_prompt(self, chunks, query):
        context = "\n".join([f"- {chunk}" for chunk in chunks])
    
        rag_template = f"""You are given a user query, some textual context and rules, all inside xml tags. You have to answer the query based on the context while respecting the rules.

        <context>
        {context}
        </context>

        <rules>
        - If you don't know, just say so.
        - If you are not sure, ask for clarification.
        - Answer in the same language as the user query.
        - If the context appears unreadable or of poor quality, tell the user then answer as best as you can.
        - If the answer is not in the context but you think you know the answer, explain that to the user then answer with your own knowledge.
        - Answer directly and without using xml tags.
        </rules>

        <user_query>
        {query}
        </user_query>
        """


        return rag_template.strip()

    def process_user_message(self, query):
        relevant_chunks = self._get_relevant_chunks(query)
        full_prompt_after_rag = self._construct_prompt(relevant_chunks, query)
        response = self.model.get_response(full_prompt_after_rag)
    
        return response