Skip to content

Commit

Permalink
[#204] Implemented the TryBlockingApply function in Multimap
Browse files Browse the repository at this point in the history
  • Loading branch information
VitoCastellana committed May 17, 2023
1 parent 8001425 commit 4df7446
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
72 changes: 72 additions & 0 deletions include/shad/data_structures/local_multimap.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,26 @@ class LocalMultimap {
template <typename ApplyFunT, typename... Args>
void BlockingApply(const KTYPE &key, ApplyFunT &&function, Args &...args);

enum ApplyResult { FAILED, SUCCESS, NOT_FOUND};

/// @brief Tries to apply a user-defined function to every element of an entry's value
/// array. Thread safe wrt other operations.
/// @tparam ApplyFunT User-defined function type. The function prototype
/// should be:
/// @code
/// void(const KTYPE&, std::vector<VTYPE> &, Args&);
/// @endcode
/// @tparam ...Args Types of the function arguments.
///
/// @param[in] key The key.
/// @param function The function to apply.
/// @param args The function arguments.
/// @return SUCCESS if the function has been successfully applied,
/// NOT_FOUND if the key is not found, FAILED otherwise.
template <typename ApplyFunT, typename... Args>
LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::ApplyResult
TryBlockingApply(const KTYPE &key, ApplyFunT &&function, Args &...args);

/// @brief Asynchronously apply a user-defined function to a key-value pair.
/// Thread safe wrt other operations.
/// @tparam ApplyFunT User-defined function type. The function prototype
Expand Down Expand Up @@ -737,6 +757,15 @@ class LocalMultimap {
mapPtr->BlockingApply(key, function, std::get<is>(args)...);
}

template <typename ApplyFunT, typename... Args, std::size_t... is>
static LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::ApplyResult
CallTryBlockingApplyFun(LocalMultimap<KTYPE, VTYPE, KEY_COMPARE> *mapPtr,
const KTYPE &key, ApplyFunT function,
std::tuple<Args...> &args,
std::index_sequence<is...>) {
return mapPtr->TryBlockingApply(key, function, std::get<is>(args)...);
}

template <typename Tuple, typename... Args>
static void AsyncApplyFunWrapper(rt::Handle &handle, const Tuple &args) {
constexpr auto Size = std::tuple_size<
Expand Down Expand Up @@ -1304,6 +1333,49 @@ void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::BlockingApply(const KTYPE &key,
release_inserter(bucketIdx);
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
typename LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::ApplyResult
LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::TryBlockingApply(
const KTYPE &key,
ApplyFunT &&function,
Args &...args) {
size_t bucketIdx = shad::hash<KTYPE>{}(key) % numBuckets_;
Bucket *bucket = &(buckets_array_[bucketIdx]);
allow_inserter(bucketIdx);
// loop over linked buckets
while (bucket != nullptr) {
// loop over entries in this bucket
for (size_t i = 0; i < bucket->BucketSize(); ++i) {
Entry *entry = &bucket->getEntry(i);

// Stop at the first empty or pending insert entry.
if ((entry->state == EMPTY) or (entry->state == PENDING_INSERT)) {
break;
}

// if key matches this entry's key, apply function; else continue inner for
// loop
if (KeyComp_(&entry->key, &key) == 0) {
// tagging as pending insert
if (!__sync_bool_compare_and_swap(&entry->state, USED,
PENDING_UPDATE)) {
return ApplyResult::FAILED;
}
function(key, entry->value, args...);
entry->state = USED;
release_inserter(bucketIdx);
return ApplyResult::SUCCESS;
}
} // Inner for loop

// move to next bucket
bucket = bucket->next.get();
} // Outer for loop
release_inserter(bucketIdx);
return ApplyResult::NOT_FOUND;
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncBlockingApply(rt::Handle &h,
Expand Down
53 changes: 53 additions & 0 deletions include/shad/data_structures/multimap.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ class Multimap : public AbstractDataStructure< Multimap<KTYPE, VTYPE, KEY_COMPAR
/// @return the size of the multimap.
size_t Size() const;

/// @brief Target Locality of a specific key.
/// @param[in] key the key.
/// @return the Target Locality.
rt::Locality TargetLocality(const KTYPE &key) {
size_t targetId = shad::hash<KTYPE>{}(key) % rt::numLocalities();
rt::Locality tgtLocality(targetId);
return tgtLocality;
}

/// @brief Number of keys in the multimap.
/// @warning Calling this method may result in one-to-all
/// communication among localities to retrieve consistent information.
Expand Down Expand Up @@ -258,6 +267,21 @@ class Multimap : public AbstractDataStructure< Multimap<KTYPE, VTYPE, KEY_COMPAR
template <typename ApplyFunT, typename... Args>
void BlockingApply(const KTYPE &key, ApplyFunT &&function, Args &... args);

/// @brief Tries to apply a user-defined function to a key-value pair.
/// Thread safe wrt other operations.
/// @tparam ApplyFunT User-defined function type. The function prototype should be:
/// @code
/// void(const KTYPE&, std::vector<VTYPE> &, Args&);
/// @endcode
/// @tparam ...Args Types of the function arguments.
///
/// @param key The key.
/// @param function The function to apply.
/// @param args The function arguments.
/// @return SUCCESS if the function has been successfully applied,
/// NOT_FOUND if the key is not found, FAILED otherwise.
template <typename ApplyFunT, typename... Args>
typename LMapT::ApplyResult TryBlockingApply(const KTYPE &key, ApplyFunT &&function, Args &... args);

/// @brief Asynchronously apply a user-defined function to a key-value pair.
/// Thread safe wrt other operations.
Expand Down Expand Up @@ -846,6 +870,35 @@ void Multimap<KTYPE, VTYPE, KEY_COMPARE>::BlockingApply(const KTYPE &key,
}
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
typename Multimap<KTYPE, VTYPE, KEY_COMPARE>::LMapT::ApplyResult
Multimap<KTYPE, VTYPE, KEY_COMPARE>::TryBlockingApply(const KTYPE &key,
ApplyFunT &&function, Args &... args) {
size_t targetId = shad::hash<KTYPE>{}(key) % rt::numLocalities();
rt::Locality targetLocality(targetId);
if (targetLocality == rt::thisLocality()) {
return localMultimap_.TryBlockingApply(key, function, args...);

} else {
using FunctionTy = void (*)(const KTYPE &, std::vector<VTYPE> &, Args &...);
FunctionTy fn = std::forward<decltype(function)>(function);
using ArgsTuple = std::tuple<ObjectID, const KTYPE, FunctionTy, std::tuple<Args...>>;
ArgsTuple arguments(oid_, key, fn, std::tuple<Args...>(args...));

auto feLambda = [](const ArgsTuple &args, bool* res) {
constexpr auto Size = std::tuple_size<typename std::decay<decltype(std::get<3>(args))>::type>::value;
ArgsTuple &tuple = const_cast<ArgsTuple &>(args);
LMapT *mapPtr = &(HmapT::GetPtr(std::get<0>(tuple))->localMultimap_);
*res = LMapT::CallTryBlockingApplyFun(mapPtr, std::get<1>(tuple), std::get<2>(tuple),
std::get<3>(tuple), std::make_index_sequence<Size>{});
};
typename LMapT::ApplyResult res;
rt::executeAtWithRet(targetLocality, feLambda, arguments, &res);
return res;
}
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void Multimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncBlockingApply(
Expand Down

0 comments on commit 4df7446

Please sign in to comment.