<a href="https://colab.research.google.com/github/midhun-james/validation_module/blob/main/validation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install presidio_analyzer
!pip install faker
!python -m spacy download en_core_web_lg

Collecting presidio_analyzer
  Downloading presidio_analyzer-2.2.358-py3-none-any.whl.metadata (3.2 kB)
Collecting phonenumbers<9.0.0,>=8.12 (from presidio_analyzer)
  Downloading phonenumbers-8.13.55-py2.py3-none-any.whl.metadata (11 kB)
Collecting tldextract (from presidio_analyzer)
  Downloading tldextract-5.1.3-py3-none-any.whl.metadata (11 kB)
Collecting requests-file>=1.4 (from tldextract->presidio_analyzer)
  Downloading requests_file-2.1.0-py2.py3-none-any.whl.metadata (1.7 kB)
Downloading presidio_analyzer-2.2.358-py3-none-any.whl (114 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m114.9/114.9 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading phonenumbers-8.13.55-py2.py3-none-any.whl (2.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.6/2.6 MB[0m [31m36.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading tldextract-5.1.3-py3-none-any.whl (104 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.9/104.9 kB[

In [9]:
# Testing - classify orgainization
import re
import os
import gzip
import time
import json
import spacy
import string
import random
import pandas as pd
from faker import Faker
from collections import defaultdict
from presidio_analyzer import AnalyzerEngine

nlp = spacy.load("en_core_web_lg")
analyzer = AnalyzerEngine()
fake=Faker()
with open("test.json", "r") as f:
    fake_data_list = json.load(f)
fake_data={}
for data in fake_data_list:
    for key,value in data.items():
        fake_data[key]=set(value)

entity_mapping={
    'names':'PERSON',
    'emails':'EMAIL_ADDRESS',
    'phone':'PHONE_NUMBER',
    'location':'LOCATION',
    'credit':'CREDIT_CARD',
    'url':'URL',
    # 'country':'COUNTRY',
    # 'company':"ORG",
    'id':'ID',
}

mapping_file="mapping.json"
forward_mapping=defaultdict(dict)
reverse_mapping=defaultdict(dict)

if os.path.exists(mapping_file):
    with open(mapping_file, "r") as f:
        mapping_data = json.load(f)
        forward_mapping.update(mapping_data.get("forward_mapping", {}))
        reverse_mapping.update(mapping_data.get("reverse_mapping", {}))
def time_it(func):
    """Decorator to measure execution time of functions."""
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f'\n⏳ Execution time {func.__name__}: {end-start:.6f} seconds')
        return result
    return wrapper

@time_it
def analyze_column(df):
    entity_columns = {}  # Initialize as a dictionary

    # Step 1: Use Presidio to analyze all columns
    for col in df.columns:
        if 'id' in col.lower():
            entity_columns[col] = 'ID'
        elif 'country' in col.lower():
            entity_columns[col] = 'COUNTRY'
        else:
            unique_values = df[col].dropna().astype(str).unique()[:25]
            entity_counts = {}

            for value in unique_values:
                results = analyzer.analyze(text=value, language='en')
                for result in results:
                    entity_counts[result.entity_type] = entity_counts.get(result.entity_type, 0) + 1

            if entity_counts:
                predominant_entity = max(entity_counts, key=entity_counts.get)
                      if predominant_entity=="LOCATION":
                        org_count=0
                        for value in unique_values:
                          doc=nlp(value)
                          for ent in doc.ents:
                            if ent.label_=="ORG":
                              org_count+=1
                        if org_count>12:
                          predominant_entity="ORG"
                entity_columns[col] = predominant_entity

    # Step 2: Use SpaCy to analyze non-numeric and unclassified columns
    for col in df.select_dtypes(exclude=['number']).columns:
        if col not in entity_columns:
            unique_values = df[col].dropna().astype(str).unique()[:25]
            org_count = 0

            for value in unique_values:
                doc = nlp(value)
                for ent in doc.ents:
                    if ent.label_ == 'ORG':
                        org_count += 1

            # If more than half of the sample values are ORG, classify as ORG
            if org_count > 12:
                entity_columns[col] = 'ORG'

    return entity_columns

def modify_fake_value(category, base_fake_value, counter):
    if category == "names":
        return f"{base_fake_value} {string.ascii_uppercase[counter % 26]}."

    elif category == "emails":
        name, domain = base_fake_value.split("@")
        return f"{name}{counter}@{domain}"

    elif category == "location":
        return f"{base_fake_value}, District {counter % 100_000_000 + 1}"

    elif category == "url":
        return base_fake_value.replace("://", f"://sub{counter}.", 1)

    elif category == "phone":
        return f"{base_fake_value[:-2]}{counter % 100:02d}"

    elif category == "company":
        return f"{base_fake_value} Group {counter % 50}"

    elif category == "credit":
        return f"{base_fake_value[:-4]}{counter % 10000:04d}"
    else:
        return f"{base_fake_value}-{counter}"



def get_fake_value(category, original_value):
    # Handle ID category separately for a unique format
    if original_value in forward_mapping[category]:
        return forward_mapping[category][original_value]
    if category == 'id':
        fake_value = fake.bothify(text='ID-############')
    elif fake_data.get(category):
        fake_value = fake_data[category].pop() if fake_data[category] else None
    else:
        # Reuse existing fake values or create a new unknown value
        used_fake=list(forward_mapping[category].values())
        if used_fake:
          base_fake_value=random.choice(used_fake)
          counter=len(used_fake)
          fake_value=modify_fake_value(category,base_fake_value,counter)

    # Save mappings
    forward_mapping[category][original_value] = fake_value
    reverse_mapping[category][fake_value] = original_value

    return fake_value

@time_it
def mask_dataframe(df):
    for col, entity in entity_columns.items():
        matching_keys = [key for key, value in entity_mapping.items() if value == entity]
        if matching_keys:
            df[col] = df[col].astype(str).apply(lambda x: get_fake_value(matching_keys[0], str(x)) if x else str(x))
    return df
def restore_original_value(category, fake_value):
    return reverse_mapping[category].get(fake_value, fake_value)

@time_it
def unmask_dataframe(df):
    for col, entity in entity_columns.items():
        matching_keys = [key for key, value in entity_mapping.items() if value == entity]

        if matching_keys:
            category = matching_keys[0]
            df[col] = df[col].astype(str).apply(lambda x: restore_original_value(category, str(x)) if x else str(x))

    return df
def compare_files(original_file, restored_file):
    """Check if the original and restored files are identical."""
    file_ext = os.path.splitext(original_file)[-1].lower()
    original_df = pd.read_excel(original_file) if file_ext == ".xlsx" else pd.read_csv(original_file)
    restored_df = pd.read_excel(restored_file) if file_ext == ".xlsx" else pd.read_csv(restored_file)

    is_identical = original_df.equals(restored_df)
    print(f"📊 Are files identical? {'✅ Yes' if is_identical else '❌ No'}")
    if not is_identical:
        print("⚠️ The restored file does not match the original. There may be an issue with the mapping.")

    return is_identical
def de_anonymize_paragraph(text):
  for category,mapping in reverse_mapping.items():
    for fake_value,original_value in mapping.items():
      if fake_value in text:
        text=text.replace(fake_value,original_value)
  return text
def save_mapping():
    mapping_data={
        "forward_mapping":forward_mapping,
        "reverse_mapping":reverse_mapping
    }
    with open(mapping_file, "w") as f:
        json.dump(mapping_data, f, indent=4)

if __name__=="__main__":
    input_file="test.csv"
    file_ext=os.path.splitext(input_file)[-1].lower()

    df = pd.read_excel(input_file, dtype=str) if file_ext == ".xlsx" else pd.read_csv(input_file, dtype=str, low_memory=False)
    entity_columns=analyze_column(df)
    print(entity_columns)

    anonymized_df=mask_dataframe(df)
    output_file="anonymized.xlsx" if file_ext==".xlsx" else "anonymized.csv"
    anonymized_df.to_excel(output_file,index=False) if file_ext==".xlsx" else anonymized_df.to_csv(output_file,index=False)
    print(f"✅ Anonymized data saved as {output_file}")

    save_mapping()
    restored_df=unmask_dataframe(pd.read_excel(output_file) if file_ext==".xlsx" else pd.read_csv(output_file))
    restored_file="restored.xlsx" if file_ext==".xlsx" else "restored.csv"
    restored_df.to_excel(restored_file,index=False) if file_ext==".xlsx" else restored_df.to_csv(restored_file,index=False)
    print(f"✅ Restored data saved as {restored_file}")
    compare_files(input_file, restored_file)



In [10]:
if __name__=="__main__":
    input_file="test.csv"
    file_ext=os.path.splitext(input_file)[-1].lower()

    df = pd.read_excel(input_file, dtype=str) if file_ext == ".xlsx" else pd.read_csv(input_file, dtype=str, low_memory=False)
    entity_columns=analyze_column(df)
    print(entity_columns)

    anonymized_df=mask_dataframe(df)
    output_file="anonymized.xlsx" if file_ext==".xlsx" else "anonymized.csv"
    anonymized_df.to_excel(output_file,index=False) if file_ext==".xlsx" else anonymized_df.to_csv(output_file,index=False)
    print(f"✅ Anonymized data saved as {output_file}")

    save_mapping()
    restored_df=unmask_dataframe(pd.read_excel(output_file) if file_ext==".xlsx" else pd.read_csv(output_file))
    restored_file="restored.xlsx" if file_ext==".xlsx" else "restored.csv"
    restored_df.to_excel(restored_file,index=False) if file_ext==".xlsx" else restored_df.to_csv(restored_file,index=False)
    print(f"✅ Restored data saved as {restored_file}")
    compare_files(input_file, restored_file)

22
2
0

⏳ Execution time analyze_column: 4.885260 seconds
{'id': 'ID', 'country': 'COUNTRY', 'name': 'ORG', 'domain': 'URL', 'size range': 'DATE_TIME', 'locality': 'LOCATION', 'linkedin url': 'URL', 'current employee estimate': 'US_DRIVER_LICENSE', 'total employee estimate': 'US_DRIVER_LICENSE'}

⏳ Execution time mask_dataframe: 0.003151 seconds
✅ Anonymized data saved as anonymized.csv

⏳ Execution time unmask_dataframe: 0.001795 seconds
✅ Restored data saved as restored.csv
📊 Are files identical? ❌ No
⚠️ The restored file does not match the original. There may be an issue with the mapping.
