-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker2.h
183 lines (167 loc) · 4.11 KB
/
worker2.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
//
// Copyright 2023 Suzuki Yoshinori(wave.suzuki.z@gmail.com)
// since: 21 Apr 2023
//
#pragma once
#include "atomic_queue.h"
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>
namespace SimpleWorker
{
// Standalone unit of work.
// Abstract interface: derive from it and implement run() with the job's body.
class Worker
{
public:
    // Performs the job. Pure virtual, so every concrete worker must override it.
    virtual void run() = 0;

    // Virtual so that a derived worker can be destroyed through a Worker pointer.
    virtual ~Worker() = default;
};
// Worker thread pool.
// Owns a fixed set of threads that drain a queue of Worker jobs. Jobs are
// queued with push() and the threads are woken with execute(); wait() blocks
// until the queue is empty and no thread is draining it.
class WorkerThread
{
private:
    using count_t = std::atomic_int32_t;

    // One queued job: the worker to run plus its two optional counters.
    class WorkerSlot
    {
        Worker*  worker    = nullptr; // job to run (not owned)
        count_t* counter   = nullptr; // decremented once the job has run (may be null)
        count_t* reference = nullptr; // job is held back while this is > 0 (may be null)

    public:
        // (Re)binds the slot to a job and its counters.
        void set(Worker* w, count_t* c, count_t* r)
        {
            worker    = w;
            counter   = c;
            reference = r;
        }
        // Runs the bound job on the calling thread.
        void     run() { worker->run(); }
        count_t* getCounter() { return counter; }
        count_t* getReference() { return reference; }
    };

    std::vector<std::thread> threadList;
    std::mutex               executeMutex{};     // guards both condition variables
    std::condition_variable  executeCondition{}; // "there is work" / "shut down"
    std::condition_variable  waitCondition{};    // "everything has been executed"
    std::atomic_bool         enabled{false};     // false => threads leave update()
    std::atomic_int          executeCount{0};    // number of threads currently draining
    Queue<WorkerSlot>        workerQueue;        // jobs waiting to be run
    Queue<WorkerSlot>        stockQueue;         // free slots
    std::vector<WorkerSlot>  workerSlot;         // storage behind both queues

    // Thread body: sleep until there is work (or shutdown), drain the queue,
    // then report completion to wait().
    void update()
    {
        for (;;)
        {
            {
                // Wait for the start signal.
                std::unique_lock<std::mutex> lk(executeMutex);
                executeCondition.wait(lk, [this] { return !enabled || !workerQueue.empty(); });
            }
            if (!enabled)
            {
                // Shutdown requested.
                break;
            }

            // Count this thread as an active consumer.
            executeCount.fetch_add(1);
            while (auto* wslot = workerQueue.pop())
            {
                auto*      reference = wslot->getReference();
                const bool ready     = (reference == nullptr) || (reference->load() <= 0);
                if (!ready)
                {
                    // Dependency not satisfied yet: put the job back and retry later.
                    workerQueue.push(wslot);
                    continue;
                }

                wslot->run();
                if (auto* counter = wslot->getCounter())
                {
                    // Signal completion to whoever is watching this counter.
                    counter->fetch_sub(1);
                }
                // Return the slot to the free pool.
                stockQueue.push(wslot);
            }

            // fetch_sub returns the previous value, so 1 means this thread was
            // the last active consumer.
            const bool lastConsumer = executeCount.fetch_sub(1) == 1;
            if (lastConsumer && workerQueue.empty())
            {
                // Pass through the mutex before notifying: a thread in wait()
                // that has already evaluated its predicate is then guaranteed
                // to be blocked and receives this notification.
                {
                    std::lock_guard<std::mutex> lk(executeMutex);
                }
                waitCondition.notify_all();
            }
        }
    }

    // Shared implementation of both push() overloads: takes a free slot,
    // binds it and queues it. Returns false when no free slot is available.
    bool enqueue(Worker& worker, count_t* cnt, count_t* ref)
    {
        auto* wslot = stockQueue.pop();
        if (wslot == nullptr)
        {
            return false;
        }
        wslot->set(&worker, cnt, ref);
        workerQueue.push(wslot);
        return true;
    }

public:
    // Not copyable: the threads hold a pointer to this object.
    WorkerThread(const WorkerThread&)            = delete;
    WorkerThread& operator=(const WorkerThread&) = delete;

    // Creates the pool.
    //   nbWorker: capacity of the job queue (number of slots)
    //   nbThread: number of threads to start
    WorkerThread(size_t nbWorker = 2000, size_t nbThread = 3) : workerQueue(nbWorker), stockQueue(nbWorker)
    {
        // Allocate the slots and park all of them in the free pool before any
        // thread is running.
        workerSlot.resize(nbWorker);
        for (auto& slot : workerSlot)
        {
            stockQueue.push(&slot);
        }

        enabled = true;
        threadList.reserve(nbThread);
        try
        {
            for (size_t i = 0; i < nbThread; i++)
            {
                threadList.emplace_back([this] { update(); });
            }
        }
        catch (...)
        {
            // A thread failed to start: shut down the ones already running,
            // otherwise destroying them while joinable would terminate.
            stop();
            throw;
        }
    }

    // Stops and joins all threads.
    virtual ~WorkerThread() { stop(); }

    // Asks every thread to leave its loop and joins them. Safe to call more
    // than once (e.g. explicitly and then again from the destructor).
    void stop()
    {
        {
            // Change the flag under the mutex so a thread between its
            // predicate check and its wait cannot miss the shutdown.
            std::lock_guard<std::mutex> lk(executeMutex);
            enabled = false;
        }
        executeCondition.notify_all();
        for (auto& th : threadList)
        {
            if (th.joinable())
            {
                th.join();
            }
        }
    }

    // Wakes the threads so they start draining the queue.
    void execute()
    {
        // Jobs are queued without holding the mutex; passing through it here
        // orders the notification after any predicate check already in
        // progress, so the wake-up cannot be lost.
        {
            std::lock_guard<std::mutex> lk(executeMutex);
        }
        executeCondition.notify_all();
    }

    // Blocks until the job queue is empty and no thread is draining it.
    void wait()
    {
        std::unique_lock<std::mutex> lk(executeMutex);
        waitCondition.wait(lk, [this] { return executeCount.load() == 0 && workerQueue.empty(); });
    }

    // Queues a worker.
    //   worker: job to run; only its address is stored, so it must stay alive
    //           until it has been executed
    //   cnt:    optional counter decremented after the job has run
    // Returns false when no free slot is available.
    bool push(Worker& worker, count_t* cnt = nullptr) { return enqueue(worker, cnt, nullptr); }

    // Queues a worker that depends on a counter.
    //   ref:    the job is only run once this counter is <= 0
    //   worker: job to run; only its address is stored, so it must stay alive
    //           until it has been executed
    //   cnt:    optional counter decremented after the job has run
    // Returns false when no free slot is available.
    bool push(std::atomic_int* ref, Worker& worker, count_t* cnt = nullptr) { return enqueue(worker, cnt, ref); }
};
} // namespace SimpleWorker
//