Tracking Issue: Multiple Completion of Requests #29

Closed

AndrewGaspar opened this issue Jan 30, 2018 · 19 comments

@AndrewGaspar
Contributor

AndrewGaspar commented Jan 30, 2018

Just putting some notes here on some observations from using this library -

It's kind of a pain point right now that there's no MPI_Waitall equivalent in this library. It's not too bad since you can always use mpi::ffi, but the interop experience with the existing mpi::request::Request object is poor. MPI_Waitall expects a contiguous array of MPI_Request objects, but the definition of Request makes it impossible to satisfy this requirement without allocating a separate slice of MPI_Request objects, since in memory each MPI_Request is separated from the next by a Scope object.

The easiest and most naive option would be to just add a wait_all operation that takes a slice (or perhaps an Iterator) of mpi::request::Request objects. It would have to internally move the MPI_Request handles into a contiguous array. APIs like MPI_Waitsome or MPI_Waitany would have some additional indirection, since they must invalidate some of the MPI_Request objects that are passed in, but not all of them. You would need to pass in a &mut [Option<mpi::request::Request>], and then wait_some or wait_any would need to update the slice to remove any request objects that have been completed.

I don't love this option since it imposes overhead in the name of keeping the API simple, especially for any calls that are likely to be called many times on the same set of requests such as MPI_Waitsome, MPI_Waitany, MPI_Testsome, and MPI_Testany.

The second option is to add some sort of RequestCollection object. For example:

pub struct RequestCollection<'a, S: Scope<'a> = StaticScope> {
    requests: Vec<MPI_Request>,
    scope: S,
    phantom: PhantomData<Cell<&'a ()>>,
}

This would allow the layout of the request objects to be directly composable with existing MPI_ multiple completion operations, while still tracking the request objects via the Scope.
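
For illustration, a minimal sketch of what wait_all could look like on top of that layout, assuming the usual bindgen-style signature for ffi::MPI_Waitall (count, *mut MPI_Request, *mut MPI_Status) and ignoring MPI error codes:

use std::os::raw::c_int;

use mpi::ffi;

impl<'a, S: Scope<'a>> RequestCollection<'a, S> {
    pub fn wait_all(&mut self) -> Vec<ffi::MPI_Status> {
        let count = self.requests.len();
        // MPI writes the statuses in place, so only capacity is reserved up
        // front and the length is fixed up after the call returns.
        let mut statuses: Vec<ffi::MPI_Status> = Vec::with_capacity(count);
        unsafe {
            // `self.requests` is already the contiguous array that
            // MPI_Waitall expects - no per-call copying of handles.
            ffi::MPI_Waitall(
                count as c_int,
                self.requests.as_mut_ptr(),
                statuses.as_mut_ptr(),
            );
            statuses.set_len(count);
        }
        statuses
    }
}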

This type would ideally have these features:

  • It can be constructed from an iterator of mpi::request::Request objects, or via a .push(request) operation. This allows all existing APIs to be used to construct mpi::request::Request objects that will then be added to the collection.
  • It would have member routines such as wait_all, wait_some, test_some, etc.
  • It would support stable indexing.
  • It likely would have to return Option<Request> rather than Request when indexing since wait_* may invalidate request objects.
@bsteinb
Collaborator

bsteinb commented Jan 30, 2018

Hey there! Thanks for your input. I agree that the plural versions of the request completion routines are a very important part of the non-blocking machinery. I have not gotten around to implementing them yet, among other things because I was also still pondering possible ways of making them more idiomatic in Rust.

I think your RequestCollection design might be a step in the right direction. I am, however, not completely certain about some of the details.

Returning Option<Request> from the indexing operator I think would mean that indexing would have to mutate the collection, i.e. remove the accessed Request from the collection, otherwise there would be two copies of the same request. It is also not possible to return Option<&Request> since the collection holds a vector of raw MPI_Request.

Maybe indexing into the RequestCollection is not necessary? What if, e.g., the wait_any method returns (int, Status), the index of the completed operation and the corresponding status? wait_some would return Vec<(int, Status)> and wait_all would return Vec<Status>. Similarly for the test methods. That conveys which requests have completed without needing an indexing operation.
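
Spelled out as signatures (usize in place of int, Status from mpi::point_to_point, bodies elided), that would look roughly like:

impl<'a, S: Scope<'a>> RequestCollection<'a, S> {
    // Blocks until one request completes and reports its index and status.
    pub fn wait_any(&mut self) -> (usize, Status) { ... }

    // Blocks until at least one request completes and reports everything
    // completed by this call.
    pub fn wait_some(&mut self) -> Vec<(usize, Status)> { ... }

    // Blocks until every request completes, one Status per request.
    pub fn wait_all(&mut self) -> Vec<Status> { ... }

    // test_any / test_some / test_all would mirror these signatures but
    // return immediately with whatever has completed so far.
}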

On the other hand, if adding a Request to a Collection consumes the Request, does there have to be a way to get the Request out of the Collection again?

By the way, you call these routines "collective" twice, but I think you meant "completion". In the context of MPI I have only seen the term "collective" applied to MPI_Reduce, MPI_Gather, etc.

@AndrewGaspar
Contributor Author

Thanks for the quick response! I really like this library, by the way. Apologies about "collective", I'm a bit of an MPI newbie. The MPI 3.1 spec refers to these routines collectively as "Multiple Completions", so that sounds good to me. I'll edit the original issue.

I should be a little more specific with the index interface of RequestCollection.

I think it first makes sense to promote the bulk of Request to a trait. Let's call the trait AsyncRequest for now, but names are flexible. We could have two implementations: the existing Request<'a, S> type and a RequestRef<'a, 'b, S> type, where 'a is the scope lifetime and 'b is the lifetime of the associated RequestCollection. If I understand correctly, 'b must be a sub-lifetime of 'a.

Roughly:

trait AsyncRequest<'a, S: Scope<'a>> {
    // as_raw, get_scope, and unsubscribe must be implemented by struct
    unsafe fn as_raw(&self) -> MPI_Request;
    fn get_scope(&self) -> S;

    // unsubscribe is called when the request is being removed from its scope
    fn unsubscribe(&mut self);

    // existing `Request` methods are implemented using as_raw and get_scope
    unsafe fn into_raw(mut self) -> (MPI_Request, S) { ... }
    fn wait_with() { ... }
    fn wait(self) -> Status { ... }
    // etc...
}

// owns a standalone MPI_Request with an attached scope.
// PANICs on drop if the request is not completed.
struct Request<'a, S: Scope<'a> = StaticScope> {
    request: MPI_Request,
    scope: S,
    phantom: PhantomData<Cell<&'a ()>>,
}

impl<'a, S: Scope<'a>> AsyncRequest<'a, S> for Request<'a, S> {
    unsafe fn as_raw(&self) -> MPI_Request { self.request.clone() }
    fn get_scope(&self) -> S { self.scope }
    fn unsubscribe(&mut self) { /* doesn't need to do anything */ }
}

// owns a reference into a RequestCollection, inherits its scope
// Does not panic on drop, but returns ownership to the collection.
struct RequestRef<'a, 'b: 'a, S: Scope<'a> = StaticScope> {
    request: &'b mut MPI_Request,
    scope: S,
    phantom: PhantomData<Cell<&'a ()>>,
}

impl<'a, 'b: 'a, S: Scope<'a>> AsyncRequest<'a, S> for RequestRef<'a, 'b, S> {
    unsafe fn as_raw(&self) -> MPI_Request { self.request.clone() }
    fn get_scope(&self) -> S { self.scope }
    // this is so the slot in the request collection appears as null, and MPI will
    // ignore it in future multi-completion operations.
    fn unsubscribe(&mut self) { *self.request = unsafe_extern_static!(ffi::RSMPI_REQUEST_NULL); }
}

To answer your concern about indexing into a RequestCollection returning a reference, it's important to note that RequestCollection cannot literally implement std::ops::Index since it cannot hand out direct access to its requests - it has to hand out RequestRefs. The RequestRef is not quite a &Request, it just contains a reference into the RequestCollection itself.

I think returning the index in the _any and _some operations is important because the user may be storing per-request context information off to the side, so you may need the index as some key to look up additional information about the request. So that's why I think we shouldn't hide the indices from the user. You can also imagine a case where a user may want to test() individual requests, so I think it's a good idea to be able to identify the individual requests. It's possible there's a better way to do this, though, and I'm just not being very imaginative.

Here's a fairly detailed description of an imagined RequestCollection's interface:

// return value from test_any(). See test_any documentation for more detail.
enum TestAny {
    NoneActive,
    NoneComplete,
    Completed(usize, Status),
}

impl<'a, S: Scope<'a>> RequestCollection<'a, S> {
    pub fn new() -> Self { ... }
    pub fn with_capacity(capacity: usize) -> Self { ... }
    // etc. whatever other constructors might be useful

    pub fn len(&self) -> usize { ... }

    // push() maybe should return the index of the request?
    pub fn push(&mut self, request: Request<'a, S>) { ... }

    // Description:
    //   Returns a mutable handle to the request at `idx`, if it is still
    //   active in the collection.
    // Returns:
    //   - `None` if the request at idx has already been completed or
    //     otherwise removed from the collection.
    //   - `Some(request)` if the request is still active.
    // Notes:
    //   there's no `at()` since it seems like most Request operations require
    //   mutability.
    //   
    //   Since RequestRef owns a mutable reference to RequestCollection,
    //   the collection cannot be modified until the RequestRef is returned one
    //   way or another. RequestRef is naturally returned when it goes out of
    //   scope. But it may also be returned by a one-off test() or wait(), which
    //   destroys the reference and marks the Request as inactive in the
    //   collection.
    //
    //   It also may be desirable to get multiple RequestRef's at once, but this
    //   current model would not allow it. Might be worth thinking more about
    //   this.
    pub fn at_mut<'b>(&'b mut self, idx: usize) -> Option<RequestRef<'a, 'b, S>> { ... }

    // Description:
    //   This is a zero-overhead wrapper of MPI_Waitany.
    // Returns:
    //   - `None` if all requests are inactive.
    //   - Some((idx, status)) if there are still active requests. idx is the
    //     index of the completed request, and status is the status it was
    //     completed with.
    //     The request at idx will be `None` on return.
    pub fn wait_any(&mut self) -> Option<(usize, Status)> { ... }

    // Description:
    //   This is a zero-overhead wrapper of MPI_Waitsome.
    // Returns:
    //   - `None` if all requests are inactive. indices and statuses are not
    //     modified.
    //   - If there are still active requests, return `Some(completed)`, where
    //     `completed` is the number of requests completed by the call.
    //     `completed` is guaranteed to be >= 1.
    //     indices[..completed] and statuses[..completed] will contain the
    //     completed request indices and statuses, respectively.
    //     The requests at indices[..completed] will be `None` on return.
    // Notes:
    //   It panics if indices.len() and statuses.len() are not >= self.len().
    //   This assumes that Status is transmutable to MPI_Status.
    pub fn wait_some(&mut self, indices: &mut [usize], statuses: &mut [Status]) -> Option<usize> { ... }

    // Description:
    //   This is a zero-overhead wrapper of MPI_Waitall.
    //   All requests will be `None` on return.
    // Notes:
    //   I chose to not have it destroy the RequestCollection since the other
    //   wait_* methods do not destroy the collection.
    pub fn wait_all(&mut self) { ... }

    // Description:
    //   This is a zero-overhead wrapper of MPI_Testany.
    // Returns:
    //   - TestAny::NoneActive, if there are no active requests in the
    //     collection
    //   - TestAny::NoneComplete, if the test yields no completed request.
    //   - Completed(idx, status), when a completed request is available.
    //     idx is the index of the request in the collection, and status is the
    //     status of the completed request.
    //     The request at `idx` will be `None` on return.
    // Notes:
    //   A possible alternative return type is
    //   Option<Option<(usize, Status)>>. That is, it would return
    //   `None` if all the requests in the collection were inactive. It would
    //   return `Some(None)` if there aren't any completed requests. And it
    //   return `Some(Some((idx, status)))` when there is a completed request.
    //   This matches-ish with test_some, but double nested Options seems kind
    //   of confusing to me, which is why I opted for the custom enum.
    pub fn test_any(&mut self) -> TestAny { ... }

    // Description:
    //   This is a zero-overhead wrapper of MPI_Testsome.
    // Returns:
    //   - `None` if all requests are inactive. indices and statuses are not
    //     modified.
    //   - If there are still active requests, return `Some(completed)`, where
    //     `completed` is the number of requests completed by the call.
    //     `completed` may be 0.
    //     indices[..completed] and statuses[..completed] will contain the
    //     completed request indices and statuses, respectively.
    //     The requests at indices[..completed] will be `None` on return.
    // Notes:
    //   It panics if indices.len() and statuses.len() are not >= self.len().
    //   This assumes that Status is transmutable to MPI_Status.
    pub fn test_some(&mut self, indices: &mut [usize], statuses: &mut [Status]) -> Option<usize> { ... }

    // Description:
    //   This is a zero-overhead wrapper of MPI_Testall.
    // Returns:
    //   - `false` if not all requests are complete. No requests will be
    //     modified. The value of statuses[..self.len()] is undefined.
    //   - `true` if all active requests are complete. All requests will be set
    //     to `None`. `statuses` will contain the completion status of each of
    //     the requests completed by test_all. Statuses for null or inactive
    //     requests will be empty.
    // Notes:
    //   Panics if statuses.len() is not >= self.len()
    pub fn test_all(&mut self, statuses: &mut [Status]) -> bool { ... }
}

Apologies for the brain dump. Also, apologies if any of this code, especially the explicit lifetimes, is not quite right. My free-hand Rust isn't too good yet. :)

Let me know what you think - I might try making a prototype this weekend.

@AndrewGaspar AndrewGaspar changed the title Tracking Issue: MPI_Waitall (and other request collective operations) Tracking Issue: Multiple Completion of Requests Feb 1, 2018
@bsteinb
Collaborator

bsteinb commented Feb 2, 2018

Thanks for the praise. Do not worry about not getting the terminology right 100% of the time. At more than 800 pages, the MPI standard is probably too large to know by heart, especially when you are only just starting.

I like the RequestRef interface, although I think it is customary to name these things RequestMut if they act like a mutable reference, but that is a small change.

Regarding the test_some and wait_some methods, I would like to rename those to test_some_into and wait_some_into, mirroring the communication methods that use "intent out" arguments, pardon my Fortran. It would then be possible to have a higher level interface on top of those that returns Vec<(int, Status)>, like I mentioned before.
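
So roughly this kind of split, with the _into variants writing into caller-provided storage and the convenience versions allocating on top of them (signatures only):

impl<'a, S: Scope<'a>> RequestCollection<'a, S> {
    // Low-level, allocation-free variant: the caller provides the storage
    // that the completed indices and statuses are written into.
    pub fn wait_some_into(&mut self, indices: &mut [usize], statuses: &mut [Status]) -> Option<usize> { ... }

    // Convenience variant layered on top of wait_some_into.
    pub fn wait_some(&mut self) -> Vec<(usize, Status)> { ... }
}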

Further down the road it might also be worthwhile to play around with a higher level interface built on top of this one using iterators, but this is a good start, I think.

Lastly, I am afraid, I will have to do some more rules lawyering. The term "inactive" does not apply to the kind of requests that can be used through this API. MPI has two kinds of requests, "persistent" and "nonpersistent" (which are used here), with different APIs and very different life cycles.

Nonpersistent requests have two states, "active" and "deallocated" (meaning they have the value MPI_REQUEST_NULL). They are created in the active state using "immediate" routines like MPI_Isend and can only move to the deallocated state via completion routines like MPI_Wait.

Persistent requests have an additional state "inactive". They are created in the inactive state using "init" routines like MPI_Send_init. From inactive they can be moved to either active using "start" routines like MPI_Start or deallocated using MPI_Request_free. Unlike nonpersistent requests, successful completion of a persistent request via e.g. MPI_Wait moves the request back into the inactive state from where it can be started again.

I think it is a bit unfortunate, that MPI lumps these two kinds of objects together in the same type so that it effectively leads a double life and I am not yet sure how to map that into rsmpi. The situation is similar with communicators, which can be either inter-communicators or intra-communicators.

If you do find the time this weekend, I would be happy to review your prototype next week.

@AndrewGaspar
Contributor Author

Just so you know, I'm still working on this, just didn't get it finished yesterday.

@bsteinb
Collaborator

bsteinb commented Feb 6, 2018

No worries.

@AndrewGaspar
Contributor Author

AndrewGaspar commented Feb 13, 2018

I realized to my frustration that this proposal is insufficient. Consider the following pseudo-code:

let recv_buffers: Vec<&mut [f64]> = ...;
let recv_ranks: Vec<Rank> = ...;
mpi::request::scope(|scope| {
    let recv_requests =
        recv_ranks
            .iter()
            .zip(&mut recv_buffers)
            .map(|(rank, buffer)| {
                world
                    .process_at_rank(rank)
                    .immediate_receive_into(scope, buffer)
            })
            .collect_requests(scope);

    while let Some(indices) = recv_requests.wait_some_without_status() {
        for i in indices {
            // ERROR: This borrow fails because recv_buffer is still borrowed
            // mutably
            let buffer = &recv_buffers[i];
            // do something with buffer
        }
    }
});

Essentially, RequestCollection does nothing to allow the data that was received for a request to actually be used. This is fatal for wait/test_any and wait/test_some - what's the point of doing overlapped IO and computation if the computation can't access the data from the IO?

I think there is a Rust compatible solution to this: RequestCollection needs some way to (optionally) own the mutable reference to the associated buffers and safely return them iff their associated request has been completed.

Essentially:

    while let Some(indices) = recv_requests.wait_some_without_status() {
        for i in indices {
            // PANICS if the request at `i` is non-null
            // also comes in get_buffer_mut flavor
            let buffer = recv_requests.get_buffer(i);
            // do something with buffer
        }
    }
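
One possible shape for this, sketched below - BufferMut is the existing rsmpi trait, but everything else (the field layout, get_buffer, the completion check) is hypothetical:

pub struct RequestCollection<'a, S: Scope<'a>, Buf: 'a + BufferMut> {
    requests: Vec<MPI_Request>,
    // One slot per request, indexed the same way as `requests`. A slot is
    // only handed out once its request has completed.
    buffers: Vec<Option<&'a mut Buf>>,
    scope: S,
}

impl<'a, S: Scope<'a>, Buf: 'a + BufferMut> RequestCollection<'a, S, Buf> {
    // Panics if the request at `idx` is still active, so a buffer can never
    // be observed while MPI may still be writing into it.
    pub fn get_buffer(&self, idx: usize) -> &Buf {
        assert!(self.is_complete(idx), "request {} is still active", idx);
        self.buffers[idx].as_deref().expect("buffer was detached")
    }

    // e.g. compares requests[idx] against the null request, or consults a
    // bitset that the wait_*/test_* methods update on completion.
    fn is_complete(&self, idx: usize) -> bool { ... }
}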

The simple version of this is a RequestCollection that owns a Vec<&'a mut Buf> where Buf: 'a + BufferMut. The Vec is the same size as the MPI_Request Vec. Unfortunately, this incurs a memory allocation. In MPI programs, or at least the one I maintain, you may have one large buffer for all the receives that is subdivided at variable lengths for each request. So, in my case, I'd want a RequestCollection that owns a &'a mut [T] where T: Equivalence and some I: Iterator<Item = Range<usize>>. This iterator is a list of non-overlapping ranges into the receive buffer. It's an Iterator rather than a Vec because:

  1. It allows the RequestCollection to reference an existing allocated list of either buffer offsets or ranges
  2. It allows zero memory allocation if the offsets are a simple computation (see the sketch after this list).
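
For example, with equally sized receives the ranges can be produced lazily instead of being stored at all. The constructor name below (from_partitioned_buffer) is hypothetical:

// Request i receives into big_buffer[i * chunk_len .. (i + 1) * chunk_len].
// The iterator yields the ranges on the fly, so nothing is allocated besides
// the collection itself.
let chunk_len = big_buffer.len() / recv_ranks.len();
let ranges = (0..recv_ranks.len()).map(|i| i * chunk_len..(i + 1) * chunk_len);
let recv_requests = RequestCollection::from_partitioned_buffer(scope, &mut big_buffer[..], ranges);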

However, I suspect even this isn't enough. You'd probably in some situations want to register essentially a Vec<Box<Buf>> with the RequestCollection such that you could detach the allocation from the collection and re-use it.

This needs to be fleshed out a little more, and I think it's actually essential for this proposal, outside of wait_all and test_all. Here are some other thoughts that come to mind:

  1. How are buffers registered to individual requests? This presumably calls for Request similarly owning a mutable reference to its buffer, if we want to stick with the pattern of re-using the existing routines for constructing the requests in a RequestCollection.
  2. Do "Send" requests need this capability too? You can imagine an MPI program wanting to re-use the buffer associated with a send as soon as it completes.

You can side step all this by using unsafe and raw pointers, but I think the strength of this library is making it safe and zero-cost to use MPI efficiently.

I'm sure there are other questions that need to be answered, so let me know if you can think of any. I'll need to let my mind stew on this for a bit. Man, Rust can make things hard. 😜

@bsteinb
Collaborator

bsteinb commented Feb 17, 2018

Yeah, I see the problem. I remember that @Rufflewind actually did experiment with attaching Buffers to Requests when he rewrote this part of rsmpi. His work can still be found here: https://github.com/Rufflewind/rsmpi/tree/buffer. Maybe it would make sense to revive that?

@Rufflewind
Contributor

I have an internal RequestPoll that is used to manage multiple requests in mpi_futures, similar to @AndrewGaspar’s RequestCollection but with callbacks instead of indices. It was designed specifically for internal use, so it does make a bunch of trade-offs and certainly isn't the most general or efficient way to handle multiple requests – I wasn't able to find a general, zero-cost solution to the problem. But maybe you can find something useful out of it.

@AndrewGaspar
Contributor Author

AndrewGaspar commented Feb 19, 2018

Thanks for the prior art - it's very helpful. If we have to deal with a Vec<Buffer> parallel to the Vec<MPI_Request> for now, that's not so bad as far as overhead goes - essentially a (T*, usize) per request when Buffer = &[T], plus having to go to the heap.

I'll see if we can use a trait that would allow you to use a { buffer: Buffer, chunk_size: usize } instead, or a { buffer: Buffer, displs: &[usize] } to reduce extra allocations. However, this would also require API evolution to make it easy to add new requests to the RequestCollection that use a slice of an existing Buffer attached to the RequestCollection.
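
Roughly the kind of trait I have in mind - all of the names below are hypothetical, Equivalence is the existing rsmpi trait:

// Maps a request index to the sub-slice of the underlying storage that the
// request reads from or writes into. An implementation can store explicit
// displacements or compute offsets from a fixed chunk size, so no extra
// per-request allocation is needed.
trait PartitionedBuffer {
    type Elem: Equivalence;
    fn part(&self, idx: usize) -> &[Self::Elem];
    fn part_mut(&mut self, idx: usize) -> &mut [Self::Elem];
}

struct FixedChunks<B> {
    buffer: B,
    chunk_size: usize,
}

struct Displacements<'d, B> {
    buffer: B,
    displs: &'d [usize],
}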

@AndrewGaspar
Contributor Author

I've got a branch off of this one tracking this work here: https://github.com/AndrewGaspar/rsmpi/tree/multi-completion-managed-buffer

@AndrewGaspar
Contributor Author

AndrewGaspar commented Mar 5, 2018

I've been working on getting the data owned by the Request. It has come pretty far - all request objects and request collection objects now own the data (whether it's a Vec<T> or a borrow of a [T]). However, now that I've done that, I realize that scope seems pretty redundant - if the Request owns either the data, or a borrow of the data, then the borrow checker (and the panic on drop) will enforce the kinds of properties that scope is providing today. So you can still get the exact same safety guarantees without needing the scope. Is there something I'm missing?

I'm considering getting rid of the scope parameter to the immediate methods, though I've been trying to avoid changing the public API as much as possible.

Scope could really just be another type of "data" that a request can own. So although immediate_* methods return a Request that owns the data passed to the method, Request could have a method called forget_data(scope). e.g.

impl<D> Request<D> {
    // ...
    fn forget_data<'a, S: Scope<'a>>(self, scope: S) -> Request<S>
    where D: 'a {
        // ...
    }
}

This allows you to get a possibly more minimally sized Request object than if the Request owns the data. Of course, this change would involve removing the scope argument from all of the immediate methods. Are we ok making this breaking change? I think it's probably worth it.

@Rufflewind
Contributor

The issue happens when the Request borrows data from something else, and then you mem::forget(request). The request still exists in the MPI backend, but Rust is no longer aware of that, so you can easily wind up in a situation where the original owner of the buffer is gone yet MPI is still trying to read from or write to it.

@AndrewGaspar
Contributor Author

Oh wow you're exactly right. That really complicates things... the notion of a "must run" destructor would be incredibly helpful.

So I think the right answer here, then, is to only require scoping for borrows, in which case we could require a &T to be wrapped in some sort of Scoped structure when passed to an immediate_ method. However, if the datatype owns its buffer (and therefore can be safely forgotten), it doesn't need an attached Scope.
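
A rough sketch of what that wrapper could look like (all hypothetical, and the exact bounds on the buffer type are hand-waved):

// Couples a borrowed buffer with a scope registration. Borrowed data can only
// reach an immediate_ routine through this wrapper, so the borrow's lifetime
// is tied to the scope rather than to the Request value, which might be
// leaked with mem::forget.
pub struct Scoped<'a, S: Scope<'a>, B: 'a + ?Sized> {
    buffer: &'a B,
    scope: S,
}

impl<'a, S: Scope<'a>, B: 'a + ?Sized> Scoped<'a, S, B> {
    pub fn new(scope: S, buffer: &'a B) -> Self {
        Scoped { buffer, scope }
    }
}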

@bsteinb
Collaborator

bsteinb commented Mar 6, 2018

That actually sounds very nice. The Scope would then only be required to construct a Scoped value from a &T, but not to invoke any of the immediate_ routines for owned data.

As for transferring ownership of buffers to the requests, have you put any thought into types that implement shared ownership with mutability, e.g., Rc<RefCell<Vec<T>>>? If you transfer such an Rc<_> into a request, then hold on to a clone of it and use that to push into the vector causing it to reallocate, that cannot be good. Would this just be guarded against by judicious impls of the Buffer trait? Does this need to be a separate trait ImmediateBuffer or does the same problem arise when using Rc<RefCell<Vec<T>>> as a Buffer with a blocking routine as well?

@AndrewGaspar
Contributor Author

AndrewGaspar commented Mar 7, 2018

Would this just be guarded against by judicious impls of the Buffer trait?

Yeah, exactly - it would be illegal to implement the Pointer or PointerMut trait (which are marked as unsafe) because it is not possible to statically guarantee that the buffer that is owned by the type will be stable through the lifetime of the Request. We should make sure to thoroughly document the requirements to implement the Pointer, Datatype, Buffer, ReadBuffer, and WriteBuffer traits.

Currently I don't implement ReadBuffer or WriteBuffer for any types besides the ones that @Rufflewind had in their original change. None of those seem to have the issue you describe.

Does this need to be a separate trait ImmediateBuffer

Not for the reason you state (my understanding is the borrow from the Rc would persist through the end of the blocking routine), but we may need to add a separate trait for the buffers passed to blocking routines in light of the addition of a Scoped type - you shouldn't need a scope for blocking routines. But I think we'd only need to implement that trait for &'a T, &'a mut T, &'a [T], and &'a mut [T] rather than all the types we implement it for in the immediate routines.

@AndrewGaspar
Contributor Author

Well, now that I think about it, we could have semantics where when a Buffer is passed to an immediate_ routine, the routines takes "ownership" of the buffer. For a Scoped buffer, this means that the Request owns a reference on the scope (therefore ensuring the Request is completed before the buffer leaves scope). For a Rc<RefCell<Vec<T>>>, that would mean the Request takes ownership of a RefMut<Vec<T>>, thereby ensuring no other code can borrow the Vec<T> until the Request completes.
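
The runtime guarantee that makes this work is just RefCell's: while a RefMut is alive, every other attempt to borrow fails. A standalone illustration of why holding the RefMut for the lifetime of the request would be enough:

use std::cell::RefCell;
use std::rc::Rc;

fn main() {
    let shared = Rc::new(RefCell::new(vec![0f64; 16]));
    let alias = Rc::clone(&shared);

    // Pretend the request holds this RefMut for as long as it is in flight.
    let in_flight = shared.borrow_mut();

    // Any other borrow through the clone is rejected until `in_flight` is
    // dropped, so the Vec cannot be reallocated out from under MPI.
    assert!(alias.try_borrow_mut().is_err());
    assert!(alias.try_borrow().is_err());

    drop(in_flight);
    assert!(alias.try_borrow_mut().is_ok());
}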

@AndrewGaspar
Contributor Author

Though, in that case, it may be just as easy to implement WriteBuffer directly on RefMut<Vec<T>> rather than implementing it on Rc<RefCell<Vec<T>>>.

@jedbrown
Contributor

jedbrown commented Aug 5, 2022

I think this can close now that #122 has merged. Feel free to reopen.

@jedbrown jedbrown closed this as completed Aug 5, 2022