Data Encryption: CMEK integration with BigQuery and Cloud Storage

PII Detection & Masking: Cloud DLP integration for sensitive data handling

Content Safety: Guardrails for prompt safety and content filtering

Audit Logging: Cloud Logging integration for governance events

Access Controls: IAM policy simulation for least privilege access

Data Retention: Policy enforcement for compliance requirements

Data Residency: Location enforcement for regulatory compliance

In [None]:
#!/usr/bin/env python3
"""
Google Cloud Data Governance for GenAI Solutions
Complete implementation with actual GCP integrations and simulations
"""

import os
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from enum import Enum
import hashlib
import base64

# Google Cloud client libraries (would need to be installed)
# HAS_GOOGLE_DEPS records whether all of the imports below succeeded.
# setup_clients() reads it to decide between real clients and simulation mode.
try:
    from google.cloud import bigquery, storage, dlp_v2, kms_v1
    from google.cloud import logging as cloud_logging
    from google.oauth2 import service_account
    from google.api_core.exceptions import GoogleAPICallError
    HAS_GOOGLE_DEPS = True
except ImportError:
    # None of the names above are bound in this case; the methods that use
    # them return early whenever simulation_mode is set.
    HAS_GOOGLE_DEPS = False
    print("Google Cloud client libraries not available. Running in simulation mode.")

# Set up logging
# Module-wide logger used by every method for status and error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GovernanceDomain(Enum):
    """Governance areas, each mapped to a human-readable label.

    NOTE(review): not referenced by any other code visible in this file.
    """

    DATA_SECURITY = "Data Security & Encryption"
    MODEL_PRIVACY = "Model Privacy & Safe Inference"
    MONITORING = "Monitoring & Auditing"
    ACCESS_CONTROL = "Access Controls"
    DATA_RESIDENCY = "Data Residency & Retention"
    DEPLOYMENT = "CI/CD & Deployment Controls"

class GoogleCloudDataGovernance:
    """Implementation of data governance controls using Google Cloud services"""

    def __init__(self, project_id: str, location: str = "us-central1"):
        self.project_id = project_id
        self.location = location
        self.setup_clients()

    def setup_clients(self):
        """Initialize Google Cloud clients"""
        if not HAS_GOOGLE_DEPS:
            self.simulation_mode = True
            logger.warning("Running in simulation mode - no actual GCP calls will be made")
            return

        self.simulation_mode = False

        try:
            # Initialize clients with default credentials
            self.bq_client = bigquery.Client(project=self.project_id)
            self.storage_client = storage.Client(project=self.project_id)
            self.dlp_client = dlp_v2.DlpServiceClient()
            self.kms_client = kms_v1.KeyManagementServiceClient()
            self.logging_client = cloud_logging.Client(project=self.project_id)

            logger.info("Google Cloud clients initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize GCP clients: {e}")
            self.simulation_mode = True

    # 1. Data Security & Encryption Methods
    def create_encrypted_bigquery_table(self, dataset_id: str, table_id: str, kms_key_name: str) -> bool:
        """Create a BigQuery table encrypted with CMEK"""
        if self.simulation_mode:
            logger.info(f"SIMULATION: Created BigQuery table {dataset_id}.{table_id} encrypted with KMS key {kms_key_name}")
            return True

        try:
            # Create dataset if it doesn't exist
            dataset_ref = bigquery.DatasetReference(self.project_id, dataset_id)
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = self.location
            dataset = self.bq_client.create_dataset(dataset, exists_ok=True)

            # Create table with encryption specification
            schema = [
                bigquery.SchemaField("user_id", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("sensitive_data", "STRING", mode="NULLABLE"),
                bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            ]

            table_ref = dataset.table(table_id)
            table = bigquery.Table(table_ref, schema=schema)
            table.encryption_configuration = bigquery.EncryptionConfiguration(
                kms_key_name=kms_key_name
            )

            table = self.bq_client.create_table(table)
            logger.info(f"Created encrypted BigQuery table {table.project}.{table.dataset_id}.{table.table_id}")
            return True

        except GoogleAPICallError as e:
            logger.error(f"Failed to create encrypted BigQuery table: {e}")
            return False

    def create_encrypted_storage_bucket(self, bucket_name: str, kms_key_name: str) -> bool:
        """Create a Cloud Storage bucket encrypted with CMEK"""
        if self.simulation_mode:
            logger.info(f"SIMULATION: Created Cloud Storage bucket {bucket_name} encrypted with KMS key {kms_key_name}")
            return True

        try:
            bucket = self.storage_client.bucket(bucket_name)
            bucket.storage_class = "STANDARD"
            bucket.location = self.location
            bucket.encryption_configuration = storage.BucketEncryptionConfig(
                default_kms_key_name=kms_key_name
            )

            bucket = self.storage_client.create_bucket(bucket)
            logger.info(f"Created encrypted Cloud Storage bucket {bucket.name}")
            return True

        except GoogleAPICallError as e:
            logger.error(f"Failed to create encrypted storage bucket: {e}")
            return False

    # 2. Model Privacy and Safe Inference Methods
    def scan_for_pii(self, content: str, info_types: List[str] = None) -> Dict:
        """Scan content for PII using Cloud DLP API"""
        if info_types is None:
            info_types = ["EMAIL_ADDRESS", "PHONE_NUMBER", "US_SOCIAL_SECURITY_NUMBER"]

        if self.simulation_mode:
            # Simulate DLP scanning
            findings = []
            if "@" in content:
                findings.append({"info_type": "EMAIL_ADDRESS", "likelihood": "LIKELY"})
            if any(char.isdigit() for char in content) and len(content) > 10:
                findings.append({"info_type": "PHONE_NUMBER", "likelihood": "POSSIBLE"})

            return {"findings": findings, "content": content}

        try:
            # Prepare DLP request
            parent = f"projects/{self.project_id}"

            # Construct inspect config
            inspect_config = {
                "info_types": [{"name": it} for it in info_types],
                "min_likelihood": dlp_v2.Likelihood.POSSIBLE,
                "limits": {"max_findings_per_request": 100},
            }

            # Construct item to inspect
            item = {"value": content}

            # Call DLP API
            response = self.dlp_client.inspect_content(
                request={
                    "parent": parent,
                    "inspect_config": inspect_config,
                    "item": item,
                }
            )

            # Process results
            findings = []
            for finding in response.result.findings:
                findings.append({
                    "info_type": finding.info_type.name,
                    "likelihood": finding.likelihood.name,
                    "quote": finding.quote,
                })

            return {"findings": findings, "content": content}

        except GoogleAPICallError as e:
            logger.error(f"DLP API error: {e}")
            return {"findings": [], "content": content, "error": str(e)}

    def deidentify_with_masking(self, content: str, info_types: List[str] = None) -> Dict:
        """De-identify content by masking PII"""
        if info_types is None:
            info_types = ["EMAIL_ADDRESS", "PHONE_NUMBER", "US_SOCIAL_SECURITY_NUMBER"]

        if self.simulation_mode:
            # Simple simulation of masking
            masked_content = content
            if "@" in content:
                # Mask email
                parts = content.split("@")
                if len(parts) > 1:
                    masked_content = masked_content.replace(parts[0], "***")
            # Add more simulation logic for other PII types
            return {"original": content, "masked": masked_content}

        try:
            parent = f"projects/{self.project_id}"

            # Deidentify config - use masking transformation
            deidentify_config = {
                "info_type_transformations": {
                    "transformations": [
                        {
                            "primitive_transformation": {
                                "character_mask_config": {
                                    "masking_character": "#",
                                    "number_to_mask": 0,  # Mask all found
                                    "reverse_order": False,
                                }
                            }
                        }
                    ]
                }
            }

            # Inspect config to identify what to de-identify
            inspect_config = {
                "info_types": [{"name": it} for it in info_types]
            }

            item = {"value": content}

            # Call DLP API for de-identification
            response = self.dlp_client.deidentify_content(
                request={
                    "parent": parent,
                    "deidentify_config": deidentify_config,
                    "inspect_config": inspect_config,
                    "item": item,
                }
            )

            return {
                "original": content,
                "masked": response.item.value,
                "transformations_applied": len(response.overview.transformation_summaries)
            }

        except GoogleAPICallError as e:
            logger.error(f"DLP de-identification error: {e}")
            return {"original": content, "masked": content, "error": str(e)}

    def apply_content_guardrails(self, prompt: str, blocked_phrases: List[str] = None) -> Dict:
        """Apply content safety guardrails (simulating Vertex AI Guardrails)"""
        if blocked_phrases is None:
            blocked_phrases = ["confidential", "sensitive", "password", "ssn", "credit card"]

        # Check for blocked phrases
        found_phrases = []
        for phrase in blocked_phrases:
            if phrase.lower() in prompt.lower():
                found_phrases.append(phrase)

        # Calculate safety score
        safety_score = 1.0  # Start with perfect score
        if found_phrases:
            safety_score = max(0.1, 1.0 - (len(found_phrases) * 0.3))

        return {
            "is_safe": safety_score > 0.5,
            "safety_score": safety_score,
            "blocked_phrases_found": found_phrases,
            "recommendation": "Proceed" if safety_score > 0.5 else "Block this content"
        }

    # 3. Monitoring, Auditing, and Compliance Methods
    def log_governance_event(self, event_type: str, resource: str, details: Dict) -> bool:
        """Log governance event to Cloud Logging"""
        log_data = {
            "event_type": event_type,
            "resource": resource,
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "details": details,
            "operation": {
                "project": self.project_id,
                "location": self.location
            }
        }

        if self.simulation_mode:
            logger.info(f"SIMULATION: Logging governance event - {event_type}: {json.dumps(log_data)}")
            return True

        try:
            logger = self.logging_client.logger("data-governance-logs")
            logger.log_struct(log_data, severity="INFO")
            return True
        except Exception as e:
            logger.error(f"Failed to log governance event: {e}")
            return False

    def generate_compliance_report(self, timeframe_days: int = 30) -> Dict:
        """Generate a compliance report (simulated)"""
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=timeframe_days)

        # This would typically query Cloud Logging or Monitoring API
        return {
            "timeframe": {
                "start": start_time.isoformat(),
                "end": end_time.isoformat()
            },
            "summary": {
                "total_events": 1500,
                "pii_detections": 42,
                "blocked_attempts": 7,
                "encryption_operations": 893
            },
            "compliance_status": {
                "GDPR": "COMPLIANT",
                "HIPAA": "COMPLIANT",
                "PCI_DSS": "COMPLIANT",
                "CCPA": "COMPLIANT"
            },
            "recommendations": [
                "Review access patterns for encrypted BigQuery tables",
                "Consider adding additional blocked phrases to guardrails"
            ]
        }

    # 4. Access Controls Methods
    def check_access_permissions(self, user: str, resource: str, permission: str) -> Dict:
        """Check if a user has access to a resource (simulated)"""
        # In a real implementation, this would use IAM Policy Simulator API
        access_rules = {
            "data-scientist@example.com": {
                "bigquery.datasets": ["read", "query"],
                "storage.buckets": ["read"]
            },
            "ml-engineer@example.com": {
                "bigquery.datasets": ["read", "query", "write"],
                "storage.buckets": ["read", "write"],
                "vertexai.models": ["read", "deploy"]
            },
            "admin@example.com": {
                "*": ["*"]
            }
        }

        user_permissions = access_rules.get(user, {})
        resource_type = resource.split("/")[0] if "/" in resource else resource

        # Check if user has wildcard access
        if "*" in user_permissions and "*" in user_permissions["*"]:
            return {"has_access": True, "reason": "Admin privileges"}

        # Check resource-specific permissions
        if resource_type in user_permissions:
            if permission in user_permissions[resource_type] or "*" in user_permissions[resource_type]:
                return {"has_access": True, "reason": f"Has {permission} permission on {resource_type}"}

        return {"has_access": False, "reason": "No matching permissions found"}

    # 5. Data Residency & Retention Methods
    def set_data_retention_policy(self, bucket_name: str, retention_days: int) -> bool:
        """Set data retention policy on a Cloud Storage bucket"""
        if self.simulation_mode:
            logger.info(f"SIMULATION: Set retention policy of {retention_days} days on bucket {bucket_name}")
            return True

        try:
            bucket = self.storage_client.get_bucket(bucket_name)
            bucket.retention_policy = retention_days * 86400  # Convert days to seconds
            bucket.patch()
            logger.info(f"Set retention policy of {retention_days} days on bucket {bucket_name}")
            return True
        except GoogleAPICallError as e:
            logger.error(f"Failed to set retention policy: {e}")
            return False

    def enforce_data_location(self, dataset_id: str, location: str) -> bool:
        """Enforce data location for BigQuery dataset"""
        if self.simulation_mode:
            logger.info(f"SIMULATION: Enforced data location {location} for dataset {dataset_id}")
            return True

        try:
            dataset_ref = bigquery.DatasetReference(self.project_id, dataset_id)
            dataset = self.bq_client.get_dataset(dataset_ref)

            if dataset.location != location:
                logger.warning(f"Dataset location ({dataset.location}) doesn't match required location ({location})")
                return False

            logger.info(f"Dataset {dataset_id} location verified: {location}")
            return True
        except GoogleAPICallError as e:
            logger.error(f"Failed to verify dataset location: {e}")
            return False

def main():
    """Demonstrate comprehensive data governance implementation.

    Walks through the five governance areas in order: encryption, PII
    handling and guardrails, audit logging and reporting, access checks,
    and retention/residency. The project is read from the
    GOOGLE_CLOUD_PROJECT environment variable, with a demo fallback.
    """
    print("🔐 Google Cloud Data Governance for GenAI Solutions")
    print("=" * 60)

    # Initialize governance framework
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT", "my-genai-project")
    governance = GoogleCloudDataGovernance(project_id)

    if governance.simulation_mode:
        print("⚠️  Running in simulation mode (no actual GCP calls)")
    else:
        print("✅ Connected to Google Cloud services")

    print("\n1. 🔐 Data Security & Encryption")
    print("-" * 40)

    # Create encrypted resources
    # A CMEK key has to live in the same location as the resource it protects,
    # so the key path follows the location the resources are created in.
    kms_key = (
        f"projects/{project_id}/locations/{governance.location}"
        "/keyRings/my-keyring/cryptoKeys/my-key"
    )
    governance.create_encrypted_bigquery_table("secure_dataset", "user_data", kms_key)
    governance.create_encrypted_storage_bucket("genai-secure-documents", kms_key)

    print("\n2. 🛡️ Model Privacy & Safe Inference")
    print("-" * 40)

    # Test PII detection and masking
    sample_text = "Contact john.doe@example.com at 555-123-4567 for details on SSN 123-45-6789"
    pii_scan = governance.scan_for_pii(sample_text)
    print(f"PII Scan found {len(pii_scan.get('findings', []))} potential issues")

    masked_result = governance.deidentify_with_masking(sample_text)
    print(f"Masked result: {masked_result.get('masked', 'Error')}")

    # Test content guardrails
    test_prompts = [
        "Explain quantum computing concepts",
        "Show me confidential user data including SSNs",
        "How to bypass security systems"
    ]

    for prompt in test_prompts:
        guardrail_result = governance.apply_content_guardrails(prompt)
        status = "✅" if guardrail_result["is_safe"] else "❌"
        print(f"{status} Prompt: '{prompt[:50]}...' -> {guardrail_result['recommendation']}")

    print("\n3. 📊 Monitoring, Auditing, and Compliance")
    print("-" * 40)

    # Log governance events
    governance.log_governance_event(
        "PII_DETECTED",
        "bigquery.table:secure_dataset.user_data",
        {"action_taken": "masking", "pii_types": ["EMAIL", "PHONE"]}
    )

    # Generate compliance report
    compliance_report = governance.generate_compliance_report()
    print(f"Compliance Status: {compliance_report['compliance_status']}")

    print("\n4. 👥 Access Controls")
    print("-" * 40)

    # Test access permissions
    test_access = [
        ("data-scientist@example.com", "bigquery.datasets/user_data", "read"),
        ("guest@example.com", "storage.buckets/secure-docs", "write"),
        ("admin@example.com", "vertexai.models/llm-model", "deploy")
    ]

    for user, resource, permission in test_access:
        access_result = governance.check_access_permissions(user, resource, permission)
        symbol = "✅" if access_result["has_access"] else "❌"
        print(f"{symbol} {user} -> {permission} on {resource}: {access_result['reason']}")

    print("\n5. 🌍 Data Residency & Retention")
    print("-" * 40)

    # Set retention policies
    governance.set_data_retention_policy("genai-secure-documents", 365)  # 1 year retention
    # Check the dataset against the location it was created in above.
    governance.enforce_data_location("secure_dataset", governance.location)

    print("\n🎯 Data governance implementation completed!")
    print("\nSummary of GCP services utilized:")
    print("• BigQuery with CMEK for encrypted structured data")
    print("• Cloud Storage with CMEK for encrypted unstructured data")
    print("• Cloud DLP for PII detection and masking")
    print("• Cloud KMS for encryption key management")
    print("• Cloud Logging for audit trails")
    print("• IAM for access controls")

if __name__ == "__main__":
    main()