<div style="border:2px solid #4F8EF7;padding:14px;border-radius:10px;background:#F5FAFF">
<h2>Class Transcriber — Overview</h2>
<p>
This notebook transcribes class audio/video, pulls Forum metadata (title, section/schedule, date/time, attendance, events), and generates a polished PDF + CSV. It supports <b>Local</b>, <b>Google Drive</b>, and <b>Direct URL</b> inputs, and lets you save outputs to <b>Local</b> or <b>Google Drive</b>.
</p>

<h3>Quick Start</h3>
<ol>
  <li>Runtime → <b>Change runtime type</b> → T4 GPU (or CPU if GPU not available).</li>
  <li>Runtime → <b>Run all</b>. The notebook will pause to ask for:
    <ul>
      <li><b>Forum cURL</b> (copied from DevTools → Network → “Copy as cURL”)</li>
      <li><b>Audio source</b>: local / gdrive / url</li>
      <li><b>Output destination</b>: local / gdrive</li>
      <li><b>Privacy</b>: names / ids / both</li>
    </ul>
  </li>
</ol>

<h3>Important Notes</h3>
<ul>
  <li><b>Accuracy:</b> Machine transcripts can be imperfect—verify key details.</li>
  <li><b>URLs:</b> Signed links may expire. If a download fails, refresh the link.</li>
  <li><b>Privacy:</b> Use <code>ids</code> to replace student names with IDs; <code>both</code> produces a named and an anonymized set of outputs.</li>
  <li><b>Drive:</b> Drive mounts only when you choose it for input or output.</li>
</ul>
</div>
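
The three privacy modes can be read as a small mapping from the chosen mode to the output variants that get rendered. The following is a minimal sketch, not the notebook's own dispatch code: `expand_privacy_mode` is a hypothetical helper, and the empty suffix for the single-mode cases is an assumption (only the `_names` / `_ids` suffixes for `both` are stated in this notebook).

```python
def expand_privacy_mode(mode: str) -> list[tuple[str, str]]:
    """Return (name_mode, file_suffix) pairs for a privacy setting.

    Hypothetical helper for illustration; the suffix used for single
    modes (empty string) is an assumption.
    """
    mode = (mode or "").strip().lower()
    if mode == "both":
        # One named copy and one anonymized copy.
        return [("names", "_names"), ("ids", "_ids")]
    if mode == "ids":
        return [("ids", "")]
    # Anything else falls back to "names", as the prompt cell does.
    return [("names", "")]


for name_mode, suffix in expand_privacy_mode("both"):
    print(f"render with name_mode={name_mode!r}, file_suffix={suffix!r}")
```

Each pair could then be passed to a renderer that accepts a name mode and a file suffix, so that `both` simply runs the same rendering step twice.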

In [None]:
# =========================
#  IMPORTS & DEPENDENCIES
# =========================

!pip install -q openai-whisper pydub requests iso8601 reportlab
!apt-get update -qq && apt-get install -y -qq ffmpeg



In [None]:
import gc
import whisper
import json
import datetime
from pathlib import Path
import torch
from pydub import AudioSegment
import numpy as np
import requests
import re
import iso8601
import csv
import subprocess
from datetime import timedelta
from IPython.display import clear_output
from tqdm.notebook import tqdm  # For Jupyter/Colab

In [None]:
# =========================
#  CONSTANTS & UTILITIES
#  (helpers, regex, spacing, labelers)
# =========================

def extract_ids_from_curl(curl_text: str):
    """
    Pull class/section/course IDs and the app link from the cURL.
    Prefers the Referer app URL; falls back to the API class URL.
    Returns dict: {"course_id": str|None, "section_id": str|None, "class_id": str|None, "class_link": str}
    """
    # 1) Try Referer header (e.g., https://forum.minerva.edu/app/courses/2933/sections/11209/classes/79183)
    ref_match = re.search(r"-H\s+['\"](?:referer|Referer):\s*([^'\"\r\n]+)", curl_text)
    ref = ref_match.group(1).strip() if ref_match else ""
    class_link = ""
    course_id = section_id = class_id = None

    if ref:
        m = re.search(r"/app/courses/(\d+)/sections/(\d+)/classes/(\d+)", ref)
        if m:
            course_id, section_id, class_id = m.group(1), m.group(2), m.group(3)
            class_link = ref

    # 2) Fallback: any API class URL in the cURL body
    if not class_id:
        m2 = re.search(r"/api/v1/class_grader/classes/(\d+)", curl_text)
        if m2:
            class_id = m2.group(1)
            class_link = f"https://forum.minerva.edu/app/classes/{class_id}"

    return {
        "course_id": course_id,
        "section_id": section_id,
        "class_id": class_id,
        "class_link": class_link
    }

from contextlib import nullcontext  # no-op context used in place of autocast on CPU

def download_to_temp(url: str) -> str:
    """
    Download a remote media file (MP3/MP4/WAV/M4A/AAC/OGG) to /content and
    return the local path. Works with signed URLs that don't need cookies.
    """
    base = url.split('?', 1)[0]
    suffix = Path(base).suffix or ".mp4"
    local_name = f"/content/input_from_url{suffix}"
    print(f"Downloading from URL: {url}")
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(local_name, "wb") as f:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    f.write(chunk)
    print(f"Downloaded URL to: {local_name}")
    return local_name

def resolve_audio_input() -> str:
    """
    Ask the user how to provide the audio/video: local upload, Google Drive, or URL.
    - local: user uploads in the Colab sidebar then provides a /content/... path
    - gdrive: lazily import & mount google.colab.drive, then prompt for Drive path
    - url: download file to /content and return the local temp path
    """
    print("2) Choose audio source: local  |  gdrive  |  url")
    src = input("Source [local/gdrive/url]: ").strip().lower()
    clear_output()

    if src == "gdrive":
        print("Mounting Google Drive (one-time authorization may be required)...")
        # Lazy import ONLY if user picked Google Drive
        from google.colab import drive   # imported here to avoid forcing the dependency otherwise
        drive.mount("/content/drive")
        print("Enter the file path inside Drive (e.g., /content/drive/MyDrive/recordings/lecture.mp4)")
        path = input("Drive path: ").strip()
        clear_output()
        return path

    if src == "url":
        print("Paste the direct file URL (signed URLs supported):")
        url = input("URL: ").strip()
        clear_output()
        return download_to_temp(url)

    # default → local upload path
    print("After uploading your file to Colab, enter its path")
    print("   (Typically this will be '/content/your_file.mp3' or '/content/your_file.mp4')")
    print()
    print("‼️  Wait for the upload to finish before submitting its path.")
    print("   Track progress in the bottom-left corner.")
    print()
    path = input("Path: ").strip()
    clear_output()
    return path

def free_cuda_mem():
    """Safely clear CUDA cache and trigger Python GC."""
    try:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    except Exception:
        pass
    try:
        gc.collect()
    except Exception:
        pass

def resolve_output_destination() -> str:
    """
    Ask where to save outputs (PDF/CSV): local folder or Google Drive folder.
    Lazily mounts Drive only if chosen. Ensures the folder exists.
    Returns an absolute folder path.
    """
    print("Where should I save the outputs? local  |  gdrive")
    dest = input("Destination [local/gdrive]: ").strip().lower()
    clear_output()

    if dest == "gdrive":
        # Lazy import/mount only when needed
        from google.colab import drive
        print("Mounting Google Drive...")
        drive.mount("/content/drive")
        print("Enter a Drive folder (e.g., /content/drive/MyDrive/Transcripts)")
        out_dir = input("Drive folder: ").strip()
        clear_output()
        if not out_dir:
            out_dir = "/content/drive/MyDrive"
        Path(out_dir).mkdir(parents=True, exist_ok=True)
        return out_dir

    # default: local
    print("Enter a local folder (press Enter for /content)")
    out_dir = input("Local folder: ").strip() or "/content"
    clear_output()
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    return out_dir

<div style="border:2px solid #444;padding:12px;border-radius:8px;background:#FBFBFB">
<h3>User Prompts</h3>
<p>
You’ll paste your Forum cURL, choose where your audio/video comes from (local / Google Drive / direct URL), decide where to save outputs (local / Google Drive), and set privacy (names / ids / both).
</p>
<ul>
  <li><b>Class ID auto-detected</b> from your cURL — no need to type it.</li>
  <li><b>Direct URL</b> option will download the media automatically.</li>
  <li><b>Outputs</b> (PDF/CSV) go to the folder you select, regardless of input source.</li>
</ul>
</div>
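
To illustrate how the Class ID can be detected without typing it, here is a minimal sketch that applies the two URL patterns used in this notebook to a made-up cURL string. `find_class_id` and `sample_curl` are hypothetical names for this example; the sample request is invented and carries no real cookies or tokens.

```python
import re
from typing import Optional

# Made-up sample in the shape of a DevTools "Copy as cURL" string.
sample_curl = (
    "curl 'https://forum.minerva.edu/api/v1/class_grader/classes/79183' "
    "-H 'accept: application/json' "
    "-H 'referer: https://forum.minerva.edu/app/courses/2933/sections/11209/classes/79183'"
)


def find_class_id(curl_text: str) -> Optional[str]:
    """Return the class ID found in a cURL string, or None."""
    # Prefer the app URL shape (as it appears in the Referer header).
    m = re.search(r"/app/courses/\d+/sections/\d+/classes/(\d+)", curl_text)
    if m:
        return m.group(1)
    # Fall back to the class API URL.
    m = re.search(r"/api/v1/class_grader/classes/(\d+)", curl_text)
    return m.group(1) if m else None


print(find_class_id(sample_curl))
```

If neither pattern matches, the function returns `None`, which is the case where the prompt cell asks you to copy a different request.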

In [None]:
print("1) Paste your Forum cURL (right-click in Chrome DevTools → Copy as cURL):")
raw_curl = input().strip()
clear_output()

# Auto-derive Class ID; extract_ids_from_curl() already falls back to the API URL
# when the Referer header is missing.
_ids = extract_ids_from_curl(raw_curl)
CLASS_ID = _ids.get("class_id")
if not CLASS_ID:
    raise ValueError(
        "Could not extract Class ID from your cURL. "
        "Open the class page and copy a request that includes a Referer like "
        "https://forum.minerva.edu/app/courses/.../sections/.../classes/<ID>, "
        "or a class API URL."
    )

# Unified input selection (local / Google Drive / direct URL)
AUDIO_PATH = resolve_audio_input()

# Choose where outputs (PDF/CSV) will be saved
OUTPUT_DIR = resolve_output_destination()

print("3) Student name privacy")
print("   Type one of: names  (show names)  |  ids  (anonymize to IDs)  |  both  (generate both)")
PRIVACY_MODE = input("Privacy mode [names/ids/both]: ").strip().lower()
if PRIVACY_MODE not in ("names", "ids", "both"):
    PRIVACY_MODE = "names"
clear_output()

print("Thank you! Here's what you provided:\n")
print(f"Class ID: {CLASS_ID}")
print(f"Audio Path: {AUDIO_PATH}")
print(f"Outputs Folder: {OUTPUT_DIR}")
print(f"Privacy Mode: {PRIVACY_MODE}")
print()
print("Starting the transcript generation process...")
print("⏳ Expect this to take about 15 minutes (varies with file length)")

Thank you! Here's what you provided:

Class ID: 81239
Audio Path: /content/input_from_url.mp4
Outputs Folder: /content/drive/MyDrive/AI Curriculum Projects Summer 2025/Transcriptions/Alex/v2.3
Privacy Mode: both

Starting the transcript generation process...
⏳ Expect this to take about 15 minutes (varies with file length)


<div style="border:2px solid #2E86C1;padding:12px;border-radius:8px;background:#F3F9FE">
<h3>Audio Prep & Transcription</h3>
<p>
Validates/normalizes audio (MP3/MP4/WAV/M4A/AAC/OGG), converts to a Whisper-optimized WAV, and transcribes in chunks to handle long recordings efficiently.
</p>
<ul>
  <li>Uses <b>torch.amp.autocast('cuda', ...)</b> on GPU, which avoids the deprecation warning raised by the older <code>torch.cuda.amp.autocast</code>.</li>
  <li>Fixes common text issues (glued sentences, spacing).</li>
  <li>GPU (T4) recommended but not required.</li>
</ul>
</div>
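
Chunked transcription comes down to two pieces of arithmetic: where each chunk starts and ends, and how chunk-local timestamps are shifted back onto the full recording. The sketch below shows both under simple assumptions; `chunk_bounds` and `to_global` are hypothetical helper names, and the 9.5-hour duration is an invented example.

```python
def chunk_bounds(total_seconds: float, segment_length: int) -> list[tuple[float, float]]:
    """Return (start, end) pairs in seconds covering the whole recording."""
    bounds = []
    for start in range(0, int(total_seconds), segment_length):
        # The last chunk is clipped to the end of the recording.
        end = min(start + segment_length, total_seconds)
        bounds.append((start, end))
    return bounds


def to_global(local_seconds: float, chunk_start: float) -> float:
    """Shift a timestamp measured inside a chunk onto the full recording."""
    return local_seconds + chunk_start


# Invented example: a 9.5-hour recording split into 4-hour chunks.
for start, end in chunk_bounds(9.5 * 3600, 14_400):
    print(f"chunk {start:>6.0f}s to {end:>6.0f}s")

# A word heard 12.5 s into the second chunk sits at 14412.5 s overall.
print(to_global(12.5, 14_400))
```

With the default 4-hour segment length, most class recordings fit in a single chunk, so the shift is zero and the chunking only matters for unusually long files.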

In [None]:
class AudioPreprocessor:
    @staticmethod
    def validate_and_fix_file(file_path: str) -> str:
        """
        Validates and preprocesses audio files for optimal transcription.
        For MP4 files, first converts to MP3 as an intermediate step.
        """
        print(f"Validating file: {file_path}")
        if not Path(file_path).exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        try:
            # If MP4, convert to MP3 (more tolerant), then to 16kHz mono WAV
            if file_path.lower().endswith('.mp4'):
                print("Converting MP4 to MP3 (intermediate step)...")
                mp3_path = file_path.rsplit('.', 1)[0] + '.mp3'
                result = subprocess.run([
                    'ffmpeg', '-y', '-v', 'warning', '-xerror',
                    '-i', file_path, '-vn',
                    '-acodec', 'libmp3lame', '-ar', '44100', '-ab', '192k', '-f', 'mp3',
                    mp3_path
                ], capture_output=True, text=True, check=False)

                if result.returncode == 0 and Path(mp3_path).exists() and Path(mp3_path).stat().st_size > 0:
                    print(f"Successfully converted to MP3: {mp3_path}")
                    return AudioPreprocessor._convert_to_whisper_wav(mp3_path)
                else:
                    print(f"MP3 conversion failed with error: {result.stderr}")
                    return AudioPreprocessor._python_extract_audio(file_path)

            # Common audio formats → Whisper WAV
            elif file_path.lower().endswith(('.mp3', '.m4a', '.aac', '.ogg')):
                print("Converting audio file to optimized WAV format...")
                return AudioPreprocessor._convert_to_whisper_wav(file_path)

            elif file_path.lower().endswith('.wav'):
                print(f"File is already in WAV format: {file_path}")
                return file_path

            else:
                raise ValueError(f"Unsupported file format: {file_path}. Supported formats: MP3, MP4, WAV, M4A, AAC, OGG.")

        except Exception as e:
            print(f"Error during file processing: {str(e)}")
            raise

    @staticmethod
    def _convert_to_whisper_wav(audio_path: str) -> str:
        """Convert any audio file to WAV format optimized for Whisper (16kHz mono, s16le)."""
        wav_path = audio_path.rsplit('.', 1)[0] + '.wav'
        try:
            subprocess.run([
                'ffmpeg', '-y', '-i', audio_path,
                '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
                wav_path
            ], capture_output=True, text=True, check=True)
            print(f"Successfully converted to Whisper-optimized WAV: {wav_path}")
            return wav_path
        except subprocess.CalledProcessError as e:
            print(f"WAV conversion failed. ffmpeg error: {e.stderr}")
            raise RuntimeError(f"Failed to convert {audio_path} to WAV format") from e

    @staticmethod
    def _python_extract_audio(file_path: str) -> str:
        """Fallback extraction using PyDub when ffmpeg CLI fails."""
        print("Attempting Python-based audio extraction...")
        wav_path = file_path.rsplit('.', 1)[0] + '_extracted.wav'
        try:
            audio = AudioSegment.from_file(file_path).set_frame_rate(16000).set_channels(1).set_sample_width(2)
            audio.export(wav_path, format="wav")
            if Path(wav_path).exists() and Path(wav_path).stat().st_size > 0:
                print(f"Successfully extracted audio using Python: {wav_path}")
                return wav_path
        except Exception as e:
            print(f"Python audio extraction failed: {str(e)}")

        print("Attempting direct binary extraction...")
        try:
            binary_wav_path = file_path.rsplit('.', 1)[0] + '_binary.wav'
            result = subprocess.run([
                'ffmpeg', '-y',
                '-f', 'lavfi', '-i', f"movie={file_path}[out+audio]",
                '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
                binary_wav_path
            ], capture_output=True, text=True, check=False)

            if result.returncode == 0 and Path(binary_wav_path).exists() and Path(binary_wav_path).stat().st_size > 0:
                print(f"Binary extraction successful: {binary_wav_path}")
                return binary_wav_path
        except Exception as e2:
            print(f"Binary extraction failed: {str(e2)}")

        raise RuntimeError("All audio extraction methods failed. Please convert the file manually before uploading.")

In [None]:
class TranscriptionProcessor:
    """
    Chunked Whisper transcription with GPU-friendly settings.
    Splits long audio into ~4-hour segments, transcribes, and stitches results.

    Args:
        segment_length (int): Segment length in seconds (default ~4 hours).
        model_name (str): Whisper model name to load.
    """
    def __init__(self, segment_length: int = 14_400, model_name: str = "medium"):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        if self.device == "cuda":
            # CUDA perf tweaks
            torch.backends.cuda.matmul.allow_tf32 = True
            torch.backends.cudnn.benchmark = True
            torch.backends.cudnn.allow_tf32 = True
            torch.backends.cudnn.deterministic = False
            torch.cuda.empty_cache()
            # Be careful with this fraction in multi-process environments
            try:
                torch.cuda.set_per_process_memory_fraction(0.9)
            except Exception as e:
                print(f"Warning: could not set memory fraction: {e}")

        print("Loading Whisper model...")
        self.model = whisper.load_model(model_name).to(self.device)
        if self.device == "cuda":
            # Use FP16 on GPU
            self.model = self.model.half()

        self.segment_length = int(segment_length)

    def transcribe(self, audio_path: str, class_id: str) -> str | None:
        """
        Transcribe `audio_path` into /content/session_{class_id}_transcript.json and return that path.

        Args:
            audio_path (str): Path to an audio/video file supported by pydub/ffmpeg.
            class_id (str): Identifier used to name the output transcript.

        Returns:
            str | None: Path to the resulting JSON transcript, or None if nothing was transcribed.
        """
        print("Processing audio to generate transcript JSON...")
        try:
            # Load audio
            audio = AudioSegment.from_file(audio_path)
            total_duration = len(audio) / 1000.0  # seconds (float)
            print(f"Total duration: {timedelta(seconds=int(total_duration))}")

            all_segments: list[dict] = []
            segment_times = range(0, int(total_duration), self.segment_length)

            for start_time in tqdm(segment_times, desc="Processing segments", unit="segment"):
                # Compute this segment's end (in seconds)
                remaining = total_duration - start_time
                duration = min(self.segment_length, remaining)

                # Slice pydub audio in milliseconds (cast to int to be safe)
                start_ms = int(start_time * 1000)
                end_ms = int((start_time + duration) * 1000)
                segment = audio[start_ms:end_ms]

                # Export temp wav for Whisper
                temp_path = f"/content/temp_segment_{start_time}.wav"
                segment.export(temp_path, format="wav")

                try:
                    # Use autocast only on CUDA; nullcontext() on CPU
                    cast_ctx = torch.amp.autocast("cuda") if self.device == "cuda" else nullcontext()
                    with cast_ctx:
                        result = self.model.transcribe(
                            temp_path,
                            word_timestamps=True,
                            language="en",
                            task="transcribe",
                            fp16=(self.device == "cuda"),
                            condition_on_previous_text=True,
                            initial_prompt="This is a university lecture."
                        )

                        # Gather segments with global (shifted) times
                        for seg in result.get("segments", []):
                            seg_start = float(seg.get("start", 0.0)) + start_time
                            seg_end = float(seg.get("end", 0.0)) + start_time

                            words = []
                            for w in seg.get("words", []) or []:
                                words.append({
                                    "word": str(w.get("word", "")).strip(),
                                    "start": float(w.get("start", 0.0)) + start_time,
                                    "end": float(w.get("end", 0.0)) + start_time
                                })

                            all_segments.append({
                                "start": seg_start,
                                "end": seg_end,
                                "text": normalize_sentence_spacing(str(seg.get("text", "")).strip()),
                                "words": words
                            })

                except Exception as segment_error:
                    print(f"Error processing segment at {start_time}s: {segment_error}")
                    continue
                finally:
                    # Cleanup temp file and free GPU mem
                    try:
                        Path(temp_path).unlink(missing_ok=True)
                    except Exception as e:
                        print(f"Warning: Failed to delete temp file {temp_path}: {e}")
                    if self.device == "cuda":
                        torch.cuda.empty_cache()
                        gc.collect()

            if not all_segments:
                print("Warning: No segments were successfully transcribed!")
                return None

            # Sort by start time and write JSON
            transcript_path = f"/content/session_{class_id}_transcript.json"
            with open(transcript_path, "w", encoding="utf-8") as f:
                json.dump({"segments": sorted(all_segments, key=lambda x: x["start"])}, f, indent=2)

            print(f"\nTranscript JSON saved to: {transcript_path}")
            return transcript_path

        except Exception as e:
            print(f"Error in transcription process: {e}")
            raise

<div style="border:2px solid #8E44AD;padding:12px;border-radius:8px;background:#F9F3FF">
<h3>Forum API Fetch</h3>
<p>
Fetches class metadata (session title, section/schedule), recording time, attendance, voice windows, and timeline events. These drive the header, attendance table, event table, and the event-based transcript grouping in the PDF/CSV.
</p>
<ul>
  <li>If some fields are missing, sensible fallbacks are used (e.g., speaker shown as “Professor”).</li>
  <li>Attendance table highlights <b>Present</b> (green) and <b>Absent</b> (red).</li>
</ul>
</div>
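
Voice windows and timeline events arrive as absolute timestamps, while the transcript is measured in seconds from the start of the recording, so each event time has to be converted into an offset. The sketch below shows that conversion with the standard library's `datetime.fromisoformat` instead of the `iso8601` package the notebook uses; `offset_seconds` is a hypothetical helper and the timestamps are invented.

```python
from datetime import datetime


def offset_seconds(event_iso: str, recording_start_iso: str) -> float:
    """Seconds between the recording start and an event timestamp."""
    start = datetime.fromisoformat(recording_start_iso)
    event = datetime.fromisoformat(event_iso)
    # Both values carry a UTC offset, so the subtraction is timezone-safe.
    return (event - start).total_seconds()


# Invented timestamps for illustration.
recording_start = "2025-01-15T14:00:00+00:00"
print(offset_seconds("2025-01-15T14:02:30+00:00", recording_start))
print(offset_seconds("2025-01-15T16:00:10+02:00", recording_start))
```

An event that starts before the recording yields a negative offset, which is worth keeping in mind when matching events to transcript segments.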

In [None]:
def clean_curl(curl_string):
    """Parse a DevTools cURL and return an HTTP headers dict including Cookie (if present)."""
    headers = {}
    # Require the closing quote to match the opening one (\1) so header values that
    # contain the other quote type (e.g. sec-ch-ua: "Chromium";v="...") stay intact.
    header_matches = re.findall(r"-H (['\"])(.*?): (.*?)\1", curl_string)
    for _quote, name, value in header_matches:
        headers[name] = value
    cookie_match = re.search(r"-b (['\"])(.*?)\1", curl_string)
    if cookie_match:
        headers['Cookie'] = cookie_match.group(2)
    return headers

def get_forum_events(class_id, headers):
    """
    Fetch class meta + class events from Forum and normalize.

    Also extracts a student attendance list (present/absent).
    """
    print("Fetching class and event data from Forum...")

    # ---- Class meta ----
    class_url = f'https://forum.minerva.edu/api/v1/class_grader/classes/{class_id}'
    print(f"Requesting class data from: {class_url}")
    r = requests.get(class_url, headers=headers)
    if r.status_code != 200:
        print(f"Error accessing class data. Status code: {r.status_code}\n{r.text}")
        raise RuntimeError(f"Failed to access class data. Status code: {r.status_code}")
    data = r.json()

    session_title = data.get('title') or f"Session {class_id}"
    course_obj = (data.get('section') or {}).get('course') or {}
    course_code  = course_obj.get('course-code', '')
    course_title = course_obj.get('title', '')
    section_title = (data.get('section') or {}).get('title', '')
    class_type = data.get('type', '')
    rec = (data.get('recording-sessions') or [{}])[0]
    recording_start = rec.get('recording-started')
    recording_end   = rec.get('recording-ended')

    schedule_guess = ''
    if isinstance(section_title, str) and ',' in section_title:
        parts = [p.strip() for p in section_title.split(',', 1)]
        schedule_guess = parts[1] if len(parts) > 1 else ''

    class_meta = {
        'session_title': session_title,
        'course_code': course_code,
        'course_title': course_title,
        'section_title': section_title,
        'schedule': schedule_guess,
        'class_type': class_type,
        'recording_start': recording_start,
        'recording_end': recording_end,
    }

    if not recording_start:
        raise KeyError("No recording-started found in class data")

    # ---- Events ----
    events_url = f'https://forum.minerva.edu/api/v1/class_grader/classes/{class_id}/class-events'
    print(f"Requesting events from: {events_url}")
    r = requests.get(events_url, headers=headers)
    if r.status_code != 200:
        print(f"Error accessing class events. Status code: {r.status_code}\n{r.text}")
        raise RuntimeError(f"Failed to access class events. Status code: {r.status_code}")
    events = r.json()
    if not isinstance(events, list):
        raise ValueError("No valid class events returned from API")

    voice_events = []
    timeline_segments = []
    ref_time = iso8601.parse_date(recording_start)

    for ev in events:
        et = ev.get('event-type')
        try:
            if et == 'voice':
                duration_ms = (ev.get('event-data') or {}).get('duration') or 0  # treat a null duration as 0
                duration = duration_ms / 1000.0
                if duration >= 1:
                    start_time = iso8601.parse_date(ev['start-time'])
                    end_time   = iso8601.parse_date(ev['end-time'])
                    voice_events.append({
                        'start': (start_time - ref_time).total_seconds(),
                        'end': (end_time - ref_time).total_seconds(),
                        'duration': duration,
                        'speaker': {
                          'id':        (ev.get('actor') or {}).get('id') or (ev.get('actor') or {}).get('user-id') or ((ev.get('actor') or {}).get('user') or {}).get('id'),
                          'first-name': (ev.get('actor') or {}).get('first-name'),
                          'last-name':  (ev.get('actor') or {}).get('last-name')
                        }
                    })
            elif et == 'timeline-segment':
                start_time = iso8601.parse_date(ev['start-time'])
                seg = (ev.get('event-data') or {})
                timeline_segments.append({
                    'abs_start': ev['start-time'],
                    'offset_seconds': (start_time - ref_time).total_seconds(),
                    'section': seg.get('timeline-section-title', ''),
                    'title':   seg.get('timeline-segment-title', ''),
                })
        except KeyError:
            continue

    timeline_segments.sort(key=lambda x: x['offset_seconds'])

    # ---- Attendance ----
    attendance = []
    for cu in (data.get('class-users') or []):
        role = (cu.get('role') or '').lower()
        if role == 'student':
            u = cu.get('user') or {}
            first = u.get('first-name', '') or ''
            last = u.get('last-name', '') or ''
            name = f"{first} {last}".strip() or (u.get('preferred-name') or '').strip() or (u.get('first-name') or '').strip()
            uid = u.get('id') or u.get('user-id')
            absent = bool(cu.get('absent', False))
            attendance.append({'id': uid, 'name': name, 'absent': absent})

    try:
        attendance.sort(key=lambda x: (x['name'] or '').lower())
    except Exception:
        pass

    events_data = {
        'class_id': class_id,
        'class_meta': class_meta,
        'voice_events': voice_events,
        'timeline_segments': timeline_segments,
        'attendance': attendance
    }
    temp_events_path = f"/content/session_{class_id}_events.json"
    with open(temp_events_path, 'w', encoding='utf-8') as f:
        json.dump(events_data, f, indent=2)

    print(f"Processed voice events: {len(voice_events)}; timeline segments: {len(timeline_segments)}")
    return events_data

# ----------------------------
# Utilities
# ----------------------------
def _fmt_mmss(seconds_float):
    if seconds_float is None:
        return ""
    seconds = max(0, int(seconds_float))
    m, s = divmod(seconds, 60)
    return f"{m:02d}:{s:02d}"

def _safe_date(date_str):
    if not date_str:
        return ""
    try:
        return date_str.split('T')[0]
    except Exception:
        return ""

def _fmt_dt_hm(dt_str: str) -> str:
    """YYYY-MM-DD HH:MM TZ from ISO8601; fallback to YYYY-MM-DD."""
    if not dt_str:
        return ""
    try:
        dt = iso8601.parse_date(dt_str)
        return dt.strftime("%Y-%m-%d %H:%M %Z")
    except Exception:
        return _safe_date(dt_str)

def soft_break_long_token(s: str, max_run: int = 14) -> str:
    if not s:
        return s
    # use a callable to avoid backslash escapes in the replacement
    pat = re.compile(r'(\S{%d})(?=\S)' % max_run)
    return pat.sub(lambda m: m.group(1) + '\u200b', s)

# --- Text cleanup helper ---

def normalize_sentence_spacing(text: str) -> str:
    """Fix glued sentences & punctuation spacing (respects ellipses), collapse newlines/spaces."""
    if not text:
        return text
    # remove zero-widths & non-breaking spaces, collapse newlines
    text = re.sub(r'[\u200B-\u200D\uFEFF]', '', text)
    text = text.replace('\u00A0', ' ')
    text = re.sub(r'\s*\n+\s*', ' ', text)

    # keep ellipses intact but add a space after if glued
    text = re.sub(r'(\.\.\.)(?=\S)', r'\1 ', text)

    # add a space after ., ?, ! when next visible char is a letter or an opening quote/paren
    # and NOT a decimal number (next char digit)
    text = re.sub(r'(?<!\.)'              # previous char is not a dot (avoid inside "...")
                  r'([.!?])'              # sentence end
                  r'(?=(["“\'(\[]?[A-Za-z]))'  # next visible char is letter (maybe after quote/paren)
                  , r'\1 ', text)

    # optional: add space after : or ; if followed by a letter
    text = re.sub(r'([:;])(?=(["“\'(\[]?[A-Za-z]))', r'\1 ', text)

    # de-glue closing quote/paren cases like: .”Word  .'Word  .)Word
    text = re.sub(r'([.!?]["”\')\]])(?=\S)', r'\1 ', text)

    # collapse multiple spaces
    text = re.sub(r'\s{2,}', ' ', text)
    return text.strip()

def label_from_actor(actor: dict, name_mode: str, student_ids: set | None = None) -> str:
    """Return display label for a speaker based on privacy setting."""
    if not isinstance(actor, dict):
        return "Professor"
    uid = actor.get('id') or actor.get('user-id') or (actor.get('user') or {}).get('id')
    fn  = (actor.get('first-name') or '').strip()
    ln  = (actor.get('last-name') or '').strip()
    full = f"{fn} {ln}".strip()
    if name_mode == 'ids':
        # anonymize students; if we don't know role, anonymize when in the student set (if provided)
        if (student_ids is None and uid is not None) or (student_ids is not None and uid in student_ids):
            return str(uid) if uid is not None else "ID"
    return full or "Professor"

<div style="border:2px solid #27AE60;padding:12px;border-radius:8px;background:#F4FBF6">
<h3>Output Compilers (PDF/CSV) & Fallbacks</h3>
<p>
Builds the final PDF and CSV with a centered header (title + section/schedule), then left-aligned <b>Class ID</b>, <b>Class Date/Time</b>, <b>Class Link</b>, followed by Attendance, Class Events, and an event-grouped transcript (timestamps, speakers, text).
</p>
<ul>
  <li><b>Privacy modes:</b> names / ids / both (generates two sets: <code>_names</code> and <code>_ids</code>).</li>
  <li>Outputs are saved to your chosen folder (local or Google Drive).</li>
  <li>Fallbacks produce simplified PDF/CSV if Forum data is unavailable.</li>
</ul>
</div>
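
Before rendering, transcript segments are grouped into speaker turns: consecutive segments from the same speaker are joined unless more than two seconds of silence separate them. The following is a minimal sketch of that grouping, assuming each segment already carries a `speaker` label (in the notebook the label is looked up from voice windows); `merge_by_speaker` and the sample data are hypothetical.

```python
def merge_by_speaker(segments, max_gap=2.0):
    """Join consecutive same-speaker segments separated by at most max_gap seconds."""
    merged = []
    for seg in segments:
        text = seg["text"].strip()
        if not text:
            continue  # skip empty segments
        last = merged[-1] if merged else None
        same_turn = (
            last is not None
            and last["speaker"] == seg["speaker"]
            and seg["start"] - last["end"] <= max_gap
        )
        if same_turn:
            last["text"] += " " + text
            last["end"] = seg["end"]
        else:
            merged.append({
                "speaker": seg["speaker"],
                "start": seg["start"],
                "end": seg["end"],
                "text": text,
            })
    return merged


# Invented sample data.
segments = [
    {"speaker": "Professor", "start": 0.0, "end": 4.0, "text": "Welcome back."},
    {"speaker": "Professor", "start": 4.5, "end": 9.0, "text": "Let's begin."},
    {"speaker": "1234", "start": 9.2, "end": 12.0, "text": "I have a question."},
    {"speaker": "1234", "start": 20.0, "end": 22.0, "text": "One more thing."},
]

for turn in merge_by_speaker(segments):
    print(f"{turn['start']:.1f}-{turn['end']:.1f} {turn['speaker']}: {turn['text']}")
```

In the sample, the two Professor segments collapse into one turn, while the two segments from speaker `1234` stay separate because the gap between them exceeds two seconds.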

In [None]:
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch

def compile_transcript_to_pdf(class_id, headers, name_mode: str = "names", file_suffix: str = "", output_dir: str = "/content"):
    """
    Render the PDF with header, attendance, events, and event-bucketed transcript.
    """
    try:
        # Read with the same encoding the JSON files were written with
        with open(f"/content/session_{class_id}_transcript.json", 'r', encoding='utf-8') as f:
            transcript_data = json.load(f)
        with open(f"/content/session_{class_id}_events.json", 'r', encoding='utf-8') as f:
            events_data = json.load(f)

        class_meta = events_data.get('class_meta', {})
        timeline_segments = events_data.get('timeline_segments', [])
        attendance = events_data.get('attendance', [])
        student_ids_set = {a.get('id') for a in attendance if a.get('id') is not None}

        # Build speaker map from voice windows
        speaker_map = {}
        for ev in events_data.get('voice_events', []):
            speaker_map[(ev['start'], ev['end'])] = ev.get('speaker', {})  # keep dict (has id + names)

        def find_speaker_at_time(t):
            for (start, end), spk in speaker_map.items():
                if start <= t <= end:
                    return label_from_actor(spk, name_mode, student_ids_set)
            return "Professor"


        # Combine consecutive segments by same speaker
        compiled_entries = []
        current = {'speaker': None, 'start_time': None, 'text': [], 'end_time': None}

        for seg in transcript_data['segments']:
            st, en, tx = seg['start'], seg['end'], seg['text'].strip()
            if not tx:
                continue
            spk = find_speaker_at_time(st)

            start_new = False
            if not current['speaker']:
                start_new = True
            elif current['speaker'] != spk:
                start_new = True
            elif current['end_time'] is not None and st - current['end_time'] > 2:
                start_new = True

            if start_new:
                if current['speaker']:
                    compiled_entries.append(current)
                current = {'speaker': spk, 'start_time': st, 'text': [tx], 'end_time': en}
            else:
                current['text'].append(tx)
                current['end_time'] = en

        if current['speaker']:
            compiled_entries.append(current)

        # Styles
        styles = getSampleStyleSheet()
        contribution_style = ParagraphStyle(
            'ContributionStyle', parent=styles['Normal'],
            fontName='Helvetica', fontSize=10, leading=12, wordWrap='CJK'
        )
        header_style = ParagraphStyle(
            'HeaderStyle', parent=styles['Normal'],
            fontName='Helvetica-Bold', fontSize=12, textColor=colors.whitesmoke, alignment=1
        )
        speaker_style = ParagraphStyle(
            'SpeakerStyle', parent=styles['Normal'],
            fontName='Helvetica', fontSize=10, leading=12, wordWrap='CJK'
        )

        # PDF scaffold
        output_path = str(Path(output_dir) / f"session_{class_id}_transcript{file_suffix}.pdf")
        doc = SimpleDocTemplate(output_path, pagesize=letter,
                                rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=72)
        elements = []

        # ====== HEADER ======
        session_line = class_meta.get('session_title') or f"Session {class_id}"
        elements.append(Paragraph(session_line, styles['Title']))

        # Section title (or schedule as a fallback), centered under the session title
        sec_sched = class_meta.get('section_title', '') or class_meta.get('schedule', '')
        if sec_sched:
            centered_info_style = ParagraphStyle('CenteredInfo', parent=styles['Heading3'], alignment=1)
            elements.append(Paragraph(sec_sched, centered_info_style))
        elements.append(Spacer(1, 12))

        # Left-aligned: labeled class id, date/time, and class link from cURL Referer
        left_info_style = ParagraphStyle('LeftInfo', parent=styles['Normal'], alignment=0)
        class_datetime = _fmt_dt_hm(class_meta.get('recording_start'))

        # Prefer the Referer header for the app URL (e.g., https://forum.minerva.edu/app/courses/.../classes/...)
        ref = (headers.get('referer') or headers.get('Referer') or '').strip()
        m = re.search(r'https://forum\.minerva\.edu/app/[^\s"\']+', ref)
        class_link = m.group(0) if m else f"https://forum.minerva.edu/app/classes/{class_id}"

        elements.append(Paragraph(f"<b>Class ID:</b> {class_id}", left_info_style))
        elements.append(Paragraph(f"<b>Class Date/Time:</b> {class_datetime}", left_info_style))
        elements.append(Paragraph(f'<b>Class Link:</b> <a href="{class_link}">{class_link}</a>', left_info_style))
        elements.append(Spacer(1, 12))  # blank line before next section

        # ====== ATTENDANCE TABLE ======
        # `attendance` was already loaded from events_data above.
        if attendance:
            elements.append(Paragraph("Attendance", styles['Heading3']))
            att_rows = [[Paragraph('Student', header_style), Paragraph('Status', header_style)]]
            for a in attendance:
                status = 'Absent' if a.get('absent') else 'Present'
                display_student = (f"ID {a.get('id')}" if name_mode == 'ids' and a.get('id') is not None else a.get('name',''))
                att_rows.append([Paragraph(soft_break_long_token(display_student, 14), speaker_style), status])
            att_table = Table(att_rows, colWidths=[4.5*inch, 1.5*inch], repeatRows=1)
            att_style = TableStyle([
                ('BACKGROUND', (0,0), (-1,0), colors.grey),
                ('TEXTCOLOR', (0,0), (-1,0), colors.whitesmoke),
                ('ALIGN', (0,0), (-1,0), 'CENTER'),
                ('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'),
                ('FONTNAME', (0,1), (-1,-1), 'Helvetica'),
                ('FONTSIZE', (0,1), (-1,-1), 10),
                ('VALIGN', (0,0), (-1,-1), 'TOP'),
                ('GRID', (0,0), (-1,-1), 1, colors.black),
                ('LEFTPADDING', (0,0), (-1,-1), 6),
                ('RIGHTPADDING', (0,0), (-1,-1), 6),
                ('TOPPADDING', (0,0), (-1,-1), 3),
                ('BOTTOMPADDING', (0,0), (-1,-1), 3),
            ])
            for i, a in enumerate(attendance, start=1):
                color = colors.red if a.get('absent') else colors.green
                att_style.add('TEXTCOLOR', (1,i), (1,i), color)
            att_table.setStyle(att_style)
            elements.append(att_table)
            elements.append(Spacer(1, 18))

        # ====== CLASS EVENTS TABLE (wrapped cells) ======
        if timeline_segments:
            elements.append(Paragraph("Class Events", styles['Heading3']))
            events_data_rows = [[Paragraph('Time', header_style),
                                Paragraph('Section', header_style),
                                Paragraph('Event', header_style)]]
            for seg in timeline_segments:
                sec_txt = soft_break_long_token(seg.get('section', '') or '', 14)
                evt_txt = soft_break_long_token(seg.get('title', '') or '', 14)
                events_data_rows.append([
                    _fmt_mmss(seg.get('offset_seconds')),
                    Paragraph(sec_txt, contribution_style),
                    Paragraph(evt_txt, contribution_style)
                ])
            # slightly wider Event column to reduce wrapping pressure
            events_table = Table(events_data_rows, colWidths=[0.85*inch, 2.10*inch, 4.05*inch], repeatRows=1)
            events_table.setStyle(TableStyle([
                ('BACKGROUND', (0,0), (-1,0), colors.grey),
                ('TEXTCOLOR', (0,0), (-1,0), colors.whitesmoke),
                ('ALIGN', (0,0), (-1,0), 'CENTER'),
                ('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'),
                ('FONTNAME', (0,1), (-1,-1), 'Helvetica'),
                ('FONTSIZE', (0,1), (-1,-1), 10),
                ('VALIGN', (0,0), (-1,-1), 'TOP'),
                ('GRID', (0,0), (-1,-1), 1, colors.black),
                ('LEFTPADDING', (0,0), (-1,-1), 6),
                ('RIGHTPADDING', (0,0), (-1,-1), 6),
                ('TOPPADDING', (0,0), (-1,-1), 3),
                ('BOTTOMPADDING', (0,0), (-1,-1), 3),
            ]))
            elements.append(events_table)
            elements.append(Spacer(1, 18))


        # ====== TRANSCRIPT: ONLY BREAK ON NEW CLASS EVENTS ======
        elements.append(Paragraph("Transcript", styles['Heading3']))
        elements.append(Spacer(1, 6))

        # Flatten entries to printable rows
        all_items = []
        for entry in compiled_entries:
            text = ' '.join(entry['text']).strip()
            text = normalize_sentence_spacing(text)
            if text in ['...', '.', '', 'Mm-hmm.'] or len(text) < 3:
                continue
            timestamp = _fmt_mmss(entry['start_time'])


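            max_chars_per_chunk = 500
            # Split after sentence-ending periods (keeping the period), then pack
            # whole sentences into chunks of at most max_chars_per_chunk characters.
            # Rejoining with a single space avoids doubled periods and fused sentences.
            sentences = [s for s in re.split(r'(?<=\.)\s+', text) if s]
            chunks, curr = [], ""
            for s in sentences:
                candidate = f"{curr} {s}" if curr else s
                if len(candidate) <= max_chars_per_chunk:
                    curr = candidate
                else:
                    if curr:
                        chunks.append(curr)
                    curr = s
            if curr:
                chunks.append(curr)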

            for i, chunk in enumerate(chunks or [text]):
                display_ts = "(cont.)" if i > 0 else timestamp
                all_items.append({
                    'start_time': entry['start_time'],
                    'end_time': entry['end_time'],
                    'timestamp': display_ts,
                    'speaker': entry['speaker'],
                    'text': chunk
                })

        all_items.sort(key=lambda x: x['start_time'])

        # Build event windows; include preamble
        seg_windows = []
        if timeline_segments:
            first_start = max(0, (timeline_segments[0].get('offset_seconds') or 0))
            if first_start > 0:
                seg_windows.append({'start': 0, 'end': first_start, 'label': f"{_fmt_mmss(0)} — Before first event"})
            for idx, seg in enumerate(timeline_segments):
                start = max(0, (seg.get('offset_seconds') or 0))
                end = (timeline_segments[idx+1].get('offset_seconds') if idx+1 < len(timeline_segments) else float('inf')) or float('inf')
                label_bits = []
                if seg.get('section'): label_bits.append(seg['section'])
                if seg.get('title'): label_bits.append(seg['title'])
                label_core = ' · '.join(label_bits) if label_bits else 'Event'
                seg_windows.append({'start': start, 'end': end, 'label': f"{_fmt_mmss(start)} — {label_core}"})
        else:
            seg_windows.append({'start': 0, 'end': float('inf'), 'label': "Transcript"})

        for win in seg_windows:
            bucket = [it for it in all_items if win['start'] <= it['start_time'] < win['end']]
            if not bucket:
                continue

            # Event heading inside transcript
            elements.append(Paragraph(win['label'], styles['Heading4']))
            elements.append(Spacer(1, 4))

            # Single table for this event; ReportLab splits across pages automatically.
            data = [[Paragraph('Time', header_style),
                     Paragraph('Speaker', header_style),
                     Paragraph('Contribution', header_style)]]
            for item in bucket:
                spk_txt = soft_break_long_token(item['speaker'], 14)
                data.append([item['timestamp'], Paragraph(spk_txt, speaker_style), Paragraph(normalize_sentence_spacing(item['text']), contribution_style)])

            table = Table(data, colWidths=[0.75*inch, 2.2*inch, 4.25*inch], repeatRows=1)
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
                ('ALIGN', (0, 0), (-1, 0), 'CENTER'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('FONTNAME', (0, 1), (-1, -1), 'Helvetica'),
                ('FONTSIZE', (0, 1), (-1, -1), 10),
                ('VALIGN', (0, 0), (-1, -1), 'TOP'),
                ('GRID', (0, 0), (-1, -1), 1, colors.black),
                ('LEFTPADDING', (0, 0), (-1, -1), 6),
                ('RIGHTPADDING', (0, 0), (-1, -1), 6),
                ('TOPPADDING', (0, 0), (-1, -1), 3),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
            ]))
            elements.append(table)
            elements.append(Spacer(1, 16))

        doc.build(elements)
        print(f"Created PDF transcript: {output_path}")
        return output_path

    except Exception as e:
        print(f"Error processing transcript: {e}")
        raise  # re-raise with the original traceback so the caller can fall back

def compile_transcript_to_csv(class_id, headers, name_mode: str = "names", file_suffix: str = "", output_dir: str = "/content"):
    """
    CSV:
      - Header
      - Attendance
      - Class Events
      - Transcript with event headers
    """
    try:
        with open(f"/content/session_{class_id}_transcript.json", 'r') as f:
            transcript_data = json.load(f)
        with open(f"/content/session_{class_id}_events.json", 'r') as f:
            events_data = json.load(f)

        class_meta = events_data.get('class_meta', {})
        timeline_segments = events_data.get('timeline_segments', [])
        attendance = events_data.get('attendance', [])
        student_ids_set = {a.get('id') for a in attendance if a.get('id') is not None}

        # Speaker mapping from voice events
        speaker_map = {}
        for ev in events_data.get('voice_events', []):
            speaker_map[(ev['start'], ev['end'])] = ev.get('speaker', {})

        def find_speaker_at_time(t):
            # First voice window containing t wins; default to "Professor".
            for (start, end), spk in speaker_map.items():
                if start <= t <= end:
                    return label_from_actor(spk, name_mode, student_ids_set)
            return "Professor"

        # Combine segments by speaker
        compiled_entries = []
        current = {'speaker': None, 'start_time': None, 'text': [], 'end_time': None}
        for seg in transcript_data['segments']:
            st, en, tx = seg['start'], seg['end'], seg['text'].strip()
            if not tx:
                continue
            spk = find_speaker_at_time(st)

            start_new = False
            if not current['speaker']:
                start_new = True
            elif current['speaker'] != spk:
                start_new = True
            elif current['end_time'] is not None and st - current['end_time'] > 2:
                start_new = True

            if start_new:
                if current['speaker']:
                    compiled_entries.append(current)
                current = {'speaker': spk, 'start_time': st, 'text': [tx], 'end_time': en}
            else:
                current['text'].append(tx)
                current['end_time'] = en
        if current['speaker']:
            compiled_entries.append(current)

        # Flatten → rows
        all_items = []
        for entry in compiled_entries:
            text = normalize_sentence_spacing(' '.join(entry['text']).strip())
            if text in ['...', '.', '', 'Mm-hmm.'] or len(text) < 3:
                continue
            timestamp = _fmt_mmss(entry['start_time'])
            all_items.append({
                'timestamp': timestamp,
                'speaker': entry['speaker'],
                'text': text,
                'start_time': entry['start_time'],
                'end_time': entry['end_time']
            })
        all_items.sort(key=lambda x: x['start_time'])

        # Build event windows (with preamble)
        segmented_rows = []
        seg_windows = []
        if timeline_segments:
            first_start = max(0, (timeline_segments[0].get('offset_seconds') or 0))
            if first_start > 0:
                seg_windows.append({'start': 0, 'end': first_start, 'label': f"{_fmt_mmss(0)} — Before first event"})
            for idx, seg in enumerate(timeline_segments):
                start = max(0, (seg.get('offset_seconds') or 0))
                end = (timeline_segments[idx+1].get('offset_seconds') if idx+1 < len(timeline_segments) else float('inf')) or float('inf')
                bits = []
                if seg.get('section'): bits.append(seg['section'])
                if seg.get('title'):   bits.append(seg['title'])
                label = f"{_fmt_mmss(start)} — " + (' / '.join(bits) if bits else 'Event')
                seg_windows.append({'start': start, 'end': end, 'label': label})
        else:
            seg_windows.append({'start': 0, 'end': float('inf'), 'label': "Transcript"})

        for win in seg_windows:
            bucket = [it for it in all_items if win['start'] <= it['start_time'] < win['end']]
            if not bucket:
                continue
            segmented_rows.append({'timestamp': '', 'speaker': '', 'text': f"--- {win['label']} ---"})
            segmented_rows.extend(bucket)

        all_items = segmented_rows or all_items

        # Write CSV
        output_path = str(Path(output_dir) / f"session_{class_id}_transcript{file_suffix}.csv")
        with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
            w = csv.writer(csvfile)

            # --- Header block ---
            w.writerow(["Session", class_meta.get('session_title','')])
            sec_sched = class_meta.get('section_title') or class_meta.get('schedule') or ''
            if sec_sched:
                w.writerow([sec_sched])
            w.writerow([])  # blank line (matches PDF spacing)

            # class id, class date/time, class link (each on its own labeled line)
            class_datetime = _fmt_dt_hm(class_meta.get('recording_start'))
            ref = (headers.get('referer') or headers.get('Referer') or '').strip()
            m = re.search(r'https://forum\.minerva\.edu/app/[^\s"\']+', ref)
            class_link = m.group(0) if m else f"https://forum.minerva.edu/app/classes/{class_id}"

            w.writerow([f"Class ID: {class_id}"])
            w.writerow([f"Class Date/Time: {class_datetime}"])
            w.writerow([f"Class Link: {class_link}"])
            w.writerow([])  # blank line before Attendance / Events

            # --- Attendance ---
            attendance = events_data.get('attendance', [])
            if attendance:
                w.writerow(["Attendance"])
                w.writerow(["Student", "Status"])
                for a in attendance:
                    # Compare against None so a falsy id (e.g. 0) does not fall back to the student's name
                    label = (str(a['id']) if (name_mode == "ids" and a.get('id') is not None) else a.get('name',''))
                    w.writerow([label, "Absent" if a.get('absent') else "Present"])
                w.writerow([])

            # --- Class Events (timeline) ---
            if timeline_segments:
                w.writerow(["Class Events"])
                w.writerow(["Time", "Section", "Event"])
                for seg in timeline_segments:
                    w.writerow([
                        _fmt_mmss(seg.get('offset_seconds')),
                        seg.get('section',''),
                        seg.get('title','')
                    ])
                w.writerow([])

            # --- Transcript table ---
            w.writerow(['Time', 'Speaker', 'Contribution'])
            for item in all_items:
                w.writerow([
                    item['timestamp'],
                    item['speaker'],
                    item['text']
                ])

        print(f"Created CSV transcript: {output_path}")
        return output_path

    except Exception as e:
        print(f"Error creating CSV transcript: {str(e)}")
        return None

def create_simplified_csv(class_id, transcript_path, output_dir: str = "/content"):
    """
    Fallback CSV: just time + text (no speakers/events).
    """
    try:
        with open(f"/content/session_{class_id}_transcript.json", 'r') as f:
            transcript_data = json.load(f)

        output_path = str(Path(output_dir) / f"session_{class_id}_transcript_simple.csv")
        with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Time', 'Text'])
            for seg in transcript_data['segments']:
                minutes = int(seg['start'] // 60)
                seconds = int(seg['start'] % 60)
                timestamp = f"{minutes:02d}:{seconds:02d}"
                writer.writerow([timestamp, normalize_sentence_spacing(seg.get('text', ''))])

        print(f"Created simplified CSV transcript: {output_path}")
        return output_path
    except Exception as e:
        print(f"Error creating simplified CSV transcript: {str(e)}")
        return None

def create_simplified_transcript(class_id, transcript_path, output_dir: str = "/content"):
    """
    Fallback PDF: no events, no speakers; includes minimal title.
    """
    try:
        # reportlab and json are already imported earlier in this notebook.
        from datetime import datetime  # the class, not the `datetime` module imported at the top

        with open(f"/content/session_{class_id}_transcript.json", 'r') as f:
            transcript_data = json.load(f)

        styles = getSampleStyleSheet()
        text_style = ParagraphStyle('TextStyle', parent=styles['Normal'],
                                    fontName='Helvetica', fontSize=10, leading=12, spaceAfter=0, spaceBefore=0,
                                    wordWrap='CJK')
        header_style = ParagraphStyle('HeaderStyle', parent=styles['Normal'],
                                      fontName='Helvetica-Bold', fontSize=12, textColor=colors.whitesmoke,
                                      alignment=1)

        output_path = str(Path(output_dir) / f"session_{class_id}_transcript_simple.pdf")
        doc = SimpleDocTemplate(output_path, pagesize=letter,
                                rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=72)

        elements = []
        title = Paragraph(f"Session {class_id}", styles['Title'])
        date_str = datetime.now().strftime("%Y-%m-%d")
        subtitle = Paragraph(f"Generated on {date_str}", styles['Heading2'])
        elements.append(title)
        elements.append(subtitle)
        elements.append(Spacer(1, 12))

        data = [[Paragraph('Time', header_style), Paragraph('Text', header_style)]]
        for seg in transcript_data['segments']:
            minutes = int(seg['start'] // 60)
            seconds = int(seg['start'] % 60)
            timestamp = f"{minutes:02d}:{seconds:02d}"
            data.append([
                timestamp,
                Paragraph(normalize_sentence_spacing(seg['text']), text_style)
            ])

        table = Table(data, colWidths=[0.75*inch, 6.25*inch], repeatRows=1)
        table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
            ('ALIGN', (0, 0), (-1, 0), 'CENTER'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTNAME', (0, 1), (-1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 1), (-1, -1), 10),
            ('VALIGN', (0, 0), (-1, -1), 'TOP'),
            ('GRID', (0, 0), (-1, -1), 1, colors.black),
            ('LEFTPADDING', (0, 0), (-1, -1), 6),
            ('RIGHTPADDING', (0, 0), (-1, -1), 6),
            ('TOPPADDING', (0, 0), (-1, -1), 3),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
        ]))
        elements.append(table)
        doc.build(elements)
        print(f"Created simplified PDF transcript: {output_path}")
        return output_path
    except Exception as e:
        print(f"Error creating simplified transcript: {str(e)}")
        return None

In [None]:
def _emit_outputs(class_id, headers, privacy_mode: str, output_dir: str):
    """
    Builds PDF/CSV for names, ids, or both, into the given output_dir.
    Returns a dict of produced file paths.
    """
    outputs = {}
    if privacy_mode == "both":
        # Names versions
        outputs['pdf_names'] = compile_transcript_to_pdf(class_id, headers, name_mode="names", file_suffix="_names", output_dir=output_dir)
        outputs['csv_names'] = compile_transcript_to_csv(class_id, headers, name_mode="names", file_suffix="_names", output_dir=output_dir)
        # IDs versions
        outputs['pdf_ids'] = compile_transcript_to_pdf(class_id, headers, name_mode="ids", file_suffix="_ids", output_dir=output_dir)
        outputs['csv_ids'] = compile_transcript_to_csv(class_id, headers, name_mode="ids", file_suffix="_ids", output_dir=output_dir)
    else:
        suffix = "" if privacy_mode == "names" else "_ids"
        outputs['pdf'] = compile_transcript_to_pdf(class_id, headers, name_mode=privacy_mode, file_suffix=suffix, output_dir=output_dir)
        outputs['csv'] = compile_transcript_to_csv(class_id, headers, name_mode=privacy_mode, file_suffix=suffix, output_dir=output_dir)
    return outputs

def _print_outputs(outputs: dict, privacy_mode: str):
    """Console-print produced file paths; supports single or 'both' privacy modes."""
    if privacy_mode == "both":
        print("\nSuccess! Your transcripts are ready (both versions):")
        print(f"PDF (names): {outputs.get('pdf_names')}")
        print(f"CSV (names): {outputs.get('csv_names')}")
        print(f"PDF (ids):   {outputs.get('pdf_ids')}")
        print(f"CSV (ids):   {outputs.get('csv_ids')}")
    else:
        print("\nSuccess! Your transcripts are ready:")
        print(f"PDF: {outputs.get('pdf')}")
        print(f"CSV: {outputs.get('csv')}")

<div style="border:2px solid #D35400;padding:12px;border-radius:8px;background:#FFF7F0">
<h3>Pipeline Entrypoint</h3>
<p>
Coordinates the full run: fetch Forum data → preprocess audio → transcribe → compile outputs. Prints final paths and a caution to double-check accuracy.
</p>
<ul>
  <li>Handles local / Drive / URL inputs and local / Drive outputs.</li>
  <li>If “both” privacy is selected, produces both <code>_names</code> and <code>_ids</code> versions.</li>
</ul>
</div>
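
The two behaviors listed above, one output set per privacy variant and a simplified fallback when the full compile fails, can be sketched on their own. This is an illustrative sketch under stated assumptions: `plan_outputs` and `run_with_fallback` are hypothetical names, not functions defined in this notebook, and unlike the entrypoint below the sketch rejects an unknown privacy mode instead of treating it as `ids`.

```python
def plan_outputs(privacy_mode):
    """Return (key_suffix, name_mode, file_suffix) for each variant to build."""
    if privacy_mode == "both":
        return [("_names", "names", "_names"), ("_ids", "ids", "_ids")]
    if privacy_mode == "names":
        return [("", "names", "")]        # single names run: no file suffix
    if privacy_mode == "ids":
        return [("", "ids", "_ids")]      # single ids run: files end in _ids
    raise ValueError(f"Unknown privacy mode: {privacy_mode!r}")

def run_with_fallback(primary, fallback):
    """Call primary(); if it raises, call fallback() and report which path ran."""
    try:
        return primary(), "full"
    except Exception as exc:
        print(f"Full compile failed ({exc}); using simplified output.")
        return fallback(), "simplified"

def failing_compile():
    # Stand-in for a compiler that cannot find Forum data.
    raise RuntimeError("Forum data unavailable")

result, mode = run_with_fallback(failing_compile, lambda: "session_simple.pdf")
```

The fallback only runs if the primary compiler raises, which is why a compiler that swallows its own errors and returns `None` would never reach the simplified path.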

In [None]:
# @title
def process_lecture(audio_path, class_id, curl_string):
    """
    End-to-end pipeline with graceful fallbacks.
    """
    output_pdf, output_csv = None, None  # defaults so the final return is always defined
    try:
        # 1) Forum events
        print("Step 1/4: Processing Forum class events...")
        headers = clean_curl(curl_string)
        events_data = get_forum_events(class_id, headers)

        # 2) Audio preprocess
        print("\nStep 2/4: Preprocessing audio file...")
        preprocessor = AudioPreprocessor()
        fixed_path = preprocessor.validate_and_fix_file(audio_path)

        # 3) Transcribe
        print("\nStep 3/4: Generating transcript...")
        tp = TranscriptionProcessor()
        transcript_path = tp.transcribe(fixed_path, class_id)

        # 4) Compile outputs
        print("\nStep 4/4: Compiling final PDF and CSV transcripts...")
        try:
            outputs = _emit_outputs(class_id, headers, PRIVACY_MODE, OUTPUT_DIR)
            _print_outputs(outputs, PRIVACY_MODE)

            # Choose representative return values (the names version when both exist)
            if PRIVACY_MODE == "both":
                output_pdf = outputs.get('pdf_names') or outputs.get('pdf_ids')
                output_csv = outputs.get('csv_names') or outputs.get('csv_ids')
            else:
                output_pdf = outputs.get('pdf')
                output_csv = outputs.get('csv')

            # Accuracy caution banner
            print("\n⚠️  Accuracy caution: Do not rely solely on this transcript. Manually verify key information.")

        except Exception as e:
            print(f"Error compiling transcripts: {str(e)}")
            print("Attempting to create simplified transcripts without speaker information...")
            # Simplified outputs don't include names/attendance, so privacy mode is irrelevant here
            output_pdf = create_simplified_transcript(class_id, transcript_path, OUTPUT_DIR)
            output_csv = create_simplified_csv(class_id, transcript_path, OUTPUT_DIR)
            if output_pdf and output_csv:
                print("\nCreated simplified transcripts:")
                print(f"PDF: {output_pdf}")
                print(f"CSV: {output_csv}")
                print("\n⚠️  Accuracy caution: Do not rely solely on this transcript. Manually verify key information.")
            else:
                print("Failed to create simplified transcripts.")
                return None, None

        # Cleanup converted audio
        try:
            temp_files = [fixed_path]
            for temp_file in temp_files:
                if temp_file != audio_path and Path(temp_file).exists():
                    Path(temp_file).unlink()
                    print(f"Cleaned up temporary file: {temp_file}")
        except Exception as cleanup_error:
            print(f"Note: Could not clean up temporary files: {str(cleanup_error)}")

        return output_pdf, output_csv

    except Exception as e:
        print(f"\nERROR: {str(e)}")
        if "MP4" in str(e) and audio_path.lower().endswith('.mp4'):
            print("\nThere was a problem with your MP4 file. Suggestions:")
            print("1. Convert it to MP3 on your computer before uploading")
            print("2. Use a screen recorder to record Forum while playing back the class")
            print("3. Contact Forum support about MP4 download issues")
        else:
            print("\nTranscription failed. Please try again with a different file.")
        return None, None

In [None]:
# Run the process
pdf_output, csv_output = process_lecture(AUDIO_PATH, CLASS_ID, raw_curl)

Step 1/4: Processing Forum class events...
Fetching class and event data from Forum...
Requesting class data from: https://forum.minerva.edu/api/v1/class_grader/classes/81239
Requesting events from: https://forum.minerva.edu/api/v1/class_grader/classes/81239/class-events
Processed voice events: 126; timeline segments: 15

Step 2/4: Preprocessing audio file...
Validating file: /content/input_from_url.mp4
Converting MP4 to MP3 (intermediate step)...
Successfully converted to MP3: /content/input_from_url.mp3
Successfully converted to Whisper-optimized WAV: /content/input_from_url.wav

Step 3/4: Generating transcript...
Using device: cuda
Loading Whisper model...
Processing audio to generate transcript JSON...
Total duration: 1:41:00


Processing segments:   0%|          | 0/1 [00:00<?, ?segment/s]


Transcript JSON saved to: /content/session_81239_transcript.json

Step 4/4: Compiling final PDF and CSV transcripts...
Created PDF transcript: /content/drive/MyDrive/AI Curriculum Projects Summer 2025/Transcriptions/Alex/v2.3/session_81239_transcript_names.pdf
Created CSV transcript: /content/drive/MyDrive/AI Curriculum Projects Summer 2025/Transcriptions/Alex/v2.3/session_81239_transcript_names.csv
Created PDF transcript: /content/drive/MyDrive/AI Curriculum Projects Summer 2025/Transcriptions/Alex/v2.3/session_81239_transcript_ids.pdf
Created CSV transcript: /content/drive/MyDrive/AI Curriculum Projects Summer 2025/Transcriptions/Alex/v2.3/session_81239_transcript_ids.csv

Success! Your transcripts are ready (both versions):
PDF (names): /content/drive/MyDrive/AI Curriculum Projects Summer 2025/Transcriptions/Alex/v2.3/session_81239_transcript_names.pdf
CSV (names): /content/drive/MyDrive/AI Curriculum Projects Summer 2025/Transcriptions/Alex/v2.3/session_81239_transcript_names.csv


In [None]:
free_cuda_mem()

print("CUDA cache cleared")

CUDA cache cleared
