In [None]:
!git clone https://github.com/microsoft/presidio.git
!pip install presidio-analyzer presidio-anonymizer transformers -q
!pip install -U spacy -q
!python -m spacy download en_core_web_lg -q

[K     |████████████████████████████████| 777.4 MB 5.7 kB/s 
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_lg')


In [None]:
import logging
from typing import Optional, List, Tuple, Set

from presidio_analyzer import (
    RecognizerResult,
    EntityRecognizer,
    AnalysisExplanation,
)
from presidio_analyzer.nlp_engine import NlpArtifacts

from transformers import pipeline


class TransformersRecognizer(EntityRecognizer):
    """
    Presidio recognizer backed by a Hugging Face transformers NER pipeline.

    The pipeline emits model labels such as ``PER`` / ``LOC`` / ``ORG``; they are
    translated to Presidio entity names through ``PRESIDIO_EQUIVALENCES`` before
    being returned, and the recognizer advertises the Presidio names as its
    supported entities.
    """

    # Presidio entity names this recognizer is expected to produce. An instance
    # replaces this with the names derived from the labels of its loaded model.
    ENTITIES = [
        "LOCATION",
        "PERSON",
        "ORGANIZATION",
        # "MISCELLANEOUS"   # - There are no direct correlation with Presidio entities.
    ]

    DEFAULT_EXPLANATION = "Identified as {} by a Named Entity Recognition using transformers"

    # Pairs of (Presidio entity names, model labels). A label is accepted when
    # it appears in either set of any pair.
    CHECK_LABEL_GROUPS = [
        ({"LOCATION"}, {"LOC", "LOCATION"}),
        ({"PERSON"}, {"PER", "PERSON"}),
        ({"ORGANIZATION"}, {"ORG"}),
        # ({"MISCELLANEOUS"}, {"MISC"}), # Probably not PII
    ]

    # Default model checkpoint for each supported language code.
    MODEL_LANGUAGES = {
        "en": "elastic/distilbert-base-uncased-finetuned-conll03-english",
        "es": "Davlan/xlm-roberta-base-ner-hrl",
        "de": "Davlan/xlm-roberta-base-ner-hrl",
        "ar": "Davlan/xlm-roberta-base-ner-hrl",
        "fr": "Davlan/xlm-roberta-base-ner-hrl",
        "it": "Davlan/xlm-roberta-base-ner-hrl",
        "nl": "Davlan/xlm-roberta-base-ner-hrl",
        "lv": "Davlan/xlm-roberta-base-ner-hrl",
    }

    # Model label -> Presidio entity name.
    PRESIDIO_EQUIVALENCES = {
        "PER": "PERSON",
        "LOC": "LOCATION",
        "ORG": "ORGANIZATION",
        # 'MISC': 'MISCELLANEOUS'   # - Probably not PII
    }

    def __init__(
        self,
        supported_language: str = "en",
        supported_entities: Optional[List[str]] = None,
        check_label_groups: Optional[List[Tuple[Set, Set]]] = None,
        model: Optional[str] = None,
    ):
        """
        Build the NER pipeline and register the recognizer with Presidio.

        :param supported_language: Language code; selects the default model from
            MODEL_LANGUAGES when ``model`` is not given.
        :param supported_entities: Entities to report (Presidio names or model
            labels). Defaults to every entity the loaded model can produce that
            passes ``check_label_groups``.
        :param check_label_groups: Pairs of (Presidio names, model labels) used
            to decide which labels are accepted. Defaults to CHECK_LABEL_GROUPS.
        :param model: Name or path of the model to load. Overrides the
            per-language default.
        """
        self.check_label_groups = (
            check_label_groups if check_label_groups else self.CHECK_LABEL_GROUPS
        )

        # An explicit model wins; otherwise fall back to the per-language default.
        model_name = model if model else self.MODEL_LANGUAGES.get(supported_language)
        if model_name is None:
            # pipeline() picks its own default checkpoint when model is None;
            # make that visible instead of failing or staying silent.
            logging.getLogger(__name__).warning(
                "No model configured for language '%s'; "
                "falling back to the default transformers NER model",
                supported_language,
            )

        self.model = pipeline("ner", model=model_name, aggregation_strategy="first")

        # Labels the model can emit, without the B-/I- tagging prefix and
        # without the "outside" tag.
        model_labels = {
            self._strip_tag_prefix(label)
            for label in self.model.model.config.id2label.values()
            if label != "O"
        }

        # Advertise Presidio names (what analyze() actually returns), limited to
        # the labels that the label groups accept.
        self.ENTITIES = sorted(
            {
                self.PRESIDIO_EQUIVALENCES.get(label, label)
                for label in model_labels
                if self.__check_label(label, self.check_label_groups)
            }
        )

        supported_entities = supported_entities if supported_entities else self.ENTITIES

        super().__init__(
            supported_entities=supported_entities,
            supported_language=supported_language,
            name="Transformer Analytics",
        )

    def load(self) -> None:
        """Load the model, not used. Model is loaded during initialization."""
        pass

    def get_supported_entities(self) -> List[str]:
        """
        Return supported entities by this model.
        :return: List of the supported entities.
        """
        return self.supported_entities

    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: Optional[NlpArtifacts] = None
    ) -> List[RecognizerResult]:
        """
        Analyze text using the transformers NER pipeline.
        :param text: The text for analysis.
        :param entities: Entities to look for (Presidio names or model labels).
            When empty, all supported entities are returned.
        :param nlp_artifacts: Not used by this recognizer.
        :return: The list of Presidio RecognizerResult constructed from the
            pipeline detections.
        """
        if not text:
            return []

        # If there are no specific list of entities, we will look for all of it.
        if not entities:
            entities = self.supported_entities

        recognizer_results = []
        for prediction in self.model(text):
            label = prediction["entity_group"]

            if not self.__check_label(label, self.check_label_groups):
                continue

            entity_type = self.PRESIDIO_EQUIVALENCES.get(label, label)

            # Both lists may hold either the model label or the Presidio name.
            if not self._is_listed(label, entity_type, self.supported_entities):
                continue
            if not self._is_listed(label, entity_type, entities):
                continue

            textual_explanation = self.DEFAULT_EXPLANATION.format(label)
            explanation = self.build_explanation(
                round(float(prediction["score"]), 2), textual_explanation
            )
            recognizer_results.append(
                self._convert_to_recognizer_result(prediction, explanation)
            )

        return recognizer_results

    def _convert_to_recognizer_result(self, entity, explanation) -> RecognizerResult:
        """
        Turn one aggregated pipeline prediction into a RecognizerResult.
        :param entity: Prediction dict; its "entity_group", "score", "start" and
            "end" keys are read.
        :param explanation: AnalysisExplanation attached to the result.
        :return: RecognizerResult using the Presidio entity name when a mapping
            exists, otherwise the model label.
        """
        entity_type = self.PRESIDIO_EQUIVALENCES.get(
            entity["entity_group"], entity["entity_group"]
        )
        # Cast to a plain float so the score does not stay a numeric type of
        # the underlying model library.
        score = round(float(entity["score"]), 2)

        return RecognizerResult(
            entity_type=entity_type,
            start=entity["start"],
            end=entity["end"],
            score=score,
            analysis_explanation=explanation,
        )

    def build_explanation(
        self, original_score: float, explanation: str
    ) -> AnalysisExplanation:
        """
        Create explanation for why this result was detected.
        :param original_score: Score given by this recognizer
        :param explanation: Explanation string
        :return: AnalysisExplanation naming this recognizer class.
        """
        explanation = AnalysisExplanation(
            recognizer=self.__class__.__name__,
            original_score=original_score,
            textual_explanation=explanation,
        )
        return explanation

    @staticmethod
    def _strip_tag_prefix(label: str) -> str:
        """Drop a leading ``B-`` / ``I-`` tag from a model label, if present."""
        if label.startswith(("B-", "I-")):
            return label[2:]
        return label

    @staticmethod
    def _is_listed(label: str, entity_type: str, wanted: List[str]) -> bool:
        """Tell whether the model label or its Presidio name is in ``wanted``."""
        return label in wanted or entity_type in wanted

    @staticmethod
    def __check_label(
        entity: str, check_label_groups: List[Tuple[Set, Set]]
    ) -> bool:
        """Tell whether ``entity`` appears in either set of any label group."""
        return any(
            entity in egrp or entity in lgrp for egrp, lgrp in check_label_groups
        )

In [None]:
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry

# Instantiating the recognizer builds the transformers pipeline; the model
# weights are downloaded on the first run, which can be a large download.
transformers_recognizer = TransformersRecognizer()

# Registry that holds only the transformers-based recognizer.
registry = RecognizerRegistry()
registry.add_recognizer(transformers_recognizer)

analyzer = AnalyzerEngine(registry=registry)

# Analyze a sample sentence and keep the decision process so that each
# result carries its analysis explanation.
sample_text = "My name is Christopher and I live in Irbid."
results = analyzer.analyze(
    text=sample_text,
    language="en",
    return_decision_process=True,
)

# Show every detection followed by the explanation of how it was found.
for result in results:
    print(result)
    print(result.analysis_explanation)

type: PERSON, start: 11, end: 22, score: 1.0
{'recognizer': 'TransformersRecognizer', 'pattern_name': None, 'pattern': None, 'original_score': 1.0, 'score': 1.0, 'textual_explanation': 'Identified as PER by a Named Entity Recognition using transformers', 'score_context_improvement': 0, 'supportive_context_word': '', 'validation_result': None}
type: LOCATION, start: 37, end: 42, score: 0.5
{'recognizer': 'TransformersRecognizer', 'pattern_name': None, 'pattern': None, 'original_score': 0.5, 'score': 0.5, 'textual_explanation': 'Identified as LOC by a Named Entity Recognition using transformers', 'score_context_improvement': 0, 'supportive_context_word': '', 'validation_result': None}
