Skip to content

Commit

Permalink
Enable send complete callback.
Browse files Browse the repository at this point in the history
Change YASIO_ENABLE_KCP to YASIO_HAVE_KCP.
Add option: YOPT_DEFER_HANDLER.
  • Loading branch information
halx99 committed Nov 7, 2019
1 parent f4734fd commit ff37550
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 79 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -1,7 +1,7 @@
config=release
TARGET=libyasio.so

CXXFLAGS = -c -fPIC -Wall -Wno-unused-result -Wextra -Wundef -Wcast-align -Wcast-qual -Wno-old-style-cast -Wdouble-promotion -DYASIO_ENABLE_KCP=1 -std=$(cxxstd)
CXXFLAGS = -c -fPIC -Wall -Wno-unused-result -Wextra -Wundef -Wcast-align -Wcast-qual -Wno-old-style-cast -Wdouble-promotion -DYASIO_HAVE_KCP=1 -std=$(cxxstd)
CFLAGS = -c -fPIC

ifeq ($(CXX),clang++)
Expand Down
4 changes: 2 additions & 2 deletions yasio/detail/concurrent_queue.hpp
Expand Up @@ -52,7 +52,7 @@ template <typename _T, bool _Dual> class concurrent_queue : public moodycamel::R
{
public:
bool empty() const { return this->peek() == nullptr; }
void consume(size_t count, std::function<void(_T&&)>& func)
void consume(size_t count, const std::function<void(_T&&)>& func)
{
_T event;
while (count-- > 0 && this->try_dequeue(event))
Expand Down Expand Up @@ -92,7 +92,7 @@ template <typename _T> class concurrent_queue<_T, false> : public concurrent_que
template <typename _T> class concurrent_queue<_T, true> : public concurrent_queue_primitive<_T>
{
public:
void consume(size_t count, std::function<void(_T&&)>& func)
void consume(size_t count, const std::function<void(_T&&)>& func)
{
if (this->deal_.empty())
{
Expand Down
10 changes: 7 additions & 3 deletions yasio/detail/config.hpp
Expand Up @@ -58,13 +58,13 @@ SOFTWARE.
// #define YASIO_DISABLE_OBJECT_POOL 1

/*
** Uncomment or add compiler flag -DYASIO_ENABLE_KCP=1 to enable kcp support
** Uncomment or add compiler flag -DYASIO_HAVE_KCP for kcp support
** Remember, before thus, please ensure:
** 1. Execute: `git submodule update --init --recursive` to clone kcp sources.
** 1. Execute: `git submodule update --init --recursive` to clone the kcp sources.
** 2. Add yasio/kcp/ikcp.c to your build system, even through the `YASIO_HEADER_ONLY` was defined.
** pitfall: yasio kcp support is experimental currently.
*/
// #define YASIO_ENABLE_KCP 1
// #define YASIO_HAVE_KCP 1

#if defined(YASIO_HEADER_ONLY)
# define YASIO__DECL inline
Expand Down Expand Up @@ -98,6 +98,10 @@ SOFTWARE.
# define YASIO_LOGV YASIO_LOG
#endif

#define YASIO_INET_BUFFER_SIZE 65536

#define YASIO_ARRAYSIZE(A) (sizeof(A) / sizeof((A)[0]))

#include "strfmt.hpp"

#endif
108 changes: 60 additions & 48 deletions yasio/yasio.cpp
Expand Up @@ -46,7 +46,7 @@ SOFTWARE.
#include <sys/stat.h>
#include <fcntl.h>

#if defined(YASIO_ENABLE_KCP)
#if defined(YASIO_HAVE_KCP)
# include "yasio/kcp/ikcp.h"
#endif

Expand Down Expand Up @@ -175,14 +175,13 @@ static void _set_thread_name(const char* threadName)
class a_pdu
{
public:
a_pdu(std::vector<char>&& right)
{
data_ = std::move(right);
offset_ = 0;
}
std::vector<char> data_; // sending data
size_t offset_; // offset
a_pdu(std::vector<char>&& buffer, std::function<void()>&& handler)
: offset_(0), buffer_(std::move(buffer)), handler_(std::move(handler))
{}

size_t offset_; // offset
std::vector<char> buffer_; // sending data buffer
std::function<void()> handler_;
#if !defined(YASIO_USE_OBJECT_POOL)
DEFINE_CONCURRENT_OBJECT_POOL_ALLOCATION(a_pdu, 512)
#endif
Expand Down Expand Up @@ -323,17 +322,17 @@ io_transport::io_transport(io_channel* ctx, std::shared_ptr<xxsocket> sock) : ct
}

// -------------------- io_transport_posix ---------------------
void io_transport_posix::send(std::vector<char>&& data)
void io_transport_posix::write(std::vector<char>&& buffer, std::function<void()>&& handler)
{
send_queue_.emplace(std::make_shared<a_pdu>(std::move(data)));
send_queue_.emplace(std::make_shared<a_pdu>(std::move(buffer), std::move(handler)));
}
int io_transport_posix::recv(int& error)
int io_transport_posix::do_read(int& error)
{
int n = socket_->recv_i(buffer_ + offset_, sizeof(buffer_) - offset_);
error = n < 0 ? xxsocket::get_last_errno() : 0;
return n;
}
bool io_transport_posix::flush(long long& max_wait_duration)
bool io_transport_posix::do_write(long long& max_wait_duration)
{
bool ret = false;
do
Expand All @@ -344,8 +343,8 @@ bool io_transport_posix::flush(long long& max_wait_duration)
if (pv != nullptr)
{
auto v = *pv;
auto outstanding_bytes = static_cast<int>(v->data_.size() - v->offset_);
int n = socket_->send_i(v->data_.data() + v->offset_, outstanding_bytes);
auto outstanding_bytes = static_cast<int>(v->buffer_.size() - v->offset_);
int n = socket_->send_i(v->buffer_.data() + v->offset_, outstanding_bytes);
if (n == outstanding_bytes)
{ // All pdu bytes sent.
send_queue_.pop();
Expand All @@ -357,12 +356,20 @@ bool io_transport_posix::flush(long long& max_wait_duration)
socket_->local_endpoint().to_string().c_str(),
socket_->peer_endpoint().to_string().c_str());
#endif
if (v->handler_)
{
auto& owner = get_service();
if (owner.options_.deferred_handler_)
owner.handlers_.enqueue(std::move(v->handler_));
else
v->handler_();
}
}
else if (n > 0)
{
// #performance: change offset only, remain data will be send next loop.
v->offset_ += n;
outstanding_bytes = static_cast<int>(v->data_.size() - v->offset_);
outstanding_bytes = static_cast<int>(v->buffer_.size() - v->offset_);
}
else
{ // n <= 0
Expand All @@ -389,7 +396,7 @@ bool io_transport_posix::flush(long long& max_wait_duration)
return ret;
}

#if defined(YASIO_ENABLE_KCP)
#if defined(YASIO_HAVE_KCP)
// ----------------------- io_transport_kcp ------------------
io_transport_kcp::io_transport_kcp(io_channel* ctx, std::shared_ptr<xxsocket> sock)
: io_transport(ctx, sock), kcp_(nullptr)
Expand All @@ -403,12 +410,12 @@ io_transport_kcp::io_transport_kcp(io_channel* ctx, std::shared_ptr<xxsocket> so
}
io_transport_kcp::~io_transport_kcp() { ::ikcp_release(this->kcp_); }

void io_transport_kcp::send(std::vector<char>&& data)
void io_transport_kcp::write(std::vector<char>&& buffer, std::function<void()>&& /*handler*/)
{
std::lock_guard<std::recursive_mutex> lck(send_mtx_);
::ikcp_send(kcp_, data.data(), static_cast<int>(data.size()));
::ikcp_send(kcp_, buffer.data(), static_cast<int>(buffer.size()));
}
int io_transport_kcp::recv(int& error)
int io_transport_kcp::do_read(int& error)
{
char sbuf[2048];
int n = socket_->recv_i(sbuf, sizeof(sbuf));
Expand All @@ -427,7 +434,7 @@ int io_transport_kcp::recv(int& error)
error = xxsocket::get_last_errno();
return n;
}
bool io_transport_kcp::flush(long long& max_wait_duration)
bool io_transport_kcp::do_write(long long& max_wait_duration)
{
std::lock_guard<std::recursive_mutex> lck(send_mtx_);

Expand Down Expand Up @@ -489,8 +496,9 @@ void io_service::start_service(const io_hostent* channel_eps, int channel_count,
}
else
{
this->worker_id_ = std::this_thread::get_id();
this->options_.deferred_event_ = false;
this->worker_id_ = std::this_thread::get_id();
this->options_.deferred_event_ = false;
this->options_.deferred_handler_ = false;
run();
this->state_ = io_service::state::STOPPED;
cleanup();
Expand Down Expand Up @@ -553,7 +561,8 @@ void io_service::cleanup()
{
clear_transports();
clear_channels();
this->event_queue_.clear();
this->events_.clear();
this->handlers_.clear();
this->timer_queue_.clear();

unregister_descriptor(interrupter_.read_descriptor(), YEM_POLLIN);
Expand Down Expand Up @@ -596,12 +605,11 @@ void io_service::clear_transports()
transports_.clear();
}

void io_service::dispatch_events(int count)
void io_service::dispatch(int count)
{
if (!options_.on_event_)
return;

this->event_queue_.consume(count, options_.on_event_);
if (options_.on_event_)
this->events_.consume(count, options_.on_event_);
this->handlers_.consume(count, [](std::function<void()>&& func) { func(); });
}

void io_service::run()
Expand Down Expand Up @@ -661,7 +669,7 @@ void io_service::perform_transports(fd_set* fds_array, long long& max_wait_durat
for (auto iter = transports_.begin(); iter != transports_.end();)
{
auto transport = *iter;
if (do_read(transport, fds_array, max_wait_duration) && transport->flush(max_wait_duration))
if (do_read(transport, fds_array, max_wait_duration) && do_write(transport, max_wait_duration))
++iter;
else
{
Expand All @@ -684,7 +692,7 @@ void io_service::perform_transports(fd_set* fds_array, long long& max_wait_durat
for (auto iter = dgram_transports_.begin(); iter != dgram_transports_.end();)
{
auto transport = iter->second;
if (transport->flush(max_wait_duration))
if (do_write(transport, max_wait_duration))
++iter;
else
{
Expand Down Expand Up @@ -876,15 +884,16 @@ void io_service::unregister_descriptor(const socket_native_type fd, int flags)
if ((flags & YEM_POLLERR) != 0)
FD_CLR(fd, &(fds_array_[except_op]));
}
int io_service::write(transport_handle_t transport, std::vector<char> data)
int io_service::write(transport_handle_t transport, std::vector<char> buffer,
std::function<void()> handler)
{
if (transport && transport->is_open())
{
if (!data.empty())
if (!buffer.empty())
{
transport->send(std::move(data));
transport->write(std::move(buffer), std::move(handler));
this->interrupt();
return static_cast<int>(data.size());
return static_cast<int>(buffer.size());
}
return 0;
}
Expand All @@ -897,9 +906,7 @@ int io_service::write(transport_handle_t transport, std::vector<char> data)
void io_service::handle_event(event_ptr event)
{
if (options_.deferred_event_)
{
event_queue_.emplace(std::move(event));
}
events_.emplace(std::move(event));
else
options_.on_event_(std::move(event));
}
Expand Down Expand Up @@ -1165,7 +1172,7 @@ transport_handle_t io_service::allocate_transport(io_channel* ctx, std::shared_p
}
else
vp = operator new(sizeof(io_transport_posix));
#if defined(YASIO_ENABLE_KCP)
#if defined(YASIO_HAVE_KCP)
if (!(ctx->mask_ & YCM_KCP))
transport = new (vp) io_transport_posix(ctx, socket);
else
Expand Down Expand Up @@ -1203,7 +1210,7 @@ bool io_service::do_read(transport_handle_t transport, fd_set* fds_array,
int n = -1, error = EWOULDBLOCK;
if (FD_ISSET(transport->socket_->native_handle(), &(fds_array[read_op])))
{
n = transport->recv(error);
n = transport->do_read(error);
}
if (n > 0 || !SHOULD_CLOSE_0(n, error))
{
Expand All @@ -1229,7 +1236,7 @@ bool io_service::do_read(transport_handle_t transport, fd_set* fds_array,
transport->expected_packet_.reserve(
(std::min)(transport->expected_packet_size_,
YASIO_MAX_PDU_BUFFER_SIZE)); // #perfomance, avoid memory reallocte.
do_unpack(transport, transport->expected_packet_size_, n, max_wait_duration);
unpack(transport, transport->expected_packet_size_, n, max_wait_duration);
}
else if (length == 0)
{
Expand All @@ -1245,10 +1252,10 @@ bool io_service::do_read(transport_handle_t transport, fd_set* fds_array,
}
else
{ // process incompleted pdu
do_unpack(transport,
transport->expected_packet_size_ -
static_cast<int>(transport->expected_packet_.size()),
n, max_wait_duration);
unpack(transport,
transport->expected_packet_size_ -
static_cast<int>(transport->expected_packet_.size()),
n, max_wait_duration);
}
}
else
Expand All @@ -1265,8 +1272,8 @@ bool io_service::do_read(transport_handle_t transport, fd_set* fds_array,
return ret;
}

void io_service::do_unpack(transport_handle_t transport, int bytes_expected, int bytes_transferred,
long long& max_wait_duration)
void io_service::unpack(transport_handle_t transport, int bytes_expected, int bytes_transferred,
long long& max_wait_duration)
{
auto bytes_available = bytes_transferred + transport->offset_;
transport->expected_packet_.insert(transport->expected_packet_.end(), transport->buffer_,
Expand Down Expand Up @@ -1388,8 +1395,10 @@ void io_service::perform_timers()
auto timer_ctl = earliest.first;
auto& timer_cb = earliest.second;
timer_queue_.pop_back(); // pop the expired timer from timer queue

timer_cb(timer_ctl->cancelled_);
if (!options_.deferred_handler_)
this->handlers_.enqueue([=, cancelled = timer_ctl->cancelled_]() { timer_cb(cancelled); });
else
timer_cb(timer_ctl->cancelled_);
if (timer_ctl->repeated_)
{
timer_ctl->expires_from_now();
Expand Down Expand Up @@ -1587,6 +1596,9 @@ void io_service::set_option(int option, ...)
case YOPT_DEFER_EVENT:
options_.deferred_event_ = !!va_arg(ap, int);
break;
case YOPT_DEFER_HANDLER:
options_.deferred_handler_ = !!va_arg(ap, int);
break;
case YOPT_TCP_KEEPALIVE:
options_.tcp_keepalive_.onoff = 1;
options_.tcp_keepalive_.idle = va_arg(ap, int);
Expand Down

0 comments on commit ff37550

Please sign in to comment.