Skip to content

Commit

Permalink
Merge pull request #23 from nelsonje/bholt-delegates
Browse files Browse the repository at this point in the history
Merge ported delegates
  • Loading branch information
bholt committed Jan 29, 2013
2 parents 7d41ca4 + 52f9337 commit d107af2
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 4 deletions.
8 changes: 4 additions & 4 deletions system/ConditionVariable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ namespace Grappa {
}

/// TODO: implement
inline void signal( GlobalAddress<ConditionVariable> m ) {
if (m.node() == Grappa::mynode()) {
inline void signal( const GlobalAddress<ConditionVariable> m ) {
if (m.node() == Grappa::mycore()) {
// if local, just signal
Grappa::signal(m.pointer());
} else {
// if remote, signal via active message
auto m = Grappa::send_message(0, [m]{
Grappa::signal(m.pointer());
auto _ = Grappa::send_message(m.node(), [=]{
Grappa::signal(m.pointer());
});
}
}
Expand Down
108 changes: 108 additions & 0 deletions system/Delegate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,4 +387,112 @@ ReturnType Grappa_delegate_func( ArgType arg, Node target ) {
}
}

#include "Grappa.hpp"
#include "Message.hpp"
#include "FullEmpty.hpp"
#include "Message.hpp"
#include "ConditionVariable.hpp"

namespace Grappa {
namespace delegate {
/// @addtogroup Delegates
/// @{

/// Implements essentially a blocking remote procedure call. Callable object (lambda,
/// function pointer, or functor object) is called from the `dest` core and the return
/// value is sent back to the calling task.
template <typename F>
inline auto call(Core dest, F func) -> decltype(func()) {
delegate_stats.count_op();
using R = decltype(func());
Node origin = Grappa_mynode();

if (dest == origin) {
// short-circuit if local
return func();
} else {
FullEmpty<R> result;
int64_t network_time = 0;
int64_t start_time = Grappa_get_timestamp();

send_message(dest, [&result, origin, func, &network_time] {
delegate_stats.count_op_am();
R val = func();

// TODO: replace with handler-safe send_message
send_heap_message(origin, [&result, val, &network_time] {
network_time = Grappa_get_timestamp();
delegate_stats.record_network_latency(start_time);
result.writeXF(val); // can't block in message, assumption is that result is already empty
});
}); // send message
// ... and wait for the result
R r = result.readFE();
delegate_stats.record_wakeup_latency(start_time, network_time);
return r;
}
}

/// Read the value (potentially remote) at the given GlobalAddress, blocks the calling task until
/// round-trip communication is complete.
/// @warning { Target object must lie on a single node (not span blocks in global address space). }
template< typename T >
T read(GlobalAddress<T> target) {
delegate_stats.count_word_read();
return call(target.node(), [target]() -> T {
delegate_stats.count_word_read_am();
return *target.pointer();
});
}

/// Blocking remote write.
/// @warning { Target object must lie on a single node (not span blocks in global address space). }
template< typename T, typename U >
bool write(GlobalAddress<T> target, U value) {
delegate_stats.count_word_write();
// TODO: don't return any val, requires changes to `delegate::call()`.
return call(target.node(), [target, value]() -> bool {
delegate_stats.count_word_write_am();
*target.pointer() = value;
return true;
});
}

/// Fetch the value at `target`, increment the value stored there with `inc` and return the
/// original value to blocking thread.
/// @warning { Target object must lie on a single node (not span blocks in global address space). }
template< typename T, typename U >
T fetch_and_add(GlobalAddress<T> target, U inc) {
delegate_stats.count_word_fetch_add();
T * p = target.pointer();
return call(target.node(), [p, inc]() -> T {
delegate_stats.count_word_fetch_add_am();
T r = *p;
*p += inc;
return r;
});
}

/// If value at `target` equals `cmp_val`, set the value to `new_val` and return `true`,
/// otherwise do nothing and return `false`.
/// @warning { Target object must lie on a single node (not span blocks in global address space). }
template< typename T, typename U, typename V >
bool compare_and_swap(GlobalAddress<T> target, U cmp_val, V new_val) {
delegate_stats.count_word_compare_swap();
T * p = target.pointer();
return call(target.node(), [p, cmp_val, new_val]() -> bool {
delegate_stats.count_word_compare_swap_am();
if (cmp_val == *p) {
*p = new_val;
return true;
} else {
return false;
}
});
}

/// @}
} // namespace delegate
} // namespace Grappa

#endif
2 changes: 2 additions & 0 deletions system/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ FileIO_tests.test: LIBRARIES+='-lboost_filesystem'

Array_tests.o Array_tests.test: $(LIBRARY)

New_delegate_tests.test: $(LIBRARY)

#
# environment variables for run
# remember to include environment's $LD_LIBRARY_PATH, since MPI libs may be there
Expand Down
4 changes: 4 additions & 0 deletions system/Message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,10 @@ namespace Grappa {
return SendExternalPayloadMessage<T>( dest, t, payload, payload_size );
}

// FIXME: temporary way to distinguish messages that should be heap-allocated (i.e. reply messages)
// Should replace with call to new heap-allocated message send/enqueue calls when we have those.
#define send_heap_message send_message

/// @}

}
Expand Down
155 changes: 155 additions & 0 deletions system/New_delegate_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@

// Copyright 2010-2012 University of Washington. All Rights Reserved.
// LICENSE_PLACEHOLDER
// This software was created with Government support under DE
// AC05-76RL01830 awarded by the United States Department of
// Energy. The Government has certain rights in the software.

#include <boost/test/unit_test.hpp>
#include "Delegate.hpp"

BOOST_AUTO_TEST_SUITE( New_delegate_tests );

using namespace Grappa;
using Grappa::wait;

void check_short_circuiting() {
// read
int a = 0;
auto ga = make_global(&a);
BOOST_CHECK_EQUAL(delegate::read(ga), 0);

// write
BOOST_CHECK_EQUAL(a, 0); // value unchanged
BOOST_CHECK_EQUAL(delegate::write(ga, 7), true);
BOOST_CHECK_EQUAL(a, 7);

// compare and swap
double b = 3.14;
auto gb = make_global(&b);
BOOST_CHECK_EQUAL(delegate::compare_and_swap(gb, 3.14, 2.0), true);
BOOST_CHECK_EQUAL(b, 2.0);
BOOST_CHECK_EQUAL(delegate::compare_and_swap(gb, 3.14, 3.0), false);
BOOST_CHECK_EQUAL(b, 2.0);

// fetch and add
uint64_t c = 1;
auto gc = make_global(&c);
BOOST_CHECK_EQUAL(delegate::fetch_and_add(gc, 1), 1);
BOOST_CHECK_EQUAL(c, 2);
BOOST_CHECK_EQUAL(delegate::fetch_and_add(gc, -2), 2);
BOOST_CHECK_EQUAL(c, 0);
}

void check_remote() {
// read
int a = 0;
auto ga = make_global(&a);

ConditionVariable w;
auto gw = make_global(&w);

send_message(1, [ga, gw] {
privateTask([=]{
BOOST_CHECK_EQUAL(delegate::read(ga), 0);
signal(gw);
});
});
wait(&w);
BOOST_CHECK_EQUAL(a, 0); // value unchanged

// write
send_message(1, [ga, gw] {
privateTask([=]{
BOOST_CHECK_EQUAL(delegate::write(ga, 7), true);
signal(gw);
});
});
wait(&w);
BOOST_CHECK_EQUAL(a, 7);

// compare and swap
double b = 3.14;
auto gb = make_global(&b);
send_message(1, [gb, gw] {
privateTask([=]{
BOOST_CHECK_EQUAL(delegate::compare_and_swap(gb, 3.14, 2.0), true);
signal(gw);
});
});
wait(&w);
BOOST_CHECK_EQUAL(b, 2.0);
send_message(1, [gb, gw] {
privateTask([=]{
BOOST_CHECK_EQUAL(delegate::compare_and_swap(gb, 3.14, 3.0), false);
signal(gw);
});
});
wait(&w);
BOOST_CHECK_EQUAL(b, 2.0);

// fetch and add
uint64_t c = 1;
auto gc = make_global(&c);
send_message(1, [gc, gw] {
privateTask([=]{
BOOST_CHECK_EQUAL(delegate::fetch_and_add(gc, 1), 1);
signal(gw);
});
});
wait(&w);
BOOST_CHECK_EQUAL(c, 2);
send_message(1, [gc, gw] {
privateTask([=]{
BOOST_CHECK_EQUAL(delegate::fetch_and_add(gc, -2), 2);
signal(gw);
});
});
wait(&w);
BOOST_CHECK_EQUAL(c, 0);
}


void user_main(void * args) {
CHECK(Grappa_nodes() >= 2); // at least 2 nodes for these tests...

check_short_circuiting();

check_remote();

int64_t seed = 111;
GlobalAddress<int64_t> seed_addr = make_global(&seed);

ConditionVariable waiter;
auto waiter_addr = make_global(&waiter);

send_message(1, [seed_addr, waiter_addr] {
// on node 1
privateTask([seed_addr, waiter_addr] {
int64_t vseed = delegate::read(seed_addr);
BOOST_CHECK_EQUAL(111, vseed);

delegate::write(seed_addr, 222);
signal(waiter_addr);
});
});
Grappa::wait(&waiter);
BOOST_CHECK_EQUAL(seed, 222);

Grappa_dump_stats_all_nodes();
}

BOOST_AUTO_TEST_CASE( test1 ) {

Grappa_init( &(boost::unit_test::framework::master_test_suite().argc),
&(boost::unit_test::framework::master_test_suite().argv)
);

Grappa_activate();

Grappa_run_user_main( &user_main, (void*)NULL );

Grappa_finish( 0 );
}

BOOST_AUTO_TEST_SUITE_END();

0 comments on commit d107af2

Please sign in to comment.