<a href="https://colab.research.google.com/github/kuldeep27396/apache_pinot_kafka/blob/main/pii_maskingdata_datasecurity.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
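# Dependencies - uncomment on a fresh runtime (e.g. Colab):
# %pip install pyspark
# %pip install presidio-analyzer presidio-anonymizer
# %pip install faker
# Note: `typing` ships with the Python 3 standard library, so it needs no install.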


In [2]:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, current_timestamp
from pyspark.sql.types import StringType
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
import pandas as pd
from typing import Any

# Initialize logging: emit INFO and above, each record prefixed with a
# timestamp and its level name. basicConfig configures the root logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Named logger used by the pipeline functions defined in later cells.
logger = logging.getLogger("PII Anonymization Pipeline")


In [3]:
# @title
def initialize_spark() -> SparkSession:
    """
    Create (or reuse) the SparkSession used by the anonymization pipeline.

    The session is named "PII Anonymization Pipeline" and requests Arrow-based
    data exchange between Spark and pandas, which the pandas UDF and
    ``createDataFrame`` on a pandas frame rely on for efficient transfer.

    Returns:
        SparkSession: The session returned by ``getOrCreate()``.
    """
    spark = (
        SparkSession.builder
        .appName("PII Anonymization Pipeline")
        # Spark 3.0 renamed `spark.sql.execution.arrow.enabled` to the key
        # below; the old name is deprecated and triggers a warning.
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .getOrCreate()
    )
    logger.info("Spark session initialized.")
    return spark

# Notebook-level session handle; later cells use it to broadcast the Presidio
# engines and to build the Spark DataFrame.
spark = initialize_spark()

In [4]:
def initialize_presidio():
    """
    Initialize the Presidio engines, register custom recognizers and build the
    per-entity anonymization operators.

    Two pattern recognizers are added to the analyzer's registry:

    * ``POLICY_NUMBER``  - eight digits, or three capital letters followed by
      six digits.
    * ``EFFECTIVE_DATE`` - dates written as ``MM/DD/YYYY`` with a 19xx/20xx year.

    Returns:
        tuple: ``(analyzer, anonymizer, custom_anonymization_operators)`` where
        the last item maps an entity type to the ``OperatorConfig`` that
        replaces every detected span with a fixed ``**LABEL**`` placeholder.
    """
    # Initialize Presidio engines
    analyzer = AnalyzerEngine()
    anonymizer = AnonymizerEngine()

    # Define custom patterns (all registered with score 1.0).
    # NOTE(review): the two policy regexes carry no boundary anchors, so they
    # also match inside longer digit runs (card or phone numbers) - confirm
    # that this is intended before tightening them.
    policy_number_pattern = Pattern(name="policy_number_pattern", regex=r"\d{8}", score=1.0)
    property_policy_number_pattern = Pattern(name="property_policy_number_pattern", regex=r"[A-Z]{3}\d{6}", score=1.0)
    time_pattern = Pattern(name="effective_timestamp_pattern", regex=r"(0[1-9]|1[0-2])/[0-3][0-9]/(?:19|20)\d{2}", score=1.0)

    # Create custom recognizers
    policy_number_recognizer = PatternRecognizer(
        supported_entity="POLICY_NUMBER",
        patterns=[policy_number_pattern, property_policy_number_pattern]
    )
    time_recognizer = PatternRecognizer(
        supported_entity="EFFECTIVE_DATE",
        patterns=[time_pattern]
    )

    # Register custom recognizers
    analyzer.registry.add_recognizer(policy_number_recognizer)
    analyzer.registry.add_recognizer(time_recognizer)

    # Entity type -> placeholder written in place of the detected span.
    placeholders = {
        "PERSON": "**NAME**",
        "LOCATION": "**LOCATION**",
        "EMAIL_ADDRESS": "**EMAIL**",
        "PHONE_NUMBER": "**PHONE_NUMBER**",
        "POLICY_NUMBER": "**POLICY_NUMBER**",
        "EFFECTIVE_DATE": "**EFFECTIVE_DATE**",
        "DATE_TIME": "**DATE_TIME**",
    }

    # The built-in "replace" operator substitutes a constant, so no lambda is
    # needed; the configs then hold plain data only, which keeps them easy to
    # serialize when they are shipped to Spark workers.
    custom_anonymization_operators = {
        entity: OperatorConfig("replace", {"new_value": placeholder})
        for entity, placeholder in placeholders.items()
    }

    logger.info("Presidio engines initialized.")
    return analyzer, anonymizer, custom_anonymization_operators

# Initialize the Presidio engines once at notebook level. The three names bound
# here are reused by later cells: the engines are broadcast, and
# `custom_operators` is read by anonymize_text.
analyzer, anonymizer, custom_operators = initialize_presidio()






[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_lg')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.




In [5]:
def broadcast_presidio_engines(spark, analyzer, anonymizer):
    """
    Wrap both Presidio engines in Spark broadcast variables.

    Args:
        spark: Object exposing ``sparkContext.broadcast``.
        analyzer: Analyzer engine to broadcast.
        anonymizer: Anonymizer engine to broadcast.

    Returns:
        tuple: ``(analyzer_broadcast, anonymizer_broadcast)``, in that order.
    """
    context = spark.sparkContext
    analyzer_handle = context.broadcast(analyzer)
    anonymizer_handle = context.broadcast(anonymizer)
    return analyzer_handle, anonymizer_handle

# Broadcast handles for both engines; anonymize_text reads them through `.value`.
analyzer_broadcast, anonymizer_broadcast = broadcast_presidio_engines(spark, analyzer, anonymizer)


In [7]:
# Global variables for broadcasted objects
def set_broadcast_vars(b_analyzer, b_anonymizer, operators):
    """
    Publish the broadcast handles and operator config as module-level globals.

    Args:
        b_analyzer: Stored under the global name ``analyzer_broadcast``.
        b_anonymizer: Stored under the global name ``anonymizer_broadcast``.
        operators: Stored under the global name ``custom_operators``.
    """
    globals().update(
        analyzer_broadcast=b_analyzer,
        anonymizer_broadcast=b_anonymizer,
        custom_operators=operators,
    )

# Rebinds the three globals to the objects they already reference (bound by the
# earlier cells), so on a top-to-bottom run this call leaves state unchanged.
set_broadcast_vars(analyzer_broadcast, anonymizer_broadcast, custom_operators)

def anonymize_text(text):
    """
    Anonymize a single text value using the broadcast Presidio engines.

    Args:
        text: Value to scan. ``None``, non-string values (e.g. NaN) and
            empty / whitespace-only strings are returned unchanged.

    Returns:
        The anonymized string when the analyzer reports findings, the original
        ``text`` when it reports none, or the literal ``"ERROR"`` when analysis
        or anonymization raises.
    """
    # Nulls and blanks carry nothing to mask; the isinstance check also keeps
    # non-string nulls from blowing up on `.strip()`.
    if not isinstance(text, str) or not text.strip():
        return text

    analyzer = analyzer_broadcast.value
    anonymizer = anonymizer_broadcast.value

    try:
        findings = analyzer.analyze(text=text, language="en")
        if not findings:
            return text
        anonymized_result = anonymizer.anonymize(
            text=text,
            analyzer_results=findings,
            operators=custom_operators
        )
        return anonymized_result.text
    except Exception:
        # Never write the raw value to the log: at this point it is still
        # un-anonymized and may contain exactly the PII being masked.
        logger.error(
            "Error processing text (length=%d); raw value withheld from logs",
            len(text),
            exc_info=True,
        )
        return "ERROR"

@pandas_udf(StringType())
def analyze_and_anonymize(text_series: pd.Series) -> pd.Series:
    """
    Pandas UDF that anonymizes every value of a column.

    Each element of the incoming Series is passed to ``anonymize_text``, which
    reads the broadcast Presidio engines and the operator configuration from
    module-level globals.

    Args:
        text_series: Batch of column values as a pandas Series.

    Returns:
        pd.Series: ``anonymize_text`` applied element-wise; the UDF declares
        its result to Spark as ``StringType``.
    """
    return text_series.apply(anonymize_text)



In [11]:
import pandas as pd
import numpy as np
from faker import Faker
from datetime import datetime, timedelta
import random

def generate_synthetic_insurance_data(num_records=1000, seed=12345):
    """
    Generate synthetic insurance data with PII and PCI information.

    Args:
        num_records (int): Number of rows to generate.
        seed (int | None): Seed applied to the stdlib ``random`` module,
            ``numpy.random`` and ``Faker``. The same seed yields the same
            dataset; pass ``None`` for a non-reproducible one.

    Returns:
        pandas.DataFrame: One row per customer with 21 columns covering
        personal details, policy data, claims and payment information.
        About 5% of the rows are altered to carry an anomaly (very high
        premium, claim close to the coverage amount, or an 'Irregular'
        payment frequency with a very low premium).

    Note:
        Seeding changes the global state of all three random generators.
    """
    # Values are drawn from three independent generators; every one of them
    # must be seeded, otherwise the `random`-based columns differ per run.
    random.seed(seed)
    np.random.seed(seed)
    Faker.seed(seed)
    fake = Faker()

    # Lists for categorical data
    insurance_types = ['Health', 'Auto', 'Home', 'Life', 'Disability']
    claim_status = ['Pending', 'Approved', 'Denied', 'Under Review']
    payment_methods = ['Credit Card', 'Debit Card', 'Bank Transfer', 'Check']

    # Generate base data
    data = {
        # PII Information
        'customer_id': [f'CUS{str(i).zfill(6)}' for i in range(num_records)],
        'first_name': [fake.first_name() for _ in range(num_records)],
        'last_name': [fake.last_name() for _ in range(num_records)],
        'email': [fake.email() for _ in range(num_records)],
        'phone': [fake.phone_number() for _ in range(num_records)],
        'ssn': [fake.ssn() for _ in range(num_records)],
        'dob': [fake.date_of_birth(minimum_age=18, maximum_age=90).strftime('%Y-%m-%d')
                for _ in range(num_records)],
        # Faker addresses are multi-line; flatten them to a single line.
        'address': [fake.address().replace('\n', ', ') for _ in range(num_records)],

        # Insurance Information
        'insurance_type': [random.choice(insurance_types) for _ in range(num_records)],
        'policy_number': [f'POL{str(i).zfill(8)}' for i in range(num_records)],
        'premium_amount': np.random.uniform(500, 5000, num_records).round(2),
        'coverage_amount': np.random.uniform(50000, 1000000, num_records).round(2),
        'policy_start_date': [fake.date_between(start_date='-5y', end_date='today').strftime('%Y-%m-%d')
                              for _ in range(num_records)],

        # Claims Information
        # NOTE(review): number, amount and status are drawn independently, so a
        # row can have a claim amount without a claim number - confirm intent.
        'claim_number': [f'CLM{str(i).zfill(8)}' if random.random() < 0.3 else ''
                         for i in range(num_records)],
        # 0.0 (not 0) keeps the column float even when no claim is drawn.
        'claim_amount': [round(random.uniform(100, 50000), 2) if random.random() < 0.3 else 0.0
                         for _ in range(num_records)],
        'claim_status': [random.choice(claim_status) if random.random() < 0.3 else ''
                         for _ in range(num_records)],

        # Payment Information (PCI)
        'payment_method': [random.choice(payment_methods) for _ in range(num_records)],
        'card_number': [fake.credit_card_number() if random.random() < 0.7 else ''
                        for _ in range(num_records)],
        'card_expiry': [fake.credit_card_expire() if random.random() < 0.7 else ''
                        for _ in range(num_records)],

        # Payment schedule
        'last_payment_date': [fake.date_between(start_date='-1y', end_date='today').strftime('%Y-%m-%d')
                              for _ in range(num_records)],
        'payment_frequency': [random.choice(['Monthly', 'Quarterly', 'Annually'])
                              for _ in range(num_records)]
    }

    df = pd.DataFrame(data)

    # Insert some anomalies (approximately 5% of records)
    num_anomalies = int(num_records * 0.05)
    anomaly_indices = np.random.choice(num_records, num_anomalies, replace=False)

    for idx in anomaly_indices:
        anomaly_type = random.random()

        # Amounts are rounded to cents so anomalous rows look like the rest.
        if anomaly_type < 0.3:
            # Unusually high premium amount
            df.loc[idx, 'premium_amount'] = round(random.uniform(10000, 20000), 2)
        elif anomaly_type < 0.6:
            # Unusually high claim amount compared to coverage
            df.loc[idx, 'claim_amount'] = round(
                df.loc[idx, 'coverage_amount'] * random.uniform(0.8, 0.95), 2
            )
        else:
            # Suspicious payment pattern
            df.loc[idx, 'payment_frequency'] = 'Irregular'
            df.loc[idx, 'premium_amount'] = round(random.uniform(100, 300), 2)

    return df

# Build the synthetic dataset (1000 records)
df = generate_synthetic_insurance_data(1000)

# Persist a CSV copy, leaving out the index column
df.to_csv('synthetic_pii_pci_data.csv', index=False)

# Preview the first few records
print("Sample of generated data:", df.head(), sep="\n")

# Headline figures for a quick sanity check of the generated data
print(
    "\nDataset Statistics:",
    f"Total number of records: {len(df)}",
    f"Number of customers with claims: {len(df[df['claim_number'] != ''])}",
    f"Average premium amount: ${df['premium_amount'].mean():.2f}",
    f"Average claim amount: ${df['claim_amount'].mean():.2f}",
    sep="\n",
)


Sample of generated data:
  customer_id first_name  last_name                    email  \
0   CUS000000     Robert      Brown      kthomas@example.net   
1   CUS000001  Alejandro      Banks     xenglish@example.net   
2   CUS000002      Mandy  Mcpherson    richard97@example.net   
3   CUS000003    Kenneth     Thomas  graywilliam@example.net   
4   CUS000004     Miguel     Hansen      yturner@example.net   

                   phone          ssn         dob  \
0      (508)895-7778x480  727-36-5455  1981-06-10   
1       321-656-5380x865  826-54-2565  1968-05-19   
2       001-949-228-3320  707-59-1278  1979-01-17   
3    +1-255-946-5759x363  720-75-5722  1962-03-12   
4  +1-942-653-6820x61579  686-75-4465  1991-04-14   

                                             address insurance_type  \
0  10581 Gina Shoals Suite 529, Kellymouth, VA 12422           Home   
1           120 Michael Fort, New Danielle, VA 28961           Auto   
2  178 Williams Greens Suite 218, West Jessicaber...     

In [13]:
# Convert the pandas frame into a Spark DataFrame so the pandas UDF can run on it.
dff = spark.createDataFrame(df)

In [14]:

def anonymize_column(df, column: str):
    """
    Return ``df`` with ``column`` replaced by its anonymized values.

    Args:
        df: DataFrame exposing ``withColumn``.
        column: Name of the column to pass through ``analyze_and_anonymize``.

    Returns:
        The DataFrame produced by ``withColumn``; the column keeps its name.
    """
    logger.info(f"Starting anonymization for column: {column}")
    masked_values = analyze_and_anonymize(col(column))
    return df.withColumn(column, masked_values)

# Specify the column to anonymize
anonymized_column = "email"  # Update this to your column name
# Only the chosen column is rewritten; all other columns pass through unchanged.
anonymized_df = anonymize_column(dff, anonymized_column)
# Print the first 10 rows without truncating long cell values.
anonymized_df.show(10, truncate=False)


+-----------+----------+---------+---------+----------------------+-----------+----------+---------------------------------------------------------+--------------+-------------+-----------------+---------------+-----------------+------------+------------+------------+--------------+----------------+-----------+-----------------+-----------------+
|customer_id|first_name|last_name|email    |phone                 |ssn        |dob       |address                                                  |insurance_type|policy_number|premium_amount   |coverage_amount|policy_start_date|claim_number|claim_amount|claim_status|payment_method|card_number     |card_expiry|last_payment_date|payment_frequency|
+-----------+----------+---------+---------+----------------------+-----------+----------+---------------------------------------------------------+--------------+-------------+-----------------+---------------+-----------------+------------+------------+------------+--------------+----------------+--