<a href="https://colab.research.google.com/github/suryap19997-spec/Classroom-Analysis/blob/main/New_Classroom_Content_Analysis_End_to_End_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Step - 1: üîß Batch compress all Classroom Recording MP4 files**

In [None]:
#@title üîß Batch compress all MP4 files in "School Project Video" folder to ‚â§70 MB each
import os, json, subprocess, shlex, textwrap

# ===== 0) Install ffmpeg =====
!apt -y -qq install ffmpeg >/dev/null

# ===== 1) Mount Google Drive =====
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

# ===== 2) Paths =====
input_folder  = "/content/drive/MyDrive/Colab Notebooks/Reduce Size/School Project Video"
output_folder = os.path.join(input_folder, "New")
os.makedirs(output_folder, exist_ok=True)

target_size_mb = 70   #@param {type:"number"}
max_width      = 854  #@param {type:"number"} # 854‚âà480p, 640‚âà360p
audio_kbps     = 96   #@param {type:"number"}

# ===== 3) Helpers =====
def run(cmd):
    p = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    if p.returncode != 0:
        print(p.stdout)
        raise RuntimeError(f"Command failed: {cmd}")
    return p.stdout

def probe_duration(path):
    cmd = f'ffprobe -v error -show_entries format=duration -of json {shlex.quote(path)}'
    out = run(cmd)
    return float(json.loads(out)["format"]["duration"])

def compute_target_bitrates(target_mb, duration_s, audio_kbps=96):
    total_kbps = (target_mb * 8 * 1024) / max(duration_s, 1e-6)
    video_kbps = max(50, int(total_kbps - audio_kbps))
    return video_kbps, int(audio_kbps)

def human_time(s):
    s = int(round(s))
    h, m, s = s//3600, (s%3600)//60, s%60
    return f"{h:02d}:{m:02d}:{s:02d}"

# ===== 4) Loop through all mp4 files =====
for fn in sorted(os.listdir(input_folder)):
    if not fn.lower().endswith(".mp4"):
        continue
    src = os.path.join(input_folder, fn)
    stem, _ = os.path.splitext(fn)
    out_path = os.path.join(output_folder, f"{stem}_compressed.mp4")

    # Skip if already exists
    if os.path.exists(out_path):
        print(f"‚è≠Ô∏è Skipping {fn} (already compressed)")
        continue

    # Probe and compute bitrate
    dur = probe_duration(src)
    v_kbps, a_kbps = compute_target_bitrates(target_size_mb, dur, audio_kbps)
    print(f"\n‚ñ∂ {fn} | {human_time(dur)} | v~{v_kbps}k + a{a_kbps}k")

    scale_filter = f"scale='min({int(max_width)},iw)':-2"
    passlog = "/content/ffmpeg2pass"

    # Pass 1
    cmd1 = f"""
    ffmpeg -y -hide_banner -loglevel error -i {shlex.quote(src)} -vf {shlex.quote(scale_filter)} \
      -c:v libx264 -preset medium -b:v {v_kbps}k -pass 1 -passlogfile {shlex.quote(passlog)} \
      -an -movflags +faststart -f mp4 /dev/null
    """
    # Pass 2
    cmd2 = f"""
    ffmpeg -y -hide_banner -loglevel error -i {shlex.quote(src)} -vf {shlex.quote(scale_filter)} \
      -c:v libx264 -preset medium -b:v {v_kbps}k -pass 2 -passlogfile {shlex.quote(passlog)} \
      -c:a aac -b:a {a_kbps}k -movflags +faststart {shlex.quote(out_path)}
    """

    print("‚Äî Pass 1 ‚Äî")
    run(textwrap.dedent(cmd1))
    print("‚Äî Pass 2 ‚Äî")
    run(textwrap.dedent(cmd2))

    # Clean logs
    for ext in (".log", ".log.mbtree"):
        p = f"{passlog}{ext}"
        if os.path.exists(p):
            os.remove(p)

    final_mb = os.path.getsize(out_path)/(1024*1024)
    print(f"   ‚úÖ Done: {out_path} ({final_mb:.2f} MB)")


## **Step 2: Creating audio file for each compressed video for Script Analysis**

A) Create Excel sheet with video path and its corresponding audio path on drive (mp3)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
import pandas as pd
import shutil

# Base folder
video_folder = '/content/drive/MyDrive/Colab Notebooks/Reduce Size/School Project Video/New/'
audio_subfolder = os.path.join(video_folder, 'audio_files')

# Step 1: Create audio_files folder if it doesn't exist
os.makedirs(audio_subfolder, exist_ok=True)

# Step 2: Get only .mp4 and .mp3 files in the main folder
target_files = [f for f in os.listdir(video_folder)
                if os.path.isfile(os.path.join(video_folder, f)) and f.lower().endswith(('.mp4', '.mp3'))]

# Step 3: Copy .mp3 files to audio_files folder
for file in target_files:
    if file.lower().endswith('.mp3'):
        src = os.path.join(video_folder, file)
        dst = os.path.join(audio_subfolder, file)
        if not os.path.exists(dst):
            shutil.copy(src, dst)
            print(f"üìÅ Copied: {file} ‚Üí audio_files/")
        else:
            print(f"‚ö†Ô∏è Skipped (already exists): {file}")

# Step 4: Create records with video_path and audio_path
records = []
for file in target_files:
    full_path = os.path.join(video_folder, file)
    audio_file_name = file.replace('.MP4', '.mp3').replace('.mp4', '.mp3')
    audio_path = os.path.join(audio_subfolder, audio_file_name)
    records.append({
        'video_path': full_path,
        'audio_path': audio_path
    })

# Step 5: Save to Excel
df = pd.DataFrame(records)
excel_path = os.path.join(video_folder, 'video_file_list_with_links.xlsx')
df.to_excel(excel_path, index=False)

print(f"\n‚úÖ Excel saved at: {excel_path}")
print(f"‚úÖ Audio folder ensured at: {audio_subfolder}")


B) Creating audio file (mp3) from compressed video and saving in above path

In [None]:
!apt-get update -qq
!apt-get install -y ffmpeg

import pandas as pd
import subprocess

# Load the Excel with video and audio paths
excel_path = '/content/drive/MyDrive/Colab Notebooks/Reduce Size/School Project Video/New/video_file_list_with_links.xlsx'
df = pd.read_excel(excel_path)

# Loop through each row and run ffmpeg
for index, row in df.iterrows():
    video_path = row['video_path']
    output_audio_path = row['audio_path']

    # Skip if video_path is already an .mp3 file
    if str(video_path).lower().endswith('.mp3'):
        print(f"‚è≠Ô∏è Skipping already-audio file: {video_path}")
        continue

    print(f"üé¨ Extracting audio from: {video_path}")

    # Run ffmpeg command
    command = f'ffmpeg -i "{video_path}" -vn -acodec libmp3lame -y "{output_audio_path}"'
    subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    print(f"‚úÖ Saved audio to: {output_audio_path}")


C) Creating a excel wih video-audio path-link (google drive)

In [None]:
import os
import pandas as pd
from google.colab import auth
from googleapiclient.discovery import build
from google.auth import default

# === Step 1: Google Drive Auth ===
auth.authenticate_user()
creds, _ = default()
drive_service = build('drive', 'v3', credentials=creds)

# === Step 2: Load Excel with video/audio paths ===
excel_path = '/content/drive/MyDrive/Colab Notebooks/Reduce Size/School Project Video/New/video_file_list_with_links.xlsx'
df = pd.read_excel(excel_path)

# === Step 3: Get Shareable Link of Existing File in Drive ===
def get_existing_file_link(file_path):
    file_name = os.path.basename(file_path)

    # Search for file by exact name
    query = f"name = '{file_name}' and trashed = false"
    results = drive_service.files().list(
        q=query,
        fields="files(id, name, parents)",
        spaces='drive'
    ).execute()

    files = results.get('files', [])
    if not files:
        print(f"‚ùå File not found in Drive: {file_name}")
        return ""

    file_id = files[0]['id']

    # Make file public
    try:
        drive_service.permissions().create(
            fileId=file_id,
            body={'role': 'reader', 'type': 'anyone'},
            fields='id'
        ).execute()
    except Exception as e:
        print(f"‚ö†Ô∏è Permission error for {file_name}: {e}")

    return f"https://drive.google.com/file/d/{file_id}/view?usp=sharing"

# === Step 4: Loop through rows and create links ===
video_links = []
audio_links = []

for index, row in df.iterrows():
    video_path = row['video_path']
    audio_path = row['audio_path']

    print(f"\nüîó Processing: {os.path.basename(video_path)}")
    video_link = get_existing_file_link(video_path)
    audio_link = get_existing_file_link(audio_path)

    video_links.append(video_link)
    audio_links.append(audio_link)

# === Step 5: Update and Save Excel ===
df['video_link'] = video_links
df['audio_link'] = audio_links

df.to_excel(excel_path, index=False)
print(f"\n‚úÖ Excel updated with links: {excel_path}")


# **Step 3: Convert Audio to Script with diarization**

Excel list of audio files ‚Üí converted audio ‚Üí chunks & sends to Gemini for transcription and speaker labeling ‚Üí translates English output to Hindi

In [None]:
# ======================== Minimal Installs ========================
# Use system ffmpeg; do not upgrade pandas/requests pinned by Colab.
!apt-get -qq update >/dev/null
!apt-get -qq install -y ffmpeg >/dev/null
!pip -q install openpyxl  # for Excel I/O (keeps Colab's pandas version)

# ============================ Imports ============================
import os, io, json, time, math, wave, contextlib, subprocess, shlex, base64, requests
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from google.colab import drive

# =========================== Mount Drive =========================
drive.mount('/content/drive', force_remount=False)

# =========================== Configuration ======================
# Excel columns expected: video_path, audio_path, video_link, audio_link
excel_path = "/content/drive/MyDrive/Colab Notebooks/Reduce Size/School Project Video/New/video_file_list_with_links.xlsx"
output_excel_path = "/content/drive/MyDrive/Colab Notebooks/Reduce Size/School Project Video/New/All_Captions_Excel.xlsx"

GEMINI_API_KEY = "AIzaSyAncHidRLxZpPXhCTsHFIrbSz6BcZhVi_o"   # <-- put your API key (rotate if you exposed one earlier)

# REST model to use for chunk transcription/diarization and translation
MODEL_CHUNK = "gemini-1.5-pro"

# Chunking & runtime knobs
LONG_AUDIO_THRESH_SEC = 90         # split if audio longer than this
CHUNK_SEC = 60                     # chunk length (sec)
PER_CHUNK_TIMEOUT = 150            # timeout per chunk REST call (sec)
RETRIES_PER_CHUNK = 2              # retries per chunk
ITEM_WATCHDOG_SEC = 540            # max time per file (sec)
SAVE_EVERY_N_ROWS = 1              # save progress after each processed row
DO_HINDI_TRANSLATION = True        # set False if you don't need Hindi column

SUPPORTED_LANGUAGES = {
    "hi": "Hindi", "mr": "Marathi", "pa": "Punjabi", "en": "English", "bn": "Bengali"
}

# ============================== Helpers ==========================
def format_ts(seconds):
    """Format seconds as [MM:SS.mmm]."""
    try:
        s = float(seconds)
    except Exception:
        return "[00:00.000]"
    m, s = divmod(s, 60.0)
    return f"[{int(m):02d}:{s:06.3f}]"

def strip_code_fences(s: str) -> str:
    s = (s or "").strip()
    if s.startswith("```"):
        s = s[3:]
        if "\n" in s: s = s.split("\n", 1)[1]
    if s.endswith("```"):
        s = s[:-3]
    return s.strip()

def ensure_wav_16k_mono(in_path: str) -> str:
    """
    Convert any input to 16 kHz mono PCM WAV via ffmpeg (no moviepy).
    Returns path to /content/tmp_audio_16k_mono.wav or original on failure.
    """
    out_path = "/content/tmp_audio_16k_mono.wav"
    try:
        cmd = f'ffmpeg -y -i {shlex.quote(in_path)} -vn -ac 1 -ar 16000 -acodec pcm_s16le {shlex.quote(out_path)}'
        r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if r.returncode != 0:
            print("‚ö†Ô∏è ffmpeg convert failed:", r.stderr.decode(errors="ignore")[:300])
            return in_path
        return out_path
    except Exception as e:
        print(f"‚ö†Ô∏è ffmpeg convert error: {e}")
        return in_path

def wav_duration_sec(path: str) -> float:
    with contextlib.closing(wave.open(path, 'rb')) as w:
        frames = w.getnframes()
        rate = w.getframerate() or 16000
        return frames / float(rate)

def read_wav_bytes(path: str, start_sec: float = 0.0, end_sec: float = None) -> bytes:
    """Return a WAV slice [start_sec, end_sec) as bytes with a proper header."""
    with wave.open(path, 'rb') as w:
        n_channels = w.getnchannels()
        sampwidth = w.getsampwidth()
        framerate = w.getframerate()
        nframes = w.getnframes()
        start_frame = max(0, min(int((start_sec or 0.0) * framerate), nframes))
        end_frame = nframes if end_sec is None else max(start_frame, min(int(end_sec * framerate), nframes))
        w.setpos(start_frame)
        raw = w.readframes(end_frame - start_frame)
    buf = io.BytesIO()
    with wave.open(buf, 'wb') as out:
        out.setnchannels(n_channels); out.setsampwidth(sampwidth); out.setframerate(framerate)
        out.writeframes(raw)
    return buf.getvalue()

# ---------- REST helpers ----------
def rest_generate_text(prompt: str, *, model=MODEL_CHUNK, api_key=GEMINI_API_KEY, timeout=120) -> str:
    """Simple REST text generation (used for translation)."""
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}"
    body = {"contents":[{"parts":[{"text": prompt}]}]}
    r = requests.post(url, json=body, timeout=timeout, proxies={"http": None, "https": None})
    r.raise_for_status()
    data = r.json()
    return data["candidates"][0]["content"]["parts"][0]["text"].strip()

def call_gemini_chunk(audio_wav_bytes: bytes,
                      timeout_sec: int = PER_CHUNK_TIMEOUT,
                      model: str = MODEL_CHUNK,
                      api_key: str = GEMINI_API_KEY) -> dict:
    """
    REST version: one chunk -> JSON (speaker-labeled, English-only).
    Uses inline_data (base64) and disables proxies explicitly to avoid localhost routing.
    """
    prompt = """
Transcribe and translate all speech to English.
Diarize with these speaker labels ONLY (pick best match or "Unknown"):
- "Male Kid"
- "Female Kid"
- "Male Teacher (nearby)"
- "Teacher (on laptop)"
- "Unknown"

Split into sentence-level segments.
Each segment JSON must be:
{"start": number, "end": number, "speaker": "Male Kid|Female Kid|Male Teacher (nearby)|Teacher (on laptop)|Unknown", "text_en": "..."}

Return ONLY JSON exactly:
{
  "detected_language_code": "en|hi|mr|pa|bn",
  "segments": [
    {"start": 0.00, "end": 1.23, "speaker": "Male Kid", "text_en": "Hello ..."}
  ]
}
""".strip()

    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}"
    body = {
        "contents": [{
            "parts": [
                {"text": prompt},
                {"inline_data": {
                    "mime_type": "audio/wav",
                    "data": base64.b64encode(audio_wav_bytes).decode("ascii")
                }}
            ]
        }]
    }
    r = requests.post(url, json=body, timeout=timeout_sec, proxies={"http": None, "https": None})
    r.raise_for_status()
    data = r.json()

    try:
        txt = data["candidates"][0]["content"]["parts"][0]["text"]
    except Exception:
        raise RuntimeError(f"Bad REST response: {str(data)[:500]}")

    txt = strip_code_fences(txt)
    obj = json.loads(txt)
    if "segments" not in obj:
        raise ValueError("JSON missing 'segments'.")
    if "detected_language_code" not in obj:
        obj["detected_language_code"] = "en"
    return obj

# ---------- Main transcription (chunked, REST only) ----------
def gemini_transcribe_with_speakers(
    audio_path: str,
    long_audio_thresh_sec: int = LONG_AUDIO_THRESH_SEC,
    chunk_sec: int = CHUNK_SEC,
    per_chunk_timeout: int = PER_CHUNK_TIMEOUT,
    retries: int = RETRIES_PER_CHUNK,
    sleep_sec: int = 2
) -> dict:
    """
    Transcribe + diarize with chunking (inline bytes, REST).
    Returns:
      {"detected_language_code": str,
       "segments": [{"start": float, "end": float, "speaker": str, "text_en": str}, ...]}
    """
    wav = ensure_wav_16k_mono(audio_path)
    dur = wav_duration_sec(wav)
    print(f"   ‚è±Ô∏è Audio duration ‚âà {dur:.1f}s")

    def run_one(start_t: float, end_t: float):
        b = read_wav_bytes(wav, start_t, end_t)
        last_err = None
        for attempt in range(1, retries + 1):
            try:
                print(f"      ‚ñ∂ chunk {start_t:.1f}‚Äì{end_t:.1f}s (attempt {attempt})")
                return call_gemini_chunk(b, timeout_sec=per_chunk_timeout)
            except Exception as exc:
                last_err = exc
                print(f"         ‚ö†Ô∏è chunk failed: {exc}")
                time.sleep(sleep_sec)
        raise last_err

    # Short audio ‚Üí single call
    if dur <= long_audio_thresh_sec:
        return run_one(0.0, dur)

    # Long audio ‚Üí chunked
    print("   ‚úÇÔ∏è Long audio detected ‚Üí chunking.")
    offsets = []
    s = 0.0
    while s < dur:
        e = min(s + chunk_sec, dur)
        offsets.append((s, e))
        s = e

    results = []
    for (start_t, end_t) in offsets:
        try:
            d = run_one(start_t, end_t)
            # shift segment times by chunk offset (start_t)
            for seg in d.get("segments", []):
                seg["start"] = float(seg.get("start", 0.0)) + start_t
                seg["end"]   = float(seg.get("end",   0.0)) + start_t
            results.append(d)
        except Exception as exc:
            print(f"         ‚ùå skipping chunk {start_t:.1f}‚Äì{end_t:.1f}: {exc}")

    # Merge segments & language (majority vote)
    segments, langs = [], []
    for d in results:
        segments.extend(d.get("segments", []))
        langs.append((d.get("detected_language_code") or "en").lower())
    segments.sort(key=lambda x: float(x.get("start", 0.0)))

    if not langs:
        lang = "en"
    else:
        from collections import Counter
        lang = Counter(langs).most_common(1)[0][0]

    return {"detected_language_code": lang, "segments": segments}

def translate_text_rest(text: str, target_lang: str = "hi") -> str:
    """Translate final English transcript to a target language (REST)."""
    if not text.strip():
        return ""
    prompt = f"""Translate the following educational script to {target_lang.upper()}.
Preserve meaning and proper nouns. Avoid abusive words. Keep it suitable for parents and children.

{text}"""
    try:
        return rest_generate_text(prompt, model=MODEL_CHUNK, api_key=GEMINI_API_KEY, timeout=120)
    except Exception as exc:
        print(f"‚ö†Ô∏è Translation failed: {exc}")
        return ""

def build_linewise_english(data):
    """Build line-wise English transcript with fixed speaker labels."""
    code = (data.get("detected_language_code") or "en").lower()
    lang_name = SUPPORTED_LANGUAGES.get(code, code.upper())
    lines = []
    for seg in data.get("segments", []):
        try:
            start = float(seg.get("start", 0.0))
            speaker = str(seg.get("speaker", "Unknown")).strip() or "Unknown"
            text_en = (seg.get("text_en") or "").strip()
            if text_en:
                lines.append(f"{format_ts(start)} {speaker}: {text_en}")
        except Exception:
            continue
    english_text = "\n".join(lines)
    return lang_name, english_text

# ======================== Load Input Excel =======================
df = pd.read_excel(excel_path)
output_rows = []

# ================== Process Rows (watchdog + saves) =============
for idx, row in df.iterrows():
    try:
        video_path = row.get("video_path", "")
        audio_path = row.get("audio_path", "")
        video_link = row.get("video_link", "")
        audio_link = row.get("audio_link", "")

        if not audio_path or not os.path.exists(audio_path):
            print(f"\n‚ö†Ô∏è Skipping (no audio): {os.path.basename(video_path or 'unknown_video')}")
            continue

        label = os.path.basename(video_path or audio_path)
        print(f"\nüéß [{idx+1}/{len(df)}] Processing (chunk-wise diarization): {label}")

        # Per-item watchdog to prevent hangs
        data = None
        try:
            with ThreadPoolExecutor(max_workers=1) as ex:
                fut = ex.submit(gemini_transcribe_with_speakers, audio_path)
                data = fut.result(timeout=ITEM_WATCHDOG_SEC)
        except FuturesTimeoutError:
            print(f"   ‚è±Ô∏è Item timeout after {ITEM_WATCHDOG_SEC}s")
            data = None

        if not data:
            output_rows.append({
                "video_path": video_path, "audio_path": audio_path,
                "video_link": video_link, "audio_link": audio_link,
                "detected_language": "Unknown",
                "cleaned_transcript": "", "hindi_translated": "",
                "english_translated": "", "error": "Item timeout/failure"
            })
            pd.DataFrame(output_rows).to_excel(output_excel_path, index=False)
            print("   ‚ùå Skipped (progress saved).")
            continue

        lang_name, english_text = build_linewise_english(data)
        hindi_text = translate_text_rest(english_text, target_lang="hi") if DO_HINDI_TRANSLATION else ""

        output_rows.append({
            "video_path": video_path, "audio_path": audio_path,
            "video_link": video_link, "audio_link": audio_link,
            "detected_language": lang_name,
            "cleaned_transcript": english_text,   # English, line-wise, speaker-labeled
            "hindi_translated": hindi_text,       # optional
            "english_translated": english_text
        })

        # Save progress incrementally
        if SAVE_EVERY_N_ROWS and (len(output_rows) % SAVE_EVERY_N_ROWS == 0):
            pd.DataFrame(output_rows).to_excel(output_excel_path, index=False)
            print(f"   üíæ Progress saved ‚Üí {output_excel_path}")

        # Clean temp wav
        tmp_wav = "/content/tmp_audio_16k_mono.wav"
        if os.path.exists(tmp_wav):
            try: os.remove(tmp_wav)
            except: pass

        print("   ‚úÖ Row completed.")

    except KeyboardInterrupt:
        print("\nüõë Interrupted ‚Äî saving partial results‚Ä¶")
        pd.DataFrame(output_rows).to_excel(output_excel_path, index=False)
        raise
    except Exception as exc:
        print(f"‚ùå Unexpected error on row {idx}: {exc}")
        pd.DataFrame(output_rows).to_excel(output_excel_path, index=False)
        continue

# =========================== Final Save ==========================
pd.DataFrame(output_rows).to_excel(output_excel_path, index=False)
print(f"\nüìÅ All results saved to: {output_excel_path}")


IndentationError: unindent does not match any outer indentation level (<tokenize>, line 326)

# **Step 4: Data Analysis**

# i) Data Preparation:

Transform Script in Readable format

A) Add time stamp. speaker, linewise utterance columns

In [None]:
import re
import pandas as pd

# --- CONFIGURATION ---
INPUT_FILE = "AllExcelCaptionData.xlsx"        # Sheet containing all chapters
OUTPUT_FILE = "Linewise_Transcript.xlsx"
SHEET_NAME = "Sheet1"                       # Sheet index or name

# --- STEP 1: READ ALL CHAPTERS ---
df = pd.read_excel(INPUT_FILE, sheet_name=SHEET_NAME)

all_rows = []

# --- STEP 2: LOOP THROUGH EACH CHAPTER ---
for i, row in df.iterrows():
    video_path = row.get("video_path", "")
    audio_path = row.get("audio_path", "")
    video_link = row.get("video_link", "")
    audio_link = row.get("audio_link", "")
    detected_language = row.get("detected_language", "Hindi")
    transcript = str(row.get("transcript_text", ""))

    # --- STEP 3: EXTRACT timestamp, speaker, utterance ---
    pattern = r"\[(\d{2}:\d{2}(?:\.\d{3})?)\]\s*([^:]+):\s*(.*?)(?=\s*\[\d{2}:\d{2}\.\d{3}\]|$)"
    matches = re.findall(pattern, transcript, re.DOTALL)

    for match in matches:
        timestamp, speaker, utterance = match
        all_rows.append({
            "chapter_id": row.get("chapter_id", i+1),
            "video_path": video_path,
            "audio_path": audio_path,
            "video_link": video_link,
            "audio_link": audio_link,
            "detected_language": detected_language,
            "timestamp": timestamp.strip(),
            "speaker": speaker.strip(),
            "utterance": utterance.strip()
        })

# --- STEP 4: CREATE OUTPUT DATAFRAME ---
final_df = pd.DataFrame(all_rows)

# --- STEP 5: SAVE OUTPUT ---
final_df.to_excel(OUTPUT_FILE, index=False)
print(f"‚úÖ Processed {len(df)} chapters and {len(final_df)} lines saved to '{OUTPUT_FILE}'.")


B) Add end_time, word count, duration column for each utterance (caption) in data

In [None]:
# STEP 1: Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

import pandas as pd
import os
from datetime import timedelta

# STEP 2: Helper - parse timestamp (MM:SS.SSS ‚Üí timedelta)
def parse_timestamp(ts_str):
    """Convert MM:SS.SSS to timedelta"""
    minutes, sec_ms = ts_str.split(":")
    seconds, millis = sec_ms.split(".")
    return timedelta(minutes=int(minutes),
                     seconds=int(seconds),
                     milliseconds=int(millis))

# STEP 3: Helper - format timedelta back to MM:SS.SSS
def format_timestamp(td):
    total_ms = int(td.total_seconds() * 1000)
    minutes = total_ms // 60000
    seconds = (total_ms % 60000) // 1000
    millis = total_ms % 1000
    return f"{minutes:02}:{seconds:02}.{millis:03}"

# STEP 4: Main function
def add_end_time(file_path, output_dir, words_per_second=2.2):
    # Read Excel
    df = pd.read_excel(file_path)

    # Convert timestamp to timedelta
    df['start_time_td'] = df['timestamp'].astype(str).apply(parse_timestamp)

    # Word count
    df['word_count'] = df['utterance'].astype(str).apply(lambda x: len(x.split()))

    # Duration estimate
    df['duration_sec'] = df['word_count'] / words_per_second

    # Approx end time
    df['end_time_td'] = df['start_time_td'] + df['duration_sec'].apply(lambda x: timedelta(seconds=x))

    # Clip if end_time > next start_time
    for i in range(len(df) - 1):
        next_start = df.loc[i+1, 'start_time_td']
        if df.loc[i, 'end_time_td'] > next_start:
            df.loc[i, 'end_time_td'] = next_start

    # Convert back to MM:SS.SSS format
    df['end_time'] = df['end_time_td'].apply(format_timestamp)

    # Drop helper cols if not needed
    df_out = df.drop(columns=['start_time_td', 'end_time_td'])

    # Save with same name into output_dir
    filename = os.path.basename(file_path)
    output_path = os.path.join(output_dir, filename)
    df_out.to_excel(output_path, index=False)

    print(f"‚úÖ File saved to: {output_path}")
    return df_out

# STEP 5: Run the function
# Example usage:
# Place your input file in the target Drive folder or adjust path below.
input_path = "/content/drive/MyDrive/Colab Notebooks/School Project_Reduce Size_Final Code/Data Analysis/Chapter Data Ready for Analysis.xlsx"
output_dir = "/content/drive/MyDrive/Colab Notebooks/School Project_Reduce Size_Final Code/Data Analysis/Data Transformation"

processed_df = add_end_time(input_path, output_dir)
processed_df.head()


C) Adding "is_question" column

# Method 1: Using AI API for identifying question

In [None]:
#testing Perplexity API
import requests

api_key = "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"

url = "https://api.perplexity.ai/chat/completions"
headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json",
}
data = {
    "model": "sonar-pro",  # Insert the correct model id
    "messages": [{"role": "user", "content": "Hello, are you working?"}]
}


try:
    response = requests.post(url, json=data, headers=headers)
    print("Status code:", response.status_code)
    print("Content:", response.text)
    if response.status_code == 200:
        print("Perplexity API is working!")
    else:
        print("Check your API key or parameters.")
except requests.exceptions.RequestException as e:
    print("Error:", e)


üîπ Question Detector with Perplexity API

In [None]:
# ===============================
# üîπ Question Detector with Perplexity API
# üîπ Save output to a chosen Drive folder
# ===============================

import pandas as pd
import requests
import time
from google.colab import auth
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
import io
import datetime

# Authenticate Google Drive
auth.authenticate_user()
drive_service = build('drive', 'v3')

# ===============================
# üîπ Configuration
# ===============================
PERPLEXITY_API_KEY = "pplx-ih7VJ9rxAc8qXZhfdYyJ2Xu7fDU5AGnc32xpyUXLXXXX"   # ‚Üê replace with your key
PERPLEXITY_URL = "https://api.perplexity.ai/chat/completions"

# ===============================
# üîπ Helper functions
# ===============================
def get_file_id_from_path(file_path):
    """Extract Google Drive file ID from shareable link or direct path"""
    if '/d/' in file_path:
        return file_path.split('/d/')[1].split('/')[0]
    return file_path

def download_file_from_drive(file_id, local_filename):
    """Download file from Google Drive"""
    request = drive_service.files().get_media(fileId=file_id)
    fh = io.BytesIO()
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
    fh.seek(0)
    with open(local_filename, 'wb') as f:
        f.write(fh.read())
    print(f"‚úÖ Downloaded: {local_filename}")

def upload_file_to_custom_location(local_filename, parent_folder_id=None):
    """Upload the output file to a chosen location in Google Drive"""
    file_metadata = {'name': local_filename}
    if parent_folder_id:
        file_metadata['parents'] = [parent_folder_id]
    media = MediaFileUpload(local_filename, resumable=True)
    uploaded_file = drive_service.files().create(
        body=file_metadata,
        media_body=media,
        fields='id, name, webViewLink'
    ).execute()
    print(f"\n‚úÖ Uploaded: {uploaded_file.get('name')}")
    print(f"üîó View file: {uploaded_file.get('webViewLink')}")
    return uploaded_file.get('id')

def check_question_with_perplexity(utterance):
    """Use Perplexity API to determine if utterance is a question"""
    headers = {
        "Authorization": f"Bearer {PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    prompt = f"""Analyze this utterance and determine if it's a question.
Utterance: "{utterance}"

Reply with ONLY "1" if it's a question, or "0" if it's not a question. No explanation needed."""
    payload = {
        "model": "sonar-pro",
        "messages": [{"role": "user", "content": prompt}]
    }
    try:
        response = requests.post(PERPLEXITY_URL, json=payload, headers=headers)
        if response.status_code == 200:
            result = response.json()
            answer = result['choices'][0]['message']['content'].strip()
            return 1 if '1' in answer else 0
        else:
            print(f"Error: {response.status_code}")
            return 0
    except Exception as e:
        print(f"Exception: {e}")
        return 0

# ===============================
# üîπ Main processing
# ===============================
# Input: Provide Google Drive file ID or shareable link
INPUT_FILE_PATH = input("15BE0sy2lvOwZ_SXvdEjBVBEyqqDn2yRK").strip()
LOCAL_INPUT_FILE = "input_transcript.csv"

# Extract file ID
file_id = get_file_id_from_path(INPUT_FILE_PATH)

# Download file
download_file_from_drive(file_id, LOCAL_INPUT_FILE)

# Read CSV or Excel
try:
    df = pd.read_csv(LOCAL_INPUT_FILE)
except:
    df = pd.read_excel(LOCAL_INPUT_FILE)

print(f"\nProcessing {len(df)} rows...")

# Add column
df['is_question'] = 0

# Process each utterance
for idx, row in df.iterrows():
    utterance = str(row.get('utterance', '')).strip()
    if utterance and utterance.lower() != 'nan':
        is_question = check_question_with_perplexity(utterance)
        df.at[idx, 'is_question'] = is_question
        print(f"Row {idx+1}/{len(df)} ‚Üí {is_question}")
        time.sleep(0.5)  # Rate limit

# Save output with timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
LOCAL_OUTPUT_FILE = f"output_transcript_{timestamp}.csv"
df.to_csv(LOCAL_OUTPUT_FILE, index=False)

print("\n‚úÖ Processing complete.")

# Ask where to upload
chosen_folder = input("106uHJyfpf9-gUQSGS9_C6xHSssWiKgMk").strip()
if '/folders/' in chosen_folder:
    folder_id = chosen_folder.split('/folders/')[1].split('/')[0]
elif chosen_folder:
    folder_id = chosen_folder
else:
    folder_id = None

# Upload result
upload_file_to_custom_location(LOCAL_OUTPUT_FILE, folder_id)
print("\n‚úÖ Done! File uploaded successfully.")


# Method 2: Personalized NLP for Identifying Question

In [None]:
!pip install transformers datasets pandas --quiet


In [None]:
from google.colab import files
uploaded = files.upload()  # Select and upload Training Data Set


In [None]:
import pandas as pd

# Load CSV, ensure columns are 'utterance' and 'is_question'
df = pd.read_csv('utterances_is_question_17k_hybrid.csv')
# Preview your data
print(df.head())

# Rename columns for model input
df['text'] = df['utterance']
df['label'] = df['is_question'].astype(int)
dataset = df[['text', 'label']]
dataset.to_json('finetune_data.jsonl', orient='records', lines=True)


In [None]:
from datasets import load_dataset
hf_dataset = load_dataset('json', data_files='finetune_data.jsonl', split='train')

from transformers import AutoTokenizer, AutoModelForSequenceClassification

model_name = 'distilbert-base-uncased'
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

def preprocess_function(examples):
    return tokenizer(examples["text"], truncation=True, padding='max_length', max_length=128)

hf_dataset = hf_dataset.map(preprocess_function, batched=True)

from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="finetuned_distilbert",
    per_device_train_batch_size=8,
    num_train_epochs=2,   # Adjust as needed
    eval_strategy="steps",
    eval_steps=50,
    logging_steps=10,
    learning_rate=2e-5,
    fp16=True,
    report_to="none"
)


In [None]:
from transformers import Trainer

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=hf_dataset,
    eval_dataset=hf_dataset,
)
trainer.train()
trainer.save_model("finetuned_distilbert") #save model

In [None]:
import os
print(os.listdir('.'))
print(os.listdir('finetuned_distilbert'))

In [None]:
!pip install -U transformers safetensors

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Load tokenizer from original pretrained model repo (e.g., DistilBERT)
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

# Load fine-tuned model from local folder
model = AutoModelForSequenceClassification.from_pretrained("./finetuned_distilbert")

# ===============================
# üîπ Question Detector with Custom DistilBERT Model
# üîπ Save output to a chosen Drive folder
# ===============================

import pandas as pd
import time
from google.colab import auth
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
import io
import datetime
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Authenticate Google Drive
auth.authenticate_user()
drive_service = build('drive', 'v3')

# ===============================
# üîπ Helper functions
# ===============================
def get_file_id_from_path(file_path):
    if '/d/' in file_path:
        return file_path.split('/d/')[1].split('/')[0]
    return file_path

def download_file_from_drive(file_id, local_filename):
    request = drive_service.files().get_media(fileId=file_id)
    fh = io.BytesIO()
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
    fh.seek(0)
    with open(local_filename, 'wb') as f:
        f.write(fh.read())
    print(f"‚úÖ Downloaded: {local_filename}")

def upload_file_to_custom_location(local_filename, parent_folder_id=None):
    file_metadata = {'name': local_filename}
    if parent_folder_id:
        file_metadata['parents'] = [parent_folder_id]
    media = MediaFileUpload(local_filename, resumable=True)
    uploaded_file = drive_service.files().create(
        body=file_metadata,
        media_body=media,
        fields='id, name, webViewLink'
    ).execute()
    print(f"\n‚úÖ Uploaded: {uploaded_file.get('name')}")
    print(f"üîó View file: {uploaded_file.get('webViewLink')}")
    return uploaded_file.get('id')

# === LOAD YOUR MODEL ONCE ===
MODEL_PATH = "finetuned_distilbert"  # Change if your folder name is different
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

# Load fine-tuned model from local folder
model = AutoModelForSequenceClassification.from_pretrained("./finetuned_distilbert")

def check_question_with_local_model(utterance):
    inputs = tokenizer(str(utterance), return_tensors="pt", truncation=True, padding=True, max_length=128)
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        predicted_class_id = logits.argmax(dim=1).item()
    return predicted_class_id  # 1 = question, 0 = not-question

# ===============================
# üîπ Main processing
# ===============================
INPUT_FILE_PATH = input("Enter Sheet ID or file link: ").strip()
LOCAL_INPUT_FILE = "input_transcript.csv"

file_id = get_file_id_from_path(INPUT_FILE_PATH)
download_file_from_drive(file_id, LOCAL_INPUT_FILE)

try:
    df = pd.read_csv(LOCAL_INPUT_FILE)
except:
    df = pd.read_excel(LOCAL_INPUT_FILE)

print(f"\nProcessing {len(df)} rows...")

df["is_question"] = 0

for idx, row in df.iterrows():
    utterance = str(row.get("utterance", "")).strip()
    if utterance and utterance.lower() != "nan":
        is_question = check_question_with_local_model(utterance)
        df.at[idx, "is_question"] = is_question
        print(f"Row {idx+1}/{len(df)} ‚Üí {is_question}")

# Save output with timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
LOCAL_OUTPUT_FILE = f"output_transcript_{timestamp}.csv"
df.to_csv(LOCAL_OUTPUT_FILE, index=False)

print("\n‚úÖ Processing complete.")

chosen_folder = input("Enter destination folder ID or link: ").strip()
if "/folders/" in chosen_folder:
    folder_id = chosen_folder.split("/folders/")[1].split("/")[0]
elif chosen_folder:
    folder_id = chosen_folder
else:
    folder_id = None

upload_file_to_custom_location(LOCAL_OUTPUT_FILE, folder_id)
print("\n‚úÖ Done! File uploaded successfully.")


# ii) Metric Calculation

A) "Student_Talk_Rate", "Dialogue_Frequency", "Student_Agency"

In [None]:
# STEP 1: Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

import pandas as pd
import os
from datetime import timedelta
import re

# ================================
# Helper Functions
# ================================

def parse_timestamp(ts_str):
    """Convert MM:SS.SSS to timedelta"""
    minutes, sec_ms = ts_str.split(":")
    seconds, millis = sec_ms.split(".")
    return timedelta(minutes=int(minutes), seconds=int(seconds), milliseconds=int(millis))

def format_timestamp(td):
    total_ms = int(td.total_seconds() * 1000)
    minutes = total_ms // 60000
    seconds = (total_ms % 60000) // 1000
    millis = total_ms % 1000
    return f"{minutes:02}:{seconds:02}.{millis:03}"

# ================================
# Metric Calculation
# ================================
def calculate_metrics(df):
    # Convert times
    df['start_td'] = df['timestamp'].apply(parse_timestamp)
    df['end_td'] = df['end_time'].apply(parse_timestamp)
    df['word_count'] = df['utterance'].astype(str).apply(lambda x: len(x.split()))

    # Session duration
    session_dur = (df['end_td'].max() - df['start_td'].min()).total_seconds() / 60

    # Normalize speakers
    df['speaker_norm'] = df['speaker'].apply(lambda x: "Student" if "kid" in x.lower() or "student" in x.lower() else "Teacher")

    # Student Talk Rate
    student_words = df.loc[df['speaker_norm']=="Student", 'word_count'].sum()
    student_talk_rate = student_words / session_dur if session_dur>0 else 0

    # Dialogue Frequency
    dialogue_count = 0
    for i in range(1,len(df)):
        if df.loc[i,'speaker_norm'] != df.loc[i-1,'speaker_norm']:
            dialogue_count += 1
    dialogue_freq = dialogue_count / session_dur if session_dur>0 else 0

        # Average Wait Time (teacher Q -> immediate next student response only)
    waits = []
    for i in range(len(df) - 1):  # up to second last row
        row = df.iloc[i]
        next_row = df.iloc[i+1]

        if row['speaker_norm']=="Teacher" and is_question(row['utterance']):
            # Only count if immediate next utterance is a student
            if next_row['speaker_norm']=="Student":
                wait = (next_row['start_td'] - row['end_td']).total_seconds()
                if wait < 0:
                    wait = 0
                waits.append(wait)

    avg_wait = sum(waits)/len(waits) if waits else None


    # Student Agency (initiations)
    student_init, teacher_init = 0, 0
    for i in range(1,len(df)):
        gap = (df.loc[i,'start_td'] - df.loc[i-1]['end_td']).total_seconds()
        if df.loc[i,'speaker_norm'] != df.loc[i-1,'speaker_norm'] and gap>=3:
            if df.loc[i,'speaker_norm']=="Student":
                student_init += 1
            else:
                teacher_init += 1
    student_agency = student_init/teacher_init if teacher_init>0 else None

    return {
        "Student_Talk_Rate": student_talk_rate,
        "Dialogue_Frequency": dialogue_freq,
        "Student_Agency": student_agency
    }

# ================================
# Process All Files in Directory
# ================================
def process_directory(input_dir):
    all_summaries = []

    for filename in os.listdir(input_dir):
        if filename.endswith(".xlsx") and not filename.startswith("~$"):  # skip temp files
            file_path = os.path.join(input_dir, filename)
            print(f"Processing {filename}...")
            df = pd.read_excel(file_path)
            metrics = calculate_metrics(df)
            summary = pd.DataFrame([metrics])
            summary['Chapter_Name'] = df['Chapter_Name'].iloc[0]
            all_summaries.append(summary)

    if not all_summaries:
        print("‚ö†Ô∏è No .xlsx files found in directory.")
        return None

    final_df = pd.concat(all_summaries, ignore_index=True)

    # Reorder columns
    cols = ["Chapter_Name"] + [c for c in final_df.columns if c!="Chapter_Name"]
    final_df = final_df[cols]

    # Save combined file
    output_path = os.path.join(input_dir, "All_Chapters_Metrics_RuleBased.xlsx")
    final_df.to_excel(output_path, index=False)
    print(f"‚úÖ Combined metrics saved to: {output_path}")
    return final_df

# ================================
# Example Usage
# ================================
input_dir = "/content/drive/MyDrive/Colab Notebooks/School Project_Reduce Size_Final Code/Data Analysis/Data Transformation/Script Chapter Wise"

result = process_directory(input_dir)
print(result)


B) 'Question_Rate_Student', 'Question_Rate_Teacher'

In [None]:
import pandas as pd
from datetime import timedelta

# Mount your Google Drive in Colab before running this code
from google.colab import drive
drive.mount('/content/drive')

# Set your input CSV path here
csv_path = '/content/drive/MyDrive/Colab Notebooks/School Project_Reduce Size_Final Code/Data Analysis/output_transcript_20251027_131530.csv'

# Read the CSV file
df = pd.read_csv(csv_path)

# Parse time stamps for session duration computation
def parse_timestamp(ts_str):
    minutes, sec_ms = ts_str.split(":")
    seconds, millis = sec_ms.split(".")
    return timedelta(minutes=int(minutes), seconds=int(seconds), milliseconds=int(millis))

df['start_td'] = df['timestamp'].apply(parse_timestamp)
df['end_td'] = df['end_time'].apply(parse_timestamp)

# Normalize speaker column for consistent labeling
df['speaker_norm'] = df['speaker'].apply(lambda x: 'Student' if 'kid' in x.lower() or 'student' in x.lower() else 'Teacher')

# Function to compute question rates for each chapter
def compute_qrates(group):
    session_dur = (group['end_td'].max() - group['start_td'].min()).total_seconds() / 60
    student_questions = group[(group['speaker_norm'] == 'Student') & (group['is_question'] == 1)].shape[0]
    teacher_questions = group[(group['speaker_norm'] == 'Teacher') & (group['is_question'] == 1)].shape[0]
    student_q_rate = student_questions / session_dur if session_dur > 0 else 0
    teacher_q_rate = teacher_questions / session_dur if session_dur > 0 else 0
    return pd.Series({
        'Question_Rate_Student': student_q_rate,
        'Question_Rate_Teacher': teacher_q_rate
    })

# Group by chapter and compute rates
out_df = df.groupby('chapter').apply(compute_qrates).reset_index()

# Save to Excel/CSV
out_path = '/content/drive/MyDrive/Colab Notebooks/School Project_Reduce Size_Final Code/Data Analysis/Data Transformation/Question_Rates_by_Chapter_metric.xlsx'
out_df.to_excel(out_path, index=False)
print(f'Saved: {out_path}')
print(out_df)


# **Metric Analysis**

A) Categorization

In [None]:
# --- Step 1: Install and import libraries ---
!pip install --quiet gspread gspread_dataframe

import pandas as pd
import gspread
from gspread_dataframe import get_as_dataframe
from google.colab import auth
from google.auth import default

# --- Step 2: Authenticate your Google account ---
auth.authenticate_user()
creds, _ = default()
gc = gspread.authorize(creds)

# --- Step 3: Provide your Google Sheet ID and optional sheet/tab name ---
SHEET_ID = "1sFB3RA9qaPF8ChPuRkzTVMWhdB106ULFj5kH_EPSUzc"          # üîπ e.g. "1PhmF8WvpqRgcTBGKpWVznPiFaN_gzT-nPO4XjmSrUbI"
SHEET_NAME = "Sheet1"                    # üîπ tab name inside the Google Sheet

# --- Step 4: Open and read the sheet ---
worksheet = gc.open_by_key(SHEET_ID).worksheet(SHEET_NAME)

# Convert to pandas DataFrame
df = get_as_dataframe(worksheet, evaluate_formulas=True, dtype=str)

# --- Step 5: Clean and preview the data ---
df = df.dropna(how='all')  # Remove empty rows


# --- Logic for labeling ---
def mark_content(chapter):
    chapter_lower = chapter.lower()
    if "advance" in chapter_lower:
        return "Advance Module"        # ‚Üê your own content
    elif "basic" in chapter_lower:
        return "Traditional Module"       # ‚Üê traditional content
    else:
        return "Unclassified"

# --- Apply the rule ---
df["Content_Type"] = df["Chapter_Name"].apply(mark_content)

# --- View the result ---
print(df)



B) Mean Valuse Analysis Table

In [None]:
import pandas as pd
from IPython.display import display, HTML

# --- Ensure numeric columns are actually numeric ---
numeric_cols = [
    "Student_Talk_Rate",
    "Dialogue_Frequency",
    "Student_Agency",
    "Question_Rate_Student",
    "Question_Rate_Teacher"
]
df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors="coerce")

# --- Compute means by module type ---
mean_df = df.groupby("Content_Type")[numeric_cols].mean().round(2)

# --- Extract Traditional and Advance means ---
try:
    basic_means = mean_df.loc["Traditional Module"]
    adv_means = mean_df.loc["Advance Module"]
except KeyError as e:
    raise ValueError(f"Check your Content_Type values ‚Äî missing one of them. Found: {mean_df.index.tolist()}")

# --- Build comparison table ---
table = pd.DataFrame({
    "Metric": numeric_cols,
    "Traditional Module Mean": basic_means.values,
    "Advance Module Mean": adv_means.values
})

# --- Calculate percentage difference ---
table["% Difference (‚Üë or ‚Üì)"] = (
    ((table["Advance Module Mean"] - table["Traditional Module Mean"]) / table["Traditional Module Mean"]) * 100
).round(0)
table["% Difference (‚Üë or ‚Üì)"] = table["% Difference (‚Üë or ‚Üì)"].apply(
    lambda x: f"‚Üë +{int(x)}%" if x >= 0 else f"‚Üì {int(x)}%"
)

# --- Style output to look like your screenshot ---
styled = (
    table.style
    .format({
        "Traditional Module Mean": "{:.2f}",
        "Advance Module Mean": "<b>{:.2f}</b>",
    })
    .hide(axis="index")
    .set_table_styles([
        {"selector": "th", "props": [("text-align", "left"), ("font-weight", "bold"), ("padding", "6px 10px")]},
        {"selector": "td", "props": [("text-align", "left"), ("padding", "6px 10px")]},
        {"selector": "caption", "props": [("caption-side", "top"), ("font-size", "16px"), ("font-weight", "bold")]}
    ])
    .set_caption("üìä 2Ô∏è‚É£ Descriptive Table")
)

display(HTML(styled.to_html()))


C) Mean Value Analysis Bar Chart

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Compute means for each metric by module type
means = df.groupby("Content_Type")[metrics].mean().reset_index()
means_melted = means.melt(id_vars="Content_Type", var_name="Metric", value_name="Mean_Value")

plt.figure(figsize=(8, 5))
sns.barplot(
    x="Metric",
    y="Mean_Value",
    hue="Content_Type",
    data=means_melted,
    palette={"Traditional Module": "#A0AEC0", "Advance Module": "#3182CE"}
)

plt.title("Advance vs Traditional ‚Äì Engagement Metrics Comparison", fontsize=13, weight="bold")
plt.xlabel("Engagement Metric", fontsize=10)
plt.ylabel("Mean Value", fontsize=10)
plt.xticks(rotation=20, ha="right")
plt.legend(title="Module Type")
plt.tight_layout()
plt.show()


D) Light-Weight Direction Tests (Pilot Scale)(Cohen‚Äôs d)

In [None]:
# üìâ 4Ô∏è‚É£ Light-Weight Direction Tests (Pilot Scale)
# =================================================
# Instead of running t-tests (which can be unreliable for small sample sizes),
# we‚Äôll compute **Cohen‚Äôs d** ‚Äî a standardized effect-size measure that tells us
# how strongly two groups differ, expressed in standard-deviation units.

import numpy as np
import pandas as pd

# --- Define the metrics of interest ---
metrics = [
    "Student_Talk_Rate",
    "Dialogue_Frequency",
    "Student_Agency",
    "Question_Rate_Student",
    "Question_Rate_Teacher"
]

# --- Define Cohen‚Äôs d function ---
def cohens_d(x, y):
    """
    Computes Cohen‚Äôs d effect size between two independent samples.
    Formula: (mean1 ‚àí mean2) / pooled standard deviation
    """
    nx, ny = len(x), len(y)
    dof = nx + ny - 2
    pooled_std = np.sqrt(((nx - 1) * x.var(ddof=1) + (ny - 1) * y.var(ddof=1)) / dof)
    return (x.mean() - y.mean()) / pooled_std

# --- Compute and display results ---
print("üìà Cohen‚Äôs d Effect Sizes (Advance vs Traditional)\n")
for col in metrics:
    x = df[df["Content_Type"] == "Advance Module"][col].dropna()
    y = df[df["Content_Type"] == "Traditional Module"][col].dropna()
    d = cohens_d(x, y)
    strength = (
        "üü¢ Large" if abs(d) >= 0.8 else
        "üü† Medium" if abs(d) >= 0.5 else
        "üü° Small" if abs(d) >= 0.2 else
        "‚ö™ Negligible"
    )
    direction = "‚Üë Higher in Advance" if d > 0 else "‚Üì Higher in Traditional"
    print(f"{col.replace('_',' '):25s} : d = {d:6.2f}  ‚Üí  {strength} ({direction})")

# --- Interpretation guide ---
print("\nüìò Interpretation (rule of thumb):")
print("   0.2 = small effect")
print("   0.5 = medium effect")
print("   0.8 = large effect\n")
print("If most metrics show d ‚â• 0.6 ‚Üí strong practical improvement in pilot terms.")


E) Directional Robustness Check ‚Äì Mann‚ÄìWhitney U Test

In [None]:
# üß™ 5Ô∏è‚É£ Directional Robustness Check ‚Äì Mann‚ÄìWhitney U Test
# =========================================================
# Cohen‚Äôs d showed very large differences, but with small pilot samples (‚âà6 per group),
# parametric assumptions (normality, equal variance) may not hold.
# The Mann‚ÄìWhitney U test is a *non-parametric* alternative that compares
# the median ranks between two groups ‚Äî robust even for small n or skewed data.

from scipy.stats import mannwhitneyu
import pandas as pd

# --- Metrics to test ---
metrics = [
    "Student_Talk_Rate",
    "Dialogue_Frequency",
    "Student_Agency",
    "Question_Rate_Student",
    "Question_Rate_Teacher"
]

# --- Run one-sided Mann‚ÄìWhitney U tests (Advance > Traditional) ---
print("üìä Mann‚ÄìWhitney U Test Results  (one-sided: Advance > Traditional)\n")

results = []
for col in metrics:
    a = df[df["Content_Type"] == "Advance Module"][col].dropna()
    b = df[df["Content_Type"] == "Traditional Module"][col].dropna()
    stat, p = mannwhitneyu(a, b, alternative="greater")
    results.append({"Metric": col.replace("_", " "),
                    "U-Statistic": stat,
                    "p-Value": p})
    sig = "‚úÖ directional (p < 0.10)" if p < 0.10 else "‚Äì"
    print(f"{col.replace('_',' '):25s} :  U = {stat:6.2f}   p = {p:6.3f}   {sig}")

# --- Optional: collect into a DataFrame for summary table ---
u_df = pd.DataFrame(results).round(3)
display(u_df)

# --- Interpretation note ---
print("\nüìò Interpretation:")
print("‚Ä¢ The Mann‚ÄìWhitney U test assesses whether Advance-module values tend to be higher than Traditional.")
print("‚Ä¢ p < 0.10 (one-sided) is interpreted here as *directional evidence* of higher engagement in Advance chapters.")
print("‚Ä¢ Because n ‚âà 6 per group, this is a robustness check ‚Äî not formal significance testing.")


F) Engagement Index Analysis

In [None]:
# üìä 6Ô∏è‚É£ Engagement Index Analysis
# ============================================
# Combine all five engagement metrics into a single composite "Engagement Index"
# and test whether Advance Modules show higher overall engagement than Traditional Modules.

import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from scipy.stats import mannwhitneyu
import seaborn as sns
import matplotlib.pyplot as plt

# --- 1Ô∏è‚É£ Define the metrics you already use ---
metrics = [
    "Student_Talk_Rate",
    "Dialogue_Frequency",
    "Student_Agency",
    "Question_Rate_Student",
    "Question_Rate_Teacher"
]

# --- 2Ô∏è‚É£ Compute Engagement Index (normalized mean across metrics) ---
scaler = MinMaxScaler()
df["Engagement_Index"] = scaler.fit_transform(df[metrics]).mean(axis=1)

# --- 3Ô∏è‚É£ Visualize boxplot for overall engagement ---
plt.figure(figsize=(6, 4))
sns.boxplot(
    x="Content_Type",
    y="Engagement_Index",
    data=df,
    palette={"Traditional Module": "#A0AEC0", "Advance Module": "#63B3ED"}
)
sns.stripplot(
    x="Content_Type",
    y="Engagement_Index",
    data=df,
    color="black",
    alpha=0.6,
    jitter=True,
    size=5
)
plt.title("Overall Engagement Index: Advance vs Traditional", fontsize=13, weight="bold")
plt.xlabel("Module Type", fontsize=10)
plt.ylabel("Engagement Index (0‚Äì1 scale)", fontsize=10)
plt.tight_layout()
plt.show()

# --- 4Ô∏è‚É£ Compute Cohen‚Äôs d for the composite index ---
def cohens_d(x, y):
    nx, ny = len(x), len(y)
    dof = nx + ny - 2
    pooled_std = np.sqrt(((nx - 1)*x.var(ddof=1) + (ny - 1)*y.var(ddof=1)) / dof)
    return (x.mean() - y.mean()) / pooled_std

adv = df[df["Content_Type"] == "Advance Module"]["Engagement_Index"]
trad = df[df["Content_Type"] == "Traditional Module"]["Engagement_Index"]

d = cohens_d(adv, trad)

# --- 5Ô∏è‚É£ Mann‚ÄìWhitney U test (one-sided) ---
stat, p = mannwhitneyu(adv, trad, alternative="greater")

# --- 6Ô∏è‚É£ Print results with professional interpretation ---
print("üìà Engagement Index Comparison (Advance vs Traditional)\n")
print(f"Cohen‚Äôs d = {d:.2f}")
print(f"Mann‚ÄìWhitney U = {stat:.2f}, p = {p:.3f}\n")

if p < 0.10:
    print("‚úÖ Directional evidence (p < 0.10): Advance Modules show higher overall engagement.")
else:
    print("‚ö™ No directional difference detected at p < 0.10 level.")

# Interpretation guide
print("\nüìò Interpretation:")
print("‚Ä¢ Engagement Index combines all five engagement dimensions into a single 0‚Äì1 scale.")
print("‚Ä¢ Cohen‚Äôs d shows magnitude of difference (0.2=small, 0.5=medium, 0.8=large).")
print("‚Ä¢ Mann‚ÄìWhitney U tests direction ‚Äî whether Advance > Traditional overall.")
print("‚Ä¢ Together, they summarize both strength and consistency of engagement improvements.")
