Skip to content

Commit

Permalink
Bugfix, pipeline and session close improvements.
Browse files Browse the repository at this point in the history
- Queue element to inform if the pipeline has been sent.
- Closes the socket only after the last message in queue has been sent.
  • Loading branch information
mzimbres committed Dec 14, 2019
1 parent c6c1260 commit 422fd5a
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 79 deletions.
7 changes: 5 additions & 2 deletions Makefile
Expand Up @@ -8,7 +8,7 @@ CPPFLAGS += -std=c++17 -g
CPPFLAGS += -I/opt/boost_1_71_0/include
CPPFLAGS += -DBOOST_ASIO_CONCURRENCY_HINT_1=BOOST_ASIO_CONCURRENCY_HINT_UNSAFE

all: examples
all: examples tests

Makefile.dep:
-$(CXX) -MM ./*.cpp > $@
Expand All @@ -18,7 +18,10 @@ Makefile.dep:
examples: % : %.o
$(CXX) -o $@ $^ $(CPPFLAGS) -lfmt -lpthread

tests: % : %.o
$(CXX) -o $@ $^ $(CPPFLAGS) -lfmt -lpthread

.PHONY: clean
clean:
rm -f examples examples.o Makefile.dep
rm -f examples examples.o tests tests.o Makefile.dep

212 changes: 138 additions & 74 deletions aedis.hpp
Expand Up @@ -105,7 +105,7 @@ auto assemble(char const* cmd)

template <class Iter>
auto assemble( char const* cmd
, std::initializer_list<std::string const> key
, std::initializer_list<std::string> key
, Iter begin
, Iter end
, int size = 1)
Expand Down Expand Up @@ -433,14 +433,14 @@ auto publish(std::string const& key, std::string const& msg)

inline
auto set( std::string const& key
, std::initializer_list<std::string const> const& args)
, std::initializer_list<std::string> args)
{
return resp::assemble("SET", {key}, std::cbegin(args), std::cend(args));
}

inline
auto hset( std::string const& key
, std::initializer_list<std::string const> const& l)
, std::initializer_list<std::string> l)
{
return resp::assemble("HSET", {key}, std::cbegin(l), std::cend(l));
}
Expand All @@ -467,11 +467,12 @@ auto hget(std::string const& key, std::string const& field)

inline
auto hmget( std::string const& key
, std::string const& field1
, std::string const& field2)
, std::initializer_list<std::string> fields)
{
auto par = {field1, field2};
return resp::assemble("HMGET", {key}, std::cbegin(par), std::cend(par));
return resp::assemble( "HMGET"
, {key}
, std::cbegin(fields)
, std::cend(fields));
}

inline
Expand All @@ -489,7 +490,7 @@ auto zadd(std::string const& key, int score, std::string const& value)
}

template <class Key, class T, class Compare, class Allocator>
auto zadd( std::initializer_list<std::string const> key
auto zadd( std::initializer_list<std::string> key
, std::map<Key, T, Compare, Allocator> const& m)
{
return resp::assemble("ZADD", key, std::cbegin(m), std::cend(m), 2);
Expand Down Expand Up @@ -653,18 +654,24 @@ class session {
};

private:
struct queue_item {
std::string payload;
bool sent;
};

std::string id_;
config cfg_;
ip::tcp::resolver resolver_;
tcp::socket socket_;

net::steady_timer timer_;
resp::buffer buffer_;
std::queue<std::string> msg_queue_;
std::queue<queue_item> msg_queue_;
int pipeline_size_ = 0;
long long pipeline_id_ = 0;
instance instance_;
int sentinel_idx_ = 0;
bool close_requested_ = false;

msg_handler_type msg_handler_ = [this](auto ec, auto const& res)
{
Expand Down Expand Up @@ -757,7 +764,10 @@ class session {
, id_
, ec.message()
, endpoint);
run();

if (!close_requested_)
run();

return;
}

Expand Down Expand Up @@ -798,24 +808,46 @@ class session {
, "{0}/on_resp: {1}."
, id_
, ec.message());

// Some of the possible errors are.
// net::error::eof
// net::error::connection_reset
// net::error::operation_aborted
close();
run();

close_impl();
if (!close_requested_)
run();

return;
}

msg_handler_(ec, std::move(buffer_.res));

do_read_resp();
if (!close_requested_)
do_read_resp();

if (std::empty(msg_queue_)) {
if (close_requested_)
close_impl();

if (std::empty(msg_queue_))
return;
}

// In practive, the if condition below will always hold as we pop the
// last written message as soon as the first response from a pipeline
// is received. If think the code is clearer this way.
if (msg_queue_.front().sent) {
msg_queue_.pop();

if (std::empty(msg_queue_)) {
if (close_requested_)
close_impl();

return;
}

if (!std::empty(msg_queue_))
do_write();
}
}

void on_write(boost::system::error_code ec, std::size_t n)
Expand All @@ -827,85 +859,45 @@ class session {
, id_, ec.message());
return;
}

msg_queue_.pop();
}

void do_write()
{
auto f = [this](auto ec, auto n)
{ on_write(ec, n); };

assert(!std::empty(msg_queue_));
assert(!std::empty(msg_queue_.front().payload));

net::async_write( socket_
, net::buffer(msg_queue_.front())
, net::buffer(msg_queue_.front().payload)
, f);
}

public:
session( net::io_context& ioc
, config cfg
, std::string id = {})
: id_(id)
, cfg_ {std::move(cfg)}
, resolver_ {ioc}
, socket_ {ioc}
, timer_ {ioc, std::chrono::steady_clock::time_point::max()}
{
if (cfg_.max_pipeline_size < 1)
cfg_.max_pipeline_size = 1;
}
msg_queue_.front().sent = true;

session(net::io_context& ioc) : session {ioc, {}, {}} { }

void set_on_conn_handler(on_conn_handler_type f)
{ conn_handler_ = std::move(f);};

void set_msg_handler(msg_handler_type f)
{ msg_handler_ = std::move(f);};

auto send(std::string msg)
{
auto const max_pp_size_reached =
pipeline_size_ >= cfg_.max_pipeline_size;

if (max_pp_size_reached)
pipeline_size_ = 0;

auto const is_empty = std::empty(msg_queue_);

if (is_empty || std::size(msg_queue_) == 1 || max_pp_size_reached) {
msg_queue_.push(std::move(msg));
++pipeline_id_;
} else {
msg_queue_.back() += msg; // Uses pipeline.
++pipeline_size_;
// If this was the last message and the user has requested a close we
// can shutdown the socket to disable sending and receiving.
if (std::empty(msg_queue_) && close_requested_) {
boost::system::error_code ec;
socket_.shutdown(net::ip::tcp::socket::shutdown_send, ec);
if (ec) {
log::write( cfg_.log_filter
, log::level::warning
, "{0}/on_write/shutdown: {1}."
, id_
, ec.message());
}
}

if (is_empty && socket_.is_open())
do_write();

return pipeline_id_;
}

void close()
void close_impl()
{
boost::system::error_code ec;
socket_.shutdown(net::ip::tcp::socket::shutdown_send, ec);

if (ec) {
log::write( cfg_.log_filter
, log::level::warning
, "{0}/close: {1}."
, id_
, ec.message());
}

ec = {};
socket_.close(ec);
if (ec) {
log::write( cfg_.log_filter
, log::level::warning
, "{0}/close: {1}."
, "{0}/close_imp: {1}."
, id_
, ec.message());
}
Expand Down Expand Up @@ -961,6 +953,58 @@ class session {
async_get_instance(socket_, &buffer_, &instance_, std::move(g));
}

public:
session( net::io_context& ioc
, config cfg
, std::string id = {})
: id_(id)
, cfg_ {std::move(cfg)}
, resolver_ {ioc}
, socket_ {ioc}
, timer_ {ioc, std::chrono::steady_clock::time_point::max()}
{
if (cfg_.max_pipeline_size < 1)
cfg_.max_pipeline_size = 1;
}

session(net::io_context& ioc) : session {ioc, {}, {}} { }

void set_on_conn_handler(on_conn_handler_type f)
{ conn_handler_ = std::move(f);};

void set_msg_handler(msg_handler_type f)
{ msg_handler_ = std::move(f);};

auto send(std::string msg)
{
assert(!std::empty(msg));
assert(!close_requested_);

auto const max_pp_size_reached =
pipeline_size_ >= cfg_.max_pipeline_size;

if (max_pp_size_reached)
pipeline_size_ = 0;

auto const is_empty = std::empty(msg_queue_);

// When std::size(msg_queue_) == 1 we know the message in the back of
// queue has already been sent and we are waiting for a reponse, we
// cannot pipeline in this case.
if (is_empty || std::size(msg_queue_) == 1 || max_pp_size_reached) {
msg_queue_.push({std::move(msg), false});
++pipeline_id_;
} else {
msg_queue_.back().payload += msg; // Uses pipeline.
++pipeline_size_;
}

if (is_empty && socket_.is_open())
do_write();

return pipeline_id_;
}

void run()
{
auto const n = std::size(cfg_.sentinels);
Expand Down Expand Up @@ -995,6 +1039,26 @@ class session {

net::async_connect(socket_, res, f);
}

void close()
{
assert(!close_requested_);
close_requested_ = true;

// If the the message queue is empty we can close the socket
// imediately without shutdown.
if (std::empty(msg_queue_)) {
boost::system::error_code ec;
socket_.close(ec);
if (ec) {
log::write( cfg_.log_filter
, log::level::warning
, "{0}/close/close: {1}."
, id_
, ec.message());
}
}
}
};

}
Expand Down
7 changes: 4 additions & 3 deletions examples.cpp
Expand Up @@ -106,11 +106,12 @@ void example1()
+ rpush("a", a)
+ lrange("a")
+ del("a")
//+ multi()
+ multi()
+ rpush("b", b)
+ lrange("b")
+ del("b")
+ hset("c", c)
+ hmget("c", {"Name", "Education", "Job"})
+ hvals("c")
+ zadd({"d"}, d)
+ zrange("d")
Expand Down Expand Up @@ -182,9 +183,9 @@ void example3()

int main(int argc, char* argv[])
{
//example1();
example1();
//example2();
//example3();
rpush_ex();
//rpush_ex();
}

0 comments on commit 422fd5a

Please sign in to comment.