<a href="https://colab.research.google.com/github/midhunjmes/presidio_final/blob/main/Welcome_To_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install spacy
!python -m spacy download en_core_web_sm
!pip install openpyxl
!pip install presidio_analyzer
!python -m spacy download en_core_web_lg

Collecting en-core-web-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m75.7 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
Collecting presidio_analyzer
  Downloading presidio_analyzer-2.2.358-py3-none-any.whl.metadata (3.2 kB)
Collecting phonenumbers<9.0.0,>=8.12 (from presidio_analyzer)
  Downloading phonenumbers-8.13.55-py2.py3-none-any.whl.metadata (11 kB)
Collecting tldextract (from presidio_analyzer)
  Downloading tldextract-5.1.3-py3-none-any.whl.m

In [2]:
# from presidio_analyzer import AnalyzerEngine

# analyzer = AnalyzerEngine()
# for recognizer in analyzer.get_recognizers():
#     print(recognizer.supported_entities)

In [4]:


import pandas as pd
import spacy
import re
import openpyxl
import json
from presidio_analyzer import AnalyzerEngine
analyzer = AnalyzerEngine()
mapping={}
nlp = spacy.load("en_core_web_sm")


#-------------------------------------------------------------------------------------------------------------------------------
#function to find out the noun values dominating columns considering it will be a sensitive data if there is so many nouns
#------------------------------------------------------------------------------------------------------------------------------
def detect_noun(file_path):
    if file_path.endswith(".csv"):
        df = pd.read_csv(file_path, engine="python")
    elif file_path.endswith((".xls", ".xlsx")):
        df = pd.read_excel(file_path)
    else:
        raise ValueError("Unsupported file format. Please provide a CSV or Excel file.")

    sensitive = []

    for col in df.columns:
        if df[col].dtype in ['int64', 'float64']:
            continue  # Skip purely numeric columns

        text_samples = df[col].astype(str).head(5)  # Take first 5 values
        noun_count = 0
        total_count = 0

        for value in text_samples:
            if "%" in value or value.replace(".", "").isdigit():
                continue  # Skip percentage or number-like values

            doc = nlp(value)
            for token in doc:
                if token.pos_ in ['NOUN', 'PROPN']:
                    noun_count += 1
                total_count += 1

        # Mark column as sensitive only if a significant portion are nouns
        if total_count > 0 and (noun_count / total_count) > 0.2:
            sensitive.append(col)

    return sensitive


#------------------------------------------------------------------------------------------
#finding out the descriptive data columns which may have sensitive data in the form of text
#------------------------------------------------------------------------------------------
def descriptive_columns(file_path):
    # Define keywords to filter out
    keywords = ["description", "remarks", "notes", "comments", "observations", "details", "summary", "explanation",
    "reviews", "feedback", "testimonials", "opinions", "assessment", "suggestions", "experience",
    "incident_report", "case_notes", "audit_notes", "findings", "status_update", "history", "progress_report",
    "additional_info", "clarifications", "justification", "annotations", "excerpts", "statement", "explanation_text"]

    # Ensure columns are properly loaded from CSV/Excel
    if file_path.endswith(".csv"):
        df = pd.read_csv(file_path, nrows=1)  # Read only header
    elif file_path.endswith((".xls", ".xlsx")):
        df = pd.read_excel(file_path, nrows=1, engine="openpyxl")  # Read only header
    else:
        raise ValueError("Unsupported file format. Please provide a CSV or Excel file.")

    # Get actual column names
    all_columns = df.columns.tolist()


    des=[col for col in all_columns if any(re.search(keyword, col, re.IGNORECASE) for keyword in keywords)]
    return des

#-----------------------------------------------------------------
#anonymizing descriptive values
#-----------------------------------------------------------------
def detect_noun_desc(text):
    doc = nlp(text)
    modified_text=[]
    for token in doc:
        if token.pos_ in ['PROPN']:
            modified_text.append("<sensitive>")
        else:
            modified_text.append(token.text)
    text=" ".join(modified_text)
    return text


#------------------------------------------------------------------
#function for detecting numerical sensitive
#------------------------------------------------------------------
def detect_sensitive_numerical(file_path,sensitive):
    if file_path.endswith(".csv"):
        df = pd.read_csv(file_path, engine="python")
    elif file_path.endswith((".xls", ".xlsx")):
        df = pd.read_excel(file_path)
    else:
        raise ValueError("Unsupported file format. Please provide a CSV or Excel file.")

    sensitive_columns = []

    for col in df.columns:
        if col not in sensitive:
          text_samples = df[col].astype(str).head(5)  # Convert first 5 values to string

          for value in text_samples:
              if pd.notna(value) and value.strip():
                  results = analyzer.analyze(text=value, language="en")

                  for result in results:
                      # print(result,result.entity_type)
                      if result.entity_type in ["PHONE_NUMBER", "CREDIT_CARD", "IBAN", "US_SSN","EMAIL"]:
                          sensitive_columns.append(col)
                          break  # If any value in the column is sensitive, mark the whole column

    return list(set(sensitive_columns))
#-------------------------------------------------------------------
#function to anonymize a excel file
#-------------------------------------------------------------------
def excel_an(input_file, output_file):
    df = pd.read_excel(input_file)  # Read as string for safety

    sensitive_old = detect_noun(input_file)
    desc = descriptive_columns(input_file)
    sensitive = list(set(sensitive_old) - set(desc))
    num_sensitive=detect_sensitive_numerical(input_file,sensitive_old)
    sensitive=list(set(sensitive)+set(num_sensitive))

    column_counters = {col: 1 for col in df.columns if col in sensitive_old}
    column_mappings = {col: {} for col in sensitive}  # Store mappings for each column
    mapping = {}

    # Anonymize sensitive columns while maintaining consistency
    for col in sensitive:
        new_values = []
        for val in df[col].astype(str):
            if pd.notna(val):
                if val in column_mappings[col]:
                    anonymized_value = column_mappings[col][val]  # Use existing mapping
                else:
                    anonymized_value = f"{col}{column_counters[col]}"
                    column_mappings[col][val] = anonymized_value  # Store new mapping
                    column_counters[col] += 1  # Increment counter

                mapping[anonymized_value] = val
                new_values.append(anonymized_value)
            else:
                new_values.append(val)

        df[col] = new_values

    # Anonymize descriptive columns
    for col in desc:
        for idx, val in enumerate(df[col].astype(str)):
            if pd.notna(val):
                an_values = detect_noun_desc(val)
                df.at[idx, col] = an_values
                mapping[an_values] = val

    df.to_excel(output_file, index=False, sheet_name="Anonymized Data")
    print(f"✅ Anonymized file saved as {output_file}")

    # Save mapping as JSON
    with open("mappings.json", "w") as f:
        json.dump(mapping, f)

#------------------------------------------------------
#function for de-anonymizing excel data
#------------------------------------------------------

def excel_dean(input_file, output_file, mapping_file):
    print("🔄 Loading data...")

    with open(mapping_file, "r") as f:
        mapping = json.load(f)

    df = pd.read_excel(input_file)  # Read as string for safety
    mapping_keys = set(mapping.keys())
    df = df.applymap(lambda x: mapping[x] if x in mapping_keys else x)
    df.to_excel(output_file, index=False, sheet_name="De-anonymized Data")

    print(f"✅ De-anonymized file saved as {output_file}")

#----------------------------------------------------------
#function for anonymizing csv data
#----------------------------------------------------------
def csv_an(input_file, output_file):
    df = pd.read_csv(input_file, engine="python")

    sensitive_old = detect_noun(input_file)
    desc = descriptive_columns(input_file)
    sensitive = list(set(sensitive_old) - set(desc))
    num_sensitive=detect_sensitive_numerical(input_file,sensitive_old)
    sensitive=sensitive+num_sensitive
    column_counters = {col: 1 for col in df.columns if col in sensitive}
    column_mappings = {col: {} for col in sensitive}  # Store mappings for each column

    # Anonymize sensitive columns while maintaining consistency
    for col in sensitive:
        new_values = []
        for val in df[col].astype(str):
            if pd.notna(val):
                if val in column_mappings[col]:
                    anonymized_value = column_mappings[col][val]  # Use existing mapping
                else:
                    anonymized_value = f"{col}{column_counters[col]}"
                    column_mappings[col][val] = anonymized_value  # Store new mapping
                    column_counters[col] += 1  # Increment counter

                mapping[anonymized_value] = val
                new_values.append(anonymized_value)
            else:
                new_values.append(val)

        df[col] = new_values

    # Anonymize descriptive columns
    for col in desc:
        for idx, val in enumerate(df[col].astype(str)):
            if pd.notna(val):
                an_values = detect_noun_desc(val)
                df.at[idx, col] = an_values
                mapping[an_values] = val

    df.to_csv(output_file, index=False)
    print(f"✅ Anonymized file saved as {output_file}")

    # Save mapping as JSON
    with open("mappings.json", "w") as f:
        json.dump(mapping, f)

#------------------------------------------------------------
#function for de anonymizing csv data
#------------------------------------------------------------
def csv_dean(input_file, output_file, mapping_file):
    print("🔄 Loading data...")

    with open(mapping_file, "r") as f:
        mapping = json.load(f)
    df = pd.read_csv(input_file, engine="python", dtype=str)  # Read as string for safety
    mapping_keys = set(mapping.keys())
    df = df.applymap(lambda x: mapping[x] if x in mapping_keys else x)
    df.to_csv(output_file, index=False)

    print(f"✅ De-anonymized file saved as {output_file}")


#--------------------------------------------------------------
#function to determine whcih type of data is need to perform
#--------------------------------------------------------------
def anonymization(input_file):
    if input_file.endswith(".csv"):
        csv_an(input_file,"intermediate.csv")
        csv_dean("intermediate.csv","deanonymized.csv","mappings.json")
    elif input_file.endswith(".xlsx"):
        excel_an(input_file,"intermediate.xlsx")
        excel_dean("intermediate.xlsx","deanonymized.xlsx","mappings.json")





In [5]:
file="numerical.csv"
anonymization(file)

✅ Anonymized file saved as intermediate.csv
🔄 Loading data...
✅ De-anonymized file saved as deanonymized.csv


  df = df.applymap(lambda x: mapping[x] if x in mapping_keys else x)
