In [None]:
from pydantic import BaseModel
import enum


class QuestionType(enum.Enum):
    # Categories a question set can belong to. Each member's value is the
    # string label that identifies the category in serialized exam data.
    MCQ = "Multiple-Choice-Question"
    MC_READING = "Reading-Comprehension-With-Multiple-Choices"
    TF_READING = "Reading-Comprehension-With-True-or-False"
    MC_CLOZE = "Cloze-With-Multiple-Choices"
    FR_CLOZE = "Cloze-With-Free-Responses"
    # Other question types are intentionally not supported for now.

class Choices(BaseModel):
    # The four answer options of a question. All four fields are required;
    # the filters in this notebook treat an empty string as "option absent".
    A: str
    B: str
    C: str
    D: str

class Question(BaseModel):
    # A single question inside a question set.
    text: str  # question stem; the cloze filters below expect it to be empty
    choices: Choices  # answer options ("" marks an unused option)
    answer: str  # expected answer (format not enforced here)
    is_answer_provided: bool  # presumably whether the source supplied the answer - TODO confirm
    explanation: str
    test_point: str  # one or more tested topics, separated by ";" (split downstream)

class QuestionSet(BaseModel):
    # A group of questions of one type that share a context.
    type: QuestionType
    context: str  # shared passage; expected to be "" for MCQ, holds "<blank" markers for cloze sets
    questions: list[Question]

class Exam(BaseModel):
    # Top-level container: the schema of every JSON file read or written in this notebook.
    question_sets: list[QuestionSet]

In [None]:
# Path configuration for grouping the exercises by question type.
# All values are placeholders and must be filled in before running the notebook.
root_path_raw = ""  # directory whose <source>/<subdir>/ folders hold the raw exam JSON files
root_path = ""  # base directory under which a "filtered" folder is created later
output_path = ""  # destination of the per-type files; also the input of the filter cells
filtered_output_path = ""  # destination of the filtered, de-duplicated files

In [None]:
import os
import json
import logging
import collections
from tqdm import tqdm

# Send log records both to "output.log" and to the notebook output.
# force=True removes handlers installed by an earlier call (for example a
# previous run of this cell), so re-running does not duplicate log lines.
logging.basicConfig(
    level=logging.DEBUG,  # Set logging level (INFO, ERROR, DEBUG, etc.)
    format="%(asctime)s - %(levelname)s - %(message)s",  # Add timestamp
    datefmt="%Y-%m-%d %H:%M:%S",  # Customize timestamp format
    force=True,
    handlers=[
        # Log messages embed question text that may contain Chinese characters;
        # an explicit UTF-8 encoding keeps the file handler from failing on
        # platforms whose default encoding cannot represent them.
        logging.FileHandler("output.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)

In [None]:
# Group every question set found under root_path_raw by its type and write one
# JSON file per (source, subdir, question type) under output_path.
sources = [
    source
    for source in os.listdir(root_path_raw)
    if os.path.isdir(os.path.join(root_path_raw, source))
]
# A folder named "filtered" is never treated as a source of raw exams.
if "filtered" in sources:
    sources.remove("filtered")

for source in sources:
    source_dir = os.path.join(root_path_raw, source)
    for subdir in os.listdir(source_dir):
        raw_dir = os.path.join(source_dir, subdir)
        if not os.path.isdir(raw_dir):
            # A stray file next to the sub-directories cannot be listed below.
            continue
        target_dir = os.path.join(output_path, source, subdir)
        os.makedirs(target_dir, exist_ok=True)

        # question type -> all question sets of that type in this sub-directory
        questions = collections.defaultdict(list)
        for file in tqdm(os.listdir(raw_dir), desc=f"Processing {source}/{subdir}"):
            # Reading, JSON decoding and schema validation are all guarded, so a
            # single broken file is logged and skipped instead of aborting the run.
            try:
                with open(os.path.join(raw_dir, file), "r") as f:
                    exam = Exam(**json.load(f))
            except Exception as e:
                logging.error(f"Failed to parse {os.path.join(source, subdir, file)}: {e}")
                continue
            logging.info(f"Successfully parsed {file}")
            for question_set in exam.question_sets:
                questions[question_set.type].append(question_set)

        for question_type, typed_question_sets in questions.items():
            gather_exam = Exam(question_sets=typed_question_sets)
            # The file is named after str(question_type), e.g. "QuestionType.MCQ.json".
            with open(os.path.join(target_dir, f"{question_type}.json"), "w") as f:
                f.write(gather_exam.model_dump_json(indent=4))
                logging.info(f"Successfully wrote [{len(typed_question_sets)}] questions of [{question_type}] to {os.path.join(source, subdir, f'{question_type}.json')}")

# Filter Multiple Choice Questions

In [None]:
# Create the "filtered" folder under root_path if it does not exist yet.
# NOTE(review): this cell defines `filter_output_path`, but the later cells in
# this notebook write to `filtered_output_path` (set in the configuration cell).
# Confirm both are meant to point at the same directory.
filter_output_path = os.path.join(root_path, "filtered")
if not os.path.exists(filter_output_path):
    os.makedirs(filter_output_path, exist_ok=True)

## Rules

In [None]:
from typing import List, Any
import re
import json
import collections

In [None]:
import nltk
from nltk.tokenize import word_tokenize
import hashlib

# word_tokenize relies on the Punkt tokenizer data. Recent NLTK releases look
# it up under the name "punkt_tab", older ones under "punkt"; fetching both
# keeps the notebook working regardless of the installed NLTK version.
for _punkt_resource in ("punkt", "punkt_tab"):
    nltk.download(_punkt_resource)

In [None]:
def normalize_text(text):
    """Lower-case, clean and tokenize ``text``.

    Parenthesised spans (ASCII or full-width brackets) that contain at least
    one character in the range U+4E00-U+9FFF are removed, and every run of two
    or more underscores is replaced by the placeholder ``<blank>``. The result
    is split into tokens with ``word_tokenize``.

    Returns:
        The list of tokens.
    """
    parenthetical_with_chinese = r'[\(\（][^()\（\）\u4e00-\u9fff]*[\u4e00-\u9fff]+[^()\（\）]*[\)\）]'
    lowered = text.lower()
    without_parentheticals = re.sub(parenthetical_with_chinese, '', lowered)
    with_placeholders = re.sub(r"__+", "<blank>", without_parentheticals)
    return word_tokenize(with_placeholders)

def HashTokens(tokens: List[str]) -> str:
    """Return the hexadecimal MD5 digest of the tokens joined by single spaces."""
    joined = " ".join(tokens)
    digest = hashlib.md5(joined.encode())
    return digest.hexdigest()

def HashText(text: str) -> str:
    """Hash ``text`` after normalization: tokenize with normalize_text, then digest with HashTokens."""
    return HashTokens(normalize_text(text))

def HashMCQ(question_set: QuestionSet) -> str:
    """Fingerprint a multiple-choice question set by its first question.

    The hash covers the first question's text followed by its four options in
    sorted order, so the same options listed in a different order hash equally.
    Only ``questions[0]`` is considered.
    """
    first_question = question_set.questions[0]
    option_values = first_question.choices
    ordered_options = sorted([option_values.A, option_values.B, option_values.C, option_values.D])
    return HashText(first_question.text + " ".join(ordered_options))

def HasConsecutiveChineseCharacters(text: str, n_max: int) -> bool:
    """
    Check if the text contains more than n_max consecutive Chinese characters.

    A "Chinese character" here is any code point in the range U+4E00-U+9FFF.

    Args:
        text: The text to inspect.
        n_max: The longest run of consecutive Chinese characters still tolerated.

    Returns:
        True if some run is strictly longer than n_max, False otherwise.
    """
    # "More than n_max" means a run of at least n_max + 1 characters.
    pattern = r'[\u4e00-\u9fff]{' + str(n_max + 1) + r',}'
    # One hit is enough, so there is no need to collect every match.
    return re.search(pattern, text) is not None

In [None]:
def SplitMCQQuestionSet(question_sets: List[QuestionSet]):
    """Break MCQ question sets into sets holding exactly one question each.

    Question sets of any other type are ignored. A non-empty context on an MCQ
    set is reported with a warning and not carried over: every resulting set
    has an empty context.

    Returns:
        A list of single-question MCQ question sets, in input order.
    """
    atomic_sets = []
    for source_set in question_sets:
        if source_set.type != QuestionType.MCQ:
            continue
        if source_set.context != "":
            logging.warning(f"MCQ Question set has context: {source_set.context}. Will be removed!")
        atomic_sets.extend(
            QuestionSet(type=QuestionType.MCQ, context="", questions=[single_question])
            for single_question in source_set.questions
        )
    return atomic_sets

In [None]:
def DealWithBlank(question_set: "QuestionSet"):
    """Normalize blanks, punctuation and spacing of every question, in place.

    For the text and the four choices of each question:
      * runs of two or more underscores become ``<blank>``;
      * a ``<blank>`` glued to a neighbouring word character is separated from
        it by a space (between two word characters, at the start of the string,
        or at the end of the string);
      * full-width punctuation and curly double quotes are mapped to ASCII;
      * runs of spaces/tabs are collapsed, and whitespace before
        ``. , ! ? ; :`` is removed.

    Args:
        question_set: The question set whose questions are rewritten.

    Returns:
        None. The questions are modified in place.
    """
    # Applied strictly in this order: each rule sees the output of the previous one.
    rules = [
        (r"__+", "<blank>"),
        # The character after the blank is only looked at, not consumed, so
        # back-to-back blanks such as "a<blank>b<blank>c" are all spaced out.
        (r"(\w)<blank>(?=\w)", r"\1 <blank> "),
        (r"^<blank>(\w)", r"<blank> \1"),
        (r"(\w)<blank>$", r"\1 <blank>"),
        (r"！", "!"),
        (r"“", "\""),
        (r"”", "\""),
        (r"：", ":"),
        (r"；", ";"),
        (r"？", "?"),
        (r"（", "("),
        (r"）", ")"),
        (r"，", ","),
        (r"。", "."),
        (r"、", ","),
        (r"——", "-"),
        (r"---+", "--"),
        # deal with spaces
        (r"[ \t]+", " "),
        (r"\s+([.,!?;:])", r"\1"),
    ]

    def _normalize(value: str) -> str:
        # Run one string through every rule.
        for pattern, correction in rules:
            value = re.sub(pattern, correction, value)
        return value

    for question in question_set.questions:
        question.text = _normalize(question.text)
        choices = question.choices
        choices.A = _normalize(choices.A)
        choices.B = _normalize(choices.B)
        choices.C = _normalize(choices.C)
        choices.D = _normalize(choices.D)

    return

In [None]:
def IsQuestionSetWithValidChoices(question_set: QuestionSet):
    """Return True when the first question has non-empty choices A, B and C.

    Choice D is not checked, and only the first question is inspected.
    A question set without questions is rejected.
    """
    if not question_set.questions:
        return False
    first_choices = question_set.questions[0].choices
    required_options = (first_choices.A, first_choices.B, first_choices.C)
    return all(option != "" for option in required_options)


def IsQuestionTextWithFewChineseCharacters(question_set: QuestionSet):
    """Return True when the first question's text holds at most six Chinese characters.

    Characters in the range U+4E00-U+9FFF are counted wherever they appear in
    the text. A question set without questions is rejected.
    """
    if not question_set.questions:
        return False
    chinese_characters = re.findall(r"[\u4e00-\u9fff]", question_set.questions[0].text)
    return len(chinese_characters) <= 6

In [None]:
def ProcessMCQByCriteria(question_sets: List[QuestionSet], criteria_arr: List[Any]):
    """Split, clean and filter multiple-choice question sets.

    The input is split into single-question MCQ sets, each of which is
    normalized in place with DealWithBlank. The criteria are then applied one
    after another; a set survives only if every criterion returns a truthy
    value for it. The number of sets before and after each criterion is logged.

    Args:
        question_sets: Question sets to process.
        criteria_arr: Callables taking a question set and returning a truthy
            value for sets to keep.

    Returns:
        The list of surviving single-question sets.
    """
    candidates = SplitMCQQuestionSet(question_sets)
    for candidate in candidates:
        DealWithBlank(candidate)
    for keep in criteria_arr:
        logging.info(f"Before filtering: {len(candidates)}")
        candidates = list(filter(keep, candidates))
        logging.info(f"After filtering: {len(candidates)}")
    return candidates

In [None]:
# Predicates every single-question MCQ set must satisfy to be kept;
# ProcessMCQByCriteria applies them in this order.
criteria_arr = [
    IsQuestionTextWithFewChineseCharacters,
    IsQuestionSetWithValidChoices,
]

In [None]:
# Load the per-type files produced by the grouping step, keep the MCQ ones and
# run them through the filter criteria. Also collect every test point seen.
sources = [
    source
    for source in os.listdir(output_path)
    if os.path.isdir(os.path.join(output_path, source))
]
filtered_question_sets = collections.defaultdict(list)  # "<source>/<subdir>" -> kept question sets
test_points = set()


for source in sources:
    for subdir in os.listdir(os.path.join(output_path, source)):
        group_key = os.path.join(source, subdir)
        input_dir = os.path.join(output_path, source, subdir)
        os.makedirs(os.path.join(filtered_output_path, source, subdir), exist_ok=True)
        # Every JSON file is visited exactly once.
        for file in os.listdir(input_dir):
            if os.path.splitext(file)[1] != ".json":
                continue
            with open(os.path.join(input_dir, file), "r") as f:
                exam = Exam.model_validate(json.load(f))
            # The type of a file is taken from its first question set; an exam
            # without any question set has nothing to filter.
            if not exam.question_sets or exam.question_sets[0].type != QuestionType.MCQ:
                continue
            kept_question_sets = ProcessMCQByCriteria(exam.question_sets, criteria_arr)
            filtered_question_sets[group_key] = kept_question_sets
            logging.info(f"Successfully parsed {file}: Got [{len(kept_question_sets)}] questions")
            for question_set in kept_question_sets:
                # test_point may list several topics separated by ";".
                for test_point in question_set.questions[0].test_point.split(";"):
                    test_points.add(test_point.strip())



In [None]:
# Drop repeated MCQ questions. The set of seen hashes is shared across all
# groups, so a question is kept only in the first group where it appears.
visited_md5 = set()

remove_repeated_question_sets = collections.defaultdict(list)
for source, question_sets in filtered_question_sets.items():
    logging.info(f"Processing {source}: Got [{len(question_sets)}] questions")
    # Looking the key up registers the group even when nothing survives.
    unique_question_sets = remove_repeated_question_sets[source]
    for question_set in question_sets:
        md5 = HashMCQ(question_set)
        if md5 in visited_md5:
            continue
        visited_md5.add(md5)
        unique_question_sets.append(question_set)
    logging.info(f"After removing repeated questions: Got [{len(unique_question_sets)}] questions")

In [None]:
# Write the de-duplicated MCQ sets of every group to its filtered folder.
# The file is named after str(QuestionType.MCQ); the log reports that same name.
mcq_file_name = str(QuestionType.MCQ) + ".json"
for key, question_sets in remove_repeated_question_sets.items():
    with open(os.path.join(filtered_output_path, key, mcq_file_name), "w") as f:
        f.write(Exam(question_sets=question_sets).model_dump_json(indent=4))
        logging.info(f"Successfully wrote [{len(question_sets)}] questions to {os.path.join(key, mcq_file_name)}")

In [None]:
# Number of MCQ question sets kept across all groups after de-duplication.
total_mcq = sum(map(len, remove_repeated_question_sets.values()))
print(f"total mcq: {total_mcq}")

In [None]:
# Persist the MCQ test points. They are sorted because the iteration order of a
# set of strings changes between runs, which would reshuffle the file each time.
with open(os.path.join(filtered_output_path, "mcq_test_points.json"), "w") as f:
    json.dump(sorted(test_points), f, indent=4)

# Filter MC Reading

In [None]:
def IsMCReading(question_set: QuestionSet):
    """Return True for a usable multiple-choice reading-comprehension set.

    Requirements: at least one question, type MC_READING, a context of at
    least 50 tokens (as produced by normalize_text), and for every question a
    non-empty text with non-empty choices A, B and C.
    """
    if not question_set.questions or question_set.type != QuestionType.MC_READING:
        return False
    if len(normalize_text(question_set.context)) < 50:
        return False
    return all(
        len(item.text) != 0
        and item.choices.A != ""
        and item.choices.B != ""
        and item.choices.C != ""
        for item in question_set.questions
    )

def HashMCReading(question_set: QuestionSet) -> str:
    """Fingerprint a reading set from its context plus its question texts in sorted order."""
    question_texts = [item.text for item in question_set.questions]
    question_texts.sort()
    return HashText(question_set.context + " ".join(question_texts))

In [None]:
# Collect the valid, de-duplicated MC reading sets per "<source>/<subdir>" group.
# Relies on `sources` from the MCQ filtering cell.
visited_md5 = set()
mc_reading = collections.defaultdict(list)
test_points = set()

for source in sources:
    for subdir in os.listdir(os.path.join(output_path, source)):
        group_key = os.path.join(source, subdir)
        input_dir = os.path.join(output_path, source, subdir)
        target_dir = os.path.join(filtered_output_path, source, subdir)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir, exist_ok=True)
        json_files = [name for name in os.listdir(input_dir) if os.path.splitext(name)[1] == ".json"]
        for file in json_files:
            with open(os.path.join(input_dir, file), "r") as f:
                exam = Exam.model_validate(json.load(f))
            for question_set in exam.question_sets:
                if not IsMCReading(question_set):
                    continue
                md5 = HashMCReading(question_set)
                if md5 not in visited_md5:
                    visited_md5.add(md5)
                    mc_reading[group_key].append(question_set)
                # Test points are gathered from every valid set, repeated ones included.
                for question in question_set.questions:
                    test_points.update(part.strip() for part in question.test_point.split(";"))
        # Indexing the defaultdict also registers groups in which nothing was kept.
        logging.info(f"After removing repeated questions: Got [{len(mc_reading[group_key])}] MCReading questions")

# Persist the MC reading test points. They are sorted because the iteration
# order of a set of strings changes between runs.
with open(os.path.join(filtered_output_path, "mcreading_test_points.json"), "w") as f:
    json.dump(sorted(test_points), f, indent=4)

# One output file per group, named after the question type.
mc_reading_file_name = str(QuestionType.MC_READING) + ".json"
for key, question_sets in mc_reading.items():
    destination = os.path.join(filtered_output_path, key, mc_reading_file_name)
    with open(destination, "w") as f:
        f.write(Exam(question_sets=question_sets).model_dump_json(indent=4))
        logging.info(f"Successfully wrote [{len(question_sets)}] questions to {os.path.join(key, mc_reading_file_name)}")

# Filter TF Reading

In [None]:
def IsTFReading(question_set: QuestionSet):
    """Return True for a usable true/false reading-comprehension set.

    Requirements: at least one question, type TF_READING, a context of at
    least 50 tokens (as produced by normalize_text) that is not flagged by
    HasConsecutiveChineseCharacters(context, 6), and for every question a
    non-empty text with all four choices empty.
    """
    if not question_set.questions:
        return False
    if question_set.type != QuestionType.TF_READING:
        return False
    passage = question_set.context
    if len(normalize_text(passage)) < 50 or HasConsecutiveChineseCharacters(passage, 6):
        return False
    for item in question_set.questions:
        if not item.text:
            return False
        options = (item.choices.A, item.choices.B, item.choices.C, item.choices.D)
        # A true/false question must not carry any answer option.
        if any(option != "" for option in options):
            return False
    return True

def HashTFReading(question_set: QuestionSet) -> str:
    """Fingerprint a true/false reading set from its context plus its question texts in sorted order."""
    ordered_texts = sorted(item.text for item in question_set.questions)
    combined = question_set.context + " ".join(ordered_texts)
    return HashText(combined)

In [None]:
# Collect the valid, de-duplicated TF reading sets per "<source>/<subdir>" group.
# Relies on `sources` from the MCQ filtering cell.
visited_md5 = set()
tf_reading = collections.defaultdict(list)
test_points = set()

for source in sources:
    for subdir in os.listdir(os.path.join(output_path, source)):
        group_key = os.path.join(source, subdir)
        input_dir = os.path.join(output_path, source, subdir)
        target_dir = os.path.join(filtered_output_path, source, subdir)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir, exist_ok=True)
        for file in os.listdir(input_dir):
            _, extension = os.path.splitext(file)
            if extension != ".json":
                continue
            with open(os.path.join(input_dir, file), "r") as f:
                exam = Exam.model_validate(json.load(f))
            valid_sets = [candidate for candidate in exam.question_sets if IsTFReading(candidate)]
            for question_set in valid_sets:
                md5 = HashTFReading(question_set)
                if md5 not in visited_md5:
                    visited_md5.add(md5)
                    tf_reading[group_key].append(question_set)
                # Test points are gathered from every valid set, repeated ones included.
                for question in question_set.questions:
                    for raw_point in question.test_point.split(";"):
                        test_points.add(raw_point.strip())
        # Indexing the defaultdict also registers groups in which nothing was kept.
        logging.info(f"After removing repeated questions: Got [{len(tf_reading[group_key])}] TFReading questions")

# Persist the TF reading test points. They are sorted because the iteration
# order of a set of strings changes between runs.
with open(os.path.join(filtered_output_path, "tfreading_test_points.json"), "w") as f:
    json.dump(sorted(test_points), f, indent=4)

# One output file per group, named after the question type.
tf_reading_file_name = str(QuestionType.TF_READING) + ".json"
for key, question_sets in tf_reading.items():
    destination = os.path.join(filtered_output_path, key, tf_reading_file_name)
    with open(destination, "w") as f:
        f.write(Exam(question_sets=question_sets).model_dump_json(indent=4))
        logging.info(f"Successfully wrote [{len(question_sets)}] questions to {os.path.join(key, tf_reading_file_name)}")

# Filter MC Cloze

In [None]:
def IsMCCloze(question_set: QuestionSet):
    """Return True for a usable multiple-choice cloze set.

    Requirements: at least one question, type MC_CLOZE, a context of at least
    50 tokens (as produced by normalize_text) that is not flagged by
    HasConsecutiveChineseCharacters(context, 6) and contains "<blank", and for
    every question an empty text with non-empty choices A, B and C.
    """
    if not question_set.questions:
        return False
    if question_set.type != QuestionType.MC_CLOZE:
        return False
    passage = question_set.context
    passage_is_usable = (
        len(normalize_text(passage)) >= 50
        and not HasConsecutiveChineseCharacters(passage, 6)
        and "<blank" in passage
    )
    if not passage_is_usable:
        return False
    # Cloze questions carry their content in the passage, not in a question text.
    return all(
        len(item.text) == 0
        and "" not in (item.choices.A, item.choices.B, item.choices.C)
        for item in question_set.questions
    )

def HashMCCloze(question_set: QuestionSet) -> str:
    """Fingerprint a multiple-choice cloze set.

    The hash covers the context followed by the options of every question;
    the four options are sorted within each question, while the questions
    keep their original order.
    """
    flattened_options = [
        option
        for item in question_set.questions
        for option in sorted([item.choices.A, item.choices.B, item.choices.C, item.choices.D])
    ]
    return HashText(question_set.context + " ".join(flattened_options))

In [None]:
def RegularizeMCClozeFormat(question_set: QuestionSet):
    """Rewrite the blank markers of a multiple-choice cloze context, in place.

    Numbered blanks written in various ways (digits or parenthesised digits
    before, after or inside a ``<blank>`` marker) are converted to
    ``<blank text=N>``, every underscore is removed, and "……" becomes "...".
    Any remaining parenthesised number such as "(3)" is also turned into
    ``<blank text=3>``. Question sets of another type are left untouched.

    Args:
        question_set: The question set whose ``context`` is rewritten.

    Returns:
        None.
    """
    if question_set.type != QuestionType.MC_CLOZE:
        return

    numbered_blank = r"<blank text=\1>"
    # Applied strictly in this order: each rule sees the output of the previous one.
    rewrites = [
        (r"……", "..."),
        (r"__+", "_"),
        (r"<blank>(\d+)<blank>", numbered_blank),
        (r"(\d+)<blank>", numbered_blank),
        (r"<blank>(\d+)", numbered_blank),
        (r"_+<blank", r"<blank"),
        (r"<blank>_+", r"<blank>"),
        # The captured text stops at the marker's own ">" so that several
        # blanks on one line are each handled instead of being merged into a
        # single match.
        (r"<blank text=([^>\n]*)>_+", numbered_blank),
        (r"<blank text=\(([^>\n]*)\)>", numbered_blank),
        (r"_", ""),
        (r"<blank \((\d+)\)>", numbered_blank),
        (r"<blank (\d+)>", numbered_blank),
        (r"<blank>\((\d+)\)<blank>", numbered_blank),
        (r"<blank>\((\d+)\)", numbered_blank),
        (r"\((\d+)\)<blank>", numbered_blank),
        (r"\((\d+)\)", numbered_blank),
    ]

    context = question_set.context
    for pattern, replacement in rewrites:
        context = re.sub(pattern, replacement, context)
    question_set.context = context

In [None]:
# Collect the valid, de-duplicated MC cloze sets per "<source>/<subdir>" group.
# Relies on `sources` from the MCQ filtering cell.
visited_md5 = set()
mc_clozes = collections.defaultdict(list)
test_points = set()

for source in sources:
    for subdir in os.listdir(os.path.join(output_path, source)):
        group_key = os.path.join(source, subdir)
        input_dir = os.path.join(output_path, source, subdir)
        target_dir = os.path.join(filtered_output_path, source, subdir)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir, exist_ok=True)
        json_files = [name for name in os.listdir(input_dir) if os.path.splitext(name)[1] == ".json"]
        for file in json_files:
            with open(os.path.join(input_dir, file), "r") as f:
                exam = Exam.model_validate(json.load(f))
            for question_set in exam.question_sets:
                if not IsMCCloze(question_set):
                    continue
                # Validity is judged on the raw context; the hash uses the regularized one.
                RegularizeMCClozeFormat(question_set)
                md5 = HashMCCloze(question_set)
                if md5 not in visited_md5:
                    visited_md5.add(md5)
                    mc_clozes[group_key].append(question_set)
                # Test points are gathered from every valid set, repeated ones included.
                for question in question_set.questions:
                    test_points.update(part.strip() for part in question.test_point.split(";"))
        # Indexing the defaultdict also registers groups in which nothing was kept.
        logging.info(f"After removing repeated questions: Got [{len(mc_clozes[group_key])}] MC Cloze questions")

# Persist the MC cloze test points. They are sorted because the iteration
# order of a set of strings changes between runs.
with open(os.path.join(filtered_output_path, "mc_cloze_test_points.json"), "w") as f:
    json.dump(sorted(test_points), f, indent=4)

# One output file per group, named after the question type.
mc_cloze_file_name = str(QuestionType.MC_CLOZE) + ".json"
for key, question_sets in mc_clozes.items():
    destination = os.path.join(filtered_output_path, key, mc_cloze_file_name)
    with open(destination, "w") as f:
        f.write(Exam(question_sets=question_sets).model_dump_json(indent=4))
        logging.info(f"Successfully wrote [{len(question_sets)}] questions to {os.path.join(key, mc_cloze_file_name)}")

# Filter FR Cloze

In [None]:
def IsFRCloze(question_set: QuestionSet):
    """Return True for a usable free-response cloze set.

    Requirements on the set: at least one question and type FR_CLOZE.
    Requirements on the context:
      * at least 50 tokens (as produced by normalize_text);
      * between 5 and 10 occurrences of "<blank" (inclusive);
      * fewer than 4 newlines, and at least 2 more blanks than newlines;
      * not flagged by HasConsecutiveChineseCharacters(context, 6);
      * at most 3 occurrences of digits followed by a period.
    Requirements on every question: empty text and all four choices empty.
    """
    if not question_set.questions:
        return False
    if question_set.type != QuestionType.FR_CLOZE:
        return False

    passage = question_set.context
    if len(normalize_text(passage)) < 50:
        return False

    blank_count = passage.count("<blank")
    newline_count = passage.count("\n")
    if not 5 <= blank_count <= 10:
        return False
    if newline_count >= 4:
        return False
    if blank_count - newline_count < 2:
        return False
    if HasConsecutiveChineseCharacters(passage, 6):
        return False
    numbered_items = re.findall(r"(\d+)\.", passage)
    if len(numbered_items) > 3:
        return False

    for item in question_set.questions:
        if len(item.text) > 0:
            return False
        options = (item.choices.A, item.choices.B, item.choices.C, item.choices.D)
        if any(option != "" for option in options):
            return False
    return True

def HashFRCloze(question_set: QuestionSet) -> str:
    """Fingerprint a free-response cloze set from its context alone."""
    return HashText(question_set.context)

In [None]:
def RegularizeFRFormat(question_set: QuestionSet):
    """Rewrite the blank markers of a free-response cloze context, in place.

    Numbered blanks written as digits or parenthesised digits around a
    ``<blank>`` marker are converted to ``<blank text=N>``, underscores
    adjacent to a marker are removed, and "……" becomes "...". Finally, any
    ``<blank text=...>`` whose text is not purely numeric is reduced to a
    plain ``<blank>``. Question sets of another type are left untouched.

    Args:
        question_set: The question set whose ``context`` is rewritten.

    Returns:
        None.
    """
    if question_set.type != QuestionType.FR_CLOZE:
        return

    numbered_blank = r"<blank text=\1>"
    # Applied strictly in this order: each rule sees the output of the previous one.
    rewrites = [
        (r"……", "..."),
        (r"__+", "_"),
        (r"<blank>(\d+)<blank>", numbered_blank),
        (r"<blank>\((\d+)\)<blank>", numbered_blank),
        (r"\((\d+)\)<blank>", numbered_blank),
        (r"\((\d+)\) <blank>", numbered_blank),
        (r"_+<blank", r"<blank"),
        (r"<blank>_+", r"<blank>"),
        # The captured text stops at the marker's own ">" so that several
        # blanks on one line are each handled instead of being merged into a
        # single match.
        (r"<blank text=\(([^>\n]*)\)>", numbered_blank),
        (r"<blank text=([^>\n]*)>_+", numbered_blank),
        # Keep numeric labels; anything else collapses to an unlabeled blank.
        (r"<blank text=([^>]+)>", lambda m: m.group(0) if m.group(1).isdigit() else "<blank>"),
    ]

    context = question_set.context
    for pattern, replacement in rewrites:
        context = re.sub(pattern, replacement, context)
    question_set.context = context

In [None]:
# Collect the valid, de-duplicated FR cloze sets per "<source>/<subdir>" group.
# Relies on `sources` from the MCQ filtering cell.
visited_md5 = set()
fr_clozes = collections.defaultdict(list)
test_points = set()

for source in sources:
    for subdir in os.listdir(os.path.join(output_path, source)):
        group_key = os.path.join(source, subdir)
        input_dir = os.path.join(output_path, source, subdir)
        target_dir = os.path.join(filtered_output_path, source, subdir)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir, exist_ok=True)
        for file in os.listdir(input_dir):
            _, extension = os.path.splitext(file)
            if extension != ".json":
                continue
            with open(os.path.join(input_dir, file), "r") as f:
                exam = Exam.model_validate(json.load(f))
            for question_set in exam.question_sets:
                if not IsFRCloze(question_set):
                    continue
                # Validity is judged on the raw context; the hash uses the regularized one.
                RegularizeFRFormat(question_set)
                md5 = HashFRCloze(question_set)
                if md5 not in visited_md5:
                    visited_md5.add(md5)
                    fr_clozes[group_key].append(question_set)
                # Test points are gathered from every valid set, repeated ones included.
                for question in question_set.questions:
                    for raw_point in question.test_point.split(";"):
                        test_points.add(raw_point.strip())
        # Indexing the defaultdict also registers groups in which nothing was kept.
        logging.info(f"After removing repeated questions: Got [{len(fr_clozes[group_key])}] FR Cloze questions")

# Persist the FR cloze test points. They are sorted because the iteration
# order of a set of strings changes between runs.
with open(os.path.join(filtered_output_path, "fr_cloze_test_points.json"), "w") as f:
    json.dump(sorted(test_points), f, indent=4)

# One output file per group, named after the question type.
fr_cloze_file_name = str(QuestionType.FR_CLOZE) + ".json"
for key, question_sets in fr_clozes.items():
    destination = os.path.join(filtered_output_path, key, fr_cloze_file_name)
    with open(destination, "w") as f:
        f.write(Exam(question_sets=question_sets).model_dump_json(indent=4))
        logging.info(f"Successfully wrote [{len(question_sets)}] questions to {os.path.join(key, fr_cloze_file_name)}")