Skip to content

Commit

Permalink
add worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
sewenew committed Sep 16, 2021
1 parent c34a5f6 commit 380f8b9
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 5 deletions.
26 changes: 24 additions & 2 deletions src/sw/vector-engine/reactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void Reactor::_on_connect(uv_stream_t *server, int status) {
if (status < 0) {
// TODO: do log instead of throw exception
//throw UvError(status, "failed to create new connection");
std::cout << "on connect error: " << uv::err_msg(status) << std::endl;
return;
}

Expand All @@ -45,10 +46,12 @@ void Reactor::_on_connect(uv_stream_t *server, int status) {
assert(reactor != nullptr);

auto [id, client] = reactor->_create_client();
std::cout << "in on connect: " << id << std::endl;
auto *cli = client.get();
if (uv_accept(server, uv::to_stream(cli)) == 0) {
uv_read_start(uv::to_stream(cli), _on_alloc, _on_read);
} else {
std::cout << "refuse to accept" << std::endl;
uv::handle_close(cli, _on_close);
}

Expand All @@ -60,6 +63,7 @@ void Reactor::_on_close(uv_handle_t *handle) {

auto *connection = uv::get_data<Connection>(handle);
assert(connection != nullptr);
std::cout << "close conneciton: " << connection->id() << std::endl;

connection->reactor()._close_client(handle);
}
Expand Down Expand Up @@ -92,6 +96,8 @@ void Reactor::_on_read(uv_stream_t *client, ssize_t nread, const uv_buf_t * /*bu
}
}
std::cout << "--------" << std::endl;

conn->reactor()._dispatch(*conn, std::move(requests));
}
} catch (const Error &e) {
// TODO: do log and send error reply
Expand Down Expand Up @@ -144,19 +150,28 @@ void Reactor::_on_write(uv_write_t *req, int status) {
delete req;
}

Reactor::Reactor(const ReactorOptions &opts) :
Reactor::Reactor(const ReactorOptions &opts, const WorkerPoolSPtr &worker_pool) :
_loop(uv::make_loop()),
_opts(opts) {
_opts(opts),
_worker_pool(worker_pool) {
_server = uv::make_tcp_server(*_loop, _opts.tcp_opts, _on_connect, this);

_stop_async = uv::make_async(*_loop, _on_stop, this);

_reply_async = uv::make_async(*_loop, _on_reply, this);

_loop_thread = std::thread([this]() { uv_run(this->_loop.get(), UV_RUN_DEFAULT); });
uv_timer_init(_loop.get(), &timer);
uv_timer_start(&timer, _on_timer, 2000, 2000);
}

void Reactor::_on_timer(uv_timer_t *handle) {
std::cout << "timer" << std::endl;
}

Reactor::~Reactor() {
_stop();

if (_loop_thread.joinable()) {
_loop_thread.join();
}
Expand Down Expand Up @@ -228,4 +243,11 @@ void Reactor::_send() {
}
}

void Reactor::_dispatch(Connection &connection, std::vector<RespCommand> requests) {
auto id = connection.id();
auto &worker = _worker_pool->fetch(id);
Task task = {std::move(requests), id, this};
worker.submit(std::move(task));
}

}
10 changes: 9 additions & 1 deletion src/sw/vector-engine/reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct ReplyContext {

class Reactor {
public:
explicit Reactor(const ReactorOptions &opts);
Reactor(const ReactorOptions &opts, const WorkerPoolSPtr &worker_pool);

Reactor(const Reactor &) = delete;
Reactor& operator=(const Reactor &) = delete;
Expand All @@ -73,6 +73,8 @@ class Reactor {

static void _on_write(uv_write_t *req, int status);

static void _on_timer(uv_timer_t *handle);

void _stop();

void _notify();
Expand All @@ -89,6 +91,8 @@ class Reactor {

void _send(Reply reply);

void _dispatch(Connection &connection, std::vector<RespCommand> requests);

ReactorOptions _opts;

LoopUPtr _loop;
Expand All @@ -108,6 +112,10 @@ class Reactor {
std::mutex _mutex;

std::vector<Reply> _replies;

WorkerPoolSPtr _worker_pool;

uv_timer_t timer;
};

}
Expand Down
25 changes: 23 additions & 2 deletions src/sw/vector-engine/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ Worker::~Worker() {
}

void Worker::submit(Task task) {
std::lock_guard<std::mutex> lock(_mutex);
{
std::lock_guard<std::mutex> lock(_mutex);

_tasks.push_back(std::move(task));
}

_tasks.push_back(std::move(task));
_cv.notify_one();
}

void Worker::_run() {
Expand Down Expand Up @@ -123,4 +127,21 @@ void Worker::_send_reply(Reply reply) {
reactor->send(std::move(reply));
}

WorkerPool::WorkerPool(std::size_t num) {
if (num == 0) {
throw Error("size of worker pool must larger than 0");
}

_workers.reserve(num);
for (auto idx = 0U; idx != num; ++idx) {
_workers.push_back(std::make_unique<Worker>());
}
}

Worker& WorkerPool::fetch(uint64_t id) {
assert(_workers.size() > 0);

return *(_workers[id % _workers.size()]);
}

}
14 changes: 14 additions & 0 deletions src/sw/vector-engine/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <vector>
#include <mutex>
#include <atomic>
#include <memory>
#include <thread>
#include <condition_variable>
#include "resp.h"
Expand Down Expand Up @@ -72,9 +73,22 @@ class Worker {
std::condition_variable _cv;

std::atomic<bool> _stop{false};
};

using WorkerUPtr = std::unique_ptr<Worker>;

class WorkerPool {
public:
explicit WorkerPool(std::size_t num);

Worker& fetch(uint64_t id);

private:
std::vector<WorkerUPtr> _workers;
};

using WorkerPoolSPtr = std::shared_ptr<WorkerPool>;

}

#endif // end SW_VECTOR_ENGINE_WORKER_H

0 comments on commit 380f8b9

Please sign in to comment.