In [22]:
import os, time, io, zipfile, csv
import requests
import xml.etree.ElementTree as ET
from dotenv import load_dotenv

load_dotenv()
BASE = os.environ["CANVAS_BASE_URL"].rstrip("/")
TOKEN = os.environ["CANVAS_TOKEN"]
H = {"Authorization": f"Bearer {TOKEN}"}

def _get(url, params=None):
    r = requests.get(url, headers=H, params=params or {})
    r.raise_for_status()
    return r

def _post(url, data=None, files=None):
    r = requests.post(url, headers=H, data=data, files=files)
    r.raise_for_status()
    return r

def quiz_meta(course_id, quiz_id):
    url = f"{BASE}/api/v1/courses/{course_id}/quizzes/{quiz_id}"
    return _get(url).json()

def list_classic_questions(course_id, quiz_id):
    url = f"{BASE}/api/v1/courses/{course_id}/quizzes/{quiz_id}/questions"
    out, params = [], {"per_page": 100, "include[]": "answers"}
    while url:
        r = _get(url, params=params)
        out.extend(r.json())
        next_url = None
        if "link" in r.headers:
            for part in r.headers["link"].split(","):
                seg_rel = part.split(";")
                if any('rel="next"' in s for s in seg_rel[1:]):
                    next_url = seg_rel[0].strip()[1:-1]
        url, params = next_url, {}
    return out

def start_qti_export(course_id):
    url = f"{BASE}/api/v1/courses/{course_id}/content_exports"
    # export all quizzes/content as QTI
    r = _post(url, data={"export_type": "qti"})
    return r.json()["id"]

def poll_export(course_id, export_id, timeout_s=600, poll_every=5):
    url = f"{BASE}/api/v1/courses/{course_id}/content_exports/{export_id}"
    start = time.time()
    while True:
        j = _get(url).json()
        status = j.get("workflow_state") or j.get("progress")  # workflow_state: 'queued'|'exporting'|'completed'|'failed'
        if j.get("workflow_state") == "completed" and j.get("attachment"):
            return j["attachment"]["url"]
        if j.get("workflow_state") == "failed":
            raise RuntimeError(f"Export failed: {j}")
        if time.time() - start > timeout_s:
            raise TimeoutError("QTI export polling timed out")
        time.sleep(poll_every)

def download_bytes(url):
    r = requests.get(url, headers=H)  # attachment URLs usually require auth
    r.raise_for_status()
    return r.content

def _iter_qti_assessments(zipf: zipfile.ZipFile):
    """Yield (filename, xml_root) for each XML that looks like a QTI assessment."""
    for name in zipf.namelist():
        if not name.lower().endswith(".xml"):
            continue
        try:
            root = ET.fromstring(zipf.read(name))
        except ET.ParseError:
            continue
        # QTI 1.2 “assessment” elements
        if root.tag.endswith("assessment") or root.find(".//{*}assessment") is not None:
            # Normalize: root may be <questestinterop><assessment>...</assessment></questestinterop>
            assess = root if root.tag.endswith("assessment") else root.find(".//{*}assessment")
            if assess is not None:
                yield name, assess

def _clean_text(x):
    return (x or "").strip()

def extract_items_from_assessment(assessment_elem):
    """Return list of items: id, title, stem (html/plain), and simple choice list if available."""
    items = []
    for item in assessment_elem.findall(".//{*}item"):
        ident = item.get("ident")
        title = item.get("title")
        # stem text (Canvas packs HTML in material/mattext)
        mat = item.find(".//{*}material/{*}mattext")
        stem = _clean_text(mat.text if mat is not None else "")
        # choices (if present)
        choices = []
        for resp in item.findall(".//{*}response_label"):
            lab_mat = resp.find(".//{*}material/{*}mattext")
            txt = _clean_text(lab_mat.text if lab_mat is not None else "")
            choices.append(txt)
        items.append({"id": ident, "title": title, "question_text": stem, "choices": choices})
    return items

def find_assessment_for_quiz_title(zip_bytes, quiz_title):
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
        candidates = []
        for name, assess in _iter_qti_assessments(z):
            a_title = assess.get("title") or ""
            if a_title.strip().lower() == quiz_title.strip().lower():
                return name, assess
            # fuzzy candidate if exact not found
            if quiz_title.lower() in a_title.lower():
                candidates.append((name, assess))
        return (candidates[0] if candidates else (None, None))

def write_csv(rows, output_file="quiz_questions.csv"):
    fieldnames = ["id", "title", "question_text", "choices"]
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for r in rows:
            # flatten choices
            r = r.copy()
            r["choices"] = "; ".join(r.get("choices", []))
            w.writerow(r)
    print(f"✅ Exported {len(rows)} questions to {output_file}")

if __name__ == "__main__":
    course_id = 16676
    quiz_id = 557321

    qm = quiz_meta(course_id, quiz_id)
    quiz_title = qm.get("title") or f"quiz_{quiz_id}"
    print("Quiz meta:", qm)

    classic = list_classic_questions(course_id, quiz_id)
    print(f"Classic /questions returned: {len(classic)}")

    if len(classic) > 0:
        # You’ve got a Classic Quiz — export directly
        rows = []
        for q in classic:
            answers = [a.get("text", "") for a in q.get("answers", [])]
            rows.append({
                "id": q.get("id"),
                "title": q.get("question_name"),
                "question_text": q.get("question_text"),
                "choices": answers
            })
        write_csv(rows, output_file=f"quiz_{quiz_id}_questions.csv")
    else:
        print("Looks like a New Quiz or bank-backed quiz. Falling back to QTI export…")
        export_id = start_qti_export(course_id)
        url = poll_export(course_id, export_id)
        zip_bytes = download_bytes(url)
        name, assess = find_assessment_for_quiz_title(zip_bytes, quiz_title)
        if not assess:
            raise RuntimeError("Could not locate the assessment in the QTI export. Try checking the quiz title or inspect the ZIP manually.")
        items = extract_items_from_assessment(assess)
        write_csv(items, output_file=f"quiz_{quiz_id}_questions.csv")


Quiz meta: {'id': 557321, 'title': 'Quiz - output', 'html_url': 'https://instructure.charlotte.edu/courses/16676/quizzes/557321', 'mobile_url': 'https://instructure.charlotte.edu/courses/16676/quizzes/557321?force_user=1&persist_headless=1', 'description': '', 'quiz_type': 'assignment', 'time_limit': None, 'timer_autosubmit_disabled': False, 'shuffle_answers': False, 'show_correct_answers': True, 'scoring_policy': 'keep_highest', 'allowed_attempts': 1, 'one_question_at_a_time': False, 'question_count': 426, 'points_possible': 426.0, 'cant_go_back': False, 'access_code': None, 'ip_filter': None, 'due_at': None, 'lock_at': None, 'unlock_at': None, 'published': False, 'unpublishable': True, 'locked_for_user': True, 'lock_info': {'missing_permission': 'participate_as_student', 'asset_string': 'quizzes:quiz_557321'}, 'lock_explanation': 'This quiz is currently locked.', 'hide_results': None, 'show_correct_answers_at': None, 'hide_correct_answers_at': None, 'all_dates': [{'due_at': None, 'un

In [25]:
# clean_quiz_csvs.py
import pandas as pd
from bs4 import BeautifulSoup
import html
import re
from pathlib import Path

# --- CONFIG ---
INPUT_CSVS = [
    "quiz_436089_questions.csv",
    "quiz_557321_questions.csv",
    "quiz_557322_questions.csv",
    "quiz_557324_questions.csv"
]

OUTPUT_CSV = "quiz_merged_clean.csv"
COLUMNS_TO_CLEAN = ["question_text"]  # add "title", "choices" if you want to clean those too
DEDUP_BY_ID = True  # set False if you don't want deduplication
ID_COLUMN = "id"

# --- CLEANERS ---
_ws_re = re.compile(r"\s+")

def clean_html_to_text(val: str) -> str:
    if pd.isna(val):
        return val
    # Ensure it's a string
    s = str(val)

    # Parse with BeautifulSoup and drop all tags, including <script>/<style>
    soup = BeautifulSoup(s, "html.parser")
    # Remove script/style/noscript explicitly (sometimes get_text keeps their content out anyway,
    # but this is extra safety for malformed HTML)
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()

    text = soup.get_text(separator=" ", strip=True)
    # Unescape HTML entities (&quot;, &amp;, etc.)
    text = html.unescape(text)
    # Normalize whitespace
    text = _ws_re.sub(" ", text).strip()
    return text

def clean_df(df: pd.DataFrame, cols) -> pd.DataFrame:
    for c in cols:
        if c in df.columns:
            df[c] = df[c].apply(clean_html_to_text)
    return df

# --- MAIN ---
def main():
    frames = []
    for path in INPUT_CSVS:
        p = Path(path)
        if not p.exists():
            print(f"Warning: file not found -> {p}")
            continue

        # Use engine='python' to be extra tolerant of quotes/commas
        df = pd.read_csv(
            p,
            dtype=str,               # keep everything as string to avoid dtype surprises
            keep_default_na=False,   # keep empty strings as empty, not NaN
            engine="python"
        )
        df = clean_df(df, COLUMNS_TO_CLEAN)
        frames.append(df)

    if not frames:
        print("No input files loaded. Please check INPUT_CSVS.")
        return

    merged = pd.concat(frames, ignore_index=True)

    if DEDUP_BY_ID and ID_COLUMN in merged.columns:
        # Keep the first occurrence by id
        before = len(merged)
        merged = merged.drop_duplicates(subset=[ID_COLUMN], keep="first").reset_index(drop=True)
        print(f"De-duplicated by '{ID_COLUMN}': {before} -> {len(merged)} rows")

    # Optional: consistent column order if your schema is fixed
    preferred_order = ["id", "title", "question_text", "choices"]
    cols = [c for c in preferred_order if c in merged.columns] + [c for c in merged.columns if c not in preferred_order]
    merged = merged[cols]

    merged.to_csv(OUTPUT_CSV, index=False)
    print(f"Wrote cleaned file: {OUTPUT_CSV}")

if __name__ == "__main__":
    main()

De-duplicated by 'id': 1284 -> 1284 rows
Wrote cleaned file: quiz_merged_clean.csv


In [30]:
# upload_quiz_to_supabase.py
# ------------------------------------------------------------
# pip install -q pandas python-dotenv requests sentence-transformers psycopg[binary]
# ------------------------------------------------------------
import os
import sys
import json
import time
import requests
import pandas as pd
from pathlib import Path
from typing import List, Dict, Any
from sentence_transformers import SentenceTransformer

# ========== CONFIG ==========
CSV_PATHS = [
    "quiz_merged_clean.csv"
]
TABLE_NAME = "quiz_questions"
EMBED_COL = "question_text"
MODEL_NAME = "BAAI/bge-base-en-v1.5"   # 768 dims
BATCH_SIZE = 64                        # embedding batch size
JOIN_COLUMNS_FOR_EMBED = True         # set True to embed concatenated fields
JOIN_TEMPLATE = "{question_text}\nChoices: {choices}"

# ========== ENV ==========
from dotenv import load_dotenv
load_dotenv()

SUPABASE_URL = os.environ.get("SUPABASE_URL", "").rstrip("/")
SUPABASE_SERVICE_ROLE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY", "")
SUPABASE_DB_URL = os.environ.get("SUPABASE_DB_URL")  # optional for DDL

if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
    sys.exit("Please set SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY env vars.")

REST_URL = f"{SUPABASE_URL}/rest/v1/{TABLE_NAME}"
HEADERS_JSON = {
    "apikey": SUPABASE_SERVICE_ROLE_KEY,
    "Authorization": f"Bearer {SUPABASE_SERVICE_ROLE_KEY}",
    "Content-Type": "application/json",
}

DDL_SQL = f"""
create extension if not exists vector;

create table if not exists {TABLE_NAME} (
  id               bigint primary key,
  title            text,
  question_text    text,
  choices          text,
  question_name    text,
  question_type    text,
  points_possible  int,
  answers          text
);

do $$
begin
  if not exists (
    select 1
    from information_schema.columns
    where table_name = '{TABLE_NAME}'
      and column_name = 'embedding'
  ) then
    alter table {TABLE_NAME} add column embedding vector(768);
  end if;
end$$;
-- Suggested later:
-- create index if not exists {TABLE_NAME}_embedding_idx on {TABLE_NAME} using ivfflat (embedding vector_cosine_ops) with (lists = 100);
"""

def ensure_schema_via_psycopg(sql: str) -> bool:
    if not SUPABASE_DB_URL:
        return False
    try:
        import psycopg
        with psycopg.connect(SUPABASE_DB_URL) as conn:
            with conn.cursor() as cur:
                cur.execute(sql)
        print("Schema ensured via direct Postgres connection.")
        return True
    except Exception as e:
        print(f"[WARN] psycopg DDL failed: {e}")
        return False

def read_csvs(paths: List[str]) -> pd.DataFrame:
    frames = []
    for p in paths:
        fp = Path(p)
        if not fp.exists():
            print(f"[WARN] CSV not found: {fp}")
            continue
        df = pd.read_csv(fp, dtype=str, keep_default_na=False, engine="python")
        frames.append(df)
    if not frames:
        sys.exit("No CSV files loaded.")
    return pd.concat(frames, ignore_index=True)

def build_text_for_embedding(row: Dict[str, Any]) -> str:
    if JOIN_COLUMNS_FOR_EMBED:
        return JOIN_TEMPLATE.format(
            question_text=row.get("question_text", "").strip(),
            choices=row.get("choices", "").strip(),
        ).strip()
    return str(row.get(EMBED_COL, "")).strip()

def compute_embeddings(texts: List[str]) -> List[List[float]]:
    model = SentenceTransformer(MODEL_NAME)
    vecs = model.encode(
        texts,
        batch_size=BATCH_SIZE,
        show_progress_bar=True,
        normalize_embeddings=True
    )
    return [v.tolist() for v in vecs]

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

def to_int_or_none(v):
    s = str(v).strip()
    return int(s) if s.isdigit() else None

def prepare_records(df: pd.DataFrame, embeddings: List[List[float]]) -> List[Dict[str, Any]]:
    out = []
    for i, (_, row) in enumerate(df.iterrows()):
        rec = {
            "id": to_int_or_none(row.get("id")),
            "title": row.get("title") or None,
            "question_text": row.get("question_text") or None,
            "choices": row.get("choices") or None,
            "question_name": row.get("question_name") or None,
            "question_type": row.get("question_type") or None,
            "points_possible": to_int_or_none(row.get("points_possible")),
            "answers": row.get("answers") or None,
            "embedding": embeddings[i],
        }
        if rec["id"] is not None:
            out.append(rec)
    return out

def upsert_rows(rows: List[Dict[str, Any]]) -> None:
    headers = HEADERS_JSON.copy()
    headers["Prefer"] = "resolution=merge-duplicates"
    for batch in chunks(rows, 1000):
        r = requests.post(REST_URL, headers=headers, json=batch, timeout=60)
        if r.status_code not in (200, 201, 204):
            raise RuntimeError(f"Upsert error {r.status_code}: {r.text}")

def main():
    print("Ensuring schema...")
    created = ensure_schema_via_psycopg(DDL_SQL)
    if not created:
        print(
            "\n[Action required once] Run this SQL in Supabase SQL editor (or set SUPABASE_DB_URL to let the script do it):\n"
            + DDL_SQL
        )

    print("Reading CSV(s)...")
    df = read_csvs(CSV_PATHS)

    # Ensure expected columns exist
    for col in ["id","title","question_text","choices","question_name","question_type","points_possible","answers"]:
        if col not in df.columns:
            df[col] = ""

    print(f"Building texts from: {'joined fields' if JOIN_COLUMNS_FOR_EMBED else EMBED_COL}")
    texts = [build_text_for_embedding(r) for r in df.to_dict(orient="records")]

    print(f"Computing embeddings with {MODEL_NAME}...")
    embeddings = compute_embeddings(texts)
    if len(embeddings) != len(df):
        sys.exit("Embedding count mismatch.")

    print("Preparing records...")
    records = prepare_records(df, embeddings)
    if not records:
        sys.exit("No valid records with 'id' to upsert.")

    print(f"Upserting {len(records)} rows into {TABLE_NAME}...")
    upsert_rows(records)
    print("Done.")

if __name__ == "__main__":
    main()


Ensuring schema...

[Action required once] Run this SQL in Supabase SQL editor (or set SUPABASE_DB_URL to let the script do it):

create extension if not exists vector;

create table if not exists quiz_questions (
  id               bigint primary key,
  title            text,
  question_text    text,
  choices          text,
  question_name    text,
  question_type    text,
  points_possible  int,
  answers          text
);

do $$
begin
  if not exists (
    select 1
    from information_schema.columns
    where table_name = 'quiz_questions'
      and column_name = 'embedding'
  ) then
    alter table quiz_questions add column embedding vector(768);
  end if;
end$$;
-- Suggested later:
-- create index if not exists quiz_questions_embedding_idx on quiz_questions using ivfflat (embedding vector_cosine_ops) with (lists = 100);

Reading CSV(s)...
Building texts from: joined fields
Computing embeddings with BAAI/bge-base-en-v1.5...


Batches: 100%|██████████| 21/21 [00:03<00:00,  6.46it/s]


Preparing records...
Upserting 1284 rows into quiz_questions...
Done.
