# Question Generator

In [None]:
# Import necessary libraries
import os
import re
import json
import pandas as pd
import google.generativeai as genai
import textstat

from datetime import datetime
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
# Initialize GenAI
# SECURITY: API keys must never be committed to source control. The key is read
# from the environment instead; any key that has ever been hard-coded in this
# notebook should be treated as leaked and revoked.
_api_key = os.environ.get("GOOGLE_API_KEY") or os.environ.get("GEMINI_API_KEY")
if not _api_key:
    raise RuntimeError(
        "Set the GOOGLE_API_KEY (or GEMINI_API_KEY) environment variable "
        "before running this notebook."
    )
genai.configure(api_key=_api_key)
model = genai.GenerativeModel("gemini-2.5-flash")

In [None]:
# Configuration
# Every data file used by the generator lives in one processed-data directory.
_PROCESSED_DIR = "model_training/processed_data"

QUESTION_TYPE_CSV = f"{_PROCESSED_DIR}/questionType.csv"
GENERATED_JSON = f"{_PROCESSED_DIR}/generated_questions.json"
WORD_CSV = f"{_PROCESSED_DIR}/ielts_vocab.csv"
TRAINING_CSV = f"{_PROCESSED_DIR}/training_set.csv"
# NOTE: despite its name, TEMP_CSV points at a JSON file.
TEMP_CSV = f"{_PROCESSED_DIR}/temp_generated_questions.json"

# Generation attempts allowed per question type, and the reward value at which
# retrying stops early.
MAX_ATTEMPT, REWARD_GOAL = 5, 4

In [None]:
# Load Data
# question_type_df: question-type metadata, looked up by its "typeID" column.
# common_vocab_df:  vocabulary list, read through its "Words" column.
# training_df:      reference data; its "transcript" column is used when present.
question_type_df, common_vocab_df, training_df = (
    pd.read_csv(csv_path)
    for csv_path in (QUESTION_TYPE_CSV, WORD_CSV, TRAINING_CSV)
)

In [None]:
# Mock Data Only
# One row per section: (section number, typeID, theme, topic, spec).
_MOCK_ROWS = [
    (
        1,
        "T001",
        "Booking a hotel room",
        "Asking about room facilities",
        "Include questions about availability and prices.",
    ),
    (
        2,
        "T004",
        "Campus tour",
        "Identifying main buildings",
        "Include left/right directions.",
    ),
    (
        3,
        "T006",
        "Group project planning",
        "Assigning tasks",
        "Include four students.",
    ),
    (
        4,
        "T009",
        "AI in communication",
        "Impact of AI",
        "Use academic style and one real example.",
    ),
]

# Section number -> list of question-type requests for that section.
section_choices = {
    section_no: [
        {"typeID": type_id, "theme": theme, "topic": topic, "spec": spec}
    ]
    for section_no, type_id, theme, topic, spec in _MOCK_ROWS
}

In [None]:
# Prompt Template
# Filled in with str.format(); every {placeholder} below must be supplied as a
# keyword argument. The template asks the model for a JSON-only reply with the
# keys "Section", "Type", "Instructions", "Diagram", "Questions", "Answers",
# "Options" and "Transcript".
# NOTE: because str.format() is used, any literal brace added to this text must
# be doubled ("{{" / "}}").
PROMPT_TEMPLATE = """
You are an expert IELTS Listening question generator.
Create realistic IELTS Listening questions and transcripts following the official format.

--- QUESTION REQUIREMENTS ---
Section: {section}
Question Type: {typeID} - {type_name}
Question Numbers: {question_range}
Number of Questions: {question_count}

Theme: {theme}
Specific Topic: {specific_topic}
Additional Specifications from Test Creator: {specifications}

Instructions to Display: {instruction}
Expected Answer Format: {answer_format}
Format Rules: {format}
Key Listening Skills: {key_skills}
Typical Duration: {avg_duration}
Expected Transcript Length: {avg_script_length} words
Audio Speed: {audio_speed}
Key Features: {key_features}

--- OUTPUT REQUIREMENTS ---
1. Produce exactly {question_count} questions.
2. Output MUST be valid JSON ONLY, with these keys:
   "Section", "Type", "Instructions", "Diagram",
   "Questions", "Answers", "Options", "Transcript".
3. "Questions" must be a list of strings.
4. "Answers" must be a list of strings of equal length.
5. For multiple-choice types, include "Options" (list of lists).
6. The diagram should be drawn in the characters and plain text only.
7. Transcript MUST naturally reference ALL question numbers in {question_range}.
8. No Markdown. No explanations. JSON ONLY.

Return the JSON format only.
"""

In [None]:
# JSON Parser
def safe_json_parse(raw):
    """Parse a model reply into a Python object, tolerating common wrappers.

    Markdown code fences are stripped first. If the remaining text is not
    valid JSON as a whole, the span from the first ``{`` to the last ``}`` is
    tried, which recovers replies where the model added prose around the JSON.

    Args:
        raw: Text returned by the model. Anything that is empty or not a
            ``str`` yields ``None``.

    Returns:
        The decoded JSON value, or ``None`` when nothing parseable is found.
    """
    if not raw or not isinstance(raw, str):
        return None

    cleaned = raw.replace("```json", "").replace("```", "").strip()
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        pass

    # Fallback: the reply may contain text before/after the JSON object.
    start, end = cleaned.find("{"), cleaned.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(cleaned[start:end + 1])
    except json.JSONDecodeError:
        return None

In [None]:
# Reward Functions
def calculate_readability_score(text):
    """Return textstat's Flesch Reading Ease score for ``text``.

    ``text`` is converted with ``str()`` before scoring, so non-string input
    is accepted.
    """
    as_string = str(text)
    return textstat.flesch_reading_ease(as_string)

def is_in_average_word_count(text, section_label):
    """Check whether ``text`` has a word count typical for its section.

    Args:
        text: Transcript to measure. Falsy input returns ``False``.
        section_label: Section identifier. Either the bare section number
            (``1``, ``"1"``) or a label containing it (``"Section 1"``); the
            first run of digits is used as the section number.

    Returns:
        ``True`` when the number of words lies inside the expected range for
        the section (inclusive). Sections without a configured range accept
        any length up to 99999 words.
    """
    if not text:
        return False

    # Expected (min, max) transcript length in words, per section number.
    expected = {
        1: (500, 700),
        2: (600, 800),
        3: (800, 1000),
        4: (1000, 1200),
    }

    # Normalize the label: callers pass ints as well as "Section N" strings,
    # and a plain string-keyed lookup would silently skip the check for ints.
    found = re.search(r"\d+", str(section_label))
    section_num = int(found.group()) if found else None
    low, high = expected.get(section_num, (0, 99999))

    wc = len(re.findall(r"\b\w+\b", str(text)))
    return low <= wc <= high

def calculate_common_word_ratio(text):
    """Return the share of words in ``text`` that are absent from the vocabulary list.

    Words are matched case-insensitively against the ``"Words"`` column of
    ``common_vocab_df``. Returns ``0`` when ``text`` contains no words.

    NOTE(review): despite the function name, the value computed here is the
    ratio of words *not* found in the vocabulary list - confirm this is the
    metric the caller's threshold is meant to test.
    """
    known_words = set(common_vocab_df["Words"].str.lower())
    tokens = [token.lower() for token in re.findall(r'\b\w+\b', str(text))]
    if not tokens:
        return 0
    missing = sum(1 for token in tokens if token not in known_words)
    return missing / len(tokens)

def calculate_similarity(text):
    """Return the highest TF-IDF cosine similarity between ``text`` and known transcripts.

    Known transcripts come from the ``"transcript"`` column of ``training_df``
    (when present) and from every question set stored in ``GENERATED_JSON``
    (when the file exists).

    Args:
        text: Candidate transcript.

    Returns:
        A float; ``0.0`` when there is nothing to compare against or when no
        usable vocabulary can be extracted.
    """
    existing_texts = []
    if "transcript" in training_df.columns:
        existing_texts += training_df["transcript"].dropna().astype(str).tolist()

    if os.path.exists(GENERATED_JSON):
        with open(GENERATED_JSON, "r", encoding="utf-8") as f:
            saved_data = json.load(f)
        # The master file is written as {timestamp_key: [question_set, ...]}.
        # Iterating the dict directly would yield the keys (strings), so walk
        # the values instead. A bare list of sets is accepted as well.
        groups = saved_data.values() if isinstance(saved_data, dict) else [saved_data]
        for group in groups:
            if not isinstance(group, list):
                continue
            for item in group:
                if isinstance(item, dict):
                    existing_texts.append(str(item.get("Transcript") or ""))

    # Blank documents contribute nothing to the comparison.
    existing_texts = [t for t in existing_texts if t.strip()]
    candidate = "" if text is None else str(text)
    if not existing_texts or not candidate.strip():
        return 0.0

    corpus = existing_texts + [candidate]
    try:
        vec = TfidfVectorizer().fit_transform(corpus)
    except ValueError:
        # Raised by scikit-learn when the corpus yields an empty vocabulary.
        return 0.0
    sims = cosine_similarity(vec[-1], vec[:-1]).flatten()
    return float(sims.max()) if sims.size else 0.0

In [None]:
# Question number calculation
def get_question_counts(types, total=10):
    """Split ``total`` questions as evenly as possible across question types.

    Earlier types receive the remainder, so one type gets 10, two types get
    5 each, three types get 4/3/3, and so on.

    Args:
        types: Sequence of type IDs for one section, in display order.
        total: Number of questions in the section (default 10).

    Returns:
        Dict mapping each type ID to its question count; empty when ``types``
        is empty.
    """
    type_count = len(types)
    if type_count == 0:
        return {}
    base, extra = divmod(total, type_count)
    return {
        type_id: base + (1 if position < extra else 0)
        for position, type_id in enumerate(types)
    }

def number_ranges(counts, section_num):
    """Assign consecutive question-number ranges to each type in a section.

    Numbering for section ``n`` starts at ``(n - 1) * 10 + 1``; each type, in
    the iteration order of ``counts``, takes the next ``count`` numbers.

    Returns:
        Dict mapping each type to a ``"first-last"`` string.
    """
    first = 10 * (section_num - 1) + 1
    result = {}
    for type_id, how_many in counts.items():
        last = first + how_many - 1
        result[type_id] = f"{first}-{last}"
        first = last + 1
    return result

In [None]:
# Model Call
def model_generate(prompt):
    """Send ``prompt`` to the model and return the reply parsed as JSON.

    Returns:
        The parsed JSON value, or ``None`` when the reply has no text or is
        not parseable, so the caller can simply retry.
    """
    response = model.generate_content(prompt)
    try:
        # The `.text` accessor raises ValueError when the response carries no
        # usable text part (for example when generation was blocked).
        raw_text = response.text
    except ValueError as err:
        print(f"  Model returned no text: {err}")
        return None
    return safe_json_parse(raw_text)

def generate_full_set(section_choices):
    """Generate a question set for every entry in ``section_choices`` and save it.

    For each entry the model is queried up to ``MAX_ATTEMPT`` times. Every
    reply that parses to a JSON object is scored with four transcript checks
    (readability, word count, vocabulary ratio, similarity to existing
    transcripts); the best-scoring reply is kept and generation stops early
    once ``REWARD_GOAL`` is reached. If no attempt produces a JSON object, a
    placeholder set is stored instead.

    Args:
        section_choices: Mapping of section label (an int such as ``1`` or a
            string containing the section number) to a list of dicts with the
            keys ``"typeID"``, ``"theme"``, ``"topic"`` and ``"spec"``.

    Returns:
        ``{timestamp_key: [result, ...]}``. The same mapping is written to
        ``TEMP_CSV``, and the results are merged into ``GENERATED_JSON`` under
        ``timestamp_key``.
    """
    all_results = []

    # Minute-resolution key under which this run is stored.
    dt_key = datetime.now().strftime("%Y_%m_%d_%H_%M")

    for section_label, entries in section_choices.items():
        section_num = int(re.search(r"\d+", str(section_label)).group())
        # is_in_average_word_count recognizes labels of the form "Section N";
        # section_choices may be keyed by the bare number, so normalize here.
        word_count_label = f"Section {section_num}"

        # Map typeIDs to counts
        types = [e["typeID"] for e in entries]
        counts = get_question_counts(types)
        ranges = number_ranges(counts, section_num)

        for entry in entries:
            typeID = entry["typeID"]
            theme = entry["theme"]
            topic = entry["topic"]
            spec = entry["spec"]

            # Find the type info by typeID
            type_row = question_type_df[question_type_df["typeID"] == typeID]
            if type_row.empty:
                print(f" WARNING: typeID '{typeID}' not found in question_type_df. Using placeholder info.")
                type_info = {
                    "type": f"Unknown Type ({typeID})",
                    "instruction": "Follow standard instructions.",
                    "answer_format": "List of answers",
                    "format": "Text",
                    "key_skills": "Listening",
                    "avg_duration": "3-4 min",
                    "avg_script_length": "600",
                    "key_features": "IELTS standard",
                    "audio_speed": "Normal"
                }
            else:
                type_info = type_row.iloc[0]

            q_type_name = type_info["type"]
            question_count = counts[typeID]
            question_range = ranges[typeID]

            # The prompt does not depend on the attempt number; build it once.
            prompt = PROMPT_TEMPLATE.format(
                section=section_label,
                question_range=question_range,
                question_count=question_count,
                typeID=typeID,
                type_name=q_type_name,
                theme=theme,
                specific_topic=topic,
                specifications=spec,
                instruction=type_info["instruction"],
                answer_format=type_info["answer_format"],
                format=type_info["format"],
                key_skills=type_info["key_skills"],
                avg_duration=type_info["avg_duration"],
                avg_script_length=type_info["avg_script_length"],
                key_features=type_info["key_features"],
                audio_speed=type_info["audio_speed"],
            )

            best_reward = -99
            best_json = None

            for attempt in range(1, MAX_ATTEMPT + 1):
                print(f"\n[GENERATING] {section_label} - {q_type_name} Attempt {attempt}")

                model_json = model_generate(prompt)

                if not isinstance(model_json, dict):
                    # The placeholder is only used if every attempt fails.
                    print("  Invalid JSON, skipping attempt")
                    continue

                # A null or non-string "Transcript" is scored as plain text.
                transcript = str(model_json.get("Transcript") or "")
                reward = 0
                if calculate_readability_score(transcript) >= 55: reward += 1
                if is_in_average_word_count(transcript, word_count_label): reward += 1
                if calculate_common_word_ratio(transcript) >= 0.1: reward += 1
                if calculate_similarity(transcript) <= 0.85: reward += 1

                print(f" -> Reward: {reward}")

                if reward > best_reward:
                    best_reward = reward
                    best_json = model_json

                if reward == REWARD_GOAL:
                    break

            # Fallback placeholder
            if best_json is None:
                best_json = {
                    "Section": section_label,
                    "Type": q_type_name,
                    "Instructions": type_info["instruction"],
                    "Diagram": None,
                    "Questions": [f"Placeholder Q{i}" for i in range(1, question_count+1)],
                    "Answers": [f"Answer_{i}" for i in range(1, question_count+1)],
                    "Options": [None]*question_count,
                    "Transcript": f"Placeholder transcript {question_range}"
                }

            all_results.append(best_json)

    wrapped_output = {dt_key: all_results}

    # Save temp copy
    with open(TEMP_CSV, "w", encoding="utf-8") as f:
        json.dump(wrapped_output, f, indent=2, ensure_ascii=False)

    # Save or append into master JSON
    if os.path.exists(GENERATED_JSON):
        with open(GENERATED_JSON, "r", encoding="utf-8") as f:
            existing = json.load(f)
    else:
        existing = {}

    existing[dt_key] = all_results

    with open(GENERATED_JSON, "w", encoding="utf-8") as f:
        json.dump(existing, f, indent=2, ensure_ascii=False)

    print(f"\n Full question set saved under key {dt_key} in {GENERATED_JSON}")
    return wrapped_output

In [None]:
# Main Pipeline
# Runs generation for every section in section_choices. This calls the model
# and, as a side effect, writes the temp and master JSON files.
questions = generate_full_set(section_choices)

# View Generated Questions

In [None]:
# Show the raw mapping returned by generate_full_set.
print(questions)

# Save to Files

In [None]:
# Import necessary libraries
import os
import re
import json

from fpdf import FPDF

In [None]:
# Configurations
# Temp JSON produced by the generation step (same path as TEMP_CSV above).
TEMP_JSON = "model_training/processed_data/temp_generated_questions.json"

# TrueType font file registered with FPDF under the family name "DejaVu".
DEJAVUSANS_FONT = "frontend/fonts/DejaVuSans.ttf"

In [None]:
# Determine next set folder
# Make sure the base folder exists
base_folder = "sets"
os.makedirs(base_folder, exist_ok=True)

# Collect the number of every existing "setN" entry
existing = []
for entry_name in os.listdir(base_folder):
    if not re.match(r"set\d+", entry_name):
        continue
    existing.append(int(re.search(r"set(\d+)", entry_name).group(1)))

# The new folder takes the next number (1 when no set exists yet)
next_set = max(existing, default=0) + 1
set_folder = os.path.join(base_folder, f"set{next_set}")
os.makedirs(set_folder, exist_ok=True)

In [None]:
# PDF Class
class PDF(FPDF):
    """FPDF document that renders generated question sections.

    The "DejaVu" font family (regular and bold styles) must be registered with
    ``add_font`` before the first page is added, because ``header`` uses it.
    """

    def header(self):
        """Draw the centered document title at the top of each page."""
        self.set_font("DejaVu", "B", 12)
        self.cell(0, 10, "Generated Questions", ln=True, align="C")
        self.ln(5)

    def add_section(self, section_data, include_answers=True, include_transcript=True):
        """Render one question section.

        Args:
            section_data: Dict with the keys ``"Section"``, ``"Type"``,
                ``"Instructions"`` and ``"Questions"``; ``"Diagram"``,
                ``"Answers"`` and ``"Transcript"`` are optional.
            include_answers: Append the answer to each question line when one
                is available for that question.
            include_transcript: Print the transcript after the questions.
        """
        # Every section after the first starts on a fresh page. Breaking
        # *before* a section (rather than after it) avoids a trailing blank
        # page at the end of the document. A page is also opened when the
        # caller has not added one yet.
        sections_rendered = getattr(self, "_sections_rendered", 0)
        if sections_rendered > 0 or self.page_no() == 0:
            self.add_page()
        self._sections_rendered = sections_rendered + 1

        # Part header
        self.set_font("DejaVu", "B", 11)
        self.cell(0, 8, f"Part {section_data['Section']} - {section_data['Type']}", ln=True)

        # Instructions
        self.set_font("DejaVu", "", 10)
        self.multi_cell(0, 6, f"Instructions: {section_data['Instructions']}")

        # Diagram if exists (may be null or a non-string value in model output)
        diagram = section_data.get("Diagram")
        if diagram and str(diagram).strip():
            self.multi_cell(0, 6, f"Diagram:\n{diagram}")
        self.ln(2)

        # Questions (+ answers if requested)
        answers = section_data.get("Answers") or []
        for i, q in enumerate(section_data["Questions"], start=1):
            text = f"{section_data['Section']}.{i}. {q}"
            # Model output may contain fewer answers than questions.
            if include_answers and i <= len(answers):
                # \u2192 is the right-arrow character; written as an escape so
                # the source file's encoding cannot corrupt it.
                text += f"  \u2192 Answer: {answers[i-1]}"
            self.multi_cell(0, 6, text)
        self.ln(3)

        # Transcript
        if include_transcript and section_data.get("Transcript"):
            self.multi_cell(0, 6, f"Transcript:\n{section_data['Transcript']}")


In [None]:
# 1. Full Set PDF
# Load the output of the generation step. The temp file holds a mapping
# {timestamp_key: [section, ...]}; flatten it into one list of sections, which
# the export cells below iterate as `data`.
with open(TEMP_JSON, "r", encoding="utf-8") as f_temp:
    _loaded = json.load(f_temp)
if isinstance(_loaded, dict):
    data = [section for sections in _loaded.values() for section in sections]
else:
    data = list(_loaded)

pdf_full = PDF()
# The same regular TTF file is registered for both the regular and bold styles.
pdf_full.add_font("DejaVu", "", DEJAVUSANS_FONT, uni=True)
pdf_full.add_font("DejaVu", "B", DEJAVUSANS_FONT, uni=True)
pdf_full.set_auto_page_break(auto=True, margin=15)
pdf_full.add_page()
pdf_full.set_font("DejaVu", "", 10)

# Render every section with answers and transcripts.
for section in data:
    pdf_full.add_section(section, include_answers=True, include_transcript=True)

pdf_full_path = os.path.join(set_folder, "full_set.pdf")
pdf_full.output(pdf_full_path)

In [None]:
# 2. Questions Only PDF
pdf_q = PDF()
# The same regular TTF file is registered for both the regular and bold styles.
pdf_q.add_font("DejaVu", "", DEJAVUSANS_FONT, uni=True)
pdf_q.add_font("DejaVu", "B", DEJAVUSANS_FONT, uni=True)
pdf_q.set_auto_page_break(auto=True, margin=15)
pdf_q.add_page()
pdf_q.set_font("DejaVu", "", 10)

# Render every section without answers or transcripts.
for section in data:
    pdf_q.add_section(section, include_answers=False, include_transcript=False)

pdf_q_path = os.path.join(set_folder, "questions_only.pdf")
pdf_q.output(pdf_q_path)

In [None]:
# 3. Transcript Only TXT
trans_path = os.path.join(set_folder, "transcripts_only.txt")
with open(trans_path, "w", encoding="utf-8") as f_trans:
    for section in data:
        f_trans.write(f"Part {section['Section']}\n")
        # "Transcript" can be missing or null in model output; concatenating
        # None with a string would raise TypeError, so fall back to "".
        transcript = section.get("Transcript") or ""
        f_trans.write(f"{transcript}\n\n")

In [None]:
# 4. Questions Only TXT
questions_txt_path = os.path.join(set_folder, "questions_only.txt")
with open(questions_txt_path, "w", encoding="utf-8") as f_qtxt:
    for section in data:
        # Section heading and instructions
        f_qtxt.write(f"Part {section['Section']} - {section['Type']}\n")
        f_qtxt.write(f"Instructions: {section['Instructions']}\n")

        # Diagram, only when one is present and not blank
        diagram_text = section.get("Diagram")
        if diagram_text and diagram_text.strip():
            f_qtxt.write(f"Diagram:\n{diagram_text}\n")

        # One numbered line per question, then a blank separator line
        f_qtxt.writelines(
            f"{section['Section']}.{number}. {question}\n"
            for number, question in enumerate(section["Questions"], start=1)
        )
        f_qtxt.write("\n")