In [1]:
from pathlib import Path
import re

from tqdm.auto import tqdm
from lxml import etree
import spacy
from spacy.tokens import DocBin, Doc, Span
from collections import defaultdict

# Initialize spaCy
nlp = spacy.load("en_core_web_trf")

# Set up custom attributes for error spans
if not Span.has_extension("correction"):
    Span.set_extension("correction", default=None)
if not Span.has_extension("original"):
    Span.set_extension("original", default=None)

In [2]:
# Collect all XML files
dataset_dir = Path("../data/fce-released-dataset/dataset/")
xml_files = sorted(dataset_dir.glob("**/doc*.xml"))

print(f"Found {len(xml_files)} XML files")
print(f"First file: {xml_files[0]}")

Found 1249 XML files
First file: ../data/fce-released-dataset/dataset/0100_2000_12/.ipynb_checkpoints/doc1000-checkpoint.xml


In [3]:
def safe_find_text(element, path, default=None):
    """
    Safely find an XML element and get its text.

    Returns default if element not found or has no text.
    """
    found = element.find(path)
    if found is not None and found.text is not None:
        return found.text
    return default

In [9]:
def extract_text_and_errors(
    element, original_chars, corrected_chars, errors, orig_offset=0, corr_offset=0
):
    """
    Recursively extract text and error annotations from XML element.

    Args:
        element: lxml element to process
        original_chars: list to accumulate original text characters
        corrected_chars: list to accumulate corrected text characters
        errors: list to accumulate error annotations
        orig_offset: current offset in original text
        corr_offset: current offset in corrected text

    Returns:
        tuple: (new_orig_offset, new_corr_offset)
    """
    # Add any text directly in this element
    if element.text:
        text = element.text
        original_chars.append(text)
        corrected_chars.append(text)
        orig_offset += len(text)
        corr_offset += len(text)

    # Process child elements
    for child in element:
        if child.tag == "NS":  # Error annotation
            error_type = child.get("type")

            # Find <i> (incorrect) and <c> (correct) children
            i_elem = child.find("i")
            c_elem = child.find("c")

            incorrect_text = ""
            correct_text = ""

            # Record start position in original text
            error_start = orig_offset

            # Handle nested errors in <i> tag
            if i_elem is not None:
                # Recursively process incorrect portion
                i_chars_orig = []
                i_chars_corr = []
                nested_errors = []

                if i_elem.text:
                    i_chars_orig.append(i_elem.text)
                    i_chars_corr.append(i_elem.text)

                # Process nested NS tags
                for nested in i_elem:
                    if nested.tag == "NS":
                        nested_type = nested.get("type")
                        nested_i = nested.find("i")
                        nested_c = nested.find("c")

                        nested_start = len("".join(i_chars_orig))

                        if nested_i is not None and nested_i.text:
                            i_chars_orig.append(nested_i.text)
                            if nested_i.tail:
                                i_chars_orig.append(nested_i.tail)

                        if nested_c is not None and nested_c.text:
                            i_chars_corr.append(nested_c.text)
                            if nested_c.tail:
                                i_chars_corr.append(nested_c.tail)

                        nested_end = len("".join(i_chars_orig))

                        # Record nested error (relative to parent error start)
                        nested_errors.append(
                            {
                                "start": error_start + nested_start,
                                "end": error_start + nested_end,
                                "type": nested_type,
                                "incorrect": nested_i.text
                                if nested_i is not None
                                else "",
                                "correct": nested_c.text
                                if nested_c is not None
                                else "",
                            }
                        )

                    if nested.tail:
                        i_chars_orig.append(nested.tail)
                        i_chars_corr.append(nested.tail)

                if i_elem.tail:
                    i_chars_orig.append(i_elem.tail)
                    i_chars_corr.append(i_elem.tail)

                incorrect_text = "".join(i_chars_orig)

                # Add to original text
                original_chars.append(incorrect_text)
                orig_offset += len(incorrect_text)

                # Add nested errors
                errors.extend(nested_errors)

            # Handle correction
            if c_elem is not None:
                # Process correction (might also have nested elements)
                c_chars = []
                if c_elem.text:
                    c_chars.append(c_elem.text)
                for nested in c_elem:
                    if nested.tag == "NS":
                        nested_c = nested.find("c")
                        if nested_c is not None and nested_c.text:
                            c_chars.append(nested_c.text)
                    if nested.tail:
                        c_chars.append(nested.tail)
                if c_elem.tail:
                    c_chars.append(c_elem.tail)

                correct_text = "".join(c_chars)
                corrected_chars.append(correct_text)
                corr_offset += len(correct_text)
            else:
                # Deletion - nothing in corrected text
                pass

            # Record the error
            error_end = orig_offset
            if error_start < error_end:  # Only record if there's actually text
                errors.append(
                    {
                        "start": error_start,
                        "end": error_end,
                        "type": error_type,
                        "incorrect": incorrect_text,
                        "correct": correct_text,
                    }
                )
            elif i_elem is None and c_elem is not None:
                # Insertion - no text in original, but text in corrected
                errors.append(
                    {
                        "start": error_start,
                        "end": error_start,  # Zero-width span
                        "type": error_type,
                        "incorrect": "",
                        "correct": correct_text,
                    }
                )

        elif child.tag == "p":
            # Paragraph - recurse
            orig_offset, corr_offset = extract_text_and_errors(
                child, original_chars, corrected_chars, errors, orig_offset, corr_offset
            )
            # Add newline after paragraph
            original_chars.append("\n")
            corrected_chars.append("\n")
            orig_offset += 1
            corr_offset += 1

        # Process tail text (text after this element)
        if child.tail:
            original_chars.append(child.tail)
            corrected_chars.append(child.tail)
            orig_offset += len(child.tail)
            corr_offset += len(child.tail)

    return orig_offset, corr_offset


def parse_answer(answer_elem) -> tuple[str, str, list[dict], dict]:
    """
    Parse a single answer element.

    Returns:
        tuple: (original_text, corrected_text, errors, metadata)
    """
    question_num = safe_find_text(answer_elem, "question_number")
    exam_score = safe_find_text(answer_elem, "exam_score")

    if question_num is None or exam_score is None:
        raise ValueError(f"Missing question_number or exam_score")

    # Extract text and errors
    coded_answer = answer_elem.find("coded_answer")
    if coded_answer is None:
        raise ValueError("Missing coded_answer element")

    original_chars = []
    corrected_chars = []
    errors = []

    extract_text_and_errors(coded_answer, original_chars, corrected_chars, errors)

    original_text = "".join(original_chars).strip()
    corrected_text = "".join(corrected_chars).strip()

    metadata = {"question_number": question_num, "exam_score": exam_score}

    return original_text, corrected_text, errors, metadata

In [10]:
def parse_fce_xml(xml_path: Path) -> list[dict]:
    """
    Parse an FCE XML file and extract all answers with metadata.

    Returns:
        list of dicts, each containing parsed answer data
    """
    tree = etree.parse(str(xml_path))
    root = tree.getroot()

    # Extract learner metadata
    head = root.find(".//head")
    if head is None:
        raise ValueError(f"No head element found in {xml_path}")

    doc_id = head.get("sortkey", "unknown")

    candidate = head.find("candidate")
    if candidate is None:
        raise ValueError(f"No candidate element found in {xml_path}")

    language = safe_find_text(candidate, ".//language", "Unknown")
    age = safe_find_text(candidate, ".//age", "Unknown")
    overall_score = safe_find_text(candidate, "score")

    # Parse each answer
    text_elem = head.find("text")
    if text_elem is None:
        raise ValueError(f"No text element found in {xml_path}")

    answers = []

    for i in range(1, 10):  # FCE has up to 5 answers typically, but be safe
        answer_elem = text_elem.find(f"answer{i}")
        if answer_elem is None:
            break

        try:
            original_text, corrected_text, errors, answer_meta = parse_answer(
                answer_elem
            )

            answers.append(
                {
                    "doc_id": doc_id,
                    "l1": language,
                    "age": age,
                    "overall_score": overall_score,
                    "question_number": answer_meta["question_number"],
                    "exam_score": answer_meta["exam_score"],
                    "original_text": original_text,
                    "corrected_text": corrected_text,
                    "errors": errors,
                }
            )
        except ValueError as e:
            print(f"Warning: Skipping answer {i} in {xml_path}: {e}")
            continue

    if not answers:
        raise ValueError(f"No valid answers found in {xml_path}")

    return answers

In [11]:
# Test on the sample file
sample_file = Path("../data/fce-released-dataset/dataset/0100_2000_12/doc1000.xml")
sample_answers = parse_fce_xml(sample_file)

print(f"Found {len(sample_answers)} answers in sample file\n")

for i, answer in enumerate(sample_answers):
    print(f"Answer {i+1}:")
    print(f"  Question: {answer['question_number']}")
    print(f"  Score: {answer['exam_score']}")
    print(f"  Errors: {len(answer['errors'])}")
    print(f"  Original text preview: {answer['original_text'][:1_000]}...")
    print(f"  Corrected text preview: {answer['corrected_text'][:1_000]}...")
    print()

Found 2 answers in sample file

Answer 1:
  Question: 1
  Score: 2.3
  Errors: 8
  Original text preview: DECEMBER 12TH

          PRINCIPAL MR. ROBERTSON

          DEAR SIR,

          I WANT TO THAK YOU FOR PREPARING SUCH A GOOD PROGRAMME FOR US AND ESPECIALLY FOR TAKING US TO THE RIVER TRIP TO GREENWICH. I WOULD LIKE TO KNOW IF THERE IS ANY CHANCE OF CHANGING THE PROGRAMME BECAUSE WE HAVE FOUND A VERY INTERESTING ACTIVITY TO DO ON TUESDAY 14 MARCH. IT CONSISTS ON VISITING THE LONDON FASHION AND LEISURE SHOW IN THE CENTRAL EXHIBITION HALL. I THINK IT'S A GREAT OPPORTUNITY TO MAKE GREATER USE OF OUR KNOWLEDGE OF  ENGLISH LANGUAGE. ON THE OTHER HAND, WE COULD LEARN THE DIFFERENT WAYS TO GET TO THE CENTRAL EXHIBITION HALL.

          I SUGGEST THAT WE SHOULD GO TO THE NATIONAL ART GALLERY IN THE MORNING AND INSTEAD OF HAVING THE AFTERNOON FREE WE COULD GO TO THE FASHION AND LEISURE SHOW.

          I WILL BE WRITING ANXIOUSLY FOR YOUR RESPONSE.

          YOURS FAITHFULLY...
  Correcte

In [17]:
def create_spacy_docs(answer_data: dict) -> tuple[Doc, Doc, dict]:
    """
    Create SpaCy Doc objects for original and corrected texts.

    Returns:
        tuple: (original_doc, corrected_doc, diagnostics)
    """
    diagnostics = {"failed_spans": [], "total_errors": len(answer_data["errors"])}

    # Count error types from the annotations
    error_type_counts = defaultdict(int)
    for error in answer_data["errors"]:
        error_type_counts[error["type"]] += 1

    # Create original doc with errors
    original_doc = nlp(answer_data["original_text"])

    # Add metadata
    original_doc.user_data["doc_id"] = answer_data["doc_id"]
    original_doc.user_data["l1"] = answer_data["l1"]
    original_doc.user_data["age"] = answer_data["age"]
    original_doc.user_data["overall_score"] = answer_data["overall_score"]
    original_doc.user_data["question_number"] = answer_data["question_number"]
    original_doc.user_data["exam_score"] = answer_data["exam_score"]
    original_doc.user_data["error_counts"] = dict(
        error_type_counts
    )  # Convert to regular dict
    original_doc.user_data["total_errors"] = sum(error_type_counts.values())

    # Add error spans
    error_spans = []
    for error in answer_data["errors"]:
        # Try different alignment modes
        span = original_doc.char_span(
            error["start"], error["end"], label=error["type"], alignment_mode="expand"
        )

        # If expand doesn't work, try contract
        if span is None:
            span = original_doc.char_span(
                error["start"],
                error["end"],
                label=error["type"],
                alignment_mode="contract",
            )

        if span is not None:
            span._.correction = error["correct"]
            span._.original = error["incorrect"]
            error_spans.append(span)
        else:
            # Log failed span for diagnostics
            text_snippet = answer_data["original_text"][
                max(0, error["start"] - 10) : min(
                    len(answer_data["original_text"]), error["end"] + 10
                )
            ]
            diagnostics["failed_spans"].append(
                {
                    "doc_id": answer_data["doc_id"],
                    "start": error["start"],
                    "end": error["end"],
                    "type": error["type"],
                    "text": error["incorrect"],
                    "context": text_snippet,
                }
            )

    original_doc.spans["errors"] = error_spans
    diagnostics["successful_spans"] = len(error_spans)

    # Create corrected doc (simpler, no error spans)
    corrected_doc = nlp(answer_data["corrected_text"])

    # Add same metadata to corrected doc (including error counts)
    corrected_doc.user_data["doc_id"] = answer_data["doc_id"]
    corrected_doc.user_data["l1"] = answer_data["l1"]
    corrected_doc.user_data["age"] = answer_data["age"]
    corrected_doc.user_data["overall_score"] = answer_data["overall_score"]
    corrected_doc.user_data["question_number"] = answer_data["question_number"]
    corrected_doc.user_data["exam_score"] = answer_data["exam_score"]
    corrected_doc.user_data["error_counts"] = dict(error_type_counts)  # Same counts
    corrected_doc.user_data["total_errors"] = sum(error_type_counts.values())

    # Remove the transformer embeddings to prevent saving them when store_user_data=True
    original_doc._.trf_data = None
    corrected_doc._.trf_data = None

    return original_doc, corrected_doc, diagnostics

In [18]:
def process_all_files(xml_files: list[Path]) -> tuple[DocBin, DocBin, dict]:
    """
    Process all XML files and create DocBins for original and corrected texts.

    Returns:
        tuple: (original_docbin, corrected_docbin, stats)
    """
    original_docbin = DocBin(store_user_data=True)
    corrected_docbin = DocBin(store_user_data=True)

    stats = {
        "total_answers": 0,
        "total_errors": 0,
        "successful_spans": 0,
        "failed_spans": 0,
        "failed_span_details": [],
    }

    for xml_file in tqdm(xml_files, desc="Processing XML files"):
        try:
            answers = parse_fce_xml(xml_file)

            for answer in answers:
                orig_doc, corr_doc, diag = create_spacy_docs(answer)

                original_docbin.add(orig_doc)
                corrected_docbin.add(corr_doc)

                stats["total_answers"] += 1
                stats["total_errors"] += diag["total_errors"]
                stats["successful_spans"] += diag["successful_spans"]
                stats["failed_spans"] += len(diag["failed_spans"])
                stats["failed_span_details"].extend(diag["failed_spans"])

        except Exception as e:
            print(f"Error processing {xml_file}: {e}")
            continue

    print(f"\n=== Processing Statistics ===")
    print(f"Total answers: {stats['total_answers']}")
    print(f"Total errors annotated: {stats['total_errors']}")
    print(f"Successful spans: {stats['successful_spans']}")
    print(
        f"Failed spans: {stats['failed_spans']} ({stats['failed_spans']/stats['total_errors']*100:.2f}%)"
    )
    print(f"\nOriginal DocBin: {len(original_docbin)} docs")
    print(f"Corrected DocBin: {len(corrected_docbin)} docs")

    return original_docbin, corrected_docbin, stats

In [19]:
original_docbin, corrected_docbin, diagnostic = process_all_files(xml_files)

Processing XML files:   0%|          | 0/1249 [00:00<?, ?it/s]

Error processing ../data/fce-released-dataset/dataset/0102_2000_12/doc599.xml: No valid answers found in ../data/fce-released-dataset/dataset/0102_2000_12/doc599.xml

=== Processing Statistics ===
Total answers: 2482
Total errors annotated: 53332
Successful spans: 52752
Failed spans: 580 (1.09%)

Original DocBin: 2482 docs
Corrected DocBin: 2482 docs


In [21]:
# Create output directory
output_dir = Path("../data/clc-fce/")
output_dir.mkdir(parents=True, exist_ok=True)

# Save DocBins
original_path = output_dir / "original.docbin"
corrected_path = output_dir / "corrected.docbin"

original_docbin.to_disk(original_path)
corrected_docbin.to_disk(corrected_path)

print(f"Saved original docs to: {original_path}")
print(f"Saved corrected docs to: {corrected_path}")

Saved original docs to: ../data/clc-fce/original.docbin
Saved corrected docs to: ../data/clc-fce/corrected.docbin


In [23]:
# Load and verify
original_loaded = DocBin(store_user_data=True).from_disk(original_path)
corrected_loaded = DocBin(store_user_data=True).from_disk(corrected_path)

# Test: load first doc from each
orig_docs = list(original_loaded.get_docs(nlp.vocab))
corr_docs = list(corrected_loaded.get_docs(nlp.vocab))

print(f"Loaded {len(orig_docs)} original docs")
print(f"Loaded {len(corr_docs)} corrected docs\n")

# Inspect first doc
first_orig = orig_docs[0]
print("First original doc:")
print(f"  Text preview: {first_orig.text[:100]}...")
print(f"  Metadata: {first_orig.user_data}")
print(f"  Total errors: {first_orig.user_data['total_errors']}")
print(f"  Error type counts: {first_orig.user_data['error_counts']}")
print(f"  Number of error spans: {len(first_orig.spans['errors'])}")

if len(first_orig.spans["errors"]) > 0:
    first_error = first_orig.spans["errors"][0]
    print(f"\n  First error span:")
    print(f"    Type: {first_error.label_}")
    print(f"    Original: '{first_error._.original}'")
    print(f"    Correction: '{first_error._.correction}'")
    print(f"    Span text: '{first_error.text}'")

print(f"\nFirst corrected doc:")
print(f"  Text preview: {corr_docs[0].text[:100]}...")
print(f"  Total errors (from original): {corr_docs[0].user_data['total_errors']}")
print(f"  Error type counts: {corr_docs[0].user_data['error_counts']}")

# Show aggregate statistics across all docs
print("\n=== Aggregate Error Type Statistics ===")
all_error_counts = defaultdict(int)
for doc in orig_docs:
    for error_type, count in doc.user_data["error_counts"].items():
        all_error_counts[error_type] += count

print(f"Total unique error types: {len(all_error_counts)}")
print("\nMost common error types:")
for error_type, count in sorted(
    all_error_counts.items(), key=lambda x: x[1], reverse=True
)[:10]:
    print(f"  {error_type}: {count}")

Loaded 2482 original docs
Loaded 2482 corrected docs

First original doc:
  Text preview: DECEMBER 12TH

          PRINCIPAL MR. ROBERTSON

          DEAR SIR,

          I WANT TO THAK YOU ...
  Metadata: {('._.', 'trf_data', None, None): None, 'doc_id': 'TR3*0100*2000*02', 'l1': 'Catalan', 'age': '16-20', 'overall_score': '28.00', 'question_number': '1', 'exam_score': '2.3', 'error_counts': {'S': 1, 'RT': 3, 'RV': 2, 'MD': 1, 'ID': 1}, 'total_errors': 8, ('._.', 'correction', 100, 113): 'THANK', ('._.', 'original', 100, 113): 'THAK', ('._.', 'correction', 179, 184): 'ON', ('._.', 'original', 179, 184): 'TO', ('._.', 'correction', 375, 378): 'IN', ('._.', 'original', 375, 378): 'ON', ('._.', 'correction', 366, 378): 'INVOLVES', ('._.', 'original', 366, 378): 'CONSISTS ON', ('._.', 'correction', 418, 425): 'AT', ('._.', 'original', 418, 425): 'IN', ('._.', 'correction', 525, 533): 'THE', ('._.', 'original', 525, 533): '', ('._.', 'correction', 542, 562): 'ALSO', ('._.', 'original', 542