# process description  

The program takes in a PDF.  
Mathpix is used to scan the PDF and turn it into Markdown.  
The Markdown is then processed to get the images.  

An LLM is used to extract the questions and solutions in **ONE** go.  
The final JSON is made using the in2lambda API.

In [None]:
import os
import re
import json
import time
import requests
import concurrent.futures

from pathlib import Path

from dotenv import load_dotenv
from pydantic import BaseModel, Field, ValidationError
import pypandoc

from langchain_openai import ChatOpenAI
from langchain.output_parsers import PydanticOutputParser

from in2lambda.api.module import Module
from in2lambda.api.question import Question
from in2lambda.api.part import Part

from PIL import Image

# Load environment variables from .env file.
load_dotenv()

# scanning/processing the initial pdf into markdown

In [None]:
# Mathpix credentials, read from the environment.
# os.getenv returns None when a variable is not set.
MATHPIX_API_KEY = os.getenv("MATHPIX_API_KEY")
MATHPIX_APP_ID = os.getenv("MATHPIX_APP_ID")

def pdf_to_markdown(source_path: str, result_path: str, max_retries: int = 10, retry_delay: float = 5):
    '''
    Convert the PDF at `source_path` to a markdown file at `result_path` using the Mathpix API.

    The PDF is uploaded to Mathpix, then the `.md` result is polled until it is
    available (HTTP 200) or `max_retries` attempts have been made.

    Args:
        source_path: Path of the PDF to upload.
        result_path: Path the downloaded markdown is written to.
        max_retries: Maximum number of download attempts.
        retry_delay: Seconds to wait between two attempts.

    Raises:
        RuntimeError: if the upload response does not contain a `pdf_id`.

    If the markdown is still unavailable after the last attempt, a message is
    printed and nothing is written.
    '''
    headers = {
        "app_id": MATHPIX_APP_ID,
        "app_key": MATHPIX_API_KEY,
    }

    # The file handle is only needed for the upload, so it is closed before
    # the (potentially long) polling loop starts.
    with open(source_path, "rb") as file:
        r = requests.post(
            "https://api.mathpix.com/v3/pdf",
            headers=headers,
            files={"file": file},
        )

    # Parse the response once and fail with the full payload when the upload
    # was rejected, instead of a bare KeyError on "pdf_id".
    upload_info = r.json()
    pdf_id = upload_info.get("pdf_id")
    if not pdf_id:
        raise RuntimeError(f"Mathpix did not accept the PDF upload: {upload_info}")
    print("PDF ID:", pdf_id)
    print("Response:", upload_info)

    # url of where the location of the processed PDF will be
    url = f"https://api.mathpix.com/v3/pdf/{pdf_id}.md"

    response = None
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            # Save the result if the request is successful
            with open(result_path, "w") as f:
                f.write(response.text)
            print("Downloaded MD successfully.")
            break

        if attempt + 1 < max_retries:
            print(f"Attempt {attempt + 1}/{max_retries}: Processing not complete. Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            # No point in sleeping after the final attempt.
            print(f"Attempt {attempt + 1}/{max_retries}: Processing not complete.")
    else:
        if response is None:
            # Only reachable when max_retries <= 0.
            print("Failed to retrieve processed PDF after multiple attempts:", "no request was made")
        else:
            print("Failed to retrieve processed PDF after multiple attempts:", response.status_code, response.text)

# setting up the directories

In [None]:
# Location of the working folder, the input folder, the output folder and the media folder.
folder_path = "conversion_content"
input_path = f"{folder_path}/input"
output_path = f"{folder_path}/mathpix_to_llm_with_lines_to_api"
media_path = f"{output_path}/media"

# Create output and media directories if they do not exist.
Path(media_path).mkdir(parents=True, exist_ok=True)

# Candidate source files: everything in the input folder except the `.gitkeep`
# placeholder. Sorted so the chosen file does not depend on the arbitrary
# order returned by os.listdir.
files = sorted(f for f in os.listdir(input_path) if f != '.gitkeep')

# The first file in the input folder, or None when the folder is empty
# (indexing files[0] unconditionally would raise IndexError).
source_path = f"{input_path}/{files[0]}" if files else None
result_path = f"{output_path}/example.md"

# Only activate mathpix if the markdown has not been created yet.
# This avoids unnecessary reprocessing of the same PDF.
# An empty input folder is therefore only an error when there is no cached markdown.
if not Path(result_path).exists():
    if source_path is None:
        print(f"Error: No source file found in {input_path}")
        exit(1)
    elif Path(source_path).exists():
        extension = Path(source_path).suffix.lower() # obtains the file extension
        if extension == ".pdf":
            pdf_to_markdown(source_path, result_path)
        else:
            # Anything that is not a PDF is handed to pandoc.
            pypandoc.convert_file(source_path, 'md', outputfile=result_path)
    else:
        print(f"Error: Source PDF file not found at {source_path}")
        exit(1)

# Read the markdown content from the result file.
try:
    with open(result_path, "r") as f:
        md_content = f.read()
except FileNotFoundError:
    print(f"Error: Markdown file not found at {result_path}")
    exit(1)

# Print out a summary, followed by the full markdown text.
print("Markdown text: ")
print(f"  {result_path}: {len(md_content)} characters")
print("Markdown text: ")
print(f"  {result_path}: {md_content}")

# downloading extracted images from Mathpix

In [None]:
# Fetch the figures from the paper and answers.
def extract_figures_from_text(text):
    """
    Find the markdown images in `text` and download the remote ones.

    Every `![alt text](url)` reference is matched; only URLs starting with
    "http" are downloaded.

    Args:
        text: Markdown text to scan.

    Returns:
        dict: Maps the figure name (the last path component of the URL,
        query string included) to a dict with the keys "image" (the PIL
        image), "url" and "local_path" (empty until the figure is saved).
        Images that cannot be downloaded or decoded are reported and left out.
    """
    # Local import so this function does not depend on a file-level import.
    from io import BytesIO

    figures = {}
    # Matches ![alt text](url) format for images and captures the url.
    pattern = r'!\[.*?\]\((.*?)\)'
    matches = re.findall(pattern, text)
    print(f"Matches found: {matches}")

    for match in matches:
        url = match.strip()
        if not url.startswith("http"):
            continue

        # Create a figure name based on the URL
        fig_name = os.path.basename(url)
        if fig_name in figures and figures[fig_name]["url"] == url:
            # The same image is referenced again; one download is enough.
            continue

        try:
            # Fetch the whole body instead of handing PIL the raw socket
            # stream: the raw stream is not content-decoded and would only be
            # read lazily, long after this function has returned.
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content))
            image.load()  # decode now so a broken image is detected here
        except (requests.RequestException, OSError) as e:
            # Later steps already skip URLs that were not processed, so one
            # bad image should not abort the whole conversion.
            print(f"Error downloading image {url}: {e}")
            continue

        figures[fig_name] = {
            "image": image,
            "url": url,
            "local_path": "",
        }
    return figures

# a dictionary storing information on the figures:
# figure name -> {"image", "url", "local_path"}
figures = extract_figures_from_text(md_content)

# saving the images locally

In [None]:
def save_figures_to_path(figures):
    """
    Save every figure into the media folder and record its file name.

    For each entry of `figures` (figure name -> info dict) the file name is the
    entry's position followed by the figure name without its query string. The
    name is stored in the entry's "local_path" before the image is written;
    a failed write is reported and does not stop the loop.
    """
    for position, (name, info) in enumerate(figures.items()):
        print(f"URL='{info['url']}'")

        # Mathpix leaves image urls like `image.png?width=800&height=600`;
        # keep only what comes before the first "?".
        base_name = name.partition("?")[0]
        image_name = f"{position}_{base_name}"
        info["local_path"] = image_name

        try:
            # Saves the image to the media path
            info["image"].save(f"{media_path}/{info['local_path']}")
            print(f"Saved image: {info['local_path']}")
        except Exception as e:
            print(f"Error saving image {image_name}: {e}")

# Write the figures into the media folder; this also fills in each figure's "local_path".
save_figures_to_path(figures)

# replacing url for images with local path

In [None]:
def replace_figures_in_markdown(md_content, figures) -> str:
    """
    Point the markdown at the locally saved figures.

    Empty image alt texts (`![]`) become `![pictureTag]`, then every figure URL
    is replaced by the figure's "local_path". The result is written to
    `example.md` in the output folder (a failed write is only reported) and
    returned.
    """
    # add pictureTag for Lambda Feedback to recognise it as a picture
    updated = md_content.replace("![]", "![pictureTag]")

    for info in figures.values():
        remote, local = info["url"], info["local_path"]
        updated = updated.replace(remote, local)
        print(f"Replaced {remote} with {local} in markdown content.")

    # Save the modified markdown content to a file
    try:
        with open(f"{output_path}/example.md", "w") as f:
            f.write(updated)
        print("Modified markdown saved successfully.")
    except Exception as e:
        print(f"Error saving modified markdown: {e}")

    return updated

# Swap the image URLs in the in-memory markdown for the local file names
# (the function also rewrites example.md in the output folder).
md_content = replace_figures_in_markdown(md_content, figures)

# Initialising llm

In [None]:
# Set up the LLM via LangChain.
# Both clients take their key from the OPENAI_API_KEY environment variable;
# os.environ[...] raises KeyError when it is not set.

# Uses gpt-4.1-nano:
#    - a faster model
#    - less intelligent
# Used by correct_mistakes_in_markdown.

llm_nano = ChatOpenAI(
            model="gpt-4.1-nano",
            api_key=os.environ["OPENAI_API_KEY"],
        )

# Uses gpt-4.1-mini:
#    - more intelligent
# Used by the prompts that locate questions, parts and solutions by line number.
llm_mini = ChatOpenAI(
            model="gpt-4.1-mini",
            api_key=os.environ["OPENAI_API_KEY"],
        )

# Spelling and structure check

In [None]:
# Task description for the proofreading prompt; it is embedded verbatim in the
# prompt built by correct_mistakes_in_markdown.
llm_task_correct_mistakes = """
The input is a markdown file that is converted from a pdf using Mathpix API.
The pdf contains questions and may contain the solutions too.
As the original pdf may contain hand written text, the markdown file may contain mistakes in spelling, grammar and structure.

Important things to remember:
    1. Leave all Math commands and LaTeX formatting the same. As they are completely valid. Do not change the LaTeX formatting and expressions.
    2. Only ever use LaTeX math delimiters for math expressions. I.e. use `$...$` for inline math, and `$$...$$` for display math.
    3. Leave references to images and figures the same. I.e. do not change the image links or alt text.

Your task is to:
    1. Correct any spelling mistakes in the markdown file.
    2. Correct any grammar mistakes in the markdown file.
    3. Correct any layout mistakes in the markdown file, such that it follows the styles of the entire markdown file.
    4. Do not change the content of the markdown file, only correct the mistakes.
Output only a valid markdown file with the corrections applied, if any. Do not add any additional text or comments.
"""

def correct_mistakes_in_markdown(md_content: str) -> str:
    """
    Ask the nano model to proofread `md_content`.

    The prompt combines the proofreading task description with the markdown.
    The stripped model response is printed and returned.
    """
    proofreading_prompt = f"""
        {llm_task_correct_mistakes}

        ```input
        {md_content}
        ```

        Return the markdown now.
    """

    reply = llm_nano.invoke(proofreading_prompt)
    print("Corrected markdown content:")
    corrected = reply.content.strip()
    print(corrected)

    return corrected

# Extract Questions

In [None]:
#define initial question model
# Line ranges of one question and its solution, as returned by the LLM.
# extract_questions slices the markdown lines with them and treats both the
# start and the end value as inclusive.
# NOTE(review): documented with comments rather than a class docstring —
# presumably a docstring would end up in the schema text sent to the LLM; confirm.
class QuestionModelLines(BaseModel):
    # full question and full solution
    question_content_start: int = Field(..., description="Line number the question starts on.")
    question_content_end: int = Field(..., description="Line number the question ends on.")
    solution_content_start: int = Field(..., description="Line number the solution starts on.")
    solution_content_end: int = Field(..., description="Line number the solution ends on.")

# Whole question set expressed as line ranges; this is the schema handed to
# PydanticOutputParser in llm_extract_questions_lines.
class AllQuestionsModelLines(BaseModel):
    name: str = Field(..., description="Title of the set")
    year: str = Field(..., description="Year of the set")
    questions: list[QuestionModelLines] = Field(..., description="A list of questions.")

# Task description for the prompt that locates each question and its solution
# by line number; embedded verbatim by llm_extract_questions_lines.
llm_task_seperate_questions = """
    Your task is to extract the line numbers for the start and end of each question and solution from the markdown file, then format it as a JSON object.
    These line numbers will be used later to extract the content of the questions and solutions procedurally.
    
    1.  **Content Extraction:**
        -   Your may choose a suitable name for the set of questions.
        -   Identify the `year` of the questions, otherwise use "0".
        -   Begin by Identifying the questions in the markdown file, and for each question:
            -   Identify the start and end line numbers of the full question content, and place them in `question_content_start` and `question_content_end`.
            -   Identify the start and end line numbers of the full relevant solution content, and place them in `solution_content_start` and `solution_content_end`.
            -   Be careful to ensure that everything related to the question and solution is included, including any math delimiters and LaTeX formatting.
            -   Do not forget to include any images or figures that are part of the question or solution.
    
    2.  **Output Format:**
        -   You MUST output ONLY a single, raw, valid JSON string that matches the provided schema.
        -   Do NOT include any explanations, comments, or markdown code blocks (like ```json).
    """

def llm_extract_questions_lines(doc_page_content: list[str]) -> dict:
    """
    Ask the LLM for the line ranges of every question and solution.

    The markdown is sent as a list of `(line index, line)` pairs, so the line
    numbers in the answer are 0-based indices into `doc_page_content`.

    Args:
        doc_page_content: The markdown split into lines.

    Returns:
        dict: The parsed `AllQuestionsModelLines` as a plain dict.

    Raises:
        ValueError: if the response still cannot be parsed after 3 attempts
            (the parser's own exception is re-raised unchanged).
    """
    # Initialise the parser for the output.
    parser = PydanticOutputParser(pydantic_object=AllQuestionsModelLines)

    # Prompt for the LLM to extract questions.
    seperate_questions_prompt = f"""
        Your task is to extract a JSON with the following structure exactly, ready to be parsed by a pydantic model:
        {parser.get_format_instructions()}

        {llm_task_seperate_questions}

        Input markdown:
        ```
        {list(enumerate(doc_page_content))}
        ```
        Return the JSON now.
    """

    max_attempts = 3
    for attempt_idx in range(max_attempts):
        response = llm_mini.invoke(seperate_questions_prompt)
        try:
            return parser.parse(response.content).model_dump()
        except (ValidationError, ValueError) as e:
            # The parser reports malformed JSON and schema violations as
            # OutputParserException, a ValueError subclass. Catching only
            # pydantic's ValidationError meant the retry never triggered.
            print(f"Validation error on attempt {attempt_idx + 1}: {e}")
            if attempt_idx == max_attempts - 1:
                raise
            print("Retrying...")

In [None]:
def extract_images(text: str) -> list[str]:
    """
    Collect the targets of all markdown images in `text`.

    Returns the part between the parentheses of every `![alt](target)`
    reference, in order of appearance.
    """
    image_link = re.compile(r'!\[.*?\]\((.*?)\)')
    return [found.group(1) for found in image_link.finditer(text)]

In [None]:
# One question with its full text, full solution text and the image targets
# found in either of them (built by extract_questions).
class QuestionModel(BaseModel):
    # full question and full solution
    question_content: str = Field(..., description="The content of the question.")
    solution_content: str = Field(..., description="The content of the solution.")
    images: list[str] = Field(..., description="A list of image URLs associated with the question.")

# The whole set with the actual question/solution text instead of line ranges.
class AllQuestionsModel(BaseModel):
    name: str = Field(..., description="Title of the set")
    year: str = Field(..., description="Year of the set")
    questions: list[QuestionModel] = Field(..., description="A list of questions.")


def extract_questions(allQuestionsModel: dict, doc_page_content: list[str]) -> dict:
    """
    Turn the line ranges reported by the LLM into question and solution text.

    Args:
        allQuestionsModel: Dict with the keys "name", "year" and "questions";
            each question holds the start/end line numbers of its question
            and solution content.
        doc_page_content: The markdown split into lines.

    Returns:
        dict: An `AllQuestionsModel` dumped to a plain dict. Each question
        carries "question_content", "solution_content" and "images".
    """
    def lines_between(start: int, end: int) -> str:
        # `end` is inclusive, hence the + 1 on the slice.
        return "\n".join(doc_page_content[start:end + 1])

    questions = []
    for question in allQuestionsModel["questions"]:
        question_content = lines_between(question["question_content_start"], question["question_content_end"])
        solution_content = lines_between(question["solution_content_start"], question["solution_content_end"])

        # important, image will be wrong if two identical images are used, although this should not be possible.
        # dict.fromkeys removes duplicates but keeps the order of appearance;
        # a set would return the images in a different order on every run.
        images = list(dict.fromkeys(extract_images(question_content) + extract_images(solution_content)))

        questions.append(
            QuestionModel(
                question_content=question_content,
                solution_content=solution_content,
                images=images
            )
        )

    allQuestions = AllQuestionsModel(
        name=allQuestionsModel["name"],
        year=allQuestionsModel["year"],
        questions=questions
    )
    return allQuestions.model_dump()

# Extract question parts and solutions

In [None]:
# Define the schema for the tutorial output.
# The *_Lines models hold line numbers returned by the LLM; the plain models
# hold the text that is sliced out with those numbers.
class Set_Question_Part_Lines(BaseModel):
    """
    Represents a part of a question with its start and end lines.
    """
    part_start: int = Field(..., description="The start line number of the part.")
    part_end: int = Field(..., description="The end line number of the part.")
    
# Schema handed to PydanticOutputParser in process_single_question: a title
# plus the line ranges of the stem and of every part.
class Set_Question_Lines(BaseModel):
    title: str = Field(..., description="Title of the question (only the text, no numbering)")
    content_start: int = Field(..., description="start of the content of the question (no exercise title, no subquestions)")
    content_end: int = Field(..., description="end of the content of the question (no exercise title, no subquestions)")
    parts: list[Set_Question_Part_Lines] = Field(..., description="List of parts within the question (only the text, no numbering)")

# Text version of Set_Question_Lines, produced by
# convert_set_question_lines_to_set_question.
class Set_Question(BaseModel):
    title: str = Field(..., description="Title of the question (only the text, no numbering)")
    content: str = Field(..., description="Content of the question (no exercise title, no subquestions)")
    parts: list[str] = Field(..., description="List of parts within the question (only the text, no numbering)")
    images: list[str] = Field(..., description="List of image URLs associated with the question (no alt text, only URLs)")

# Line range of the worked solution for a single part; schema handed to
# PydanticOutputParser inside process_single_question.
class Set_Solution_Part_Lines(BaseModel):
    part_solution_start: int = Field(..., description="The start of the worked solution for the part (no numbering or counting)")
    part_solution_end: int = Field(..., description="The end of the worked solution for the part (no numbering or counting)")

# Text of the worked solutions, one entry per part (built by
# convert_set_solution_lines_to_set_solution).
class Set_Solution(BaseModel):
    parts_solutions: list[str] = Field(..., description="List of worked solutions for the question (no numbering or counting)")


# A question together with the worked solution of each of its parts.
# NOTE: the constructor takes a question object and a solution object instead
# of the usual field keyword arguments, so instances cannot be created
# directly from a dict of fields.
class Set_Question_With_Solution(Set_Question):
    parts_solutions: list[str] = Field(..., description="The worked solution for the parts.")

    def __init__(self, question: Set_Question, solution: Set_Solution):
        """
        Initialize the Set_Question_With_Solution with a question and its solution.

        The fields of `question` are copied via `model_dump()` and
        `solution.parts_solutions` is added as `parts_solutions`.

        Args:
            question (Set_Question): The question object.
            solution (Set_Solution): The solution object.
        """
        super().__init__(
            **question.model_dump(),
            parts_solutions=solution.parts_solutions,
        )


# The final question set: name, year and every question with its part
# solutions. extract_parts_question returns this model dumped to a dict.
class Set_Lines(BaseModel):
    name: str = Field(..., description="Title of the set")
    year: str = Field(..., description="Year of the set")
    questions: list[Set_Question_With_Solution] = Field(..., description="List of questions in the set")


In [None]:
def convert_set_question_lines_to_set_question(set_question_lines: Set_Question_Lines, question_content: list[str], images: "list[str] | None" = None) -> Set_Question:
    """
    Convert Set_Question_Lines to Set_Question.

    Args:
        set_question_lines: Title plus the line ranges of the stem and parts.
        question_content: The question text split into lines; the ranges
            index into this list and their end values are inclusive.
        images: Image targets belonging to the question. Defaults to no images.

    Returns:
        Set_Question: The title, stem text, part texts and images.
    """
    def lines_between(start: int, end: int) -> str:
        # `end` is inclusive, hence the + 1 on the slice.
        return "\n".join(question_content[start:end + 1])

    return Set_Question(
        title=set_question_lines.title,
        content=lines_between(set_question_lines.content_start, set_question_lines.content_end),
        parts=[lines_between(part.part_start, part.part_end) for part in set_question_lines.parts],
        # None instead of a shared mutable `[]` default; a fresh list per call.
        images=[] if images is None else list(images)
    )

def convert_set_solution_lines_to_set_solution(set_solution_lines: list[Set_Solution_Part_Lines], solution_content: list[str]) -> Set_Solution:
    """
    Build a Set_Solution from the per-part solution line ranges.

    Each range is cut out of `solution_content` (end line included) and joined
    with newlines; the resulting texts keep the order of `set_solution_lines`.
    """
    solution_texts = []
    for part in set_solution_lines:
        first, last = part.part_solution_start, part.part_solution_end
        solution_texts.append("\n".join(solution_content[first:last + 1]))

    return Set_Solution(parts_solutions=solution_texts)


In [None]:

# Task description for the prompt that splits one question into its stem and
# parts by line number; embedded verbatim by process_single_question.
llm_task_seperate_parts_question = r"""
    1. **Content Extraction:**
        -   You may choose the `title` for the question.
        -   From the input `Full Question Content`, identify the start line and end line for the main introductory text (the stem), place them in `content_start` and `content_end`. 
        -   From the input `Full Question Content`, identify and separate all the `parts`(sub-questions), they could be explicit (e.g. using, "(a)", "(b)", "i.", "ii."... etc.), but may also be implied. For each identified sub-question:
            -   Place the start line going into `part_start` and the end line going into `part_end`.
            -   If the question has no sub-questions, leave `part_start` as 0 and `part_end` as -1.
            -   You may use the `Full Solution Content` to help with identifying the parts.
        -   Be careful to ensure that everything related to the question stem/parts is included, including any math delimiters and LaTeX formatting.
        -   Do not forget to include any images or figures that are part of the question stem, parts or solution.
        -   Ensure no solution content is included in the `content` or `parts` fields.
    
    2.  **Output Format:**
        -   You MUST output ONLY a single, raw, valid JSON string that matches the provided schema.
        -   Do NOT include any explanations, comments, or markdown code blocks (like ```json).
    """

# Task description for the prompt that locates the worked solution of one
# question part by line number; embedded verbatim by process_single_question.
llm_task_seperate_parts_solution = r"""
    1. **Content Extraction:**
        -   From the input `full solution content`, identify the specific solution part that corresponds to the `target question part`, and place the start line and end line into `part_solution_start` and `part_solution_end`.
        -   If the `target question part` is empty, identify the specific solution part that corresponds to the `full question stem`.
        -   Use the `full question stem` and `full question parts` to help identify the specific solution part.
        -   Ensure that the `target question part` is used to extract the specific solution part.
        -   Be careful to ensure that everything related to the solution part is included, including any math delimiters and LaTeX formatting.
        -   Do not forget to include any images or figures that are part of the solution.

    2.  **Output Format:**
        -   You MUST output ONLY a single, raw, valid JSON string that matches the provided schema.
        -   Do NOT include any explanations, comments, or markdown code blocks (like ```json).
    """

def process_single_question(question_data):
    """
    Split one question into its stem, parts and per-part worked solutions.

    One LLM call locates the stem and the parts of the question by line
    number; then one LLM call per part (run in a thread pool) locates the
    matching section of the solution.

    Args:
        question_data: An `(index, question)` tuple. `question` is a dict with
            the keys "question_content", "solution_content" and "images".

    Returns:
        Set_Question_With_Solution: title, stem, parts and images of the
        question plus one worked solution per part.

    Raises:
        Exception: if an LLM response still cannot be parsed after 3 attempts.
    """
    question_idx, question = question_data
    
    # Initialize the output parser with the Set_Question schema.
    question_parser = PydanticOutputParser(pydantic_object=Set_Question_Lines)

    # The question is numbered line by line so the LLM can answer with line
    # numbers; the solution is passed as plain text in this first prompt.
    question_input: list[str] = question["question_content"].splitlines()
    solution_input: str = question["solution_content"]
    all_images = question["images"]

    # Prompt for the LLM to extract The question parts.
    # Use the full question content and the images to extract the parts.
    seperate_parts_question_prompt = f"""
        Your task is to extract a JSON with the following structure exactly, ready to be parsed by a pydantic model:
        {question_parser.get_format_instructions()}

        {llm_task_seperate_parts_question}

        Full Solution Content:
        {solution_input}

        Full Question Content:
        {list(enumerate(question_input))}

        Return the JSON now.
        """
    
    # Process the question part
    # Up to 3 attempts; the for/else branch only runs when no attempt parsed.
    for attempt_idx in range(3):

        response = llm_mini.invoke(seperate_parts_question_prompt)

        try:
            parsed_output_parts = question_parser.parse(response.content)
            print(f"LLM response successfully parsed question {question_idx + 1}.")
            break
        except Exception as e:
            # NOTE: the exception itself is not printed, only the attempt number.
            print(f"Error parsing LLM response as JSON for question {question_idx + 1}:")
            print(f"Retrying... Attempt No.{attempt_idx + 1}")
            time.sleep(2)
    else:
        print("Final LLM Response:")
        print(response.content)
        raise Exception(f"Failed to parse LLM response as JSON after multiple attempts for question {question_idx + 1}.")

    # Convert from Set_Question_Lines to Set_Question
    # (from here on parsed_output_parts holds text, not line numbers).
    parsed_output_parts = convert_set_question_lines_to_set_question(parsed_output_parts, question_input, all_images)

    # Process solution parts in parallel
    def process_solution_part(part_data) -> Set_Solution_Part_Lines:
        # part_data is an (index, part text) tuple; returns the line range of
        # the solution that belongs to that part.
        part_idx, part = part_data
        solution_parser = PydanticOutputParser(pydantic_object=Set_Solution_Part_Lines)

        target_solution_input: list[str] = solution_input.splitlines()

        # Prompt for the LLM to extract The solution part.
        # Use the full solution content and the part to extract the specific solution.
        seperate_parts_solution_prompt = f"""
            Your task is to extract a JSON with the following structure exactly, ready to be parsed by a pydantic model:
            {solution_parser.get_format_instructions()}

            {llm_task_seperate_parts_solution}

            full question stem:
            {parsed_output_parts.content}

            full question parts:
            {parsed_output_parts.parts}
            
            full solution content:
            {list(enumerate(target_solution_input))}

            target question part:
            {part}
            """
            
        # Same retry scheme as for the question: 3 attempts, then give up.
        for attempt_idx in range(3):
            
            response = llm_mini.invoke(seperate_parts_solution_prompt)
            
            try:
                parsed_output_solution_part = solution_parser.parse(response.content)
                print(f"LLM response successfully parsed solution for part {part_idx + 1} of question {question_idx + 1}.")
                return parsed_output_solution_part
            except Exception as e:
                print(f"Error parsing LLM response as JSON for part {part_idx + 1} of question {question_idx + 1}:")
                print(f"Retrying... Attempt No.{attempt_idx + 1}")
                time.sleep(2)
        
        else:
            print("Final LLM Response:")
            print(response.content)
            raise Exception(f"Failed to parse LLM response as JSON after multiple attempts part {part_idx + 1} of question {question_idx + 1}:")

    # Process all parts in parallel
    # executor.map returns the results in the order of part_data_list, so the
    # solutions line up with the parts.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        part_data_list = [(i, part) for i, part in enumerate(parsed_output_parts.parts)]
        solutions_parts = list(executor.map(process_solution_part, part_data_list))

    # Turn the solution line ranges into text.
    solutions_parts = convert_set_solution_lines_to_set_solution(
        solutions_parts, 
        solution_input.splitlines()
    )

    # set_solution = Set_Solution(parts_solutions=solutions_parts)
    return Set_Question_With_Solution(
        question=parsed_output_parts,
        solution=solutions_parts
    )

def extract_parts_question(questions_dict: dict) -> dict:
    """
    Split every question of the set into parts with their worked solutions.

    The questions are handed to `process_single_question` through a thread
    pool; the results keep the order of `questions_dict["questions"]`.

    Returns:
        dict: A `Set_Lines` model dumped to a dict ("name", "year", "questions").
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # Each work item is an (index, question) pair.
        indexed_questions = list(enumerate(questions_dict["questions"]))
        processed_questions = list(pool.map(process_single_question, indexed_questions))

    question_set = Set_Lines(
        name=questions_dict["name"],
        year=questions_dict["year"],
        questions=processed_questions
    )
    return question_set.model_dump()

In [None]:
def md_to_json(md_content: str) -> dict:
    """
    Convert the markdown of a question set into a structured dictionary.

    Steps: locate every question and solution by line number, cut out their
    text, then split each question into parts with worked solutions. The
    intermediate results are printed as JSON.

    Args:
        md_content (str): The content of a set.

    Returns:
        dict: The result of `extract_parts_question`, with the keys "name",
        "year" and "questions".
    """
    # The optional proofreading pass (correct_mistakes_in_markdown) is not
    # applied here; the markdown is used as-is.
    lines = md_content.splitlines()

    line_ranges = llm_extract_questions_lines(lines)
    print("Successfully extracted the lines for questions and solutions from the markdown lines. Now extracting the questions...")

    questions_dict = extract_questions(line_ranges, lines)
    print(json.dumps(questions_dict))
    print("successfully extracted the questions from the markdown. Now extracting the parts...")

    question_set = extract_parts_question(questions_dict)
    print("succesfully extracted the parts from the questions.")
    print(json.dumps(question_set, indent=2))
    print("Now validating the content...")

    return question_set

In [None]:
# Run the whole extraction on the markdown; the result is a dict with the
# keys "name", "year" and "questions".
full_json_question_set = md_to_json(md_content)

# Displaying questions

In [None]:
# Heading of the set: its name followed by its year.
title = " ".join([full_json_question_set["name"], full_json_question_set["year"]])
print(f"Title: {title}\n")

# Walk through the extracted questions and show each one with its parts.
questions = full_json_question_set["questions"]
for question_idx, question in enumerate(questions, start=1):
    print(f"**Question {question_idx}**:\n{question.get('title')}\n")
    print(f"Content: {question.get('content')}\n")

    # Parts and worked solutions are paired up position by position.
    for part_idx, (part_question, part_answer) in enumerate(
        zip(question.get("parts", []), question.get("parts_solutions", [])),
        start=1,
    ):
        print(f"Question {question_idx}:")
        print(f"- Subquestion {part_idx}: {part_question}")
        print(f"- Worked Solution {part_idx}: {part_answer}")
        print("\n")

    # Separator for readability
    print("-" * 40)

# in2lambda to JSON

In [None]:
questions = full_json_question_set["questions"]

in2lambda_questions = []

# Build one in2lambda Question per extracted question, then write the module as JSON.
for question_idx, question_dict in enumerate(questions, start=1):
    # One Part per (part text, worked solution) pair.
    parts = []
    for part_question, part_solution in zip(question_dict.get("parts", []), question_dict.get("parts_solutions", [])):
        part_obj = Part(text=part_question, worked_solution=part_solution)
        parts.append(part_obj)

    # Only images that exist in the media folder are attached.
    image_paths = []
    for img in question_dict.get("images", []):
        # URLs that were never downloaded have no local file to attach.
        if not img.startswith("http"):
            full_path = f"{media_path}/{img}"
            if not Path(full_path).exists():
                print(f"Warning: Image file not found: {full_path}")
            else:
                image_paths.append(full_path)

    question = Question(
        title=question_dict.get("title", f"Question {question_idx}"),
        main_text=question_dict.get("content", ""),
        parts=parts,
        images=image_paths
    )
    in2lambda_questions.append(question)

try:
    module = Module(in2lambda_questions)
    module.to_json(f"{output_path}/out")
    print("JSON output successfully created.")
except Exception as e:
    print(f"Error creating JSON output: {e}")