# JSON Data Generation for PII

This notebook demonstrates how to generate synthetic PII data in JSON format using Presidio's data generation capabilities.

## Define JSON Template

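Each pattern pairs a nested JSON template, whose upper-case string values name the PII entity type to generate for that field, with a weight that controls how often the pattern is selected.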

In [None]:
# Record layouts available to the generator. Upper-case string values are
# placeholders naming the entity type to generate; any other string is kept
# verbatim in the output.

# Layout of a patient record: demographics, contact details, IDs and medical data.
_PATIENT_TEMPLATE = {
    "patient": {
        "name": "PERSON",
        "age": "AGE",
        "gender": "GENDER",
        "contact": {
            "phone": "PHONE_NUMBER",
            "email": "EMAIL_ADDRESS",
        },
        "identification": {
            "ssn": "US_SSN",
            "passport": "US_PASSPORT",
            "driver_license": "US_DRIVER_LICENSE",
        },
        "medical": {
            "conditions": ["MEDICAL_CONDITION"],
            "medications": [
                {
                    "name": "DRUG",
                    "dosage": "DOSAGE",
                    "frequency": "DRUG_FREQUENCY",
                }
            ],
        },
    }
}

# Layout of a financial transaction and the person who initiated it.
_TRANSACTION_TEMPLATE = {
    "transaction": {
        "timestamp": "DATE_TIME",
        "amount": "RANDOM_AMOUNT",
        "from_account": "BANK_NUMBER",
        "to_account": "BANK_NUMBER",
        "reference": "reference number AB12C45V323",
        "initiated_by": {
            "name": "PERSON",
            "email": "EMAIL_ADDRESS",
            "phone": "PHONE_NUMBER",
        },
    }
}

# Pattern name -> template plus relative selection weight.
PATTERNS = {
    # 60% chance of this pattern
    "patient_record": {"template": _PATIENT_TEMPLATE, "weight": 0.6},
    # 40% chance of this pattern
    "financial_record": {"template": _TRANSACTION_TEMPLATE, "weight": 0.4},
}


## Initialize Encrypt / Decrypt Functions

In [None]:
# AES encrypt / decrypt helpers.
# The data-generation cell further down uses them to encrypt the generated PII values.

import os
import base64
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Hash import SHA256

def generate_key(key_string):
    """Derive a 32-byte key from a passphrase.

    The passphrase is encoded to bytes and hashed with SHA-256; the raw
    digest is returned and used as the AES key by ``encrypt``/``decrypt``.
    """
    passphrase_bytes = key_string.encode()
    return SHA256.new(passphrase_bytes).digest()

def encrypt(plaintext, key):
    """Encrypt a string with AES in CBC mode and return base64 text.

    The cipher is created without an explicit IV and its ``iv`` attribute is
    read back, then prepended to the ciphertext so ``decrypt`` can recover it.

    Args:
        plaintext: Text to encrypt; it is encoded to bytes and padded to the
            AES block size before encryption.
        key: AES key bytes, e.g. the output of ``generate_key``.

    Returns:
        Base64-encoded ``iv + ciphertext`` as a UTF-8 string.
    """
    aes = AES.new(key, AES.MODE_CBC)
    init_vector = aes.iv
    encrypted_body = aes.encrypt(pad(plaintext.encode(), AES.block_size))
    # The IV travels in front of the ciphertext inside the same base64 blob.
    return base64.b64encode(init_vector + encrypted_body).decode('utf-8')

def decrypt(ciphertext, key):
    """Reverse ``encrypt``: decode, split off the IV, decrypt and unpad.

    Args:
        ciphertext: Base64 text whose decoded bytes are ``iv + ciphertext``.
        key: The same AES key bytes that were used for encryption.

    Returns:
        The decrypted text, decoded as UTF-8.
    """
    raw = base64.b64decode(ciphertext)
    # The first AES block of the payload is the IV; the rest is the ciphertext.
    init_vector, encrypted_body = raw[:AES.block_size], raw[AES.block_size:]
    padded = AES.new(key, AES.MODE_CBC, init_vector).decrypt(encrypted_body)
    return unpad(padded, AES.block_size).decode('utf-8')

# Example usage

# The passphrase can be supplied through the PII_ENCRYPTION_KEY environment
# variable so a real secret never has to be written into the notebook.
# The literal fallback is a demo value only.
key_string = os.environ.get("PII_ENCRYPTION_KEY", "marveluniverse")
eKey = generate_key(key_string)  # module-level key reused by the generation cell below

# Example plaintext
plaintext = "This is my new test string for testing decryption"

# Encrypt
encrypted = encrypt(plaintext, eKey)
print("Encrypted:", encrypted)

# Decrypt
decrypted = decrypt(encrypted, eKey)
print("Decrypted:", decrypted)

# Sanity check: the helpers must round-trip before they are used on generated data.
assert decrypted == plaintext, "encrypt/decrypt round trip failed"


## Code to Generate JSON data based on the Patterns defined above

In [None]:
import json
from faker import Faker
from presidio_evaluator.data_generator.faker_extensions import *
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional, Union
import random

# Shared Faker instance (US English locale) extended with the custom
# providers that generate_entity relies on.
faker = Faker('en_US')
for _provider in (
    MedicalProvider,
    UsPassportProvider,
    UsDriverLicenseProvider,
    OrganizationProvider,
):
    faker.add_provider(_provider)
del _provider  # keep the loop variable out of the notebook namespace


def generate_entity(entity_type: str) -> Any:
    """Produce one random value for the given entity type.

    Args:
        entity_type: Upper-case entity name such as ``"PERSON"`` or ``"AGE"``.

    Returns:
        The generated value (a string for most types, an int for ``AGE``, a
        float for ``RANDOM_AMOUNT``), or an empty string when the entity type
        is not recognised.
    """
    # Each entry is a zero-argument callable so that only the generator for
    # the requested type is ever evaluated.
    makers = {
        "PERSON": lambda: faker.name(),
        "AGE": lambda: random.randint(1, 100),
        "GENDER": lambda: random.choice(["Male", "Female", "Other"]),
        "PHONE_NUMBER": lambda: faker.phone_number(),
        "EMAIL_ADDRESS": lambda: faker.email(),
        "US_SSN": lambda: faker.ssn(),
        "US_PASSPORT": lambda: faker.us_passport(),
        "US_DRIVER_LICENSE": lambda: faker.us_driver_license(),
        "BANK_NUMBER": lambda: faker.bank_number(),
        "IBAN_CODE": lambda: faker.iban(),
        "ORGANIZATION": lambda: faker.organization(),
        "CREDIT_CARD": lambda: faker.credit_card_number(),
        # Timestamps are the current wall-clock time, not a random date.
        "DATE_TIME": lambda: datetime.now().isoformat(),
        "URL": lambda: faker.url(),
        # Addresses come back multi-line; flatten them to a single line.
        "LOCATION": lambda: faker.address().replace('\n', ', '),
        "MEDICAL_CONDITION": lambda: faker.medical_condition(),
        "MEDICAL_PROCEDURE": lambda: faker.medical_procedure(),
        "DRUG": lambda: faker.drug(),
        "DOSAGE": lambda: faker.dosage(),
        "DRUG_FREQUENCY": lambda: faker.drug_frequency(),
        "RANDOM_AMOUNT": lambda: round(random.uniform(10, 10000), 2),
        "CURRENCY": lambda: random.choice(["USD", "EUR", "GBP", "INR"]),
        "TEXT_WITH_ENTITIES": lambda: (
            f"Payment for {faker.medical_procedure()} at {faker.organization()}"
        ),
    }
    maker = makers.get(entity_type)
    if maker is None:
        return ""
    return maker()

def generate_from_template(template: Union[dict, list, str], path: str = "") -> tuple:
    """Recursively generate data from a template and collect spans.

    Dicts and lists are walked recursively. An upper-case string is treated as
    an entity placeholder when ``generate_entity`` produces a value for it;
    every other value is copied to the output unchanged.

    Args:
        template: A dict, list or leaf value describing the record layout.
        path: Dotted/indexed path of ``template`` inside the whole record
            (for example ``"patient.medical.medications[0].name"``).

    Returns:
        A ``(data, spans)`` tuple. ``data`` mirrors the template with the
        placeholders replaced by generated values. ``spans`` holds one dict
        per generated entity with the entity type, the encrypted value,
        placeholder positions (0, filled in later) and the field path.
    """
    if isinstance(template, dict):
        result = {}
        spans = []
        for key, value in template.items():
            new_path = f"{path}.{key}" if path else key
            if isinstance(value, str) and value in globals().get('PATTERNS', {}):
                # Handle nested patterns
                pattern_data, pattern_spans = generate_from_pattern(value)
                # The nested pattern is generated from an empty path, so
                # re-anchor its spans under the field that embeds it.
                for nested_span in pattern_spans:
                    inner = nested_span.get("field_name", "")
                    nested_span["field_name"] = f"{new_path}.{inner}" if inner else new_path
                result[key] = pattern_data
                spans.extend(pattern_spans)
            else:
                data_item, item_spans = generate_from_template(value, new_path)
                result[key] = data_item
                spans.extend(item_spans)
        return result, spans
    elif isinstance(template, list):
        result = []
        spans = []
        for i, item in enumerate(template):
            data_item, item_spans = generate_from_template(item, f"{path}[{i}]")
            result.append(data_item)
            spans.extend(item_spans)
        return result, spans
    elif isinstance(template, str) and template.isupper():
        # Let generate_entity decide whether this is an entity type: it
        # returns "" for names it does not know. Checking Faker attributes
        # instead misses entity names such as PERSON or EMAIL_ADDRESS that do
        # not match a Faker method name.
        value = generate_entity(template)
        if isinstance(value, str) and value == "":
            return template, []
        return value, [{
            "entity_type": template,
            "entity_value": encrypt(str(value), eKey),
            "start_position": 0,  # Will be updated later
            "end_position": 0,    # Will be updated later
            "field_name": path
        }]
    else:
        # Literal value (free text, number, ...): copy through unchanged.
        return template, []

def generate_from_pattern(pattern_name: str) -> tuple:
    """Generate ``(data, spans)`` for the pattern registered under ``pattern_name``.

    An unknown pattern name yields an empty dict and an empty span list.
    """
    if pattern_name not in PATTERNS:
        return {}, []
    pattern_template = PATTERNS[pattern_name]["template"]
    return generate_from_template(pattern_template)

def generate_record() -> Dict[str, Any]:
    """Generate a single record using weighted random pattern selection.

    A pattern is drawn from ``PATTERNS`` according to its ``weight``, data is
    generated for it, and the data is serialised to indented JSON. Each span
    then gets its ``start_position``/``end_position`` set to the character
    offsets of the value's JSON literal inside that text (string values
    include their surrounding double quotes).

    Returns:
        A dict with the chosen ``pattern`` name, the JSON ``text``, the
        ``spans`` list and the raw ``data`` structure.
    """
    pattern_name = random.choices(
        list(PATTERNS.keys()),
        weights=[p["weight"] for p in PATTERNS.values()],
        k=1
    )[0]

    data, spans = generate_from_pattern(pattern_name)

    # Convert to JSON string to calculate positions
    json_str = json.dumps(data, indent=2)

    # Spans are produced in the same order in which the values are serialised,
    # so searching forward from the previous match keeps repeated values
    # (e.g. two identical account numbers) from resolving to the same offset.
    cursor = 0
    for span in spans:
        if "entity_value" not in span:
            continue
        # Spans only carry the encrypted value; recover the plain text to find it.
        plain = decrypt(span["entity_value"], eKey)
        # Strings appear quoted/escaped in the JSON text, numbers appear bare.
        for literal in (json.dumps(plain), plain):
            if not literal:
                continue
            pos = json_str.find(literal, cursor)
            if pos != -1:
                span["start_position"] = pos
                span["end_position"] = pos + len(literal)
                cursor = span["end_position"]
                break

    return {
        "pattern": pattern_name,
        "text": json_str,
        "spans": spans,
        "data": data
    }

def generate_records(count: int) -> List[Dict]:
    """Build a list of ``count`` records, each produced by ``generate_record``."""
    batch: List[Dict] = []
    for _ in range(count):
        batch.append(generate_record())
    return batch

def save_records(records: List[Dict], filename: Union[str, Path], mode: str = 'a') -> Path:
    """Write records to a JSON array file, appending to existing content by default.

    Only the encrypted record text (``full_text``) and the ``spans`` of each
    record are written.

    Args:
        records: Records as returned by ``generate_record``; each must have
            ``"text"`` and ``"spans"`` keys.
        filename: Target file; missing parent directories are created.
        mode: ``'w'`` replaces the file. Any other value appends to the JSON
            array already stored in the file.

    Returns:
        The target file as a ``Path``.
    """
    target = filename if isinstance(filename, Path) else Path(filename)

    # Make sure the destination directory is there before touching the file.
    target.parent.mkdir(parents=True, exist_ok=True)

    # Only the encrypted text and the spans are persisted.
    new_entries = [
        {
            "full_text": encrypt(entry["text"], eKey),
            "spans": entry["spans"],
        }
        for entry in records
    ]

    start_fresh = mode == 'w' or not target.exists() or target.stat().st_size == 0

    payload = new_entries
    if not start_fresh:
        try:
            with open(target, 'r', encoding='utf-8') as source:
                raw = source.read().strip()
            previous = json.loads(raw) if raw else []
            # A file holding a single JSON value is treated as a one-element array.
            if not isinstance(previous, list):
                previous = [previous]
            payload = previous + new_entries
        except (json.JSONDecodeError, FileNotFoundError):
            # Unreadable or invalid existing content is replaced by the new entries.
            payload = new_entries

    with open(target, 'w', encoding='utf-8') as sink:
        json.dump(payload, sink, indent=2, ensure_ascii=False)

    print(f"Saved {len(records)} records to {target}")
    return target

# Example usage
if __name__ == "__main__":
    output_file = "../data/test_data_s3_json.json"

    # Remove the output of any earlier run so the append below starts clean.
    previous_output = Path(output_file)
    if previous_output.exists():
        previous_output.unlink()

    # Two batches of five records each, saved together in a single call.
    records = generate_records(5) + generate_records(5)
    save_records(records, output_file, 'a')

    # Show the first in-memory record (not the encrypted form written to disk).
    print("Sample record:")
    print(json.dumps(records[0], indent=2))
