# Lab 11: Session Management & Security**Duration:** 45 minutes  **Objective:** Implement secure session management and authentication patterns## 🎯 Learning Objectives- Implement secure session storage- Build JWT token management- Create OAuth token storage- Implement security best practices- Handle session expiration and renewal

## Part 1: Session Management (10 minutes)

In [None]:
import redisimport jsonimport hashlibimport secretsimport timeimport jwtfrom datetime import datetime, timedeltafrom colorama import init, Fore, Stylefrom typing import Dict, Optional, Listimport base64from cryptography.fernet import Fernetinit(autoreset=True)# Connect to Redisr = redis.Redis(host='localhost', port=6379, decode_responses=True)r.flushdb()print(f"{Fore.GREEN}✅ Connected to Redis and cleared database{Style.RESET_ALL}")

In [None]:
class SessionManager:    """Secure session management with Redis."""        def __init__(self, redis_client, session_ttl=1800, max_sessions_per_user=5):        self.r = redis_client        self.session_ttl = session_ttl  # 30 minutes default        self.max_sessions_per_user = max_sessions_per_user                # Generate encryption key        self.encryption_key = Fernet.generate_key()        self.cipher = Fernet(self.encryption_key)        def create_session(self, user_id: str, user_data: Dict,                       device_info: Dict = None) -> str:        """Create a new session for user."""        # Generate secure session ID        session_id = secrets.token_urlsafe(32)                # Create session data        session_data = {            'session_id': session_id,            'user_id': user_id,            'user_data': user_data,            'device_info': device_info or {},            'created_at': datetime.now().isoformat(),            'last_activity': datetime.now().isoformat(),            'ip_address': device_info.get('ip_address') if device_info else None        }                # Encrypt sensitive data        encrypted_data = self._encrypt_session_data(session_data)                # Store session        session_key = f"session:{session_id}"        self.r.setex(session_key, self.session_ttl, encrypted_data)                # Add to user's session index        user_sessions_key = f"user:{user_id}:sessions"        self.r.zadd(user_sessions_key, {session_id: time.time()})                # Enforce max sessions per user        self._enforce_session_limit(user_id)                return session_id        def get_session(self, session_id: str) -> Optional[Dict]:        """Get and validate session."""        session_key = f"session:{session_id}"        encrypted_data = self.r.get(session_key)                if not encrypted_data:            return None                # Decrypt session data        session_data = self._decrypt_session_data(encrypted_data)                # Update last activity and refresh TTL        session_data['last_activity'] = datetime.now().isoformat()        self.r.expire(session_key, self.session_ttl)                return session_data        def destroy_session(self, session_id: str) -> bool:        """Destroy a session."""        session_key = f"session:{session_id}"        session_data = self.get_session(session_id)                if not session_data:            return False                user_id = session_data['user_id']                # Remove session        self.r.delete(session_key)                # Remove from user's session index        user_sessions_key = f"user:{user_id}:sessions"        self.r.zrem(user_sessions_key, session_id)                return True        def _encrypt_session_data(self, data: Dict) -> str:        json_data = json.dumps(data)        encrypted = self.cipher.encrypt(json_data.encode())        return base64.b64encode(encrypted).decode()        def _decrypt_session_data(self, encrypted_data: str) -> Dict:        decoded = base64.b64decode(encrypted_data.encode())        decrypted = self.cipher.decrypt(decoded)        return json.loads(decrypted.decode())        def _enforce_session_limit(self, user_id: str):        user_sessions_key = f"user:{user_id}:sessions"        session_count = self.r.zcard(user_sessions_key)                if session_count > self.max_sessions_per_user:            # Remove oldest sessions            old_sessions = self.r.zrange(                user_sessions_key, 0,                 session_count - self.max_sessions_per_user - 1            )            for old_session in old_sessions:                self.destroy_session(old_session)# Test session managementprint(f"{Fore.CYAN}=== SESSION MANAGEMENT ==={Style.RESET_ALL}\n")sm = SessionManager(r)# Create sessionsuser1_data = {'username': 'john_doe', 'role': 'admin', 'email': 'john@example.com'}device1 = {'device_type': 'desktop', 'browser': 'Chrome', 'ip_address': '192.168.1.100'}session1 = sm.create_session('USER-001', user1_data, device1)print(f"Created session: {session1[:16]}...")# Verify sessionsession_data = sm.get_session(session1)if session_data:    print(f"User: {session_data['user_data']['username']}")    print(f"Role: {session_data['user_data']['role']}")

## Part 2: JWT Token Management (15 minutes)

In [None]:
class JWTManager:    """JWT token management with Redis blacklist."""        def __init__(self, redis_client, secret_key=None):        self.r = redis_client        self.secret_key = secret_key or secrets.token_urlsafe(32)        self.algorithm = 'HS256'        self.access_token_ttl = 900  # 15 minutes        self.refresh_token_ttl = 86400  # 24 hours        def generate_tokens(self, user_id: str, user_data: Dict) -> Dict[str, str]:        """Generate access and refresh tokens."""        now = datetime.utcnow()                # Access token payload        access_payload = {            'user_id': user_id,            'username': user_data.get('username'),            'role': user_data.get('role'),            'type': 'access',            'iat': now,            'exp': now + timedelta(seconds=self.access_token_ttl),            'jti': secrets.token_hex(16)        }                # Refresh token payload        refresh_payload = {            'user_id': user_id,            'type': 'refresh',            'iat': now,            'exp': now + timedelta(seconds=self.refresh_token_ttl),            'jti': secrets.token_hex(16)        }                # Generate tokens        access_token = jwt.encode(access_payload, self.secret_key, algorithm=self.algorithm)        refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm=self.algorithm)                # Store refresh token in Redis        refresh_key = f"refresh_token:{refresh_payload['jti']}"        self.r.setex(            refresh_key,            self.refresh_token_ttl,            json.dumps({                'user_id': user_id,                'created_at': now.isoformat()            })        )                return {            'access_token': access_token,            'refresh_token': refresh_token,            'token_type': 'Bearer',            'expires_in': self.access_token_ttl        }        def verify_token(self, token: str, token_type: str = 'access') -> Optional[Dict]:        """Verify and decode token."""        try:            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])                        if payload.get('type') != token_type:                return None                        # Check if token is blacklisted            if self.is_token_blacklisted(payload['jti']):                return None                        return payload                    except jwt.ExpiredSignatureError:            return None        except jwt.InvalidTokenError:            return None        def revoke_token(self, token: str) -> bool:        """Revoke a token by adding to blacklist."""        try:            payload = jwt.decode(                token,                self.secret_key,                algorithms=[self.algorithm],                options={"verify_exp": False}            )                        exp_timestamp = payload['exp']            now_timestamp = datetime.utcnow().timestamp()            ttl = max(0, int(exp_timestamp - now_timestamp))                        if ttl > 0:                blacklist_key = f"blacklist:{payload['jti']}"                self.r.setex(blacklist_key, ttl, "revoked")                return True                        return False                    except jwt.InvalidTokenError:            return False        def is_token_blacklisted(self, jti: str) -> bool:        return self.r.exists(f"blacklist:{jti}")# Test JWT managementprint(f"{Fore.CYAN}=== JWT TOKEN MANAGEMENT ==={Style.RESET_ALL}\n")jwt_manager = JWTManager(r)# Generate tokensuser_data = {'username': 'alice', 'role': 'user'}tokens = jwt_manager.generate_tokens('USER-002', user_data)print(f"Access token: {tokens['access_token'][:50]}...")print(f"Expires in: {tokens['expires_in']} seconds")# Verify tokenpayload = jwt_manager.verify_token(tokens['access_token'])if payload:    print(f"Valid token for user: {payload['user_id']}")

## Part 3: OAuth Token Storage (10 minutes)

In [None]:
class OAuthTokenManager:    """Manage OAuth tokens and authorization codes."""        def __init__(self, redis_client):        self.r = redis_client        self.auth_code_ttl = 600  # 10 minutes        self.access_token_ttl = 3600  # 1 hour        self.refresh_token_ttl = 2592000  # 30 days        def store_authorization_code(self, code: str, client_id: str,                                 user_id: str, redirect_uri: str,                                 scope: List[str]) -> bool:        code_data = {            'client_id': client_id,            'user_id': user_id,            'redirect_uri': redirect_uri,            'scope': scope,            'created_at': datetime.now().isoformat()        }                code_key = f"oauth:code:{code}"        self.r.setex(code_key, self.auth_code_ttl, json.dumps(code_data))        return True        def exchange_code_for_tokens(self, code: str, client_id: str,                                 redirect_uri: str) -> Optional[Dict]:        code_key = f"oauth:code:{code}"        code_data_json = self.r.get(code_key)                if not code_data_json:            return None                code_data = json.loads(code_data_json)                # Validate client and redirect URI        if (code_data['client_id'] != client_id or             code_data['redirect_uri'] != redirect_uri):            return None                # Delete authorization code (single use)        self.r.delete(code_key)                # Generate tokens        access_token = secrets.token_urlsafe(32)        refresh_token = secrets.token_urlsafe(32)                # Store tokens        access_data = {            'user_id': code_data['user_id'],            'client_id': client_id,            'scope': code_data['scope'],            'created_at': datetime.now().isoformat()        }                access_key = f"oauth:access:{access_token}"        self.r.setex(access_key, self.access_token_ttl, json.dumps(access_data))                refresh_key = f"oauth:refresh:{refresh_token}"        self.r.setex(refresh_key, self.refresh_token_ttl, json.dumps(access_data))                return {            'access_token': access_token,            'refresh_token': refresh_token,            'token_type': 'Bearer',            'expires_in': self.access_token_ttl,            'scope': ' '.join(code_data['scope'])        }# Test OAuthprint(f"{Fore.CYAN}=== OAUTH TOKEN MANAGEMENT ==={Style.RESET_ALL}\n")oauth = OAuthTokenManager(r)# OAuth flowauth_code = secrets.token_urlsafe(16)client_id = "client_app_123"oauth.store_authorization_code(auth_code, client_id, "USER-003",                               "https://app.example.com/callback", ["read", "write"])tokens = oauth.exchange_code_for_tokens(auth_code, client_id,                                        "https://app.example.com/callback")if tokens:    print(f"OAuth tokens obtained successfully")    print(f"Access token: {tokens['access_token'][:20]}...")

## Part 4: Security Best Practices (10 minutes)

In [None]:
class SecurityManager:    """Implement security best practices."""        def __init__(self, redis_client):        self.r = redis_client        def implement_rate_limiting(self, identifier: str, limit: int = 10,                                window: int = 60) -> bool:        key = f"rate_limit:{identifier}:{int(time.time()/window)}"        current = self.r.incr(key)                if current == 1:            self.r.expire(key, window)                return current <= limit        def track_failed_login(self, user_id: str, ip_address: str) -> int:        # Track by user        user_key = f"failed_login:user:{user_id}"        user_attempts = self.r.incr(user_key)        self.r.expire(user_key, 3600)  # Reset after 1 hour                # Track by IP        ip_key = f"failed_login:ip:{ip_address}"        ip_attempts = self.r.incr(ip_key)        self.r.expire(ip_key, 3600)                return max(user_attempts, ip_attempts)        def check_account_lockout(self, user_id: str, max_attempts: int = 5) -> bool:        user_key = f"failed_login:user:{user_id}"        attempts = self.r.get(user_key)                if attempts and int(attempts) >= max_attempts:            lock_key = f"account_locked:{user_id}"            self.r.setex(lock_key, 1800, "locked")  # 30 minutes lock            return True                return False# Test security featuresprint(f"{Fore.CYAN}=== SECURITY BEST PRACTICES ==={Style.RESET_ALL}\n")security = SecurityManager(r)# Test rate limitingprint("Testing Rate Limiting:")for i in range(12):    allowed = security.implement_rate_limiting(f"login:192.168.1.100", limit=10)    if not allowed:        print(f"  Request {i+1}: BLOCKED (rate limit exceeded)")        break    elif i < 5:        print(f"  Request {i+1}: Allowed")print("\n✅ Session management and security completed")