Skip to content

Commit bd6868f

Browse files
committed
修改完善更新线程池实现的内容与解释 Mq-b#12
1 parent bfdb4a2 commit bd6868f

File tree

1 file changed

+113
-21
lines changed

1 file changed

+113
-21
lines changed

md/详细分析/04线程池.md

Lines changed: 113 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ int main() {
119119
120120
- 线程池中的线程会从任务队列中取出任务并执行,任务执行完毕后,线程继续取下一个任务或者休眠。
121121
122-
- 调用 join 方法等待所有任务执行完毕并关闭线程池(`thread_pool` 析构同样会调用它,非必要)
122+
- 调用 join 方法等待所有任务执行完毕并关闭线程池。
123123
124124
如果我们不自己指明线程池的线程数量,那么 Asio 会根据函数 [`default_thread_pool_size`](https://github.com/boostorg/asio/blob/44238d033e1503c694782925d647811380a067c2/include/boost/asio/impl/thread_pool.ipp#L53-L58) 计算并返回一个**线程池的默认线程数量**。它根据系统的硬件并发能力来决定使用的线程数,通常是硬件并发能力的两倍。
125125
@@ -151,12 +151,26 @@ thread_pool::~thread_pool()
151151
}
152152
```
153153

154-
- `stop` :修改内部的标志位存在使得线程池能够识别何时需要停止接收新的任务,然后唤醒所有线程。
154+
- `stop` :修改内部的标志位存在使得线程池能够识别何时需要停止接收新的任务,以及关闭还没开始执行的任务,然后唤醒所有线程。
155155
- `join()` :等待所有线程完成它们的工作,确保所有线程都已终止。
156156
- `shutdown()` :进行最终的清理,释放资源,确保线程池的完全清理和资源的正确释放
157157

158158
> 此处可阅读部分源码,帮助理解与记忆
159159
160+
析构函数先调用了 `stop()` ,然后再进行 `join()` 。那如果我们没有提前显式调用 `join()` 成员函数,**可能导致一些任务没有执行,析构函数并不会等待所有任务执行完毕**
161+
162+
```cpp
163+
boost::asio::thread_pool pool{ 4 };
164+
165+
for (int i = 0; i < 10; ++i) {
166+
boost::asio::post(pool, [i]() { print_task(i); });
167+
}
168+
```
169+
170+
> [运行](https://godbolt.org/z/haPqKb1h7)测试。
171+
172+
因为析构函数并不是堵塞执行完所有任务,而是先**停止**,再 `join()` 以及 `shutdown()`
173+
160174
`Boost.Asio` 提供的线程池使用十分简单,接口高度封装,几乎无需关心底层具体实现,易于使用。
161175

162176
我们的操作几乎只需创建线程池对象、将任务加入线程池、在需要时调用 `join()`
@@ -257,7 +271,7 @@ threadPool->start([=]{
257271
258272
## 实现线程池
259273
260-
实现一个普通的能够满足日常开发需求的线程池实际上非常简单,也只需要一百多行代码
274+
实现一个普通的能够满足日常开发需求的线程池实际上非常简单,只需要不到一百行代码
261275
262276
> - “*普通的能够满足日常开发需*求的”
263277
>
@@ -315,14 +329,30 @@ public:
315329
: stop_{ false }, num_threads_{ num_thread } {
316330
start();
317331
}
318-
~ThreadPool(){
332+
333+
~ThreadPool() {
319334
stop();
335+
join();
336+
}
337+
338+
void stop() {
339+
stop_.store(true);
340+
cv_.notify_all();
341+
}
342+
343+
void join() {
344+
for (auto& thread : pool_) {
345+
if (thread.joinable()) {
346+
thread.join();
347+
}
348+
}
349+
pool_.clear();
320350
}
321351

322352
template<typename F, typename... Args>
323-
std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>> submit(F&& f, Args&&...args){
353+
std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>> submit(F&& f, Args&&...args) {
324354
using RetType = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
325-
if (stop_.load()){
355+
if (stop_.load()) {
326356
throw std::runtime_error("ThreadPool is stopped");
327357
}
328358

@@ -338,41 +368,103 @@ public:
338368
return ret;
339369
}
340370

341-
void stop(){
342-
stop_.store(true);
343-
cv_.notify_all();
344-
for (auto& thread : pool_){
345-
if (thread.joinable()) {
346-
thread.join();
347-
}
348-
}
349-
}
350-
351-
void start(){
352-
for (std::size_t i = 0; i < num_threads_; ++i){
371+
void start() {
372+
for (std::size_t i = 0; i < num_threads_; ++i) {
353373
pool_.emplace_back([this] {
354374
while (!stop_) {
355375
Task task;
356376
{
357377
std::unique_lock<std::mutex> lc{ mutex_ };
358-
cv_.wait(lc, [this] {return stop_ || !tasks_.empty(); });
359378
if (tasks_.empty())
360379
return;
380+
cv_.wait(lc, [this] {return stop_ || !tasks_.empty(); });
361381
task = std::move(tasks_.front());
362382
tasks_.pop();
363383
}
364384
task();
365385
}
366-
});
386+
});
367387
}
368388
}
369389

370390
private:
371391
std::mutex mutex_;
372392
std::condition_variable cv_;
373-
std::atomic<bool> stop_ ;
393+
std::atomic<bool> stop_;
374394
std::atomic<std::size_t> num_threads_;
375395
std::queue<Task> tasks_;
376396
std::vector<std::thread> pool_;
377397
};
378398
```
399+
400+
**标头依赖**
401+
402+
```cpp
403+
#include <iostream>
404+
#include <thread>
405+
#include <mutex>
406+
#include <condition_variable>
407+
#include <future>
408+
#include <atomic>
409+
#include <queue>
410+
#include <vector>
411+
#include <syncstream>
412+
#include <functional>
413+
```
414+
415+
**测试 demo**
416+
417+
```cpp
418+
void print_task(int n) {
419+
std::osyncstream{ std::cout } << "Task " << n << " is running." << std::endl;
420+
}
421+
void print_task2(int n) {
422+
std::osyncstream{ std::cout } << "🐢🐢🐢 " << n << " 🐉🐉🐉" << std::endl;
423+
}
424+
425+
int main() {
426+
ThreadPool pool{ 4 }; // 创建一个有 4 个线程的线程池
427+
428+
for (int i = 0; i < 10; ++i) {
429+
pool.submit(print_task, i);
430+
}
431+
pool.join(); // 确保执行完所有任务
432+
433+
std::puts("---------------------");
434+
435+
pool.start(); // 重新启动线程池
436+
437+
for (int i = 0; i < 10; ++i) {
438+
pool.submit(print_task2, i);
439+
}
440+
pool.join(); // 确保执行完所有任务
441+
} // 析构自动 stop() join()
442+
```
443+
444+
**可能的[运行结果](https://godbolt.org/z/rM6vz1s9v)**:
445+
446+
```shell
447+
Task 1 is running.
448+
Task 2 is running.
449+
Task 0 is running.
450+
Task 3 is running.
451+
Task 4 is running.
452+
Task 5 is running.
453+
Task 6 is running.
454+
Task 7 is running.
455+
Task 8 is running.
456+
Task 9 is running.
457+
---------------------
458+
🐢🐢🐢 0 🐉🐉🐉
459+
🐢🐢🐢 2 🐉🐉🐉
460+
🐢🐢🐢 1 🐉🐉🐉
461+
🐢🐢🐢 5 🐉🐉🐉
462+
🐢🐢🐢 7 🐉🐉🐉
463+
🐢🐢🐢 3 🐉🐉🐉
464+
🐢🐢🐢 4 🐉🐉🐉
465+
🐢🐢🐢 6 🐉🐉🐉
466+
🐢🐢🐢 8 🐉🐉🐉
467+
🐢🐢🐢 9 🐉🐉🐉
468+
```
469+
470+
如果不自己显式调用 `join()` ,而是等待线程池析构调用,那么效果如同 `asio::thread_pool`,会先进行 `stop`,导致一些任务不执行。

0 commit comments

Comments
 (0)