# TOCTOU 竞态条件学习笔记

## 什么是TOCTOU？

**TOCTOU** = **Time-of-Check-Time-of-Use**（检查时间-使用时间）

这是一种经典的竞态条件安全漏洞，发生在程序：
1. **检查**某个条件或资源状态（Time of Check）
2. **使用**该资源（Time of Use）

在这两个操作之间存在时间窗口，其他进程可能会改变资源状态，导致安全问题。

## 今天的实际案例

在我们的Chrome插件项目中发现了TOCTOU问题：

```javascript
// ❌ 有问题的代码（存在竞态条件）
const initializeOutputDir = async () => {
  try {
    await fs.access(outputDir);  // 1. 检查目录是否存在
  } catch (error) {
    await fs.mkdir(outputDir, { recursive: true });  // 2. 创建目录
  }
};

// ✅ 修复后的代码（原子操作）
const initializeOutputDir = async () => {
  try {
    await fs.mkdir(outputDir, { recursive: true });  // 直接创建，幂等操作
    console.log('确保 captured 目录存在');
  } catch (error) {
    console.error(`创建目录失败: ${outputDir}`, error);
    process.exit(1);
  }
};
```

# 1. 理解TOCTOU漏洞

## TOCTOU的核心问题

TOCTOU漏洞的根本原因是**非原子性操作**：

1. **时间窗口**：检查和使用之间存在时间间隔
2. **状态变化**：其他进程可能在此期间修改资源状态
3. **竞态条件**：多个进程竞争访问同一资源

## 常见场景

- 文件权限检查后写入
- 目录存在性检查后创建
- 资源可用性检查后使用
- 锁状态检查后获取

In [None]:
import os
import time
import threading
from pathlib import Path

# 2. 简单的文件操作TOCTOU示例

def vulnerable_file_operation(filename):
    """❌ 有漏洞的文件操作示例"""
    print(f"开始处理文件: {filename}")
    
    # 检查文件是否存在
    if os.path.exists(filename):
        print(f"✓ 文件存在检查通过: {filename}")
        
        # 模拟处理时间（这里就是攻击窗口！）
        time.sleep(0.1)
        
        # 尝试读取文件
        try:
            with open(filename, 'r') as f:
                content = f.read()
                print(f"文件内容: {content[:50]}...")
                return content
        except FileNotFoundError:
            print(f"❌ 错误：文件在检查后被删除了！{filename}")
            return None
    else:
        print(f"文件不存在: {filename}")
        return None

# 创建测试文件
test_file = "test_toctou.txt"
with open(test_file, 'w') as f:
    f.write("这是一个测试TOCTOU的文件内容")

print("=== 正常情况 ===")
result = vulnerable_file_operation(test_file)

# 清理
if os.path.exists(test_file):
    os.remove(test_file)

In [None]:
# 3. 竞态条件演示

def simulate_race_condition():
    """模拟竞态条件攻击"""
    test_file = "race_condition_test.txt"
    
    # 创建测试文件
    with open(test_file, 'w') as f:
        f.write("原始内容")
    
    def victim_thread():
        """受害者线程 - 执行有漏洞的操作"""
        print("🎯 受害者: 开始执行有漏洞的操作")
        
        # 检查文件是否存在
        if os.path.exists(test_file):
            print("🎯 受害者: 文件存在检查通过")
            
            # 这里是攻击窗口！
            time.sleep(0.2)  # 模拟处理时间
            
            # 尝试使用文件
            try:
                with open(test_file, 'r') as f:
                    content = f.read()
                    print(f"🎯 受害者: 成功读取内容: {content}")
            except FileNotFoundError:
                print("🎯 受害者: ❌ 文件在检查后消失了！")
        else:
            print("🎯 受害者: 文件不存在")
    
    def attacker_thread():
        """攻击者线程 - 在时间窗口内删除文件"""
        time.sleep(0.1)  # 等待受害者完成检查
        print("🔥 攻击者: 在检查和使用之间删除文件")
        
        if os.path.exists(test_file):
            os.remove(test_file)
            print("🔥 攻击者: 文件已删除")
    
    # 启动两个线程
    victim = threading.Thread(target=victim_thread)
    attacker = threading.Thread(target=attacker_thread)
    
    victim.start()
    attacker.start()
    
    victim.join()
    attacker.join()
    
    # 清理
    if os.path.exists(test_file):
        os.remove(test_file)
    
    print("竞态条件演示完成")

print("=== 演示竞态条件攻击 ===")
simulate_race_condition()

In [None]:
# 4. 多线程环境下的TOCTOU

import threading
import time
from concurrent.futures import ThreadPoolExecutor

class SharedCounter:
    """共享计数器 - 模拟共享资源"""
    def __init__(self):
        self.value = 0
        self.max_value = 10
    
    def vulnerable_increment(self, thread_id):
        """❌ 有漏洞的递增操作"""
        # 检查是否可以递增
        if self.value < self.max_value:
            print(f"线程 {thread_id}: 检查通过，当前值 {self.value}")
            
            # 模拟处理时间（攻击窗口）
            time.sleep(0.01)
            
            # 执行递增
            old_value = self.value
            self.value += 1
            print(f"线程 {thread_id}: 从 {old_value} 递增到 {self.value}")
            
            # 检查是否超出限制
            if self.value > self.max_value:
                print(f"❌ 线程 {thread_id}: 超出最大值限制！{self.value} > {self.max_value}")
                return False
            return True
        else:
            print(f"线程 {thread_id}: 已达到最大值，无法递增")
            return False

def test_concurrent_access():
    """测试并发访问共享资源"""
    counter = SharedCounter()
    
    def worker(thread_id):
        for i in range(3):
            counter.vulnerable_increment(f"{thread_id}-{i}")
            time.sleep(0.05)
    
    print("=== 多线程TOCTOU测试 ===")
    
    # 启动多个线程
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(worker, i) for i in range(5)]
        
        # 等待所有线程完成
        for future in futures:
            future.result()
    
    print(f"最终计数器值: {counter.value} (最大值: {counter.max_value})")
    
    if counter.value > counter.max_value:
        print("❌ 检测到TOCTOU漏洞：计数器超出了预期的最大值！")
    else:
        print("✓ 计数器值在预期范围内")

test_concurrent_access()

In [None]:
# 5. 防范技术：原子操作

import threading
import fcntl  # 文件锁（Linux/macOS）
import tempfile
import os

class SecureCounter:
    """安全的计数器实现"""
    def __init__(self):
        self.value = 0
        self.max_value = 10
        self.lock = threading.Lock()  # 互斥锁
    
    def secure_increment(self, thread_id):
        """✅ 安全的递增操作"""
        with self.lock:  # 原子操作
            if self.value < self.max_value:
                old_value = self.value
                self.value += 1
                print(f"线程 {thread_id}: 安全递增 {old_value} -> {self.value}")
                return True
            else:
                print(f"线程 {thread_id}: 已达到最大值")
                return False

def secure_file_operation(filename):
    """✅ 安全的文件操作"""
    try:
        # 直接尝试打开文件，使用异常处理而不是预检查
        with open(filename, 'r') as f:
            content = f.read()
            print(f"✅ 安全读取文件: {filename}")
            return content
    except FileNotFoundError:
        print(f"文件不存在: {filename}")
        return None
    except PermissionError:
        print(f"没有权限访问: {filename}")
        return None

def secure_directory_creation(dir_path):
    """✅ 安全的目录创建（类似我们修复的代码）"""
    try:
        # 直接创建，使用幂等操作
        os.makedirs(dir_path, exist_ok=True)
        print(f"✅ 确保目录存在: {dir_path}")
        return True
    except OSError as e:
        print(f"❌ 创建目录失败: {e}")
        return False

# 测试安全实现
print("=== 测试安全的计数器 ===")
secure_counter = SecureCounter()

def secure_worker(thread_id):
    for i in range(3):
        secure_counter.secure_increment(f"{thread_id}-{i}")
        time.sleep(0.01)

# 并发测试
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(secure_worker, i) for i in range(5)]
    for future in futures:
        future.result()

print(f"安全计数器最终值: {secure_counter.value}")

print("\n=== 测试安全的目录创建 ===")
test_dir = "test_secure_dir"
secure_directory_creation(test_dir)

# 清理
if os.path.exists(test_dir):
    os.rmdir(test_dir)

# 6. 安全文件处理最佳实践

## 🎯 核心原则

### 1. 避免检查-使用模式
```python
# ❌ 不安全：分离的检查和使用
if os.path.exists(filename):
    with open(filename, 'r') as f:  # 可能在此时文件被删除
        content = f.read()

# ✅ 安全：直接尝试使用
try:
    with open(filename, 'r') as f:
        content = f.read()
except FileNotFoundError:
    # 处理文件不存在的情况
```

### 2. 使用原子操作
```python
# ❌ 非原子操作
if not os.path.exists(directory):
    os.mkdir(directory)

# ✅ 原子操作（幂等）
os.makedirs(directory, exist_ok=True)
```

### 3. 使用锁机制
```python
import threading

lock = threading.Lock()

def safe_operation():
    with lock:  # 确保操作的原子性
        # 检查和使用在同一个锁保护下
        pass
```

## 🔧 Node.js/JavaScript 中的实践

```javascript
// ❌ 有TOCTOU问题
const fs = require('fs').promises;

async function vulnerable() {
    try {
        await fs.access(dir);  // 检查
    } catch {
        await fs.mkdir(dir);   // 使用
    }
}

// ✅ 安全的方式
async function secure() {
    try {
        await fs.mkdir(dir, { recursive: true });  // 幂等操作
    } catch (error) {
        if (error.code !== 'EEXIST') {
            throw error;
        }
    }
}
```

## 📝 关键要点总结

1. **消除时间窗口**：合并检查和使用操作
2. **使用幂等操作**：重复执行不会产生副作用
3. **异常处理优于预检查**：EAFP > LBYL
4. **原子操作**：不可分割的操作序列
5. **锁机制**：保护临界区资源

In [None]:
# 7. 实际修复案例：Chrome插件项目

def demonstrate_our_fix():
    """演示我们在Chrome插件项目中的实际修复"""
    
    print("=== 我们项目的TOCTOU修复案例 ===")
    
    # 模拟Node.js的fs.mkdir行为
    def fs_mkdir_simulation(path, recursive=True):
        """模拟fs.mkdir的幂等行为"""
        try:
            os.makedirs(path, exist_ok=recursive)
            return True
        except OSError:
            return False
    
    # ❌ 原来的代码逻辑
    def old_vulnerable_approach(output_dir):
        print("❌ 原来的方法（有TOCTOU问题）:")
        print("1. 检查目录是否存在")
        exists = os.path.exists(output_dir)
        print(f"   目录存在: {exists}")
        
        if not exists:
            print("2. 创建目录")
            os.makedirs(output_dir, exist_ok=True)
            print("   目录已创建")
        else:
            print("2. 目录已存在，跳过创建")
    
    # ✅ 修复后的代码逻辑  
    def new_secure_approach(output_dir):
        print("✅ 修复后的方法（原子操作）:")
        print("1. 直接创建目录（幂等操作）")
        try:
            os.makedirs(output_dir, exist_ok=True)
            print("   确保目录存在（无论之前是否存在）")
        except OSError as e:
            print(f"   创建失败: {e}")
    
    test_dir = "captured_demo"
    
    print("\n演示修复前的方法:")
    old_vulnerable_approach(test_dir)
    
    # 清理
    if os.path.exists(test_dir):
        os.rmdir(test_dir)
    
    print("\n演示修复后的方法:")
    new_secure_approach(test_dir)
    
    print("\n再次运行修复后的方法（测试幂等性）:")
    new_secure_approach(test_dir)
    
    # 清理
    if os.path.exists(test_dir):
        os.rmdir(test_dir)

demonstrate_our_fix()

print("\n" + "="*50)
print("🎓 学习总结")
print("="*50)
print("✅ TOCTOU是检查-使用之间的时间窗口问题")
print("✅ 解决方案：使用原子操作和幂等函数")  
print("✅ Node.js中fs.mkdir({recursive: true})是安全的")
print("✅ 异常处理优于预检查（EAFP > LBYL）")
print("✅ 消除时间窗口是关键")
print("="*50)