# **Presidio + Spark: PII Detection & Anonymization**  

## **Table of Contents**  

**1. Small Model Setup & Demo**: Initialize the Spark session and set up a lightweight spaCy-based PII detection model.  
**2. Large Model Setup & Demo**: Configure a larger spaCy model for enhanced PII detection; this model is the one used by all of the following steps.  
**3. Detect PII Summary**: Run Presidio’s analyzer on every column of a Spark DataFrame and summarize, per row, which entities were found in which column.  
**4. Anonymize user_query**: Apply Presidio’s anonymization engine to the `user_query` column, replacing each detected PII entity with an empty string.  
**5. Scale Data (Duplicate Rows)**: Expand datasets by duplicating rows, appending unique IDs, and re-anonymizing.  


In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook; getOrCreate() returns the already
# active session when there is one instead of starting a second session.
spark = (
    SparkSession.builder
    .appName("PresidioInFabric")
    .getOrCreate()
)

In [None]:
%pip install /lakehouse/default/Files/presidio/models/en_core_web_lg-3.8.0-py3-none-any.whl  
# Install the large spaCy English model (en_core_web_lg) from a wheel stored in the lakehouse,
# because it exceeds the size limit for custom libraries in the Fabric environment.

In [None]:
from pyspark.sql.functions import (
    array, lit, explode, col, monotonically_increasing_id, concat
)
from presidio_analyzer import AnalyzerEngine
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import pandas_udf, PandasUDFType
from presidio_anonymizer.entities import OperatorConfig
import pandas as pd

In [None]:
# Notebook parameters.
num_duplicates = 5000  # copies of each input row produced in the "Scale Data" section
csv_path = "Files/presidio/fabric_sample_data.csv"  # sample CSV (with a header row) to analyze
is_write_to_delta = True  # when True, the scaled, anonymized DataFrame is saved as a Delta table
table_name = "presidio_demo_table"  # target Delta table name
table_namne = table_name  # legacy misspelled alias, kept so cells that still reference it keep working
partitions_number = 100  # partitions for the scaled DataFrame (also used as the preview row limit)

## **Small Model Setup & Demo**  

In [None]:
# Small model: build a Presidio analyzer backed by spaCy's en_core_web_md pipeline.
configuration = {
    "nlp_engine_name": "spacy",
    "models": [
        {"lang_code": "en", "model_name": "en_core_web_md"},
    ],
}

provider = NlpEngineProvider(nlp_configuration=configuration)
# Only an English pipeline is configured, so the engine is named after its model
# size rather than a language it does not support.
small_nlp_engine = provider.create_engine()

small_analyzer = AnalyzerEngine(
    nlp_engine=small_nlp_engine, supported_languages=["en"]
)

# Quick sanity check: only PHONE_NUMBER entities are requested from the analyzer.
text_to_anonymize = "His name is Mr. Jones and his phone number is 212-555-5555"
analyzer_results = small_analyzer.analyze(
    text=text_to_anonymize, entities=["PHONE_NUMBER"], language="en"
)
print(analyzer_results)

## **Large Model Setup & Demo**  

In [None]:
# Large model: build a Presidio analyzer backed by spaCy's en_core_web_lg pipeline
# (installed from the lakehouse wheel). This `analyzer` is the one used by the Spark UDFs.
configuration = {
    "nlp_engine_name": "spacy",
    "models": [
        {"lang_code": "en", "model_name": "en_core_web_lg"},
    ],
}

provider = NlpEngineProvider(nlp_configuration=configuration)
# Only an English pipeline is configured, so the engine is named after its model
# size rather than a language it does not support.
large_nlp_engine = provider.create_engine()

analyzer = AnalyzerEngine(
    nlp_engine=large_nlp_engine, supported_languages=["en"]
)

# Quick sanity check: only PHONE_NUMBER entities are requested from the analyzer.
text_to_anonymize = "His name is Mr. Jones and his phone number is 212-555-5555"
analyzer_results = analyzer.analyze(
    text=text_to_anonymize, entities=["PHONE_NUMBER"], language="en"
)

print(analyzer_results)

## **Load the data and broadcast**  

In [None]:
# Broadcast the Presidio engines so that executors receive a read-only copy of
# each, then load the sample CSV (first line treated as the header).
broadcasted_analyzer = spark.sparkContext.broadcast(analyzer)
anonymizer = AnonymizerEngine()
broadcasted_anonymizer = spark.sparkContext.broadcast(anonymizer)

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(csv_path)
)
display(df)

## **Detect PII Summary**  

In [None]:
def detect_pii_in_row(*cols):
    """
    Run the broadcast Presidio analyzer on each column value of a row separately,
    so every detected entity can be attributed to the column it came from.

    Parameters
    ----------
    *cols
        The row's column values, in the same order as
        ``detect_pii_in_row.col_names``. ``None`` values are skipped; every other
        value is converted with ``str()`` before analysis.

    Returns
    -------
    str
        ``"No PII"`` when nothing is detected in any column; otherwise the
        ``str()`` of a dict ``{col_name: ["ENTITY_TYPE: substring", ...]}`` that
        contains only the columns in which entities were found.
    """
    analyzer = broadcasted_analyzer.value
    entities_found = {}

    for col_name, val in zip(detect_pii_in_row.col_names, cols):
        if val is None:
            continue
        column_text = str(val)
        results = analyzer.analyze(text=column_text, language="en")

        if results:
            # e.g. ["PERSON: John Doe", "PHONE_NUMBER: 212-555-1111", ...]
            entities_found[col_name] = [
                f"{res.entity_type}: {column_text[res.start:res.end]}"
                for res in results
            ]

    # No PII was detected in any column.
    if not entities_found:
        return "No PII"
    return str(entities_found)


# Column names are attached to the function itself; this presumably relies on
# PySpark (cloudpickle) serializing function attributes along with the function
# so the names are available on the executors. They must match the order of the
# columns passed to the UDF below.
detect_pii_in_row.col_names = df.columns
detect_pii_udf = udf(detect_pii_in_row, StringType())

df_with_pii_summary = df.withColumn(
    "pii_summary",
    detect_pii_udf(*[col(c) for c in df.columns]),
)

display(df_with_pii_summary)


## **Anonymize user_query**  

In [None]:
from typing import Optional

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def anonymize_text(text: Optional[str], replacement: str = "") -> Optional[str]:
    """
    Detect PII in ``text`` with the broadcast (large-model) analyzer and replace
    every detected entity with ``replacement``.

    Parameters
    ----------
    text : str or None
        Text to anonymize. ``None`` is passed through and returned as ``None``.
    replacement : str, optional
        Value substituted for every detected entity. Defaults to ``""``, which
        removes the detected PII from the text entirely.

    Returns
    -------
    str or None
        The anonymized text, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None

    analyzer = broadcasted_analyzer.value
    anonymizer = broadcasted_anonymizer.value
    analyze_results = analyzer.analyze(text=text, language="en")
    anonymized_result = anonymizer.anonymize(
        text=text,
        analyzer_results=analyze_results,
        # "DEFAULT" is the operator applied to entity types without their own entry.
        operators={"DEFAULT": OperatorConfig("replace", {"new_value": replacement})},
    )
    return anonymized_result.text

# Register as a regular (row-at-a-time) PySpark UDF. It is called with a single
# column, so ``replacement`` keeps its default of "".
anon_udf = udf(anonymize_text, StringType())

df_final = df_with_pii_summary.withColumn("anon_user_query", anon_udf(col("user_query")))

display(df_final)

## **Scale Data (Duplicate Rows)**  

In [None]:
from pyspark.sql.functions import array_repeat

# Duplicate every input row `num_duplicates` times. array_repeat builds a single
# array expression instead of one lit() literal per copy, which keeps the query
# plan small as num_duplicates grows. The array's values are irrelevant (the
# exploded column is dropped below); only its length matters.
df_expanded = df.withColumn(
    "duplication_array",
    array_repeat(lit(0), num_duplicates),
)

df_test = df_expanded.withColumn("duplicate_id", explode(col("duplication_array")))

# Unique (not necessarily consecutive) id for every duplicated row.
df_test = df_test.withColumn("id", monotonically_increasing_id())

# Append the id so each duplicated query text is distinct.
df_test = df_test.withColumn(
    "user_query",
    concat(col("user_query"), lit(" - ID: "), col("id")),
)

df_test = df_test.drop("duplication_array", "duplicate_id")
# Repartition to demonstrate parallel processing; remove or tune this for larger scales.
df_test = df_test.repartition(partitions_number)
df_test = df_test.withColumn("anon_user_query", anon_udf(col("user_query")))
print(f"total row number {df_test.count()}")  # number of duplicates x number of input rows
display(df_test.limit(partitions_number))

In [None]:
# Optionally persist the scaled, anonymized DataFrame as a Delta table,
# overwriting any existing data under that table name.
if is_write_to_delta:
    delta_writer = df_test.write.format("delta").mode("overwrite")
    delta_writer.saveAsTable(table_namne)