In [1]:
import re
from typing import List, Optional

import spacy_transformers
from faker import Faker
from presidio_analyzer import (
    AnalyzerEngine,
    EntityRecognizer,
    RecognizerResult,
    PatternRecognizer,
    Pattern,
)
from presidio_analyzer.nlp_engine import (
    NlpEngine,
    SpacyNlpEngine,
    TransformersNlpEngine,
    NerModelConfiguration,
)
from presidio_analyzer.recognizer_registry import RecognizerRegistry
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from presidio_analyzer.context_aware_enhancers import LemmaContextAwareEnhancer

# --- 1. Enhanced Custom Korean PII Recognizers ---

# Regex patterns for Korean phone numbers and bank account numbers.
# Separators are optional (space or hyphen for phones, hyphen for accounts).
#
# The edges are guarded with explicit ASCII look-arounds instead of `\b`.
# In Python `str` patterns Hangul syllables are word characters, so `\b` does
# not match between a digit and a directly attached particle
# (e.g. "010-9876-5432로", "110-234-567890입니다") and the number would be missed.
# The look-arounds still reject matches glued to ASCII letters, digits or "_".
KOREAN_PHONE_NUMBER_REGEX = (
    r"(?<![A-Za-z_\d])(?:010|02|0[3-9][0-9])[ -]?(\d{3,4})[ -]?(\d{4})(?![A-Za-z_\d])"
)
KOREAN_BANK_ACCOUNT_REGEX = (
    r"(?<![A-Za-z_\d])(?:\d{2,6}[-]?\d{2,6}[-]?\d{3,6}|\d{10,14})(?![A-Za-z_\d])"
)

# Context words improve detection accuracy by boosting the score of a match
# when these words are found near the potential PII.
CONTEXT_WORDS = {
    "주민등록번호",
    "주민번호",
    "주민증",
    "RRN",
    "연락처",
    "휴대폰",
    "전화번호",
    "핸드폰",
    "phone",
    "mobile",
    "계좌번호",
    "계좌",
    "은행",
    "bank",
    "account",
}

In [2]:
from typing import Callable, Iterable, Iterator, List, Literal, Optional
import warnings

from thinc.api import get_torch_default_device
from spacy.language import Language
from spacy.pipeline import Pipe
from spacy.tokens import Doc, Span, SpanGroup
from spacy import util

from transformers import pipeline


@Language.factory(
    "hf_token_pipe",
    assigns=[],
    default_config={
        "model": "",
        "revision": "main",
        "stride": 16,
        "aggregation_strategy": "average",
        "annotate": "ents",
        "annotate_spans_key": None,
        "alignment_mode": "strict",
        "scorer": None,
        "kwargs": {},
    },
    default_score_weights={},
)
def make_hf_token_pipe(
    nlp: Language,
    name: str,
    model: str,
    revision: str,
    # note that the tokenizer stride is the size of the overlap, not the size of
    # the stride
    stride: Optional[int],
    # this is intentionally omitting "none" from the aggregation strategies
    aggregation_strategy: Literal["simple", "first", "average", "max"],
    annotate: Literal["ents", "pos", "spans", "tag"],
    annotate_spans_key: Optional[str],
    alignment_mode: Literal["strict", "contract", "expand"],
    scorer: Optional[Callable],
    kwargs: dict,
):
    """spaCy factory for the ``hf_token_pipe`` component.

    Builds a Hugging Face ``token-classification`` pipeline for ``model`` and
    wraps it in an :class:`HFTokenPipe`.

    Args:
        nlp: The spaCy pipeline the component is added to (not used here).
        name: Component name, forwarded to ``HFTokenPipe``.
        model: Hugging Face model identifier; must not be the empty string.
        revision: Model revision forwarded to ``pipeline``.
        stride: Tokenizer overlap size forwarded to ``pipeline``.
        aggregation_strategy: Token aggregation strategy forwarded to ``pipeline``.
        annotate: Which annotation layer ``HFTokenPipe`` writes to.
        annotate_spans_key: ``doc.spans`` key, required when ``annotate == "spans"``.
        alignment_mode: Alignment mode ``HFTokenPipe`` passes to ``Doc.char_span``.
        scorer: Optional scorer stored on the component.
        kwargs: Extra keyword arguments forwarded to ``pipeline``.

    Returns:
        The configured ``HFTokenPipe`` instance.

    Raises:
        ValueError: If ``model`` is the empty string.
    """
    # Use the index of thinc's default torch device when there is one;
    # otherwise (no index, or the lookup fails) use -1.
    device = -1
    try:
        default_index = get_torch_default_device().index
    except Exception:
        default_index = None
    if default_index is not None:
        device = default_index

    if model == "":
        raise ValueError(
            "No model provided. Specify the model in your config, e.g.:\n\n"
            'nlp.add_pipe("hf_token_pipe", config={"model": "dslim/bert-base-NER"})'
        )

    token_classifier = pipeline(
        task="token-classification",
        model=model,
        revision=revision,
        aggregation_strategy=aggregation_strategy,
        device=device,
        stride=stride,
        **kwargs,
    )
    component = HFTokenPipe(
        name=name,
        hf_pipeline=token_classifier,
        annotate=annotate,
        annotate_spans_key=annotate_spans_key,
        alignment_mode=alignment_mode,
        scorer=scorer,
    )
    return component


class HFTokenPipe(Pipe):
    """spaCy pipeline component that annotates docs with a Hugging Face
    token-classification pipeline.

    The wrapped pipeline is called on ``doc.text``; each returned annotation
    (a dict with ``start``, ``end``, ``entity_group`` and ``score``) is aligned
    to spaCy tokens with ``Doc.char_span`` and written to the layer selected by
    ``annotate`` (entities, a span group, ``tag_`` or ``pos_``).
    """

    def __init__(
        self,
        name: str,
        hf_pipeline: Callable,
        *,
        annotate: Literal["ents", "pos", "spans", "tag"] = "ents",
        annotate_spans_key: Optional[str] = None,
        alignment_mode: str = "strict",
        scorer: Optional[Callable] = None,
    ):
        """Store the wrapped pipeline and the annotation settings.

        Args:
            name: Component name.
            hf_pipeline: Callable Hugging Face pipeline; called with one text
                or a list of texts.
            annotate: Annotation layer to write: "ents", "pos", "spans" or "tag".
            annotate_spans_key: Key in ``doc.spans``; required when
                ``annotate == "spans"``.
            alignment_mode: Alignment mode passed to ``Doc.char_span``.
            scorer: Optional scorer callable, stored as-is.

        Raises:
            ValueError: If ``annotate == "spans"`` and ``annotate_spans_key``
                is not a string.
        """
        self.name = name
        self.hf_pipeline = hf_pipeline
        self.annotate = annotate
        if self.annotate == "spans" and not isinstance(annotate_spans_key, str):
            raise ValueError(
                "'annotate_spans_key' setting required to set spans annotations for hf_token_pipe"
            )
        # Always define the attribute so it exists for every `annotate` mode.
        self.annotate_spans_key = annotate_spans_key
        self.alignment_mode = alignment_mode
        self.scorer = scorer

    def __call__(self, doc: Doc) -> Doc:
        """Annotate a single doc by running it through :meth:`pipe`."""
        return next(self.pipe([doc]))

    def pipe(self, stream: Iterable[Doc], *, batch_size: int = 128) -> Iterator[Doc]:
        """Annotate a stream of docs in minibatches.

        Args:
            stream: Docs to annotate.
            batch_size: Number of docs sent to the model per minibatch.

        Yields:
            Each input doc, in order, after its annotations have been set.
        """
        for docs in util.minibatch(stream, size=batch_size):
            outputs = self._get_annotations(docs)
            for doc, output in zip(docs, outputs):
                output_spans = SpanGroup(doc, attrs={"scores": []})
                # Character offset up to which the doc is already covered by
                # an accepted span.
                prev_ann_end = 0
                for ann in output:
                    if ann["start"] < prev_ann_end:
                        # Starts inside an already accepted span: skipped
                        # without a warning.
                        continue
                    output_span = doc.char_span(
                        ann["start"],
                        ann["end"],
                        label=ann["entity_group"],
                        alignment_mode=self.alignment_mode,
                    )
                    if output_span is not None and output_span.start_char >= prev_ann_end:
                        output_spans.append(output_span)
                        output_spans.attrs["scores"].append(ann["score"])
                        # With alignment_mode="expand" the aligned span can end
                        # after ann["end"]. Track the larger of the two so a
                        # later annotation that expands into the same token is
                        # rejected here instead of producing overlapping spans,
                        # which `doc.set_ents` does not accept.
                        prev_ann_end = max(ann["end"], output_span.end_char)
                    else:
                        text_excerpt = (
                            doc.text if len(doc.text) < 100 else doc.text[:100] + "..."
                        )
                        warnings.warn(
                            f"Skipping annotation, {ann} is overlapping or can't be aligned for doc '{text_excerpt}'"
                        )
                self._set_annotation_from_spans(doc, output_spans)
                yield doc

    def _get_annotations(self, docs: List[Doc]) -> List[List[dict]]:
        """Run the Hugging Face pipeline on the docs' texts.

        Tries one batched call first when there is more than one doc. If that
        fails, falls back to one call per doc; a doc whose call fails gets an
        empty annotation list and a warning.

        Returns:
            One list of annotation dicts per doc, in the same order as ``docs``.
        """
        with warnings.catch_warnings():
            # the PipelineChunkIterator does not report its length correctly,
            # leading to many spurious warnings from torch
            warnings.filterwarnings(
                "ignore", message="Length of IterableDataset", category=UserWarning
            )
            warnings.filterwarnings(
                "ignore",
                message="You seem to be using the pipelines sequentially on GPU",
                category=UserWarning,
            )
            if len(docs) > 1:
                try:
                    return self.hf_pipeline([doc.text for doc in docs])
                except Exception:
                    # Deliberate best effort: retry the texts one by one below.
                    warnings.warn(
                        "Unable to process texts as batch, backing off to processing texts individually"
                    )
            outputs = []
            for doc in docs:
                try:
                    outputs.append(self.hf_pipeline(doc.text))
                except Exception:
                    text_excerpt = doc.text if len(doc.text) < 100 else doc.text[:100] + "..."
                    warnings.warn(
                        f"Unable to process, skipping annotation for doc '{text_excerpt}'"
                    )
                    outputs.append([])
            return outputs

    def _set_annotation_from_spans(self, doc: Doc, spans: SpanGroup) -> Doc:
        """Write ``spans`` to the annotation layer selected by ``self.annotate``.

        "ents" sets ``doc.ents``, "spans" stores the group under
        ``doc.spans[self.annotate_spans_key]``, and "tag" / "pos" copy each
        span's label onto ``token.tag_`` / ``token.pos_`` of its tokens.
        """
        if self.annotate == "ents":
            doc.set_ents(list(spans))
        elif self.annotate == "spans":
            doc.spans[self.annotate_spans_key] = spans
        elif self.annotate == "tag":
            for span in spans:
                for token in span:
                    token.tag_ = span.label_
        elif self.annotate == "pos":
            for span in spans:
                for token in span:
                    token.pos_ = span.label_
        return doc

    # dummy serialization methods
    def to_bytes(self, **kwargs):
        """Return empty bytes; the component does not serialize any state."""
        return b""

    def from_bytes(self, _bytes_data, **kwargs):
        """Ignore the data and return the component unchanged."""
        return self

    def to_disk(self, _path, **kwargs):
        """Write nothing to disk."""
        return None

    def from_disk(self, _path, **kwargs):
        """Read nothing from disk and return the component unchanged."""
        return self

In [3]:
class KoreanRRNRecognizer(EntityRecognizer):
    """Recognizer for Korean resident registration numbers (RRN).

    Candidates have the form ``YYMMDD-NNNNNNN`` (six digits, a hyphen, seven
    digits). A candidate is reported only if its last digit matches the
    weighted mod-11 check digit computed by :meth:`is_valid_rrn`.
    """

    ENTITIES = ["KR_RESIDENT_REGISTRATION_NUMBER"]
    SUPPORTED_LANGUAGE = "ko"

    def __init__(self, **kwargs):
        """Create the recognizer.

        Args:
            **kwargs: Forwarded to ``EntityRecognizer.__init__``.
                ``supported_language`` defaults to ``SUPPORTED_LANGUAGE``
                ("ko") when the caller does not pass it.
        """
        # Without this default the class constant was never used and the
        # recognizer fell back to the base-class default language.
        kwargs.setdefault("supported_language", self.SUPPORTED_LANGUAGE)
        # Digit look-arounds stop the pattern from matching a 6-7 digit window
        # inside a longer digit run. `\b` is not used because Hangul particles
        # are often attached directly to the number ("991231-1111111도") and
        # count as word characters.
        self.pattern = Pattern(
            name="rrn_candidate", regex=r"(?<!\d)(\d{6})[-]\d{7}(?!\d)", score=0.5
        )
        super().__init__(supported_entities=self.ENTITIES, name="Korean RRN Recognizer", **kwargs)

    def load(self) -> None:
        """Nothing to load; the recognizer is purely rule-based."""
        pass

    def analyze(self, text: str, entities: List[str], nlp_artifacts) -> List[RecognizerResult]:
        """Find checksum-valid RRNs in ``text``.

        Args:
            text: Text to scan.
            entities: Requested entity types (not used by this recognizer).
            nlp_artifacts: NLP artifacts from the analyzer (not used).

        Returns:
            One ``RecognizerResult`` with score 1.0 per candidate that passes
            :meth:`is_valid_rrn`.
        """
        return [
            RecognizerResult(
                entity_type=self.ENTITIES[0],
                start=match.start(),
                end=match.end(),
                score=1.0,
            )
            for match in re.finditer(self.pattern.regex, text)
            if self.is_valid_rrn(match.group(0))
        ]

    @staticmethod
    def is_valid_rrn(rrn: str) -> bool:
        """Check the RRN check digit.

        Hyphens are removed first. The first 12 digits are multiplied by the
        weights 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5 (``(i % 8) + 2``) and
        summed; the 13th digit must equal ``(11 - sum % 11) % 10``.

        Returns:
            ``False`` if the input is not exactly 13 digits after removing
            hyphens, otherwise whether the check digit matches.
        """
        rrn = rrn.replace("-", "")
        if not rrn.isdigit() or len(rrn) != 13:
            return False
        check_sum = sum(int(rrn[i]) * ((i % 8) + 2) for i in range(12))
        return (11 - (check_sum % 11)) % 10 == int(rrn[12])


class KoreanPhoneNumberRecognizer(PatternRecognizer):
    """Pattern recognizer for Korean phone numbers (entity ``KR_PHONE_NUMBER``).

    Matches numbers starting with 010, 02 or 0[3-9]X, followed by a 3-4 digit
    group and a 4 digit group, with optional space or hyphen separators.
    """

    # ASCII look-arounds replace `\b`. Hangul is a word character in Python
    # regexes, so `\b` fails when a particle is attached to the number
    # ("010-9876-5432로") and the number is missed. Matches glued to ASCII
    # letters, digits or "_" are still rejected.
    PHONE_REGEX = (
        r"(?<![A-Za-z_\d])(?:010|02|0[3-9][0-9])[ -]?(\d{3,4})[ -]?(\d{4})(?![A-Za-z_\d])"
    )
    # Words near a match that raise its score (contact, mobile phone, phone number).
    CONTEXT = ["연락처", "휴대폰", "전화번호"]

    def __init__(self, **kwargs):
        """Create the recognizer.

        Args:
            **kwargs: Forwarded to ``PatternRecognizer.__init__``
                (e.g. ``supported_language``).
        """
        patterns = [
            Pattern(
                name="Korean Phone Number",
                regex=self.PHONE_REGEX,
                score=0.85,
            )
        ]
        super().__init__(
            supported_entity="KR_PHONE_NUMBER",
            patterns=patterns,
            # Pass a list: the recognizer API documents `context` as List[str].
            context=list(self.CONTEXT),
            **kwargs,
        )


class KoreanBankAccountRecognizer(PatternRecognizer):
    """Pattern recognizer for Korean bank account numbers
    (entity ``KR_BANK_ACCOUNT_NUMBER``).

    Matches either three digit groups (2-6, 2-6 and 3-6 digits) with optional
    hyphens between them, or a plain run of 10-14 digits.
    """

    # ASCII look-arounds replace `\b`. Hangul is a word character in Python
    # regexes, so `\b` fails when a particle is attached to the number
    # ("110-234-567890입니다") and the number is missed. Matches glued to ASCII
    # letters, digits or "_" are still rejected.
    ACCOUNT_REGEX = (
        r"(?<![A-Za-z_\d])(?:\d{2,6}[-]?\d{2,6}[-]?\d{3,6}|\d{10,14})(?![A-Za-z_\d])"
    )
    # Words near a match that raise its score (account number, account, bank).
    CONTEXT = ["계좌번호", "계좌", "은행"]

    def __init__(self, **kwargs):
        """Create the recognizer.

        Args:
            **kwargs: Forwarded to ``PatternRecognizer.__init__``
                (e.g. ``supported_language``).
        """
        patterns = [
            Pattern(
                name="Korean Bank Account",
                regex=self.ACCOUNT_REGEX,
                score=0.6,
            )
        ]
        super().__init__(
            supported_entity="KR_BANK_ACCOUNT_NUMBER",
            patterns=patterns,
            # Pass a list: the recognizer API documents `context` as List[str].
            context=list(self.CONTEXT),
            **kwargs,
        )

In [4]:
class KoreanPIIProcessor:
    """A class to handle PII analysis and anonymization for Korean text."""

    def __init__(self, hf_model_name: str = "Leo97/KoELECTRA-small-v3-modu-ner"):
        """
        Initializes the processor by setting up the Presidio Analyzer and Anonymizer.

        Args:
            hf_model_name (str): The Hugging Face model to use for NER.
                Other candidates noted by the author: taeminlee/gliner_ko,
                KPF/KPF-bert-ner.
        """
        self.lang_code = "ko"
        self.analyzer = self._setup_analyzer(hf_model_name)
        self.anonymizer = AnonymizerEngine()
        self.faker = Faker("ko_KR")  # For generating fake Korean data
        print("Korean PII Processor initialized successfully.")

    def _setup_analyzer(self, hf_model_name: str) -> AnalyzerEngine:
        """Configures and returns a Presidio AnalyzerEngine.

        Builds a transformers-backed NLP engine for ``hf_model_name`` and
        falls back to a spaCy-only engine if that fails. Registers the custom
        Korean recognizers plus the email and GLiNER recognizers. Also stores
        the engine on ``self.nlp_engine``.

        Args:
            hf_model_name (str): The Hugging Face model to use for NER.

        Returns:
            AnalyzerEngine: Analyzer restricted to ``self.lang_code``.
        """
        try:
            # Using TransformersNlpEngine for state-of-the-art Korean NER.
            # alignment_mode may be "strict", "contract" or "expand";
            # aggregation_strategy may be "simple", "first", "average" or "max".
            ner_model_configuration = NerModelConfiguration(
                alignment_mode="expand",
                aggregation_strategy="max",
            )

            nlp_engine = TransformersNlpEngine(
                models=[
                    {
                        "model_name": {"spacy": "ko_core_news_sm", "transformers": hf_model_name},
                        "lang_code": self.lang_code,
                    }
                ],
                ner_model_configuration=ner_model_configuration,
            )
            # Load eagerly so download/initialization errors surface inside
            # this try block and the success message is only printed once the
            # model is really available (construction alone may defer loading).
            if not nlp_engine.is_loaded():
                nlp_engine.load()
            print(f"Successfully loaded Hugging Face NER model: {hf_model_name}")
        except Exception as e:
            print(f"Error loading Hugging Face model '{hf_model_name}': {e}")
            print("Falling back to a rule-based only analyzer.")
            # NlpEngine is an abstract base class and cannot be instantiated,
            # so fall back to a concrete spaCy engine. The analyzer still needs
            # an NLP engine for tokenization even when only the pattern
            # recognizers are of interest.
            nlp_engine = SpacyNlpEngine(
                models=[{"lang_code": self.lang_code, "model_name": "ko_core_news_sm"}]
            )

        self.nlp_engine = nlp_engine

        # Imported locally, as in the original notebook cell.
        from presidio_analyzer.predefined_recognizers import (
            EmailRecognizer,
            GLiNERRecognizer,
        )

        # Register all custom recognizers
        registry = RecognizerRegistry(supported_languages=[self.lang_code])
        registry.add_recognizer(KoreanRRNRecognizer(supported_language=self.lang_code))
        registry.add_recognizer(KoreanPhoneNumberRecognizer(supported_language=self.lang_code))
        registry.add_recognizer(KoreanBankAccountRecognizer(supported_language=self.lang_code))

        # The email pattern itself is language-agnostic, but recognizers are
        # selected by language. Without an explicit supported_language the
        # recognizer keeps its default and is not used for Korean requests.
        registry.add_recognizer(EmailRecognizer(supported_language=self.lang_code))

        gliner_recognizer = GLiNERRecognizer(
            supported_language=self.lang_code,
            model_name="taeminlee/gliner_ko",
        )
        registry.add_recognizer(gliner_recognizer)

        return AnalyzerEngine(
            nlp_engine=nlp_engine,
            registry=registry,
            supported_languages=[self.lang_code],
        )

    def analyze(self, text: str) -> List[RecognizerResult]:
        """Analyzes text to find PII.

        Args:
            text (str): The text to analyze.

        Returns:
            List[RecognizerResult]: Detected PII spans for ``self.lang_code``.
        """
        return self.analyzer.analyze(text=text, language=self.lang_code)

    def anonymize(self, text: str, operators: Optional[dict] = None) -> str:
        """
        Analyzes and then anonymizes PII in the text using specified operators.

        Args:
            text (str): The text to anonymize.
            operators (dict, optional): A dictionary defining anonymization strategies
                                        for different PII entities.

        Returns:
            str: The anonymized text.
        """
        if operators is None:
            # Default behavior: replace with entity type (e.g., <KR_PHONE_NUMBER>)
            operators = {"DEFAULT": OperatorConfig("replace")}

        analyzer_results = self.analyze(text)
        anonymized_result = self.anonymizer.anonymize(
            text=text, analyzer_results=analyzer_results, operators=operators
        )
        return anonymized_result.text

In [5]:
# Instantiating the processor builds the analyzer (NLP engine + recognizers)
# and the anonymizer.
pii_processor = KoreanPIIProcessor()

# Demo paragraph containing a name, two RRN-shaped numbers, an e-mail address,
# two phone numbers and a bank account number.
# NOTE(review): 880101-1234567 does not satisfy the checksum implemented in
# KoreanRRNRecognizer.is_valid_rrn (the computed check digit is 8, not 7), so
# neither RRN-shaped number here is accepted by that recognizer.
test_text = "".join(
    (
        "안녕하세요, 제 이름은 홍길동이고, 저의 주민등록번호는 880101-1234567 입니다. ",
        "유효하지 않은 주민번호 991231-1111111도 있습니다. ",
        "제 이메일은 g.hong@example.com이고, 연락처는 010-9876-5432 입니다. ",
        "다른 전화번호는 02 1234 5678 입니다. ",
        "주거래 은행 계좌번호는 110-234-567890 입니다.",
    )
)



Successfully loaded Hugging Face NER model: Leo97/KoELECTRA-small-v3-modu-ner


Fetching 4 files:   0%|          | 0/4 [00:00<?, ?it/s]

Device set to use cpu


Korean PII Processor initialized successfully.


In [6]:

from pprint import pprint

# Show the raw input text for comparison with the anonymized result.
pprint(test_text)

('안녕하세요, 제 이름은 홍길동이고, 저의 주민등록번호는 880101-1234567 입니다. 유효하지 않은 주민번호 '
 '991231-1111111도 있습니다. 제 이메일은 g.hong@example.com이고, 연락처는 010-9876-5432 입니다. '
 '다른 전화번호는 02 1234 5678 입니다. 주거래 은행 계좌번호는 110-234-567890 입니다.')


In [7]:
# Analyze and anonymize with the default operator, which replaces each
# detected span with its entity type (e.g. <KR_PHONE_NUMBER>).
pprint(pii_processor.anonymize(test_text))

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


('안녕하세요, 제 이름은 <PERSON>, 저의 주민등록번호는 <ID> 입니다. 유효하지 않은 주민번호 <PERSON> 있습니다. 제 '
 '이메일은 <EMAIL>, 연락처는 <KR_PHONE_NUMBER> 입니다. 다른 전화번호는 <KR_PHONE_NUMBER> 입니다. '
 '주거래 은행 계좌번호는 <KR_BANK_ACCOUNT_NUMBER> 입니다.')
