In [140]:
import json
from bs4 import BeautifulSoup
import re
import shutil
import os
import requests
from pydoc import text


In [141]:
# Folders holding the saved quiz HTML pages: the inbox still to be parsed and
# the archive that files are moved into once they have been parsed.
to_process_directory = "../Study questions to process"
processed_directory = "../Study Questions processed"

# JSON databases written by this notebook.
default_raw_db_path = "../public/study_quiz_raw_questions_db.json"
default_processed_by_quiz_db_path = "../public/study_quiz_processed_questions_by_quiz_db.json"
default_processed_by_topics_db_path = "../public/study_quiz_processed_questions_by_topics_db.json"

# List of image URLs found while parsing, and the folder images are downloaded into.
canvas_images_development_fp = "../public/canvas_images.json"
img_dir_from_data_compilation = "../public/img"

# The image folder must exist before any download is attempted.
os.makedirs(img_dir_from_data_compilation, exist_ok=True)

In [142]:
def reset_database_and_reprocess():
    """
    Reset the raw quiz database and queue every processed HTML file again.

    Steps, in order:
      1. Delete ``default_processed_by_quiz_db_path`` if it exists (it is not
         recreated here).
      2. Overwrite ``default_raw_db_path`` with an empty ``{"quizzes": []}``
         database.
      3. Make sure ``to_process_directory`` exists.
      4. Move every ``*.html`` file from ``processed_directory`` back into
         ``to_process_directory`` and report how many files were moved.

    Returns:
        None. Progress is reported with ``print``.
    """
    if os.path.exists(default_processed_by_quiz_db_path):
        os.remove(default_processed_by_quiz_db_path)

    # Mode 'w' truncates an existing file, so no separate delete is needed.
    with open(default_raw_db_path, 'w', encoding='utf-8') as f:
        json.dump({"quizzes": []}, f)
    print(f"Created empty {default_raw_db_path}")

    os.makedirs(to_process_directory, exist_ok=True)

    if not os.path.exists(processed_directory):
        print(f"Processed directory {processed_directory} does not exist")
        return

    moved_count = 0
    for file in os.listdir(processed_directory):
        if file.endswith(".html"):
            source = os.path.join(processed_directory, file)
            target = os.path.join(to_process_directory, file)
            shutil.move(source, target)
            moved_count += 1
    # The counter used to be computed and then dropped; report it.
    print(f"Moved {moved_count} file(s) back to {to_process_directory}")

# Usage example:
# NOTE: this call runs every time the cell is executed. It recreates the raw
# database as an empty file, deletes the per-quiz database, and moves all
# processed .html files back into the to-process folder.
reset_database_and_reprocess()

Created empty ../public/study_quiz_raw_questions_db.json


In [143]:
# Make sure both database files exist before later cells try to read them.

# The raw database is a dict with a "quizzes" list (see insert_json_quiz_into_db).
if not os.path.exists(default_raw_db_path):
    with open(default_raw_db_path, 'w', encoding='utf-8') as f:
        json.dump({"quizzes": []}, f)

# The per-quiz database is read as a top-level JSON *list* of quiz objects
# (get_indexed_quiz_titles and print_quiz_titles_and_unique_questions_table
# iterate it directly), so the empty seed must be a list, not a dict.
if not os.path.exists(default_processed_by_quiz_db_path):
    with open(default_processed_by_quiz_db_path, 'w', encoding='utf-8') as f:
        json.dump([], f)

In [144]:
# cleaning the texts
def remove_invisible_chars(text):
    """
    Strip invisible Unicode characters from ``text``.

    Zero-width characters (zero-width space, non-joiner, joiner, BOM) and the
    soft hyphen are deleted. Characters that act as word or line separators
    (non-breaking space, line separator, paragraph separator) are replaced by
    a regular space so the words around them are not glued together; this
    matches how the ``&nbsp;`` entity is handled elsewhere in this file.

    Args:
        text: String to clean. ``None`` or an empty string yields ``""``.

    Returns:
        The cleaned string.
    """
    if not text:
        return ""

    deleted = (
        '\u200b',  # Zero-width space
        '\u200c',  # Zero-width non-joiner
        '\u200d',  # Zero-width joiner
        '\ufeff',  # BOM
        '\u00ad',  # Soft hyphen
    )
    spaced = (
        '\u00a0',  # Non-breaking space
        '\u2028',  # Line separator
        '\u2029',  # Paragraph separator
    )
    # One translation pass instead of one str.replace per character.
    table = {ord(char): None for char in deleted}
    table.update({ord(char): ' ' for char in spaced})
    return text.translate(table)
def remove_spaces(text):
    """Collapse whitespace runs (including newlines) into single spaces and trim both ends.

    A falsy ``text`` (``None`` or ``""``) gives ``""``.
    """
    if text:
        # Same substitutions, in the same order, each replacing its match with one space.
        for whitespace_pattern in (r'\n\s+', r'\s+', r'\n+', r'\s+'):
            text = re.sub(whitespace_pattern, ' ', text)
        return text.strip()
    return ""

def completely_cleaned_text(text):
    """
    Reduce an HTML fragment to plain, single-spaced text.

    Invisible characters are removed, HTML tags are stripped, the entities
    ``&lt;``, ``&gt;``, ``&nbsp;`` and ``&amp;`` are decoded, and whitespace is
    normalised last so that spaces introduced by the earlier steps are
    collapsed too.

    Args:
        text: HTML or plain text. A falsy value yields ``""``.

    Returns:
        The cleaned text with no leading or trailing whitespace.
    """
    if not text:
        return ""
    text = remove_invisible_chars(text)
    # Remove HTML tags before decoding entities, so an escaped "&lt;b&gt;"
    # stays literal text instead of being stripped as a tag.
    text = re.sub(r'<[^>]+>', '', text)
    # "&amp;" is decoded last so "&amp;lt;" becomes "&lt;" rather than "<",
    # and it decodes to "&" (it used to be replaced by a space, losing it).
    for entity, char in (('&lt;', '<'), ('&gt;', '>'), ('&nbsp;', ' '), ('&amp;', '&')):
        text = text.replace(entity, char)

    return remove_spaces(text)

def clean_question(text):
    """
    Normalise question markup while keeping its HTML tags.

    Invisible characters are removed, the entities ``&nbsp;``, ``&lt;``,
    ``&gt;`` and ``&amp;`` are decoded, and every run of whitespace (including
    newlines) is collapsed to a single space. Unlike
    ``completely_cleaned_text``, tags are left in place.

    Args:
        text: HTML or plain text. A falsy value yields ``""``.

    Returns:
        The normalised text with no leading or trailing whitespace.
    """
    if not text:
        return ""
    text = remove_invisible_chars(text)

    # "&amp;" is decoded last so "&amp;lt;" becomes "&lt;" rather than "<",
    # and it decodes to "&" (it used to be replaced by a space, losing it).
    for entity, char in (('&nbsp;', ' '), ('&lt;', '<'), ('&gt;', '>'), ('&amp;', '&')):
        text = text.replace(entity, char)

    # \s already covers newlines, so one pass collapses all whitespace.
    return re.sub(r'\s+', ' ', text).strip()


In [None]:
# HTML scraping helpers


def get_canvas_img(url):
    """
    Print and return the ``verifier`` token of a Canvas image URL.

    The token is everything after the last ``verifier=`` in ``url``, with
    surrounding whitespace removed. If ``url`` contains no ``verifier=`` the
    whole (stripped) URL is returned.

    Args:
        url: Image URL as a string.

    Returns:
        The token string (previously the function only printed it).
    """
    img_verifier = url.split("verifier=")[-1].strip()
    print(img_verifier)
    return img_verifier

# Image URLs collected while parsing quiz pages in the current kernel session.
canvas_images = []

def save_canvas_images_development():
    """
    Write the collected image URLs to ``canvas_images_development_fp`` as JSON.

    The URLs are de-duplicated and sorted before writing so the file content
    does not depend on the order in which they were collected.
    The file is overwritten on every call.
    """
    with open(canvas_images_development_fp, "w", encoding="utf-8") as f:
        json.dump(sorted(set(canvas_images)), f, ensure_ascii=False, indent=4)

def extract_quiz_to_json(html_path):
    """
    Parse one saved Canvas quiz page into a plain dict.

    Every ``div.question_holder`` becomes one question dict with the keys
    ``raw_html_question``, ``question``, ``question_image_url``, ``answers``,
    ``correct_answer``, ``comments``, ``raw_html_comments`` and
    ``comments_image_url``. Image URLs found in questions and comments are
    added to the module-level ``canvas_images`` list, which is then written
    out by ``save_canvas_images_development()``.

    Args:
        html_path: Path of the HTML file to parse (read as UTF-8).

    Returns:
        ``{"quiz_title": str, "questions": list[dict]}``.
    """
    with open(html_path, encoding="utf-8") as f:
        soup = BeautifulSoup(f, "html.parser")

    # Prefix used to turn site-relative image paths into absolute URLs.
    base_url = "https://canvas.umn.edu"

    # Quiz title comes from the <title> tag, minus fixed page-title boilerplate.
    # NOTE(review): the student and course strings below are hard-coded for one
    # specific export; other exports will keep their boilerplate in the title.
    title_tag = soup.find("title")
    quiz_title = completely_cleaned_text(title_tag.text) if title_tag else ""
    quiz_title = quiz_title.replace("Joshua Cheng's Quiz History: ", "").strip()
    quiz_title = quiz_title.replace(": BIOL 1009 (070-082) General Biology (Fall 2025)", "").strip()
    quiz_title = quiz_title.replace(" (course resources and policies)", "").strip()

    # Drop a trailing "(due <weekday>, <month>. <day> at <time> AM/PM)" note.
    pattern = r'\s*\(due\s+\w+,\s+\w+\.\s+\d+\s+at\s+[\d:]+\s+[AP]M\)'
    quiz_title = re.sub(pattern, '', quiz_title)

    questions = []

    def extract_image_url(html_string):
        """Return the absolute ``src`` URLs of all ``<img>`` tags in ``html_string``."""
        html_string = str(html_string).strip()
        if "img" not in html_string:
            return []

        # Plain double-quoted src attributes first ...
        matches = re.findall(r'<img[^>]*src="([^"]+)"', html_string)
        # ... then backslash-escaped quotes (src=\"...\").
        if not matches:
            matches = re.findall(r'<img[^>]*src=\\"([^"\\]+)\\"', html_string)

        # Site-relative paths are made absolute; everything else is kept as is.
        return [base_url + match if match.startswith('/') else match for match in matches]

    def remember_images(urls):
        """Add unseen URLs to the module-level ``canvas_images`` list, in place."""
        for url in urls:
            if url not in canvas_images:
                canvas_images.append(url)

    for q_holder in soup.find_all("div", class_="question_holder"):
        question = {}
        question_image_url = []
        question_text = ""
        # HTML source the question text was taken from ("" when none is available).
        raw_html_question = ""

        # Preferred source: original_question_text, whose text content is the
        # question's HTML markup as a string.
        q_text = q_holder.find("div", class_="original_question_text")
        original_html = q_text.text.strip() if q_text else ""
        if original_html:
            raw_html_question = original_html
            question_image_url = extract_image_url(original_html)
            question_text = completely_cleaned_text(q_text.text)
        else:
            # Fallback 1: the rendered question_text element, which may contain images.
            q_content = q_holder.find("div", class_="question_text user_content enhanced")
            if q_content:
                print("question_text user_content enhanced")
                # Serialise the element: its .text has no tags left, so <img>
                # could never be found there, and the cleaner needs a string.
                raw_html_question = str(q_content)
                question_image_url = extract_image_url(raw_html_question)
                question_text = completely_cleaned_text(raw_html_question)
            else:
                # Fallback 2: display_question, using aria-label or its text.
                q_display = q_holder.find("div", class_="display_question")
                if q_display:
                    print("display_question")
                    question_image_url = extract_image_url(q_display.text)
                    question_text = completely_cleaned_text(q_display.get("aria-label", ""))
                    if not question_text:
                        question_text = completely_cleaned_text(q_display.text)

        # q_text is None whenever a fallback was used, so it must not be
        # dereferenced here; use the HTML recorded by the branch that ran.
        question["raw_html_question"] = remove_invisible_chars(raw_html_question)
        question["question"] = question_text
        question["question_image_url"] = question_image_url
        remember_images(question_image_url)

        answers = []
        correct_answer = None
        for ans_div in q_holder.find_all("div", class_="answer"):
            ans = {}
            ans_text_div = ans_div.find("div", class_="answer_text")
            ans_text = completely_cleaned_text(ans_text_div.text) if ans_text_div else ""
            ans["text"] = ans_text

            # An answer is correct if its class list says so or it carries the
            # "correct" arrow marker.
            is_correct = (
                "correct_answer" in ans_div.get("class", []) or
                "correct" in ans_div.get("class", []) or
                ans_div.find("span", class_="answer_arrow correct")
            )
            ans["is_correct"] = bool(is_correct)
            if ans["is_correct"]:
                # With several correct answers the last one wins.
                correct_answer = ans_text

            answers.append(ans)

        question["answers"] = answers
        question["correct_answer"] = correct_answer

        # Question-level feedback: a <p class="neutral_comments"> inside a
        # quiz_comment div. Defaults are set once, so a later div without a
        # neutral comment cannot erase one that was already found.
        question["comments"] = None
        question["raw_html_comments"] = None
        question["comments_image_url"] = []

        for comment_div in q_holder.find_all("div", class_="quiz_comment"):
            neutral_p = comment_div.find("p", class_="neutral_comments")
            if not (comment_div.text.strip() and neutral_p):
                continue
            question["raw_html_comments"] = remove_spaces(str(neutral_p))
            question["comments"] = completely_cleaned_text(neutral_p.text)
            comments_img_url = extract_image_url(question["raw_html_comments"])
            question["comments_image_url"] = comments_img_url
            remember_images(comments_img_url)

        questions.append(question)

    output = {
        "quiz_title": quiz_title,
        "questions": questions
    }
    # Persist the image list after every parsed file.
    save_canvas_images_development()
    return output


def moved_processed_file(file):
    """
    Move ``file`` from ``to_process_directory`` into ``processed_directory``.

    The destination folder is created first if it does not exist, matching
    what ``process_file`` does before it moves a file.

    Args:
        file: Bare file name (no directory part).
    """
    os.makedirs(processed_directory, exist_ok=True)
    shutil.move(os.path.join(to_process_directory, file), os.path.join(processed_directory, file))

def get_processed_quizes(db_path=default_raw_db_path, debug=0):
    """
    List the titles of all quizzes stored in the raw database.

    If ``db_path`` does not exist, an empty ``{"quizzes": []}`` database is
    written there first.

    Args:
        db_path: Path of the JSON database file.
        debug: When truthy, the titles are also printed.

    Returns:
        List of ``quiz_title`` strings in stored order.
    """
    db_missing = not os.path.exists(db_path)
    if db_missing:
        with open(db_path, 'w', encoding='utf-8') as new_db_file:
            json.dump({"quizzes": []}, new_db_file)

    with open(db_path, 'r', encoding='utf-8') as db_file:
        stored = json.load(db_file)

    titles = []
    for entry in stored.get("quizzes", []):
        titles.append(entry["quiz_title"])

    if debug:
        print(f"Processed quizzes: {titles}")
    return titles

def insert_json_quiz_into_db(quiz_data, db_path=default_raw_db_path):
    """
    Append one parsed quiz to the raw JSON database.

    If ``db_path`` does not exist, an empty ``{"quizzes": []}`` database is
    created first. When the quiz title is already taken, a numeric suffix
    (``_2``, ``_3``, ...) is appended until the title is unique.

    Args:
        quiz_data: Dict with at least a ``"quiz_title"`` key. Its title is
            updated in place when a suffix is needed.
        db_path: Path of the JSON database file.

    Returns:
        Always ``True``.
    """
    if not os.path.exists(db_path):
        with open(db_path, 'w', encoding='utf-8') as f:
            json.dump({"quizzes": []}, f)

    with open(db_path, 'r', encoding='utf-8') as f:
        db = json.load(f)

    # setdefault keeps reading and appending consistent: a database without a
    # "quizzes" key is treated as empty instead of raising KeyError on append.
    quizzes = db.setdefault("quizzes", [])

    # Find a free title: base, base_2, base_3, ...
    base_title = quiz_data["quiz_title"]
    existing_titles = {quiz["quiz_title"] for quiz in quizzes}
    new_title = base_title
    suffix = 2
    while new_title in existing_titles:
        new_title = f"{base_title}_{suffix}"
        suffix += 1
    quiz_data["quiz_title"] = new_title

    quizzes.append(quiz_data)

    with open(db_path, 'w', encoding='utf-8') as f:
        json.dump(db, f, indent=2, ensure_ascii=False)

    return True

# File-level pipeline: parse a quiz export, store it in the raw database, then archive the file.
def process_file(file_name, source_directory=to_process_directory):
    """
    Parse one quiz HTML file, store it in the raw database and archive the file.

    Args:
        file_name: Bare file name of the HTML export.
        source_directory: Folder that contains ``file_name``. Defaults to
            ``to_process_directory``.

    After a successful insert the file is moved into ``processed_directory``
    (created if needed). ``insert_json_quiz_into_db`` as defined in this file
    always returns ``True``, so the "Skipped" branch is only reached if that
    function is changed to reject duplicates.
    """
    file_path = os.path.join(source_directory, file_name)
    quiz_data = extract_quiz_to_json(file_path)

    if insert_json_quiz_into_db(quiz_data):
        os.makedirs(processed_directory, exist_ok=True)
        shutil.move(file_path, os.path.join(processed_directory, file_name))
    else:
        print(f"Skipped {file_name}, quiz already in database.")

def process_directory(folder_path=to_process_directory):
    """
    Process every ``*.html`` file found directly inside ``folder_path``.

    Files are handled in sorted name order so the ``_2``/``_3`` title suffixes
    assigned to duplicate quizzes are the same on every run. ``folder_path``
    is passed on to ``process_file`` so files are read from the folder that
    was listed, not always from ``to_process_directory``.
    """
    for file in sorted(os.listdir(folder_path)):
        if file.endswith(".html"):
            process_file(file, folder_path)

In [146]:
# Parse every .html file in the to-process folder, add it to the raw database
# and move it into the processed folder.
process_directory()

In [147]:
def get_indexed_quiz_titles(quiz_title, db_path=default_processed_by_quiz_db_path):
    """
    Find the position of a quiz in the JSON list stored at ``db_path``.

    Args:
        quiz_title: Title to look for (exact match on ``"quiz_title"``).
        db_path: Path of a JSON file holding a list of quiz objects.

    Returns:
        Index of the first matching quiz, or ``-1`` when there is no match or
        the file does not exist.
    """
    try:
        with open(db_path, "r", encoding="utf-8") as db_file:
            stored_quizzes = json.load(db_file)
    except FileNotFoundError:
        return -1

    matching_positions = (
        position
        for position, entry in enumerate(stored_quizzes)
        if entry["quiz_title"] == quiz_title
    )
    return next(matching_positions, -1)

# def refine_quiz_db_by_quiz(raw_db):
#     # Track unique questions and assign IDs
#     question_bank = {}
#     question_id_counter = 1
#     refined_quizzes = []

#     pattern = r'_\d+$'
    
#     process_last = []
#     for quiz in raw_db["quizzes"]:
#         if quiz["quiz_title"].startswith("Quiz") or quiz["quiz_title"].startswith("SQ midterm"):
#             process_last.append(quiz["quiz_title"])

#     sorted_quizzes = sorted(raw_db["quizzes"], key=lambda quiz: (
#         quiz["quiz_title"] in process_last,  # False (0) comes before True (1)
#         quiz["quiz_title"]  # Secondary sort by title for consistent ordering
#     ))

#     print(sorted_quizzes)

#     for quiz in sorted_quizzes:
#         base_quiz_title = re.sub(pattern, '', quiz["quiz_title"])

#         refined_quiz = {
#             "quiz_title": base_quiz_title,
#             "questions": []
#         }
#         unique_count = 0

#         for q in quiz["questions"]:
#             q_text = q["question"].strip()
#             q_image_url = q.get("question_image_url", "")
#             unique_key = (q_text, q_image_url)

#             # Check if question is unique
#             if unique_key not in question_bank:
#                 question_bank[unique_key] = question_id_counter
#                 unique = True
#                 q_id = question_id_counter
#                 question_id_counter += 1
#                 unique_count += 1
#             else:
#                 continue

#             # Build options list
#             options = []
#             for ans in q["answers"]:
#                 options.append({
#                     "text": ans["text"],
#                     "is_correct": ans["is_correct"],
#                     "comment": ans.get("comment")
#                 })

#             refined_quiz["questions"].append({
#                 "question": q_text,
#                 "question_image_url": q_image_url,
#                 "unique": unique,
#                 "id": q_id,
#                 "options": options,
#                 "neutral_comments": q.get("neutral_comments")
#             })

#         # Check if this base title already exists in our refined_quizzes list
#         existing_quiz_index = -1
#         for idx, existing_quiz in enumerate(refined_quizzes):
#             if existing_quiz["quiz_title"] == base_quiz_title:
#                 existing_quiz_index = idx
#                 break
        
#         if existing_quiz_index == -1:
#             # This is the first time we see this base quiz title
#             refined_quiz["unique_questions"] = unique_count
#             refined_quizzes.append(refined_quiz)
#         else:
#             # This quiz already exists, merge questions
#             # print(f"Merging questions from '{quiz['quiz_title']}' into '{base_quiz_title}'")
#             refined_quizzes[existing_quiz_index]["questions"].extend(refined_quiz["questions"])
#             refined_quizzes[existing_quiz_index]["unique_questions"] += unique_count
    
#     return refined_quizzes

# Usage:
# Load the raw quiz database from default_raw_db_path into `raw_db`.
with open(default_raw_db_path, encoding="utf-8") as f:
    raw_db = json.load(f)

# refined_db = refine_quiz_db_by_quiz(raw_db)

# with open(default_processed_by_quiz_db_path, "w", encoding="utf-8") as f:
#     json.dump(refined_db, f, indent=2, ensure_ascii=False)

In [148]:
def refine_quiz_db_by_topic(raw_db):
    """
    Group the raw quizzes by topic and keep each distinct question once.

    Quizzes are handled in three waves (the sort is stable within a wave):
    first everything else, then titles starting with an "SQ topic(s) N-M"
    range, and last titles starting with "Quiz" or "SQ midterm". A question
    is identified by its stripped text, its image URL value and its set of
    (answer text, is_correct) pairs; the first quiz that contains it keeps it
    and later occurrences are dropped. Kept questions get increasing ids
    starting at 1.

    Quizzes whose titles reduce to the same topic title are merged into one
    entry. A quiz with an empty title and no questions is skipped. One
    progress line is printed per handled quiz.

    Args:
        raw_db: Dict with a ``"quizzes"`` list of
            ``{"quiz_title": ..., "questions": [...]}`` items.

    Returns:
        List of ``{"quiz_title", "questions", "unique_questions"}`` dicts in
        first-seen order.
    """
    topic_title_re = r'((SQ topics?\s\d+(-\d+)?(\s(and)\s\d+)?)|(Quiz \d+)|(SQ midterm \d+))'
    topic_range_re = r"SQ topics?\s\d+-\d+"

    def processing_wave(entry):
        # One-element sort key: 3 = quizzes/midterms, 2 = topic ranges, 1 = the rest.
        name = entry["quiz_title"]
        if name.startswith("Quiz") or name.startswith("SQ midterm"):
            return (3,)
        if re.match(topic_range_re, name):
            return (2,)
        return (1,)

    seen_questions = {}   # question fingerprint -> assigned id
    next_id = 1
    merged = []           # result, in first-seen order
    merged_by_title = {}  # topic title -> its dict inside `merged`

    for entry in sorted(raw_db["quizzes"], key=processing_wave):
        source_title = entry['quiz_title']
        found = re.match(topic_title_re, source_title)

        if source_title == "" and len(entry['questions']) == 0:
            continue

        # Fall back to the full title when no topic pattern matches.
        group_title = (found.group(1) if found else None) or source_title
        print(f"base processing quiz: {source_title} --> {group_title}")

        fresh_questions = []
        for item in entry["questions"]:
            stem = item["question"].strip()
            image_urls = item.get("question_image_url", "")
            answer_set = frozenset(
                (answer["text"], answer["is_correct"]) for answer in item.get("answers", [])
            )
            fingerprint = (stem, str(image_urls), answer_set)

            # Already kept from an earlier quiz: drop this occurrence.
            if fingerprint in seen_questions:
                continue

            assigned_id = next_id
            seen_questions[fingerprint] = assigned_id
            next_id += 1

            fresh_questions.append({
                "raw_html_question": item.get("raw_html_question"),
                "question": stem,
                "question_image_url": image_urls,
                "unique": True,
                "id": assigned_id,
                "options": [
                    {"text": answer["text"], "is_correct": answer["is_correct"]}
                    for answer in item["answers"]
                ],
                "comments": item.get("comments"),
                "raw_html_comments": item.get("raw_html_comments"),
                "comments_image_url": item.get("comments_image_url", [])
            })

        target = merged_by_title.get(group_title)
        if target is None:
            # First quiz seen for this topic title.
            target = {
                "quiz_title": group_title,
                "questions": fresh_questions,
                "unique_questions": len(fresh_questions),
            }
            merged_by_title[group_title] = target
            merged.append(target)
        else:
            # Topic already present: merge the new questions into it.
            target["questions"].extend(fresh_questions)
            target["unique_questions"] += len(fresh_questions)

    return merged

# Usage:
# Reload the raw database so this cell does not depend on `raw_db` from an earlier cell.
with open(default_raw_db_path, encoding="utf-8") as f:
    raw_db = json.load(f)

# Group the raw quizzes by topic, keeping each distinct question once.
refined_db = refine_quiz_db_by_topic(raw_db)

# Write the grouped result to the by-topics database file (overwrites it).
with open(default_processed_by_topics_db_path, "w", encoding="utf-8") as f:
    json.dump(refined_db, f, indent=2, ensure_ascii=False)

base processing quiz: SQ topic 3 level 1c (Chapter 4) --> SQ topic 3


In [149]:
def print_quiz_titles_and_unique_questions_table(db_path=default_processed_by_quiz_db_path):
    """
    Print a two-column table of quiz titles and their unique-question counts.

    Args:
        db_path: Path of a JSON file holding a list of quiz objects; each may
            provide ``quiz_title`` (default ``'(no title)'``) and
            ``unique_questions`` (default ``0``).

    The table ends with a ``Total`` row holding the sum of all counts.
    """
    with open(db_path, encoding="utf-8") as db_file:
        stored_quizzes = json.load(db_file)

    title_width = 60
    count_width = 15

    def table_row(left, right):
        # Both cells are left-aligned and padded to their column width.
        return f"{left:<{title_width}} | {right:<{count_width}}"

    print(table_row('Quiz Title', 'Unique Questions'))
    print("-" * (title_width + count_width))

    running_total = 0
    for entry in stored_quizzes:
        label = entry.get('quiz_title', '(no title)')
        question_count = entry.get('unique_questions', 0)
        running_total += question_count
        print(table_row(label, question_count))

    print(table_row('Total', running_total))

# print_quiz_titles_and_unique_questions_table(db_path=default_processed_by_quiz_db_path)

In [150]:
# Summarise the by-topics database: one row per topic plus a total.
print_quiz_titles_and_unique_questions_table(db_path=default_processed_by_topics_db_path)


Quiz Title                                                   | Unique Questions
---------------------------------------------------------------------------
SQ topic 3                                                   | 10             
Total                                                        | 10             


In [151]:
def download_canvas_images(canvas_images_fp=canvas_images_development_fp, img_dir=img_dir_from_data_compilation, timeout=30):
    """
    Download every image listed in the canvas-images JSON file.

    Each URL is saved as ``<verifier>.jpg`` inside ``img_dir``, where
    ``<verifier>`` is the text after the last ``verifier=`` in the URL. Files
    that already exist are skipped. Failures are printed and do not stop the
    remaining downloads.

    Args:
        canvas_images_fp: Path of a JSON file holding a list of image URLs.
        img_dir: Destination folder (created if missing).
        timeout: Seconds to wait for each HTTP request before giving up, so a
            stalled connection cannot hang the notebook.
    """
    # Read the list and close the file before any network work starts.
    with open(canvas_images_fp, "r", encoding="utf-8") as f:
        image_urls = json.load(f)

    os.makedirs(img_dir, exist_ok=True)

    for img in image_urls:
        img_verifier = img.split("verifier=")[-1].strip()
        filename = f"{img_verifier}.jpg"

        # Without a "verifier=" part the whole URL ends up in the name; a name
        # containing path separators must not be joined onto img_dir.
        if not img_verifier or os.path.basename(filename) != filename:
            print(f"Skipping image without a usable verifier token: {img}")
            continue

        filepath = os.path.join(img_dir, filename)
        if os.path.exists(filepath):
            continue

        try:
            response = requests.get(img, timeout=timeout)
            if response.status_code == 200:
                with open(filepath, 'wb') as image_file:
                    image_file.write(response.content)
                print(f"Downloaded: {filename}")
            else:
                print(f"Failed to download image {img_verifier}.jpg: Status {response.status_code}")
        except (requests.RequestException, OSError) as e:
            # Best effort: report network or file errors and continue.
            print(f"Error downloading image {img_verifier}.jpg: {e}")

# Fetch all images listed in the canvas-images JSON file; files already present in the image folder are skipped.
download_canvas_images()
