<a href="https://colab.research.google.com/github/sharook-khan-pathan/MultiModelDataProcessingSystem/blob/main/MultiModelDataProcessingSystem.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Step 1 – Sentence Transformers
!pip install sentence-transformers

#Step 2 – ChromaDB
!pip install chromadb

#Step 3 – Gemini API
!pip install google-generativeai

#Step 4 – PDF support
!pip install PyPDF2

#Step 5 – DOCX support
!pip install python-docx

#Step 6 – PPTX support
!pip install python-pptx

#Step 7 – YouTube downloader
!pip install pytube

#Step 8 – Speech recognition
!pip install SpeechRecognition

#Step 9 – Video/audio processing
!pip install moviepy

#Step 10 – Image processing
!pip install Pillow

#Step 11 – OCR (text from images)
!pip install pytesseract

!apt-get install -y ffmpeg

In [None]:
# Upgrade pytube to the newest available release (-U).
!pip install -U pytube
# yt-dlp and pydub are both imported by this notebook's import cell;
# -q keeps pip's output short.
!pip install -q yt-dlp pydub

In [None]:
# Step 1: Imports
# ======================================================
import os, tempfile
from google.colab import files
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import PyPDF2, docx
from pptx import Presentation
import pytesseract
from PIL import Image
import speech_recognition as sr
from pydub import AudioSegment
import yt_dlp
import google.generativeai as genai



In [None]:
# Step 2: Configure Gemini API
# ======================================================
import os

# The key comes from the environment so it is never written into the notebook.
api_key = os.getenv("GENAI_API_KEY")
if api_key is None or api_key == "":
    raise RuntimeError("Set GENAI_API_KEY environment variable before running this notebook.")

genai.configure(api_key=api_key)

# Model handle used later to answer questions.
gemini_model = genai.GenerativeModel("models/gemini-2.5-flash")

In [None]:
# Step 3: Initialize local embedding model
# ======================================================
# The embedder turns text into the vectors that are stored in,
# and later used to query, the ChromaDB memory.
embedder = SentenceTransformer("all-MiniLM-L6-v2")


In [None]:
# Step 4: Initialize ChromaDB collection
# ======================================================
# ChromaDB stores embeddings + documents (memory system).
# PersistentClient is used on purpose: with Chroma 0.4+,
# Client(Settings(persist_directory=...)) builds an in-memory client and the
# directory is ignored unless persistence is explicitly enabled.
client = chromadb.PersistentClient(path="./vector_store")
collection_name = "gemini_memory"

# Every run starts from an empty memory, so drop whatever a previous run left.
try:
    client.delete_collection(collection_name)
    print("✅ Deleted previous memory collection.")
except Exception as e:
    # The exception type raised for a missing collection differs between
    # Chroma releases, so catch broadly but always show the actual reason
    # instead of assuming the collection simply did not exist.
    print(f"⚠️ No previous memory collection deleted: {e}")

collection = client.get_or_create_collection(collection_name)


✅ Deleted previous memory collection.


In [9]:
def chunk_text(text, chunk_size=500):
    """Break ``text`` into pieces of at most ``chunk_size`` whitespace-separated words.

    Words are re-joined with single spaces, so the original whitespace
    (newlines, tabs, repeated spaces) is not preserved. Returns a list of
    strings; an empty or whitespace-only ``text`` yields an empty list.
    """
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), chunk_size):
        window = tokens[start:start + chunk_size]
        pieces.append(" ".join(window))
    return pieces

def _transcribe_media(filename, ext):
    """Transcribe an mp3/wav/mp4 file with Google speech recognition.

    mp3 and mp4 inputs are first converted to a uniquely named temporary WAV
    file; wav inputs are read directly. Only that temporary file is removed
    afterwards - the caller's own file is never deleted.

    Returns the recognised text, or "" when recognition fails.
    """
    temp_wav = None
    try:
        if ext in ("mp3", "mp4"):
            handle = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
            handle.close()
            temp_wav = handle.name

            if ext == "mp4":
                # MoviePy 2.x removed the ``moviepy.editor`` module, so fall
                # back to the top-level import when it is missing.
                try:
                    from moviepy.editor import VideoFileClip
                except ImportError:
                    from moviepy import VideoFileClip
                clip = VideoFileClip(filename)
                try:
                    if clip.audio is None:
                        raise ValueError("video has no audio track")
                    clip.audio.write_audiofile(temp_wav, codec='pcm_s16le')
                finally:
                    clip.close()
            else:
                AudioSegment.from_file(filename).export(temp_wav, format="wav")

        audio_file = temp_wav or filename

        r = sr.Recognizer()
        try:
            with sr.AudioFile(audio_file) as source:
                audio = r.record(source)
            return r.recognize_google(audio)
        except Exception as e:
            # Best effort: report the reason instead of failing silently.
            print(f"⚠️ Transcription failed for {filename}: {e!r}")
            return ""
    finally:
        # Clean up the converted copy even when conversion/transcription failed.
        if temp_wav and os.path.exists(temp_wav):
            os.remove(temp_wav)


def extract_text_from_file(filename):
    """Extract plain text from a file, choosing the method by file extension.

    Supported extensions: txt/md (read as UTF-8), pdf (PyPDF2), docx
    (python-docx paragraphs), pptx (text of every shape on every slide),
    png/jpg/jpeg (OCR via pytesseract) and mp3/wav/mp4 (speech recognition).

    Args:
        filename: Path of the file to read.

    Returns:
        The extracted text. Unsupported extensions return "". If extraction
        raises, a warning is printed and whatever text was collected so far
        (possibly "") is returned; the function itself does not raise.
    """
    text = ""
    ext = filename.split('.')[-1].lower()

    try:
        if ext in ["txt", "md"]:
            with open(filename, "r", encoding="utf-8") as f:
                text = f.read()

        elif ext == "pdf":
            reader = PyPDF2.PdfReader(filename)
            for page in reader.pages:
                # extract_text() may yield nothing for a page (e.g. scanned pages).
                text += page.extract_text() or ""

        elif ext == "docx":
            doc = docx.Document(filename)
            for para in doc.paragraphs:
                text += para.text + "\n"

        elif ext == "pptx":
            prs = Presentation(filename)
            for slide in prs.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text += shape.text + "\n"

        elif ext in ["png", "jpg", "jpeg"]:
            # Context manager closes the underlying image file handle.
            with Image.open(filename) as image:
                text = pytesseract.image_to_string(image)

        elif ext in ["mp3", "wav", "mp4"]:
            text = _transcribe_media(filename, ext)
    except Exception as e:
        print(f"⚠️ Error extracting {filename}: {e}")

    return text

def _clean_youtube_url(url):
    """Return ``url`` with only the ``v`` (video ID) query parameter kept.

    Playlist, timestamp and tracking parameters as well as the fragment are
    dropped. URLs that carry the ID in the path (e.g. youtu.be links) end up
    with no query string at all.
    """
    from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit

    parts = urlsplit(url.strip())
    video_ids = parse_qs(parts.query).get("v")
    query = urlencode({"v": video_ids[0]}) if video_ids else ""
    return urlunsplit((parts.scheme, parts.netloc, parts.path, query, ""))


def extract_text_from_youtube(url):
    """Download a YouTube video's audio and transcribe it to text.

    The audio is fetched with yt-dlp, split into one-minute chunks and each
    chunk is sent to Google speech recognition. All intermediate files live
    in a temporary directory that is removed on exit, whether or not the
    processing succeeded.

    Args:
        url: Video URL. Extra query parameters are stripped, but the ``v``
            parameter that identifies the video is preserved.

    Returns:
        The transcript, with each recognised chunk followed by a single
        space, or "" when the download or processing fails.
    """
    try:
        url = _clean_youtube_url(url)  # clean URL without losing the video ID
        print(f"🎥 Downloading audio from: {url}")

        with tempfile.TemporaryDirectory() as work_dir:
            ydl_opts = {
                'format': 'bestaudio/best',
                # yt-dlp substitutes %(ext)s with the real container extension.
                'outtmpl': os.path.join(work_dir, "audio.%(ext)s"),
                'quiet': True,
                'no_warnings': True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                downloaded_path = ydl.prepare_filename(info)

            audio = AudioSegment.from_file(downloaded_path)

            r = sr.Recognizer()
            text = ""
            chunk_length_ms = 60000  # 1 minute chunks
            chunk_path = os.path.join(work_dir, "chunk.wav")
            for i in range(0, len(audio), chunk_length_ms):
                chunk = audio[i:i + chunk_length_ms]
                chunk.export(chunk_path, format="wav")
                with sr.AudioFile(chunk_path) as source:
                    audio_chunk = r.record(source)
                try:
                    text += r.recognize_google(audio_chunk) + " "
                except Exception as e:
                    # Best effort: a silent or unintelligible minute must not
                    # abort the whole transcript, but say that it was skipped.
                    print(f"⚠️ Skipped chunk starting at {i // 1000}s: {e!r}")

        if text.strip() == "":
            print("⚠️ Transcription failed or empty.")
        else:
            print("✅ YouTube audio processed successfully!")
        return text
    except Exception as e:
        print(f"❌ Error processing YouTube video: {e}")
        return ""


def store_text_in_chroma(text, prefix="file"):
    """Chunk ``text``, embed the chunks and write them to the Chroma collection.

    Chunk IDs have the form ``{prefix}_chunk_{n}`` with ``n`` starting at 1.
    Records are written with ``upsert`` so that ingesting the same source
    again replaces its chunks instead of colliding with the existing IDs.
    Chunks are embedded and written in batches rather than one call each.

    Note: if a source is re-ingested with fewer chunks than before, the
    higher-numbered chunks from the earlier ingest remain in the collection.

    Args:
        text: Text to store; ``None``, empty or whitespace-only text is skipped.
        prefix: ID prefix identifying the source of the text.

    Returns:
        Number of chunks written (0 when there was nothing to store).
    """
    if not text or not text.strip():
        return 0

    chunks = chunk_text(text)
    batch_size = 100  # keeps each encode/upsert call to a modest size
    for start in range(0, len(chunks), batch_size):
        batch = chunks[start:start + batch_size]
        collection.upsert(
            ids=[f"{prefix}_chunk_{start + offset + 1}" for offset in range(len(batch))],
            embeddings=embedder.encode(batch).tolist(),
            documents=batch,
        )
    return len(chunks)

In [None]:
# Step 7: Store uploaded files automatically into ChromaDB
# ======================================================

# Ask for files through Colab's upload dialog; the keys of the result are
# used as file names below.
uploaded = files.upload()

# Extract, chunk and store each file, using its name as the chunk-ID prefix.
for filename in uploaded:
    text = extract_text_from_file(filename)
    n_chunks = store_text_in_chroma(text, prefix=filename)
    if n_chunks <= 0:
        print(f"⚠️ No valid text found in {filename}")
        continue
    print(f"✅ Stored {n_chunks} chunks from {filename}")


In [None]:
# Optionally ingest the transcript of one YouTube video.
youtube_url = input("🎥 Enter YouTube URL (or press Enter to skip): ").strip()
if youtube_url:
    yt_text = extract_text_from_youtube(youtube_url)
    # Derive the chunk-ID prefix from the URL so that running this cell for
    # several videos does not make them share (and collide on) the same IDs.
    yt_prefix = "youtube_" + "".join(ch if ch.isalnum() else "_" for ch in youtube_url)
    n_chunks = store_text_in_chroma(yt_text, prefix=yt_prefix)
    if n_chunks > 0:
        print(f"✅ Stored {n_chunks} chunks from YouTube video")
    else:
        print("⚠️ No valid text from YouTube video")





In [None]:
# Step 8: Interactive query function
# ======================================================
def query_memory_local(question: str):
    """Answer ``question`` from the stored memory and print the result.

    The question is embedded, up to three of the closest stored chunks are
    fetched from the Chroma collection, and Gemini is asked to answer using
    those chunks as context.

    Nothing is returned; the answer or a warning is printed. Blank questions,
    an empty memory and Gemini failures (including a response that carries no
    text) are reported as messages instead of raising, so an interactive loop
    calling this function keeps running.

    Args:
        question: The user's question.
    """
    if not question or not question.strip():
        print("⚠️ Please enter a question.")
        return

    stored = collection.count()
    if stored == 0:
        print("⚠️ No relevant information found in memory.")
        return

    query_embedding = embedder.encode(question).tolist()
    # Never request more neighbours than there are stored chunks.
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=min(3, stored),
    )

    hits = [d for d in (results["documents"] or [[]])[0] if d and d.strip()]
    if not hits:
        print("⚠️ No relevant information found in memory.")
        return

    context = " ".join(hits)
    prompt = f"""
Use the following stored information to answer the question accurately.

Context:
{context}

Question:
{question}
"""
    try:
        response = gemini_model.generate_content(prompt)
        # Accessing .text can itself raise when the response has no text part.
        answer = response.text
    except Exception as e:
        print(f"❌ Gemini request failed: {e}")
        return
    print("🤖 Answer:", answer)


In [None]:
# Step 9: Interactive query loop
# ======================================================
# Keeps asking for questions until the user types 'exit'. Interrupting the
# cell or closing the input stream also ends the session cleanly.
while True:
    try:
        question = input("\nEnter your question (or type 'exit' to quit): ").strip()
    except (EOFError, KeyboardInterrupt):
        print("\n👋 Exiting AI memory system.")
        break
    if question.lower() == "exit":
        print("👋 Exiting AI memory system.")
        break
    if not question:
        # Nothing typed: prompt again instead of querying with an empty string.
        continue
    query_memory_local(question)