In [104]:
import docx
import uuid
import time
import io
import re
import unicodedata

from bson import ObjectId
# from odfpy import opendocument, text as odf_text # Import odfpy components
# from odf import opendocument, text as odf_text
from odf import opendocument
from odf.text import P, H

# Helper function to map option letters (A, B, C, D) to JSON keys (1, 2, 3, 4)
OPTION_KEY_MAP = {
    'A': '0',
    'B': '1',
    'C': '2',
    'D': '3'
}



In [105]:
def finalize_and_add_question(quiz_obj, question_obj, options_text_map, correct_letter):
    """
    Helper function to process the collected question data, format options,
    and add the complete question object to the quiz's questions list.
    Called when a question block is finished.
    Uses bson.ObjectId for option IDs.
    """
    if not quiz_obj or not question_obj:
        # Nothing to finalize if we don't have a quiz or question object
        return

    # Check if we actually collected meaningful data for this question
    # (e.g., content or options)
    if not question_obj.get('content') and not options_text_map:
        # print("end")
        return # Don't add empty question blocks

    question_obj['questionId'] = ObjectId() # Generate ID for the question
    options_list = {}

    # Ensure all expected options A, B, C, D are present based on map,
    # even if text wasn't found for them in the input.
    expected_letters = ['A', 'B', 'C', 'D']

    for letter in expected_letters:
        # Get the full line text stored during parsing
        full_option_text = options_text_map.get(letter, "").strip()

        # Determine if this option is correct based on the extracted correct_letter
        is_correct = (letter == correct_letter)

        option_key = OPTION_KEY_MAP.get(letter) # Get the corresponding JSON key (1, 2, 3, 4)

        if option_key: # Make sure mapping exists
            options_list[option_key] = {
                "optionText": full_option_text,
                "isCorrect": is_correct,
                "optionId": str(ObjectId()) # Generate ObjectId for the option
            }
        else:
             print(f"Warning: No mapping found for option letter '{letter}'")

    
    options_arr = []
    for item in options_list.items():
        options_arr.append(item[1]) # tuple of (key, value)
    question_obj['options'] = options_arr

    # print(f"Adding question obj: {question_obj}")
    # Add the complete question object to the quiz's questions list
    quiz_obj['questions'].append(question_obj)


def simple_normalize(text):
    text = unicodedata.normalize('NFC', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text


In [106]:
# Start a new quiz
from bson import ObjectId
text = 'Heritage: Chùa Vĩnh Nghiêm: heritageId: 67f3edb13834bd66e6e1c681'
parts = text.split(':', 2) # Split into max 3 parts: "Heritage", " Name ", " heritageId: ID"
# print(parts) # Remove debug print
heritage_part = parts[2].strip() if len(parts) > 1 else "Unknown Heritage : heritageId: unknown_id"

# name_id_parts = heritage_part.split('heritageId:') # Split the middle part by heritageId:

heritage_name = parts[1].strip()
# Handle cases where heritageId might not be present
heritage_id = ObjectId(parts[2].split(":")[1].strip())
print(heritage_name, heritage_id, sep="\n")
print()


Chùa Vĩnh Nghiêm
67f3edb13834bd66e6e1c681



In [107]:
def parse_quiz_lines(lines_list):
    """
    Parses a list of text lines/paragraphs containing quiz data and converts
    it into a structured format, matching the ODT example structure where
    question content is on the same line as 'Câu hỏi N:'.

    Args:
        lines_list: A list of strings, where each string is a line or paragraph
                    from the document.

    Returns:
        A list of dictionaries, each representing a complete quiz object
        in the target JSON structure.
    """
    quizzes = []
    current_quiz = None
    current_question = None
    current_options_text = {} # To store A, B, C, D full line text temporarily
    correct_answer_letter = None # To store the correct letter (A, B, C, D)
    reference_link = "" # Variable to store the reference link for the current heritage

    # Regex to find "Câu <number>:" or "Câu hỏi <number>:" at the start of the line
    # Added \s* after ^ to handle potential leading whitespace in the line itself
    # Made 'hỏi' optional `(hỏi)?`
    cau_pattern = re.compile(r'^\s*Câu\s*(hỏi)?\s*\d+\s*:')

    # No need for awaiting_question_content_line flag with this format

    for line in lines_list:
        # Apply normalization and strip whitespace from the line
        text = simple_normalize(line) # Use the simple_normalize function
        text = text.strip()

        if not text:
            # Skip empty lines
            continue

        # --- State Machine Logic based on line patterns ---

        # 1. Heritage Section
        # Must NOT be inside a question block (current_question is None) to avoid misinterpretation
        if text.startswith('Heritage:') and current_question is None:
            # Finalize the previous quiz if exists
            if current_quiz:
                 # Ensure the last question of the previous quiz is added
                 if current_question: # Check again inside the if
                    finalize_and_add_question(
                        current_quiz,
                        current_question,
                        current_options_text,
                        correct_answer_letter
                    )
                 # Reset question state for the next section
                 current_question = None
                 current_options_text = {}
                 correct_answer_letter = None
                 reference_link = "" # Reset reference link for the new heritage


            # Start a new quiz
            # Example: Heritage: Thành nhà Hồ: heritageId: 67f3edb13834bd66e6e1c678
            parts = text.split(':', 2) # Split into max 3 parts: "Heritage", " Name ", " heritageId: ID"
            heritage_name = parts[1].strip() if len(parts) > 1 else "Unknown Heritage"

            # Find heritageId part more reliably, splitting by 'heritageId:'
            # Handle potential leading/trailing spaces around the id part after splitting by ':'
            heritage_id_part = ""
            if len(parts) > 2:
                id_segment = parts[2].strip()
                id_parts = id_segment.split('heritageId:')
                if len(id_parts) > 1:
                    heritage_id_part = id_parts[1].strip() # Get the part after 'heritageId:'

            heritage_id = heritage_id_part if heritage_id_part else "unknown_id"


            current_quiz = {
                "_id": ObjectId(), # Generate a unique ObjectId for the quiz
                "heritageId": heritage_id, # Store heritageId as string
                "title": f"Kiểm tra di tích lịch sử {heritage_name}",
                "content": f"Bài kiểm tra này sẽ giúp bạn hiểu rõ hơn về {heritage_name}",
                "questions": [],
                "topPerformersLimit": 10,
                "stats": {},
                "topPerformers": [],
                "status": "INACTIVE",
                "createdAt": int(time.time()), # Unix timestamp
                "updatedAt": int(time.time())  # Unix timestamp
            }
            quizzes.append(current_quiz)
            # print(f"Debug: Started new quiz: {heritage_name} ({heritage_id})") # Optional debug
            continue # Move to the next paragraph/line

        # 2. Link (Capture the reference link)
        # Handle both spellings and store the text after the colon
        # Must also NOT be inside a question block
        if text.startswith(('Link tham khao:', 'Link tham khảo:')) and current_question is None:
             # Extract the URL part after the prefix
             link_part = text.split(':', 1)[1].strip() if ':' in text else ""
             reference_link = text # Store the extracted link text
             # print(f"Debug: Captured link: {reference_link}") # Optional debug
             continue

        # 3. Separator (Handle if present, although not in your latest sample)
        # This signals the end of a question block, potentially starting a new one soon.
        # if text.startswith('-----'):
        #      # Finalize and add the current question if one is being processed
        #      if current_quiz and current_question:
        #          finalize_and_add_question(
        #              current_quiz,
        #              current_question,
        #              current_options_text,
        #              correct_answer_letter
        #          )
        #          # Reset question state for the next potential question
        #          current_question = None
        #          current_options_text = {}
        #          correct_answer_letter = None

        #      # print("Debug: Hit separator") # Optional debug
        #      continue # Move to the next paragraph/line

        # 4. Start of a new Question Block ("Câu N:" or "Câu hỏi N:")
        # This line now CONTAINS the question content.
        if cau_pattern.match(text):
            # print(f"Debug: Starting new question block: {text}") # Optional debug

            # Finalize and add the previous question if one was being processed
            # This handles cases where the file doesn't end with a separator
            # and a new question block starts right after the previous one ends.
            if current_quiz and current_question:
                 finalize_and_add_question(
                     current_quiz,
                     current_question,
                     current_options_text,
                     correct_answer_letter
                 )
                 # Reset question state for the new question
                 current_question = None # This will be created below
                 current_options_text = {}
                 correct_answer_letter = None


            # Start a new question object dictionary
            if current_quiz is None:
                 # We found a question before a Heritage block. This is an error
                 # based on the expected structure.
                 raise ValueError("File format error: Question found before a 'Heritage:' block.")

            # Create the dictionary for the new question
            current_question = {
                "explanation": "", # Initialize explanation
                "image": "" # Always empty as per the target structure
            }

            # Extract the question content from *this* line (after the colon)
            content_part = text.split(':', 1)[1].strip() if ':' in text else ""
            current_question['content'] = content_part
            # print(f"Debug: Captured question content from same line: '{current_question['content']}'") # Optional debug


            # No need to set awaiting_question_content_line as content is on this line

            continue # Move to the next line, expecting options


        # --- Now, process lines that are part of a question block (current_question is not None) ---
        # These checks should only run IF we have an active current_question
        if current_question is not None:

            # 5. Options Text (A., B., C., D.)
            # Must start with A., B., C., or D.
            # Store the ENTIRE line here, not just the text after the dot
            if text.startswith(('A.', 'B.', 'C.', 'D.')):
                parts = text.split('.', 1) # Split only on the first dot to get the letter
                if len(parts) >= 1: # Should always be at least 1 part if it starts with Letter.
                    option_letter = parts[0].strip() # Get the letter (e.g., "A")
                    if option_letter in ['A', 'B', 'C', 'D']:
                       # Store the full stripped line text
                       current_options_text[option_letter] = text.strip()
                       # print(f"Debug: Captured option {option_letter}: '{text}'") # Optional debug
                    else:
                       print(f"Warning: Found line starting with '{option_letter}.' not A, B, C, or D within question block: '{text}'")
                continue # Move to the next line

            # 6. Correct Answer
            # Must start with "Dap an dung:" or "Đáp án đúng:"
            elif text.startswith(('Dap an dung:', 'Đáp án đúng:')):
                # Example: "Đáp án đúng: B. Năm 1397"
                # Extract the part after the label
                answer_part = text.split(':', 1)[1].strip() if ':' in text else text.strip()

                # Find the first letter (A, B, C, or D) in the extracted part
                # This handles formats like "B. Năm 1397" or just "B"
                # Use ^[A-D] to match only if it starts with the letter after the colon and space
                match = re.search(r'^[A-D]', answer_part)
                if match:
                    correct_answer_letter = match.group(0) # Store the found letter
                    # print(f"Debug: Captured correct answer letter: {correct_answer_letter}") # Optional debug
                else:
                     # Handle case where correct answer format is unexpected
                     print(f"Warning: Could not extract correct answer letter from '{text}'. Setting correct_answer_letter to None.")
                     correct_answer_letter = None # Set to None if not found

                continue # Move to the next line

            # 7. Explanation
            # Must start with "Giai thich:" or "Giải thích:"
            elif text.startswith(('Giai thich:', 'Giải thích:')):
                 explanation_text = text.split(':', 1)[1].strip() if ':' in text else ""
                 # Append the reference link if one was captured for this heritage
                 if reference_link:
                     # Append the link text itself as requested
                     current_question['explanation'] = explanation_text + " " + reference_link
                 else:
                     current_question['explanation'] = explanation_text
                 # print(f"Debug: Captured explanation: '{current_question['explanation']}'") # Optional debug

                 # After explanation, we assume the question block is finished (unless separator follows).
                 # The logic handles this by finishing the question when the next "Câu hỏi N:" or "-----" is met.
                 # However, if a document ends right after an explanation, we need the finalization after the loop.

                 continue # Move to the next line

            # If a line is within a question block and current_question is not None,
            # AND it didn't match any specific pattern (Option, Answer, Explanation),
            # it's likely unexpected formatting or stray text. We ignore it.
            # print(f"Debug: Ignoring unhandled line within question context: '{text}'")


    # --- After the loop finishes, finalize the very last question ---
    # This is necessary if the file doesn't end with a separator or new Heritage block
    if current_quiz and current_question:
         finalize_and_add_question(
            quiz_obj=current_quiz,
            question_obj=current_question,
            options_text_map=current_options_text,
            correct_letter=correct_answer_letter
         )

    return quizzes

In [108]:
import io
from odf import opendocument

def extract_text_recursive(element):
    """Recursively extracts all text from an ODF element and its children."""
    texts = []
    if hasattr(element, 'data') and element.data:
        texts.append(element.data)
    for child in getattr(element, 'childNodes', []):
        texts.append(extract_text_recursive(child))
    return ''.join(texts)

def read_odt_file(odt_file_bytes: bytes) -> list[str]:
    """Reads an ODT file and returns a list of text contents from paragraphs and headings."""
    lines = []
    try:
        document = opendocument.load(io.BytesIO(odt_file_bytes))
        for element in document.text.childNodes:
            if hasattr(element, 'tagName') and element.tagName in ('text:p', 'text:h'):
                paragraph_text = extract_text_recursive(element).strip()
                if paragraph_text:
                    lines.append(paragraph_text)
    except Exception as e:
        raise ValueError(f"Error reading ODT file: {e}")
    return lines


In [113]:

import os
# Example Usage (for testing the parser logic directly)
folder_dir = '/home/phucuy2025/HRS_Project/Content-Creator/content'
folder_dir_2 = '/home/phucuy2025/HRS_Project/Content-Creator/content_2'
folder_dir_3 = '/home/phucuy2025/HRS_Project/Content-Creator/content_3'
folder_lst = [folder_dir_3]
if __name__ == '__main__':
    my_dicts = []
    for folder in folder_lst:
        for filename in os.listdir(folder):
            file_path = os.path.join(folder, filename)
            # print(file_path)
            with open(file_path, "rb") as f:
                odt_bytes = f.read()
                line_lst = read_odt_file(odt_file_bytes=odt_bytes)
                # with open(f'/home/phucuy2025/HRS_Project/Content-Creator/test_output_2/{filename}.txt', 'w', encoding='utf-8') as file:
                #     for line in line_lst:
                #         file.write(line + "\n")
                parsed_data_odt_simulated = parse_quiz_lines(line_lst)
        #     # # print(parsed_data_odt_simulated[0])
                print(type(parsed_data_odt_simulated[0]))
                my_dict=parsed_data_odt_simulated[0]
                my_dicts.append(my_dict)
                for key, value in my_dict.items():
                    print(f"{key}, : {value}")
        # break

<class 'dict'>
_id, : 680e5a93f5c7d1338cde58b5
heritageId, : 67f3edb13834bd66e6e1c674
title, : Kiểm tra di tích lịch sử Cố đô Hoa Lư
content, : Bài kiểm tra này sẽ giúp bạn hiểu rõ hơn về Cố đô Hoa Lư
questions, : [{'explanation': '', 'image': '', 'content': 'Cố đô Hoa Lư nằm ở tỉnh nào của Việt Nam?', 'questionId': ObjectId('680e5a93f5c7d1338cde58b6'), 'options': [{'optionText': 'A. Hà NamB. Ninh Bình', 'isCorrect': False, 'optionId': '680e5a93f5c7d1338cde58b7'}, {'optionText': '', 'isCorrect': False, 'optionId': '680e5a93f5c7d1338cde58b8'}, {'optionText': 'C. Thanh HóaD. Nam ĐịnhĐáp án đúng: B. Ninh BìnhGiải thích: Cố đô Hoa Lư thuộc địa phận xã Trường Yên, huyện Hoa Lư, tỉnh Ninh Bình.', 'isCorrect': False, 'optionId': '680e5a93f5c7d1338cde58b9'}, {'optionText': '', 'isCorrect': False, 'optionId': '680e5a93f5c7d1338cde58ba'}]}, {'explanation': '', 'image': '', 'content': 'Cố đô Hoa Lư từng là kinh đô của nước Đại Cồ Việt trong giai đoạn nào?A. Thế kỷ VIIIB. Thế kỷ IXC. Thế kỷ XD. Th

In [110]:
for i in range(len(my_dicts)):
    print((my_dicts[i].keys()))

dict_keys(['_id', 'heritageId', 'title', 'content', 'questions', 'topPerformersLimit', 'stats', 'topPerformers', 'status', 'createdAt', 'updatedAt'])
dict_keys(['_id', 'heritageId', 'title', 'content', 'questions', 'topPerformersLimit', 'stats', 'topPerformers', 'status', 'createdAt', 'updatedAt'])
dict_keys(['_id', 'heritageId', 'title', 'content', 'questions', 'topPerformersLimit', 'stats', 'topPerformers', 'status', 'createdAt', 'updatedAt'])
dict_keys(['_id', 'heritageId', 'title', 'content', 'questions', 'topPerformersLimit', 'stats', 'topPerformers', 'status', 'createdAt', 'updatedAt'])
dict_keys(['_id', 'heritageId', 'title', 'content', 'questions', 'topPerformersLimit', 'stats', 'topPerformers', 'status', 'createdAt', 'updatedAt'])
dict_keys(['_id', 'heritageId', 'title', 'content', 'questions', 'topPerformersLimit', 'stats', 'topPerformers', 'status', 'createdAt', 'updatedAt'])


In [None]:
# import os
# from pymongo import MongoClient
# from pymongo.errors import ConnectionFailure, OperationFailure, PyMongoError
# from bson import ObjectId
# from bson import json_util # Import json_util for BSON serialization
# import time
# # No need to import json if only using json_util for pymongo results

# # --- Configuration (use environment variables) ---
# MONGO_URI = 
# MONGO_TEST_DB = "History_Heritage_Database"
# MONGO_TEST_COLLECTION = "knowledgeTest"

# # --- Connection and Test Operations Snippet ---

# client = None

# print(f"Attempting to connect to MongoDB at: {MONGO_URI}")

# try:
#     client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
#     client.admin.command('ismaster')
#     print("MongoDB connection successful!")

#     db = client[MONGO_TEST_DB]
#     collection = db[MONGO_TEST_COLLECTION]
#     print(f"Using database '{MONGO_TEST_DB}' and collection '{MONGO_TEST_COLLECTION}'")

#     for dt in my_dicts:
#         test_document = dt

#         print(f"\nAttempting to insert document with _id: {test_document['_id']}")
#         insert_result = collection.insert_one(test_document)
#         print(f"Insert successful! Inserted ID: {insert_result.inserted_id}")
#         inserted_doc_id = insert_result.inserted_id

#         # print(f"\nAttempting to fetch document with _id: {inserted_doc_id}")
#         # fetched_document = collection.find_one({"_id": ObjectId('680312e056663ebc1e2c8b59')})

#         # if fetched_document:
#         #     print("Fetch successful! Document found:")
#         #     # FIX: Use bson.json_util.dumps instead of json.dumps
#         #     # json_util knows how to serialize ObjectId and other BSON types
#         #     print(json_util.dumps(fetched_document, indent=2))

#         #     # Verify content - Compare ObjectId instances directly
#         #     if fetched_document.get("_id") == test_document["_id"]:
#         #         print("Fetched document _id matches expected.")
#         #     else:
#         #         print(f"Warning: Fetched document _id ({fetched_document.get('_id')}) does NOT match expected test_document _id ({test_document['_id']}).")
#         # else:
#         #     print("Fetch failed: Document not found after insertion.")
#         # break

#     # print(f"\nAttempting to delete document with _id: {inserted_doc_id}")
#     # delete_result = collection.delete_one({"_id": inserted_doc_id})

#     # if delete_result.deleted_count == 1:
#     #     print("Delete successful! 1 document deleted.")
#     # else:
#     #     print("Delete failed: Document not found for deletion.")

#     # Optional: Verify deletion
#     # print("\nVerifying deletion...")
#     # verify_deleted = collection.find_one({"_id": inserted_doc_id})
#     # if verify_deleted is None:
#     #     print("Verification successful: Document is no longer in the collection.")
#     # else:
#     #     print("Verification failed: Document still found after deletion.")


# except ConnectionFailure as e:
#     print(f"\nError: Could not connect to MongoDB. Please check your MONGO_URI and network settings. Details: {e}")
# except OperationFailure as e:
#      print(f"\nError: MongoDB operation failed. This might be due to authentication or permissions (e.g., user, database, collection permissions). Details: {e}")
# except PyMongoError as e:
#     print(f"\nAn unexpected PyMongo error occurred: {e}")
# except Exception as e:
#     print(f"\nAn unexpected error occurred during test operations: {e}")

# finally:
#     if client:
#         client.close()
#         print("\nMongoDB client connection closed.")

Attempting to connect to MongoDB at: mongodb+srv://HistoryHeritage:HistoryHeritage%40123@historyheritage.ia1mhxt.mongodb.net/?retryWrites=true&w=majority&appName=historyHeritage
MongoDB connection successful!
Using database 'History_Heritage_Database' and collection 'knowledgeTest'

Attempting to insert document with _id: 680e59c5f5c7d1338cde5746
Insert successful! Inserted ID: 680e59c5f5c7d1338cde5746

Attempting to insert document with _id: 680e59c5f5c7d1338cde5783
Insert successful! Inserted ID: 680e59c5f5c7d1338cde5783

Attempting to insert document with _id: 680e59c5f5c7d1338cde57c0
Insert successful! Inserted ID: 680e59c5f5c7d1338cde57c0

Attempting to insert document with _id: 680e59c5f5c7d1338cde57fd
Insert successful! Inserted ID: 680e59c5f5c7d1338cde57fd

Attempting to insert document with _id: 680e59c5f5c7d1338cde583a
Insert successful! Inserted ID: 680e59c5f5c7d1338cde583a

Attempting to insert document with _id: 680e59c5f5c7d1338cde5877
Insert successful! Inserted ID: 680e

In [112]:
print(
)




