
io: add io queue manager #16454

Merged

merged 15 commits into redpanda-data:dev on Feb 8, 2024

Conversation

dotnwat
Member

@dotnwat dotnwat commented Feb 2, 2024

Adds io_queue which manages and dispatches submitted reads and writes. Its underlying file descriptor can safely be closed and re-opened at any time, which forms the basis for enforcing open-file limits (next PR) across all io_queues.

Reviewers will notice the complete absence of I/O optimizations such as combining adjacent physical pages into larger I/O requests. Also missing from this version is support for retrying failed I/Os and applying backpressure in certain scenarios such as ENOSPC. Failures opening and closing are not fatal; they are used by the scheduler (next PR) to deal with transient issues opening and closing io queue files.
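
A rough sketch of the io_queue interface described above, assembled from the header and source snippets quoted in the review below (illustrative only; the exact declarations live in src/v/io/io_queue.h):

class io_queue {
public:
    // background dispatcher lifecycle
    void start() noexcept;
    seastar::future<> stop() noexcept;

    // open/close the underlying file descriptor; the basis for the
    // open-file-limit enforcement in the next PR
    seastar::future<> open() noexcept;
    seastar::future<> close() noexcept;

    // queue a page for I/O; non-blocking, backpressure is handled at a
    // higher level (e.g. memory reservations)
    void submit_read(page&) noexcept;
    void submit_write(page&) noexcept;
};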

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v23.3.x
  • v23.2.x
  • v23.1.x

Release Notes

  • none

The two initial flags, read and write, will be used in later commits by
the io queue.

Signed-off-by: Noah Watkins <noahwatkins@gmail.com>
Comment on lines +182 to +185
if (page.test_flag(page::flags::queued)) {
pending_.push_back(page);
cond_.signal();
}
Contributor

Question: it looks like we clear the queued bit before calling dispatch_write. When will this code path be taken?

Why doesn't a similar code path exist in dispatch_read?

Member Author

Great question. Added comments explaining this:

commit 049eaf3b462551d89e879dfbfe7b51fffd413107 (HEAD -> io-updates)
Author: Noah Watkins <noahwatkins@gmail.com>
Date:   Tue Feb 6 12:12:14 2024 -0800

    io: add comment explaining read/write re-queueing

    Signed-off-by: Noah Watkins <noahwatkins@gmail.com>

diff --git a/src/v/io/io_queue.cc b/src/v/io/io_queue.cc
index f78210015e..6bf1bc477a 100644
--- a/src/v/io/io_queue.cc
+++ b/src/v/io/io_queue.cc
@@ -157,6 +157,10 @@ void io_queue::dispatch_read(
             ->dma_read(
               page.offset(), page.data().get_write(), page.data().size())
             .then([this, &page, units = std::move(units)](auto) {
+                /*
+                 * unlike writes, reads are not requeued. the caller should
+                 * handle read/write coherency.
+                 */
                 running_.erase(request_list_type::s_iterator_to(page));
                 if (complete_) {
                     complete_(page);
@@ -179,6 +183,12 @@ void io_queue::dispatch_write(
             ->dma_write(page.offset(), page.data().get(), page.data().size())
             .then([this, &page, units = std::move(units)](auto) {
                 running_.erase(request_list_type::s_iterator_to(page));
+                /*
+                 * the queued flag is cleared before dispatch_write is invoked,
+                 * but while the write is in flight, submit_write may be called
+                 * again. for example, if more data was written into the page
+                 * and the page needs to be written again. it's requeued here.
+                 */
                 if (page.test_flag(page::flags::queued)) {
                     pending_.push_back(page);
                     cond_.signal();

Contributor

Thanks for adding an explanation, this makes sense.

A queued write request for a page is put into the pending_ queue in the continuation after dma_write, instead of within submit_write when the request is first seen. Is this done to ensure some sort of serialization of writes?

Or is it because we cannot keep the page in both pending and running intrusive lists at the same time?

Member Author

A queued write request for a page is put into the pending_ queue in the continuation after dma_write, instead of within submit_write when the request is first seen.

It is also added to the pending list in submit_write (via enqueue_pending). See here:
https://github.com/redpanda-data/redpanda/pull/16454/files#diff-7658d766d4b0e6184a27d049ab169d7b8ad7e53e56e478c6c8b345db7e15559eR96

Is this done to ensure some sort of serialization of writes?

There are no ordering guarantees. Submitted I/Os can be serviced in any order.

Or is it because we cannot keep the page in both pending and running intrusive lists at the same time?

This is true, there is only one intrusive list hook in the page.

More broadly, what is going on here is that pages are shared between the caller and the I/O queue. When data arrives in a page and the page is submitted to the I/O queue for write-back, we need to know whether the newly arrived data will be covered by the next dma_write, or whether a dma_write was racing with the submission, in which case the page should be re-inserted for write-back so that the newly arrived data is also written to disk.
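
A minimal sketch of the submission side implied by this discussion (enqueue_pending is referenced elsewhere in this PR, but the exact flag and hook checks below are assumptions, not the merged code):

void io_queue::submit_write(page& page) noexcept {
    page.set_flag(page::flags::write);
    page.set_flag(page::flags::queued);
    if (!page.is_linked()) { // hook accessor name assumed; a page has a single intrusive hook
        // not pending and not in flight: hand the page to the dispatcher
        enqueue_pending(page);
    }
    // otherwise the page is either already pending, or a dma_write is in
    // flight; dispatch_write's continuation will see the queued flag and
    // re-insert the page so the newly arrived data is also written back
}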

Comment on lines 144 to 145
assert(false);
std::terminate();
Contributor

nit: maybe this is a placeholder/never supposed to execute but should there be a vassert or vlog here to add some context on why this will fail? The std::terminate(); statement is unreachable.

Contributor

The std::terminate(); statement is unreachable.

Well in debug mode assert is on right? Anyways +1 to vassert here

Member Author

I added vassert for now. This is a super hot path and vassert is slow, but I can let perf results guide us :)

Contributor

I mean if it's the super hot path we should not be using coroutines right? :)

+1 to profiling results guiding us.

Member Author

Oh, the dispatch loop can be a coroutine and still have a hot path inside of it. I thought I had a tight loop over the request queue intrusive list, but apparently not. As currently written, you're right, it's slower than necessary.
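
For context, a rough sketch of the kind of coroutine dispatch loop being discussed, pieced together from the snippets in this PR (clear_flag and the exact loop structure are assumptions, not the merged code):

seastar::future<> io_queue::dispatch() noexcept {
    while (true) {
        co_await cond_.wait([this] { return stop_ || !pending_.empty(); });
        if (stop_) {
            co_return;
        }
        // the inner drain over pending_ is the hot path being discussed
        while (!pending_.empty()) {
            auto& page = pending_.front();
            pending_.pop_front();
            page.clear_flag(page::flags::queued); // cleared before dispatch, per the comment above
            auto units = co_await seastar::get_units(ops_, 1);
            if (page.test_flag(page::flags::read)) {
                dispatch_read(page, std::move(units));
            } else if (page.test_flag(page::flags::write)) {
                dispatch_write(page, std::move(units));
            }
        }
    }
}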

Contributor

maybe just add an extra if (...) to avoid vassert performance penalty (and we should probably fix vassert if it has measurable impact).

if (!cond) {
  vassert(cond, "err");
}

Member Author

I'm not sure why adding a conditional would help; vassert is already a conditional like if (cond).

Contributor

I read this and assumed that the check is slow for some reason

This is a super hot path and vassert is slow

page& page, seastar::semaphore_units<> units) noexcept {
log.debug("Reading {} at {}~{}", path_, page.offset(), page.data().size());

std::ignore = seastar::with_gate(
Contributor

nit: ssx::background?

Member Author

Oh, yeah, good call. We hard crash if there is an error, so I didn't see any value in adding code to swallow exceptions. In any case, better to be perf-profile driven.

Contributor

I was just suggesting to replace std::ignore with ssx::background, which is essentially the same thing, but ssx::background allows us to track all the places where we background work... (maybe that's not important tho).

Member Author

allows us to track all the places where we background work... (maybe that's not important tho).

it is important, for sure
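
For reference, a sketch of the suggested swap (the ssx::background assignment and the gate_ member name are assumptions, not quoted from this PR):

void io_queue::dispatch_write(
  page& page, seastar::semaphore_units<> units) noexcept {
    log.debug("Writing {} at {}~{}", path_, page.offset(), page.data().size());

    // behaves like std::ignore = seastar::with_gate(...), but intentionally
    // backgrounded futures become easy to find in the codebase
    ssx::background = seastar::with_gate(
      gate_, [this, &page, units = std::move(units)]() mutable {
          // ... dma_write chain exactly as in the snippets above ...
          return seastar::make_ready_future<>();
      });
}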

page& page, seastar::semaphore_units<> units) noexcept {
log.debug("Writing {} at {}~{}", path_, page.offset(), page.data().size());

std::ignore = seastar::with_gate(
Contributor

nit: ssx::background?

Member Author

See above.

void start() noexcept;

/**
* Stops the I/O queue's background dispatcher.
Contributor

Must one call close first before stop?

Member Author
@dotnwat dotnwat Feb 6, 2024

No, but it is recommended. Added a comment:

diff --git a/src/v/io/io_queue.h b/src/v/io/io_queue.h
index baf819e9b9..9b9a6b6692 100644
--- a/src/v/io/io_queue.h
+++ b/src/v/io/io_queue.h
@@ -68,6 +68,11 @@ public:

     /**
      * Stops the I/O queue's background dispatcher.
+     *
+     * It is recommended that close() be invoked prior to stopping the queue.
+     * While Seastar will close the file handle automatically, ensuring that the
+     * queue is drained before the file handle is closed will avoid errors
+     * within Seastar related to closing file handles with pending I/O.
      */
     seastar::future<> stop() noexcept;


*
* Requires that opened() be false.
*/
seastar::future<> open() noexcept;
Contributor

The noexcept for me is always a little tricky because this can return a failed future still, meaning that opened is still false... I don't have a suggestion outside documenting that this can fail?

Member Author
@dotnwat dotnwat Feb 6, 2024

I think this is self-documenting, right? The future can contain an exception, but open() will never let a raw exception leak out. This is the pattern used for most of the seastar I/O layer IIRC. Or am I misunderstanding your point?
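
For illustration, a caller-side sketch of that pattern (try_open and the retry policy are hypothetical, not part of this PR):

seastar::future<> try_open(io_queue& queue) {
    // open() is noexcept, so a failure arrives as an exceptional future
    // rather than a thrown exception; the queue stays closed and open()
    // can simply be retried later
    return queue.open().handle_exception([](std::exception_ptr eptr) {
        log.warn("io queue open failed: {}", eptr);
    });
}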

Contributor

I mean I guess so, part of it is that if open returns an error it's OK to be retried. So folks still have to handle potential failures in close?

Member Author
@dotnwat dotnwat Feb 6, 2024

Added this comment:

diff --git a/src/v/io/io_queue.h b/src/v/io/io_queue.h
index d4d916c8a7..baf819e9b9 100644
--- a/src/v/io/io_queue.h
+++ b/src/v/io/io_queue.h
@@ -79,7 +79,8 @@ public:
     /**
      * Open the queue and begin dispatching pending I/O requests.
      *
-     * Requires that opened() be false.
+     * Requires that opened() be false. The queue will remain in the closed
+     * state if an exceptional future is returned.
      */
     seastar::future<> open() noexcept;

Contributor

That is helpful IMO.

, complete_(std::move(complete)) {}

void io_queue::start() noexcept {
log.debug("Starting io queue for {}", path_);
Contributor

Why not vlog? This will not include the line number information which is very useful for debugging.

Member Author

fixed, thanks

*/
seastar::future<> io_queue::close() noexcept {
log.debug("Closing {}", path_);
assert(opened());
Contributor

Should it be an assertion? This could easily be an error message in the log + an exceptional future.

Member Author

You're right, but I think it's unnecessary. The io_queue is an internal component used by the scheduler (next PR) where there is one call to open and one call to close. If io_queue were to be used more broadly I'd agree it needs to have a more forgiving interface.


// ensure all on-going ops have completed
auto units = co_await seastar::get_units(
ops_, seastar::semaphore::max_counter());
Contributor

It looks like this ensures only that the current in-flight operations are completed. It is possible to close the file even if we have pages in the _pending list.

Contributor

This is a bit counter-intuitive to me. If the io_queue is per file, then closing the io_queue should probably guarantee that all pending reads and writes are completed. This behavior is similar to the close syscall when buffered I/O is used: it also waits until all buffered I/O is done and can return an error.

Contributor

Also, I'm wondering why this (and also open) is not done in the dispatch background loop? This could eliminate extra synchronization. Instead of acquiring units here you could push a promise onto the queue and await it.

Member Author

It is possible to close the file even if we have pages in the _pending list.

This is by design. I/O submission occurs independently of open/close. The former is the I/O path, and the latter is for resource management of the total number of open file descriptors. I think part of the issue here is the open/close terminology; in particular, close isn't a great analogy for closing a POSIX file. In the next PR that implements the open file descriptor quota the usage will be much clearer. In that PR I'll try to come up with a better name than open/close.

Also, I'm wondering why this (and also open) is not done in the dispatch background loop?

This is because the I/O loop is unaware of higher level resource management of file descriptors. In the common case an IO queue will be opened and remain open forever. It's only under pressure that we may start closing file descriptors to stay under the limit.
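
A hypothetical scheduler-side sketch of that split (the scheduler and its fd budget are in the next PR; the function below is illustrative only):

seastar::future<> shed_fd_pressure(io_queue& queue, page& p) {
    queue.submit_write(p);  // never blocks; the page waits in pending_
    co_await queue.close(); // under fd pressure: drain in-flight ops, release the fd
    // pending pages remain queued while the file is closed
    co_await queue.open();  // re-open when the budget allows; dispatching resumes
}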



const std::filesystem::path& io_queue::path() const noexcept { return path_; }

seastar::future<> io_queue::open() noexcept {
Contributor

It's a bit strange that open/close methods are returning futures but submit_read/submit_write are not.

Member Author

Open and close perform blocking system calls, so returning a future makes sense. However, submit_read/submit_write only queue operations. The intention here is that resource management, such as applying back pressure, is done at a higher level through mechanisms like memory reservations. It's entirely possible that we'll discover we need to make submit_* return a future at some point.

}

void io_queue::submit_read(page& page) noexcept {
page.set_flag(page::flags::read);
Contributor

Should we check is_linked here? I can enqueue the same page twice by mistake, and the boost intrusive list will trigger an assertion.

Member Author

If a page has been queued for read, it must not be re-queued for read or write until it has completed. I hadn't mentioned this in the doc comments, so I added a comment to address it.

Keep in mind that the I/O queue is not intended for general use. It is an internal component.

diff --git a/src/v/io/io_queue.h b/src/v/io/io_queue.h
index 0937a99a5c..026f155b43 100644
--- a/src/v/io/io_queue.h
+++ b/src/v/io/io_queue.h
@@ -107,6 +107,9 @@ public:

     /**
      * Submit a read I/O request.
+     *
+     * A page submitted for read must not be resubmitted for read or write until
+     * it has completed.
      */
     void submit_read(page&) noexcept;

}
})
.handle_exception([this](std::exception_ptr eptr) {
log.error("Write failed to {}: {}", path_, eptr);
Contributor

I think this log message is misleading because the complete_ callback may also throw.

Member Author

Good catch. I made the callback signature require noexcept.
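
For illustration, one way such a signature might look (only the completion_callback_type name appears in this PR; the underlying function type here is an assumption):

// a noexcept-qualified callback type lets the queue invoke complete_ from
// the dma_read/dma_write continuations without worrying about exceptions
using completion_callback_type
  = seastar::noncopyable_function<void(page&) noexcept>;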

cond_.signal();
}
if (complete_) {
complete_(page);
Contributor

It probably makes sense to release units before invoking the callback.

Member Author

fixed

request_list_type pending_;
request_list_type running_;
completion_callback_type complete_;
seastar::semaphore ops_{seastar::semaphore::max_counter()};
Contributor
@Lazin Lazin Feb 7, 2024

I'm curious why it is set to max_counter?
OK, looks like it's used as an rwlock.

Contributor

FWIW seastar has a seastar::rwlock to encapsulate this pattern in a clearer way.

Member Author
@dotnwat dotnwat Feb 7, 2024

The semaphore use is intentional. Even though it is currently configured as a shared-exclusive lock, we'll later use the semaphore to control the number of outstanding IOs.
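
A sketch of the pattern, based on the snippets in this PR (the per-I/O acquisition of a single unit is an assumption):

// declared with all units available
seastar::semaphore ops_{seastar::semaphore::max_counter()};

// per dispatched read/write (shared side): hold one unit while the I/O is in flight
auto io_units = co_await seastar::get_units(ops_, 1);

// open/close (exclusive side): acquire every unit, i.e. wait for all in-flight I/O
auto all_units = co_await seastar::get_units(
  ops_, seastar::semaphore::max_counter());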

@dotnwat dotnwat requested a review from Lazin February 7, 2024 18:56
Contributor
@Lazin Lazin left a comment

LGTM. Since it's a greenfield project it'd be great to have some high-level description of the design somewhere, or at least doc comments with descriptions of contracts/invariants. It's a bit tricky to review because some things rely on invariants enforced by code that is not included in this PR.

@dotnwat dotnwat merged commit 0c71696 into redpanda-data:dev Feb 8, 2024
17 checks passed
Contributor
@andrwng andrwng left a comment

Awesome! Just trying to orient myself with regards to these interfaces:

Will storage applications (segment_reader, segment_appender, cloud cache downloader equivalents) be the ones interacting directly with the io_queue, and the io_queue will manage interactions with a future page cache under the hood? Or is this a lower-level interface that will be further wrapped by another abstraction?

Comment on lines +121 to +123
if (stop_) {
co_return;
}
Contributor

I'm curious how setting stop_ to true differs from breaking the condition variable. Could we do away with it?


void io_queue::dispatch_write(
page& page, seastar::semaphore_units<> units) noexcept {
log.debug("Writing {} at {}~{}", path_, page.offset(), page.data().size());
Contributor

nit: maybe assert that we're not in the queued state?

Comment on lines +137 to +141
if (page.test_flag(page::flags::read)) {
dispatch_read(page, std::move(units));

} else if (page.test_flag(page::flags::write)) {
dispatch_write(page, std::move(units));
Contributor

Just curious, can a page have been dispatched for both a read and a write? I think the answer is no, but maybe that won't necessarily be true for write caching?

Member Author

A page can only be queued for a read or a write. For writes it's OK if the queue receives multiple submissions for the same page, but it still remains queued only once.

@dotnwat
Member Author

dotnwat commented Feb 29, 2024

Awesome! Just trying to orient myself with regards to these interfaces:

Will storage applications (segment_reader, segment_appender, cloud cache downloader equivalents) be the ones interacting directly with the io_queue, and the io_queue will manage interactions with a future page cache under the hood? Or is this a lower-level interface that will be further wrapped by another abstraction?

It is quite low-level. There is a scheduler and a page cache on top of it. The former merged last week; the latter will come as fast as I can fix the pending PR!
