In [None]:
import os
from typing import List
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, RecognizerRegistry, EntityRecognizer, RecognizerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from presidio_analyzer.nlp_engine import NlpArtifacts

# Create the anonymizer engine. The analyzer engine is created further down,
# once, after its recognizer registry has been fully populated.
anonymizer = AnonymizerEngine()

# Deny-list recognizer: tags the literal words "one", "two" and "three" as NUMS
titles_recognizer = PatternRecognizer(supported_entity='NUMS', deny_list=["one", "two", "three"])

# Registry holding the predefined recognizers plus the custom deny-list recognizer
registry = RecognizerRegistry()
registry.load_predefined_recognizers()
registry.add_recognizer(titles_recognizer)

# Build the analyzer a single time on top of that registry. A default
# AnalyzerEngine() used to be constructed first and was immediately replaced
# by this one, which only repeated the engine start-up work.
analyzer = AnalyzerEngine(registry=registry)

# Custom recognizer that reports number-like tokens
class NumbersRecognizer(EntityRecognizer):
    """Recognizer that emits a NUMBER result for every number-like token."""

    # Score attached to every result this recognizer produces
    expected_confidence_level = 0.7

    def load(self) -> None:
        """Nothing needs to be loaded for this recognizer."""
        return None

    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts
    ) -> List[RecognizerResult]:
        """
        Find the tokens that represent numbers (either 123 or One Two Three).

        Only ``nlp_artifacts.tokens`` is inspected; ``text`` and ``entities``
        are accepted but not used. Each token whose ``like_num`` flag is set
        yields one result spanning that token's characters.
        """
        return [
            RecognizerResult(
                entity_type="NUMBER",
                start=tok.idx,
                end=tok.idx + len(tok),
                score=self.expected_confidence_level,
            )
            for tok in nlp_artifacts.tokens
            if tok.like_num
        ]

# Register an instance of the token-based numbers recognizer with the analyzer
new_numbers_recognizer = NumbersRecognizer(supported_entities=["NUMBER"])
analyzer.registry.add_recognizer(new_numbers_recognizer)

# Folder the text files are read from, and folder that receives the results
input_directory = '/Users/mbolofinde/dev/anonprj/anonprj/output'
output_directory = '/Users/mbolofinde/dev/anonprj/anonprj/anon_output'

# Create the destination folder up front; no error if it is already there
os.makedirs(output_directory, exist_ok=True)

# Entity types requested from the analyzer for every chunk of text
entities = [
    "CREDIT_CARD",
    "DATE_TIME",
    "EMAIL_ADDRESS",
    "PERSON",
    "PHONE_NUMBER",
    "US_BANK_NUMBER",
    "US_DRIVER_LICENSE",
    "US_SSN",
    "NUMS",
    "NUMBER",
]

# Helper for split_text: choose where a chunk may end without cutting a token
def _chunk_end(text: str, start: int, limit: int) -> int:
    """
    Return the cut position for the chunk that begins at ``start``.

    ``limit`` is the furthest allowed cut and must be smaller than
    ``len(text)``. Preference order: just after the last newline inside
    ``text[start:limit]``, then a whitespace boundary at or before ``limit``,
    and finally ``limit`` itself when the window holds no whitespace at all.
    The returned position is always greater than ``start``.
    """
    newline = text.rfind("\n", start, limit)
    if newline != -1:
        return newline + 1

    # The character right after the window is whitespace, so cutting at
    # ``limit`` does not split a token.
    if text[limit].isspace():
        return limit

    # Walk back to just after the last whitespace character in the window.
    cut = limit
    while cut > start and not text[cut - 1].isspace():
        cut -= 1
    return cut if cut > start else limit


# Function to split text into smaller chunks
def split_text(text: str, max_length: int = 100000) -> List[str]:
    """
    Split ``text`` into consecutive chunks of at most ``max_length`` characters.

    Chunks are cut at a newline or other whitespace whenever the window
    contains one, so that a value such as an e-mail address or a number is
    not divided between two chunks (each chunk is analyzed on its own, and a
    value cut in half would not be recognised in either part). A hard cut at
    ``max_length`` is used only when the window has no whitespace.

    Joining the returned chunks with ``''`` reproduces ``text`` exactly.
    An empty ``text`` gives an empty list.

    Raises:
        ValueError: if ``max_length`` is smaller than 1.
    """
    if max_length < 1:
        raise ValueError("max_length must be a positive integer")

    chunks = []
    total = len(text)
    start = 0
    while start < total:
        end = min(start + max_length, total)
        # Only an interior cut can split a token; the final chunk ends at EOF.
        if end < total:
            end = _chunk_end(text, start, end)
        chunks.append(text[start:end])
        start = end
    return chunks

# Helper that swaps every detected entity for its placeholder
def anonymize_text(text, results):
    """
    Return ``text`` with the entities listed in ``results`` replaced.

    ``results`` are the analyzer results for ``text``. Each supported entity
    type is replaced by a fixed placeholder string; the anonymized text is
    returned as a plain string.
    """
    # Placeholder written in place of each entity type
    placeholders = {
        "CREDIT_CARD": "<CREDIT_CARD>",
        "DATE_TIME": "<DATE>",
        "EMAIL_ADDRESS": "<EMAIL>",
        "PERSON": "<PERSON>",
        "PHONE_NUMBER": "<PHONE>",
        "US_BANK_NUMBER": "<BANK_NUMBER>",
        "US_DRIVER_LICENSE": "<DRIVER_LICENSE>",
        "US_SSN": "<SSN>",
        "NUMS": "***",
        "NUMBER": "***",
    }

    # One "replace" operator per entity type
    operators = {
        entity_type: OperatorConfig("replace", {"new_value": placeholder})
        for entity_type, placeholder in placeholders.items()
    }

    outcome = anonymizer.anonymize(
        text=text, analyzer_results=results, operators=operators
    )
    return outcome.text

# Walk the input directory and anonymize every .txt file found in it
for filename in os.listdir(input_directory):
    # Anything that is not a .txt file is ignored
    if not filename.endswith('.txt'):
        continue

    input_file_path = os.path.join(input_directory, filename)
    output_file_path = os.path.join(
        output_directory, os.path.splitext(filename)[0] + "_anonymized.txt"
    )
    print(f'Processing file: {input_file_path}')
    try:
        # Load the whole file into memory
        with open(input_file_path, 'r', encoding='utf-8') as file:
            text = file.read()

        # Analyze and anonymize the text one chunk at a time
        text_chunks = split_text(text)
        anonymized_chunks = []

        for chunk in text_chunks:
            results = analyzer.analyze(text=chunk, entities=entities, language='en')
            if not results:
                # Nothing detected: keep the chunk exactly as it was read
                print('No PII entities found in chunk. Copying original content.')
                anonymized_chunks.append(chunk)
                continue

            print(f'Found {len(results)} PII entities in chunk. Anonymizing...')
            anonymized_chunk = anonymize_text(chunk, results)
            anonymized_chunks.append(anonymized_chunk)

        # Stitch the chunks back together and save the result
        anonymized_text = ''.join(anonymized_chunks)
        with open(output_file_path, 'w', encoding='utf-8') as file:
            file.write(anonymized_text)
        print(f'Anonymized file saved to: {output_file_path}\n')

    except Exception as e:
        # A failure in one file is reported and the loop moves on to the next
        print(f'Error processing file {filename}: {e}\n')

print('Processing complete for all files.')
