-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merge ported delegates #23
Changes from 13 commits
38a7ac8
786f5a2
a81d32a
5103ab4
75f7da9
c1c46c4
d57ea45
4e581d5
9fd23ef
070566f
3bf126c
efdea49
b1a98b0
1458629
3610362
52f9337
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -387,4 +387,97 @@ ReturnType Grappa_delegate_func( ArgType arg, Node target ) { | |
} | ||
} | ||
|
||
#include "Grappa.hpp" | ||
#include "Message.hpp" | ||
#include "FullEmpty.hpp" | ||
#include "Message.hpp" | ||
#include "ConditionVariable.hpp" | ||
|
||
namespace Grappa { | ||
namespace delegate { | ||
/// @addtogroup Delegates | ||
/// @{ | ||
|
||
/// Implements essentially a blocking remote procedure call. Callable object (lambda, | ||
/// function pointer, or functor object) is called from the `dest` core and the return | ||
/// value is sent back to the calling task. | ||
template <typename F> | ||
inline auto call(Core dest, F func) -> decltype(func()) { | ||
using R = decltype(func()); | ||
Node origin = Grappa_mynode(); | ||
|
||
if (dest == origin) { | ||
// short-circuit if local | ||
return func(); | ||
} else { | ||
FullEmpty<R> result; | ||
|
||
send_message(dest, [&result, origin, func] { | ||
R val = func(); | ||
|
||
// TODO: replace with handler-safe send_message | ||
send_heap_message(origin, [&result, val] { | ||
result.writeXF(val); // can't block in message, assumption is that result is already empty | ||
}); | ||
}); // send message | ||
// ... and wait for the result | ||
R r = result.readFE(); | ||
return r; | ||
} | ||
} | ||
|
||
/// Read the value (potentially remote) at the given GlobalAddress, blocks the calling task until | ||
/// round-trip communication is complete. | ||
/// @warning { Target object must lie on a single node (not span blocks in global address space). } | ||
template< typename T > | ||
T read(GlobalAddress<T> target) { | ||
return call(target.node(), [target]() -> T { | ||
return *target.pointer(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Depending on how we do prefetching in the deaggregator, we might need to distinguish the short-circuit case to do prefetch-yielding |
||
}); | ||
} | ||
|
||
/// Blocking remote write. | ||
/// @warning { Target object must lie on a single node (not span blocks in global address space). } | ||
template< typename T, typename U > | ||
bool write(GlobalAddress<T> target, U value) { | ||
// TODO: don't return any val, requires changes to `delegate::call()`. | ||
return call(target.node(), [target, value]() -> bool { | ||
*target.pointer() = (T)value; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure this is the best interface. Is there a reason the value argument is type U, not type T? The downside of this approach is that the C-style cast is done implicitly, which may cause data to be silently lost (if T is int8_t and U is int64_t, for example). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, the different There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nelsonje Do you think "fetch_and_add" should allow the increment value to be of a different type? This would allow one to overload the "+" operator to take some other type (if desired) and still use this delegate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. That sounds reasonable, although a bit non-intuitive. The fact that it returns a copy of the T makes me wonder how useful it would be. The best approach may be to leave the T/U interface everywhere and let an implicit cast happen at the assignment instead of the call if allowed. I think they're essentially equivalent. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A use case I can think of is incrementing a pointer or GlobalAddress. What do you mean at the assignment? Do the coersion inside the delegate On Tuesday, January 29, 2013, Jacob Nelson wrote:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (there are no pattern match problems with the T/U signature) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay I removed the C-style cast, thank you for the discussion on this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, great. Unfortunately -Wconversion is kind of loud, especially inside On Tue, Jan 29, 2013 at 11:37 AM, Brandon Holt notifications@github.comwrote:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can tell GCC to treat the Gasnet headers as "system headers" and not issue warnings in them. Turns out we've accumulated a good deal of our own warnings, but we could squash those. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Aha. That's an interesting idea. My only concern would be that we'd force On Tue, Jan 29, 2013 at 12:20 PM, Brandon Holt notifications@github.comwrote:
|
||
return true; | ||
}); | ||
} | ||
|
||
/// Fetch the value at `target`, increment the value stored there with `inc` and return the | ||
/// original value to blocking thread. | ||
/// @warning { Target object must lie on a single node (not span blocks in global address space). } | ||
template< typename T, typename U > | ||
T fetch_and_add(GlobalAddress<T> target, U inc) { | ||
T * p = target.pointer(); | ||
return call(target.node(), [p, inc]() -> T { | ||
T r = *p; | ||
*p += inc; | ||
return r; | ||
}); | ||
} | ||
|
||
/// If value at `target` equals `cmp_val`, set the value to `new_val` and return `true`, | ||
/// otherwise do nothing and return `false`. | ||
/// @warning { Target object must lie on a single node (not span blocks in global address space). } | ||
template< typename T, typename U, typename V > | ||
bool compare_and_swap(GlobalAddress<T> target, U cmp_val, V new_val) { | ||
T * p = target.pointer(); | ||
return call(target.node(), [p, cmp_val, new_val]() -> bool { | ||
if (cmp_val == *p) { | ||
*p = new_val; | ||
return true; | ||
} else { | ||
return false; | ||
} | ||
}); | ||
} | ||
|
||
/// @} | ||
} // namespace delegate | ||
} // namespace Grappa | ||
|
||
#endif |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
|
||
// Copyright 2010-2012 University of Washington. All Rights Reserved. | ||
// LICENSE_PLACEHOLDER | ||
// This software was created with Government support under DE | ||
// AC05-76RL01830 awarded by the United States Department of | ||
// Energy. The Government has certain rights in the software. | ||
|
||
#include <boost/test/unit_test.hpp> | ||
#include "Delegate.hpp" | ||
|
||
BOOST_AUTO_TEST_SUITE( New_delegate_tests ); | ||
|
||
using namespace Grappa; | ||
using Grappa::wait; | ||
|
||
void check_short_circuiting() { | ||
// read | ||
int a = 0; | ||
auto ga = make_global(&a); | ||
BOOST_CHECK_EQUAL(delegate::read(ga), 0); | ||
|
||
// write | ||
BOOST_CHECK_EQUAL(a, 0); // value unchanged | ||
BOOST_CHECK_EQUAL(delegate::write(ga, 7), true); | ||
BOOST_CHECK_EQUAL(a, 7); | ||
|
||
// compare and swap | ||
double b = 3.14; | ||
auto gb = make_global(&b); | ||
BOOST_CHECK_EQUAL(delegate::compare_and_swap(gb, 3.14, 2.0), true); | ||
BOOST_CHECK_EQUAL(b, 2.0); | ||
BOOST_CHECK_EQUAL(delegate::compare_and_swap(gb, 3.14, 3.0), false); | ||
BOOST_CHECK_EQUAL(b, 2.0); | ||
|
||
// fetch and add | ||
uint64_t c = 1; | ||
auto gc = make_global(&c); | ||
BOOST_CHECK_EQUAL(delegate::fetch_and_add(gc, 1), 1); | ||
BOOST_CHECK_EQUAL(c, 2); | ||
BOOST_CHECK_EQUAL(delegate::fetch_and_add(gc, -2), 2); | ||
BOOST_CHECK_EQUAL(c, 0); | ||
} | ||
|
||
void check_remote() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. given that they call send_message directly, I think these remote tests might go in a file with a different name that requires Message.hpp directly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which tests? All of them? The only calls to |
||
// read | ||
int a = 0; | ||
auto ga = make_global(&a); | ||
|
||
ConditionVariable w; | ||
auto gw = make_global(&w); | ||
|
||
send_message(1, [ga, gw] { | ||
privateTask([=]{ | ||
BOOST_CHECK_EQUAL(delegate::read(ga), 0); | ||
signal(gw); | ||
}); | ||
}); | ||
wait(&w); | ||
BOOST_CHECK_EQUAL(a, 0); // value unchanged | ||
|
||
// write | ||
send_message(1, [ga, gw] { | ||
privateTask([=]{ | ||
BOOST_CHECK_EQUAL(delegate::write(ga, 7), true); | ||
signal(gw); | ||
}); | ||
}); | ||
wait(&w); | ||
BOOST_CHECK_EQUAL(a, 7); | ||
|
||
// compare and swap | ||
double b = 3.14; | ||
auto gb = make_global(&b); | ||
send_message(1, [gb, gw] { | ||
privateTask([=]{ | ||
BOOST_CHECK_EQUAL(delegate::compare_and_swap(gb, 3.14, 2.0), true); | ||
signal(gw); | ||
}); | ||
}); | ||
wait(&w); | ||
BOOST_CHECK_EQUAL(b, 2.0); | ||
send_message(1, [gb, gw] { | ||
privateTask([=]{ | ||
BOOST_CHECK_EQUAL(delegate::compare_and_swap(gb, 3.14, 3.0), false); | ||
signal(gw); | ||
}); | ||
}); | ||
wait(&w); | ||
BOOST_CHECK_EQUAL(b, 2.0); | ||
|
||
// fetch and add | ||
uint64_t c = 1; | ||
auto gc = make_global(&c); | ||
send_message(1, [gc, gw] { | ||
privateTask([=]{ | ||
BOOST_CHECK_EQUAL(delegate::fetch_and_add(gc, 1), 1); | ||
signal(gw); | ||
}); | ||
}); | ||
wait(&w); | ||
BOOST_CHECK_EQUAL(c, 2); | ||
send_message(1, [gc, gw] { | ||
privateTask([=]{ | ||
BOOST_CHECK_EQUAL(delegate::fetch_and_add(gc, -2), 2); | ||
signal(gw); | ||
}); | ||
}); | ||
wait(&w); | ||
BOOST_CHECK_EQUAL(c, 0); | ||
} | ||
|
||
|
||
void user_main(void * args) { | ||
CHECK(Grappa_nodes() >= 2); // at least 2 nodes for these tests... | ||
|
||
check_short_circuiting(); | ||
|
||
check_remote(); | ||
|
||
int64_t seed = 111; | ||
GlobalAddress<int64_t> seed_addr = make_global(&seed); | ||
|
||
ConditionVariable waiter; | ||
auto waiter_addr = make_global(&waiter); | ||
|
||
send_message(1, [seed_addr, waiter_addr] { | ||
// on node 1 | ||
privateTask([seed_addr, waiter_addr] { | ||
int64_t vseed = delegate::read(seed_addr); | ||
BOOST_CHECK_EQUAL(111, vseed); | ||
|
||
delegate::write(seed_addr, 222); | ||
signal(waiter_addr); | ||
}); | ||
}); | ||
Grappa::wait(&waiter); | ||
BOOST_CHECK_EQUAL(seed, 222); | ||
} | ||
|
||
BOOST_AUTO_TEST_CASE( test1 ) { | ||
|
||
Grappa_init( &(boost::unit_test::framework::master_test_suite().argc), | ||
&(boost::unit_test::framework::master_test_suite().argv) | ||
); | ||
|
||
Grappa_activate(); | ||
|
||
Grappa_run_user_main( &user_main, (void*)NULL ); | ||
|
||
Grappa_finish( 0 ); | ||
} | ||
|
||
BOOST_AUTO_TEST_SUITE_END(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I expect we will want a non-blocking version. I'll have to think about whether it is possible to not have write two versions of read/write/cmp-swap/...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still not sure what the difference is between non-blocking delegates and caches are, except the case where you don't care about a return value at all, but then you have to distinguish whether you want to block until a delegate completes or not (for consistency).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the difference is in how writes are handled (maybe reads too). With delegates, we expect both to generate a remote reference for every access.
Probably the right way to do this is to either pass in or return (with appropriate anti-move protection) the result FullEmpty<>; then the user can just block on that if necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we started a new issue to discuss and incorporate these changes in the future?
It's not clear that we need all of this functionality to get things up and running (we don't have these abilities already) and without messages we don't have to block on, we can't implement something like feed-forward delegates anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% agree.
On Tue, Jan 29, 2013 at 12:32 PM, Brandon Holt notifications@github.comwrote: