
Add a simple spillable queue #85

Merged: 2 commits into restatedev:main on Feb 23, 2023
Conversation

igalshilman (Contributor)

Spillable Queue

This PR adds a simple, spillable queue that integrates with tokio and bytes to spill and load in a non-blocking way.

Requirements

  • In-memory threshold: the queue will hold at least this number of elements in memory; once the threshold is exceeded, the content of the queue is spilled to disk.
  • This queue requires a scratch directory to spill its content into. It assumes that this directory was freshly created for it. In practice it is recommended to create a fresh directory with a UUID.
  • Since the elements of the queue are not assumed to be contiguous in memory (i.e., they might contain a String, etc.), an element Encoder and Decoder are required.
  • This queue relies on tokio's fs module for the asynchronous I/O.

Implementation details

The implementation is simple: the queue is segmented into different parts.

  1. A mutable, in-memory (tail) segment: the segment that one can append to. Once the number of elements exceeds a threshold, it is flushed to disk.
  2. An immutable disk segment: a previously spilled mutable segment that now lives on disk.
  3. An immutable, loaded-in-memory (head) segment: a previously spilled segment that has been loaded back into memory and can be popped from.

Since it is a FIFO queue, at most we would have (see the sketch below):

  • a single mutable tail,
  • an unbounded number of on-disk segments, and
  • a single loaded immutable head.
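
For orientation, here is a rough sketch of these three segment states. The variant and field names follow the diff excerpts quoted later in this conversation; the field types are assumptions, not the PR's exact definition.

    use std::collections::VecDeque;

    use bytes::Bytes;

    // Sketch only: reconstructed from the review excerpts below.
    enum Segment<T> {
        // the mutable tail: new elements are appended here
        Mutable { buffer: VecDeque<T> },
        // a spilled, immutable segment that lives on disk
        OnDisk { id: u64, len: usize, byte_size: usize },
        // a spilled segment loaded back into memory; elements are decoded
        // off this buffer as they are popped
        LoadedFromDisk { len: usize, buffer: Bytes },
    }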

The following is a basic usage example:

  • First, we implement an element Encoder and a Decoder:
    use bytes::{Buf, BufMut, Bytes, BytesMut};
    use std::mem;

    // Encoder/Decoder are the traits introduced by this PR
    struct IntEncoder;

    impl Encoder for IntEncoder {
        type Item = i32;

        fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) {
            dst.put_i32(item);
        }

        // an optional hint used to pre-allocate the target buffer
        fn element_size_hint(&self) -> Option<usize> {
            Some(mem::size_of::<i32>())
        }
    }

    struct IntDecoder;

    impl Decoder for IntDecoder {
        type Item = i32;

        fn decode(&mut self, src: &mut Bytes) -> Self::Item {
            src.get_i32()
        }
    }
  • Then we can define the queue itself:
    // `testdir!` comes from the `testdir` dev-dependency
    use testdir::testdir;

    #[tokio::test]
    async fn simple_example() {
        let temp_dir = testdir!();
        // spill to disk once more than 1024 elements are held in memory
        let max_in_memory_limit = 1024;
        let mut queue =
            SegmentQueue::new(temp_dir, max_in_memory_limit, IntEncoder {}, IntDecoder {});

        queue.enqueue(1).await;
        queue.enqueue(2).await;
        queue.enqueue(3).await;
        queue.enqueue(4).await;

        assert_eq!(queue.dequeue().await, Some(1));
        assert_eq!(queue.dequeue().await, Some(2));
        assert_eq!(queue.dequeue().await, Some(3));
        assert_eq!(queue.dequeue().await, Some(4));

        // the queue is exhausted
        assert_eq!(queue.dequeue().await, None);
    }

igalshilman changed the title from "Exp queue" to "Add a simple spillable queue" on Feb 12, 2023
slinkydeveloper (Contributor) left a comment:

Looks good; I left a couple of comments. The encoding/decoding aspect is a bit unclear and could probably be simplified by using serde.


fn encode(&mut self, item: Self::Item, target: &mut BytesMut);

fn element_size_hint(&self) -> Option<usize> {
Contributor:

The size hint might require the item as well in order to be computed.

igalshilman (Author):

This size hint is used to pre-allocate a buffer; it doesn't need to be exact, but it might help.

@@ -0,0 +1,17 @@
use bytes::{Bytes, BytesMut};

pub trait Encoder {
Contributor:

Not sure we need these two interfaces. I would keep it pragmatic and just accept types implementing serde::{Serialize, Deserialize} in SegmentQueue; then we pick whatever format we want (in the PoC we used bincode, which worked very well).
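
A minimal sketch of that alternative, assuming serde and bincode 1.x as dependencies (neither is part of this PR): the queue would bound its element type instead of taking Encoder/Decoder values.

    use serde::{de::DeserializeOwned, Serialize};

    // hypothetical helpers, not the PR's API
    fn encode_element<T: Serialize>(item: &T) -> Vec<u8> {
        bincode::serialize(item).expect("serialization failed")
    }

    fn decode_element<T: DeserializeOwned>(bytes: &[u8]) -> T {
        bincode::deserialize(bytes).expect("deserialization failed")
    }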

igalshilman (Author):

I like bincode as well, but the intentions here are:

  1. being able to peel Bytes off the main buffer with zero copy.
  2. not being intrusive to the domain objects stored here.
  3. letting the user (well, I suppose that this is you :)) control the layout.

Let me know if you still think serde::* should be used, and I'll change it.

slinkydeveloper (Contributor), Feb 21, 2023:

> being able to peel Bytes off the main buffer with zero copy.

I double-checked; I think you can avoid the copy by manually using the file reader to directly decode the messages and materialize them in memory.

> not being intrusive to the domain objects stored here.
> letting the user (well, I suppose that this is you :)) control the layout.

I don't think this is something we really need to control. In general, for most of our intra-messaging and stored state (except some very specific situations, like storing entries in RocksDB), I think we'll probably be fine with whatever bincode/serde offers us.

Comment on lines 58 to 59
encoder: E,
decoder: D,
Contributor:

Are these even stateful? Could they just be phantom data?
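
A sketch of the idea: if the codecs carry no state, the queue could hold zero-sized markers instead of instances (this assumes the codec traits would gain associated functions or a Default bound; names are illustrative).

    use std::collections::VecDeque;
    use std::marker::PhantomData;

    struct SegmentQueue<T, E, D> {
        elements: VecDeque<T>, // other fields elided
        _codec: PhantomData<(E, D)>,
    }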

fn len(&self) -> usize {
match self {
OnDisk {
id: _,
Contributor:

You can use the .. syntax to discard unused fields.
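
For example, applied to the quoted match (variant fields as in the segment sketch near the top of this page):

    fn len(&self) -> usize {
        match self {
            Mutable { buffer } => buffer.len(),
            OnDisk { len, .. } => *len,
            LoadedFromDisk { len, .. } => *len,
        }
    }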


async fn load_from_disk(&mut self, mut path: PathBuf) {
if let OnDisk { id, len, byte_size } = self {
let mut buffer = vec![0u8; *byte_size];
Contributor:

Use bytes::BufMut

igalshilman (Author):

I assume you mean BytesMut. Unfortunately, this doesn't work with read_exact: read_exact thinks that this is a zero-sized buffer (although it has capacity).
The buffer's ownership is transferred to Bytes a bit later.
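
For reference, a small sketch of the point being made: a fresh BytesMut dereferences to a zero-length slice (capacity is not length), so read_exact would read nothing, while a pre-sized Vec works and can later move into Bytes without a copy.

    use bytes::Bytes;

    fn into_frozen_buffer(byte_size: usize) -> Bytes {
        // sized up front, so read_exact sees a full-length initialized slice
        let buffer = vec![0u8; byte_size];
        // ... file.read_exact(&mut buffer).await would fill the bytes here ...
        Bytes::from(buffer) // O(1) ownership transfer, no copy
    }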

let element_len = encoder.element_size_hint().unwrap_or(mem::size_of::<T>());
let mut target = BytesMut::with_capacity(len * element_len);
for e in buffer.drain(..) {
encoder.encode(e, &mut target);
Contributor:

This is assuming the encoder and decoder eventually write the length of length-delimited fields, right? Most likely, the struct for which we need this queue will be length-delimited, because it contains the service id with the service name, which is a variable-length string.
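
As an illustration of that point, a hypothetical length-delimited codec for a string element (not the PR's code): the decoder peels the payload off the shared buffer with split_to, which is zero-copy.

    use bytes::{Buf, BufMut, Bytes, BytesMut};

    fn encode_str(item: &str, dst: &mut BytesMut) {
        dst.put_u32(item.len() as u32); // length prefix
        dst.put_slice(item.as_bytes()); // payload
    }

    fn decode_str(src: &mut Bytes) -> Bytes {
        let len = src.get_u32() as usize;
        src.split_to(len) // zero-copy view into the backing buffer
    }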

Mutable { buffer } => buffer.pop_front(),
LoadedFromDisk { len, buffer } => {
if *len == 0 {
assert!(buffer.is_empty());
Contributor:

Perhaps this is a debug_assert?

Comment on lines +136 to +135
Some(segment) if segment.is_mutable() => {
segment.enqueue(element);
Contributor:

What if, rather than having the is_mutable/enqueue pair, you just had a try_enqueue method that returns either true or false?
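
A sketch of that suggestion, building on the segment sketch near the top of this page (returning the rejected element instead of a bare bool, so it isn't dropped):

    impl<T> Segment<T> {
        // Ok(()) if the element was appended; Err(element) if this segment
        // is immutable and the caller must open a new mutable tail
        fn try_enqueue(&mut self, element: T) -> Result<(), T> {
            match self {
                Segment::Mutable { buffer } => {
                    buffer.push_back(element);
                    Ok(())
                }
                _ => Err(element),
            }
        }
    }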

// within the OnDisk variant.
// once we load back from an OnDisk, we would have to first make sure that
// this future was resolved.
crate::io::write_file_infallible(base_dir, &target).await;
Contributor:

Why don't you pass the file writer directly to the encoder, rather than collecting the whole serialized segment in an in-memory buffer and writing it afterwards?
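
A sketch of what that could look like (hypothetical helper, assuming the elements have already been encoded into chunks): each chunk is streamed through a buffered file writer instead of being collected into one large buffer first.

    use std::path::Path;

    use tokio::fs::File;
    use tokio::io::{AsyncWriteExt, BufWriter};

    async fn spill_streaming(path: &Path, chunks: Vec<bytes::Bytes>) -> std::io::Result<()> {
        let mut writer = BufWriter::new(File::create(path).await?);
        for chunk in chunks {
            writer.write_all(&chunk).await?; // per-element write
        }
        writer.flush().await?;
        Ok(())
    }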

Comment on lines 231 to 235
// we block while the queue is being persisted to disk.
// we can also do it in the background and remember the JoinHandle
// within the OnDisk variant.
// once we load back from an OnDisk, we would have to first make sure that
// this future was resolved.
Contributor:

I'm trying to reason about whether it's fine to resolve this future asynchronously or not. If I'm not mistaken, it shouldn't matter that we haven't durably stored the element on disk when write returns, because this queue is ephemeral anyway and goes away as soon as we restart the process.

So what matters is that you need to be able to see the effects of the flush the next time you read. But what about failures? If a flush fails asynchronously, what are we supposed to do? Do we just panic, because flush failures at this level are intolerable? If, in the meantime, we had another push to the mutable tail of the queue, we should still be fine: by the properties of the queue, you cannot see the effect of that push until you pop the segment that is supposed to be on disk, hence the effect of the broken flush will become visible before the other pushed elements are popped from the queue.

Still, it's important to underline that when you enqueue an element, the element might be lost even after the enqueue method returns.

Now given this discussion, I think you can quite easily implement the asynchronous flush just by storing a JoinHandle within OnDisk. When you read from disk, you await that future and then proceed with the read.
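
A minimal sketch of that idea (field names hypothetical): OnDisk keeps the flush task's JoinHandle, and the load path awaits it before touching the file.

    use tokio::task::JoinHandle;

    enum Segment {
        OnDisk {
            id: u64,
            // resolves once the background spill has completed
            flush: Option<JoinHandle<()>>,
        },
        // Mutable and LoadedFromDisk variants elided
    }

    async fn ensure_flushed(segment: &mut Segment) {
        if let Segment::OnDisk { flush, .. } = segment {
            if let Some(handle) = flush.take() {
                // flush failures are intolerable at this level, hence the panic
                handle.await.expect("segment flush failed");
            }
        }
    }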

igalshilman (Author):

That is what the comment says, yes.

tillrohrmann (Contributor) left a comment:

Thanks for creating this PR @igalshilman. The changes look good to me and make a lot of sense. Nicely done! I only have very minor comments.

One thing I'd be curious to learn about is how you envision the SegmentedQueue being used concurrently. I think we want to use it between the PartitionProcessor and the Invoker. Would these components share access to the SegmentedQueue (shared state), or would it be powered by a dedicated co-routine?

In the shared-state case: since we are .awaiting in the enqueue and dequeue methods, I guess we would have to use tokio::sync::Mutex to be able to .await on the lock. This Mutex is said to be more expensive than std::sync::Mutex or parking_lot::Mutex, though.

I guess some measurements could help with making a decision on how to implement the concurrent access.
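
A sketch of the shared-state option (the queue's generic parameters are guessed from the diff; the element type is fixed to i32 and construction is elided for brevity):

    use std::sync::Arc;

    use tokio::sync::Mutex;

    // both components clone the Arc; tokio's Mutex allows holding the lock
    // across the .await points inside enqueue/dequeue
    async fn producer(queue: Arc<Mutex<SegmentQueue<i32, IntEncoder, IntDecoder>>>) {
        queue.lock().await.enqueue(42).await;
    }

    async fn consumer(queue: Arc<Mutex<SegmentQueue<i32, IntEncoder, IntDecoder>>>) -> Option<i32> {
        queue.lock().await.dequeue().await
    }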

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = {workspace = true, features = ["full"]}
Contributor:

Which of the "full" tokio features are needed by queue?

bytes = {workspace = true}

[dev-dependencies]
testdir = "*"
Contributor:

Out of curiosity: Why did you choose testdir over tempfile? The latter seems to be used by more projects.


const WRITE_ERR_MSG: &str = "Failure during writing of a (transient) queue segment to disk. This is an unrecoverable failure at this point.";

pub(crate) async fn read_file_infallible<P: AsRef<Path>>(path: P, target: &mut [u8]) {
Contributor:

nit: Could use impl AsRef as a shorthand.

Suggested change:
- pub(crate) async fn read_file_infallible<P: AsRef<Path>>(path: P, target: &mut [u8]) {
+ pub(crate) async fn read_file_infallible(path: impl AsRef<Path>, target: &mut [u8]) {

use crate::segmented_queue::Segment::{LoadedFromDisk, Mutable, OnDisk};
use crate::{Decoder, Encoder};

///
Contributor:

Suggested change (remove the stray doc-comment line):
- ///


///
/// This is a FIFO queue that has a predefined capacity (number of elements) and can spill to disk
/// once this capacity is reached.
Contributor:

Suggested change:
- /// once this capacity is reached.
+ /// once its capacity is reached.

/// segment.
/// If the last segment isn't mutable or does not exists, it would create one.
/// This function returns the size of the mutable segment after insertion.
///
Contributor:

Suggested change (remove the trailing doc-comment line):
- ///

path.push(format!("{id}.segment"));
crate::io::read_file_infallible(&path, &mut buffer).await;
// remove the file in the background
tokio::spawn(tokio::fs::remove_file(path));
Contributor:

What happens if this task fails or panics? Should we log it?
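
One way to surface such failures, as a sketch (assuming the tracing crate is available for logging):

    use std::path::PathBuf;

    fn remove_in_background(path: PathBuf) {
        tokio::spawn(async move {
            if let Err(e) = tokio::fs::remove_file(&path).await {
                // log the I/O error instead of silently dropping it
                tracing::warn!(?path, error = %e, "failed to remove spilled segment");
            }
        });
    }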

next_segment_id: u64,
encoder: E,
decoder: D,
spillable_base_path: PathBuf,
Contributor:

Could this also be a Path since this is the const root?

Comment on lines 91 to 76
pub async fn enqueue(&mut self, element: T) {
if self.enqueue_internal(element) < self.in_memory_element_threshold {
return;
}
// SAFETY: enqueue_internal will always make sure that the last segment in
// SAFETY: self.segments is mutable, therefore it can not be empty.
debug_assert!(!self.segments.is_empty());
let segment = self.segments.back_mut().unwrap();
let id = self.next_segment_id;
// Good luck storing more than 2^64 segments on disk.
self.next_segment_id = self.next_segment_id.wrapping_add_signed(1);
segment
.store_to_disk(self.spillable_base_path.clone(), id, &mut self.encoder)
.await;
}
Contributor:

As a slight follow-up optimization, we could avoid spilling to disk if the number of segments is currently 1. That way we would always keep the head of the queue in memory, which could save us unnecessary deserialization work in some scenarios. This shouldn't change the memory requirements because we would load the data from disk on the first read.
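
A naive sketch of that guard inside the quoted enqueue (illustrative only; the over-threshold tail would still need to be sealed rather than simply left in place):

    pub async fn enqueue(&mut self, element: T) {
        if self.enqueue_internal(element) < self.in_memory_element_threshold {
            return;
        }
        // the over-threshold tail is also the head of the queue: keep it in
        // memory so readers can pop it without a deserialization round trip
        if self.segments.len() == 1 {
            return;
        }
        // ... spill the tail segment to disk as before ...
    }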

A spillable queue that is divided into segments of a fixed size. Each
segment can be spilled to, or loaded from, disk on demand.
This queue supports pre-loading and background flushes of segments:

1. Background flushes of mutable segments to disk. A background flush
   happens for at most one segment at a time, to prevent oversubscription.
2. Background pre-loading of the next on-disk segment. Once the
   currently dequeued segment's size reaches 50% of the flush limit, the next
   segment is scheduled to be loaded in the background.
igalshilman merged commit 730c054 into restatedev:main on Feb 23, 2023