<a href="https://colab.research.google.com/github/midhunjmes/pres_faker/blob/main/thejus.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install spacy
!python -m spacy download en_core_web_sm
!pip install openpyxl
!pip install presidio_analyzer
!python -m spacy download en_core_web_lg

Collecting en-core-web-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m84.6 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
Collecting presidio_analyzer
  Downloading presidio_analyzer-2.2.358-py3-none-any.whl.metadata (3.2 kB)
Collecting phonenumbers<9.0.0,>=8.12 (from presidio_analyzer)
  Downloading phonenumbers-8.13.55-py2.py3-none-any.whl.metadata (11 kB)
Collecting tldextract (from presidio_analyzer)
  Downloading tldextract-5.1.3-py3-none-any.whl.m

In [2]:
# from presidio_analyzer import AnalyzerEngine

# analyzer = AnalyzerEngine()
# for recognizer in analyzer.get_recognizers():
#     print(recognizer.supported_entities)

In [3]:


import pandas as pd
import spacy
import re
import openpyxl
import json
from presidio_analyzer import AnalyzerEngine
analyzer = AnalyzerEngine()
mapping={}
nlp = spacy.load("en_core_web_sm")


#-------------------------------------------------------------------------------------------------------------------------------
#function to find out the noun values dominating columns considering it will be a sensitive data if there is so many nouns
#------------------------------------------------------------------------------------------------------------------------------
def detect_noun(file_path):
    if file_path.endswith(".csv"):
        df = pd.read_csv(file_path, engine="python")
    elif file_path.endswith((".xls", ".xlsx")):
        df = pd.read_excel(file_path)
    else:
        raise ValueError("Unsupported file format. Please provide a CSV or Excel file.")

    sensitive = []

    for col in df.columns:
        if df[col].dtype in ['int64', 'float64']:
            continue  # Skip purely numeric columns

        text_samples = df[col].astype(str).head(5)  # Take first 5 values
        noun_count = 0
        total_count = 0

        for value in text_samples:
            if "%" in value or value.replace(".", "").isdigit():
                continue  # Skip percentage or number-like values

            doc = nlp(value)
            for token in doc:
                if token.pos_ in ['NOUN', 'PROPN']:
                    noun_count += 1
                total_count += 1

        # Mark column as sensitive only if a significant portion are nouns
        if total_count > 0 and (noun_count / total_count) > 0.2:
            sensitive.append(col)

    return sensitive


#------------------------------------------------------------------------------------------
#finding out the descriptive data columns which may have sensitive data in the form of text
#------------------------------------------------------------------------------------------
def descriptive_columns(file_path):
    # Define keywords to filter out
    keywords = ["description", "remarks", "notes", "comments", "observations", "details", "summary", "explanation",
    "reviews", "feedback", "testimonials", "opinions", "assessment", "suggestions", "experience",
    "incident_report", "case_notes", "audit_notes", "findings", "status_update", "history", "progress_report",
    "additional_info", "clarifications", "justification", "annotations", "excerpts", "statement", "explanation_text"]

    # Ensure columns are properly loaded from CSV/Excel
    if file_path.endswith(".csv"):
        df = pd.read_csv(file_path, nrows=1)  # Read only header
    elif file_path.endswith((".xls", ".xlsx")):
        df = pd.read_excel(file_path, nrows=1, engine="openpyxl")  # Read only header
    else:
        raise ValueError("Unsupported file format. Please provide a CSV or Excel file.")

    # Get actual column names
    all_columns = df.columns.tolist()


    des=[col for col in all_columns if any(re.search(keyword, col, re.IGNORECASE) for keyword in keywords)]
    return des

#-----------------------------------------------------------------
#anonymizing descriptive values
#-----------------------------------------------------------------
def detect_noun_desc(text):
    doc = nlp(text)
    modified_text=[]
    for token in doc:
        if token.pos_ in ['PROPN']:
            modified_text.append("<sensitive>")
        else:
            modified_text.append(token.text)
    text=" ".join(modified_text)
    return text


#------------------------------------------------------------------
#function for detecting numerical sensitive
#------------------------------------------------------------------
def detect_sensitive_numerical(file_path,sensitive):
    if file_path.endswith(".csv"):
        df = pd.read_csv(file_path, engine="python")
    elif file_path.endswith((".xls", ".xlsx")):
        df = pd.read_excel(file_path)
    else:
        raise ValueError("Unsupported file format. Please provide a CSV or Excel file.")

    sensitive_columns = []

    for col in df.columns:
        if col not in sensitive:
          text_samples = df[col].astype(str).head(5)  # Convert first 5 values to string

          for value in text_samples:
              if pd.notna(value) and value.strip():
                  results = analyzer.analyze(text=value, language="en")

                  for result in results:
                      # print(result,result.entity_type)
                      if result.entity_type in ["PHONE_NUMBER", "CREDIT_CARD", "IBAN", "US_SSN","EMAIL"]:
                          sensitive_columns.append(col)
                          break  # If any value in the column is sensitive, mark the whole column

    return list(set(sensitive_columns))
#-------------------------------------------------------------------
#function to anonymize a excel file
#-------------------------------------------------------------------
def excel_an(input_file, output_file):
    df = pd.read_excel(input_file)  # Read as string for safety

    sensitive_old = detect_noun(input_file)
    desc = descriptive_columns(input_file)
    sensitive = list(set(sensitive_old) - set(desc))
    num_sensitive=detect_sensitive_numerical(input_file,sensitive_old)
    sensitive=list(set(sensitive)+set(num_sensitive))

    column_counters = {col: 1 for col in df.columns if col in sensitive_old}
    column_mappings = {col: {} for col in sensitive}  # Store mappings for each column
    mapping = {}

    # Anonymize sensitive columns while maintaining consistency
    for col in sensitive:
        new_values = []
        for val in df[col].astype(str):
            if pd.notna(val):
                if val in column_mappings[col]:
                    anonymized_value = column_mappings[col][val]  # Use existing mapping
                else:
                    anonymized_value = f"{col}{column_counters[col]}"
                    column_mappings[col][val] = anonymized_value  # Store new mapping
                    column_counters[col] += 1  # Increment counter

                mapping[anonymized_value] = val
                new_values.append(anonymized_value)
            else:
                new_values.append(val)

        df[col] = new_values

    # Anonymize descriptive columns
    for col in desc:
        for idx, val in enumerate(df[col].astype(str)):
            if pd.notna(val):
                an_values = detect_noun_desc(val)
                df.at[idx, col] = an_values
                mapping[an_values] = val

    df.to_excel(output_file, index=False, sheet_name="Anonymized Data")
    print(f"✅ Anonymized file saved as {output_file}")

    # Save mapping as JSON
    with open("mappings.json", "w") as f:
        json.dump(mapping, f)

#------------------------------------------------------
#function for de-anonymizing excel data
#------------------------------------------------------

def excel_dean(input_file, output_file, mapping_file):
    print("🔄 Loading data...")

    with open(mapping_file, "r") as f:
        mapping = json.load(f)

    df = pd.read_excel(input_file)  # Read as string for safety
    mapping_keys = set(mapping.keys())
    df = df.applymap(lambda x: mapping[x] if x in mapping_keys else x)
    df.to_excel(output_file, index=False, sheet_name="De-anonymized Data")

    print(f"✅ De-anonymized file saved as {output_file}")

#----------------------------------------------------------
#function for anonymizing csv data
#----------------------------------------------------------
def csv_an(input_file, output_file):
    df = pd.read_csv(input_file, engine="python")

    sensitive_old = detect_noun(input_file)
    desc = descriptive_columns(input_file)
    sensitive = list(set(sensitive_old) - set(desc))
    num_sensitive=detect_sensitive_numerical(input_file,sensitive_old)
    sensitive=sensitive+num_sensitive
    column_counters = {col: 1 for col in df.columns if col in sensitive}
    column_mappings = {col: {} for col in sensitive}  # Store mappings for each column

    # Anonymize sensitive columns while maintaining consistency
    for col in sensitive:
        new_values = []
        for val in df[col].astype(str):
            if pd.notna(val):
                if val in column_mappings[col]:
                    anonymized_value = column_mappings[col][val]  # Use existing mapping
                else:
                    anonymized_value = f"{col}{column_counters[col]}"
                    column_mappings[col][val] = anonymized_value  # Store new mapping
                    column_counters[col] += 1  # Increment counter

                mapping[anonymized_value] = val
                new_values.append(anonymized_value)
            else:
                new_values.append(val)

        df[col] = new_values

    # Anonymize descriptive columns
    for col in desc:
        for idx, val in enumerate(df[col].astype(str)):
            if pd.notna(val):
                an_values = detect_noun_desc(val)
                df.at[idx, col] = an_values
                mapping[an_values] = val

    df.to_csv(output_file, index=False)
    print(f"✅ Anonymized file saved as {output_file}")

    # Save mapping as JSON
    with open("mappings.json", "w") as f:
        json.dump(mapping, f)

#------------------------------------------------------------
#function for de anonymizing csv data
#------------------------------------------------------------
def csv_dean(input_file, output_file, mapping_file):
    print("🔄 Loading data...")

    with open(mapping_file, "r") as f:
        mapping = json.load(f)
    df = pd.read_csv(input_file, engine="python", dtype=str)  # Read as string for safety
    mapping_keys = set(mapping.keys())
    df = df.applymap(lambda x: mapping[x] if x in mapping_keys else x)
    df.to_csv(output_file, index=False)

    print(f"✅ De-anonymized file saved as {output_file}")


#--------------------------------------------------------------
#function to determine whcih type of data is need to perform
#--------------------------------------------------------------
def anonymization(input_file):
    if input_file.endswith(".csv"):
        csv_an(input_file,"intermediate.csv")
        csv_dean("intermediate.csv","deanonymized.csv","mappings.json")
    elif input_file.endswith(".xlsx"):
        excel_an(input_file,"intermediate.xlsx")
        excel_dean("intermediate.xlsx","deanonymized.xlsx","mappings.json")





In [4]:
# file="numerical.csv"
# anonymization(file)

In [6]:
!pip install faker

Collecting faker
  Downloading faker-37.0.2-py3-none-any.whl.metadata (15 kB)
Downloading faker-37.0.2-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m21.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-37.0.2


In [9]:
import json

import logging

import os

from datetime import datetime

from typing import List



import numpy as np

import pandas as pd

import spacy

from faker import Faker



logging.basicConfig(level=logging.CRITICAL, format='%(asctime)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)



class MaskingModule:

    def __init__(self, mapping_file: str = 'output_xlsx/data_mapping.json', seed: int = 42):

        self.fake = Faker()

        Faker.seed(seed)

        self.mapping_file = mapping_file

        self.forward_mapping = {}

        self.reverse_mapping = {}

        self._load_mapping_if_exists()

        logger.info("✅ ExcelDataMasker initialized.")



    def _generate_fake_value(self, original: str, column: str) -> str:

        if isinstance(original, (int, float, np.integer, np.floating)):

            return original



        original_str = str(original)

        if original_str in self.forward_mapping:

            return self.forward_mapping[original_str]



        column_lower = column.lower()

        unique_fake_values = set(self.forward_mapping.values())



        while True:

            if 'employee' in column_lower or 'company' in column_lower or 'account' in column_lower:

                fake_value = f"{self.fake.first_name()}{self.fake.random_number(digits=12)}{self.fake.first_name()}"

            elif 'project' in column_lower:

                fake_value = f"PRJ-{self.fake.random_number(digits=12)}"

            elif 'entity' in column_lower:

                fake_value = f"ENT-{self.fake.random_number(digits=12)}"

            elif 'program' in column_lower:

                fake_value = f"PROG-{self.fake.random_number(digits=12)}"

            else:

                fake_value = f"ANON-{self.fake.random_number(digits=12)}"



            if fake_value not in unique_fake_values:

                break



        self.forward_mapping[original_str] = fake_value

        self.reverse_mapping[fake_value] = original_str

        logger.info(f"🔄 Mapping created: {original_str} → {fake_value}")

        return fake_value



    def _save_mapping(self):

        mapping_data = {

            'forward_mapping': self.forward_mapping,

            'reverse_mapping': self.reverse_mapping,

            'metadata': {

                'updated_at': datetime.now().isoformat(),

                'record_count': len(self.forward_mapping)

            }

        }



        with open(self.mapping_file, 'w') as f:

            json.dump(mapping_data, f, indent=4)

        logger.info("💾 Mapping saved successfully.")



    def _load_mapping_if_exists(self):

        if os.path.exists(self.mapping_file):

            with open(self.mapping_file, 'r') as f:

                data = json.load(f)

                self.forward_mapping = data['forward_mapping']

                self.reverse_mapping = data['reverse_mapping']

            logger.info("📂 Existing mapping loaded.")



    def process_excel(self, input_file: str, output_file: str, mode: str = 'anonymize', include_columns: List[str] = None):

        if mode not in ['anonymize', 'deanonymize']:

            raise ValueError("Mode must be either 'anonymize' or 'deanonymize'")



        df = pd.read_excel(input_file)

        include_set = {'Account', 'Project ID', 'Program Name', 'Employee', 'Cost Category', 'Entity', 'Description'}



        if include_columns:

            include_set.update(include_columns)



        for column in df.columns:

            if column in include_set:

                if mode == 'anonymize':

                    df[column] = df[column].apply(lambda x: self._generate_fake_value(x, column) if pd.notna(x) else x)

                else:

                    df[column] = df[column].apply(lambda x: self.reverse_mapping.get(str(x), x) if pd.notna(x) and isinstance(x, str) else x)



        df.to_excel(output_file, index=False)



        if mode == 'anonymize':

            self._save_mapping()



        logger.info(f"✅ Data {mode}d and saved to {output_file}")



    def check_if_two_xlsx_is_same(self, input_file: str, restored_file: str):

        original_df = pd.read_excel(input_file)

        restored_df = pd.read_excel(restored_file)

        logger.info("🔎 Verifying file integrity...")

        test = original_df.equals(restored_df)

        logger.info(f"📊 Are files identical? {test}")



        if not test:

            logger.warning("⚠️ Files are not identical!")

            logger.info("🔍 Sample from original:")

            logger.info(original_df.head(2))

            logger.info("🔍 Sample from restored:")

            logger.info(restored_df.head(2))

        else:

            logger.info("✅ Files match perfectly!")



    def print_mapping_sample(self, n: int = 5):

        logger.info("📌 Forward Mapping (Original → Anonymized):")

        for i, (k, v) in enumerate(list(self.forward_mapping.items())[:n]):

            logger.info(f"{k} → {v}")



        logger.info("📌 Reverse Mapping (Anonymized → Original):")

        for i, (k, v) in enumerate(list(self.reverse_mapping.items())[:n]):

            logger.info(f"{k} → {v}")



    def mask_text(self,text, replace_dict):

        logger.info("🔄 Processing text masking...")

        nlp = spacy.load("en_core_web_sm")

        doc = nlp(text)



        for key, value in replace_dict.items():

            text = text.replace(key, value)



        logger.info("✅ Masking completed!")

        return text











if __name__ == "__main__":

    masker = MaskingModule()

    input_file = '/content/split_file_part1.csv'

    output_folder = "output_xlsx"



    masker.process_excel(input_file=input_file, output_file=f'{output_folder}/anonymized.xlsx', mode='anonymize')

    masker.print_mapping_sample()



    logger.info("\nStep 2: Deanonymize data...")

    masker.process_excel(input_file=f'{output_folder}/anonymized.xlsx', output_file=f'{output_folder}/restored.xlsx', mode='deanonymize')



    masker.check_if_two_xlsx_is_same(input_file, f'{output_folder}/restored.xlsx')







    mapping_file = 'output_xlsx/data_mapping.json'

    text = """ Analysis complete for the query: ### 🔍 Overview:

    The analysis reveals the average Seat Costs for 13 unique accounts in the cluster, ranging from $13.32 to $380.51 per seat, with an overall average of $156.39 across all accounts.



    ### 📊 Key Insights:

    - 📈 **Highest Seat Cost**: Account_002 has the highest average seat cost at $380.51.

    - 📉 **Lowest Seat Cost**: Account_013 has the lowest average seat cost at $13.32.

    - 🏆 **Top 3 Accounts by Seat Cost**:

    1. Account_002 ($380.51)

    2. Account_007 ($342.96)

    3. Account_001 ($290.59)

    - 📊 **Data Quality**: No NaN values were found in the 'Actuals' column, indicating complete data for this metric.



    ### 📑 Detailed Explanation:

    The expense report data was analyzed to calculate the average Seat Costs for all accounts in the cluster. Here's a breakdown of the findings:



    1. **Distribution of Seat Costs**:

    - The average Seat Costs vary significantly across accounts, with a range of $367.19 between the highest and lowest.

    - 5 out of 13 accounts (38.5%) have above-average Seat Costs.

    - 8 out of 13 accounts (61.5%) have below-average Seat Costs.



    2. **Cost Tiers**:

    - High Cost Tier (>$300): Account_002, Account_007

    - Medium Cost Tier ($150-$300): Account_001, Account_003, Account_004

    - Low Cost Tier (<$150): The remaining 8 accounts



    3. **Statistical Analysis**:

    - Median Seat Cost: $114.22 (Account_011)

    - The overall average ($156.39) is higher than the median, suggesting some high-cost outliers are pulling the average up.



    4. **Data Reliability**:

    - All 13 accounts have complete data for Seat Costs, with no NaN values reported.

    - This suggests high data quality and reliability for this particular metric.



    📌 **Next Steps**:

    - Investigate the factors contributing to the high Seat Costs in Account_002 and Account_007.

    - Analyze the cost-efficiency measures implemented by lower-cost accounts (e.g., Account_013) for potential best practices.

    - Consider segmenting accounts into cost tiers for targeted cost management strategies.



    This analysis provides a clear overview of Seat Costs across all accounts, highlighting significant variations that merit further investigation for cost optimization opportunities.

    """

    if os.path.exists(mapping_file):

        logger.info("📂 Loading mapping file...")



        with open(mapping_file, 'r') as f:

            data = json.load(f)

            forward_mapping = data['forward_mapping']

            reverse_mapping = data['reverse_mapping']



            logger.info("🔄 Applying forward mapping...")

            masked_text = masker.mask_text(text, forward_mapping)

            logger.info(f"🔏 Masked Text: {masked_text}")



            logger.info("======================================================================")



            logger.info("🔄 Applying reverse mapping...")

            de_encrypt = masker.mask_text(masked_text, reverse_mapping)

            logger.info(f"🔓 Decrypted Text: {de_encrypt}")

            print(f"😼 Match Status: {de_encrypt == text} 😼")

            logger.info(f"😼 Match Status: {de_encrypt == text} 😼")

    else:

        logger.error("❌ Mapping file not found!")



ValueError: Excel file format cannot be determined, you must specify an engine manually.

In [16]:
import json
import logging
import os
from datetime import datetime
from typing import List
import numpy as np
import pandas as pd
import spacy
from faker import Faker

logging.basicConfig(level=logging.CRITICAL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

class MaskingModule:
    def __init__(self, mapping_file: str = "output_xlsx/data_mapping.json", seed: int = 42):
        self.fake = Faker()
        Faker.seed(seed)
        self.mapping_file = mapping_file
        self.forward_mapping = {}
        self.reverse_mapping = {}
        self._load_mapping_if_exists()
        logger.info("✅ MaskingModule initialized.")

    def _generate_fake_value(self, original: str, column: str) -> str:
        """Generates fake values for sensitive columns while ensuring uniqueness."""
        if isinstance(original, (int, float, np.integer, np.floating)):
            original_str = str(original)
        else:
            original_str = str(original).strip()

        # Return already mapped values
        if original_str in self.forward_mapping:
            return self.forward_mapping[original_str]

        column_lower = column.lower()
        unique_fake_values = set(self.forward_mapping.values())

        while True:
            if any(keyword in column_lower for keyword in ["employee", "company", "account"]):
                fake_value = f"{self.fake.first_name()}{self.fake.random_number(digits=6)}"
            elif "project" in column_lower:
                fake_value = f"PRJ-{self.fake.random_number(digits=6)}"
            elif "entity" in column_lower:
                fake_value = f"ENT-{self.fake.random_number(digits=6)}"
            elif "program" in column_lower:
                fake_value = f"PROG-{self.fake.random_number(digits=6)}"
            elif original_str.isdigit():  # Handle pure numerical values
                fake_value = self.fake.random_number(digits=len(original_str))
            else:
                fake_value = f"ANON-{self.fake.random_number(digits=6)}"

            if fake_value not in unique_fake_values:
                break

        # Save to mapping
        self.forward_mapping[original_str] = str(fake_value)
        self.reverse_mapping[str(fake_value)] = original_str
        logger.info(f"🔄 Mapping created: {original_str} → {fake_value}")
        return str(fake_value)

    def _save_mapping(self):
        os.makedirs(os.path.dirname(self.mapping_file), exist_ok=True)
        mapping_data = {
            "forward_mapping": self.forward_mapping,
            "reverse_mapping": self.reverse_mapping,
            "metadata": {"updated_at": datetime.now().isoformat(), "record_count": len(self.forward_mapping)},
        }
        with open(self.mapping_file, "w") as f:
            json.dump(mapping_data, f, indent=4)
        logger.info("💾 Mapping saved successfully.")

    def _load_mapping_if_exists(self):
        if os.path.exists(self.mapping_file):
            with open(self.mapping_file, "r") as f:
                data = json.load(f)
                self.forward_mapping = data.get("forward_mapping", {})
                self.reverse_mapping = data.get("reverse_mapping", {})
            logger.info("📂 Existing mapping loaded.")

    def process_file(self, input_file: str, output_file: str, mode: str = "anonymize", include_columns: List[str] = None):
        """Handles CSV and Excel anonymization and deanonymization."""
        if mode not in ["anonymize", "deanonymize"]:
            raise ValueError("Mode must be either 'anonymize' or 'deanonymize'")

        file_type = "csv" if input_file.endswith(".csv") else "excel"
        df = pd.read_csv(input_file) if file_type == "csv" else pd.read_excel(input_file)

        include_set = {"Account", "Project ID", "Program Name", "Employee", "Cost Category", "Entity", "Description"}
        if include_columns:
            include_set.update(include_columns)

        for column in df.columns:
            if column in include_set:
                if mode == "anonymize":
                    df[column] = df[column].apply(lambda x: self._generate_fake_value(x, column) if pd.notna(x) else x)
                else:
                    df[column] = df[column].apply(lambda x: self.reverse_mapping.get(str(x), x) if pd.notna(x) and isinstance(x, str) else x)
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if file_type == "csv":
            df.to_csv(output_file, index=False)
        else:
            df.to_excel(output_file, index=False)

        if mode == "anonymize":
            self._save_mapping()

        logger.info(f"✅ Data {mode}d and saved to {output_file}")

    def check_if_two_files_are_same(self, input_file: str, restored_file: str):
        """Verifies if original and restored files are identical."""
        file_type = "csv" if input_file.endswith(".csv") else "excel"

        original_df = pd.read_csv(input_file) if file_type == "csv" else pd.read_excel(input_file)
        restored_df = pd.read_csv(restored_file) if file_type == "csv" else pd.read_excel(restored_file)

        logger.info("🔎 Verifying file integrity...")
        test = original_df.astype(str).equals(restored_df.astype(str))  # Normalize dtypes before comparing
        logger.info(f"📊 Are files identical? {test}")

        if not test:
            logger.warning("⚠️ Files are not identical!")
            logger.info("🔍 Sample from original:")
            logger.info(original_df.head(2))
            logger.info("🔍 Sample from restored:")
            logger.info(restored_df.head(2))
        else:
            logger.info("✅ Files match perfectly!")

    def mask_text(self, text, replace_dict):
        """Applies text masking based on the forward mapping."""
        logger.info("🔄 Processing text masking...")
        for key, value in replace_dict.items():
            text = text.replace(key, value)
        logger.info("✅ Masking completed!")
        return text


if __name__ == "__main__":
    masker = MaskingModule()
    input_file = "data.csv"  # Change to your input file (CSV or Excel)
    output_folder = "output_xlsx"

    # Step 1: Anonymize
    masker.process_file(input_file=input_file, output_file=f"{output_folder}/anonymized.csv", mode="anonymize")

    # Step 2: Deanonymize
    masker.process_file(input_file=f"{output_folder}/anonymized.csv", output_file=f"{output_folder}/restored.csv", mode="deanonymize")

    # Verify file integrity
    masker.check_if_two_files_are_same(input_file, f"{output_folder}/restored.csv")

    # Mask text using saved mappings
    if os.path.exists(masker.mapping_file):
        with open(masker.mapping_file, "r") as f:
            data = json.load(f)
            forward_mapping = data["forward_mapping"]
            reverse_mapping = data["reverse_mapping"]

            sample_text = "John works at Google Inc. in New York."
            masked_text = masker.mask_text(sample_text, forward_mapping)
            decrypted_text = masker.mask_text(masked_text, reverse_mapping)

            print(f"😼 Match Status: {decrypted_text == sample_text} 😼")


😼 Match Status: True 😼


In [21]:
import json
import logging
import os
from datetime import datetime
from typing import List
import numpy as np
import pandas as pd
from faker import Faker

logging.basicConfig(level=logging.CRITICAL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

fake = Faker()
Faker.seed(42)

MAPPING_FILE = "output_xlsx/data_mapping.json"
forward_mapping = {}
reverse_mapping = {}

def load_mapping():
    """Loads existing mappings from file if available."""
    global forward_mapping, reverse_mapping
    if os.path.exists(MAPPING_FILE):
        with open(MAPPING_FILE, "r") as f:
            data = json.load(f)
            forward_mapping = data.get("forward_mapping", {})
            reverse_mapping = data.get("reverse_mapping", {})
        logger.info("📂 Existing mapping loaded.")

def save_mapping():
    """Saves the mapping to a JSON file."""
    os.makedirs(os.path.dirname(MAPPING_FILE), exist_ok=True)
    mapping_data = {
        "forward_mapping": forward_mapping,
        "reverse_mapping": reverse_mapping,
        "metadata": {"updated_at": datetime.now().isoformat(), "record_count": len(forward_mapping)},
    }
    with open(MAPPING_FILE, "w") as f:
        json.dump(mapping_data, f, indent=4)
    logger.info("💾 Mapping saved successfully.")

def generate_fake_value(original: str, column: str) -> str:
    """Generates fake values while ensuring uniqueness."""
    if isinstance(original, (int, float, np.integer, np.floating)):
        original_str = str(original)
    else:
        original_str = str(original).strip()

    if original_str in forward_mapping:
        return forward_mapping[original_str]

    column_lower = column.lower()
    unique_fake_values = set(forward_mapping.values())

    while True:
        if any(keyword in column_lower for keyword in ["employee", "company", "account"]):
            fake_value = f"{fake.first_name()}{fake.random_number(digits=6)}"
        elif "project" in column_lower:
            fake_value = f"PRJ-{fake.random_number(digits=6)}"
        elif "entity" in column_lower:
            fake_value = f"ENT-{fake.random_number(digits=6)}"
        elif "program" in column_lower:
            fake_value = f"PROG-{fake.random_number(digits=6)}"
        elif original_str.isdigit():
            fake_value = fake.random_number(digits=len(original_str))
        else:
            fake_value = f"ANON-{fake.random_number(digits=6)}"

        if fake_value not in unique_fake_values:
            break

    forward_mapping[original_str] = str(fake_value)
    reverse_mapping[str(fake_value)] = original_str
    return str(fake_value)

def process_file(input_file: str, output_file: str, mode: str = "anonymize", include_columns: List[str] = None):
    """Handles CSV and Excel anonymization and deanonymization."""
    if mode not in ["anonymize", "deanonymize"]:
        raise ValueError("Mode must be either 'anonymize' or 'deanonymize'")

    file_type = "csv" if input_file.endswith(".csv") else "excel"
    df = pd.read_csv(input_file) if file_type == "csv" else pd.read_excel(input_file)

    include_set = {"Account", "Project ID", "Program Name", "Employee", "Cost Category", "Entity", "Description"}
    if include_columns:
        include_set.update(include_columns)

    for column in df.columns:
        if column in include_set:
            if mode == "anonymize":
                df[column] = df[column].apply(lambda x: generate_fake_value(x, column) if pd.notna(x) else x)
            else:
                df[column] = df[column].apply(lambda x: reverse_mapping.get(str(x), x) if pd.notna(x) and isinstance(x, str) else x)

    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    if file_type == "csv":
        df.to_csv(output_file, index=False)
    else:
        df.to_excel(output_file, index=False)

    if mode == "anonymize":
        save_mapping()

def check_if_two_files_are_same(input_file: str, restored_file: str):
    """Verifies if original and restored files are identical."""
    file_type = "csv" if input_file.endswith(".csv") else "excel"
    original_df = pd.read_csv(input_file) if file_type == "csv" else pd.read_excel(input_file)
    restored_df = pd.read_csv(restored_file) if file_type == "csv" else pd.read_excel(restored_file)

    test = original_df.astype(str).equals(restored_df.astype(str))
    logger.info(f"📊 Are files identical? {test}")
    return test

def mask_text(text, replace_dict):
    """Applies text masking using the forward mapping."""
    for key, value in replace_dict.items():
        text = text.replace(key, value)
    return text

if __name__ == "__main__":
    load_mapping()
    input_file = "data.csv"
    output_folder = "output_xlsx"
    process_file(input_file, f"{output_folder}/anonymized.csv", mode="anonymize")
    process_file(f"{output_folder}/anonymized.csv", f"{output_folder}/restored.csv", mode="deanonymize")
    check_if_two_files_are_same(input_file, f"{output_folder}/restored.csv")

    if os.path.exists(MAPPING_FILE):
        with open(MAPPING_FILE, "r") as f:
            data = json.load(f)
            sample_text = "John works at Google Inc. in New York."
            masked_text = mask_text(sample_text, data["forward_mapping"])
            decrypted_text = mask_text(masked_text, data["reverse_mapping"])
            print(f"😼 Match Status: {decrypted_text == sample_text} 😼")


😼 Match Status: True 😼


In [22]:
import json
import logging
import os
from datetime import datetime
import pandas as pd
from faker import Faker
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Initialize Faker and Presidio
fake = Faker()
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

# Dictionary for mapping anonymized values
forward_mapping = {}
reverse_mapping = {}


def generate_fake_value(entity_type: str) -> str:
    """Generates fake data based on detected entity type."""
    if entity_type == "PERSON":
        return fake.name()
    elif entity_type == "GPE":  # Geographic locations
        return fake.city()
    elif entity_type == "ORG":  # Organizations
        return fake.company()
    elif entity_type == "EMAIL":
        return fake.email()
    elif entity_type == "PHONE_NUMBER":
        return fake.phone_number()
    elif entity_type == "CREDIT_CARD":
        return fake.credit_card_number()
    elif entity_type == "IBAN":
        return fake.iban()
    elif entity_type == "DATE_TIME":
        return fake.date()
    else:
        return fake.word()  # Default replacement


def anonymize_text(text: str) -> str:
    """Anonymizes sensitive entities in a given text."""
    if not isinstance(text, str) or text.strip() == "":
        return text

    results = analyzer.analyze(text=text, entities=None, language="en")
    anonymized_text = text

    for result in results:
        entity_value = text[result.start:result.end]
        if entity_value in forward_mapping:
            fake_value = forward_mapping[entity_value]
        else:
            fake_value = generate_fake_value(result.entity_type)
            forward_mapping[entity_value] = fake_value
            reverse_mapping[fake_value] = entity_value

        anonymized_text = anonymized_text.replace(entity_value, fake_value)

    return anonymized_text


def process_file(input_file: str, output_file: str, mode: str = "anonymize"):
    """Handles anonymization and deanonymization for CSV and Excel files."""
    file_type = "csv" if input_file.endswith(".csv") else "excel"
    df = pd.read_csv(input_file) if file_type == "csv" else pd.read_excel(input_file)

    for column in df.columns:
        if mode == "anonymize":
            df[column] = df[column].apply(lambda x: anonymize_text(str(x)) if pd.notna(x) else x)
        else:  # Deanonymization
            df[column] = df[column].apply(lambda x: reverse_mapping.get(str(x), x) if pd.notna(x) and isinstance(x, str) else x)

    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    if file_type == "csv":
        df.to_csv(output_file, index=False)
    else:
        df.to_excel(output_file, index=False)

    logger.info(f"✅ Data {mode}d and saved to {output_file}")

    if mode == "anonymize":
        save_mapping("output_xlsx/data_mapping.json")


def save_mapping(mapping_file: str):
    """Saves the anonymization mapping."""
    mapping_data = {
        "forward_mapping": forward_mapping,
        "reverse_mapping": reverse_mapping,
        "metadata": {"updated_at": datetime.now().isoformat(), "record_count": len(forward_mapping)},
    }
    with open(mapping_file, "w") as f:
        json.dump(mapping_data, f, indent=4)
    logger.info("💾 Mapping saved successfully.")


def load_mapping(mapping_file: str):
    """Loads an existing anonymization mapping."""
    global forward_mapping, reverse_mapping
    if os.path.exists(mapping_file):
        with open(mapping_file, "r") as f:
            data = json.load(f)
            forward_mapping = data.get("forward_mapping", {})
            reverse_mapping = data.get("reverse_mapping", {})
        logger.info("📂 Existing mapping loaded.")


if __name__ == "__main__":
    input_file = "data.csv"  # Change to your input file (CSV or Excel)
    output_folder = "output_xlsx"
    mapping_file = "output_xlsx/data_mapping.json"

    load_mapping(mapping_file)

    # Step 1: Anonymize
    process_file(input_file=input_file, output_file=f"{output_folder}/anonymized.csv", mode="anonymize")

    # Step 2: Deanonymize
    process_file(input_file=f"{output_folder}/anonymized.csv", output_file=f"{output_folder}/restored.csv", mode="deanonymize")




In [17]:
pip install presidio_analyzer presidio_anonymizer

Collecting presidio_anonymizer
  Downloading presidio_anonymizer-2.2.358-py3-none-any.whl.metadata (8.1 kB)
Downloading presidio_anonymizer-2.2.358-py3-none-any.whl (31 kB)
Installing collected packages: presidio_anonymizer
Successfully installed presidio_anonymizer-2.2.358


In [24]:
import pandas as pd
import spacy
import re
import openpyxl
import json
from faker import Faker
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

# Initialize components
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
fake = Faker()
nlp = spacy.load("en_core_web_sm")
mapping = {}

#-------------------------------------------------------------------------------------------------------------------------------
# Function to detect and anonymize sensitive data using Presidio
#-------------------------------------------------------------------------------------------------------------------------------
def anonymize_text(text):
    results = analyzer.analyze(text=text, language="en")
    modified_text = text  # Start with the original text

    for result in sorted(results, key=lambda r: r.start, reverse=True):  # Process in reverse to avoid index shifting
        fake_value = ""
        if result.entity_type == "PERSON":
            fake_value = fake.name()
        elif result.entity_type == "GPE":
            fake_value = fake.city()
        elif result.entity_type == "ORG":
            fake_value = fake.company()
        elif result.entity_type in ["PHONE_NUMBER", "EMAIL", "CREDIT_CARD", "IBAN", "US_SSN"]:
            fake_value = fake.word()

        mapping[fake_value] = modified_text[result.start:result.end]
        modified_text = modified_text[:result.start] + fake_value + modified_text[result.end:]  # Replace entity with fake value

    return modified_text

#-------------------------------------------------------------------------------------------------------------------------------
# Function to find descriptive text columns
#-------------------------------------------------------------------------------------------------------------------------------
def descriptive_columns(file_path):
    keywords = ["description", "remarks", "notes", "comments", "observations", "details", "summary", "explanation", "reviews", "feedback"]

    if file_path.endswith(".csv"):
        df = pd.read_csv(file_path, nrows=1)
    elif file_path.endswith((".xls", ".xlsx")):
        df = pd.read_excel(file_path, nrows=1, engine="openpyxl")
    else:
        raise ValueError("Unsupported file format. Please provide a CSV or Excel file.")

    return [col for col in df.columns if any(re.search(keyword, col, re.IGNORECASE) for keyword in keywords)]

#-------------------------------------------------------------------------------------------------------------------------------
# Function for anonymizing CSV data
#-------------------------------------------------------------------------------------------------------------------------------
def csv_an(input_file, output_file):
    df = pd.read_csv(input_file, engine="python")
    desc_cols = descriptive_columns(input_file)

    for col in df.columns:
        df[col] = df[col].astype(str).apply(lambda x: anonymize_text(x) if pd.notna(x) else x)

    df.to_csv(output_file, index=False)
    with open("mappings.json", "w") as f:
        json.dump(mapping, f)
    print(f"✅ Anonymized file saved as {output_file}")

#-------------------------------------------------------------------------------------------------------------------------------
# Function for anonymizing Excel data
#-------------------------------------------------------------------------------------------------------------------------------
def excel_an(input_file, output_file):
    df = pd.read_excel(input_file)
    desc_cols = descriptive_columns(input_file)

    for col in df.columns:
        df[col] = df[col].astype(str).apply(lambda x: anonymize_text(x) if pd.notna(x) else x)

    df.to_excel(output_file, index=False, sheet_name="Anonymized Data")
    with open("mappings.json", "w") as f:
        json.dump(mapping, f)
    print(f"✅ Anonymized file saved as {output_file}")

#-------------------------------------------------------------------------------------------------------------------------------
# Function for de-anonymizing CSV data
#-------------------------------------------------------------------------------------------------------------------------------
def csv_dean(input_file, output_file, mapping_file):
    print("🔄 Loading data...")

    with open(mapping_file, "r") as f:
        mapping = json.load(f)
    df = pd.read_csv(input_file, engine="python", dtype=str)

    mapping_keys = set(mapping.keys())
    df = df.applymap(lambda x: mapping[x] if x in mapping_keys else x)
    df.to_csv(output_file, index=False)
    print(f"✅ De-anonymized file saved as {output_file}")

#-------------------------------------------------------------------------------------------------------------------------------
# Function for de-anonymizing Excel data
#-------------------------------------------------------------------------------------------------------------------------------
def excel_dean(input_file, output_file, mapping_file):
    print("🔄 Loading data...")

    with open(mapping_file, "r") as f:
        mapping = json.load(f)
    df = pd.read_excel(input_file)

    mapping_keys = set(mapping.keys())
    df = df.applymap(lambda x: mapping[x] if x in mapping_keys else x)
    df.to_excel(output_file, index=False, sheet_name="De-anonymized Data")
    print(f"✅ De-anonymized file saved as {output_file}")

#-------------------------------------------------------------------------------------------------------------------------------
# Function to determine file type and apply anonymization
#-------------------------------------------------------------------------------------------------------------------------------
def anonymization(input_file):
    if input_file.endswith(".csv"):
        csv_an(input_file, "intermediate.csv")
        csv_dean("intermediate.csv", "deanonymized.csv", "mappings.json")
    elif input_file.endswith(".xlsx"):
        excel_an(input_file, "intermediate.xlsx")
        excel_dean("intermediate.xlsx", "deanonymized.xlsx", "mappings.json")

# Run anonymization
file = "numerical.csv"
anonymization(file)




✅ Anonymized file saved as intermediate.csv
🔄 Loading data...
✅ De-anonymized file saved as deanonymized.csv


  df = df.applymap(lambda x: mapping[x] if x in mapping_keys else x)
