Skip to content

Commit

Permalink
Add new receive_data entry method to DistributedObject
Browse files Browse the repository at this point in the history
This one takes a Charm++ message (pointer) and inserts it into the
inbox.

Also, a corresponding Parallel::receive_data function was added. This
can only be used on Array components at the moment.

You also need to properly register messages with charm or you'll
get incorrect behavior so there's also code added to handle that
automatically.

The parallel test for this can be run on one or two charm nodes.
  • Loading branch information
knelli2 committed Jan 31, 2023
1 parent 77a7a66 commit 082cde4
Show file tree
Hide file tree
Showing 12 changed files with 777 additions and 40 deletions.
1 change: 1 addition & 0 deletions cmake/FindCharm.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ find_package_handle_standard_args(
mark_as_advanced(
CHARM_COMPILER
CHARM_INCLUDE_DIR
CHARM_USE_MPI
CHARM_VERSION_MAJOR
CHARM_VERSION_MINOR
CHARM_VERSION_PATCH
Expand Down
5 changes: 5 additions & 0 deletions docs/DevGuide/Parallelization.md
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,11 @@ inherit publicly off the inserters to gain the required insertion capabilities:

\snippet Test_AlgorithmCore.cpp int receive tag insert

Any inbox tag that uses Charm++ messages must also specify a `message_type` type
alias, which is the type of the message object that will be sent. An example is:

\snippet Test_AlgorithmMessages.cpp charm message inbox tag

The `inbox_tags` type alias for the action is:

\snippet Test_AlgorithmParallel.cpp int_receive_tag_list
Expand Down
1 change: 1 addition & 0 deletions src/Evolution/DiscontinuousGalerkin/InboxTags.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ struct BoundaryMessageInbox {
FixedHashMap<maximum_number_of_neighbors(Dim),
std::pair<Direction<Dim>, ElementId<Dim>>, stored_type,
boost::hash<std::pair<Direction<Dim>, ElementId<Dim>>>>>;
using message_type = BoundaryMessage<Dim>;

template <typename Inbox>
static void insert_into_inbox(const gsl::not_null<Inbox*> inbox,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "Domain/Structure/ElementId.hpp"
#include "NumericalAlgorithms/Spectral/Mesh.hpp"
#include "Time/TimeStepId.hpp"
#include "Utilities/GetOutput.hpp"
#include "Utilities/PrettyType.hpp"

#include "Evolution/DiscontinuousGalerkin/Messages/BoundaryMessage.decl.h"
Expand All @@ -30,6 +31,11 @@ template <size_t Dim>
struct BoundaryMessage : public CMessage_BoundaryMessage<Dim> {
using base = CMessage_BoundaryMessage<Dim>;

/// Human-readable name of this message type, needed when the message is
/// registered with Charm++. The spatial dimension `Dim` is embedded in the
/// string, so every instantiation of the template reports a distinct name.
static std::string name() {
  return "BoundaryMessage<" + get_output(Dim) + ">";
}

size_t subcell_ghost_data_size;
size_t dg_flux_data_size;
bool sent_across_nodes;
Expand Down
3 changes: 3 additions & 0 deletions src/Parallel/Algorithms/AlgorithmArray.ci
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ module AlgorithmArray {

entry void start_phase(Parallel::Phase);

template <typename ReceiveTag, typename MessageType>
entry void receive_data(MessageType*);

template <typename ReceiveTag, typename ReceiveData_t>
entry [inline] void receive_data(typename ReceiveTag::temporal_id&,
ReceiveData_t&,
Expand Down
58 changes: 47 additions & 11 deletions src/Parallel/CharmRegistration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,35 @@ struct RegistrationHelper {
virtual bool is_registering_chare() const { return false; };
};

/*!
 * \ingroup CharmExtensionsGroup
 * \brief Registers a Charm++ message type with Charm++ at most once.
 *
 * Any entry method that takes a Charm++ message must make sure the message
 * type has been registered, which it does by calling the static
 * `register_with_charm()` function of this struct. Entry methods may be
 * templated on the message type, so the same message type can be requested
 * from several places; the guard in this struct makes every call after the
 * first a no-op, which keeps the "register only once" logic out of the entry
 * method registration code.
 *
 * `MessageType` must provide a static `name()` function, static `pack` and
 * `unpack` functions, and a `base` type alias exposing `__register`.
 */
template <typename MessageType>
struct RegisterCharmMessage {
  static void register_with_charm() {
    static bool already_registered = false;
    if (already_registered) {
      return;  // LCOV_EXCL_LINE
    }
    already_registered = true;

    // The name has static storage duration so the pointer handed to
    // `__register` stays valid after this function returns (presumably
    // Charm++ keeps the pointer rather than copying the string -- confirm).
    static const std::string message_name = MessageType::name();

    // NOTE: Assumes custom pack/unpack but default allocate.
    const auto pack_function =
        reinterpret_cast<CkPackFnPtr>(MessageType::pack);
    const auto unpack_function =
        reinterpret_cast<CkUnpackFnPtr>(MessageType::unpack);

    MessageType::base::__register(message_name.c_str(), sizeof(MessageType),
                                  pack_function, unpack_function);
  }
};

/*!
* \ingroup CharmExtensionsGroup
* \brief Derived class for registering parallel components.
Expand Down Expand Up @@ -432,7 +461,7 @@ using get_value_type_t = typename get_value_type<T>::type;
* arguments for groups and nodegroups, so we have to handle the (node)group
* cases separately from the singleton and array cases.
*/
template <typename ParallelComponent, typename ReceiveTag>
template <typename ParallelComponent, typename ReceiveTag, bool UsingMessages>
struct RegisterReceiveData : RegistrationHelper {
using chare_type = typename ParallelComponent::chare_type;
using charm_type = charm_types_with_parameters<
Expand All @@ -455,12 +484,19 @@ struct RegisterReceiveData : RegistrationHelper {
return; // LCOV_EXCL_LINE
}
done_registration = true;
ckindex::template idx_receive_data<ReceiveTag>(
static_cast<void (algorithm::*)(
const typename ReceiveTag::temporal_id&,
const detail::get_value_type_t<
typename ReceiveTag::type::mapped_type>&,
bool)>(nullptr));
if constexpr (UsingMessages) {
using message_type = typename ReceiveTag::message_type;
ckindex::template idx_receive_data<ReceiveTag>(
static_cast<void (algorithm::*)(message_type*)>(nullptr));
RegisterCharmMessage<message_type>::register_with_charm();
} else {
ckindex::template idx_receive_data<ReceiveTag>(
static_cast<void (algorithm::*)(
const typename ReceiveTag::temporal_id&,
const detail::get_value_type_t<
typename ReceiveTag::type::mapped_type>&,
bool)>(nullptr));
}
}

std::string name() const override {
Expand Down Expand Up @@ -761,11 +797,11 @@ bool Parallel::charmxx::RegisterThreadedAction<ParallelComponent,
RegisterThreadedAction<ParallelComponent, Action>>();

// clang-tidy: redundant declaration
template <typename ParallelComponent, typename ReceiveTag>
bool Parallel::charmxx::RegisterReceiveData<ParallelComponent,
ReceiveTag>::registrar = // NOLINT
template <typename ParallelComponent, typename ReceiveTag, bool UsingMessages>
bool Parallel::charmxx::RegisterReceiveData<
ParallelComponent, ReceiveTag, UsingMessages>::registrar = // NOLINT
Parallel::charmxx::register_func_with_charm<
RegisterReceiveData<ParallelComponent, ReceiveTag>>();
RegisterReceiveData<ParallelComponent, ReceiveTag, UsingMessages>>();

// clang-tidy: redundant declaration
template <typename ParallelComponent, typename Action, typename ReductionType>
Expand Down
34 changes: 32 additions & 2 deletions src/Parallel/DistributedObject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ class DistributedObject<ParallelComponent,
void receive_data(typename ReceiveTag::temporal_id instance,
ReceiveDataType&& t, bool enable_if_disabled = false);

/// Entry method that receives a Charm++ message (passed as a raw pointer) and
/// inserts it into the inbox identified by `ReceiveTag`.
template <typename ReceiveTag, typename MessageType>
void receive_data(MessageType* message);

/// @{
/// Start evaluating the algorithm until it is stopped by an action.
void perform_algorithm();
Expand Down Expand Up @@ -838,8 +841,8 @@ void DistributedObject<ParallelComponent,
receive_data(typename ReceiveTag::temporal_id instance, ReceiveDataType&& t,
const bool enable_if_disabled) {
try {
(void)Parallel::charmxx::RegisterReceiveData<ParallelComponent,
ReceiveTag>::registrar;
(void)Parallel::charmxx::RegisterReceiveData<ParallelComponent, ReceiveTag,
false>::registrar;
{
std::optional<std::lock_guard<Parallel::NodeLock>> hold_lock{};
if constexpr (std::is_same_v<Parallel::NodeLock, decltype(node_lock_)>) {
Expand All @@ -858,6 +861,33 @@ void DistributedObject<ParallelComponent,
}
}

/// Entry method for Charm++ messages: stores `message` in the inbox
/// identified by `ReceiveTag` and then resumes the algorithm.
///
/// If `message->enable_if_disabled` is set, `set_terminate(false)` is called
/// before the message is inserted. Any `std::exception` thrown along the way
/// is forwarded to `initiate_shutdown`.
template <typename ParallelComponent, typename... PhaseDepActionListsPack>
template <typename ReceiveTag, typename MessageType>
void DistributedObject<ParallelComponent,
                       tmpl::list<PhaseDepActionListsPack...>>::
    receive_data(MessageType* message) {
  try {
    // Naming the registrar forces its instantiation; the `true` selects the
    // message-based flavor of the receive_data registration.
    (void)Parallel::charmxx::RegisterReceiveData<ParallelComponent, ReceiveTag,
                                                 true>::registrar;
    {
      // The guard is only engaged when this object actually carries a
      // `Parallel::NodeLock`; it is released at the end of this scope, i.e.
      // before the algorithm is resumed.
      std::optional<std::lock_guard<Parallel::NodeLock>> inbox_guard{};
      if constexpr (std::is_same_v<Parallel::NodeLock, decltype(node_lock_)>) {
        inbox_guard.emplace(node_lock_);
      }
      // The flag must be read before the message is handed to the inbox.
      if (message->enable_if_disabled) {
        set_terminate(false);
      }
      auto& inbox = tuples::get<ReceiveTag>(inboxes_);
      ReceiveTag::insert_into_inbox(make_not_null(&inbox), message);
      // From here on `message` is owned by a std::unique_ptr and must not be
      // touched again; doing so would be undefined behavior.
    }
    perform_algorithm();
  } catch (const std::exception& exception) {
    initiate_shutdown(exception);
  }
}

template <typename ParallelComponent, typename... PhaseDepActionListsPack>
void DistributedObject<
ParallelComponent,
Expand Down
32 changes: 32 additions & 0 deletions src/Parallel/Invoke.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,38 @@ void receive_data(Proxy&& proxy, typename ReceiveTag::temporal_id temporal_id,
}
}

/*!
 * \ingroup ParallelGroup
 * \brief Send the Charm++ message pointed to by `message` to the algorithm
 * running on `proxy`.
 *
 * The `message` object must carry everything the receiver needs as member
 * variables: the temporal ID, the data itself, and any auxiliary information
 * that has to be communicated. The `ReceiveTag` associated with the `message`
 * must be able to unpack the information that was sent.
 *
 * If the component behind `proxy` runs on the same charm-node, the exact
 * pointer `message` is handed to the receiving component and no data is
 * copied. If the receiving component lives on a different charm-node, the data
 * pointed to by `message` is copied, sent through charm, and unpacked on the
 * receiving side; the pointer passed to the receiving algorithm then points to
 * that copy.
 *
 * Only array proxies and array element proxies are supported; any other proxy
 * type is rejected at compile time.
 *
 * \warning You cannot use the `message` pointer after you call this function.
 * Doing so will result in undefined behavior because something else may be
 * controlling the pointer.
 */
template <typename ReceiveTag, typename Proxy, typename MessageType>
void receive_data(Proxy&& proxy, MessageType* message) {
  using proxy_type = std::decay_t<Proxy>;
  using message_type = std::decay_t<MessageType>;
  static_assert(is_array_proxy<proxy_type>::value or
                    is_array_element_proxy<proxy_type>::value,
                "Charm++ messages can only be used with Array[Element] chares "
                "at the moment. If you want to use them with other types of "
                "components, you will need to implement it.");
  proxy.template receive_data<ReceiveTag, message_type>(message);
}

/// @{
/*!
* \ingroup ParallelGroup
Expand Down
27 changes: 27 additions & 0 deletions tests/Unit/Parallel/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@ set(ALGORITHM_TEST_LINK_LIBRARIES
Actions
Boost::program_options
Charmxx::main
DomainStructure
ErrorHandling
Evolution
H5
Informer
Observer
Options
Parallel
ParallelHelpers
PhaseControl
Spectral
SystemUtilities
Time
Utilities
)

Expand Down Expand Up @@ -61,6 +65,28 @@ function(add_algorithm_test_with_restart_from_checkpoint TEST_NAME)
set_standalone_test_properties("Unit.Parallel.${TEST_NAME}")
endfunction()

# Adds the standalone executable Test_<TEST_NAME>, links it against the common
# algorithm test libraries, and registers it as the CTest test
# Unit.Parallel.<TEST_NAME>, launched through charmrun from a shell.
function(add_algorithm_message_test TEST_NAME)
  set(_MESSAGE_TEST_EXECUTABLE "Test_${TEST_NAME}")
  set(_MESSAGE_TEST_FULL_NAME "Unit.Parallel.${TEST_NAME}")

  add_standalone_test_executable("${_MESSAGE_TEST_EXECUTABLE}")
  target_link_libraries(
    "${_MESSAGE_TEST_EXECUTABLE}"
    PRIVATE
    "${ALGORITHM_TEST_LINK_LIBRARIES}")

  # No extra charmrun arguments by default; with an MPI build of Charm++ pass
  # "+n2 +p2" (presumably two nodes / two PEs, so the test also covers sending
  # a message between charm-nodes -- confirm against the Charm++ docs).
  set(CHARM_ARGS "")
  if (CHARM_USE_MPI)
    set(CHARM_ARGS "+n2 +p2")
  endif()

  # The trailing backslash continues the quoted shell command on the next line.
  add_test(
    NAME "${_MESSAGE_TEST_FULL_NAME}"
    COMMAND
    ${SHELL_EXECUTABLE}
    -c
    "${CMAKE_BINARY_DIR}/bin/charmrun ${CMAKE_BINARY_DIR}/bin/Test_${TEST_NAME} \
${CHARM_ARGS} 2>&1"
    )
  set_standalone_test_properties("${_MESSAGE_TEST_FULL_NAME}")
endfunction()

function(add_algorithm_test BASE_NAME)
add_standalone_test("Unit.Parallel.${BASE_NAME}" ${ARGN})
target_link_libraries(
Expand All @@ -83,6 +109,7 @@ add_dependencies(
module_Test_GlobalCache
)

add_algorithm_message_test("AlgorithmMessages")
add_algorithm_test("AlgorithmCore")
add_algorithm_test("AlgorithmLocalSyncAction")
add_algorithm_test(
Expand Down
Loading

0 comments on commit 082cde4

Please sign in to comment.