Skip to content

Commit

Permalink
Remove queue dispatcher
Browse files Browse the repository at this point in the history
We remove the QueueDispatcher class from the library.
A new project has been created for it [0]

Also, we implement a simple algorithm for congestion
control, which consist in provide the 'congestion'
flag to reception() when there are no idle consumers
available to process queue items, and queue size is
greater than a maximum provided. The congestion
control is disabled by default.

[0] https://github.com/testillano/queuedispatcher
  • Loading branch information
testillano authored and Eduardo Ramos Testillano (eramedu) committed Jul 23, 2023
1 parent 7748eba commit c2de613
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 260 deletions.
8 changes: 8 additions & 0 deletions Dockerfile.build
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ WORKDIR /code/build
ARG make_procs=4
ARG build_type=Release
ARG ert_logger_ver=v1.0.10
ARG ert_queuedispatcher_ver=v1.0.2
ARG jupp0r_prometheuscpp_ver=v0.13.0
ARG civetweb_civetweb_ver=v1.14
ARG ert_metrics_ver=v1.0.2
Expand All @@ -23,6 +24,13 @@ RUN set -x && \
cd .. && rm -rf * && \
set +x

# ert_queuedispatcher
RUN set -x && \
wget https://github.com/testillano/queuedispatcher/archive/${ert_queuedispatcher_ver}.tar.gz && tar xvf ${ert_queuedispatcher_ver}.tar.gz && cd queuedispatcher-*/ && \
cmake -DERT_QUEUEDISPATCHER_BuildExamples=OFF -DCMAKE_BUILD_TYPE=${build_type} . && make -j${make_procs} && make install && \
cd .. && rm -rf * && \
set +x

# jupp0r prometheus-cpp
RUN if [ "${base_os}" = "alpine" ] ; then apk add zlib-dev curl-dev && rm -rf /var/cache/apk/* ; elif [ "${base_os}" = "ubuntu" ] ; then apt-get install -y zlib1g-dev libcurl4-openssl-dev && apt-get clean ; fi
RUN set -x && \
Expand Down
5 changes: 4 additions & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ base_tag__dflt=latest
make_procs__dflt=$(grep processor /proc/cpuinfo -c)
build_type__dflt=Release
ert_logger_ver__dflt=v1.0.10
ert_queuedispatcher_ver__dflt=v1.0.2
jupp0r_prometheuscpp_ver__dflt=v0.13.0
# 3rd party used by prometheus:
civetweb_civetweb_ver__dflt=v1.14
Expand All @@ -33,7 +34,7 @@ usage() {
For headless mode you may prepend or export asked/environment variables for the corresponding
docker procedure:
--builder-image: image_tag, base_os, base_tag (nghttp2), make_procs, build_type, ert_logger_ver, jupp0r_prometheuscpp_ver, civetweb_civetweb_ver, ert_metrics_ver
--builder-image: image_tag, base_os, base_tag (nghttp2), make_procs, build_type, ert_logger_ver, ert_queuedispatcher_ver, jupp0r_prometheuscpp_ver, civetweb_civetweb_ver, ert_metrics_ver
--project: make_procs, build_type, base_tag (http2comm_builder)
--project-image: image_tag, base_tag (http2comm_builder), make_procs, build_type
--auto: any of the variables above
Expand Down Expand Up @@ -80,6 +81,7 @@ build_builder_image() {
_read make_procs
_read build_type
_read ert_logger_ver
_read ert_queuedispatcher_ver
_read jupp0r_prometheuscpp_ver
_read civetweb_civetweb_ver
_read ert_metrics_ver
Expand All @@ -89,6 +91,7 @@ build_builder_image() {
bargs+=" --build-arg make_procs=${make_procs}"
bargs+=" --build-arg build_type=${build_type}"
bargs+=" --build-arg ert_logger_ver=${ert_logger_ver}"
bargs+=" --build-arg ert_queuedispatcher_ver=${ert_queuedispatcher_ver}"
bargs+=" --build-arg jupp0r_prometheuscpp_ver=${jupp0r_prometheuscpp_ver}"
bargs+=" --build-arg civetweb_civetweb_ver=${civetweb_civetweb_ver}"
bargs+=" --build-arg ert_metrics_ver=${ert_metrics_ver}"
Expand Down
3 changes: 3 additions & 0 deletions include/ert/http2comm/Http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ const std::pair<int, std::string> METHOD_NOT_IMPLEMENTED (
ResponseCode::NOT_IMPLEMENTED,
"METHOD_NOT_IMPLEMENTED");

const std::pair<int, std::string> SERVICE_UNAVAILABLE (
ResponseCode::SERVICE_UNAVAILABLE,
"SERVICE_UNAVAILABLE");
}
}

27 changes: 23 additions & 4 deletions include/ert/http2comm/Http2Server.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
__________________________________________________________________________________
_________________________________________________________________________________
| _ _ _ _ ___ |
| | | | | | | | | |__ \ |
| ___ _ __| |_ __ | |__ | |_| |_ _ __ ) | __ ___ _ __ ___ _ __ ___ |
Expand Down Expand Up @@ -50,8 +50,8 @@ SOFTWARE.

#include <ert/http2comm/Stream.hpp>
#include <ert/http2comm/Http2Headers.hpp>
#include <ert/http2comm/QueueDispatcher.hpp>

#include <ert/queuedispatcher/QueueDispatcher.hpp>
#include <ert/metrics/Metrics.hpp>

namespace ert
Expand All @@ -70,7 +70,8 @@ class Http2Server
std::string api_name_{};
std::string api_version_{};
boost::asio::io_service *timers_io_service_;
QueueDispatcher *queue_dispatcher_;
ert::queuedispatcher::QueueDispatcher *queue_dispatcher_;
int max_queue_dispatcher_size_{};

nghttp2::asio_http2::server::request_cb handler();

Expand Down Expand Up @@ -114,8 +115,11 @@ class Http2Server
* @param workerThreads number of worker threads.
* @param maxWorkerThreads number of maximum worker threads which internal processing could grow to. Defaults to '0' which means that maximum equals to provided worker threads.
* @param timerIoService Optional io service to manage response delays
* @param maxQueueDispatcherSize This library implements a simple congestion control algorithm which will indicate congestion status when queue dispatcher (when used) has no
* idle consumer threads, and queue dispatcher size is over this value. Defaults to -1 which means 'no limit' to grow the queue (this probably implies response time degradation).
* So, to enable the described congestion control algorithm, provide a non-negative value.
*/
Http2Server(const std::string& name, size_t workerThreads, size_t maxWorkerThreads = 0, boost::asio::io_service *timerIoService = nullptr);
Http2Server(const std::string& name, size_t workerThreads, size_t maxWorkerThreads = 0, boost::asio::io_service *timerIoService = nullptr, int maxQueueDispatcherSize = -1 /* no limit */);
virtual ~Http2Server();

// setters
Expand All @@ -126,6 +130,21 @@ class Http2Server
*/
int busyThreads() const;

/**
* Gets the queue dispatcher size
*/
int getQueueDispacherSize() const;

/**
* Gets the queue dispatcher maximum size allowed on congestion control
* Defaults to -1 (no limit to grow the queue).
*
* This enables a simple congestion control algorithm which consist in indicate congestion when
* queue dispatcher has no idle consumer threads and also, queue size is over this specific
* value.
*/
int getMaxQueueDispacherSize() const;

/**
* Enable metrics
*
Expand Down
92 changes: 0 additions & 92 deletions include/ert/http2comm/QueueDispatcher.hpp

This file was deleted.

17 changes: 11 additions & 6 deletions include/ert/http2comm/Stream.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
__________________________________________________________________________________
_________________________________________________________________________________
| _ _ _ _ ___ |
| | | | | | | | | |__ \ |
| ___ _ __| |_ __ | |__ | |_| |_ _ __ ) | __ ___ _ __ ___ _ __ ___ |
Expand Down Expand Up @@ -44,6 +44,8 @@ SOFTWARE.
#include <memory>
#include <chrono>

#include <ert/queuedispatcher/StreamIf.hpp>

#include <boost/asio.hpp>

#include <nghttp2/asio_http2_server.h>
Expand Down Expand Up @@ -73,7 +75,7 @@ class Http2Server;
*
* @see https://gist.github.com/tatsuhiro-t/ba3f7d72d037027ae47b
*/
class Stream
class Stream : public ert::queuedispatcher::StreamIf
//class Stream : public std::enable_shared_from_this<Stream>
{
std::mutex mutex_;
Expand Down Expand Up @@ -101,9 +103,9 @@ class Stream
const nghttp2::asio_http2::server::response& res,
Http2Server *server);

Stream(const Stream&) = delete;
~Stream() = default;
Stream& operator=(const Stream&) = delete;
//Stream(const Stream&) = delete;
//~Stream() = default;
//Stream& operator=(const Stream&) = delete;

// nghttp2-asio request structure
const nghttp2::asio_http2::server::request& getReq() const {
Expand All @@ -117,8 +119,11 @@ class Stream
// append received data chunk
void appendData(const uint8_t* data, std::size_t len);

// Used by queue dispatcher:
void process(bool busyConsumers, int queueSize);

// Process reception
void process();
void reception(bool congestion = false);

// Completes the nghttp2 transaction (res.end()) with the values calculated at process()
void commit();
Expand Down
2 changes: 1 addition & 1 deletion include/ert/http2comm/URLFunctions.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
__________________________________________________________________________________
_________________________________________________________________________________
| _ _ _ _ ___ |
| | | | | | | | | |__ \ |
| ___ _ __| |_ __ | |__ | |_| |_ _ __ ) | __ ___ _ __ ___ _ __ ___ |
Expand Down
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ add_library (${ERT_HTTP2COMM_TARGET_NAME} STATIC
${CMAKE_CURRENT_LIST_DIR}/Http2Headers.cpp
${CMAKE_CURRENT_LIST_DIR}/Stream.cpp
${CMAKE_CURRENT_LIST_DIR}/URLFunctions.cpp
${CMAKE_CURRENT_LIST_DIR}/QueueDispatcher.cpp
)

target_include_directories(${ERT_HTTP2COMM_TARGET_NAME}
Expand All @@ -14,6 +13,7 @@ target_include_directories(${ERT_HTTP2COMM_TARGET_NAME}

target_link_libraries(${ERT_HTTP2COMM_TARGET_NAME}
ert_logger
ert_queuedispatcher
ert_metrics
)

Expand Down
17 changes: 13 additions & 4 deletions src/Http2Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,22 @@ namespace ert
namespace http2comm
{

Http2Server::Http2Server(const std::string& name, size_t workerThreads, size_t maxWorkerThreads, boost::asio::io_service *timersIoService): name_(name), timers_io_service_(timersIoService), reception_id_(0), maximum_request_body_size_(0) {
Http2Server::Http2Server(const std::string& name, size_t workerThreads, size_t maxWorkerThreads, boost::asio::io_service *timersIoService, int maxQueueDispatcherSize):
name_(name), timers_io_service_(timersIoService), reception_id_(0), maximum_request_body_size_(0), max_queue_dispatcher_size_(maxQueueDispatcherSize) {

queue_dispatcher_ = (workerThreads > 1) ? new QueueDispatcher(name + "_queueDispatcher", workerThreads, maxWorkerThreads) : nullptr;
queue_dispatcher_ = (workerThreads > 1) ? new ert::queuedispatcher::QueueDispatcher(name + "_queueDispatcher", workerThreads, maxWorkerThreads) : nullptr;
}

int Http2Server::busyThreads() const {
return (queue_dispatcher_ ? queue_dispatcher_->busyThreads():0);
return (queue_dispatcher_ ? queue_dispatcher_->getBusyThreads():0);
}

int Http2Server::getQueueDispacherSize() const {
return (queue_dispatcher_ ? queue_dispatcher_->getSize():0);
}

int Http2Server::getMaxQueueDispacherSize() const {
return max_queue_dispatcher_size_;
}

void Http2Server::enableMetrics(ert::metrics::Metrics *metrics,
Expand Down Expand Up @@ -212,7 +221,7 @@ nghttp2::asio_http2::server::request_cb Http2Server::handler()
queue_dispatcher_->dispatch(stream);
}
else {
stream->process();
stream->reception();
stream->commit();
}
}
Expand Down
Loading

0 comments on commit c2de613

Please sign in to comment.