### 多线程编程

In [1]:
import threading 


In [13]:
## 查看当前线程的名称
print(threading.current_thread().name)
print(f'当前线程数：{threading.active_count()}')


MainThread
当前线程数：7


In [24]:
import threading

# 打印所有活跃线程的详细信息
for thread in threading.enumerate():
    print(f"线程名: {thread.name:<20} | ID: {thread.ident} | 是否存活: {thread.is_alive()}")


线程名: MainThread           | ID: 8610353664 | 是否存活: True
线程名: IOPub                | ID: 12954116096 | 是否存活: True
线程名: Heartbeat            | ID: 12970905600 | 是否存活: True
线程名: Thread-1 (_watch_pipe_fd) | ID: 12988768256 | 是否存活: True
线程名: Thread-2 (_watch_pipe_fd) | ID: 13005557760 | 是否存活: True
线程名: Control              | ID: 13022347264 | 是否存活: True
线程名: IPythonHistorySavingThread | ID: 13039136768 | 是否存活: True


#### 创建线程

In [21]:
## 方法1：使用Thread类
def task():
    print('threading is running')
t1 = threading.Thread(target=task, name='Thread-1')
t1.start() 

## 方法2：继承Thread类
class MyThread(threading.Thread):
    def run(self):
        print('MyThread is running')

t2 = MyThread()
t2.start()

print(f'当前线程数：{threading.active_count()}')


threading is running
MyThread is running
当前线程数：7


In [10]:
def task():
    print('threading is running')
t1 = threading.Thread(target=task, name='Thread-1')
t1.start()  ## 启动线程
t1.join()  ## 等待线程结束
print(f'线程是否存活：{t1.is_alive()}') 
print(f'当前线程对象：{threading.current_thread}')  
print(f'线程名称：{t1.name}')  ## 获取线程名称
print(f'线程ID:{t1.ident}')  ## 获取线程ID


threading is running
线程是否存活：False
当前线程对象：<function current_thread at 0x109f853f0>
线程名称：Thread-1
线程ID:13056487424


#### 多线程运行：进程池

In [None]:
import time
from concurrent.futures import ThreadPoolExecutor

def task(name, delay):
    print(f"线程 {name} 开始")
    time.sleep(delay)
    return f"{name}完成"

with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交任务
    future1 = executor.submit(task, "A", 2)
    future2 = executor.submit(task, "B", 1)    
    # 获取结果
    print(future1.result())  # 获取结果
    print(future2.result())



线程 A 开始
线程 B 开始
A完成
B完成


#### 单线程 vs. 多线程

In [12]:
import time

def task(n):
    print(f"任务 {n} 开始")
    time.sleep(1)   # 模拟I/O等待
    print(f"任务 {n} 完成")
    return n * n

start = time.time()

# 顺序执行5个任务
results = [task(i+1) for i in range(5)]

end = time.time()
print(f"顺序执行结果: {results}")
print(f"总耗时: {end-start:.2f}秒")




任务 1 开始
任务 1 完成
任务 2 开始
任务 2 完成
任务 3 开始
任务 3 完成
任务 4 开始
任务 4 完成
任务 5 开始
任务 5 完成
顺序执行结果: [1, 4, 9, 16, 25]
总耗时: 5.02秒


In [19]:
from concurrent.futures import ThreadPoolExecutor

def task(n):
    print(f"任务 {n} 开始")
    time.sleep(1)   # 模拟I/O等待
    print(f"任务 {n} 完成")
    return n * n

def main():
    start = time.time()
    
    with ThreadPoolExecutor(max_workers=5) as executor:
        # 提交所有任务
        futures = [executor.submit(task, i+1) for i in range(5)]        
        # 收集结果
        results = [f.result() for f in futures]    
    end = time.time()
    print(f"结果: {results}")
    print(f"线程池耗时: {end-start:.2f}秒")

if __name__ == "__main__":
    main()


任务 1 开始
任务 2 开始
任务 3 开始
任务 4 开始
任务 5 开始
任务 4 完成任务 2 完成
任务 1 完成
任务 3 完成
任务 5 完成

结果: [1, 4, 9, 16, 25]
线程池耗时: 1.00秒


#### 实例：数据读入   


In [41]:
from glob import glob
import numpy as np
import time


In [114]:
for i in range(10):
    arr = np.random.rand(10000, 10000)  # 生成一个1000x1000的随机数组
    np.save(f'第7章-并发编程/data/data_{i}.npy', arr)  # 保存为.npy文件


In [115]:
paths = glob('第7章-并发编程/data/*.npy')  # 获取所有数据文件路径
paths_list = [paths[i::2] for i in range(2)]


In [154]:
def load_data(paths):
    data = []
    for path in paths:
        arr = np.load(path)  # 读取数据
        data.append(arr)
    return len(data)

if __name__ == "__main__":
    # 测试单线程读取数据
    start = time.time()
    num_data = load_data(paths)  # 测试函数
    end = time.time()
    print(f'单线程读取数据个数: {num_data}')  # 输出读取的数据个数
    print(f'顺序读取耗时: {end - start:.2f}秒')  # 输出耗时



单线程读取数据个数: 10
顺序读取耗时: 1.64秒


In [151]:
from concurrent.futures import ThreadPoolExecutor

if __name__ == "__main__":
    # 测试多线程读取数据
    paths_list = [paths[i::5] for i in range(5)]  # 将路径分成5份
    start = time.time()
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(load_data, paths) for paths in paths_list]        
        results = [f.result() for f in futures]    
        results = sum(results)  # 合并结果
    end = time.time()
    print(f'多线程读取数据个数: {results}')  # 输出读取的数据个数
    print(f'多线程读取耗时: {end - start:.2f}秒')



多线程读取数据个数: 10
多线程读取耗时: 0.97秒
