replaced queues in RPC layer with MoodyCamel's lock-free queues #380

Merged 5 commits on Feb 5, 2018
Changes from 2 commits
2 changes: 1 addition & 1 deletion src/libclipper/CMakeLists.txt
@@ -28,7 +28,7 @@ target_include_directories(clipper PUBLIC
PRIVATE src)


target_link_libraries(clipper boost folly zmqcpp redox pthread rapidjson spdlog base64)
target_link_libraries(clipper boost folly zmqcpp redox pthread rapidjson spdlog base64 cqueue)

# This makes the project importable from the build directory
export(TARGETS clipper FILE ClipperConfig.cmake)
30 changes: 15 additions & 15 deletions src/libclipper/include/clipper/rpc_service.hpp
@@ -6,6 +6,7 @@
#include <string>
#include <vector>

#include <concurrentqueue.h>
#include <boost/bimap.hpp>
#include <redox.hpp>
#include <zmq.hpp>
@@ -15,23 +16,22 @@
#include <clipper/metrics.hpp>
#include <clipper/util.hpp>

using zmq::socket_t;
using std::string;
using std::list;
using std::shared_ptr;
using std::string;
using std::vector;
using std::list;
using zmq::socket_t;

namespace clipper {

namespace rpc {

const std::string LOGGING_TAG_RPC = "RPC";

using RPCResponse = std::pair<const int, vector<uint8_t>>;
using RPCResponse = std::pair<int, vector<uint8_t>>;
/// Tuple of zmq_connection_id, message_id, vector of messages, creation time
using RPCRequest =
std::tuple<const int, const int, const std::vector<std::vector<uint8_t>>,
const long>;
std::tuple<int, int, std::vector<std::vector<uint8_t>>, long>;

enum class RPCEvent {
SentHeartbeat = 1,
@@ -73,13 +73,13 @@ class RPCService {
void stop();

/*
* Send message takes ownership of the msg data because the caller cannot
* know when the message will actually be sent.
*
* \param `msg`: A vector of individual messages to send to this container.
* The messages will be sent as a single, multi-part ZeroMQ message so
* it is very efficient.
*/
* Send message takes ownership of the msg data because the caller cannot
* know when the message will actually be sent.
*
* \param `msg`: A vector of individual messages to send to this container.
* The messages will be sent as a single, multi-part ZeroMQ message so
* it is very efficient.
*/
int send_message(const std::vector<std::vector<uint8_t>> msg,
const int zmq_connection_id);

@@ -105,8 +105,8 @@

void shutdown_service(socket_t &socket);
std::thread rpc_thread_;
shared_ptr<Queue<RPCRequest>> request_queue_;
shared_ptr<Queue<RPCResponse>> response_queue_;
shared_ptr<moodycamel::ConcurrentQueue<RPCRequest>> request_queue_;
shared_ptr<moodycamel::ConcurrentQueue<RPCResponse>> response_queue_;
// Flag indicating whether rpc service is active
std::atomic_bool active_;
// The next available message id
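
For reference, a minimal standalone sketch (not part of this PR) of the moodycamel::ConcurrentQueue operations the new header relies on. Unlike the blocking clipper::Queue it replaces, enqueue never blocks, and there is no blocking pop: consumers poll with try_dequeue or try_dequeue_bulk and must handle an empty result.

#include <concurrentqueue.h>

#include <cstddef>
#include <vector>

int main() {
  moodycamel::ConcurrentQueue<int> queue;
  queue.enqueue(42);  // lock-free; never blocks the producer
  int item;
  if (queue.try_dequeue(item)) {
    // Got one element; returns false instead of blocking when empty.
  }
  std::vector<int> buffer(16);
  size_t n = queue.try_dequeue_bulk(buffer.begin(), buffer.size());
  buffer.resize(n);  // n is the count actually dequeued, possibly zero
  return 0;
}
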
49 changes: 26 additions & 23 deletions src/libclipper/src/rpc_service.cpp
@@ -4,8 +4,7 @@
#include <chrono>
#include <iostream>

#include <redox.hpp>

#include <concurrentqueue.h>
#include <clipper/config.hpp>
#include <clipper/datatypes.hpp>
#include <clipper/logging.hpp>
@@ -15,13 +14,14 @@
#include <clipper/task_executor.hpp>
#include <clipper/threadpool.hpp>
#include <clipper/util.hpp>
#include <redox.hpp>

using zmq::socket_t;
using zmq::message_t;
using zmq::context_t;
using std::shared_ptr;
using std::string;
using std::vector;
using zmq::context_t;
using zmq::message_t;
using zmq::socket_t;

namespace clipper {

@@ -30,8 +30,11 @@ namespace rpc {
constexpr int INITIAL_REPLICA_ID_SIZE = 100;

RPCService::RPCService()
: request_queue_(std::make_shared<Queue<RPCRequest>>()),
response_queue_(std::make_shared<Queue<RPCResponse>>()),
: request_queue_(std::make_shared<moodycamel::ConcurrentQueue<RPCRequest>>(
sizeof(RPCRequest) * 10000)),
response_queue_(
std::make_shared<moodycamel::ConcurrentQueue<RPCResponse>>(
sizeof(RPCResponse) * 10000)),
active_(false),
// The version of the unordered_map constructor that allows
// you to specify your own hash function also requires you
@@ -84,27 +87,23 @@ int RPCService::send_message(const vector<vector<uint8_t>> msg,
.count();
RPCRequest request(zmq_connection_id, id, std::move(msg),
current_time_micros);
request_queue_->push(request);
request_queue_->enqueue(request);
return id;
}

vector<RPCResponse> RPCService::try_get_responses(const int max_num_responses) {
vector<RPCResponse> responses;
for (int i = 0; i < max_num_responses; i++) {
if (auto response = response_queue_->try_pop()) {
responses.push_back(*response);
} else {
break;
}
}
return responses;
std::vector<RPCResponse> vec(max_num_responses);
size_t num_dequeued =
response_queue_->try_dequeue_bulk(vec.begin(), vec.size());
vec.resize(num_dequeued);
return vec;
}

void RPCService::manage_service(const string address) {
// Map from container id to unique routing id for zeromq
// Note that zeromq socket id is a byte vector
log_info_formatted(LOGGING_TAG_RPC, "RPC thread started at address: ",
address);
log_info_formatted(LOGGING_TAG_RPC,
"RPC thread started at address: ", address);
boost::bimap<int, vector<uint8_t>> connections;
// Initializes a map to associate the ZMQ connection IDs
// of connected containers with their metadata, including
@@ -133,7 +132,7 @@ void RPCService::manage_service(const string address) {
// send. If there are messages to send, don't let the poll block at all.
// If there no messages to send, let the poll block for 1 ms.
int poll_timeout = 0;
if (request_queue_->size() == 0) {
if (request_queue_->size_approx() == 0) {
poll_timeout = 1;
}
zmq_poll(items, 1, poll_timeout);
@@ -162,12 +161,16 @@ void RPCService::shutdown_service(socket_t &socket) {

void RPCService::send_messages(
socket_t &socket, boost::bimap<int, vector<uint8_t>> &connections) {
while (request_queue_->size() > 0) {
int queue_size = request_queue_->size_approx();
std::vector<RPCRequest> requests(queue_size);
size_t num_requests =
request_queue_->try_dequeue_bulk(requests.begin(), queue_size);
for (size_t i = 0; i < num_requests; i++) {
RPCRequest &request = requests[i];
long current_time_micros =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
RPCRequest request = request_queue_->pop();
msg_queueing_hist_->insert(current_time_micros - std::get<3>(request));
boost::bimap<int, vector<uint8_t>>::left_const_iterator connection =
connections.left.find(std::get<0>(request));
@@ -301,7 +304,7 @@ void RPCService::receive_message(
TaskExecutionThreadPool::submit_job(
vm, replica_id, container_ready_callback_, vm, replica_id);

response_queue_->push(response);
response_queue_->enqueue(response);
}
} break;
case MessageType::Heartbeat:
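
Both try_get_responses and send_messages now use the same batched drain pattern: size a buffer from size_approx(), then trust the count returned by try_dequeue_bulk, since size_approx() is only an estimate while other threads are concurrently enqueueing. A hypothetical drain() helper (illustration only, not part of this PR) capturing that pattern:

#include <concurrentqueue.h>

#include <cstddef>
#include <vector>

template <typename T>
std::vector<T> drain(moodycamel::ConcurrentQueue<T> &queue) {
  size_t estimate = queue.size_approx();  // approximate under concurrency
  std::vector<T> items(estimate);         // T must be default-constructible
  size_t n = queue.try_dequeue_bulk(items.begin(), estimate);
  items.resize(n);  // keep only the elements actually dequeued
  return items;
}
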
10 changes: 10 additions & 0 deletions src/libs/CMakeLists.txt
@@ -58,6 +58,16 @@ target_include_directories(base64 INTERFACE
$<INSTALL_INTERFACE:base64>)
export(TARGETS base64 FILE Base64Config.cmake)

##########################################
### Concurrent Queue
##########################################

add_library(cqueue INTERFACE)
target_include_directories(cqueue INTERFACE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/concurrent_queue>
$<INSTALL_INTERFACE:cqueue>)
export(TARGETS cqueue FILE ConcurrentQueueConfig.cmake)

add_subdirectory(redox)

add_subdirectory(rapidjson)