Skip to content

Commit

Permalink
[#204] Implemented Blocking Apply functions in Multimap
Browse files Browse the repository at this point in the history
  • Loading branch information
VitoCastellana committed Mar 25, 2023
1 parent 664f0c5 commit 41e822e
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 0 deletions.
131 changes: 131 additions & 0 deletions include/shad/data_structures/local_multimap.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,38 @@ class LocalMultimap {
template <typename ApplyFunT, typename... Args>
void AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function,
Args &...args);

/// @brief Apply a user-defined function to every element of an entry's value
/// array. Thread safe wrt other operations.
/// @tparam ApplyFunT User-defined function type. The function prototype
/// should be:
/// @code
/// void(const KTYPE&, std::vector<VTYPE> &, Args&);
/// @endcode
/// @tparam ...Args Types of the function arguments.
///
/// @param[in] key The key.
/// @param function The function to apply.
/// @param args The function arguments.
template <typename ApplyFunT, typename... Args>
void BlockingApply(const KTYPE &key, ApplyFunT &&function, Args &...args);

/// @brief Asynchronously apply a user-defined function to a key-value pair.
/// Thread safe wrt other operations.
/// @tparam ApplyFunT User-defined function type. The function prototype
/// should be:
/// @code
/// void(rt::Handle &handle, std::vector<VTYPE> &, VTYPE&, Args&);
/// @endcode
/// @tparam ...Args Types of the function arguments.
///
/// @param[in,out] handle Reference to the handle.
/// @param[in] key The key.
/// @param function The function to apply.
/// @param args The function arguments.
template <typename ApplyFunT, typename... Args>
void AsyncBlockingApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function,
Args &...args);

/// @brief Asynchronously apply a user-defined function to a key-value pair.
/// @tparam ApplyFunT User-defined function type. The function prototype
Expand Down Expand Up @@ -693,6 +725,14 @@ class LocalMultimap {
return;
}

template <typename ApplyFunT, typename... Args, std::size_t... is>
static void CallBlockingApplyFun(LocalMultimap<KTYPE, VTYPE, KEY_COMPARE> *mapPtr,
const KTYPE &key, ApplyFunT function,
std::tuple<Args...> &args,
std::index_sequence<is...>) {
mapPtr->BlockingApply(key, function, std::get<is>(args)...);
}

template <typename Tuple, typename... Args>
static void AsyncApplyFunWrapper(rt::Handle &handle, const Tuple &args) {
constexpr auto Size = std::tuple_size<
Expand All @@ -704,6 +744,16 @@ class LocalMultimap {
std::make_index_sequence<Size>{});
}


template <typename ApplyFunT, typename... Args, std::size_t... is>
static void CallAsyncBlockingApplyFun(rt::Handle &h,
LocalMultimap<KTYPE, VTYPE, KEY_COMPARE> *mapPtr,
const KTYPE &key, ApplyFunT function,
std::tuple<Args...> &args,
std::index_sequence<is...>) {
mapPtr->AsyncBlockingApply(h, key, function, std::get<is>(args)...);
}

template <typename Tuple, typename... Args>
static void AsyncApplyWRBFunWrapper(rt::Handle &handle, const Tuple &args,
uint8_t* result, uint32_t* resultSize) {
Expand Down Expand Up @@ -1213,6 +1263,87 @@ void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncInsert(
rt::asyncExecuteAt(handle, rt::thisLocality(), insertLambda, args);
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::BlockingApply(const KTYPE &key,
ApplyFunT &&function,
Args &...args) {
size_t bucketIdx = shad::hash<KTYPE>{}(key) % numBuckets_;
Bucket *bucket = &(buckets_array_[bucketIdx]);
allow_inserter(bucketIdx);
// loop over linked buckets
while (bucket != nullptr) {
// loop over entries in this bucket
for (size_t i = 0; i < bucket->BucketSize(); ++i) {
Entry *entry = &bucket->getEntry(i);

// Stop at the first empty or pending insert entry.
if ((entry->state == EMPTY) or (entry->state == PENDING_INSERT)) {
break;
}

// if key matches this entry's key, apply function; else continue inner for
// loop
if (KeyComp_(&entry->key, &key) == 0) {
// tagging as pending insert
while (!__sync_bool_compare_and_swap(&entry->state, USED,
PENDING_INSERT)) {
rt::impl::yield();
}
function(key, entry->value, args...);
entry->state = USED;
release_inserter(bucketIdx);
return;
}
} // Inner for loop

// move to next bucket
bucket = bucket->next.get();
} // Outer for loop
release_inserter(bucketIdx);
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncBlockingApply(rt::Handle &h,
const KTYPE &key,
ApplyFunT &&function,
Args &...args) {
size_t bucketIdx = shad::hash<KTYPE>{}(key) % numBuckets_;
Bucket *bucket = &(buckets_array_[bucketIdx]);
allow_inserter(bucketIdx);
// loop over linked buckets
while (bucket != nullptr) {
// loop over entries in this bucket
for (size_t i = 0; i < bucket->BucketSize(); ++i) {
Entry *entry = &bucket->getEntry(i);

// Stop at the first empty or pending insert entry.
if ((entry->state == EMPTY) or (entry->state == PENDING_INSERT)) {
break;
}

// if key matches this entry's key, apply function; else continue inner for
// loop
if (KeyComp_(&entry->key, &key) == 0) {
// tagging as pending insert
while (!__sync_bool_compare_and_swap(&entry->state, USED,
PENDING_INSERT)) {
rt::impl::yield();
}
function(h, key, entry->value, args...);
entry->state = USED;
release_inserter(bucketIdx);
return;
}
} // Inner for loop

// move to next bucket
bucket = bucket->next.get();
} // Outer for loop
release_inserter(bucketIdx);
}

template <typename LMap, typename T>
class lmultimap_iterator : public std::iterator<std::forward_iterator_tag, T> {
template <typename, typename, typename>
Expand Down
88 changes: 88 additions & 0 deletions include/shad/data_structures/multimap.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,37 @@ class Multimap : public AbstractDataStructure< Multimap<KTYPE, VTYPE, KEY_COMPAR
template <typename ApplyFunT, typename... Args>
void AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &... args);

/// @brief Apply a user-defined function to a key-value pair.
/// Thread safe wrt other operations.
/// @tparam ApplyFunT User-defined function type. The function prototype should be:
/// @code
/// void(const KTYPE&, std::vector<VTYPE> &, Args&);
/// @endcode
/// @tparam ...Args Types of the function arguments.
///
/// @param key The key.
/// @param function The function to apply.
/// @param args The function arguments.
template <typename ApplyFunT, typename... Args>
void BlockingApply(const KTYPE &key, ApplyFunT &&function, Args &... args);


/// @brief Asynchronously apply a user-defined function to a key-value pair.
/// Thread safe wrt other operations.
/// @tparam ApplyFunT User-defined function type. The function prototype should be:
/// @code
/// void(rt::Handle &h, const KTYPE&, std::vector<VTYPE>&, Args&);
/// @endcode
/// @tparam ...Args Types of the function arguments.
///
/// @param[in,out] handle Reference to the handle.
/// @param key The key.
/// @param function The function to apply.
/// @param args The function arguments.
template <typename ApplyFunT, typename... Args>
void AsyncBlockingApply(rt::Handle &handle, const KTYPE &key,
ApplyFunT &&function, Args &... args);

/// @brief Asynchronously apply a user-defined function to a key-value pair.
/// @tparam ApplyFunT User-defined function type. The function prototype
/// should be:
Expand Down Expand Up @@ -797,6 +828,63 @@ void Multimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncApply(
}
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void Multimap<KTYPE, VTYPE, KEY_COMPARE>::BlockingApply(const KTYPE &key,
ApplyFunT &&function, Args &... args) {
size_t targetId = shad::hash<KTYPE>{}(key) % rt::numLocalities();
rt::Locality targetLocality(targetId);

if (targetLocality == rt::thisLocality()) {
localMultimap_.BlockingApply(key, function, args...);

} else {
using FunctionTy = void (*)(const KTYPE &, std::vector<VTYPE> &, Args &...);
FunctionTy fn = std::forward<decltype(function)>(function);

using ArgsTuple = std::tuple<ObjectID, const KTYPE, FunctionTy, std::tuple<Args...>>;
ArgsTuple arguments(oid_, key, fn, std::tuple<Args...>(args...));

auto feLambda = [](const ArgsTuple &args) {
constexpr auto Size = std::tuple_size<typename std::decay<decltype(std::get<3>(args))>::type>::value;
ArgsTuple &tuple = const_cast<ArgsTuple &>(args);
LMapT *mapPtr = &(HmapT::GetPtr(std::get<0>(tuple))->localMultimap_);
//map_ptr->BlockingApply(key, function, std::get<is>(args)...);
LMapT::CallBlockingApplyFun(mapPtr, std::get<1>(tuple), std::get<2>(tuple),
std::get<3>(tuple), std::make_index_sequence<Size>{});
};
rt::executeAt(targetLocality, feLambda, arguments);
}
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void Multimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncBlockingApply(
rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &... args) {
size_t targetId = shad::hash<KTYPE>{}(key) % rt::numLocalities();
rt::Locality targetLocality(targetId);

if (targetLocality == rt::thisLocality()) {
localMultimap_.AsyncBlockingApply(handle, key, function, args...);

} else {
using FunctionTy = void (*)(rt::Handle &, const KTYPE &, std::vector<VTYPE> &, Args &...);
FunctionTy fn = std::forward<decltype(function)>(function);
using ArgsTuple = std::tuple<ObjectID, const KTYPE, FunctionTy, std::tuple<Args...>>;

ArgsTuple arguments(oid_, key, fn, std::tuple<Args...>(args...));
auto feLambda = [](rt::Handle &handle, const ArgsTuple &args) {
constexpr auto Size = std::tuple_size<typename std::decay<decltype(std::get<3>(args))>::type>::value;
ArgsTuple &tuple(const_cast<ArgsTuple &>(args));
LMapT *mapPtr = &(HmapT::GetPtr(std::get<0>(tuple))->localMultimap_);
LMapT::CallAsyncBlockingApplyFun(handle, mapPtr, std::get<1>(tuple),
std::get<2>(tuple), std::get<3>(tuple),
std::make_index_sequence<Size>{});
};
rt::asyncExecuteAt(handle, targetLocality, feLambda, arguments);
}
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void Multimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncApplyWithRetBuff(
Expand Down

0 comments on commit 41e822e

Please sign in to comment.