In [1]:
!pip install -r requirements.txt

Collecting faster-whisper (from -r requirements.txt (line 7))
  Downloading faster_whisper-1.2.0-py3-none-any.whl.metadata (16 kB)
Collecting ffmpeg-python (from -r requirements.txt (line 8))
  Downloading ffmpeg_python-0.2.0-py3-none-any.whl.metadata (1.7 kB)
Collecting pysrt (from -r requirements.txt (line 15))
  Downloading pysrt-1.1.2.tar.gz (104 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/104.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.4/104.4 kB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting webvtt-py (from -r requirements.txt (line 16))
  Downloading webvtt_py-0.5.1-py3-none-any.whl.metadata (3.4 kB)
Collecting langchain-community (from -r requirements.txt (line 20))
  Downloading langchain_community-0.4-py3-none-any.whl.metadata (3.0 kB)
Collecting langchain-experimental (from -r requirements.txt (line 23))
  Downloading

Imports

In [2]:
# ========================================
# IMPORTS
# ========================================
import os
import json
import shutil
import pathlib
import gc
from pathlib import Path
from typing import List, Optional, TypedDict

import numpy as np
import pandas as pd
from tqdm import tqdm

# Whisper and audio processing
from faster_whisper import WhisperModel
import ffmpeg

# Text processing
import nltk
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')
try:
    nltk.data.find('tokenizers/punkt_tab')
except LookupError:
    nltk.download('punkt_tab')
from nltk.tokenize import sent_tokenize

# LangChain imports
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain.schema import Document
from langchain_experimental.text_splitter import SemanticChunker
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain.prompts import PromptTemplate
from langchain.chains.summarize import load_summarize_chain

# LLM
from langchain_groq import ChatGroq
from langchain_openai import ChatOpenAI

# For subtitle parsing
try:
    import pysrt
except ImportError:
    print("pysrt not installed. SRT support will be limited.")
try:
    import webvtt
except ImportError:
    print("webvtt-py not installed. VTT support will be limited.")

# Google Colab userdata (comment out if not using Colab)
try:
    from google.colab import userdata
    IN_COLAB = True
except ImportError:
    IN_COLAB = False
    print("Not running in Colab. Make sure to set environment variables manually.")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


In [3]:
# ========================================
# CONFIGURATION
# ========================================
# Name of the Whisper checkpoint handed to WhisperModel.
WHISPER_MODEL = "medium"

# Lower-case extensions (leading dot included) used to route incoming files.
VIDEO_EXT = {".mp4", ".mkv", ".mov", ".avi"}
AUDIO_EXT = {".wav", ".mp3", ".m4a", ".flac", ".aac", ".ogg"}
TRANSCRIPT_EXT = {".txt", ".srt", ".vtt", ".json"}

# Chunking limits, in characters, passed to the chunker as max_chars / overlap_chars.
CHUNK_SIZE = 1800
CHUNK_OVERLAP = 200

# Directory setup: every artefact lives under DATA, one sub-folder per stage.
BASE = "."
DATA = f"{BASE}/data"
for _stage_dir in ("audio", "transcripts", "chunks"):
    os.makedirs(f"{DATA}/{_stage_dir}", exist_ok=True)


File Processor

In [4]:
# ========================================
# FILE PROCESSOR CLASS
# ========================================
class FileProcessor:
    """Handles file upload, format detection, and routing"""

    def __init__(self):
        self.whisper_model = None

    def initialize_whisper(self):
        """Initialize Whisper model once"""
        if self.whisper_model is None:
            print("Initializing Whisper model...")
            self.whisper_model = WhisperModel(
                WHISPER_MODEL,
                device="cuda",
                compute_type="float16"
            )
            gc.collect()

    @staticmethod
    def get_file_stem(fname):
        stem = pathlib.Path(fname).stem
        # Sanitize the stem for use in collection names
        import re
        sanitized_stem = re.sub(r'[^a-zA-Z0-9._-]', '_', stem)
        sanitized_stem = sanitized_stem.strip('_-.')  # removes leading/trailing invalid chars
        if len(sanitized_stem) < 3:
         sanitized_stem = f"doc_{sanitized_stem}"
        return sanitized_stem


    @staticmethod
    def get_audio_duration(audio_path):
        """Get audio duration in seconds"""
        try:
            probe = ffmpeg.probe(audio_path)
            return float(probe['format']['duration'])
        except ffmpeg.Error as e:
            print(f"Error getting duration: {e}")
            return 0.0

    def extract_audio_from_video(self, video_path, out_audio_path, sr=16000):
        """Convert video to audio"""
        if not os.path.exists(out_audio_path):
            print(f"Extracting audio from video: {video_path}")
            (
                ffmpeg
                .input(video_path)
                .output(out_audio_path, ac=1, ar=sr, format='wav', loglevel="error")
                .overwrite_output()
                .run()
            )
            print(f"✓ Audio extracted: {out_audio_path}")
        return out_audio_path

    def transcribe_audio(self, audio_path, doc_id):
        """Transcribe audio using Whisper"""
        self.initialize_whisper()

        print(f"Transcribing audio: {audio_path}")
        segments, info = self.whisper_model.transcribe(
            audio_path,
            beam_size=5,
            vad_filter=True,
            word_timestamps=False
        )

        rows = []
        for i, seg in enumerate(segments):
            rows.append({
                "doc_id": doc_id,
                "segment_idx": i,
                "start_ts": float(seg.start),
                "end_ts": float(seg.end),
                "text": seg.text.strip()
            })

        out_path = f"{DATA}/transcripts/{doc_id}.parquet"
        pd.DataFrame(rows).to_parquet(out_path, index=False)
        print(f"✓ Transcript saved: {out_path}")
        return out_path

    def process_video(self, video_path, doc_id):
        """Process video: extract audio, transcribe"""
        print(f"\n=== Processing Video ===")
        audio_path = f"{DATA}/audio/{doc_id}.wav"
        self.extract_audio_from_video(video_path, audio_path)

        duration = self.get_audio_duration(audio_path)
        if duration > 7200:  # 2 hours
            print(f"⚠ Warning: Audio is {duration/60:.1f} minutes. This may take a while.")

        return self.transcribe_audio(audio_path, doc_id)

    def process_audio(self, audio_path, doc_id):
        """Process audio file directly"""
        print(f"\n=== Processing Audio ===")
        ext = pathlib.Path(audio_path).suffix.lower()
        dst = f"{DATA}/audio/{doc_id}.wav"

        if ext == ".wav":
            shutil.copy(audio_path, dst)
        else:
            print(f"Converting {ext} to WAV...")
            (
                ffmpeg
                .input(audio_path)
                .output(dst, ac=1, ar=16000, format='wav', loglevel="error")
                .overwrite_output()
                .run()
            )

        print(f"✓ Audio ready: {dst}")

        duration = self.get_audio_duration(dst)
        if duration > 7200:
            print(f"⚠ Warning: Audio is {duration/60:.1f} minutes.")

        return self.transcribe_audio(dst, doc_id)

    def process_parquet_file(self, parquet_path, doc_id):
        """Process existing parquet transcript file"""
        print(f"\n=== Processing Parquet Transcript ===")
        try:
            df = pd.read_parquet(parquet_path)

            if "text" not in df.columns:
                raise ValueError("Parquet file must contain a 'text' column")

            standardized_rows = []
            for idx, row in df.iterrows():
                standardized_row = {
                    "doc_id": row.get("doc_id", row.get("video_id", doc_id)),
                    "segment_idx": row.get("segment_idx", idx),
                    "start_ts": float(row.get("start_ts", 0.0)),
                    "end_ts": float(row.get("end_ts", 0.0)),
                    "text": str(row["text"]).strip()
                }
                standardized_rows.append(standardized_row)

            std_df = pd.DataFrame(standardized_rows)
            out_path = f"{DATA}/transcripts/{doc_id}.parquet"
            std_df.to_parquet(out_path, index=False)

            print(f"✓ Parquet processed: {out_path} ({len(std_df)} segments)")
            return out_path

        except Exception as e:
            raise ValueError(f"Error processing parquet file: {str(e)}")

    def process_transcript_file(self, transcript_path, doc_id):
        """Process existing transcript files (txt, srt, vtt, json)"""
        print(f"\n=== Processing Transcript File ===")
        ext = pathlib.Path(transcript_path).suffix.lower()

        if ext == ".txt":
            with open(transcript_path, 'r', encoding='utf-8', errors='ignore') as f:
                text = f.read().strip()
            rows = [{"segment_idx": 0, "start_ts": 0.0, "end_ts": 0.0, "text": text}]

        elif ext == ".srt":
            try:
                subs = pysrt.open(transcript_path, encoding='utf-8')
                rows = [{
                    "segment_idx": i,
                    "start_ts": s.start.ordinal/1000.0,
                    "end_ts": s.end.ordinal/1000.0,
                    "text": s.text.replace('\n', ' ').strip()
                } for i, s in enumerate(subs)]
            except NameError:
                raise ValueError("pysrt not installed. Cannot process SRT files.")

        elif ext == ".vtt":
            try:
                def hms_to_s(hms):
                    parts = hms.split(':')
                    if len(parts) == 3:
                        h, m, s = parts
                        return int(h)*3600 + int(m)*60 + float(s)
                    return 0.0

                rows = [{
                    "segment_idx": i,
                    "start_ts": hms_to_s(cap.start),
                    "end_ts": hms_to_s(cap.end),
                    "text": cap.text.replace('\n', ' ').strip()
                } for i, cap in enumerate(webvtt.read(transcript_path))]
            except NameError:
                raise ValueError("webvtt-py not installed. Cannot process VTT files.")

        elif ext == ".json":
            with open(transcript_path, 'r', encoding='utf-8') as f:
                obj = json.load(f)
            segs = obj.get("segments", [])
            rows = [{
                "segment_idx": i,
                "start_ts": float(s.get("start", 0.0)),
                "end_ts": float(s.get("end", 0.0)),
                "text": str(s.get("text", "")).strip()
            } for i, s in enumerate(segs)]

        else:
            raise ValueError(f"Unsupported transcript format: {ext}")

        df = pd.DataFrame(rows)
        df.insert(0, "doc_id", doc_id)
        out_path = f"{DATA}/transcripts/{doc_id}.parquet"
        df.to_parquet(out_path, index=False)
        print(f"✓ Transcript saved: {out_path}")
        return out_path

    def process_pdf(self, pdf_path, doc_id):
        """Load a PDF and store its text as a single untimed segment.

        The page contents returned by ``PyPDFLoader`` are joined with single
        spaces into one row whose timestamps are 0.0.

        Args:
            pdf_path: PDF file to load.
            doc_id: Identifier stored on the row and used for the file name.

        Returns:
            str: Path of the written ``{DATA}/transcripts/{doc_id}.parquet``.
        """
        print("\n=== Processing PDF ===")
        pages = PyPDFLoader(pdf_path).load()
        full_text = " ".join(page.page_content for page in pages)

        single_segment = {"segment_idx": 0, "start_ts": 0.0, "end_ts": 0.0, "text": full_text}
        df = pd.DataFrame([single_segment])
        df.insert(0, "doc_id", doc_id)

        out_path = f"{DATA}/transcripts/{doc_id}.parquet"
        df.to_parquet(out_path, index=False)
        print(f"✓ PDF processed: {out_path}")
        return out_path

    def route_file(self, file_path):
        """Main routing function for any file type"""
        ext = pathlib.Path(file_path).suffix.lower()
        doc_id = self.get_file_stem(file_path)

        print(f"\n{'='*60}")
        print(f"Processing file: {os.path.basename(file_path)}")
        print(f"File type: {ext}")
        print(f"{'='*60}")

        if ext in VIDEO_EXT:
            return self.process_video(file_path, doc_id)
        elif ext in AUDIO_EXT:
            return self.process_audio(file_path, doc_id)
        elif ext == ".pdf":
            return self.process_pdf(file_path, doc_id)
        elif ext == ".parquet":
            return self.process_parquet_file(file_path, doc_id)
        elif ext in TRANSCRIPT_EXT:
            return self.process_transcript_file(file_path, doc_id)
        else:
            raise ValueError(f"Unsupported file format: {ext}")

In [5]:
# ========================================
# TEXT CHUNKER CLASS
# ========================================
class TextChunker:
    """Smart text chunking with sentence awareness and timestamps"""

    @staticmethod
    def normalize_whitespace(text):
        import re
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    @staticmethod
    def basic_cleanup(text):
        text = text.replace("'", "'").replace(""", "\"").replace(""", "\"")
        return TextChunker.normalize_whitespace(text)

    @staticmethod
    def sentence_time_expand(seg_text, seg_start, seg_end):
        """Split segment into sentences with proportional timestamps"""
        txt = TextChunker.basic_cleanup(seg_text)
        sents = [s for s in sent_tokenize(txt) if s.strip()]

        if not sents:
            return []

        total_chars = sum(len(s) for s in sents)
        if total_chars == 0:
            return []

        dur = max(0.0, float(seg_end) - float(seg_start))
        out = []
        cur = float(seg_start)

        for s in sents:
            frac = len(s) / total_chars
            sdur = frac * dur
            out.append({"text": s, "start_ts": cur, "end_ts": cur + sdur})
            cur += sdur

        if out:
            out[-1]["end_ts"] = float(seg_end)

        return out

    @staticmethod
    def build_sentence_table(transcript_parquet_path):
        """Convert transcript to sentence-level dataframe"""
        df = pd.read_parquet(transcript_parquet_path)
        rows = []

        for _, r in df.iterrows():
            exp = TextChunker.sentence_time_expand(r["text"], r["start_ts"], r["end_ts"])
            rows.extend(exp)

        if not rows:
            rows = [{
                "text": TextChunker.basic_cleanup(" ".join(df["text"].tolist())),
                "start_ts": 0.0,
                "end_ts": 0.0
            }]

        return pd.DataFrame(rows)

    @staticmethod
    def make_chunks_from_sentences(sents_df, max_chars=1800, overlap_chars=200):
        """Create overlapping chunks respecting sentence boundaries"""
        chunks = []
        buf_text = ""
        buf_starts = []
        buf_ends = []

        def flush_buffer():
            if not buf_text.strip():
                return
            chunks.append({
                "text": buf_text.strip(),
                "start_ts": min(buf_starts) if buf_starts else 0.0,
                "end_ts": max(buf_ends) if buf_ends else 0.0
            })

        for _, row in sents_df.iterrows():
            s = str(row["text"]).strip()
            st, et = float(row["start_ts"]), float(row["end_ts"])

            if not s:
                continue

            if len(buf_text) + len(s) + 1 <= max_chars:
                buf_text = (buf_text + " " + s).strip() if buf_text else s
                buf_starts.append(st)
                buf_ends.append(et)
            else:
                flush_buffer()
                buf_text = s
                buf_starts = [st]
                buf_ends = [et]

        flush_buffer()
        return chunks

    @staticmethod
    def build_and_save_chunks(transcript_path, max_chars=1800, overlap_chars=200):
        """Chunk a transcript parquet and write the chunks next to it.

        The document id is the transcript's file name without extension.

        Args:
            transcript_path: Transcript parquet to chunk.
            max_chars: Forwarded to ``make_chunks_from_sentences``.
            overlap_chars: Forwarded to ``make_chunks_from_sentences``.

        Returns:
            str: Path of the written ``{DATA}/chunks/{doc_id}_chunks.parquet``.
        """
        print("\n=== Creating Chunks ===")
        doc_id, _ = os.path.splitext(os.path.basename(transcript_path))
        sentences = TextChunker.build_sentence_table(transcript_path)
        raw_chunks = TextChunker.make_chunks_from_sentences(sentences, max_chars, overlap_chars)

        chunk_records = [
            {
                "doc_id": doc_id,
                "chunk_idx": position,
                "start_ts": float(chunk["start_ts"]),
                "end_ts": float(chunk["end_ts"]),
                "text": chunk["text"],
            }
            for position, chunk in enumerate(raw_chunks)
        ]

        cdf = pd.DataFrame(chunk_records)
        outp = f"{DATA}/chunks/{doc_id}_chunks.parquet"
        cdf.to_parquet(outp, index=False)
        print(f"✓ Chunks saved: {outp} ({len(cdf)} chunks)")
        return outp


In [6]:
# ========================================
# VECTOR STORE CLASS
# ========================================
class VectorStore:
    def __init__(self, collection_name="quantum_notes", model_name='sentence-transformers/all-mpnet-base-v2'):
        """Open the Chroma collection backing this store.

        Args:
            collection_name: Name of the Chroma collection.
            model_name: Model name handed to ``HuggingFaceEmbeddings``.
        """
        self.collection_name = collection_name
        self.embedding_function = HuggingFaceEmbeddings(model_name=model_name)
        # All collections share one on-disk directory.
        self.persist_directory = "./chroma_langchain_db"

        chroma_options = {
            "collection_name": self.collection_name,
            "embedding_function": self.embedding_function,
            "persist_directory": self.persist_directory,
        }
        self.vector_store = Chroma(**chroma_options)

    def add_documents(self, docs):
        """Add documents to vector store"""
        print(f"\n=== Adding {len(docs)} documents to vector store ===")
        self.vector_store.add_documents(docs)
        print("✓ Documents added successfully")

    def similarity_search_with_score(self, query, k=5):
        """Search with similarity scores"""
        results = self.vector_store.similarity_search_with_score(query, k=k)
        results = sorted(results, key=lambda x: x[1], reverse=True)
        return results

    def get_all_chunks(self):
        """Returns a list of Document objects"""
        raw_data = self.vector_store.get()
        docs = raw_data["documents"]
        metadatas = raw_data.get("metadatas", [{}] * len(docs))

        doc_objects = [Document(page_content=doc, metadata=meta) for doc, meta in zip(docs, metadatas)]
        return doc_objects

    def clear(self):
        """Clear the vector store"""
        if os.path.exists(self.persist_directory):
            shutil.rmtree(self.persist_directory)
            print(f"✓ Cleared vector store at {self.persist_directory}")

In [7]:
# ========================================
# DOCUMENT SUMMARIZER CLASS
# ========================================
class DocumentSummarizer:
    def __init__(self, model_name="llama-3.1-8b-instant", use_groq=True):
        """
        Initialize DocumentSummarizer

        Args:
            model_name: Model name for the Groq client; the OpenAI branch
                always uses "gpt-3.5-turbo"
            use_groq: If True, use Groq, else use OpenAI
        """
        if not use_groq:
            if IN_COLAB:
                # In Colab the key is copied from the notebook's secrets.
                os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_TOKEN")
            self.llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.5)
            return

        if IN_COLAB:
            # In Colab the key is copied from the notebook's secrets.
            os.environ["GROQ_API_KEY"] = userdata.get("GROQ_API_KEY")
        self.llm = ChatGroq(model_name=model_name, verbose=False)

    def summarize_all(self, chunks):
        """Summarize all chunks with a map-reduce summarization chain.

        Args:
            chunks: Documents passed to the chain's ``invoke``.

        Returns:
            The chain's result, unmodified.
        """
        print("\n=== Summarizing All Chunks ===")
        map_reduce_chain = load_summarize_chain(self.llm, chain_type="map_reduce")
        return map_reduce_chain.invoke(chunks)

    def summarize_query(self, vector_store, query, k=3):
        """Summarize based on query"""
        print(f"\n=== Generating Summary for Query: '{query}' ===")
        results = vector_store.similarity_search_with_score(query)
        relevant_docs = [doc for doc, score in results[:k]]

        prompt_template = """Based on the following context, provide a meaningful and insightful summary focusing on: {query}

Context: {context}

DO NOT make up stuff. If the text is empty or does not contain information related to the query, say 'Information not found in documents related to the query'

Summary:"""

        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["query", "context"]
        )

        context = "\n\n".join([chunk.page_content for chunk in relevant_docs])
        summary = self.llm.invoke(prompt.format(query=query, context=context))

        return summary, relevant_docs

    def summarize_notes(self, notes):
        """Summarize user-provided notes"""
        print("\n=== Summarizing Notes ===")
        prompt_template = """Based on the given notes: {notes}, provide a meaningful and insightful summary.
DO NOT make up stuff. If the notes are empty, say 'Notes not found!'
Summary:"""

        prompt = PromptTemplate(template=prompt_template, input_variables=["notes"])
        summary = self.llm.invoke(prompt.format(notes=notes))
        return summary

In [8]:
# ========================================
# FLASHCARDS CLASS
# ========================================
class FlashCards:
    def __init__(self, model_name="llama-3.1-8b-instant", use_groq=True):
        if use_groq:
            if IN_COLAB:
                os.environ["GROQ_API_KEY"] = userdata.get("GROQ_API")
            self.llm = ChatGroq(model_name=model_name, verbose=False)
        else:
            if IN_COLAB:
                os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_TOKEN")
            self.llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.5)

    def create_flashcards_on_topic(self, vector_store, query, k=3):
        """Create flashcards on specific topic"""
        print(f"\n=== Creating Flashcards for: '{query}' ===")
        results = vector_store.similarity_search_with_score(query)
        relevant_docs = [doc for doc, score in results[:k]]

        prompt_template = """You are an expert in creating flashcards based on the provided information.

Based on the following context, create up to 8 flashcards focusing on the topic: {query}

If the concept is complex, break it down into a core definition, formula, or key takeaway.

Each flashcard should have:
1. A question or term on one side
2. A short (2 to 3 lines), memorable answer that captures the key point(s) on the other side
3. Simple and erudite language that is easy to understand and memorize

Context:
{context}

Format your response as:
Front: [Question]
Back: [Answer]

Flashcards:"""

        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["query", "context"]
        )

        context = "\n\n".join([chunk.page_content for chunk in relevant_docs])
        formatted_prompt = prompt.format(query=query, context=context)
        cards = self.llm.invoke(formatted_prompt)

        return cards, relevant_docs

    def create_flashcards_based_notes(self, notes):
        """Create flashcards from notes"""
        print("\n=== Creating Flashcards from Notes ===")
        prompt_template = """You are an expert in creating flashcards based on the provided information.

Based on the following notes, create up to 8 flashcards.
notes: {notes}

If the concept is complex, break it down into a core definition, formula, or key takeaway.

Each flashcard should have:
1. A question or term on one side
2. A short (2 to 3 lines), memorable answer that captures the key point(s) on the other side
3. Simple and erudite language that is easy to understand and memorize

Format your response as:
Front: [Question]
Back: [Answer]

Flashcards:"""

        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["notes"]
        )
        cards = self.llm.invoke(prompt.format(notes=notes))
        return cards

In [9]:
# ========================================
# QUIZ GENERATOR CLASS
# ========================================
class QuizGenerator:
    def __init__(self, model_name="llama-3.1-8b-instant", use_groq=True):
        """
        Initialize QuizGenerator

        Args:
            model_name: Model name for the Groq client; the OpenAI branch
                always uses "gpt-3.5-turbo"
            use_groq: If True, use Groq, else use OpenAI
        """
        if not use_groq:
            if IN_COLAB:
                # In Colab the key is copied from the notebook's secrets.
                os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_TOKEN")
            self.llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.7)
            return

        if IN_COLAB:
            # In Colab the key is copied from the notebook's secrets.
            os.environ["GROQ_API_KEY"] = userdata.get("GROQ_API_KEY")
        self.llm = ChatGroq(model_name=model_name, verbose=False)

    def generate_quiz_all(self, chunks, num_questions=10):
        """Generate quiz from all chunks"""
        print(f"\n=== Generating Quiz ({num_questions} questions) ===")
        context = "\n\n".join([chunk.page_content for chunk in chunks[:15]])

        prompt_template = """You are an expert quiz creator. Based on the following content, create a quiz with {num_questions} multiple-choice questions.

Content:
{context}

Instructions:
1. Create {num_questions} questions that test key concepts and understanding
2. Each question should have 4 options (A, B, C, D)
3. Mark the correct answer
4. Cover different topics/concepts from the content
5. Mix difficulty levels (easy, medium, hard)

Format your response EXACTLY as follows:

Question 1: [Question text]
A) [Option A]
B) [Option B]
C) [Option C]
D) [Option D]
Correct Answer: [A/B/C/D]
Explanation: [Brief explanation of why this is correct]

[Continue for all {num_questions} questions]

Quiz:"""

        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["num_questions", "context"]
        )

        quiz = self.llm.invoke(prompt.format(num_questions=num_questions, context=context))
        print("✓ Quiz generation complete!")
        return quiz

    def generate_quiz_on_topic(self, vector_store, query, num_questions=10, k=5):
        """Generate quiz on specific topic"""
        print(f"\n=== Generating Quiz on Topic: '{query}' ({num_questions} questions) ===")
        results = vector_store.similarity_search_with_score(query)
        relevant_docs = [doc for doc, score in results[:k]]

        prompt_template = """You are an expert quiz creator. Based on the following context, create a quiz with {num_questions} multiple-choice questions focusing on: {query}

Context:
{context}

Instructions:
1. Create {num_questions} questions specifically about: {query}
2. Each question should have 4 options (A, B, C, D)
3. Mark the correct answer
4. Questions should test understanding of the topic
5. Mix difficulty levels (easy, medium, hard)

DO NOT make up information. If the context doesn't contain enough information about the query, create fewer questions and mention: "Limited information available for full quiz on this topic."

Format your response EXACTLY as follows:

Question 1: [Question text]
A) [Option A]
B) [Option B]
C) [Option C]
D) [Option D]
Correct Answer: [A/B/C/D]
Explanation: [Brief explanation]

[Continue for all questions]

Quiz:"""

        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["num_questions", "query", "context"]
        )

        context = "\n\n".join([chunk.page_content for chunk in relevant_docs])
        quiz = self.llm.invoke(
            prompt.format(num_questions=num_questions, query=query, context=context)
        )

        print("✓ Quiz generation complete!")
        return quiz, relevant_docs

    def generate_quiz_from_notes(self, notes, num_questions=10):
        """Generate quiz from notes"""
        print(f"\n=== Generating Quiz from Notes ({num_questions} questions) ===")
        prompt_template = """You are an expert quiz creator. Based on the given notes, create a quiz with {num_questions} multiple-choice questions.

Notes:
{notes}

Instructions:
1. Create {num_questions} questions that test understanding of the notes
2. Each question should have 4 options (A, B, C, D)
3. Mark the correct answer
4. Cover key concepts from the notes
5. Mix difficulty levels (easy, medium, hard)

DO NOT make up stuff. If the notes are empty or too short, say 'Notes insufficient for quiz generation!'

Format your response EXACTLY as follows:

Question 1: [Question text]
A) [Option A]
B) [Option B]
C) [Option C]
D) [Option D]
Correct Answer: [A/B/C/D]
Explanation: [Brief explanation]

[Continue for all {num_questions} questions]

Quiz:"""

        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["num_questions", "notes"]
        )

        quiz = self.llm.invoke(prompt.format(num_questions=num_questions, notes=notes))
        print("✓ Quiz generation complete!")
        return quiz

In [12]:
# ========================================
# MAIN PIPELINE CLASS
# ========================================
class QuantumNotesPipeline:
    """
    Main pipeline for QuantumNotes with multimodal support

    Supports:
    - Videos (mp4, mkv, mov, avi)
    - Audio (wav, mp3, m4a, flac, aac, ogg)
    - PDFs
    - Text files
    - Transcripts (txt, srt, vtt, json, parquet)

    Usage: construct the pipeline, call process_file() once, then use the
    query helpers (summarize, make_notes, create_flashcards, generate_quiz,
    search). Every query helper raises ValueError until a file has been
    processed.
    """

    def __init__(self, use_groq=True):
        """
        Initialize pipeline

        Args:
            use_groq: If True, use Groq for LLM, else use OpenAI
        """
        self.file_processor = FileProcessor()
        self.use_groq = use_groq
        self.vector_store = None      # set by process_file()
        self.current_doc_id = None    # file stem of the last processed file

    def _require_vector_store(self):
        """Return the active vector store, or raise if no file was processed.

        Raises:
            ValueError: process_file() has not been called yet.
        """
        if self.vector_store is None:
            raise ValueError("No file processed. Call process_file() first.")
        return self.vector_store

    def process_file(self, file_path):
        """
        Process any supported file type

        Routes the file through the file processor, chunks the resulting
        transcript, and indexes the chunks in a new vector store that replaces
        any previously built one.

        Args:
            file_path: Path to the file

        Returns:
            VectorStore instance ready for querying
        """
        banner = "=" * 60
        print(f"\n{banner}")
        print("QUANTUMNOTES MULTIMODAL PIPELINE")
        print(banner)

        # Step 1: Process file (video/audio/pdf/transcript)
        transcript_path = self.file_processor.route_file(file_path)
        self.current_doc_id = self.file_processor.get_file_stem(file_path)

        # Step 2: Create chunks
        chunks_path = TextChunker.build_and_save_chunks(
            transcript_path,
            max_chars=CHUNK_SIZE,
            overlap_chars=CHUNK_OVERLAP
        )

        # Step 3: Load chunks as documents
        df = pd.read_parquet(chunks_path)
        metadata_columns = ("doc_id", "chunk_idx", "start_ts", "end_ts")
        documents = [
            Document(
                page_content=row["text"],
                metadata={column: row[column] for column in metadata_columns}
            ) for _, row in df.iterrows()
        ]

        # Step 4: Create vector store (one collection per processed document)
        print("\n=== Building Vector Store ===")
        self.vector_store = VectorStore(collection_name=f"quantum_{self.current_doc_id}")
        self.vector_store.add_documents(documents)

        print(f"\n{banner}")
        print("✓ File processing complete!")
        print(f"✓ Document ID: {self.current_doc_id}")
        print(f"✓ Total chunks: {len(documents)}")
        print(f"{banner}\n")

        return self.vector_store

    def summarize(self, query=None, k=3):
        """Generate summary based on query or summarize all

        Args:
            query: Topic to focus on. When empty/None the whole document is
                summarized instead.
            k: Number of chunks to retrieve for a query-focused summary.

        Returns:
            With a query: a ``(summary, relevant_docs)`` tuple.
            Without a query: the summary alone. Callers must unpack
            accordingly.

        Raises:
            ValueError: No file has been processed yet.
        """
        if not query:
            return self.summarize_all()

        vector_store = self._require_vector_store()
        ds = DocumentSummarizer(use_groq=self.use_groq)
        summary, relevant_docs = ds.summarize_query(vector_store, query, k=k)
        return summary, relevant_docs

    def summarize_all(self):
        """Summarize entire document

        Returns:
            The summary produced from every chunk in the vector store.

        Raises:
            ValueError: No file has been processed yet.
        """
        vector_store = self._require_vector_store()

        ds = DocumentSummarizer(use_groq=self.use_groq)
        all_chunks = vector_store.get_all_chunks()
        return ds.summarize_all(all_chunks)

    def make_notes(self, query, k=3):
        """Create structured notes

        Args:
            query: Topic the notes should focus on.
            k: Number of most similar chunks to feed into the prompt.

        Returns:
            The LLM response for the notes prompt.

        Raises:
            ValueError: No file has been processed yet.
        """
        vector_store = self._require_vector_store()

        # The summarizer is used only for its configured LLM client.
        ds = DocumentSummarizer(use_groq=self.use_groq)

        # Ask the store for k hits explicitly; relying on its default limit
        # would silently cap the context below the requested k.
        results = vector_store.similarity_search_with_score(query, k=k)
        relevant_docs = [doc for doc, _score in results[:k]]

        prompt_template = """You are an expert in analyzing documents and creating meaningful notes.

Based on the following text:
{context}

Create structured notes focusing on the query: {query}

These notes should have:
1. Key Points: [main ideas]
2. Important Details: [supporting information]
3. Actionable Insights: [what can be done]
4. Additional Information: [any other relevant details from the text]"""

        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["query", "context"]
        )

        context = "\n\n".join(chunk.page_content for chunk in relevant_docs)
        notes = ds.llm.invoke(prompt.format(query=query, context=context))

        return notes

    def create_flashcards(self, query, k=3):
        """Generate flashcards on topic

        Args:
            query: Topic the flashcards should cover.
            k: Number of chunks to retrieve as source material.

        Returns:
            A ``(cards, relevant_docs)`` tuple.

        Raises:
            ValueError: No file has been processed yet.
        """
        vector_store = self._require_vector_store()

        fc = FlashCards(use_groq=self.use_groq)
        cards, relevant_docs = fc.create_flashcards_on_topic(vector_store, query, k=k)
        return cards, relevant_docs

    def generate_quiz(self, query=None, num_questions=10, k=5):
        """Generate quiz

        Args:
            query: Topic to quiz on. When empty/None the quiz is built from
                every chunk of the document.
            num_questions: Number of questions to request.
            k: Number of chunks to retrieve for a topic quiz.

        Returns:
            With a query: a ``(quiz, relevant_docs)`` tuple.
            Without a query: the quiz alone.

        Raises:
            ValueError: No file has been processed yet.
        """
        vector_store = self._require_vector_store()

        qg = QuizGenerator(use_groq=self.use_groq)

        if query:
            quiz, relevant_docs = qg.generate_quiz_on_topic(
                vector_store,
                query,
                num_questions=num_questions,
                k=k
            )
            return quiz, relevant_docs

        all_chunks = vector_store.get_all_chunks()
        # NOTE(review): confirm QuizGenerator actually defines
        # generate_quiz_all; it is not among the methods seen next to
        # generate_quiz_on_topic / generate_quiz_from_notes.
        quiz = qg.generate_quiz_all(all_chunks, num_questions=num_questions)
        return quiz

    def search(self, query, k=5):
        """Search for relevant chunks

        Args:
            query: Text to search for.
            k: Maximum number of hits to request from the vector store.

        Returns:
            The store's similarity results; callers iterate them as
            ``(document, score)`` pairs.

        Raises:
            ValueError: No file has been processed yet.
        """
        vector_store = self._require_vector_store()
        return vector_store.similarity_search_with_score(query, k=k)

In [None]:
# Demo: index a PDF textbook and ask for a query-focused summary.
pipeline = QuantumNotesPipeline(use_groq=True)

file_path = "/content/MCAKCA032-PRINCIPALES OF SOFT COMPUTING-SN SIVNANDAM AND DEEPA SN (1).pdf"
pipeline.process_file(file_path)

# With a query, summarize() returns a (summary, relevant_docs) pair, so it
# must be unpacked before reading `.content` (a tuple has no such attribute).
summary, relevant_docs = pipeline.summarize("What is Linear Vector Quantization?")
print(summary.content)


QUANTUMNOTES MULTIMODAL PIPELINE

Processing file: MCAKCA032-PRINCIPALES OF SOFT COMPUTING-SN SIVNANDAM AND DEEPA SN (1).pdf
File type: .pdf

=== Processing PDF ===
✓ PDF processed: ./data/transcripts/MCAKCA032-PRINCIPALES_OF_SOFT_COMPUTING-SN_SIVNANDAM_AND_DEEPA_SN__1.parquet

=== Creating Chunks ===
✓ Chunks saved: ./data/chunks/MCAKCA032-PRINCIPALES_OF_SOFT_COMPUTING-SN_SIVNANDAM_AND_DEEPA_SN__1_chunks.parquet (786 chunks)

=== Building Vector Store ===

=== Adding 786 documents to vector store ===
✓ Documents added successfully

✓ File processing complete!
✓ Document ID: MCAKCA032-PRINCIPALES_OF_SOFT_COMPUTING-SN_SIVNANDAM_AND_DEEPA_SN__1
✓ Total chunks: 786


=== Generating Summary for Query: 'What is Linear Vector Quantization?' ===
Information not found in documents related to the query


In [13]:
# Demo: transcribe an audio file, then summarize it around a topic.
file_path = "/content/ana-bell-v1.mp3"
pipeline = QuantumNotesPipeline(use_groq=True)
pipeline.process_file(file_path)

# A query-focused summary comes back together with its source chunks.
summary, relevant_docs = pipeline.summarize("Key concepts")
print(summary.content)

print("\nRelevant Documents:")
for doc in relevant_docs:
    preview = doc.page_content[:200]  # first 200 characters of each doc
    print(f"- {preview}...")


QUANTUMNOTES MULTIMODAL PIPELINE

Processing file: ana-bell-v1.mp3
File type: .mp3

=== Processing Audio ===
Converting .mp3 to WAV...
✓ Audio ready: ./data/audio/ana-bell-v1.wav
Initializing Whisper model...
Transcribing audio: ./data/audio/ana-bell-v1.wav
✓ Transcript saved: ./data/transcripts/ana-bell-v1.parquet

=== Creating Chunks ===
✓ Chunks saved: ./data/chunks/ana-bell-v1_chunks.parquet (16 chunks)

=== Building Vector Store ===

=== Adding 16 documents to vector store ===
✓ Documents added successfully

✓ File processing complete!
✓ Document ID: ana-bell-v1
✓ Total chunks: 16


=== Generating Summary for Query: 'Key concepts' ===
Key concepts discussed in the given context include:

1. **Interpretation**: The importance of understanding that programming code can be interpreted in only one way, unlike human language where multiple interpretations are possible.

2. **Rubber Duckie Debugging**: A technique used to debug code by explaining the code to an inanimate object, such a

In [None]:
# Demo: flashcards. A freshly constructed pipeline has no vector store, so a
# file must be processed first; otherwise create_flashcards() raises
# ValueError("No file processed. Call process_file() first.").
pipeline = QuantumNotesPipeline(use_groq=True)
file_path = "/content/ana-bell-v1.mp3"  # same audio file as the summary demo
pipeline.process_file(file_path)

cards, relevant_docs = pipeline.create_flashcards("Important terms")
print(cards.content)
print("\nRelevant Documents:")
for doc in relevant_docs:
    print(f"- {doc.page_content[:200]}...") # Print first 200 characters of each doc

In [22]:
# Demo: build a short quiz on a topic from a plain-text transcript.
PREVIEW_CHARS = 100  # how much of each retrieved chunk to echo back

pipeline = QuantumNotesPipeline(use_groq=True)
file_path = "/content/lecture_transcript.txt"
pipeline.process_file(file_path)

# With a query, generate_quiz() returns (quiz, relevant_docs).
quiz, relevant_docs = pipeline.generate_quiz("Core concepts", num_questions=5)
print(quiz.content)
print("\nRelevant Documents:")
for doc in relevant_docs:
    print(f"- {doc.page_content[:PREVIEW_CHARS]}...")  # first PREVIEW_CHARS characters of each doc


QUANTUMNOTES MULTIMODAL PIPELINE

Processing file: lecture_transcript.txt
File type: .txt

=== Processing Transcript File ===
✓ Transcript saved: ./data/transcripts/lecture_transcript.parquet

=== Creating Chunks ===
✓ Chunks saved: ./data/chunks/lecture_transcript_chunks.parquet (17 chunks)

=== Building Vector Store ===

=== Adding 17 documents to vector store ===
✓ Documents added successfully

✓ File processing complete!
✓ Document ID: lecture_transcript
✓ Total chunks: 17


=== Generating Quiz on Topic: 'Core concepts' (5 questions) ===
✓ Quiz generation complete!
Question 1: What is the primary purpose of MIT OpenCourseWare (OCW)?
A) To provide exclusive educational resources to MIT students only
B) To share transformational resources with a wider audience
C) To create a community for MIT professors to discuss course materials
D) To sell online courses for a fee
Correct Answer: B) To share transformational resources with a wider audience
Explanation: The text suggests that MIT O

In [None]:
# Demo: raw similarity search. The pipeline must index a file before it can
# be searched; a fresh instance raises ValueError otherwise.
pipeline = QuantumNotesPipeline(use_groq=True)
file_path = "/content/lecture_transcript.txt"  # same transcript as the quiz demo
pipeline.process_file(file_path)

results = pipeline.search("specific topic", k=3)
for doc, score in results:
    print(f"Score: {score}")
    print(doc.page_content)
    print("-" * 40)