
[io] rewrite linuxaio to enqueue _multiple_ requests at once

bingmann committed Mar 19, 2019
1 parent 67857a7 commit c400389f72f70d50076a463e5a3ad20f03689403
Showing with 107 additions and 77 deletions.
  1. +1 −1 extlib/tlx
  2. +96 −46 foxxll/io/linuxaio_queue.cpp
  3. +8 −26 foxxll/io/linuxaio_request.cpp
  4. +1 −3 foxxll/io/linuxaio_request.hpp
  5. +1 −1 tests/mng/CMakeLists.txt
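
The core of the change is batching: instead of posting each request with its own io_submit() syscall, the posting thread now collects the waiting requests, fills their control blocks, and hands the whole array to the kernel in one call, resubmitting any tail the kernel did not accept. A minimal standalone sketch of that pattern follows; it is illustrative only, not foxxll code, and submit_batch with its parameters is made up for this example.

#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <unistd.h>

#include <cerrno>
#include <vector>

// Submit a batch of prepared control blocks with as few syscalls as possible.
// Returns the number of accepted iocbs, or -1 on a hard error.
static long submit_batch(aio_context_t ctx, std::vector<iocb*>& cbs)
{
    size_t done = 0;
    while (done < cbs.size()) {
        long rc = syscall(SYS_io_submit, ctx,
                          cbs.size() - done, cbs.data() + done);
        if (rc > 0) {
            // the kernel accepted rc control blocks, advance past them
            done += rc;
        }
        else if (rc < 0 && errno != EAGAIN) {
            // real error, let the caller handle it
            return -1;
        }
        else {
            // EAGAIN or nothing accepted: the event queue is full, the
            // caller would reap completions with io_getevents() and retry
            break;
        }
    }
    return static_cast<long>(done);
}
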
@@ -21,6 +21,7 @@

#include <algorithm>

#include <tlx/define/likely.hpp>
#include <tlx/die/core.hpp>
#include <tlx/logger/core.hpp>

@@ -82,8 +83,9 @@ void linuxaio_queue::add_request(request_ptr& req)
tlx_die("Non-LinuxAIO request submitted to LinuxAIO queue.");

std::unique_lock<std::mutex> lock(waiting_mtx_);

waiting_requests_.push_back(req);
lock.unlock();

num_waiting_requests_.signal();
}

@@ -126,11 +128,11 @@ bool linuxaio_queue::cancel_request(request_ptr& req)
if (canceled_io_operation)
{
lock.unlock();
num_free_events_.signal();

// request is canceled, already posted, but canceled in kernel
areq->completed(true, true);

num_free_events_.signal();
num_posted_requests_.wait(); // will never block
return true;
}
@@ -141,12 +143,11 @@ bool linuxaio_queue::cancel_request(request_ptr& req)
// internal routines, run by the posting thread
void linuxaio_queue::post_requests()
{
request_ptr req;
io_event* events = new io_event[max_events_];
tlx::simple_vector<io_event> events(max_events_);

for ( ; ; ) // as long as thread is running
{
// might block until next request or message comes in
// block until next request or message comes in
int num_currently_waiting_requests = num_waiting_requests_.wait();

// terminate if termination has been requested
@@ -155,68 +156,116 @@ void linuxaio_queue::post_requests()
break;

std::unique_lock<std::mutex> lock(waiting_mtx_);
if (!waiting_requests_.empty())
{
req = waiting_requests_.front();
waiting_requests_.pop_front();
if (TLX_UNLIKELY(waiting_requests_.empty())) {
// unlock queue
lock.unlock();

num_free_events_.wait(); // might block because too many requests are posted
// num_waiting_requests_-- was premature, compensate for that
num_waiting_requests_.signal();
continue;
}

// collect requests from the waiting queue: the first one is already there
std::vector<request_ptr> reqs;

request_ptr req = waiting_requests_.front();
waiting_requests_.pop_front();
reqs.emplace_back(std::move(req));

// collect additional requests
while (!waiting_requests_.empty()) {
// acquire one free event, but keep one in reserve (slack)
if (!num_free_events_.try_acquire(/* delta */ 1, /* slack */ 1))
break;
if (!num_waiting_requests_.try_acquire()) {
num_free_events_.signal();
break;
}

request_ptr req = waiting_requests_.front();
waiting_requests_.pop_front();
reqs.emplace_back(std::move(req));
}

lock.unlock();

// the last free_event must be acquired outside of the lock.
num_free_events_.wait();

// construct batch iocb
tlx::simple_vector<iocb*> cbs(reqs.size());

for (size_t i = 0; i < reqs.size(); ++i) {
// polymorphic_downcast
while (!dynamic_cast<linuxaio_request*>(req.get())->post())
{
// post failed, so first handle events to make queues (more)
// empty, then try again.

// wait for at least one event to complete, no time limit
long num_events = syscall(
SYS_io_getevents, context_, 1, max_events_, events, nullptr
);
if (num_events < 0) {
FOXXLL_THROW_ERRNO(
io_error, "linuxaio_queue::post_requests"
" io_getevents() nr_events=" << num_events
);
}
auto ar = dynamic_cast<linuxaio_request*>(reqs[i].get());
cbs[i] = ar->fill_control_block();
}
reqs.clear();

// io_submit loop
size_t cb_done = 0;
while (cb_done < cbs.size()) {
long success = syscall(
SYS_io_submit, context_,
cbs.size() - cb_done,
cbs.data() + cb_done
);

handle_events(events, num_events, false);
if (success <= 0 && errno != EAGAIN) {
FOXXLL_THROW_ERRNO(
io_error, "linuxaio_request::post io_submit()"
);
}
if (success > 0) {
// request is posted
num_posted_requests_.signal(success);

// request is finally posted
num_posted_requests_.signal();
}
else
{
lock.unlock();
cb_done += success;
if (cb_done == cbs.size())
break;
}

// num_waiting_requests_-- was premature, compensate for that
num_waiting_requests_.signal();
// post failed, so first handle events to make queues (more) empty,
// then try again.

// poll for already completed events without blocking (min_nr = 0)
long num_events = syscall(
SYS_io_getevents, context_, 0,
max_events_, events.data(), nullptr
);
if (num_events < 0) {
FOXXLL_THROW_ERRNO(
io_error, "linuxaio_queue::post_requests"
" io_getevents() nr_events=" << num_events
);
}
if (num_events > 0)
handle_events(events.data(), num_events, false);
}
}

delete[] events;
}

void linuxaio_queue::handle_events(io_event* events, long num_events, bool canceled)
{
// first mark all events as free
num_free_events_.signal(num_events);

for (int e = 0; e < num_events; ++e)
{
request* r = reinterpret_cast<request*>(
static_cast<uintptr_t>(events[e].data));
r->completed(canceled);
// release counting_ptr reference, this may delete the request object
r->dec_reference();
num_free_events_.signal();
num_posted_requests_.wait(); // will never block
}

num_posted_requests_.wait(num_events); // will never block
}
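
The accounting above relies on counting semaphores: num_free_events_ tracks free completion slots and num_posted_requests_ tracks in-flight requests, and the batch path signals and waits with a delta and uses a slack-aware try_acquire. Below is a minimal sketch of a semaphore with those operations; the semantics are assumed to mirror tlx::semaphore and the class is illustrative, not the actual tlx implementation.

#include <condition_variable>
#include <cstddef>
#include <mutex>

class counting_semaphore_sketch
{
public:
    explicit counting_semaphore_sketch(size_t initial = 0)
        : value_(initial) { }

    //! release delta resources
    void signal(size_t delta = 1) {
        std::unique_lock<std::mutex> lock(mutex_);
        value_ += delta;
        cv_.notify_all();
    }

    //! block until delta resources can be taken, then take them
    size_t wait(size_t delta = 1) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&]() { return value_ >= delta; });
        value_ -= delta;
        return value_;
    }

    //! take delta resources without blocking, keeping slack in reserve
    bool try_acquire(size_t delta = 1, size_t slack = 0) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (value_ < delta + slack)
            return false;
        value_ -= delta;
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    size_t value_;
};
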

// internal routines, run by the waiting thread
void linuxaio_queue::wait_requests()
{
request_ptr req;
io_event* events = new io_event[max_events_];
tlx::simple_vector<io_event> events(max_events_);

for ( ; ; ) // as long as thread is running
{
@@ -232,12 +281,14 @@ void linuxaio_queue::wait_requests()
long num_events;
while (1) {
num_events = syscall(
SYS_io_getevents, context_, 1, max_events_, events, nullptr
SYS_io_getevents, context_, 1,
max_events_, events.data(), nullptr
);

if (num_events < 0) {
if (errno == EINTR) {
// io_getevents may return prematurely in case a signal is received
// io_getevents may return prematurely in case a signal is
// received
continue;
}

@@ -249,12 +300,11 @@ void linuxaio_queue::wait_requests()
break;
}

num_posted_requests_.signal(); // compensate for the one eaten prematurely above
// compensate for the one eaten prematurely above
num_posted_requests_.signal();

handle_events(events, num_events, false);
handle_events(events.data(), num_events, false);
}

delete[] events;
}

void* linuxaio_queue::post_async(void* arg)
@@ -51,47 +51,27 @@ void linuxaio_request::completed(bool posted, bool canceled)
request_with_state::completed(canceled);
}

void linuxaio_request::fill_control_block()
iocb* linuxaio_request::fill_control_block()
{
linuxaio_file* af = dynamic_cast<linuxaio_file*>(file_);

memset(&cb_, 0, sizeof(cb_));
// indirection, I/O system retains a virtual counting_ptr reference
// increment, I/O system retains a virtual counting_ptr reference
ReferenceCounter::inc_reference();

memset(&cb_, 0, sizeof(cb_));
cb_.aio_data = reinterpret_cast<__u64>(this);
cb_.aio_fildes = af->file_des_;
cb_.aio_lio_opcode = (op_ == READ) ? IOCB_CMD_PREAD : IOCB_CMD_PWRITE;
cb_.aio_reqprio = 0;
cb_.aio_buf = static_cast<__u64>(reinterpret_cast<unsigned long>(buffer_));
cb_.aio_nbytes = bytes_;
cb_.aio_offset = offset_;
}

//! Submits an I/O request to the OS
//! \returns false if submission fails
bool linuxaio_request::post()
{
TLX_LOG << "linuxaio_request[" << this << "] post()";

fill_control_block();
iocb* cb_pointer = &cb_;
// io_submit might take considerable time, so we have to remember the
// current time before the call.
time_posted_ = timestamp();
linuxaio_queue* queue = dynamic_cast<linuxaio_queue*>(
disk_queues::get_instance()->get_queue(file_->get_queue_id()));

long success = syscall(SYS_io_submit, queue->get_io_context(), 1, &cb_pointer);
// At this point another thread may have already called complete(),
// so consider most values as invalidated!

if (success == -1 && errno != EAGAIN)
FOXXLL_THROW_ERRNO(
io_error, "linuxaio_request::post"
" io_submit()"
);

return success == 1;
return &cb_;
}

//! Cancel the request
@@ -118,8 +98,10 @@ bool linuxaio_request::cancel_aio(linuxaio_queue* queue)

io_event event;
long result = syscall(SYS_io_cancel, queue->get_io_context(), &cb_, &event);
if (result == 0) //successfully canceled
if (result == 0) {
// successfully canceled
queue->handle_events(&event, 1, true);
}
return result == 0;
}

@@ -44,8 +44,6 @@ class linuxaio_request : public request_with_state
iocb cb_;
double time_posted_;

void fill_control_block();

public:
linuxaio_request(
const completion_handler& on_complete,
@@ -61,7 +59,7 @@ class linuxaio_request : public request_with_state
<< " op=" << op << ")";
}

bool post();
iocb * fill_control_block();
bool cancel() final;
bool cancel_aio(linuxaio_queue* queue);
void completed(bool posted, bool canceled);
@@ -40,4 +40,4 @@ foxxll_test(test_prefetch_pool)
foxxll_test(test_read_write_pool)
foxxll_test(test_write_pool)

message(WARNING "test_pool_pair is not tested as it failed with LinuxAIO")
