# Lab 14 - Part 2: Security and Authentication

## Overview
This notebook covers enterprise-grade security implementation including:
- Security manager implementation
- Password hashing and encryption
- JWT token generation
- User authentication with RBAC
- Failed login attempt tracking

**Duration:** 10 minutes  
**Prerequisites:** Part 1 completed

## Prerequisites

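Import the required modules, detect the optional security libraries (PyJWT and `cryptography`), and check that the production configuration from Part 1 is available; a mock configuration is created if it is not.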

In [None]:
# Standard library imports
import time
import hashlib
import secrets
import json
import base64
from datetime import datetime, timedelta

# Probe for the optional security packages. Each flag records whether the real
# library was importable; later code branches on the flag to pick either the
# library-backed implementation or the built-in fallback.
try:
    import jwt
except ImportError:
    JWT_AVAILABLE = False
    print("⚠️ JWT not available - using basic token generation")
else:
    JWT_AVAILABLE = True
    print("✓ JWT library available")

try:
    from cryptography.fernet import Fernet
except ImportError:
    CRYPTOGRAPHY_AVAILABLE = False
    print("⚠️ Cryptography not available - using basic encryption")
else:
    CRYPTOGRAPHY_AVAILABLE = True
    print("✓ Cryptography library available")

# Reuse prod_config when an earlier cell (Part 1) already defined it;
# otherwise build a minimal stand-in so this notebook can run on its own.
try:
    prod_config
except NameError:
    class MockConfig:
        """Stand-in configuration exposing only ``deployment_id``."""

        def __init__(self):
            # Timestamped identifier, e.g. deploy_20240101_120000
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            self.deployment_id = f"deploy_{timestamp}"

    prod_config = MockConfig()
    print("⚠️ Using mock configuration - run Part 1 first for full setup")
else:
    print("✓ Production config loaded from Part 1")

## Security Manager Implementation

Create a comprehensive security manager with encryption, password hashing, and rate limiting.

In [None]:
class SecurityManager:
    """Enterprise security manager for production deployment.

    Bundles the security primitives used by this lab:

    * symmetric encryption (Fernet when ``cryptography`` is importable,
      otherwise a plain base64 encoding fallback),
    * salted PBKDF2-HMAC-SHA256 password hashing,
    * token issuing/verification (PyJWT when importable, otherwise an
      unsigned base64-encoded JSON fallback),
    * per-client failed-login tracking with a temporary lockout.

    The fallbacks only exist so the notebook runs without the optional
    packages. They provide neither confidentiality nor integrity and must
    not be relied on in production.
    """

    # PBKDF2 work factor. hash_password() and verify_password() must agree on
    # it, so it is defined once instead of being repeated as a literal.
    PBKDF2_ITERATIONS = 100000

    def __init__(self):
        """Generate a fresh key and start with empty failed-login state."""
        # Initialize encryption with fallback
        if CRYPTOGRAPHY_AVAILABLE:
            self.encryption_key = Fernet.generate_key()
            self.cipher_suite = Fernet(self.encryption_key)
        else:
            # Basic encryption fallback
            self.encryption_key = secrets.token_bytes(32)
            self.cipher_suite = None
            print("⚠️ Using basic encryption - install cryptography for production security")

        # Maps client_ip -> (failed attempt count, datetime of latest failure)
        self.failed_login_attempts = {}
        self.max_login_attempts = 3
        self.lockout_duration = 300  # 5 minutes

    def _derive_key(self, password: str, salt: str) -> bytes:
        """Return the PBKDF2-HMAC-SHA256 digest of *password* under *salt*."""
        return hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),
            self.PBKDF2_ITERATIONS,
        )

    def hash_password(self, password: str) -> str:
        """Hash *password* with a fresh random salt.

        Returns:
            A string of the form ``"<salt>:<hex digest>"`` suitable for
            passing to :meth:`verify_password`.
        """
        salt = secrets.token_hex(16)
        return f"{salt}:{self._derive_key(password, salt).hex()}"

    def verify_password(self, password: str, hashed: str) -> bool:
        """Check *password* against a value produced by :meth:`hash_password`.

        Returns:
            ``True`` when the password matches; ``False`` when it does not or
            when *hashed* is not in the ``"<salt>:<hex digest>"`` format.
        """
        try:
            salt, stored_hash = hashed.split(':')
            candidate = self._derive_key(password, salt).hex()
            # Constant-time comparison so the time taken does not reveal how
            # many leading characters of the digest matched. Comparing the
            # encoded bytes keeps the exact equality semantics of str ==.
            return secrets.compare_digest(
                candidate.encode('utf-8'),
                stored_hash.encode('utf-8'),
            )
        except ValueError:
            # Wrong number of ':'-separated parts (or un-encodable text)
            return False

    def generate_jwt_token(self, user_data: dict, expiry_hours: int = 8) -> str:
        """Issue a token describing *user_data*.

        Args:
            user_data: Mapping read for ``user_id``, ``role`` and
                ``permissions`` (missing keys become ``None`` / ``[]``).
            expiry_hours: Token lifetime in hours, counted from now (UTC).

        Returns:
            An HS256-signed JWT when PyJWT is available; otherwise the
            base64-encoded JSON payload. The fallback is NOT signed, so it
            can be forged and is only suitable for demonstrations.
        """
        # Sample the clock once so 'exp' is exactly 'iat' + expiry_hours.
        issued_at = datetime.utcnow()
        payload = {
            'user_id': user_data.get('user_id'),
            'role': user_data.get('role'),
            'permissions': user_data.get('permissions', []),
            'exp': issued_at + timedelta(hours=expiry_hours),
            'iat': issued_at,
            'deployment_id': prod_config.deployment_id
        }

        if JWT_AVAILABLE:
            # NOTE(review): str() on the bytes key yields its "b'...'" repr.
            # Signing and verification both use that same string, so it is
            # self-consistent; kept as-is so already-issued tokens still verify.
            return jwt.encode(payload, str(self.encryption_key), algorithm='HS256')

        # Basic token fallback: datetimes are stringified by default=str
        token_data = json.dumps(payload, default=str)
        return base64.b64encode(token_data.encode()).decode()

    def verify_jwt_token(self, token: str) -> dict:
        """Decode *token* and return its payload.

        Raises:
            ValueError: For any decoding, signature or expiry failure. The
                underlying exception is chained as ``__cause__``.
        """
        try:
            if JWT_AVAILABLE:
                return jwt.decode(token, str(self.encryption_key), algorithms=['HS256'])

            # Basic token verification fallback (no signature to check)
            token_data = base64.b64decode(token.encode()).decode()
            payload = json.loads(token_data)

            # Check expiration; 'exp' was written as a naive UTC timestamp
            exp_time = datetime.fromisoformat(payload['exp'].replace('Z', '+00:00'))
            if datetime.utcnow() > exp_time.replace(tzinfo=None):
                raise ValueError("Token has expired")

            return payload
        except Exception as e:
            raise ValueError(f"Invalid token: {e}") from e

    def check_rate_limiting(self, client_ip: str) -> bool:
        """Return ``True`` when *client_ip* may attempt a login.

        A client is refused once it has accumulated ``max_login_attempts``
        failures and its latest failure is no older than ``lockout_duration``
        seconds. An entry whose latest failure is older than that is removed.
        """
        entry = self.failed_login_attempts.get(client_ip)
        if entry is None:
            return True

        attempts, last_attempt = entry

        # total_seconds() covers the whole interval; timedelta.seconds would
        # discard full days and make an old lockout look recent again.
        if (datetime.now() - last_attempt).total_seconds() > self.lockout_duration:
            del self.failed_login_attempts[client_ip]
            return True

        return attempts < self.max_login_attempts

    def record_failed_login(self, client_ip: str):
        """Increment the failure count for *client_ip* and timestamp it."""
        attempts, _ = self.failed_login_attempts.get(client_ip, (0, None))
        self.failed_login_attempts[client_ip] = (attempts + 1, datetime.now())

    def encrypt_sensitive_data(self, data: str) -> str:
        """Encrypt *data* with Fernet, or base64-encode it in fallback mode."""
        if CRYPTOGRAPHY_AVAILABLE and self.cipher_suite:
            return self.cipher_suite.encrypt(data.encode()).decode()

        # Basic encoding fallback (NOT secure for production)
        return base64.b64encode(data.encode()).decode()

    def decrypt_sensitive_data(self, encrypted_data: str) -> str:
        """Reverse :meth:`encrypt_sensitive_data` for the active mode."""
        if CRYPTOGRAPHY_AVAILABLE and self.cipher_suite:
            return self.cipher_suite.decrypt(encrypted_data.encode()).decode()

        # Basic decoding fallback
        return base64.b64decode(encrypted_data.encode()).decode()

# Initialize security manager
# Shared instance used by the cells below. The constructor generates a new
# key on every call, so re-running this cell replaces the key; tokens signed
# or data Fernet-encrypted by a previous instance will not verify/decrypt.
security_manager = SecurityManager()
print("✓ Security manager initialized with enterprise-grade protection")

## Test Password Hashing

Verify that password hashing and verification work correctly.

In [None]:
print("🔐 Testing Password Security...\n")

# Test password hashing
test_password = "SecurePassword123!"
hashed = security_manager.hash_password(test_password)
print(f"Original Password: {test_password}")
print(f"Hashed Password: {hashed[:50]}...")

# Test password verification: the right password must be accepted...
is_valid = security_manager.verify_password(test_password, hashed)
print(f"\n✓ Correct password verification: {is_valid}")

# ...and a wrong one must be rejected.
is_invalid = security_manager.verify_password("WrongPassword", hashed)
print(f"✗ Wrong password verification: {is_invalid}")

# Report success only when both checks actually produced the expected
# outcome, so a broken implementation is not announced as working.
if is_valid and not is_invalid:
    print("\n✅ Password hashing and verification working correctly")
else:
    print("\n❌ Password hashing or verification did NOT behave as expected")

## User Authentication System

Implement a comprehensive authentication system with Role-Based Access Control (RBAC).

In [None]:
class UserAuthenticationSystem:
    """Production user authentication with role-based access control.

    Keeps an in-memory user store keyed by ``user_id`` and a static mapping
    from role name to the permissions that role grants. Password hashing,
    token handling and failed-login tracking are delegated to the security
    manager passed to the constructor.
    """

    # Lifetime of issued tokens, in hours. Used both for the token itself and
    # for the 'expires_at' value reported to the caller so the two agree.
    TOKEN_EXPIRY_HOURS = 8

    def __init__(self, security_manager: "SecurityManager"):
        """Store the collaborator, define the roles and seed default users.

        Args:
            security_manager: Object providing ``hash_password``,
                ``verify_password``, ``check_rate_limiting``,
                ``record_failed_login``, ``generate_jwt_token`` and
                ``verify_jwt_token``.
        """
        self.security_manager = security_manager
        self.users = {}
        self.roles = {
            "admin": ["read", "write", "delete", "manage_users", "system_admin"],
            "agent": ["read", "write", "customer_management", "policy_management"],
            "adjuster": ["read", "write", "claims_management", "investigation"],
            "customer": ["read", "self_service"],
            "auditor": ["read", "audit_access", "compliance_reports"]
        }

        # Create default production users
        self._create_default_users()

    def _create_default_users(self):
        """Register the built-in accounts via :meth:`create_user`."""
        # NOTE: credentials are hard-coded for the lab exercise only; real
        # deployments must not embed passwords in source code.
        default_users = [
            {
                "user_id": "prod_admin",
                "username": "production_admin",
                "password": "ProdAdmin@2024!",
                "role": "admin",
                "email": "admin@insurance-company.com",
                "department": "IT Operations"
            },
            {
                "user_id": "lead_agent",
                "username": "lead_agent_001",
                "password": "Agent@Secure123",
                "role": "agent",
                "email": "lead.agent@insurance-company.com",
                "department": "Sales"
            },
            {
                "user_id": "senior_adjuster",
                "username": "senior_adjuster_001",
                "password": "Adjuster@Pro456",
                "role": "adjuster",
                "email": "senior.adjuster@insurance-company.com",
                "department": "Claims"
            },
            {
                "user_id": "compliance_auditor",
                "username": "compliance_audit",
                "password": "Audit@Secure789",
                "role": "auditor",
                "email": "compliance@insurance-company.com",
                "department": "Compliance"
            }
        ]

        for user_data in default_users:
            self.create_user(user_data)

    def create_user(self, user_data: dict) -> bool:
        """Create (or overwrite) a user, storing only the password hash.

        Args:
            user_data: Must contain ``user_id``, ``username``, ``password``,
                ``role`` and ``email``; ``department`` is optional. A role
                that is not in ``self.roles`` yields an empty permission list.

        Returns:
            ``True`` on success; ``False`` (after printing the error) when a
            required key is missing or hashing fails. Nothing is stored on
            failure.
        """
        try:
            user_id = user_data['user_id']

            # Hash password
            hashed_password = self.security_manager.hash_password(user_data['password'])

            # Store user data
            self.users[user_id] = {
                'user_id': user_id,
                'username': user_data['username'],
                'password_hash': hashed_password,
                'role': user_data['role'],
                # Copy so that editing one user's permissions cannot alter the
                # shared role definition (and thereby every other user).
                'permissions': list(self.roles.get(user_data['role'], [])),
                'email': user_data['email'],
                'department': user_data.get('department'),
                'created_at': datetime.now().isoformat(),
                'last_login': None,
                'active': True
            }

            return True

        except Exception as e:
            # Best-effort by design: report and signal failure to the caller
            print(f"Error creating user: {e}")
            return False

    def authenticate_user(self, username: str, password: str, client_ip: str) -> dict:
        """Authenticate a user and issue a token.

        Args:
            username: Login name (matched against each user's ``username``).
            password: Plain-text password to verify.
            client_ip: Key used for failed-login tracking.

        Returns:
            A dict with ``user_id``, ``username``, ``role``, ``permissions``,
            ``token`` and ``expires_at`` (ISO-8601, UTC).

        Raises:
            ValueError: When the client is locked out, or when the user is
                unknown, inactive, or the password is wrong. The last three
                share one message so the response does not reveal which
                check failed.
        """
        # Check rate limiting
        if not self.security_manager.check_rate_limiting(client_ip):
            raise ValueError("Too many failed login attempts. Please try again later.")

        # Find user by username (users are keyed by user_id, not username)
        user = next(
            (candidate for candidate in self.users.values()
             if candidate['username'] == username),
            None,
        )

        # Short-circuit order matters: the password is only checked for an
        # existing, active user.
        if (
            user is None
            or not user['active']
            or not self.security_manager.verify_password(password, user['password_hash'])
        ):
            self.security_manager.record_failed_login(client_ip)
            raise ValueError("Invalid credentials")

        # Update last login
        user['last_login'] = datetime.now().isoformat()

        # Generate JWT token with the same lifetime that is reported below
        token = self.security_manager.generate_jwt_token(
            user, expiry_hours=self.TOKEN_EXPIRY_HOURS
        )
        expires_at = datetime.utcnow() + timedelta(hours=self.TOKEN_EXPIRY_HOURS)

        return {
            'user_id': user['user_id'],
            'username': user['username'],
            'role': user['role'],
            'permissions': user['permissions'],
            'token': token,
            'expires_at': expires_at.isoformat()
        }

    def verify_token_and_permissions(self, token: str, required_permission: str) -> bool:
        """Return whether *token* is valid and grants *required_permission*.

        Returns ``False`` (rather than raising) when the token is rejected
        by the security manager.
        """
        try:
            payload = self.security_manager.verify_jwt_token(token)
        except ValueError:
            return False

        # Treat a missing or null 'permissions' claim as "no permissions"
        user_permissions = payload.get('permissions') or []
        return required_permission in user_permissions

# Initialize authentication system
# The constructor also registers the built-in default accounts, which is why
# the user count printed below is non-zero right after construction.
auth_system = UserAuthenticationSystem(security_manager)
print("✓ User authentication system configured with RBAC")
print(f"\n👥 Created {len(auth_system.users)} default users")

## Test Authentication System

Test user authentication across several scenarios: a successful login, a failed login with a wrong password, and token-based permission verification.

In [None]:
print("🔑 Testing Authentication System...\n")

# Defined up front so the permission checks below can tell "no token was
# issued" apart from a NameError when the login in the first test fails.
admin_token = None
checks_passed = True

# Test successful authentication
try:
    auth_result = auth_system.authenticate_user(
        username="production_admin",
        password="ProdAdmin@2024!",
        client_ip="192.168.1.100"
    )
except ValueError as e:
    print(f"✗ Authentication failed: {e}")
    checks_passed = False
else:
    print("✓ Authentication successful!")
    print(f"  User: {auth_result['username']}")
    print(f"  Role: {auth_result['role']}")
    print(f"  Permissions: {', '.join(auth_result['permissions'])}")
    print(f"  Token: {auth_result['token'][:50]}...")
    print(f"  Expires: {auth_result['expires_at']}")

    # Store token for testing
    admin_token = auth_result['token']

# Test failed authentication (a different client IP keeps this failure from
# counting against the address used above)
print("\n🔍 Testing failed login...")
try:
    auth_system.authenticate_user(
        username="production_admin",
        password="WrongPassword",
        client_ip="192.168.1.101"
    )
except ValueError as e:
    print(f"✓ Expected failure: {e}")
else:
    # A wrong password being accepted must not pass silently
    print("✗ Unexpected success: wrong password was accepted")
    checks_passed = False

# Test permission verification
print("\n🔐 Testing permission verification...")
if admin_token is None:
    print("⚠️ Skipping permission checks - no admin token was issued")
else:
    has_permission = auth_system.verify_token_and_permissions(admin_token, "system_admin")
    print(f"✓ Admin has 'system_admin' permission: {has_permission}")

    no_permission = auth_system.verify_token_and_permissions(admin_token, "invalid_permission")
    print(f"✗ Admin has 'invalid_permission' permission: {no_permission}")

    checks_passed = checks_passed and has_permission and not no_permission

# Only claim success when every scenario behaved as expected
if checks_passed:
    print("\n✅ Authentication system fully operational")
else:
    print("\n❌ Authentication system checks failed - see messages above")

## Display User Roles and Permissions

Show all configured roles and their associated permissions.

In [None]:
print("📋 Role-Based Access Control Configuration:\n")

# One heading per role, its permissions as bullets, then a blank line.
# The trailing "" in the joined list produces that blank separator line.
for role, permissions in auth_system.roles.items():
    bullet_lines = [f"  - {perm}" for perm in permissions]
    print("\n".join([f"{role.upper()}:", *bullet_lines, ""]))

print("\n👥 Configured Users:")
# Summarize each stored account; the password hash is deliberately not shown.
for user_id, user_data in auth_system.users.items():
    status = 'Active' if user_data['active'] else 'Inactive'
    print(f"\n{user_data['username']}:")
    print(f"  Role: {user_data['role']}")
    print(f"  Email: {user_data['email']}")
    print(f"  Department: {user_data['department']}")
    print(f"  Status: {status}")

## Key Takeaways

In this notebook, you've:
1. ✅ Implemented enterprise security manager
2. ✅ Created secure password hashing with PBKDF2
3. ✅ Configured JWT token generation and verification
4. ✅ Built user authentication system with RBAC
5. ✅ Implemented rate limiting and failed login tracking

**Security Best Practices:**
- Always use strong password hashing (PBKDF2, bcrypt, or Argon2)
- Implement rate limiting to prevent brute force attacks
- Use JWT tokens with reasonable expiration times
- Apply role-based access control for authorization
- Log all authentication attempts for audit purposes

**Next Steps:** Proceed to notebook 03 for Monitoring and Logging implementation.