# Python网络安全练习 - Web安全基础

本练习涵盖Web安全的常见漏洞和防护技术。

**重要提示**: 这些技术仅用于教育目的和授权的安全测试。未经授权的渗透测试是违法的！

## 练习1: SQL注入检测与防护

**任务**: 理解SQL注入漏洞和如何防护

**知识点**: SQL注入是最常见的Web漏洞之一

In [None]:
import sqlite3

# 创建示例数据库
def setup_database():
    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor()
    
    # 创建用户表
    cursor.execute('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            username TEXT,
            password TEXT,
            email TEXT
        )
    ''')
    
    # 插入测试数据
    users = [
        ('admin', 'admin123', 'admin@example.com'),
        ('user1', 'pass123', 'user1@example.com'),
        ('user2', 'pass456', 'user2@example.com')
    ]
    
    cursor.executemany('INSERT INTO users (username, password, email) VALUES (?, ?, ?)', users)
    conn.commit()
    
    return conn

# 不安全的登录函数 (存在SQL注入漏洞)
def unsafe_login(conn, username, password):
    """
    不安全的登录实现 - 容易受到SQL注入攻击
    """
    cursor = conn.cursor()
    
    # 危险! 直接拼接用户输入到SQL查询中
    query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'"
    print(f"执行的SQL: {query}")
    
    try:
        cursor.execute(query)
        result = cursor.fetchone()
        return result is not None
    except Exception as e:
        print(f"错误: {e}")
        return False

# 测试正常登录
conn = setup_database()
print("正常登录测试:")
print(unsafe_login(conn, 'admin', 'admin123'))

# SQL注入攻击示例
print("\nSQL注入攻击:")
# 输入: username = admin' --
# 这会注释掉密码检查部分
print(unsafe_login(conn, "admin' --", "anything"))

In [None]:
# 练习: 实现安全的登录函数
def safe_login(conn, username, password):
    """
    安全的登录实现 - 使用参数化查询防止SQL注入
    """
    cursor = conn.cursor()
    
    # TODO: 使用参数化查询(使用 ? 占位符)
    # 提示: cursor.execute("SELECT * FROM users WHERE username=? AND password=?", (username, password))
    
    pass

# 测试安全函数
conn = setup_database()
print("安全登录测试:")
# print(safe_login(conn, 'admin', 'admin123'))  # 应该返回 True
# print(safe_login(conn, "admin' --", "anything"))  # 应该返回 False

## 练习2: XSS (跨站脚本) 检测

**任务**: 检测和过滤XSS攻击载荷

**知识点**: XSS允许攻击者在受害者浏览器中执行恶意脚本

In [None]:
import re
import html

# XSS攻击载荷示例
xss_payloads = [
    "<script>alert('XSS')</script>",
    "<img src=x onerror=alert('XSS')>",
    "<iframe src='javascript:alert(1)'></iframe>",
    "<svg onload=alert('XSS')>",
    "javascript:alert('XSS')"
]

def detect_xss(user_input):
    """
    检测可能的XSS攻击
    
    参数:
        user_input: 用户输入的字符串
    
    返回:
        True 如果检测到XSS模式，否则 False
    """
    # XSS特征模式
    xss_patterns = [
        r'<script[^>]*>.*?</script>',
        r'javascript:',
        r'on\w+\s*=',  # onerror=, onclick=, etc.
        r'<iframe',
        r'<embed',
        r'<object'
    ]
    
    for pattern in xss_patterns:
        if re.search(pattern, user_input, re.IGNORECASE):
            return True
    
    return False

# 测试XSS检测
print("XSS检测测试:\n")
for payload in xss_payloads:
    is_xss = detect_xss(payload)
    print(f"输入: {payload}")
    print(f"检测结果: {'检测到XSS!' if is_xss else '安全'}\n")

In [None]:
# 练习: 实现XSS防护函数
def sanitize_html(user_input):
    """
    清理用户输入，防止XSS攻击
    
    参数:
        user_input: 用户输入
    
    返回:
        清理后的安全字符串
    """
    # TODO: 实现HTML转义
    # 提示: 使用 html.escape() 函数
    # 或者移除所有HTML标签
    
    pass

# 测试
dangerous_input = "<script>alert('XSS')</script>Hello"
# safe_output = sanitize_html(dangerous_input)
# print(f"原始输入: {dangerous_input}")
# print(f"清理后: {safe_output}")

## 练习3: 目录遍历攻击检测

**任务**: 检测和防止目录遍历攻击

**知识点**: 目录遍历允许攻击者访问应用程序目录之外的文件

In [None]:
import os

def is_path_traversal(filename):
    """
    检测路径遍历攻击
    
    参数:
        filename: 文件名或路径
    
    返回:
        True 如果检测到路径遍历，否则 False
    """
    # 危险模式
    dangerous_patterns = ['../', '..\\', '%2e%2e', '%252e']
    
    for pattern in dangerous_patterns:
        if pattern in filename.lower():
            return True
    
    return False

# 测试用例
test_paths = [
    'file.txt',  # 安全
    '../../../etc/passwd',  # 路径遍历
    '..\\..\\..\\windows\\system32\\config\\sam',  # 路径遍历
    'images/photo.jpg',  # 安全
    'downloads/../../secret.txt'  # 路径遍历
]

print("路径遍历检测:\n")
for path in test_paths:
    is_attack = is_path_traversal(path)
    print(f"路径: {path}")
    print(f"检测结果: {'危险!' if is_attack else '安全'}\n")

In [None]:
# 练习: 实现安全的文件访问函数
def safe_file_access(base_dir, requested_file):
    """
    安全地访问文件，防止目录遍历
    
    参数:
        base_dir: 允许访问的基础目录
        requested_file: 请求的文件路径
    
    返回:
        (is_safe, full_path) 元组
    """
    # TODO: 实现安全检查
    # 提示:
    # 1. 使用 os.path.join() 构建完整路径
    # 2. 使用 os.path.abspath() 获取绝对路径
    # 3. 使用 os.path.commonpath() 验证路径在允许的目录内
    
    pass

# 测试
# base = '/var/www/uploads'
# print(safe_file_access(base, 'photo.jpg'))  # 应该安全
# print(safe_file_access(base, '../../../etc/passwd'))  # 应该不安全

## 练习4: HTTP请求分析

**任务**: 分析HTTP请求头，检测可疑活动

**知识点**: 请求头可以包含有价值的安全信息

In [None]:
import requests

def analyze_http_headers(url):
    """
    分析HTTP响应头的安全特性
    
    参数:
        url: 目标URL
    
    返回:
        安全分析报告
    """
    try:
        response = requests.get(url, timeout=5)
        headers = response.headers
        
        print(f"分析URL: {url}\n")
        print("安全相关的HTTP头:\n")
        
        # 检查重要的安全头
        security_headers = {
            'X-Frame-Options': '防止点击劫持',
            'X-Content-Type-Options': '防止MIME类型嗅探',
            'X-XSS-Protection': 'XSS保护',
            'Strict-Transport-Security': 'HSTS - 强制HTTPS',
            'Content-Security-Policy': '内容安全策略',
            'Referrer-Policy': '引用策略'
        }
        
        for header, description in security_headers.items():
            if header in headers:
                print(f"✓ {header}: {headers[header]}")
                print(f"  ({description})")
            else:
                print(f"✗ {header}: 未设置")
                print(f"  ({description})")
            print()
        
        # 其他有用的头
        print("\n其他信息:")
        print(f"Server: {headers.get('Server', '未披露')}")
        print(f"X-Powered-By: {headers.get('X-Powered-By', '未披露')}")
        
    except Exception as e:
        print(f"错误: {e}")

# 示例 - 分析一个网站的安全头
# analyze_http_headers('https://www.example.com')

## 练习5: JWT (JSON Web Token) 安全

**任务**: 创建、验证和分析JWT

**知识点**: JWT常用于身份验证和授权

**注意**: 需要安装 PyJWT: `pip install pyjwt`

In [None]:
# 取消注释安装
# !pip install pyjwt

import jwt
import json
from datetime import datetime, timedelta
import base64

def create_jwt(payload, secret_key, algorithm='HS256'):
    """
    创建JWT token
    
    参数:
        payload: 载荷数据(字典)
        secret_key: 密钥
        algorithm: 签名算法
    
    返回:
        JWT token字符串
    """
    # 添加过期时间
    payload['exp'] = datetime.utcnow() + timedelta(hours=1)
    payload['iat'] = datetime.utcnow()
    
    token = jwt.encode(payload, secret_key, algorithm=algorithm)
    return token

def verify_jwt(token, secret_key, algorithms=['HS256']):
    """
    验证JWT token
    
    参数:
        token: JWT token
        secret_key: 密钥
        algorithms: 允许的算法列表
    
    返回:
        解码后的载荷，如果无效则返回None
    """
    try:
        payload = jwt.decode(token, secret_key, algorithms=algorithms)
        return payload
    except jwt.ExpiredSignatureError:
        print("Token已过期")
        return None
    except jwt.InvalidTokenError:
        print("无效的Token")
        return None

def decode_jwt_without_verification(token):
    """
    不验证签名，仅解码JWT (用于调试)
    警告: 生产环境中不要使用此方法!
    """
    parts = token.split('.')
    if len(parts) != 3:
        return None
    
    # 解码header和payload
    header = json.loads(base64.urlsafe_b64decode(parts[0] + '=='))
    payload = json.loads(base64.urlsafe_b64decode(parts[1] + '=='))
    
    return {'header': header, 'payload': payload}

# 示例使用
secret = "my_super_secret_key_2024"

# 创建token
user_data = {
    'user_id': 123,
    'username': 'admin',
    'role': 'administrator'
}

token = create_jwt(user_data, secret)
print(f"生成的JWT:\n{token}\n")

# 解码查看内容
decoded = decode_jwt_without_verification(token)
print("JWT内容 (未验证):")
print(json.dumps(decoded, indent=2))

# 验证token
print("\n验证JWT:")
verified = verify_jwt(token, secret)
if verified:
    print(f"验证成功! 用户: {verified['username']}")

In [None]:
# 练习: JWT安全漏洞 - 算法混淆攻击
# 某些JWT库允许使用"none"算法，这是一个安全漏洞

def insecure_jwt_verify(token, secret_key):
    """
    不安全的JWT验证 - 允许 'none' 算法
    """
    decoded = decode_jwt_without_verification(token)
    
    if decoded['header']['alg'] == 'none':
        # 危险! 接受无签名的token
        print("警告: 接受了无签名的JWT!")
        return decoded['payload']
    
    return verify_jwt(token, secret_key)

# TODO: 尝试创建一个使用"none"算法的JWT
# 研究为什么这是危险的

## 练习6: CSRF (跨站请求伪造) Token生成

**任务**: 实现CSRF token生成和验证机制

**知识点**: CSRF token防止恶意网站代表用户执行操作

In [None]:
import secrets
import hashlib
import time

class CSRFProtection:
    """
    CSRF保护实现
    """
    
    def __init__(self, secret_key):
        self.secret_key = secret_key
        self.tokens = {}  # 存储有效token
    
    def generate_token(self, session_id):
        """
        为特定会话生成CSRF token
        
        参数:
            session_id: 会话ID
        
        返回:
            CSRF token
        """
        # 生成随机token
        random_token = secrets.token_urlsafe(32)
        
        # 使用会话ID和密钥创建签名
        signature = hashlib.sha256(
            f"{session_id}{random_token}{self.secret_key}".encode()
        ).hexdigest()
        
        token = f"{random_token}.{signature}"
        
        # 存储token和创建时间
        self.tokens[token] = {
            'session_id': session_id,
            'created_at': time.time()
        }
        
        return token
    
    def validate_token(self, token, session_id, max_age=3600):
        """
        验证CSRF token
        
        参数:
            token: CSRF token
            session_id: 会话ID
            max_age: token最大有效期(秒)
        
        返回:
            True 如果有效，否则 False
        """
        if token not in self.tokens:
            print("Token不存在")
            return False
        
        token_data = self.tokens[token]
        
        # 检查会话ID是否匹配
        if token_data['session_id'] != session_id:
            print("会话ID不匹配")
            return False
        
        # 检查token是否过期
        if time.time() - token_data['created_at'] > max_age:
            print("Token已过期")
            del self.tokens[token]
            return False
        
        # 验证签名
        parts = token.split('.')
        if len(parts) != 2:
            return False
        
        random_token, signature = parts
        expected_signature = hashlib.sha256(
            f"{session_id}{random_token}{self.secret_key}".encode()
        ).hexdigest()
        
        if signature != expected_signature:
            print("签名验证失败")
            return False
        
        # Token有效，使用后删除(一次性使用)
        del self.tokens[token]
        return True

# 示例使用
csrf = CSRFProtection("super_secret_key")

# 模拟用户会话
session_id = "user_session_12345"

# 生成CSRF token
token = csrf.generate_token(session_id)
print(f"生成的CSRF Token: {token}\n")

# 验证token
print("验证CSRF Token:")
is_valid = csrf.validate_token(token, session_id)
print(f"Token有效: {is_valid}\n")

# 尝试重复使用token (应该失败)
print("尝试重复使用Token:")
is_valid = csrf.validate_token(token, session_id)
print(f"Token有效: {is_valid}")

## 挑战练习: Web应用安全扫描器

**任务**: 创建一个简单的Web应用安全扫描器

**要求**:
1. 检查常见的安全头
2. 测试SQL注入漏洞
3. 测试XSS漏洞
4. 检测敏感信息泄露
5. 生成安全报告

In [None]:
import requests
from urllib.parse import urljoin
import re

class WebSecurityScanner:
    """
    简单的Web安全扫描器
    """
    
    def __init__(self, target_url):
        self.target_url = target_url
        self.vulnerabilities = []
    
    def check_security_headers(self):
        """
        检查安全相关的HTTP头
        """
        print("[*] 检查安全头...")
        
        try:
            response = requests.get(self.target_url, timeout=5)
            headers = response.headers
            
            required_headers = [
                'X-Frame-Options',
                'X-Content-Type-Options',
                'Strict-Transport-Security',
                'Content-Security-Policy'
            ]
            
            for header in required_headers:
                if header not in headers:
                    self.vulnerabilities.append({
                        'type': '缺失安全头',
                        'header': header,
                        'severity': 'Medium'
                    })
                    print(f"  [-] 缺失: {header}")
                else:
                    print(f"  [+] 存在: {header}")
        
        except Exception as e:
            print(f"  [!] 错误: {e}")
    
    def test_sql_injection(self, test_param='id'):
        """
        测试简单的SQL注入
        """
        print("\n[*] 测试SQL注入...")
        
        sql_payloads = [
            "'",
            "' OR '1'='1",
            "' OR '1'='1' --",
            "1' AND '1'='1"
        ]
        
        # TODO: 实现SQL注入测试逻辑
        # 提示: 发送带有payload的请求，检查响应中的SQL错误消息
        pass
    
    def test_xss(self):
        """
        测试XSS漏洞
        """
        print("\n[*] 测试XSS...")
        
        xss_payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>"
        ]
        
        # TODO: 实现XSS测试逻辑
        pass
    
    def check_sensitive_info(self):
        """
        检查敏感信息泄露
        """
        print("\n[*] 检查敏感信息泄露...")
        
        try:
            response = requests.get(self.target_url, timeout=5)
            content = response.text
            
            # 检查常见的敏感信息模式
            patterns = {
                'API密钥': r'api[_-]?key[\s:=]+[\w-]+',
                '密码': r'password[\s:=]+\w+',
                '邮箱': r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
                'AWS密钥': r'AKIA[0-9A-Z]{16}'
            }
            
            for info_type, pattern in patterns.items():
                matches = re.findall(pattern, content, re.IGNORECASE)
                if matches:
                    print(f"  [-] 发现{info_type}: {len(matches)}个")
                    self.vulnerabilities.append({
                        'type': f'{info_type}泄露',
                        'count': len(matches),
                        'severity': 'High'
                    })
        
        except Exception as e:
            print(f"  [!] 错误: {e}")
    
    def generate_report(self):
        """
        生成扫描报告
        """
        print("\n" + "="*50)
        print("安全扫描报告")
        print("="*50)
        print(f"目标: {self.target_url}")
        print(f"发现漏洞数量: {len(self.vulnerabilities)}\n")
        
        if self.vulnerabilities:
            for i, vuln in enumerate(self.vulnerabilities, 1):
                print(f"{i}. {vuln}")
        else:
            print("未发现明显漏洞")
        
        print("="*50)
    
    def scan(self):
        """
        执行完整扫描
        """
        print(f"开始扫描: {self.target_url}\n")
        
        self.check_security_headers()
        # self.test_sql_injection()
        # self.test_xss()
        self.check_sensitive_info()
        
        self.generate_report()

# 示例使用 (请使用你有权限测试的网站)
# scanner = WebSecurityScanner('https://example.com')
# scanner.scan()

## 总结

通过这些练习，你应该学会了:

1. **SQL注入**: 如何检测和防护SQL注入攻击
2. **XSS**: 跨站脚本攻击的检测和防护
3. **目录遍历**: 路径遍历漏洞的识别
4. **HTTP安全头**: 重要安全头的作用
5. **JWT安全**: Token的创建和验证
6. **CSRF防护**: 跨站请求伪造的防御机制
7. **安全扫描**: 如何构建基础的安全扫描工具

### 下一步学习建议:

- 深入学习OWASP Top 10漏洞
- 练习使用专业工具: Burp Suite, OWASP ZAP
- 参加CTF竞赛提升实战能力
- 学习安全编码最佳实践

**记住**: 永远只在授权的环境中进行安全测试！