## Instantiate Everything

In [1]:
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, EntityRecognizer, Pattern, RecognizerResult
from presidio_analyzer.recognizer_registry import RecognizerRegistry
from presidio_analyzer.nlp_engine import NlpEngine, SpacyNlpEngine, NlpArtifacts
from presidio_analyzer.context_aware_enhancers import LemmaContextAwareEnhancer

from presidio_anonymizer import AnonymizerEngine, DeanonymizeEngine, OperatorConfig
from presidio_anonymizer.operators import Operator, OperatorType

from typing import Dict, List
import pprint

class InstanceCounterAnonymizer(Operator):
    """
    Anonymizer which replaces the entity value
    with an instance counter per entity.

    Every distinct value of an entity type is replaced by a placeholder
    built from ``REPLACING_FORMAT`` (e.g. ``<SORTCODE_0>``, ``<SORTCODE_1>``).
    The caller-supplied ``entity_mapping`` dict is updated in place, so a
    repeated value gets the same placeholder and the mapping can be reused
    afterwards to look up the original text.
    """

    REPLACING_FORMAT = "<{entity_type}_{index}>"

    def operate(self, text: str, params: Dict = None) -> str:
        """Return the placeholder for ``text``, creating one if needed.

        :param text: The original entity value to anonymize.
        :param params: Must contain ``entity_type`` (str) and
            ``entity_mapping``, a dict of
            ``{entity_type: {original_text: placeholder}}`` which is
            mutated in place.
        :return: The placeholder that replaces ``text``.
        """

        entity_type: str = params["entity_type"]

        # entity_mapping is a dict of dicts containing mappings per entity type
        entity_mapping: Dict[str, Dict[str, str]] = params["entity_mapping"]

        entity_mapping_for_type = entity_mapping.get(entity_type)
        if entity_mapping_for_type is None:
            # First time this entity type is seen: start its own mapping.
            # An existing (even empty) dict is reused rather than replaced
            # so references held by the caller stay valid.
            entity_mapping_for_type = {}
            entity_mapping[entity_type] = entity_mapping_for_type

        if text in entity_mapping_for_type:
            # Same value seen before: reuse its placeholder.
            return entity_mapping_for_type[text]

        # _get_last_index returns -1 for an empty mapping, so the first
        # placeholder of every entity type gets index 0.
        next_index = self._get_last_index(entity_mapping_for_type) + 1
        new_text = self.REPLACING_FORMAT.format(
            entity_type=entity_type, index=next_index
        )

        entity_mapping_for_type[text] = new_text
        return new_text

    @staticmethod
    def _get_last_index(entity_mapping_for_type: Dict) -> int:
        """Get the last index for a given entity type.

        :param entity_mapping_for_type: ``{original_text: placeholder}``
            mapping whose placeholders follow ``REPLACING_FORMAT``.
        :return: The highest index used so far, or ``-1`` when the mapping
            is empty.
        """

        def get_index(value: str) -> int:
            # "<TYPE_12>" -> take what follows the last "_" ("12>") and
            # drop the closing ">".
            return int(value.split("_")[-1][:-1])

        return max(
            (get_index(v) for v in entity_mapping_for_type.values()),
            default=-1,
        )

    def validate(self, params: Dict = None) -> None:
        """Validate operator parameters.

        :raises ValueError: If ``params`` is missing or lacks the
            ``entity_mapping`` or ``entity_type`` key.
        """

        # `params` defaults to None; treat that the same as a missing key
        # instead of failing with a TypeError on the membership test.
        if params is None or "entity_mapping" not in params:
            raise ValueError("An input Dict called `entity_mapping` is required.")
        if "entity_type" not in params:
            raise ValueError("An entity_type param is required.")

    def operator_name(self) -> str:
        """Return the name under which this operator is registered."""
        return "entity_counter"

    def operator_type(self) -> OperatorType:
        """Return the operator kind (anonymization)."""
        return OperatorType.Anonymize

class InstanceCounterDeanonymizer(Operator):
    """
    Deanonymizer which replaces the unique identifier
    with the original text.

    It reverses a ``{entity_type: {original_text: placeholder}}`` mapping:
    given a placeholder, it returns the original text stored as its key.
    """

    def operate(self, text: str, params: Dict = None) -> str:
        """Deanonymize the input text.

        :param text: The placeholder to translate back.
        :param params: Must contain ``entity_type`` (str) and
            ``entity_mapping``, a dict of
            ``{entity_type: {original_text: placeholder}}``.
        :return: The original text mapped to ``text``.
        :raises ValueError: If the entity type or the placeholder is not
            present in the mapping.
        """

        entity_type: str = params["entity_type"]

        # entity_mapping is a dict of dicts containing mappings per entity type
        entity_mapping: Dict[str, Dict[str, str]] = params["entity_mapping"]

        if entity_type not in entity_mapping:
            raise ValueError(f"Entity type {entity_type} not found in entity mapping!")

        # Single scan of the mapping: the helper returns None when the
        # placeholder is absent, which replaces a separate `in .values()` pass.
        original_text = self._find_key_by_value(entity_mapping[entity_type], text)
        if original_text is None:
            raise ValueError(f"Text {text} not found in entity mapping for entity type {entity_type}!")

        return original_text

    @staticmethod
    def _find_key_by_value(entity_mapping, value):
        """Return the first key whose value equals ``value``, else ``None``."""
        for key, val in entity_mapping.items():
            if val == value:
                return key
        return None

    def validate(self, params: Dict = None) -> None:
        """Validate operator parameters.

        :raises ValueError: If ``params`` is missing or lacks the
            ``entity_mapping`` or ``entity_type`` key.
        """

        # `params` defaults to None; treat that the same as a missing key
        # instead of failing with a TypeError on the membership test.
        if params is None or "entity_mapping" not in params:
            raise ValueError("An input Dict called `entity_mapping` is required.")
        if "entity_type" not in params:
            raise ValueError("An entity_type param is required.")

    def operator_name(self) -> str:
        """Return the name under which this operator is registered."""
        return "entity_counter_deanonymizer"

    def operator_type(self) -> OperatorType:
        """Return the operator kind (deanonymization)."""
        return OperatorType.Deanonymize

# Sort code recognizer, built from two patterns plus context words:
#   * a strict pattern for well-formed sort codes (full confidence), and
#   * a weak pattern for bare 6-digit runs, which needs surrounding context
#     before it can be trusted.
# TODO: revisit the weak pattern and decide which cases it should capture.
sortcode_pattern_full = Pattern(
    name="Sort Code Perfect",
    # Three digit pairs separated by a hyphen or a whitespace character
    regex=r"\b\d{2}[-\s]\d{2}[-\s]\d{2}\b",
    score=1.0,
)
sortcode_pattern = Pattern(
    name="Sort Code (weak)",
    # Any standalone sequence of 6 digits
    regex=r"\b\d{6}\b",
    score=0.001,
)

# Observation: the score only increased once 'sort' was added as a context
# word - context strings containing spaces do not seem to be matched.
sortcode_recognizer = PatternRecognizer(
    supported_entity="SORTCODE",
    patterns=[sortcode_pattern, sortcode_pattern_full],
    context=["sortcode", "sort"],
)

# Registry = Presidio's predefined recognizers + the custom sort code one
registry = RecognizerRegistry()
registry.load_predefined_recognizers()
registry.add_recognizer(sortcode_recognizer)

context_aware_enhancer = LemmaContextAwareEnhancer(
    context_similarity_factor=0.75,
    min_score_with_context_similarity=0.4,
)

analyzer = AnalyzerEngine(
    registry=registry,
    context_aware_enhancer=context_aware_enhancer,
)

## Test Analyzer

In [13]:
# Sample paragraph: the words "sort code" appear long before the 6-digit value
text = "Let's assume I have a really long paragraph of text. Well, not super long, but still long enough that when I say sort code here, it will be a while until i actually provide it. I am going to fill some space between the actual value with some noise to see what happens if I provide the number after mentioning it initially. Here it is 345678"

# Analyze the English text with the engine configured above
analyzer_results = analyzer.analyze(
    text=text,
    language="en",
)
# Number of detected entities (shown as the cell output)
len(analyzer_results)

3

In [15]:
analyzer_results

[type: IN_PAN, start: 298, end: 308, score: 0.05,
 type: US_DRIVER_LICENSE, start: 334, end: 340, score: 0.01,
 type: SORTCODE, start: 334, end: 340, score: 0.001]

In [None]:
# Problematic entity types: DATE_TIME and US_DRIVER_LICENSE. Need to check the regex patterns of these recognizers.
# Also, what is IN_PAN? (It looks like Presidio's predefined recognizer for Indian PAN numbers - to be confirmed.)
# TODO: write more test cases for sort code.
# I don't think we need the US_DRIVER_LICENSE recognizer: are there really any cases where a customer would need to provide that?

## Test Anonymizer

In [16]:
# Create Anonymizer engine and add the custom anonymizer
anonymizer_engine = AnonymizerEngine()
anonymizer_engine.add_anonymizer(InstanceCounterAnonymizer)

# Create a mapping between entity types and counters
entity_mapping = dict()

# Anonymize the text
anonymized_result = anonymizer_engine.anonymize(
    text,
    analyzer_results,
    {
        "DEFAULT": OperatorConfig(
            "entity_counter", {"entity_mapping": entity_mapping}
        )
    },
)

In [17]:
anonymized_result

text: Let's assume I have a really long paragraph of text. Well, not super long, but still long enough that when I say sort code here, it will be a while until i actually provide it. I am going to fill some space between the actual value with some noise to see what happens if I provide the number after <IN_PAN_0> it initially. Here it is <US_DRIVER_LICENSE_0>
items:
[
    {'start': 334, 'end': 355, 'entity_type': 'US_DRIVER_LICENSE', 'text': '<US_DRIVER_LICENSE_0>', 'operator': 'entity_counter'},
    {'start': 298, 'end': 308, 'entity_type': 'IN_PAN', 'text': '<IN_PAN_0>', 'operator': 'entity_counter'}
]

## Test Deanonymizer

In [None]:
# Set up the deanonymizer engine and register the custom reverse operator
deanonymizer_engine = DeanonymizeEngine()
deanonymizer_engine.add_deanonymizer(InstanceCounterDeanonymizer)

# Reuse the mapping built during anonymization to restore the original values
restore_operator = OperatorConfig(
    "entity_counter_deanonymizer",
    params={"entity_mapping": entity_mapping},
)

deanonymized = deanonymizer_engine.deanonymize(
    anonymized_result.text,
    anonymized_result.items,
    {"DEFAULT": restore_operator},
)