In [None]:
              ┌──────────────────────────┐
              │   Governance Policies    │
              └───────────┬─────────────┘
                          │
           ┌──────────────┼────────────────┐
           │               │                │
 Data Catalog       Access Control      Data Quality Rules
(Metadata & tags)     (RBAC/ABAC)       (Validations)
           │               │                │
           └──────────────┼────────────────┘
                          ▼
                ETL / ELT Pipelines
                          ▼
                 Secure Data Storage
                          ▼
                 Compliance Monitoring


In [None]:
import re
import logging

logging.basicConfig(level=logging.INFO)

# --------------------------------------
# 1. PII DETECTION RULES
# --------------------------------------
# Regular expressions used to flag likely PII. They are applied with
# re.search, so a match anywhere inside a value flags the whole field.
PII_PATTERNS = {
    # Whitespace is excluded from every part of the address so that ordinary
    # prose which merely contains "@" and "." (e.g. "lunch @ noon. see you")
    # is not mistaken for an email address.
    "email": r"[^@\s]+@[^@\s]+\.[^@\s]+",
    "phone": r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b",
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b"
}

def detect_pii(record):
    """Find fields of ``record`` whose values look like PII.

    Args:
        record: Mapping of field name to value. Non-string values are
            converted with ``str`` before matching; ``None`` values are
            treated as missing and skipped.

    Returns:
        A list of ``(field, pii_type)`` tuples in record order, then in
        ``PII_PATTERNS`` order. A field appears once per pattern it matches.
    """
    detected = []
    for field, value in record.items():
        if value is None:
            # A missing value holds no data; do not scan the text "None".
            continue
        text = str(value)  # convert once, reuse for every pattern
        for pii_type, pattern in PII_PATTERNS.items():
            if re.search(pattern, text):
                detected.append((field, pii_type))
    return detected


# --------------------------------------
# 2. REDACTION OF SENSITIVE VALUES
# --------------------------------------
def redact_pii(record, detected):
    """Mask every flagged field of ``record`` in place.

    Args:
        record: Mutable mapping of field name to value; it is modified
            directly, not copied.
        detected: Iterable of ``(field, pii_type)`` pairs. Only the field
            name is used; the PII type does not affect the mask.

    Returns:
        The same ``record`` object, with each flagged field set to the
        redaction marker.
    """
    mask = "***REDACTED***"
    flagged_fields = [flagged for flagged, _ in detected]
    for flagged in flagged_fields:
        record[flagged] = mask
    return record


# --------------------------------------
# 3. COMPLIANCE VALIDATION + LOGGING
# --------------------------------------
def compliance_check(record):
    """Scan ``record`` for PII, log any findings, and return it redacted.

    Args:
        record: Mapping of field name to value. When PII is found the
            record is passed to ``redact_pii``, which modifies it in place.

    Returns:
        The record unchanged when nothing is detected; otherwise the result
        of ``redact_pii``. Only field names and PII types are logged, never
        the values themselves.
    """
    findings = detect_pii(record)
    if not findings:
        return record

    logging.warning(f"PII detected: {findings}")
    return redact_pii(record, findings)


# --------------------------------------
# 4. DEMO DATA
# --------------------------------------
# Two sample records: one that carries PII and one that does not.
data = [
    dict(name="Alice", email="alice@example.com", ssn="111-22-3333"),
    dict(name="Bob", email="not-an-email", ssn=None),
]

# --------------------------------------
# 5. RUN
# --------------------------------------
# Each record is copied before checking so the raw `data` list stays untouched.
clean_data = [compliance_check(dict(raw_record)) for raw_record in data]

print("\n=== Final Secure Data ===")
print(clean_data)


In [None]:
# Role name -> actions granted explicitly to that role.
USER_ROLES = {
    "analyst": ["read_non_pii"],
    "engineer": ["read_all", "write"],
    "auditor": ["read_all", "view_logs"]
}

# Broader action -> narrower actions it also covers. Without this, a role
# holding "read_all" would be denied the strictly narrower "read_non_pii".
_IMPLIED_ACTIONS = {
    "read_all": ("read_non_pii",),
}

def check_access(user, action):
    """Check whether a role may perform an action.

    Args:
        user: Key looked up in ``USER_ROLES`` (the table is keyed by role
            name). Unknown keys are treated as having no permissions.
        action: Name of the action being requested.

    Returns:
        ``True`` when the action is granted explicitly or is implied by a
        granted action (one level of implication, not transitive).

    Raises:
        PermissionError: If the action is neither granted nor implied.
    """
    granted = list(USER_ROLES.get(user, []))
    # Iterate over a snapshot so that extending `granted` does not affect the loop.
    for held in tuple(granted):
        granted.extend(_IMPLIED_ACTIONS.get(held, ()))
    if action not in granted:
        raise PermissionError(f"User '{user}' not allowed to perform '{action}'")
    return True

# Example usage:
check_access("analyst", "read_non_pii")  # OK: analysts hold this permission

# The denied case is the point of the demonstration, so catch and show the
# error instead of letting it abort this cell (and any "Run All" pass).
try:
    check_access("analyst", "read_all")
except PermissionError as exc:
    print(f"Access denied: {exc}")


In [None]:
import uuid
from datetime import datetime, timezone

# In-memory list of lineage events, appended to by log_lineage().
lineage_log = []

def log_lineage(step_name, input_source, output_target):
    """Append one lineage event to the module-level ``lineage_log``.

    Args:
        step_name: Name of the pipeline step that ran.
        input_source: Identifier of the data the step read.
        output_target: Identifier of the data the step wrote.

    Returns:
        None. The event is stored in ``lineage_log`` with a random UUID4
        ``event_id`` and a timezone-aware UTC ISO-8601 ``timestamp``.
    """
    lineage_log.append({
        "event_id": str(uuid.uuid4()),
        # datetime.utcnow() is deprecated and returns a naive datetime; an
        # aware UTC value keeps the "+00:00" offset in the ISO string.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "step": step_name,
        "input": input_source,
        "output": output_target
    })

# Example usage: record two consecutive pipeline steps, then show the log.
log_lineage(
    step_name="transform",
    input_source="raw/customers.csv",
    output_target="curated/customers_clean.parquet",
)
log_lineage(
    step_name="enrich",
    input_source="curated/customers_clean.parquet",
    output_target="gold/customers_final.parquet",
)

print(lineage_log)
