In [None]:
import os
import sunra_client
import getpass
import time
import json
from datetime import datetime
from typing import Dict, Any, Optional

# Ask for the Sunra API key only when SUNRA_KEY is missing from the
# environment; getpass reads it from the prompt without echoing it.
if os.environ.get('SUNRA_KEY') is None:
    api_key = getpass.getpass("Enter your Sunra API key: ")
    os.environ.update(SUNRA_KEY=api_key)

print("✓ Environment configured!")


In [None]:
def demonstrate_configuration_methods():
    """Walk through the three ways of handing credentials to Sunra.

    Prints a short description of each method. While doing so it applies the
    global configuration through ``sunra_client.config`` and constructs a
    ``sunra_client.SyncClient`` with an explicit key.

    Returns:
        The ``SyncClient`` built with the key read from ``SUNRA_KEY``.
    """

    def show(*lines):
        # One print per line, so the output matches line-by-line printing;
        # an empty string produces the same blank line as a bare print().
        for line in lines:
            print(line)

    key = os.environ.get('SUNRA_KEY')

    show(
        "=== Configuration Methods ===",
        "1. Environment Variables (Default)",
        "2. Global Configuration",
        "3. Explicit Client Configuration",
        "",
    )

    # Method 1: nothing to call, the environment variable is read implicitly.
    show(
        "Method 1: Environment Variables",
        "- API key automatically read from SUNRA_KEY environment variable",
        "- No additional configuration needed",
        "- Most secure for production environments",
        "",
    )

    # Method 2: the header is printed before the call, the details after it.
    print("Method 2: Global Configuration")
    sunra_client.config(credentials=key)
    show(
        "- sunra_client.config(credentials=api_key)",
        "- Sets credentials globally for all client instances",
        "- Useful for dynamic credential management",
        "",
    )

    # Method 3: a client instance that carries its own key.
    print("Method 3: Explicit Client Configuration")
    client = sunra_client.SyncClient(key=key)
    show(
        "- client = sunra_client.SyncClient(key=api_key)",
        "- Overrides global and environment configurations",
        "- Best for multiple API keys or client-specific settings",
        "",
    )

    show(
        "=== Configuration Priority (Highest to Lowest) ===",
        "1. Explicit client key (overrides all)",
        "2. Global config (via sunra_client.config())",
        "3. Environment variables (default fallback)",
    )

    return client


# Run the walkthrough and keep the explicitly configured client around.
explicit_client = demonstrate_configuration_methods()


In [None]:
class SunraCredentialManager:
    """Keep several named Sunra API keys and switch the active one.

    Switching a profile applies its key globally through
    ``sunra_client.config(credentials=...)``.
    """

    def __init__(self):
        # profile name -> {'api_key', 'description', 'created_at'}
        self.credentials = {}
        # Name of the profile last activated via switch_profile(), or None.
        self.current_profile = None

    def add_credential(self, profile_name: str, api_key: str, description: str = "") -> None:
        """Register (or overwrite) a credential profile.

        Args:
            profile_name: Key under which the credential is stored.
            api_key: The API key to store for this profile.
            description: Free-text note shown by ``list_profiles``.
        """
        self.credentials[profile_name] = {
            'api_key': api_key,
            'description': description,
            'created_at': datetime.now().isoformat()
        }
        print(f"✓ Added credential profile: {profile_name}")

    def list_profiles(self) -> None:
        """Print every stored profile, marking the active one."""
        print("Available credential profiles:")
        for profile, info in self.credentials.items():
            active = " (ACTIVE)" if profile == self.current_profile else ""
            print(f"  - {profile}: {info['description']}{active}")

    def switch_profile(self, profile_name: str) -> bool:
        """Make ``profile_name`` the active profile.

        Returns:
            True when the profile exists and was applied, False otherwise.
        """
        if profile_name not in self.credentials:
            print(f"❌ Profile '{profile_name}' not found")
            return False

        api_key = self.credentials[profile_name]['api_key']
        sunra_client.config(credentials=api_key)
        self.current_profile = profile_name
        print(f"✓ Switched to profile: {profile_name}")
        return True

    def get_current_profile(self) -> Optional[str]:
        """Return the name of the active profile, or None if none was set."""
        return self.current_profile

    def test_profile(self, profile_name: str) -> bool:
        """Check a profile by issuing one real generation request with it.

        The profile is activated for the duration of the call and the
        previously active profile is restored afterwards. When no profile was
        active before, the tested profile stays active, because this class has
        no way to clear the globally configured credentials.

        Returns:
            True when the request succeeded, False when the profile is
            unknown or the request raised.
        """
        if profile_name not in self.credentials:
            print(f"❌ Profile '{profile_name}' not found")
            return False

        print(f"Testing profile: {profile_name}")
        previous = self.current_profile

        # Temporarily switch to the profile under test.
        self.switch_profile(profile_name)

        try:
            # Only success or failure matters; the generated image is discarded.
            sunra_client.subscribe(
                "black-forest-labs/flux-kontext-pro/text-to-image",
                arguments={
                    "prompt": "test image",
                    "prompt_enhancer": False,
                    "seed": 42,
                    "aspect_ratio": "1:1",
                    "output_format": "jpeg",
                    "safety_tolerance": 6
                },
                with_logs=False
            )
        except Exception as e:
            print(f"❌ Profile '{profile_name}' failed: {e}")
            return False
        else:
            print(f"✓ Profile '{profile_name}' is working correctly")
            return True
        finally:
            # finally also covers KeyboardInterrupt, so interrupting the
            # request does not leave the tested profile active.
            if previous is not None:
                self.switch_profile(previous)

# Demo: register two profiles, list them, then activate each in turn.
credential_manager = SunraCredentialManager()

# Both demo profiles share the one key from the environment; real setups
# would register a different key per profile.
main_api_key = os.environ.get('SUNRA_KEY')
for profile_name, profile_note in (
    ("main", "Main production API key"),
    ("backup", "Backup API key"),
):
    credential_manager.add_credential(profile_name, main_api_key, profile_note)

credential_manager.list_profiles()

print("\n=== Profile Switching Demo ===")
for profile_name in ("main", "backup"):
    credential_manager.switch_profile(profile_name)

print(f"Current profile: {credential_manager.get_current_profile()}")


In [None]:
class RequestTracker:
    """Record and print the lifecycle of requests via callback methods."""

    def __init__(self):
        self.requests = {}
        self.start_time = None

    def _elapsed(self):
        # Seconds since the most recent enqueue; 0 when nothing was enqueued.
        if self.start_time:
            return time.time() - self.start_time
        return 0

    def on_enqueue(self, request_id: str):
        """Start the timer and open a record for ``request_id``."""
        self.start_time = time.time()
        record = {
            'enqueued_at': datetime.now().isoformat(),
            'status_updates': [],
            'completed_at': None
        }
        self.requests[request_id] = record
        clock = datetime.now().strftime('%H:%M:%S')
        print(f"🚀 Request {request_id} enqueued at {clock}")

    def on_queue_update(self, status_update: Dict[str, Any]):
        """Store a status update on its request record and print it.

        Updates whose ``id`` is not being tracked are printed but not stored.
        """
        request_id = status_update.get('id', 'unknown')
        status = status_update.get('status', 'unknown')

        record = self.requests.get(request_id)
        if record is not None:
            entry = {
                'timestamp': datetime.now().isoformat(),
                'status': status,
                'update': status_update
            }
            record['status_updates'].append(entry)

        print(f"📊 [{self._elapsed():.1f}s] Status: {status}")

        # Optional details, printed only when the update carries them.
        if 'progress' in status_update:
            print(f"   Progress: {status_update['progress']}%")
        if 'queue_position' in status_update:
            print(f"   Queue position: {status_update['queue_position']}")

    def on_complete(self, request_id: str, result: Dict[str, Any]):
        """Stamp the completion time and print a summary of ``result``."""
        record = self.requests.get(request_id)
        if record is not None:
            record['completed_at'] = datetime.now().isoformat()

        print(f"✅ Request {request_id} completed in {self._elapsed():.1f}s")

        # Images and videos share one message shape; text reports its length.
        for key, noun in (('images', 'image(s)'), ('videos', 'video(s)')):
            items = result.get(key)
            if items:
                print(f"   Generated {len(items)} {noun}")
        text = result.get('text')
        if text:
            print(f"   Generated text: {len(text)} characters")

    def get_request_summary(self, request_id: str) -> Dict[str, Any]:
        """Return the record for ``request_id`` or an empty dict."""
        if request_id in self.requests:
            return self.requests[request_id]
        return {}

    def get_all_requests(self) -> Dict[str, Dict[str, Any]]:
        """Return the mapping of every tracked request."""
        return self.requests

# Demo: drive a real request through the tracker callbacks.
tracker = RequestTracker()

print("=== Advanced Callback Example ===\nMaking a request with detailed tracking...")

try:
    result = sunra_client.subscribe(
        "black-forest-labs/flux-kontext-pro/text-to-image",
        arguments={
            "prompt": "a detailed landscape with mountains and lakes",
            "prompt_enhancer": False,
            "seed": 123,
            "aspect_ratio": "16:9",
            "output_format": "jpeg",
            "safety_tolerance": 6
        },
        with_logs=True,
        on_enqueue=tracker.on_enqueue,
        on_queue_update=tracker.on_queue_update,
    )

    # subscribe() has no completion callback here, so report completion manually.
    if result:
        # The most recently enqueued request is the last key of the dict.
        request_id = next(reversed(tracker.requests), "unknown")
        tracker.on_complete(request_id, result)

        summary = tracker.get_request_summary(request_id)
        print("\n📋 Request Summary:")
        print(f"   Enqueued: {summary.get('enqueued_at', 'Unknown')}")
        print(f"   Status updates: {len(summary.get('status_updates', []))}")
        print(f"   Completed: {summary.get('completed_at', 'Unknown')}")

        if result.get('images'):
            print(f"   Result: {result['images'][0]['url']}")

except Exception as e:
    print(f"❌ Error: {e}")


In [None]:
import random
from time import sleep

class SunraClientWithRetry:
    """Wrap ``sunra_client`` calls with retries and exponential backoff.

    Args:
        max_retries: Number of retries after the first attempt, so up to
            ``max_retries + 1`` calls are made.
        base_delay: Delay in seconds before the first retry; it doubles for
            each further retry.
    """

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def _exponential_backoff(self, attempt: int) -> float:
        """Return the wait before retrying after 0-based ``attempt``."""
        delay = self.base_delay * (2 ** attempt)
        # Random jitter keeps parallel clients from retrying in lockstep.
        jitter = random.uniform(0.1, 0.5)
        return delay + jitter

    def _call_with_retry(self, operation):
        """Invoke zero-argument ``operation`` until it succeeds or retries run out.

        Returns:
            Whatever ``operation`` returns, or None when ``max_retries`` is
            negative and no attempt is made at all.

        Raises:
            Exception: The error from the final attempt once every attempt
                has failed.
        """
        total_attempts = self.max_retries + 1

        for attempt in range(total_attempts):
            print(f"Attempt {attempt + 1}/{total_attempts}")
            try:
                result = operation()
            except Exception as e:
                print(f"❌ Attempt {attempt + 1} failed: {e}")

                if attempt >= self.max_retries:
                    print(f"💥 All {total_attempts} attempts failed")
                    # Bare raise keeps the original traceback intact.
                    raise

                delay = self._exponential_backoff(attempt)
                print(f"⏳ Retrying in {delay:.1f} seconds...")
                sleep(delay)
            else:
                print(f"✅ Success on attempt {attempt + 1}")
                return result

        return None

    def subscribe_with_retry(self, endpoint: str, arguments: Dict[str, Any], **kwargs) -> Optional[Dict[str, Any]]:
        """Call ``sunra_client.subscribe`` with retries.

        Args:
            endpoint: Endpoint identifier passed through to ``subscribe``.
            arguments: Request arguments passed as ``arguments=``.
            **kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            The result of the first successful call.

        Raises:
            Exception: The last error once all attempts have failed.
        """
        # The lambda defers the call so each attempt issues a fresh request.
        return self._call_with_retry(
            lambda: sunra_client.subscribe(endpoint, arguments=arguments, **kwargs)
        )

    def run_with_retry(self, endpoint: str, arguments: Dict[str, Any], **kwargs) -> Optional[Dict[str, Any]]:
        """Call ``sunra_client.run`` with retries.

        Takes the same arguments, returns the same way and raises the same
        way as ``subscribe_with_retry``.
        """
        return self._call_with_retry(
            lambda: sunra_client.run(endpoint, arguments=arguments, **kwargs)
        )

# Demo: one generation request issued through the retrying wrapper.
retry_client = SunraClientWithRetry(2, 1.0)

print("=== Retry Strategy Example ===")
print("This will demonstrate retry logic (may take time if retries are needed)")


def _report_enqueued(req_id):
    # Callback for the moment the request is accepted into the queue.
    print(f"📝 Request {req_id} enqueued")


def _report_status(status):
    # Callback for every queue status update.
    print(f"📊 Status: {status.get('status', 'unknown')}")


try:
    result = retry_client.subscribe_with_retry(
        "black-forest-labs/flux-kontext-pro/text-to-image",
        arguments={
            "prompt": "a robust system with error handling",
            "prompt_enhancer": False,
            "seed": 456,
            "aspect_ratio": "1:1",
            "output_format": "jpeg",
            "safety_tolerance": 6
        },
        with_logs=False,
        on_enqueue=_report_enqueued,
        on_queue_update=_report_status,
    )

    # Only print a URL when the call returned something containing images.
    if result:
        if result.get('images'):
            print(f"🎉 Final result: {result['images'][0]['url']}")

except Exception as e:
    print(f"💥 Final error after all retries: {e}")


In [None]:
class EnvironmentConfig:
    """Per-environment settings (API key, timeout, retries, logging).

    The active environment name is read from the ``ENVIRONMENT`` variable
    and defaults to ``development``. Names that match no known environment
    fall back to the development settings.
    """

    # Environment whose settings are used when an unknown name is requested.
    DEFAULT_ENV = 'development'

    def __init__(self):
        self.configs = {
            'development': {
                'api_key': self._api_key_for('SUNRA_KEY_DEV'),
                'timeout': 30,
                'retry_attempts': 1,
                'debug': True,
                'log_level': 'verbose'
            },
            'staging': {
                'api_key': self._api_key_for('SUNRA_KEY_STAGING'),
                'timeout': 60,
                'retry_attempts': 2,
                'debug': True,
                'log_level': 'info'
            },
            'production': {
                'api_key': self._api_key_for('SUNRA_KEY_PROD'),
                'timeout': 120,
                'retry_attempts': 3,
                'debug': False,
                'log_level': 'error'
            }
        }

        # Detect environment
        self.current_env = os.environ.get('ENVIRONMENT', 'development')

    @staticmethod
    def _api_key_for(specific_var: str) -> Optional[str]:
        """Return the env-specific key, else the shared ``SUNRA_KEY``.

        ``or`` is used so that a variable set to an empty string also falls
        back to the shared key instead of yielding an unusable empty key.
        """
        return os.environ.get(specific_var) or os.environ.get('SUNRA_KEY')

    def _resolve_env(self, env: Optional[str] = None, warn: bool = False) -> str:
        """Return the name of the environment whose settings will be used.

        Args:
            env: Requested environment; None or empty means ``current_env``.
            warn: Print a notice when the requested name is unknown and the
                default environment is substituted.
        """
        requested = env or self.current_env
        if requested in self.configs:
            return requested
        if warn:
            print(f"⚠️  Unknown environment '{requested}'; using {self.DEFAULT_ENV} settings")
        return self.DEFAULT_ENV

    def get_config(self, env: Optional[str] = None) -> Dict[str, Any]:
        """Return the settings dict for ``env`` (default: current environment).

        Unknown names yield the development settings. The returned dict is
        the stored one, not a copy.
        """
        return self.configs[self._resolve_env(env)]

    def apply_config(self, env: Optional[str] = None) -> Dict[str, Any]:
        """Configure ``sunra_client`` with the environment's key and print its settings.

        Messages name the environment whose settings are actually applied,
        so an unknown or mistyped name is reported as a fallback rather than
        labelled as if its own settings existed.

        Returns:
            The settings dict that was applied.
        """
        env_name = self._resolve_env(env, warn=True)
        config = self.configs[env_name]

        print(f"=== Applying {env_name.upper()} Configuration ===")

        # Configure API key
        if config['api_key']:
            sunra_client.config(credentials=config['api_key'])
            print(f"✓ API key configured for {env_name}")
        else:
            print(f"⚠️  No API key found for {env_name}")

        # Log configuration
        print(f"✓ Timeout: {config['timeout']}s")
        print(f"✓ Retry attempts: {config['retry_attempts']}")
        print(f"✓ Debug mode: {config['debug']}")
        print(f"✓ Log level: {config['log_level']}")

        return config

    def list_environments(self) -> None:
        """Print the known environments, marking the current one."""
        print("Available environments:")
        for env in self.configs:
            current = " (CURRENT)" if env == self.current_env else ""
            print(f"  - {env}{current}")

    def validate_config(self, env: Optional[str] = None) -> bool:
        """Check that the environment has a plausible API key.

        Returns:
            False when the key is missing or shorter than 10 characters,
            True otherwise.
        """
        env_name = self._resolve_env(env, warn=True)
        config = self.configs[env_name]

        print(f"Validating {env_name} configuration...")

        # Check API key
        if not config['api_key']:
            print(f"❌ No API key configured for {env_name}")
            return False

        # Length is the only format check made here.
        if len(config['api_key']) < 10:
            print(f"❌ API key for {env_name} seems too short")
            return False

        print(f"✅ Configuration for {env_name} is valid")
        return True

# Demo: inspect, apply and validate the environment-specific settings.
env_config = EnvironmentConfig()

print("Current environment setup:")
env_config.list_environments()
print(f"\nDetected environment: {env_config.current_env}")

# Apply, then validate, the settings of the detected environment.
config = env_config.apply_config()
env_config.validate_config()

# Compare the settings of every known environment side by side.
print("\n=== Environment Switching Example ===")
for env in ('development', 'staging', 'production'):
    test_config = env_config.get_config(env)
    print(
        f"\n--- {env.upper()} ---",
        f"Timeout: {test_config['timeout']}s",
        f"Retries: {test_config['retry_attempts']}",
        f"Debug: {test_config['debug']}",
        sep="\n",
    )
    valid = env_config.validate_config(env)
    print(f"Valid: {valid}")


In [None]:
# Example configuration file content, expressed as a dict.
# Top-level sections: "sunra" (client settings and per-endpoint-type default
# request parameters) and "logging" (level and format string).
example_config = {
    "sunra": {
        "api_key": "${SUNRA_KEY}",  # Environment variable reference
        "timeout": 60,
        "retry_attempts": 3,
        # Default arguments keyed by endpoint type.
        "default_parameters": {
            "text_to_image": {
                "aspect_ratio": "16:9",
                "output_format": "jpeg",
                "safety_tolerance": 6,
                "prompt_enhancer": False
            },
            "image_to_video": {
                "duration": 5,
                "guidance_scale": 0.5,
                "aspect_ratio": "16:9"
            },
            "speech_to_text": {
                "language": "English",
                "tag_audio_events": True,
                "speaker_diarization": True
            }
        }
    },
    "logging": {
        "level": "INFO",
        "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    }
}

class ConfigManager:
    """Hold a configuration dict and resolve ``${VAR}`` references in it.

    A string value that consists entirely of ``${NAME}`` is replaced by the
    value of environment variable ``NAME``. When that variable is unset the
    placeholder text is kept unchanged.
    """

    def __init__(self, config_dict: Optional[Dict[str, Any]] = None):
        self.config = config_dict or {}
        # Cache filled lazily by get_resolved_config().
        self.resolved_config = {}

    @staticmethod
    def _is_placeholder(value: Any) -> bool:
        """Return True when ``value`` is a whole-string ``${NAME}`` reference."""
        return isinstance(value, str) and value.startswith('${') and value.endswith('}')

    def load_from_dict(self, config_dict: Dict[str, Any]) -> None:
        """Replace the configuration and drop the resolved cache."""
        self.config = config_dict
        self.resolved_config = {}

    def resolve_environment_variables(self, value: Any) -> Any:
        """Return ``value`` with ``${NAME}`` strings resolved recursively.

        Dicts and lists are rebuilt with their contents resolved; any other
        value is returned unchanged. Unset variables leave the placeholder
        string as it is.
        """
        if self._is_placeholder(value):
            env_var = value[2:-1]  # Remove ${ and }
            return os.environ.get(env_var, value)
        if isinstance(value, dict):
            return {k: self.resolve_environment_variables(v) for k, v in value.items()}
        if isinstance(value, list):
            return [self.resolve_environment_variables(v) for v in value]
        return value

    def get_resolved_config(self) -> Dict[str, Any]:
        """Return the configuration with references resolved (cached)."""
        if not self.resolved_config:
            self.resolved_config = self.resolve_environment_variables(self.config)
        return self.resolved_config

    def get_sunra_config(self) -> Dict[str, Any]:
        """Return the resolved ``sunra`` section, or an empty dict."""
        resolved = self.get_resolved_config()
        return resolved.get('sunra', {})

    def get_default_parameters(self, endpoint_type: str) -> Dict[str, Any]:
        """Return the default arguments for ``endpoint_type``, or an empty dict."""
        sunra_config = self.get_sunra_config()
        defaults = sunra_config.get('default_parameters', {})
        return defaults.get(endpoint_type, {})

    def apply_sunra_config(self) -> Dict[str, Any]:
        """Configure ``sunra_client`` from the ``sunra`` section and print its settings.

        The API key is applied only when it is non-empty and fully resolved.
        A key that is still a ``${NAME}`` placeholder after resolution means
        the variable was unset; it is reported and skipped rather than sent
        to the client as if it were a credential.

        Returns:
            The resolved ``sunra`` section.
        """
        sunra_config = self.get_sunra_config()

        # Configure API key
        api_key = sunra_config.get('api_key')
        if self._is_placeholder(api_key):
            print(f"⚠️  API key reference {api_key} is unresolved; credentials not configured")
        elif api_key:
            sunra_client.config(credentials=api_key)
            print("✓ API key configured from config file")

        # Log other settings
        print(f"✓ Timeout: {sunra_config.get('timeout', 'default')}")
        print(f"✓ Retry attempts: {sunra_config.get('retry_attempts', 'default')}")

        return sunra_config

    def create_request_with_defaults(self, endpoint: str, endpoint_type: str,
                                   custom_arguments: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Build a request description from defaults plus overrides.

        Args:
            endpoint: Endpoint identifier, copied into the result.
            endpoint_type: Key selecting the set of default parameters.
            custom_arguments: Values that override or extend the defaults.

        Returns:
            A dict with ``endpoint``, the merged ``arguments`` and
            ``defaults_applied`` (the names of the default parameters).
        """
        defaults = self.get_default_parameters(endpoint_type)
        # Copy so the stored defaults are not modified by the overrides.
        arguments = defaults.copy()

        if custom_arguments:
            arguments.update(custom_arguments)

        return {
            'endpoint': endpoint,
            'arguments': arguments,
            'defaults_applied': list(defaults.keys())
        }

# Example usage
config_manager = ConfigManager(example_config)

print("=== Configuration File Management ===")
print("Example configuration loaded:")
print(json.dumps(example_config, indent=2))

print("\n=== Resolved Configuration ===")
resolved = config_manager.get_resolved_config()
# Notebook output is stored with the notebook, so the key is never echoed:
# only its last four characters are shown for identification.
print(f"API Key: ********{resolved['sunra']['api_key'][-4:]}" if resolved['sunra']['api_key'] else "Not configured")

# Apply configuration
sunra_config = config_manager.apply_sunra_config()

print("\n=== Default Parameters ===")
for endpoint_type in ['text_to_image', 'image_to_video', 'speech_to_text']:
    defaults = config_manager.get_default_parameters(endpoint_type)
    print(f"{endpoint_type}: {list(defaults.keys())}")

# Create request with defaults; the custom arguments extend the defaults.
print("\n=== Request with Defaults ===")
request_config = config_manager.create_request_with_defaults(
    "black-forest-labs/flux-kontext-pro/text-to-image",
    "text_to_image",
    {"prompt": "a configuration example image", "seed": 789}
)

print(f"Endpoint: {request_config['endpoint']}")
print(f"Arguments: {request_config['arguments']}")
print(f"Defaults applied: {request_config['defaults_applied']}")


In [None]:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import threading

class PerformanceOptimizer:
    """Collect call counts and timings for wrapped requests.

    All reads and writes of ``stats`` happen while holding ``lock``.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.stats = self._empty_stats()

    @staticmethod
    def _empty_stats() -> Dict[str, Any]:
        """Return a fresh, zeroed statistics dict."""
        return {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_time': 0,
            'average_time': 0
        }

    def time_request(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)``, recording its outcome and duration.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Anything ``func`` raises, re-raised unchanged after the call has
            been counted as failed.
        """
        # perf_counter is monotonic, so the duration cannot go negative or
        # jump when the system clock is adjusted.
        started = time.perf_counter()
        succeeded = False

        try:
            result = func(*args, **kwargs)
            succeeded = True
            return result
        finally:
            elapsed = time.perf_counter() - started
            outcome = 'successful_requests' if succeeded else 'failed_requests'

            # A single critical section updates every counter together, so a
            # concurrent get_stats() never sees a total that disagrees with
            # successful + failed.
            with self.lock:
                stats = self.stats
                stats['total_requests'] += 1
                stats[outcome] += 1
                stats['total_time'] += elapsed
                stats['average_time'] = stats['total_time'] / stats['total_requests']

    def get_stats(self) -> Dict[str, Any]:
        """Return a copy of the current statistics."""
        with self.lock:
            return self.stats.copy()

    def reset_stats(self) -> None:
        """Zero every counter."""
        with self.lock:
            self.stats = self._empty_stats()

def batch_generate_images(prompts, max_workers=3):
    """Generate one image per ``(prompt, seed)`` pair on a thread pool.

    Args:
        prompts: Sized collection of ``(prompt, seed)`` pairs.
        max_workers: Size of the thread pool.

    Returns:
        One dict per pair, in submission order, with the keys ``prompt``,
        ``seed``, ``result`` and ``error``. ``error`` holds the exception
        text for a failed request and is None otherwise.
    """
    print(f"Generating {len(prompts)} images with {max_workers} workers...")

    optimizer = PerformanceOptimizer()

    def generate_single_image(prompt_data):
        # Worker body: failures are captured in the outcome, never raised.
        prompt, seed = prompt_data
        outcome = {'prompt': prompt, 'seed': seed, 'result': None, 'error': None}
        try:
            outcome['result'] = optimizer.time_request(
                sunra_client.subscribe,
                "black-forest-labs/flux-kontext-pro/text-to-image",
                arguments={
                    "prompt": prompt,
                    "prompt_enhancer": False,
                    "seed": seed,
                    "aspect_ratio": "1:1",
                    "output_format": "jpeg",
                    "safety_tolerance": 6
                },
                with_logs=False
            )
        except Exception as e:
            outcome['error'] = str(e)
        return outcome

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit everything up front so the requests overlap.
        pending = [
            executor.submit(generate_single_image, (prompt, seed))
            for prompt, seed in prompts
        ]

        # Wait in submission order so results line up with the input.
        for future in pending:
            outcome = future.result()
            results.append(outcome)

            if outcome['error']:
                print(f"❌ Failed: {outcome['prompt']} - {outcome['error']}")
            else:
                print(f"✅ Success: {outcome['prompt']}")

    # Summarise what the shared optimizer recorded.
    stats = optimizer.get_stats()
    print("\n📊 Performance Statistics:")
    print(f"   Total requests: {stats['total_requests']}")
    print(f"   Successful: {stats['successful_requests']}")
    print(f"   Failed: {stats['failed_requests']}")
    print(f"   Average time: {stats['average_time']:.2f}s")

    return results

# Performance optimization tips, printed as one block. An empty entry yields
# the blank line that separates two numbered sections.
print("\n".join((
    "=== Performance Optimization Tips ===",
    "1. Connection Pooling:",
    "   - Reuse client instances when possible",
    "   - Avoid creating new clients for each request",
    "",
    "2. Parallel Processing:",
    "   - Use ThreadPoolExecutor for I/O-bound operations",
    "   - Limit concurrent requests to avoid rate limiting",
    "",
    "3. Request Batching:",
    "   - Group similar requests together",
    "   - Use appropriate batch sizes",
    "",
    "4. Caching:",
    "   - Cache results for identical requests",
    "   - Implement request deduplication",
    "",
    "5. Error Handling:",
    "   - Implement exponential backoff",
    "   - Use circuit breakers for failing services",
)))

# Batch example: left disabled because every prompt triggers a real request.
print("\n=== Batch Processing Example ===")
print("Example of generating multiple images in parallel:")

# Uncomment the following lines to run the batch example
# test_prompts = [
#     ("a cat sitting on a chair", 100),
#     ("a dog running in a park", 200),
#     ("a bird flying in the sky", 300)
# ]
# 
# batch_results = batch_generate_images(test_prompts, max_workers=2)
# 
# print(f"Batch processing completed: {len(batch_results)} results")

print("(Batch processing example commented out to save resources)")
