# In [1]:
# ================================================================
# EduVision — Lecture Summarizer
# Input: YouTube URL OR local media file (.mp3/.mp4/.wav/...) OR a transcript (.txt/.json/.vtt/.srt)
# Output: transcript + short/medium/long summaries + bullet notes + key phrases (saved to disk)
# ================================================================
import os, re, json, math, tempfile, subprocess
from pathlib import Path
from typing import List, Tuple

# ----------------- USER SETTINGS -----------------
# Edit these values directly when running in a notebook.
INPUT_SOURCE = ""  # e.g., "https://www.youtube.com/watch?v=..." OR r"C:\path\to\lecture.mp4" OR r"C:\path\to\transcript.txt"
OUT_DIR = Path(r"C:\Users\sagni\Downloads\Edu Vision\outputs")
ASR_MODEL = "base"        # whisper size: tiny | base | small | medium | large
# Change the literal False on the next line to True to request the GPU
# (only useful when torch.cuda.is_available() is true on this machine).
DEVICE = "cuda" if False else "cpu"
SUMMARY_MODEL = "sshleifer/distilbart-cnn-12-6"  # small, fast; use "facebook/bart-large-cnn" for higher quality

# Summary lengths (approximate upper bounds, in words)
SHORT_MAX_WORDS  = 120
MEDIUM_MAX_WORDS = 250
LONG_MAX_WORDS   = 500

# Chunking config for long transcripts (approx, in words).
# summarize_chunk() truncates its input to 1024 tokens and this file budgets
# ~1.3 tokens per word, so a chunk must stay under ~780 words; anything beyond
# that would be cut off before the model ever sees it.
CHUNK_WORDS   = 700
CHUNK_OVERLAP = 100

# ----------------- DEPENDENCIES ------------------
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# Switch for caption parsing: when True, .vtt/.srt inputs are routed through
# the caption parser instead of being read as raw text.
VTT_SRT_SUPPORT = True

# ----------------- UTILITIES ---------------------
# Lower-case suffixes (with leading dot) that are sent to speech recognition.
MEDIA_EXTS = {
    ".mp3", ".mp4", ".m4a", ".wav", ".flac",
    ".aac", ".mov", ".mkv", ".webm", ".ogg",
}
# Lower-case suffixes that are read directly as transcript text.
TEXT_EXTS = {
    ".txt", ".json", ".vtt", ".srt",
}

def ensure_outdir(p: Path) -> None:
    """Make sure directory *p* exists, creating missing parents as needed."""
    if not p.is_dir():
        # exist_ok keeps this safe if the directory appears between the check and the call.
        p.mkdir(parents=True, exist_ok=True)

def is_url(s: str) -> bool:
    """Return True when *s* begins (case-insensitively) with http://, https:// or www."""
    lowered = s.lower()
    return any(lowered.startswith(prefix) for prefix in ("http://", "https://", "www."))

def download_youtube_audio(url: str) -> Path:
    """
    Download audio from YouTube as a .mp3 using yt-dlp.

    The file is written into OUT_DIR using the output template
    ``yt_audio.%(ext)s``.

    Returns:
        Path of the downloaded audio file.

    Raises:
        subprocess.CalledProcessError: yt-dlp exited with a non-zero status.
        RuntimeError: yt-dlp succeeded but no ``yt_audio.*`` file was found.
    """
    ensure_outdir(OUT_DIR)
    out_tmpl = str(OUT_DIR / "yt_audio.%(ext)s")
    cmd = [
        "yt-dlp",
        "-x", "--audio-format", "mp3",
        "-o", out_tmpl,
        url
    ]
    print("[INFO] Downloading audio with yt-dlp...")
    subprocess.run(cmd, check=True)

    # The command asks for mp3, so that is the file we expect to find.
    expected = OUT_DIR / "yt_audio.mp3"
    if expected.is_file():
        return expected

    # Fallback: glob order is arbitrary, so pick deterministically — skip
    # partial-download bookkeeping files and take the most recently written one.
    candidates = [
        p for p in OUT_DIR.glob("yt_audio.*")
        if p.is_file() and p.suffix.lower() not in {".part", ".ytdl"}
    ]
    if candidates:
        return max(candidates, key=lambda p: p.stat().st_mtime)
    raise RuntimeError("yt-dlp did not produce an audio file.")

def read_text_file(path: Path) -> str:
    """
    Load a transcript from a text-like file and return it as one string.

    Dispatch is by (lower-cased) suffix:
      * ``.json`` — a dict with a ``"text"`` key yields that value; a list
        yields its items joined by newlines; anything else is returned as
        pretty-printed JSON.
      * ``.vtt`` / ``.srt`` — handed to parse_captions_to_text when
        VTT_SRT_SUPPORT is enabled.
      * everything else — read as plain text, ignoring undecodable bytes.

    Files are decoded as ``utf-8-sig`` so that a leading UTF-8 byte-order mark
    neither breaks JSON parsing nor leaks into the transcript.

    Raises:
        json.JSONDecodeError: the ``.json`` file is not valid JSON.
        OSError: the file cannot be read.
    """
    suffix = path.suffix.lower()

    if suffix == ".json":
        with open(path, "r", encoding="utf-8-sig") as f:
            data = json.load(f)
        # If it's a plain dict or list of lines; try to join strings
        if isinstance(data, dict) and "text" in data:
            return str(data["text"])
        if isinstance(data, list):
            return "\n".join(str(x) for x in data)
        return json.dumps(data, ensure_ascii=False, indent=2)

    if suffix in {".vtt", ".srt"} and VTT_SRT_SUPPORT:
        return parse_captions_to_text(path)

    return Path(path).read_text(encoding="utf-8-sig", errors="ignore")

def parse_captions_to_text(path: Path) -> str:
    """
    Very light VTT/SRT to plain text (drop timestamps).

    Removes the WEBVTT header line at the top of the file, lines that consist
    only of digits (SRT cue indices) and cue timing lines, then collapses
    runs of blank lines. Cue settings that follow a timing on the same line
    are dropped with it.

    Returns:
        The remaining caption text with surrounding whitespace stripped.

    Raises:
        OSError: the file cannot be read.
    """
    # utf-8-sig discards a leading byte-order mark so the header anchor below matches.
    raw = path.read_text(encoding="utf-8-sig", errors="ignore")

    # Only the header at the very start of the file is removed; caption text
    # that merely mentions "WEBVTT" must survive.
    raw = re.sub(r"\AWEBVTT[^\n]*\n?", "", raw, flags=re.IGNORECASE)
    raw = re.sub(r"^\d+\s*$", "", raw, flags=re.MULTILINE)  # srt cue index lines

    # One timing pattern covers both formats: hours are optional (WebVTT allows
    # MM:SS.mmm) and the fraction separator is "." for VTT or "," for SRT.
    clock = r"(?:\d+:)?\d{2}:\d{2}[.,]\d{3}"
    raw = re.sub(clock + r"\s*-->\s*" + clock + r".*", "", raw)

    # collapse multiple newlines
    raw = re.sub(r"\n{2,}", "\n", raw)
    return raw.strip()

def whisper_transcribe(audio_path: Path, model_size=ASR_MODEL, device=DEVICE) -> str:
    """
    Run local openai-whisper speech recognition on *audio_path*.

    Loads the Whisper model named by *model_size* on *device*, transcribes
    the file and returns the "text" field of the result with surrounding
    whitespace removed (an empty string when that field is absent).
    """
    # Imported here so the module can be loaded without whisper installed
    # when only transcript files are processed.
    import whisper

    print(f"[INFO] Loading Whisper model: {model_size} on {device}")
    asr = whisper.load_model(model_size, device=device)

    print("[INFO] Transcribing… (this can take a while)")
    recognized = asr.transcribe(str(audio_path)).get("text", "")
    return recognized.strip()

def chunk_text_words(text: str, chunk_words=None, overlap=None) -> List[str]:
    """
    Split *text* into overlapping chunks of whitespace-separated words.

    Args:
        text: Text to split.
        chunk_words: Maximum words per chunk. ``None`` (the default) uses the
            module-level CHUNK_WORDS, looked up at call time so later changes
            to the setting take effect.
        overlap: Words shared between consecutive chunks. ``None`` (the
            default) uses the module-level CHUNK_OVERLAP. Values outside
            ``0 .. chunk_words - 1`` are clamped into that range so the
            window always advances.

    Returns:
        Chunks in document order, words joined by single spaces. Empty list
        when *text* contains no words. The last chunk ends exactly at the
        last word; no chunk is a pure repeat of the previous chunk's tail.

    Raises:
        ValueError: *chunk_words* is smaller than 1.
    """
    if chunk_words is None:
        chunk_words = CHUNK_WORDS
    if overlap is None:
        overlap = CHUNK_OVERLAP
    if chunk_words < 1:
        raise ValueError(f"chunk_words must be >= 1, got {chunk_words}")

    words = re.findall(r"\S+", text)
    if not words:
        return []

    # A step of at least one word guarantees termination.
    overlap = max(0, min(overlap, chunk_words - 1))
    step = chunk_words - overlap

    chunks = []
    for start in range(0, len(words), step):
        end = start + chunk_words
        chunks.append(" ".join(words[start:end]))
        # Stop once a window reaches the final word; stepping back by
        # `overlap` from here would re-emit the same tail forever.
        if end >= len(words):
            break
    return chunks

def load_summarizer(model_name=SUMMARY_MODEL, device=DEVICE):
    """
    Load the tokenizer and seq2seq model identified by *model_name*.

    The model is moved to the GPU only when *device* is "cuda" and torch
    reports CUDA as available; otherwise it is returned as loaded.

    Returns:
        A ``(tokenizer, model)`` tuple.
    """
    print(f"[INFO] Loading summarizer: {model_name}")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    seq2seq = AutoModelForSeq2SeqLM.from_pretrained(model_name)

    use_gpu = device == "cuda" and torch.cuda.is_available()
    return tokenizer, (seq2seq.to("cuda") if use_gpu else seq2seq)

def summarize_chunk(tok, model, text: str, max_words=150) -> str:
    """
    Summarize one piece of text with a seq2seq model.

    Args:
        tok: Tokenizer paired with *model*.
        model: Seq2seq model exposing ``generate``.
        text: Text to summarize. Input is truncated to 1024 tokens.
        max_words: Approximate upper bound on summary length, in words.

    Returns:
        The decoded summary with surrounding whitespace stripped, or an
        empty string when *text* is empty or whitespace only.
    """
    # Nothing to summarize: skip the model instead of letting it generate
    # text from an empty prompt.
    if not text or not text.strip():
        return ""

    # Convert word target to tokens approx
    # (rough heuristic: ~1.3 tokens/word for English; adjust as needed)
    max_new_tokens = max(64, int(max_words * 1.3))
    inputs = tok([text], truncation=True, padding=True, return_tensors="pt", max_length=1024)

    # Follow the model to whatever device it actually lives on rather than
    # assuming the default CUDA device.
    device = next(model.parameters()).device
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        out = model.generate(
            **inputs, max_new_tokens=max_new_tokens,
            num_beams=4, length_penalty=2.0, early_stopping=True
        )
    return tok.decode(out[0], skip_special_tokens=True).strip()

def map_reduce_summarize(full_text: str, tok, model, target_words=200) -> str:
    """
    Summarize a transcript of any length down to roughly *target_words* words.

    Strategy:
      * No chunks (no words in *full_text*): summarize *full_text* directly.
      * One chunk: summarize it once at the full target length, so short
        transcripts are not squeezed through a half-length intermediate
        summary first.
      * Several chunks: summarize each chunk at half the target length
        (map), join the partial summaries and, while the joined text still
        spans more than one chunk, repeat the map step on it. The final
        joined text is then summarized at the full target length (reduce).

    Args:
        full_text: Transcript text.
        tok: Tokenizer passed through to summarize_chunk.
        model: Model passed through to summarize_chunk.
        target_words: Approximate length of the final summary in words.

    Returns:
        The final summary string.
    """
    chunks = chunk_text_words(full_text, CHUNK_WORDS, CHUNK_OVERLAP)
    if not chunks:
        return summarize_chunk(tok, model, full_text, target_words)
    if len(chunks) == 1:
        return summarize_chunk(tok, model, chunks[0], max_words=target_words)

    joined = ""
    while len(chunks) > 1:
        # Map: summarize chunks
        partials = [summarize_chunk(tok, model, ck, max_words=target_words // 2) for ck in chunks]
        joined = " ".join(partials)
        # Re-chunk the partial summaries; if they still do not fit in one
        # chunk, another map pass is needed before the final reduce.
        next_chunks = chunk_text_words(joined, CHUNK_WORDS, CHUNK_OVERLAP)
        if len(next_chunks) >= len(chunks):
            # Not shrinking — stop rather than loop indefinitely.
            break
        chunks = next_chunks

    # Reduce: summarize the concatenation of partial summaries
    return summarize_chunk(tok, model, joined, max_words=target_words)

def bulletize(text: str, max_bullets=12) -> List[str]:
    """
    Pick up to *max_bullets* sentences from *text* to use as bullet notes.

    Sentences are obtained by splitting after ".", "!" or "?" followed by
    whitespace. Each sentence is scored by how close its word count is to 18,
    plus a bonus of 3 when it contains any cue word (substring match on the
    lower-cased sentence). Sentences are returned highest score first; equal
    scores keep their original order.
    """
    cue_words = {"key","main","important","note","definition","example","conclusion","therefore","because","causes","result","summary"}

    def salience(sentence: str) -> int:
        lowered = sentence.lower()
        bonus = 3 if any(cue in lowered for cue in cue_words) else 0
        # distance from the preferred length of ~18 words counts against the sentence
        return bonus - abs(len(sentence.split()) - 18)

    stripped = (piece.strip() for piece in re.split(r'(?<=[.!?])\s+', text))
    candidates = [sentence for sentence in stripped if sentence]
    ranked = sorted(candidates, key=salience, reverse=True)
    return ranked[:max_bullets]

def top_key_phrases(text: str, top_k=20) -> List[str]:
    """
    Mine frequent 1-3 word phrases from *text*.

    Words are lower-cased, reduced to ``a-z``, ``0-9`` and ``-``, and dropped
    when they are stopwords, purely numeric or shorter than three characters.
    Unigrams, bigrams and trigrams over the remaining words are scored as
    ``count * phrase_length``. Phrases are visited from highest score down; a
    phrase is skipped when it is a substring of one already selected.
    Selection stops once *top_k* phrases have been collected.
    """
    from collections import Counter

    stop = set("""
        a an the and or if in on with by for to of from that this these those as is are was were be been being have has had do does did not no yes it its it's
        at into over under between within without through about across up down out off your you we they he she them his her our their than then there here
    """.split())

    tokens = []
    for raw_word in re.findall(r"\b[\w\-']+\b", text):
        cleaned = re.sub(r"[^a-z0-9\-]", "", raw_word.lower())
        if cleaned and cleaned not in stop and not cleaned.isdigit() and len(cleaned) > 2:
            tokens.append(cleaned)

    # Weighted n-gram scores; the weight equals the phrase length so longer
    # phrases are favored. Insertion order (1-grams, 2-grams, 3-grams, each by
    # first appearance) decides ties in the stable sort below.
    scores = Counter()
    for n in (1, 2, 3):
        grams = Counter(" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1))
        for gram, count in grams.items():
            scores[gram] += count * n

    ranked = sorted(scores, key=scores.get, reverse=True)

    selected = []
    for phrase in ranked:
        covered = any(phrase in chosen for chosen in selected if chosen != phrase)
        if covered:
            continue
        selected.append(phrase)
        if len(selected) >= top_k:
            break
    return selected

def words_count(s: str) -> int:
    """Count the whitespace-separated tokens in *s*."""
    return sum(1 for _ in re.finditer(r"\S+", s))

# ----------------- MAIN FLOW ---------------------
def _load_transcript(src: str) -> Tuple[str, str]:
    """Resolve *src* (URL, media file or transcript file) to ``(transcript, basename)``."""
    if is_url(src):
        audio_path = download_youtube_audio(src)
        return whisper_transcribe(audio_path, ASR_MODEL, DEVICE), "youtube_lecture"

    p = Path(src)
    if not p.exists():
        raise SystemExit(f"Input not found: {p}")

    suffix = p.suffix.lower()
    if suffix in MEDIA_EXTS:
        return whisper_transcribe(p, ASR_MODEL, DEVICE), p.stem
    if suffix in TEXT_EXTS:
        return read_text_file(p), p.stem

    # try anyway: whisper handles many formats if ffmpeg can read it
    try:
        return whisper_transcribe(p, ASR_MODEL, DEVICE), p.stem
    except Exception as e:
        raise SystemExit(f"Unsupported input type and ASR failed: {e}") from e


def main(input_source: str):
    """
    Run the full pipeline for one input and write all artifacts to OUT_DIR.

    Steps: resolve the input to a transcript (downloading and/or running
    speech recognition when needed), save the transcript, build short,
    medium and long summaries, derive bullet notes from the long summary and
    key phrases from the transcript, then save summaries, notes and a
    metadata JSON file.

    The current values of ASR_MODEL and DEVICE are passed to
    whisper_transcribe explicitly, matching how SUMMARY_MODEL and DEVICE are
    passed to load_summarizer.

    Args:
        input_source: YouTube URL, local media path or transcript file path.
            Surrounding whitespace and double quotes are removed.

    Raises:
        SystemExit: the input is empty or missing, the input type is
            unsupported and speech recognition failed, or the resulting
            transcript contains no text.
    """
    ensure_outdir(OUT_DIR)

    # 1) Resolve input
    src = input_source.strip().strip('"')
    if not src:
        raise SystemExit("Please set INPUT_SOURCE to a YouTube URL, a local media path, or a transcript file.")
    transcript, basename = _load_transcript(src)

    # Save transcript
    transcript_path = OUT_DIR / f"{basename}_transcript.txt"
    transcript_path.write_text(transcript, encoding="utf-8")
    print(f"[SAVE] Transcript -> {transcript_path} ({words_count(transcript)} words)")

    # Summarizing an empty transcript would only produce made-up text, so
    # stop before loading the model.
    if not transcript.strip():
        raise SystemExit(f"Transcript is empty, nothing to summarize: {transcript_path}")

    # 2) Load summarizer
    tok, model = load_summarizer(SUMMARY_MODEL, DEVICE)

    # 3) Summaries (short/medium/long)
    budgets = (
        ("short", SHORT_MAX_WORDS),
        ("medium", MEDIUM_MAX_WORDS),
        ("long", LONG_MAX_WORDS),
    )
    summaries = {}
    for label, budget in budgets:
        print(f"[INFO] Summarizing ({label})…")
        summaries[label] = map_reduce_summarize(transcript, tok, model, target_words=budget)

    # 4) Notes & key phrases
    bullets    = bulletize(summaries["long"], max_bullets=14)
    keyphrases = top_key_phrases(transcript, top_k=25)

    # 5) Save all artifacts (each path is built once and reused for the report)
    saved = [transcript_path]
    for label, _ in budgets:
        summary_path = OUT_DIR / f"{basename}_summary_{label}.txt"
        summary_path.write_text(summaries[label], encoding="utf-8")
        saved.append(summary_path)

    notes_path = OUT_DIR / f"{basename}_notes.md"
    notes_md = "# Bullet Notes\n\n" + "\n".join(f"- {b}" for b in bullets)
    notes_path.write_text(notes_md, encoding="utf-8")
    saved.append(notes_path)

    meta = {
        "input": src,
        "asr_model": ASR_MODEL,
        "summary_model": SUMMARY_MODEL,
        "words_transcript": words_count(transcript),
        "chunks_config": {"chunk_words": CHUNK_WORDS, "overlap": CHUNK_OVERLAP},
        "lengths": {
            "short_words":  SHORT_MAX_WORDS,
            "medium_words": MEDIUM_MAX_WORDS,
            "long_words":   LONG_MAX_WORDS
        },
        "key_phrases": keyphrases,
    }
    meta_path = OUT_DIR / f"{basename}_meta.json"
    meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
    saved.append(meta_path)

    print("\n[DONE] Saved:")
    for path in saved:
        print(" -", path)

# ----------------- ENTRYPOINT --------------------
if __name__ == "__main__":
    # NOTE: the only live statement under this guard is `pass`, so executing
    # the file defines everything above but runs nothing by itself.
    #
    # If you want CLI usage, uncomment below and run:
    # python lecture_summarizer.py --input "<url or path>" --outdir "C:\Users\sagni\Downloads\Edu Vision\outputs"
    #
    # import argparse
    # ap = argparse.ArgumentParser()
    # ap.add_argument("--input", required=True, help="YouTube URL, media file, or transcript file")
    # ap.add_argument("--outdir", default=str(OUT_DIR), help="Output directory")
    # ap.add_argument("--asr_model", default=ASR_MODEL, help="Whisper size: tiny/base/small/medium/large")
    # ap.add_argument("--device", default=DEVICE, help="cpu or cuda")
    # ap.add_argument("--summary_model", default=SUMMARY_MODEL)
    # args = ap.parse_args()
    #
    # INPUT_SOURCE = args.input
    # OUT_DIR = Path(args.outdir); ensure_outdir(OUT_DIR)
    # ASR_MODEL = args.asr_model; DEVICE = args.device; SUMMARY_MODEL = args.summary_model
    # main(INPUT_SOURCE)  # required: without this call the parsed arguments are never used
    pass

# To run in a notebook or directly:
# 1) Set INPUT_SOURCE above, e.g.:
#    INPUT_SOURCE = "https://www.youtube.com/watch?v=XXXXX"
#    or INPUT_SOURCE = r"C:\path\to\lecture.mp4"
#    or INPUT_SOURCE = r"C:\path\to\transcript.txt"
# 2) Then call:
# main(INPUT_SOURCE)
