
# **Final Testing Version**
This script evaluates the quality of text anonymization using a span-based comparison between the original, ground truth, and anonymized (PIIRanha) versions of a text.

**It performs the following steps:**

1. **Tokenization:** Splits the text into tokens (words, numbers, punctuation).
2. **Label Alignment:** Aligns ground truth and anonymized labels to the original text.
3. **Span Extraction:** Extracts labeled spans (e.g., names, dates) from both the ground truth and the anonymized versions.
4. **Span Matching:** Checks which ground truth spans were correctly anonymized.
5. **Metric Calculation:** Computes precision, recall, and F1 score for each label type.
6. **Evaluation Across Multiple Examples:** Evaluates each set of files and computes aggregate metrics across all of them.



**The output includes:**
1. Detailed span-level matching results

2. Metrics per label type

3. Overall anonymization coverage

4. Summary of results across all examples
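
As a minimal sketch of the first steps (tokenization, label parsing, alignment), the snippet below runs them on a made-up sentence. The two regular expressions are the ones used by `PIIRanhaSpanEvaluator`; the helper names `tokenize`, `split_labeled`, and `align`, and the sample texts, are hypothetical and only serve as illustration. Like the evaluator, it assumes each label placeholder stands for exactly one original token.

```python
import re

TOKEN_RE = re.compile(r'\w+|[^\w\s]')   # words/numbers, or one punctuation mark
LABEL_RE = re.compile(r'\[([^\]]+)\]')  # placeholders such as [GIVENNAME]

def tokenize(text):
    """Split text into word, number, and punctuation tokens."""
    return TOKEN_RE.findall(text)

def split_labeled(labeled_text):
    """Return plain tokens and ('LABEL', name) tuples in reading order."""
    items, pos = [], 0
    for m in LABEL_RE.finditer(labeled_text):
        items.extend(tokenize(labeled_text[pos:m.start()]))
        items.append(('LABEL', m.group(1)))
        pos = m.end()
    items.extend(tokenize(labeled_text[pos:]))
    return items

def align(original_tokens, labeled_items):
    """Pair each label with one original token (one-token-per-label assumption)."""
    aligned, i, j = [], 0, 0
    while i < len(original_tokens) and j < len(labeled_items):
        item = labeled_items[j]
        if isinstance(item, tuple):
            # A label consumes exactly one original token
            aligned.append((i, original_tokens[i], item[1]))
            i += 1
            j += 1
        elif original_tokens[i].lower() == item.lower():
            # Plain tokens agree, advance both sequences
            i += 1
            j += 1
        else:
            # Mismatch: skip the original token to resynchronize
            i += 1
    return aligned

# Hypothetical sample texts (not taken from the evaluation data)
original = "Dear Anna Meyer, your account 12345 is active."
labeled = "Dear [GIVENNAME] [SURNAME], your account [ACCOUNTNUM] is active."

original_tokens = tokenize(original)
labeled_items = split_labeled(labeled)
aligned = align(original_tokens, labeled_items)
print(aligned)
```

Each tuple in `aligned` holds the original token index, the token text, and the label assigned to it, which is the information the span extraction step works from.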

# Global Setup

**Import**

In [2]:
import re
import pandas as pd
from collections import defaultdict, Counter
import numpy as np
from difflib import SequenceMatcher

**SpanEvaluator**

In [3]:
class PIIRanhaSpanEvaluator:
    """
    Precise evaluator for PIIRanha: checks which text spans were actually anonymized.
    """

    def __init__(self):
        self.label_types = [
            'GIVENNAME', 'SURNAME', 'ACCOUNTNUM', 'Year',
            'DATE', 'DAY', 'MONTH'
        ]

    def tokenize_text(self, text):
        """
        Tokenize text into words, numbers, and punctuation marks.
        """
        # Regex: runs of word characters (words, numbers) or single punctuation marks
        tokens = re.findall(r'\w+|[^\w\s]', text)
        return tokens

    def align_texts_with_labels(self, original_text, labeled_text):
        """
        Build an alignment between the original text and the labeled text.
        Returns a list of dicts holding the original token, its index, and its label info.
        """
        original_tokens = self.tokenize_text(original_text)

        # Special tokenization for the labeled text
        labeled_tokens = []
        current_pos = 0

        # Find all label placeholders in the text
        label_pattern = r'\[([^\]]+)\]'

        for match in re.finditer(label_pattern, labeled_text):
            # Text before the label
            before_label = labeled_text[current_pos:match.start()]
            if before_label.strip():
                labeled_tokens.extend(self.tokenize_text(before_label))

            # The label itself
            labeled_tokens.append({
                'type': 'LABEL',
                'label': match.group(1),
                'full_match': match.group(0)
            })

            current_pos = match.end()

        # Remaining text after the last label
        remaining_text = labeled_text[current_pos:]
        if remaining_text.strip():
            labeled_tokens.extend(self.tokenize_text(remaining_text))

        # Align original tokens with labeled tokens
        alignment = self._align_token_sequences(original_tokens, labeled_tokens)

        return alignment

    def _align_token_sequences(self, original_tokens, labeled_tokens):
        """
        Align original tokens with labeled tokens.
        """
        alignment = []
        orig_idx = 0
        label_idx = 0

        while orig_idx < len(original_tokens) and label_idx < len(labeled_tokens):
            labeled_token = labeled_tokens[label_idx]

            if isinstance(labeled_token, dict) and labeled_token['type'] == 'LABEL':
                # Assumption: each label placeholder stands for exactly one
                # original token, so the current label consumes one token.
                alignment.append({
                    'original_token': original_tokens[orig_idx],
                    'original_index': orig_idx,
                    'label_type': labeled_token['label'],
                    'is_labeled': True
                })
                orig_idx += 1
                label_idx += 1

            else:
                # Regular token: should match the original token
                if self._tokens_match(original_tokens[orig_idx], labeled_token):
                    alignment.append({
                        'original_token': original_tokens[orig_idx],
                        'original_index': orig_idx,
                        'label_type': None,
                        'is_labeled': False
                    })
                    orig_idx += 1
                    label_idx += 1
                else:
                    # Tokens do not match: skip the original token to resynchronize
                    orig_idx += 1

        return alignment

    def _tokens_match(self, token1, token2):
        """
        Check whether two tokens match (case-insensitive).
        """
        if isinstance(token1, dict) or isinstance(token2, dict):
            return False
        return str(token1).lower().strip() == str(token2).lower().strip()

    def extract_label_spans(self, alignment):
        """
        Extract labeled spans from an alignment.
        """
        spans = []
        current_span = None

        for item in alignment:
            if item['is_labeled']:
                if current_span is None or current_span['label_type'] != item['label_type']:
                    # A new span starts
                    if current_span is not None:
                        spans.append(current_span)

                    current_span = {
                        'label_type': item['label_type'],
                        'original_tokens': [item['original_token']],
                        'original_indices': [item['original_index']],
                        'original_text': item['original_token']
                    }
                else:
                    # Continue the current span
                    current_span['original_tokens'].append(item['original_token'])
                    current_span['original_indices'].append(item['original_index'])
                    current_span['original_text'] += ' ' + item['original_token']
            else:
                # Unlabeled token: the current span ends
                if current_span is not None:
                    spans.append(current_span)
                    current_span = None

        # Append the last span, if any
        if current_span is not None:
            spans.append(current_span)

        return spans

    def evaluate_span_coverage(self, original_text, piiranha_text, ground_truth_text):
        """
        Main evaluation function: checks span-based coverage.
        """
        print("🔍 Starting span-based evaluation...")

        # 1. Build alignments
        print("📝 Building alignments...")
        ground_truth_alignment = self.align_texts_with_labels(original_text, ground_truth_text)
        piiranha_alignment = self.align_texts_with_labels(original_text, piiranha_text)

        # 2. Extract label spans
        print("🎯 Extracting label spans...")
        ground_truth_spans = self.extract_label_spans(ground_truth_alignment)
        piiranha_spans = self.extract_label_spans(piiranha_alignment)

        print(f"📊 Ground Truth Spans: {len(ground_truth_spans)}")
        print(f"📊 PIIRanha Spans: {len(piiranha_spans)}")

        # 3. Compare spans
        print("⚖️ Comparing spans...")
        results = self._compare_spans(ground_truth_spans, piiranha_spans)

        # 4. Debug information
        self._print_debug_info(ground_truth_spans, piiranha_spans, results)

        return results

    def _compare_spans(self, ground_truth_spans, piiranha_spans):
        """
        Compare ground truth spans with PIIRanha spans.
        """
        # Index PIIRanha spans by token index
        piiranha_index = {}
        for span in piiranha_spans:
            for idx in span['original_indices']:
                piiranha_index[idx] = span

        # Evaluate each ground truth span
        span_results = []
        label_metrics = defaultdict(lambda: {'tp': 0, 'fp': 0, 'fn': 0})

        for gt_span in ground_truth_spans:
            # Check whether PIIRanha covered this span
            covered_indices = []
            piiranha_labels = set()

            for idx in gt_span['original_indices']:
                if idx in piiranha_index:
                    covered_indices.append(idx)
                    piiranha_labels.add(piiranha_index[idx]['label_type'])

            # Compute the coverage ratio
            coverage_ratio = len(covered_indices) / len(gt_span['original_indices'])

            # Count the span as covered when at least 50% of its tokens overlap
            is_exact_match = coverage_ratio >= 0.5
            label_match = gt_span['label_type'] in piiranha_labels

            span_result = {
                'ground_truth_span': gt_span,
                'coverage_ratio': coverage_ratio,
                'is_covered': is_exact_match,
                'label_match': label_match,
                'piiranha_labels': list(piiranha_labels),
                'covered_indices': covered_indices
            }

            span_results.append(span_result)

            # Update metrics
            if is_exact_match and label_match:
                label_metrics[gt_span['label_type']]['tp'] += 1
            else:
                label_metrics[gt_span['label_type']]['fn'] += 1

        # Find false positives (PIIRanha spans that are not in the ground truth)
        gt_indices = set()
        for span in ground_truth_spans:
            gt_indices.update(span['original_indices'])

        for span in piiranha_spans:
            span_indices = set(span['original_indices'])
            if not span_indices.intersection(gt_indices):
                # This PIIRanha span has no overlap with the ground truth
                label_metrics[span['label_type']]['fp'] += 1

        # Compute final metrics
        final_metrics = {}
        for label_type, counts in label_metrics.items():
            tp, fp, fn = counts['tp'], counts['fp'], counts['fn']

            precision = tp / (tp + fp) if (tp + fp) > 0 else 0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0
            f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

            final_metrics[label_type] = {
                'precision': precision,
                'recall': recall,
                'f1_score': f1,
                'true_positives': tp,
                'false_positives': fp,
                'false_negatives': fn
            }

        # Overall coverage
        total_gt_spans = len(ground_truth_spans)
        covered_spans = sum(1 for r in span_results if r['is_covered'] and r['label_match'])
        overall_coverage = covered_spans / total_gt_spans if total_gt_spans > 0 else 0

        return {
            'span_results': span_results,
            'label_metrics': final_metrics,
            'overall_coverage': overall_coverage,
            'total_ground_truth_spans': total_gt_spans,
            'total_covered_spans': covered_spans
        }

    def _print_debug_info(self, ground_truth_spans, piiranha_spans, results):
        """
        Print detailed debug information.
        """
        print("\n" + "="*80)
        print("🔍 DETAILED SPAN ANALYSIS")
        print("="*80)

        print(f"\n📋 GROUND TRUTH SPANS ({len(ground_truth_spans)}):")
        for i, span in enumerate(ground_truth_spans, 1):
            print(f"  {i}. [{span['label_type']}] → '{span['original_text']}' (Indices: {span['original_indices']})")

        print(f"\n🤖 PIIRANHA SPANS ({len(piiranha_spans)}):")
        for i, span in enumerate(piiranha_spans, 1):
            print(f"  {i}. [{span['label_type']}] → '{span['original_text']}' (Indices: {span['original_indices']})")

        print(f"\n✅ SPAN MATCHING RESULTS:")
        for i, result in enumerate(results['span_results'], 1):
            gt_span = result['ground_truth_span']
            status = "✅ DETECTED" if result['is_covered'] and result['label_match'] else "❌ MISSED"
            print(f"  {i}. [{gt_span['label_type']}] '{gt_span['original_text']}' → {status}")
            print(f"     Coverage: {result['coverage_ratio']:.1%}, PIIRanha Labels: {result['piiranha_labels']}")

    def print_results(self, results):
        """
        Print the final results in formatted form.
        """
        print("\n" + "="*80)
        print("📊 PIIRANHA SPAN-BASED EVALUATION RESULTS")
        print("="*80)

        print(f"\n🎯 OVERALL COVERAGE: {results['overall_coverage']:.1%}")

        # Only shown when the keys exist (single-example results)
        if 'total_covered_spans' in results and 'total_ground_truth_spans' in results:
            print(f"   Detected spans: {results['total_covered_spans']}/{results['total_ground_truth_spans']}")

        print(f"\n📈 METRICS PER LABEL TYPE:")
        print("-" * 80)

        df_data = []
        for label_type, metrics in results['label_metrics'].items():
            df_data.append({
                'Label Type': label_type,
                'Precision': f"{metrics['precision']:.3f}",
                'Recall': f"{metrics['recall']:.3f}",
                'F1-Score': f"{metrics['f1_score']:.3f}",
                'TP': metrics['true_positives'],
                'FP': metrics['false_positives'],
                'FN': metrics['false_negatives']
            })

        if df_data:
            df = pd.DataFrame(df_data)
            print(df.to_string(index=False))
        else:
            print("No metrics available.")

    def evaluate_multiple_examples(self, examples):
        """
        Evaluate multiple examples.
        """
        all_results = []
        aggregated_metrics = defaultdict(lambda: {'tp': 0, 'fp': 0, 'fn': 0})

        for i, example in enumerate(examples):
            print(f"\n{'='*60}")
            print(f"📝 EVALUATING EXAMPLE {i+1}/{len(examples)}")
            print('='*60)

            result = self.evaluate_span_coverage(
                example['original'],
                example['piiranha'],
                example['ground_truth']
            )

            all_results.append(result)
            self.print_results(result)

            # Accumulate counts per label type
            for label_type, metrics in result['label_metrics'].items():
                aggregated_metrics[label_type]['tp'] += metrics['true_positives']
                aggregated_metrics[label_type]['fp'] += metrics['false_positives']
                aggregated_metrics[label_type]['fn'] += metrics['false_negatives']

        # Compute aggregated metrics
        final_aggregated = {}
        for label_type, counts in aggregated_metrics.items():
            tp, fp, fn = counts['tp'], counts['fp'], counts['fn']

            precision = tp / (tp + fp) if (tp + fp) > 0 else 0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0
            f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

            final_aggregated[label_type] = {
                'precision': precision,
                'recall': recall,
                'f1_score': f1,
                'true_positives': tp,
                'false_positives': fp,
                'false_negatives': fn
            }

        # Average coverage
        avg_coverage = np.mean([r['overall_coverage'] for r in all_results])

        # Final output
        print(f"\n{'='*80}")
        print("🏆 AGGREGATED RESULTS ACROSS ALL EXAMPLES")
        print('='*80)
        print(f"\n🎯 AVERAGE COVERAGE: {avg_coverage:.1%}")

        aggregated_results = {
            'label_metrics': final_aggregated,
            'overall_coverage': avg_coverage
        }

        self.print_results(aggregated_results)

        return {
            'individual_results': all_results,
            'aggregated_results': aggregated_results
        }
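
The matching rule and the metric aggregation used by the class can be condensed into a few lines. The sketch below is not part of the evaluator: the helper names `is_match` and `prf` and all counts are hypothetical. It applies the same rule as `_compare_spans` (at least 50% of a ground truth span's token indices covered, plus a matching label) and sums TP/FP/FN across examples before computing the metrics, as `evaluate_multiple_examples` does.

```python
def is_match(gt_indices, gt_label, predicted, threshold=0.5):
    """Check one ground truth span; `predicted` maps token index -> label."""
    covered = [i for i in gt_indices if i in predicted]
    coverage = len(covered) / len(gt_indices)
    labels = {predicted[i] for i in covered}
    return coverage >= threshold and gt_label in labels

def prf(tp, fp, fn):
    """Precision, recall, and F1, returning 0 where a denominator is 0."""
    precision = tp / (tp + fp) if tp + fp else 0
    recall = tp / (tp + fn) if tp + fn else 0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0
    return precision, recall, f1

# Hypothetical prediction: token 10 labeled GIVENNAME, token 11 labeled SURNAME
predicted = {10: 'GIVENNAME', 11: 'SURNAME'}
hit = is_match([10], 'GIVENNAME', predicted)    # fully covered, same label
miss = is_match([19], 'Year', predicted)        # not covered at all
wrong = is_match([11], 'GIVENNAME', predicted)  # covered, but different label

# Hypothetical per-example counts for one label type, summed before scoring
per_example = [{'tp': 2, 'fp': 0, 'fn': 0}, {'tp': 1, 'fp': 1, 'fn': 2}]
totals = {k: sum(c[k] for c in per_example) for k in ('tp', 'fp', 'fn')}
precision, recall, f1 = prf(**totals)
print(hit, miss, wrong, totals, precision, recall, round(f1, 3))
```

Summing the counts first gives micro-averaged metrics per label type, whereas the overall coverage is averaged per example with `np.mean`.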

**Old Import & Upload Function** - do not use it

In [10]:
'''
from google.colab import files  # for Colab
uploaded = files.upload()

def load_multiple_examples(count=10):
    examples = []
    for i in range(1, count + 1):
        with open(f"original_{i}.txt", encoding='utf-8') as f1, \
             open(f"groundtruth_{i}.txt", encoding='utf-8') as f2, \
             open(f"piiranha_{i}.txt", encoding='utf-8') as f3:

            examples.append({
                'original': f1.read(),
                'ground_truth': f2.read(),
                'piiranha': f3.read()
            })
    return examples
'''

Saving original_1.txt to original_1 (2).txt
Saving groundtruth_1.txt to groundtruth_1 (2).txt
Saving piiranha_1.txt to piiranha_1 (2).txt


**New Upload Function**

In [13]:
from google.colab import files
import re
from collections import defaultdict

# Upload the original files
print("📂 Please upload the ORIGINAL files (e.g. original_1.txt)")
uploaded_original = files.upload()

# Upload the ground truth files
print("📂 Please upload the GROUNDTRUTH files (e.g. groundtruth_1.txt)")
uploaded_groundtruth = files.upload()

# Upload the PIIRanha outputs
print("📂 Please upload the PIIRANHA files (e.g. piiranha_1.txt)")
uploaded_piiranha = files.upload()

def build_examples_from_uploads(uploaded_original, uploaded_groundtruth, uploaded_piiranha):
    examples = []

    for orig_filename, orig_content in uploaded_original.items():
        # Extract the numeric ID, e.g. "original_1.txt" → "1"
        match = re.search(r'original_(\d+)\.txt', orig_filename)
        if not match:
            continue
        file_id = match.group(1)

        ground_filename = f"groundtruth_{file_id}.txt"
        piiranha_filename = f"piiranha_{file_id}.txt"

        # The files must be present in all three sets
        if ground_filename not in uploaded_groundtruth or piiranha_filename not in uploaded_piiranha:
            print(f"⚠️ File missing for ID {file_id}")
            continue

        examples.append({
            'original': orig_content.decode('utf-8'),
            'ground_truth': uploaded_groundtruth[ground_filename].decode('utf-8'),
            'piiranha': uploaded_piiranha[piiranha_filename].decode('utf-8')
        })

    print(f"✅ {len(examples)} examples loaded successfully.")
    return examples


📂 Please upload the ORIGINAL files (e.g. original_1.txt)
Saving original_1.txt to original_1.txt
📂 Please upload the GROUNDTRUTH files (e.g. groundtruth_1.txt)
Saving groundtruth_1.txt to groundtruth_1.txt
📂 Please upload the PIIRANHA files (e.g. piiranha_1.txt)
Saving piiranha_1.txt to piiranha_1.txt
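
Outside Colab, the same ID-based pairing can be tried with plain dictionaries. The sketch below is a stand-in under stated assumptions: `pair_by_id` is a hypothetical helper, and the byte strings replace real uploads. It mirrors the `original_<id>.txt` / `groundtruth_<id>.txt` / `piiranha_<id>.txt` naming that `build_examples_from_uploads` expects.

```python
import re

def pair_by_id(originals, groundtruths, piiranhas):
    """Group three {filename: bytes} dicts into examples by numeric file ID."""
    examples, missing = [], []
    for name, content in originals.items():
        m = re.fullmatch(r'original_(\d+)\.txt', name)
        if not m:
            continue  # ignore files that do not follow the naming scheme
        file_id = m.group(1)
        gt_name = f"groundtruth_{file_id}.txt"
        pr_name = f"piiranha_{file_id}.txt"
        if gt_name not in groundtruths or pr_name not in piiranhas:
            missing.append(file_id)  # remember IDs with an incomplete file set
            continue
        examples.append({
            'original': content.decode('utf-8'),
            'ground_truth': groundtruths[gt_name].decode('utf-8'),
            'piiranha': piiranhas[pr_name].decode('utf-8'),
        })
    return examples, missing

# Hypothetical in-memory "uploads"
originals = {'original_1.txt': b'Hello Anna', 'original_2.txt': b'Hello Ben'}
groundtruths = {'groundtruth_1.txt': b'Hello [GIVENNAME]'}
piiranhas = {'piiranha_1.txt': b'Hello [GIVENNAME]'}

examples, missing = pair_by_id(originals, groundtruths, piiranhas)
print(len(examples), missing)
```

The sketch uses `re.fullmatch` rather than `re.search`, which is a slightly stricter choice: a name such as `original_1.txt.bak` would be skipped instead of matched.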


**Main Function**

In [15]:
def main():
    print("🚀 STARTING SPAN-BASED PIIRANHA EVALUATION")

    evaluator = PIIRanhaSpanEvaluator()

    # Build the examples directly from the uploads
    examples = build_examples_from_uploads(uploaded_original, uploaded_groundtruth, uploaded_piiranha)

    multiple_results = evaluator.evaluate_multiple_examples(examples)

# Then simply run:
main()


🚀 STARTING SPAN-BASED PIIRANHA EVALUATION
✅ 1 examples loaded successfully.

📝 EVALUATING EXAMPLE 1/1
🔍 Starting span-based evaluation...
📝 Building alignments...
🎯 Extracting label spans...
📊 Ground Truth Spans: 11
📊 PIIRanha Spans: 5
⚖️ Comparing spans...

🔍 DETAILED SPAN ANALYSIS

📋 GROUND TRUTH SPANS (11):
  1. [GIVENNAME] → 'Catharina' (Indices: [10])
  2. [SURNAME] → 'Thies' (Indices: [11])
  3. [ACCOUNTNUM] → '402157398' (Indices: [14])
  4. [Year] → '2022' (Indices: [19])
  5. [Year] → '2023' (Indices: [21])
  6. [DATE] → '15' (Indices: [29])
  7. [DAY] → '16' (Indices: [44])
  8. [MONTH] → '05' (Indices: [46])
  9. [DATE] → '31' (Indices: [49])
  10. [GIVENNAME] → 'Catharina' (Indices: [79])
  11. [SURNAME] → 'Thies' (Indices: [80])

🤖 PIIRANHA SPANS (5):
  1. [GIVENNAME] → 'Catharina' (Indices: [10])
  2. [SURNAME] → 'Thies' (Indices: [11])
  3. [ACCOUNTNUM] → '402157398' (Indices: [14])
  4. [GIVENNAME] 

**Old Main Function - do not use it**

In [16]:
'''
# Main function for execution
def main():
    print("🚀 STARTING SPAN-BASED PIIRANHA EVALUATION")

    # Initialize the evaluator
    evaluator = PIIRanhaSpanEvaluator()

    # 📂 Automatically load several example files from the "data/" folder
    examples = load_multiple_examples(count=2)

    # ✅ Evaluate all examples
    multiple_results = evaluator.evaluate_multiple_examples(examples)


if __name__ == "__main__":
    main()
'''
