Skip to content

xstongxue/Python-Concurrent-Module

Repository files navigation

Python 并发处理模块

这是一个高性能的Python并发处理库,提供了两种不同的并发处理方案:线程池并发和异步协程并发。

模块组成

  • concurrent_threadpool.py - 基于ThreadPoolExecutor的线程池并发处理器
  • concurrent_async_coroutine.py - 基于asyncio的异步协程并发处理器

特性

线程池模块特性

  • ✅ 基于Python标准库ThreadPoolExecutor实现
  • ✅ 支持任务结果顺序保持
  • ✅ 提供完整的错误处理机制
  • ✅ 支持进度回调功能
  • ✅ 线程安全的结果收集
  • ✅ 动态调整线程池大小
  • ✅ 提供简化版本和完整版本接口

异步协程模块特性

  • ✅ 基于asyncio异步编程模型
  • ✅ 支持泛型类型注解,类型安全
  • ✅ 队列化任务管理
  • ✅ 可自定义成功/错误/进度回调
  • ✅ 优雅的工作协程管理
  • ✅ 详细的任务处理日志

安装要求

Python 3.7+
# 无需额外依赖,使用Python标准库

快速开始

1. 线程池并发处理

from concurrent_threadpool import ConcurrentWorker, concurrent_execute

# 方式一:使用类实例
worker = ConcurrentWorker(max_workers=4)

# 简单任务处理
def square(num):
    return num * num

numbers = [1, 2, 3, 4, 5]
results = worker.execute_simple(numbers, square)
print(f"结果: {results}")  # [1, 4, 9, 16, 25]

# 方式二:使用便捷函数
results = concurrent_execute(numbers, square, max_workers=4)

2. 异步协程并发处理

import asyncio
from concurrent_async_coroutine import AsyncTaskProcessor

# 定义异步任务处理函数
async def process_url(url: str) -> str:
    # 模拟网络请求
    await asyncio.sleep(0.1)
    return f"处理完成: {url}"

# 创建处理器
processor = AsyncTaskProcessor[str, str](max_workers=3)

# 处理任务
async def main():
    urls = ["url1", "url2", "url3", "url4", "url5"]
    results = await processor.process_tasks(urls, process_url)
    print(f"处理结果: {results}")

# 运行
asyncio.run(main())

详细使用说明

线程池模块详细用法

1. 基础使用

from concurrent_threadpool import ConcurrentWorker
import time

# 创建工作器
worker = ConcurrentWorker(max_workers=4)

# 定义处理函数
def cpu_intensive_task(data):
    time.sleep(0.1)  # 模拟耗时操作
    return data ** 2

# 执行任务
tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
results = worker.execute_simple(tasks, cpu_intensive_task)
print(f"结果: {results}")

2. 完整版本(带错误处理)

def task_with_error_handling(index: int, task: any) -> tuple:
    """处理函数需要返回(result, error)元组"""
    try:
        if task == 0:
            raise ValueError("不能为零")
        result = 100 / task
        return result, None
    except Exception as e:
        return None, e

tasks = [1, 2, 0, 4, 5]  # 包含会出错的数据
results, errors = worker.execute(tasks, task_with_error_handling)

print(f"结果: {results}")
print(f"错误: {errors}")

3. 带进度回调

def progress_callback(completed: int, total: int):
    percentage = (completed / total) * 100
    print(f"进度: {completed}/{total} ({percentage:.1f}%)")

results, errors = worker.execute_with_progress(
    tasks, 
    task_with_error_handling,
    progress_callback=progress_callback
)

异步协程模块详细用法

1. 基础异步处理

import asyncio
import aiohttp
from concurrent_async_coroutine import AsyncTaskProcessor

async def fetch_url(url: str) -> dict:
    """异步获取URL内容"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return {
                'url': url,
                'status': response.status,
                'content_length': len(await response.text())
            }

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2", 
        "https://httpbin.org/json"
    ]
    
    processor = AsyncTaskProcessor[str, dict](max_workers=2)
    results = await processor.process_tasks(urls, fetch_url)
    
    for result in results:
        if result:
            print(f"URL: {result['url']}, Status: {result['status']}")

asyncio.run(main())

2. 带回调的高级用法

async def on_success_callback(task: str, result: dict):
    print(f"成功处理: {task} -> {result['status']}")

async def on_error_callback(task: str, error: Exception):
    print(f"处理失败: {task} -> {str(error)}")

async def on_progress_callback(completed: int, total: int):
    print(f"进度更新: {completed}/{total}")

# 创建带回调的处理器
processor = AsyncTaskProcessor[str, dict](
    max_workers=3,
    on_success=on_success_callback,
    on_error=on_error_callback,
    on_progress=on_progress_callback
)

results = await processor.process_tasks(urls, fetch_url)

使用场景对比

场景 线程池模块 异步协程模块
CPU密集型任务 ✅ 推荐 ❌ 不推荐
I/O密集型任务 ✅ 适用 ✅ 更推荐
网络请求 ✅ 适用 ✅ 性能更好
文件处理 ✅ 推荐 ✅ 适用
数据库操作 ✅ 适用 ✅ 推荐
简单快速集成 ✅ 更简单 ⚠️ 需要异步环境

性能对比

线程池 vs 异步协程

# 性能测试示例
import time
import asyncio

# 线程池测试
def test_threadpool():
    from concurrent_threadpool import concurrent_execute
    
    def io_task(n):
        time.sleep(0.1)  # 模拟I/O
        return n * 2
    
    start = time.time()
    results = concurrent_execute(list(range(20)), io_task, max_workers=4)
    duration = time.time() - start
    print(f"线程池耗时: {duration:.2f}秒")

# 异步协程测试
async def test_async():
    from concurrent_async_coroutine import AsyncTaskProcessor
    
    async def async_io_task(n):
        await asyncio.sleep(0.1)  # 模拟异步I/O
        return n * 2
    
    start = time.time()
    processor = AsyncTaskProcessor(max_workers=4)
    results = await processor.process_tasks(list(range(20)), async_io_task)
    duration = time.time() - start
    print(f"异步协程耗时: {duration:.2f}秒")

# 运行测试
test_threadpool()
asyncio.run(test_async())

错误处理

线程池错误处理

# 自动错误捕获
results, errors = worker.execute(tasks, task_func)
for i, error in enumerate(errors):
    if error:
        print(f"任务 {i} 发生错误: {error}")

异步协程错误处理

# 通过回调处理错误
async def handle_error(task, error):
    print(f"任务 {task} 错误: {error}")
    # 可以进行重试、日志记录等操作

processor = AsyncTaskProcessor(on_error=handle_error)

最佳实践

  1. 选择合适的并发模型

    • CPU密集型:使用线程池模块
    • I/O密集型:优先考虑异步协程模块
  2. 合理设置并发数

    • 线程池:通常设置为CPU核心数的2-4倍
    • 异步协程:可以设置更高的并发数
  3. 错误处理

    • 始终处理可能的异常情况
    • 使用完整版本接口获取详细错误信息
  4. 资源管理

    • 线程池会自动管理资源
    • 异步协程注意正确关闭连接和会话

贡献

欢迎提交Issue和Pull Request来改进这个项目。

许可证

MIT License

About

Python高性能双模式并发处理架构

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages