From 907e535f2b15b3de48b508d49a834793ae9990f2 Mon Sep 17 00:00:00 2001 From: Vito G Castellana Date: Sun, 5 Dec 2021 19:34:28 -0800 Subject: [PATCH] [#204] First implementation of local and distributed multimap --- include/shad/data_structures/local_multimap.h | 1424 +++++++++++++++++ include/shad/data_structures/multimap.h | 914 +++++++++++ 2 files changed, 2338 insertions(+) create mode 100644 include/shad/data_structures/local_multimap.h create mode 100644 include/shad/data_structures/multimap.h diff --git a/include/shad/data_structures/local_multimap.h b/include/shad/data_structures/local_multimap.h new file mode 100644 index 00000000..f25ff45e --- /dev/null +++ b/include/shad/data_structures/local_multimap.h @@ -0,0 +1,1424 @@ +//===------------------------------------------------------------*- C++ -*-===// +// +// SHAD +// +// The Scalable High-performance Algorithms and Data Structure Library +// +//===----------------------------------------------------------------------===// +// +// Copyright 2021 Battelle Memorial Institute +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy +// of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. +// +//===----------------------------------------------------------------------===// + +#ifndef INCLUDE_SHAD_DATA_STRUCTURES_LOCAL_MULTIMAP_H_ +#define INCLUDE_SHAD_DATA_STRUCTURES_LOCAL_MULTIMAP_H_ + +#include +#include +#include +#include +#include +#include +#include + +#include "shad/data_structures/compare_and_hash_utils.h" +#include "shad/runtime/runtime.h" + +namespace shad { + +template +class lmultimap_iterator; + +template +class lmultimap_key_iterator; + +/// @brief The LocalMultimap data structure. +/// +/// SHAD's LocalMultimap is a "local", thread-safe, associative container. +/// LocalMultimaps can be used ONLY on the Locality on which they are created. +/// @tparam KTYPE type of the multimap keys. +/// @tparam VTYPE type of the multimap values. +/// @tparam KEY_COMPARE key comparison function; default is MemCmp. +template > +class LocalMultimap { + template + + friend class Multimap; + friend class lmultimap_iterator, + const std::pair>; + friend class lmultimap_key_iterator, const std::pair>>; + + template + friend class multimap_iterator; + + template + friend class multimap_key_iterator; + + public: + using inner_type = VTYPE; + using iterator = lmultimap_iterator, + const std::pair>; + using const_iterator = lmultimap_iterator, + const std::pair>; + using key_iterator = lmultimap_key_iterator, + const std::pair>>; + using const_key_iterator = lmultimap_key_iterator, + const std::pair>>; + + /// @brief Constructor. + /// @param numInitBuckets initial number of Buckets. + explicit LocalMultimap(const size_t numInitBuckets) + : numBuckets_(numInitBuckets), + buckets_array_(numInitBuckets), + deleter_array_(numInitBuckets), + inserter_array_(numInitBuckets), + size_(0) + { + for (uint64_t i = 0; i < numInitBuckets; i ++) { deleter_array_[i] = 0; inserter_array_[i] = 0; } + } + + /// @brief Size of the multimap (number of entries). + /// @return the size of the multimap. + size_t Size() const { return size_.load(); } + + /// @brief Insert a key-value pair in the multimap. + /// @param[in] key the key. + /// @param[in] value the value to copy into the multimap. + /// @return an iterator to the inserted value + std::pair Insert(const KTYPE &key, const VTYPE &value); + + template + std::pair Insert(const KTYPE &key, const ELTYPE &value); + + /// @brief Asynchronously Insert a key-value pair in the multimap. + /// @warning Asynchronous operations are guaranteed to have completed + /// only after calling the rt::waitForCompletion(rt::Handle &handle) method. + /// @param[in,out] handle Reference to the handle to be used to wait for completion. + /// @param[in] key the key. + /// @param[in] value the value to copy into the multimap. + /// @return a pointer to the inserted value + void AsyncInsert(rt::Handle &handle, const KTYPE &key, const VTYPE &value); + + template + void AsyncInsert(rt::Handle &handle, const KTYPE &key, const ELTYPE &value); + + /// @brief Remove a key and value from the multimap. + /// @param[in] key the key. + void Erase(const KTYPE &key); + + /// @brief Asynchronously remove a key and value from the multimap. + /// @warning Asynchronous operations are guaranteed to have completed. + /// only after calling the rt::waitForCompletion(rt::Handle &handle) method. + /// @param[in,out] handle Reference to the handle to be used to wait for completion. + /// @param[in] key the key. + void AsyncErase(rt::Handle &handle, const KTYPE &key); + + /// @brief Clear the content of the multimap. + void Clear() { + size_ = 0; + buckets_array_.clear(); + buckets_array_ = std::vector(numBuckets_); + for (uint64_t i = 0; i < numBuckets_; i ++) { deleter_array_[i] = 0; inserter_array_[i] = 0; } + } + + /// @brief Result for the Lookup and AsyncLookup methods. + struct LookupResult { + bool found; + size_t size; /// Size of the value vector. + std::vector value; /// A copy of the value vector. + }; + + /// @brief Result for the Remote Lookup method. + struct LookupRemoteResult { + bool found; + size_t size; /// Size of the value vector. + rt::Locality localLoc; /// Locality of the local site. + VTYPE * local_elems; /// Address of the value vector elements at the local site + VTYPE * remote_elems; /// Address of a copy of the value vector elements at the remote site + }; + + /// @brief Asynchronously get the values associated to a key. + /// @warning Asynchronous operations are guaranteed to have completed. only + /// after calling the rt::waitForCompletion(rt::Handle &handle) method. + /// + /// @param[in,out] handle Reference to the handle to be used to wait for completion. + /// @param[in] key The key. + /// @param[out] res The result of the lookup operation. + void AsyncLookup(rt::Handle & handle, const KTYPE & key, LookupResult * result); + + /// @brief Get the values associated to a key. + /// @param[in] key The key. + /// @param[out] res The result of the lookup operation. + bool Lookup(const KTYPE & key, LookupResult * result); + + /// @brief Make a local copy of the values associated to a key and + /// return the size and address of the local copy. + /// + /// @param[in] key The key. + /// @param[out] local_result The result of the lookup operation. + void LookupFromRemote(KTYPE & key, LookupRemoteResult * remote_result); + + /// @brief Apply a user-defined function to every element of an entry's value array. + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(const KTYPE&, VTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in] key The key. + /// @param function The function to apply. + /// @param args The function arguments. + template + void Apply(const KTYPE &key, ApplyFunT &&function, Args &... args) { + LookupResult result; + Lookup(key, & result); + for (auto x : result.value) function(key, x, args...); + } + + /// @brief Asynchronously apply a user-defined function to a key-value pair. + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(rt::Handle &handle, const KTYPE&, VTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in,out] handle Reference to the handle. + /// @param[in] key The key. + /// @param function The function to apply. + /// @param args The function arguments. + template + void AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &... args); + + /// @brief Apply a user-defined function to each key-value pair. + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(const KTYPE&, VTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param function The function to apply. + /// @param args The function arguments. + template + void ForEachEntry(ApplyFunT &&function, Args &... args); + + /// @brief Asynchronously apply a user-defined function to each key-value pair. + /// + /// @tparam ApplyFunT User-defined function type. The function prototype /// should be: + /// @code + /// void(shad::rt::Handle&, const KTYPE&, VTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @warning Asynchronous operations are guaranteed to have completed only + /// after calling the rt::waitForCompletion(rt::Handle &handle) method. + /// + /// @param[in,out] handle Reference to the handle to be used to wait for completion. + /// @param function The function to apply. + /// @param args The function arguments. + template + void AsyncForEachEntry(rt::Handle &handle, ApplyFunT &&function, Args &... args); + + /// @brief Apply a user-defined function to each key. + /// @tparam ApplyFunT User-defined function type. + /// The function prototype should be: + /// @code + /// void(const KTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// @param function The function to apply. + /// @param args The function arguments. + template + void ForEachKey(ApplyFunT &&function, Args &... args); + + /// @brief Asynchronously apply a user-defined function to each key. + /// @tparam ApplyFunT User-defined function type. + /// The function prototype should be: + /// @code + /// void(shad::rt::Handle&, const KTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// @warning Asynchronous operations are guaranteed to have completed. + /// only after calling the rt::waitForCompletion(rt::Handle &handle) method. + /// @param[in,out] handle Reference to the handle to be used to wait for completion. + /// @param function The function to apply. + /// @param args The function arguments. + template + void AsyncForEachKey(rt::Handle &handle, ApplyFunT &&function, Args &... args); + + /// @brief Print all the entries in the multimap. + /// @warning std::ostream & operator<< must be defined for both KTYPE and VTYPE + void PrintAllEntries(); + + /// @brief Print all the keys in the multimap. + /// @warning std::ostream & operator<< must be defined for both KTYPE and VTYPE + void PrintAllKeys(); + + iterator begin() { + Entry *firstEntry = &buckets_array_[0].getEntry(0); + + if (firstEntry->state == USED) { + iterator cbeg(this, 0, 0, &buckets_array_[0], firstEntry, firstEntry->value.begin()); + return cbeg; + } + + iterator cbeg(this, 0, 0, &buckets_array_[0], firstEntry, std::vector::iterator()); + return ++cbeg; + } + + iterator end() { return iterator::lmultimap_end(numBuckets_); } + + key_iterator key_begin() { + Entry *firstEntry = &buckets_array_[0].getEntry(0); + + if (firstEntry->state == USED) { + key_iterator cbeg(this, 0, 0, &buckets_array_[0], firstEntry); + return cbeg; + } + + key_iterator cbeg(this, 0, 0, &buckets_array_[0], firstEntry); + return ++cbeg; + } + + key_iterator key_end() { return key_iterator::lmultimap_key_end(numBuckets_); } + + const_iterator cbegin() { return begin(); } + const_iterator cend() { return const_iterator::lmultimap_end(numBuckets_); } + + void allow_inserter(size_t i) { + while (true) { + uint32_t prev_inserters = inserter_array_[i]; + + if ((prev_inserters != has_deleter) && (deleter_array_[i] == 0)) { + uint32_t new_inserters = prev_inserters + 1; + if (inserter_array_[i].compare_exchange_weak(prev_inserters, new_inserters)) { + return; + } else { + rt::impl::yield(); + } + } else { + rt::impl::yield(); + } + } + } + + void release_inserter(size_t i) { inserter_array_[i] --; } + + void allow_deleter(size_t i) { + deleter_array_[i] ++; + + while (true) { + uint32_t prev_inserters = inserter_array_[i]; + + if (prev_inserters == 0) { + if (inserter_array_[i].compare_exchange_weak(prev_inserters, has_deleter)) { + deleter_array_[i] --; + return; + } else { + rt::impl::yield(); + } + } else { + rt::impl::yield(); + } + } + } + + void release_deleter(size_t i) { inserter_array_[i] = 0; } + + private: + static const uint8_t kHashSeed = 0; + static const size_t kAllocPending = 0x1; + static const size_t kNumEntriesPerBucket = constants::kDefaultNumEntriesPerBucket; + static const uint32_t kKeyWords = sizeof(KTYPE) > sizeof(uint64_t) + ? sizeof(KTYPE) / sizeof(uint64_t) + : 1; + + typedef KEY_COMPARE KeyCompare; + enum State { EMPTY, USED, PENDING_INSERT }; + + struct Entry { + KTYPE key; + std::vector value; + volatile State state; + Entry() : state(EMPTY) {} + }; + + struct Bucket { + std::shared_ptr next; + bool isNextAllocated; + + explicit Bucket(size_t bsize = kNumEntriesPerBucket) + : next(nullptr), + isNextAllocated(false), + entries(nullptr), + bucketSize_(bsize) {} + + Entry &getEntry(size_t i) { + if (!entries) { + std::lock_guard _(_entriesLock); + + if (!entries) + entries = std::move(std::shared_ptr(new Entry[bucketSize_], std::default_delete())); + } + return entries.get()[i]; + } + + size_t BucketSize() const { return bucketSize_; } + + private: + size_t bucketSize_; + rt::Lock _entriesLock; + std::shared_ptr entries; + }; + + uint32_t has_deleter = 0xffffffff; + + KeyCompare KeyComp_; + size_t numBuckets_; + std::atomic size_; + std::vector buckets_array_; + std::vector> deleter_array_; + std::vector> inserter_array_; + + template + static void CallForEachEntryFun( + const size_t i, LocalMultimap *mapPtr, + ApplyFunT function, std::tuple &args, + std::index_sequence) { + + Bucket *bucket = &mapPtr->buckets_array_[i]; + + while (bucket != nullptr) { + Bucket *next_bucket = bucket->next.get(); + + for (uint64_t j = 0; j < bucket->BucketSize(); ++j) { + Entry *entry = &bucket->getEntry(j); + + if (entry->state == USED) { + function(entry->key, entry->value, std::get(args)...); + } else if (entry->state != EMPTY) { + printf( "Entry in PENDING state while iterating over entries\n"); + } + } + + bucket = next_bucket; + } + } + + template + static void ForEachEntryFunWrapper(const Tuple &args, size_t i) { + + constexpr auto Size = std::tuple_size(args))>::type>::value; + Tuple &tuple = const_cast(args); + + CallForEachEntryFun(i, std::get<0>(tuple), + std::get<1>(tuple), std::get<2>(tuple), std::make_index_sequence{}); + } + + template + static void AsyncCallForEachEntryFun( + rt::Handle &handle, const size_t i, + LocalMultimap *mapPtr, + ApplyFunT function, std::tuple &args, + std::index_sequence) { + + Bucket *bucket = &mapPtr->buckets_array_[i]; + + while (bucket != nullptr) { + Bucket *next_bucket = bucket->next.get(); + + for (uint64_t j = 0; j < bucket->BucketSize(); ++j) { + Entry *entry = &bucket->getEntry(j); + + if (entry->state == USED) { + function(handle, entry->key, entry->value, std::get(args)...); + } else if (entry->state != EMPTY) { + printf("Entry in PENDING state while iterating over entries\n"); + } + } + + bucket = next_bucket; + } + } + + template + static void AsyncForEachEntryFunWrapper(rt::Handle &handle, const Tuple &args, size_t i) { + constexpr auto Size = std::tuple_size< typename std::decay(args))>::type>::value; + Tuple &tuple = const_cast(args); + + AsyncCallForEachEntryFun(handle, i, std::get<0>(tuple), + std::get<1>(tuple), std::get<2>(tuple), std::make_index_sequence{}); + } + + template + static void CallForEachKeyFun( + const size_t i, LocalMultimap *mapPtr, + ApplyFunT function, std::tuple &args, + std::index_sequence) { + + size_t cnt = 0; + Bucket *buckets_array = mapPtr->buckets_array_.data(); + Bucket *bucket = &buckets_array[i]; + + while (bucket != nullptr) { + Bucket *next_bucket = bucket->next.get(); + for (uint64_t j = 0; j < bucket->BucketSize(); ++j) { + Entry *entry = &bucket->getEntry(j); + + if (entry->state == USED) { + function(entry->key, std::get(args)...); + } else if (entry->state != EMPTY) { + printf("Entry in PENDING state while iterating over entries\n"); + } + } + bucket = next_bucket; + } + } + + template + static void ForEachKeyFunWrapper(const Tuple &args, size_t i) { + constexpr auto Size = std::tuple_size< typename std::decay(args))>::type>::value; + Tuple &tuple = const_cast(args); + + CallForEachKeyFun(i, std::get<0>(tuple), + std::get<1>(tuple), std::get<2>(tuple), std::make_index_sequence{}); + } + + template + static void AsyncCallForEachKeyFun( + rt::Handle &handle, const size_t i, + LocalMultimap *mapPtr, + ApplyFunT function, std::tuple &args, std::index_sequence) { + + Bucket *buckets_array = mapPtr->buckets_array_.data(); + Bucket *bucket = &buckets_array[i]; + + while (bucket != nullptr) { + Bucket *next_bucket = bucket->next.get(); + + for (uint64_tj = 0; j < bucket->BucketSize(); ++j) { + Entry *entry = &bucket->getEntry(j); + if (entry->state == USED) { + function(handle, entry->key, std::get(args)...); + } else if (entry->state != EMPTY) { + printf("Entry in PENDING state while iterating over entries\n"); + } + } + bucket = next_bucket; + } + } + + template + static void AsyncForEachKeyFunWrapper(rt::Handle &handle, const Tuple &args, size_t i) { + constexpr auto Size = std::tuple_size< typename std::decay(args))>::type>::value; + Tuple &tuple = const_cast(args); + + AsyncCallForEachKeyFun(handle, i, std::get<0>(tuple), + std::get<1>(tuple), std::get<2>(tuple), std::make_index_sequence{}); + } + + template + static void AsyncCallApplyFun( + rt::Handle &handle, + LocalMultimap *mapPtr, + const KTYPE &key, ApplyFunT function, std::tuple &args, std::index_sequence) { + + size_t bucketIdx = shad::hash{}(key) % mapPtr->numBuckets_; + Bucket *bucket = &(mapPtr->buckets_array_[bucketIdx]); + + while (bucket != nullptr) { + for (size_t i = 0; i < bucket->BucketSize(); ++i) { + Entry *entry = &bucket->getEntry(i); + + // Stop at the first empty entry. + if (entry->state == EMPTY) break; + + // Yield on pending entries. + while (entry->state == PENDING_INSERT) rt::impl::yield(); + + // Entry is USED. + if (mapPtr->KeyComp_(&entry->key, &key) == 0) { + while (entry->state == PENDING_INSERT) rt::impl::yield(); + function(handle, key, entry->value, std::get(args)...); + return; + } + } + + bucket = bucket->next.get(); + } + return; + } + + template + static void CallApplyFun( + LocalMultimap *mapPtr, + const KTYPE &key, ApplyFunT function, std::tuple &args, std::index_sequence) { + + size_t bucketIdx = shad::hash{}(key) % mapPtr->numBuckets_; + Bucket *bucket = &(mapPtr->buckets_array_[bucketIdx]); + + while (bucket != nullptr) { + for (size_t i = 0; i < bucket->BucketSize(); ++i) { + Entry *entry = &bucket->getEntry(i); + + // Stop at the first empty entry. + if (entry->state == EMPTY) break; + + // Yield on pending entries. + while (entry->state == PENDING_INSERT) rt::impl::yield(); + + // Entry is USED. + if (mapPtr->KeyComp_(&entry->key, &key) == 0) { + while (entry->state == PENDING_INSERT) rt::impl::yield(); + function(key, entry->value, std::get(args)...); + return; + } + } + + bucket = bucket->next.get(); + } + return; + } + + template + static void AsyncApplyFunWrapper(rt::Handle &handle, const Tuple &args) { + constexpr auto Size = std::tuple_size< typename std::decay(args))>::type>::value; + Tuple &tuple = const_cast(args); + + AsyncCallApplyFun(handle, std::get<0>(tuple), + std::get<1>(tuple), std::get<2>(tuple), std::get<3>(tuple), std::make_index_sequence{}); + } +}; + +template +void LocalMultimap::AsyncLookup( + rt::Handle & handle, const KTYPE & key, LookupResult * result) { + + using LMapPtr = LocalMultimap *; + auto args = std::tuple(this, key, result); + + auto lookupLambda = [](rt::Handle &, const std::tuple &t) { + (std::get<0>(t))->Lookup(std::get<1>(t), std::get<2>(t)); + }; + + rt::asyncExecuteAt(handle, rt::thisLocality(), lookupLambda, args); +} + +template +bool LocalMultimap::Lookup(const KTYPE & key, LookupResult * result) { + + size_t bucketIdx = shad::hash{}(key) % numBuckets_; + Bucket * bucket = &(buckets_array_[bucketIdx]); + allow_inserter(bucketIdx); // concurrent inserts okay; concurrent delete not okay + + while (bucket != nullptr) { + + for (size_t i = 0; i < bucket->BucketSize(); ++i) { + Entry *entry = &bucket->getEntry(i); + + // Reached first unused entry, key not found, break + if (entry->state == EMPTY) break; + + // Wait for some other thread to finish with this entry + while (entry->state == PENDING_INSERT) rt::impl::yield(); + + // if key matches this entry's key, return entry's value + if (KeyComp_(&entry->key, &key) == 0) { + while (!__sync_bool_compare_and_swap(&entry->state, USED, PENDING_INSERT)) rt::impl::yield(); + + result->found = true; + result->size = entry->value.size(); + result->value = entry->value; + + entry->state = USED; + release_inserter(bucketIdx); + return true; + } } + + bucket = bucket->next.get(); + } + + result->found = false; + result->size = 0; + + release_inserter(bucketIdx); + return false; // key not found +} + +template +void LocalMultimap::LookupFromRemote(KTYPE & key, LookupRemoteResult * result) { + + size_t bucketIdx = shad::hash{}(key) % numBuckets_; + Bucket * bucket = &(buckets_array_[bucketIdx]); + allow_inserter(bucketIdx); // concurrent inserts okay; concurrent delete not okay + + while (bucket != nullptr) { + + for (size_t i = 0; i < bucket->BucketSize(); ++i) { + Entry *entry = &bucket->getEntry(i); + + // Reached first unused entry, key not found, return + if (entry->state == EMPTY) { + result->found = false; + result->size = 0; + release_inserter(bucketIdx); + } + + // Wait for some other thread to finish with this entry + while (entry->state == PENDING_INSERT) rt::impl::yield(); + + // if key matches this entry's key, return entry's value + if (KeyComp_(&entry->key, &key) == 0) { + while (!__sync_bool_compare_and_swap(&entry->state, USED, PENDING_INSERT)) rt::impl::yield(); + + + result->found = true; + result->size = entry->value.size(); + result->remote_elems = (VTYPE *) malloc(result->size * sizeof(VTYPE)); + memcpy(result->remote_elems, entry->value.data(), result->size * sizeof(VTYPE)); + + entry->state = USED; + release_inserter(bucketIdx); + return; + } } + + bucket = bucket->next.get(); + } + + // key not found + result->found = false; + result->size = 0; + + release_inserter(bucketIdx); +} + +template +void LocalMultimap::PrintAllEntries() { + + for (auto itr = begin(); itr != end(); ++ itr) { + auto key = (* itr).first; + std::cout << std::get<0>(key) << " " << std::get<1>(key); + std::cout << "\n"; + } +} + +template +void LocalMultimap::PrintAllKeys() { + + for (auto itr = key_begin(); itr != key_end(); ++ itr) { + auto key = (* itr).first; + auto value = (* itr).second; + std::cout << value.size() << " " << std::get<0>(key) << " " << std::get<1>(key); + std::cout << "\n"; + } +} + +template +void LocalMultimap::Erase(const KTYPE &key) { + size_t bucketIdx = shad::hash{}(key) % numBuckets_; + Bucket *bucket = &(buckets_array_[bucketIdx]); + std::vector emptyValue; + allow_deleter(bucketIdx); + + for (;;) { // loop over linked buckets + for (size_t i = 0; i < bucket->BucketSize(); ++i) { // loop over entries in this bucket + Entry * entry = &bucket->getEntry(i); + +// Reached first unused entry, key not found, return + if (entry->state == EMPTY) { release_deleter(bucketIdx); return; } + +// If key does not match this entry's key, continue inner for loop + if (KeyComp_(&entry->key, &key) != 0) continue; + +// Key found + size_ -= entry->value.size(); + + // find last USED entry in this bucket and swap with this entry + // entry == entry being deleted + // lastEntry == last entry in this bucket + // nextEntry == first unused entry in this bucket + + Entry * lastEntry = entry; + size_t j = i + 1; + + for (;;) { // loop over linked buckets + for ( ; j < bucket->BucketSize(); ++j) { // loop over entries in this bucket + Entry * nextEntry = & bucket->getEntry(j); + if (nextEntry->state == USED) {lastEntry = nextEntry; continue;} + + // entry is last entry, so set it to empty + if (entry == lastEntry) { + entry->state = EMPTY; + std::swap(entry->value, emptyValue); + release_deleter(bucketIdx); + return; + + // set entry {key, value} to lastEntry {key, value} and set lastEntry to empty + } else { + entry->key = std::move(lastEntry->key); + entry->value = std::move(lastEntry->value); + + lastEntry->state = EMPTY; + std::swap(lastEntry->value, emptyValue); + release_deleter(bucketIdx); + return; + } + } // Second inner for loop + + if (bucket->next == nullptr) { // Exhausted linked buckets, so lastEntry is last USED entry + entry->key = std::move(lastEntry->key); + entry->value = std::move(lastEntry->value); + + lastEntry->state = EMPTY; + std::swap(lastEntry->value, emptyValue); + release_deleter(bucketIdx); + return; + + } else { + bucket = bucket->next.get(); + } + } // Second outer for loop + } // First inner for loop + + // Exhausted linked buckets without finding key, return + if (bucket->next == nullptr) { + release_deleter(bucketIdx); + return; + } else { + bucket = bucket->next.get(); + } + } // First outer for loop +} + +template +void LocalMultimap::AsyncErase(rt::Handle &handle, const KTYPE &key) { + using LMapPtr = LocalMultimap *; + auto args = std::tuple(this, key); + + auto eraseLambda = [](rt::Handle &, const std::tuple &t) { + (std::get<0>(t))->Erase(std::get<1>(t)); + }; + + rt::asyncExecuteAt(handle, rt::thisLocality(), eraseLambda, args); +} + +template +std::pair::iterator, bool> +LocalMultimap::Insert(const KTYPE &key, const VTYPE &value) { + size_t bucketIdx = shad::hash{}(key) % numBuckets_; + Bucket * bucket = &(buckets_array_[bucketIdx]); + allow_inserter(bucketIdx); + + for (;;) { // loop over linked buckets + for (size_t i = 0; i < bucket->BucketSize(); ++i) { // loop over entries in this bucket + Entry *entry = & bucket->getEntry(i); + + // Reached end of used entries without finding key, so new key + if (__sync_bool_compare_and_swap(&entry->state, EMPTY, PENDING_INSERT)) { + entry->key = std::move(key); + entry->value.push_back(value); + + size_ += 1; + entry->state = USED; + release_inserter(bucketIdx); + return std::make_pair(iterator(this, bucketIdx, i, bucket, entry, entry->value.end() - 1), true); + } + + // Wait for some other thread to finish with this entry + while (entry->state == PENDING_INSERT) rt::impl::yield(); + + // if key matches this entry's key, insert value; else continue inner for loop + if (KeyComp_(&entry->key, &key) == 0) { + while (!__sync_bool_compare_and_swap(&entry->state, USED, PENDING_INSERT)) rt::impl::yield(); + entry->value.push_back(value); + + size_ += 1; + entry->state = USED; + release_inserter(bucketIdx); + return std::make_pair(iterator(this, bucketIdx, i, bucket, entry, entry->value.end() - 1), true); + } + } // Inner for loop + + // Exhausted entries in this buckets + // ... if no more buckets, link new bucket + if (bucket->next == nullptr) { + if (__sync_bool_compare_and_swap(&bucket->isNextAllocated, false, true)) { // Allocate new bucket + std::shared_ptr newBucket(new Bucket(constants::kDefaultNumEntriesPerBucket)); + bucket->next.swap(newBucket); + } else { // Wait for pending allocation to finish + while (bucket->next == nullptr) rt::impl::yield(); + } + } + + // move to next bucket + bucket = bucket->next.get(); + } // Outer for loop +} + +template +void LocalMultimap::AsyncInsert( + rt::Handle &handle, const KTYPE &key, const VTYPE &value) { + + using LMapPtr = LocalMultimap *; + auto args = std::tuple(this, key, value); + + auto insertLambda = [](rt::Handle &, const std::tuple &t) { + (std::get<0>(t))->Insert(std::get<1>(t), std::get<2>(t)); + }; + + rt::asyncExecuteAt(handle, rt::thisLocality(), insertLambda, args); +} + +template +template +void LocalMultimap::ForEachEntry(ApplyFunT &&function, Args &... args) { + + using FunctionTy = void (*)(const KTYPE &, VTYPE &, Args &...); + FunctionTy fn = std::forward(function); + using LMapPtr = LocalMultimap *; + using ArgsTuple = std::tuple>; + + ArgsTuple argsTuple(this, fn, std::tuple(args...)); + rt::forEachAt(rt::thisLocality(), ForEachEntryFunWrapper, argsTuple, numBuckets_); +} + +template +template +void LocalMultimap::AsyncForEachEntry( + rt::Handle &handle, ApplyFunT &&function, Args &... args) { + + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, VTYPE &, Args &...); + FunctionTy fn = std::forward(function); + using LMapPtr = LocalMultimap *; + using ArgsTuple = std::tuple>; + ArgsTuple argsTuple(this, fn, std::tuple(args...)); + + rt::asyncForEachAt(handle, rt::thisLocality(), + AsyncForEachEntryFunWrapper, argsTuple, numBuckets_); +} + +template +template +void LocalMultimap::ForEachKey(ApplyFunT &&function, Args &... args) { + + using FunctionTy = void (*)(const KTYPE &, Args &...); + FunctionTy fn = std::forward(function); + using LMapPtr = LocalMultimap *; + using ArgsTuple = std::tuple>; + ArgsTuple argsTuple(this, fn, std::tuple(args...)); + + rt::forEachAt(rt::thisLocality(), ForEachKeyFunWrapper, argsTuple, numBuckets_); +} + +template +template +void LocalMultimap::AsyncForEachKey( + rt::Handle &handle, ApplyFunT &&function, Args &... args) { + + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, Args &...); + FunctionTy fn = std::forward(function); + using LMapPtr = LocalMultimap *; + using ArgsTuple = std::tuple>; + ArgsTuple argsTuple(this, fn, std::tuple(args...)); + + rt::asyncForEachAt(handle, rt::thisLocality(), + AsyncForEachKeyFunWrapper, argsTuple, numBuckets_); +} + +template +template +void LocalMultimap::AsyncApply( + rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &... args) { + + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, VTYPE &, Args &...); + FunctionTy fn = std::forward(function); + using LMapPtr = LocalMultimap *; + using ArgsTuple = std::tuple>; + + ArgsTuple argsTuple(this, key, fn, std::tuple(args...)); + rt::asyncExecuteAt(handle, rt::thisLocality(), AsyncApplyFunWrapper, argsTuple); +} + +template +template +std::pair::iterator, bool> +LocalMultimap::Insert(const KTYPE &key, const ELTYPE &value) { + size_t bucketIdx = shad::hash{}(key) % numBuckets_; + Bucket *bucket = &(buckets_array_[bucketIdx]); + allow_inserter(bucketIdx); + + for (;;) { // loop over linked buckets + for (size_t i = 0; i < bucket->BucketSize(); ++i) { // loop over entries in this bucket + Entry *entry = &bucket->getEntry(i); + + // Reached end of used entries without finding key, so new key + if (__sync_bool_compare_and_swap(&entry->state, EMPTY, PENDING_INSERT)) { + entry->key = std::move(key); + entry->value.push_back(value); + + size_ += 1; + entry->state = USED; + release_inserter(bucketIdx); + return std::make_pair(iterator(this, bucketIdx, i, bucket, entry), true); + } + + // Wait for some other thread to finish with this entry + while (entry->state == PENDING_INSERT) rt::impl::yield(); + + //if key matches entry's key, insert value; else continue inner for loop + if (KeyComp_(&entry->key, &key) == 0) { + while (!__sync_bool_compare_and_swap(&entry->state, USED, PENDING_INSERT)) rt::impl::yield(); + entry->value.push_back(value); + + size_ += 1; + entry->state = USED; + release_inserter(bucketIdx); + return std::make_pair(iterator(this, bucketIdx, i, bucket, entry), true); + } + } // Inner for loop + + // Exhausted linked buckets, so link new bucket + // Exhausted entries in this buckets + // ... if no more buckets, link new bucket + if (bucket->next == nullptr) { + if (__sync_bool_compare_and_swap(&bucket->isNextAllocated, false, true)) { // Allocate new bucket + std::shared_ptr newBucket( new Bucket(constants::kDefaultNumEntriesPerBucket)); + bucket->next.swap(newBucket); + } else { // Wait for pending allocation to finish + while (bucket->next == nullptr) rt::impl::yield(); + } + } + + // move to next bucket + bucket = bucket->next.get(); + } // Outer for loop +} + +template +template +void LocalMultimap::AsyncInsert( + rt::Handle &handle, const KTYPE &key, const ELTYPE &value) { + + using LMapPtr = LocalMultimap *; + auto args = std::tuple(this, key, value); + + auto insertLambda = [](rt::Handle &, const std::tuple &t) { + (std::get<0>(t))->Insert(std::get<1>(t), std::get<2>(t)); + }; + + rt::asyncExecuteAt(handle, rt::thisLocality(), insertLambda, args); +} + +template +class lmultimap_iterator : public std::iterator { + template + friend class multimap_iterator; + + public: + using inner_type = typename LMap::inner_type; + using Entry = typename LMap::Entry; + using State = typename LMap::State; + using Bucket = typename LMap::Bucket; + + lmultimap_iterator() {} + lmultimap_iterator(const LMap *mapPtr, size_t bId, + size_t pos, Bucket *cb, Entry *ePtr, typename std::vector::iterator valueItr) + : mapPtr_(mapPtr), + bucketId_(bId), + position_(pos), + currBucket_(cb), + entryPtr_(ePtr), + valueItr_(valueItr) {} + + static lmultimap_iterator lmultimap_begin(const LMap *mapPtr) { + Bucket *rootPtr = &(const_cast(mapPtr)->buckets_array_[0]); + Entry *firstEntry = &(rootPtr->getEntry(0)); + lmultimap_iterator beg(mapPtr, 0, 0, rootPtr, firstEntry, firstEntry->value.begin()); + if (firstEntry->state == LMap::USED) return beg; else return ++beg; + } + + static lmultimap_iterator lmultimap_end(const LMap *mapPtr) { + return lmultimap_end(mapPtr->numBuckets_); + } + + static lmultimap_iterator lmultimap_end(size_t numBuckets) { + return lmultimap_iterator(nullptr, numBuckets, 0, nullptr, nullptr, typename std::vector::iterator()); + } + + bool operator==(const lmultimap_iterator &other) const { + return valueItr_ == other.valueItr_; + } + + bool operator!=(const lmultimap_iterator &other) const { + return !(*this == other); + } + + T operator*() const { return T(entryPtr_->key, * valueItr_); } + +// Returns the key and value of the next value vector entry; +// Entry values are stored in a four-level data structure --- +// for each bucket list ... for each bucket in list ... +// for each entry in bucket ... for each value in entry's value array + lmultimap_iterator &operator++() { + auto null_iter = std::vector::iterator(); + + // if iterator points to an VTYPE, move to next VTYPE; else move to next bucket list + if (valueItr_ != null_iter) { + ++ valueItr_; + + // if there is another VTYPE in entry's value array, return it + if (valueItr_ != entryPtr_->value.end()) return * this; + + // ... else move to next entry in bucket + ++ position_; + + // if there is another entry in this bucket ... + //FIXME + if (position_ < constants::kDefaultNumEntriesPerBucket) { + entryPtr_ ++; + + // if this entry is used, return begin of its value array + // ... else no more entries in this bucket or buckets in this bucket list + if (entryPtr_->state == LMap::USED) {valueItr_ = entryPtr_->value.begin(); return * this;} + + // ... else move to next bucket in this bucket list + } else { + currBucket_ = currBucket_->next.get(); + position_ = 0; + + // if bucket is not empty and first entry is used, return begin of its value array + // ... else no more entries in this bucket or buckets in this bucket list + if (currBucket_ != nullptr) { + entryPtr_ = &currBucket_->getEntry(position_); + if (entryPtr_->state == LMap::USED) {valueItr_ = entryPtr_->value.begin(); return * this;} + } + } + } + + // move to next bucket list + ++ bucketId_; + position_ = 0; + + // search for a bucket list with a used entry + for ( ; bucketId_ < mapPtr_->numBuckets_; ++bucketId_) { + currBucket_ = & const_cast(mapPtr_)->buckets_array_[bucketId_]; + entryPtr_ = & currBucket_->getEntry(position_); + if (entryPtr_->state == LMap::USED) {valueItr_ = entryPtr_->value.begin(); return *this;} + } + + // next is not found, return end iterator + mapPtr_ = nullptr; + entryPtr_ = nullptr; + currBucket_ = nullptr; + valueItr_ = std::vector::iterator(); + return * this; + } + + lmultimap_iterator operator++(int) { + lmultimap_iterator tmp = *this; + operator++(); + return tmp; + } + + class partition_range { + public: + partition_range(const lmultimap_iterator &begin, const lmultimap_iterator &end) + : begin_(begin), end_(end) {} + lmultimap_iterator begin() { return begin_; } + lmultimap_iterator end() { return end_; } + + private: + lmultimap_iterator begin_; + lmultimap_iterator end_; + }; + + // split a range into at most n_parts non-empty sub-ranges + static std::vector partitions(lmultimap_iterator begin, lmultimap_iterator end, size_t n_parts) { + std::vector res; + auto n_buckets = n_spanned_buckets(begin, end); + + if (n_buckets && n_parts) { + auto part_step = (n_buckets >= n_parts) ? (n_buckets + n_parts - 1) / n_parts : 1; + auto map_ptr = begin.mapPtr_; + auto &buckets = map_ptr->buckets_array_; + auto b_end = (end != lmultimap_end(map_ptr)) ? end.bucketId_ : map_ptr->numBuckets_; + auto bi = begin.bucketId_; + auto pbegin = begin; + + while (true) { + bi = first_used_bucket(map_ptr, bi + part_step); + if (bi < b_end) { + auto pend = first_in_bucket(map_ptr, bi); + res.push_back(partition_range{pbegin, pend}); + pbegin = pend; + } else { + if (pbegin != end) res.push_back(partition_range{pbegin, end}); + break; + } + } + } + + return res; + } + + private: + const LMap *mapPtr_; + size_t bucketId_; + size_t position_; + Bucket *currBucket_; + Entry *entryPtr_; + typename std::vector::iterator valueItr_; + + // returns a pointer to the first entry of a bucket + static typename LMap::Entry &first_bucket_entry(const LMap *mapPtr_, size_t bi) { + return const_cast(mapPtr_)->buckets_array_[bi].getEntry(0); + } + + // returns an iterator pointing to the beginning of the first active bucket + // from the input bucket (included) + static lmultimap_iterator first_in_bucket(const LMap *mapPtr_, size_t bi) { + auto &entry = first_bucket_entry(mapPtr_, bi); + + return lmultimap_iterator(mapPtr_, bi, 0, + &const_cast(mapPtr_)->buckets_array_[bi], &entry, entry.value.begin()); + } + + // returns the index of the first active bucket, starting from the input + // bucket (included). If not such bucket, it returns the number of buckets. + static size_t first_used_bucket(const LMap *mapPtr_, size_t bi) { + + // scan for the first used entry with the same logic as operator++ + for (; bi < mapPtr_->numBuckets_; ++bi) + if (first_bucket_entry(mapPtr_, bi).state == LMap::USED) return bi; + + return mapPtr_->numBuckets_; + } + + // returns the number of buckets spanned by the input range + static size_t n_spanned_buckets(const lmultimap_iterator &begin, const lmultimap_iterator &end) { + + if (begin != end) { + auto map_ptr = begin.mapPtr_; + + // invariant check - end is either: + // - the end of the set; or + // - an iterator pointing to an used entry + + if (end != lmultimap_end(map_ptr)) { + // count one more if end is not on a bucket edge + return end.bucketId_ - begin.bucketId_ + + (end.entryPtr_ != &first_bucket_entry(end.mapPtr_, end.bucketId_)); + } + + return map_ptr->numBuckets_ - begin.bucketId_; + } + return 0; + } +}; + +template +class lmultimap_key_iterator : public std::iterator { + template + friend class multimap_key_iterator; + + public: + using inner_type = typename LMap::inner_type; + using Entry = typename LMap::Entry; + using State = typename LMap::State; + using Bucket = typename LMap::Bucket; + + lmultimap_key_iterator() {} + lmultimap_key_iterator(const LMap *mapPtr, size_t bId, size_t pos, Bucket *cb, Entry *ePtr) + : mapPtr_(mapPtr), + bucketId_(bId), + position_(pos), + currBucket_(cb), + entryPtr_(ePtr) {} + + static lmultimap_key_iterator lmultimap_key_begin(const LMap *mapPtr) { + Bucket *rootPtr = &(const_cast(mapPtr)->buckets_array_[0]); + Entry *firstEntry = &(rootPtr->getEntry(0)); + lmultimap_key_iterator beg(mapPtr, 0, 0, rootPtr, firstEntry); + if (firstEntry->state == LMap::USED) return beg; else return ++beg; + } + + static lmultimap_key_iterator lmultimap_key_end(const LMap *mapPtr) { + return lmultimap_key_end(mapPtr->numBuckets_); + } + + static lmultimap_key_iterator lmultimap_key_end(size_t numBuckets) { + return lmultimap_key_iterator(nullptr, numBuckets, 0, nullptr, nullptr); + } + + bool operator==(const lmultimap_key_iterator &other) const { + return entryPtr_ == other.entryPtr_; + } + + bool operator!=(const lmultimap_key_iterator &other) const { + return !(*this == other); + } + + T operator*() const { return T(entryPtr_->key, entryPtr_->value); } + +// Returns next entry (a pointer to {KEY, VALUE}; +// Entries are stored in three-level data structure --- +// for each bucket list ... for each bucket in list ... for each entry in bucket + lmultimap_key_iterator &operator++() { + // move to next entry in bucket + ++ position_; + + // if there is another entry in this bucket ... + if (position_ < constants::kDefaultNumEntriesPerBucket) { + entryPtr_ ++; + + // if this entry is used, return it + // ... else no more entries in this bucket or buckets in this bucket list + if (entryPtr_->state == LMap::USED) return * this; + + // ... else move to next bucket in this bucket list + } else { + currBucket_ = currBucket_->next.get(); + position_ = 0; + + // if bucket is not empty and first entry is used, return it + // ... else no more entries in this bucket or buckets in this bucket list + if (currBucket_ != nullptr) { + entryPtr_ = &currBucket_->getEntry(position_); + if (entryPtr_->state == LMap::USED) return * this; + } + } + + // move to next bucket list + ++ bucketId_; + position_ = 0; + + // search for a bucket list with a used entity + for ( ; bucketId_ < mapPtr_->numBuckets_; ++bucketId_) { + currBucket_ = & const_cast(mapPtr_)->buckets_array_[bucketId_]; + entryPtr_ = & currBucket_->getEntry(position_); + if (entryPtr_->state == LMap::USED) return * this; + } + + // next is not found, return end iterator + mapPtr_ = nullptr; + entryPtr_ = nullptr; + currBucket_ = nullptr; + return * this; + } + + lmultimap_key_iterator operator++(int) { + lmultimap_key_iterator tmp = *this; + operator++(); + return tmp; + } + + class partition_range { + public: + partition_range(const lmultimap_key_iterator &begin, const lmultimap_key_iterator &end) + : begin_(begin), end_(end) {} + lmultimap_key_iterator begin() { return begin_; } + lmultimap_key_iterator end() { return end_; } + + private: + lmultimap_key_iterator begin_; + lmultimap_key_iterator end_; + }; + + // split a range into at most n_parts non-empty sub-ranges + static std::vector partitions(lmultimap_key_iterator begin, + lmultimap_key_iterator end, size_t n_parts) { + std::vector res; + auto n_buckets = n_spanned_buckets(begin, end); + + if (n_buckets && n_parts) { + auto part_step = (n_buckets >= n_parts) ? (n_buckets + n_parts - 1) / n_parts : 1; + auto map_ptr = begin.mapPtr_; + auto &buckets = map_ptr->buckets_array_; + auto b_end = (end != lmultimap_key_end(map_ptr)) ? end.bucketId_ : map_ptr->numBuckets_; + auto bi = begin.bucketId_; + auto pbegin = begin; + + while (true) { + bi = first_used_bucket(map_ptr, bi + part_step); + if (bi < b_end) { + auto pend = first_in_bucket(map_ptr, bi); + res.push_back(partition_range{pbegin, pend}); + pbegin = pend; + } else { + if (pbegin != end) res.push_back(partition_range{pbegin, end}); + break; + } + } + } + + return res; + } + + private: + const LMap *mapPtr_; + size_t bucketId_; + size_t position_; + Bucket *currBucket_; + Entry *entryPtr_; + + // returns a pointer to the first entry of a bucket + static typename LMap::Entry &first_bucket_entry(const LMap *mapPtr_, size_t bi) { + return const_cast(mapPtr_)->buckets_array_[bi].getEntry(0); + } + + // returns an iterator pointing to the beginning of the first active bucket + // from the input bucket (included) + static lmultimap_key_iterator first_in_bucket(const LMap *mapPtr_, size_t bi) { + auto &entry = first_bucket_entry(mapPtr_, bi); + + return lmultimap_key_iterator(mapPtr_, bi, 0, &const_cast(mapPtr_)->buckets_array_[bi], &entry); + } + + // returns the index of the first active bucket, starting from the input + // bucket (included). If not such bucket, it returns the number of buckets. + static size_t first_used_bucket(const LMap *mapPtr_, size_t bi) { + + // scan for the first used entry with the same logic as operator++ + for (; bi < mapPtr_->numBuckets_; ++bi) + if (first_bucket_entry(mapPtr_, bi).state == LMap::USED) return bi; + + return mapPtr_->numBuckets_; + } + + // returns the number of buckets spanned by the input range + static size_t n_spanned_buckets(const lmultimap_key_iterator &begin, const lmultimap_key_iterator &end) { + + if (begin != end) { + auto map_ptr = begin.mapPtr_; + + // invariant check - end is either: + // - the end of the set; or + // - an iterator pointing to an used entry + + if (end != lmultimap_key_end(map_ptr)) { + // count one more if end is not on a bucket edge + return end.bucketId_ - begin.bucketId_ + + (end.entryPtr_ != &first_bucket_entry(end.mapPtr_, end.bucketId_)); + } + + return map_ptr->numBuckets_ - begin.bucketId_; + } + return 0; + } +}; + +} // namespace shad + +#endif // INCLUDE_SHAD_DATA_STRUCTURES_LOCAL_MULTIMAP_H_ diff --git a/include/shad/data_structures/multimap.h b/include/shad/data_structures/multimap.h new file mode 100644 index 00000000..2eea4e9e --- /dev/null +++ b/include/shad/data_structures/multimap.h @@ -0,0 +1,914 @@ +//===------------------------------------------------------------*- C++ -*-===// +// +// SHAD +// +// The Scalable High-performance Algorithms and Data Structure Library +// +//===----------------------------------------------------------------------===// +// +// Copyright 2021 Battelle Memorial Institute +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy +// of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. +// +//===----------------------------------------------------------------------===// + +#ifndef INCLUDE_SHAD_DATA_STRUCTURES_MULTIMAP_H_ +#define INCLUDE_SHAD_DATA_STRUCTURES_MULTIMAP_H_ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "shad/data_structures/abstract_data_structure.h" +#include "shad/data_structures/buffer.h" +#include "shad/data_structures/compare_and_hash_utils.h" +#include "shad/data_structures/local_multimap.h" +#include "shad/distributed_iterator_traits.h" +#include "shad/runtime/runtime.h" +#include "shad/runtime/mappings/gmt/gmt_synchronous_interface.h" + +#define PREFIX_SIZE 80 + +namespace shad { + +template +class multimap_iterator; + +/// @brief The Multimap data structure. +/// +/// SHAD's Multimap is a distributed, thread-safe, associative container. +/// @tparam KTYPE type of the multimap keys. +/// @tparam VTYPE type of the multimap values. +/// @tparam KEY_COMPARE key comparison function; default is MemCmp. +/// @warning obects of type KTYPE and VTYPE need to be trivially copiable. +template < typename KTYPE, typename VTYPE, typename KEY_COMPARE = MemCmp > +class Multimap : public AbstractDataStructure< Multimap > { + + template + friend class AbstractDataStructure; + friend class multimap_iterator, + const std::pair, + std::pair>; + friend class multimap_iterator, + const std::pair, + std::pair>; + + public: + using value_type = std::pair; + using HmapT = Multimap; + using LMapT = LocalMultimap; + using ObjectID = typename AbstractDataStructure::ObjectID; + using ShadMultimapPtr = typename AbstractDataStructure::SharedPtr; + + using iterator = + multimap_iterator, + const std::pair, std::pair>; + using const_iterator = + multimap_iterator, + const std::pair, std::pair>; + using local_iterator = + lmultimap_iterator, + const std::pair>; + using const_local_iterator = + lmultimap_iterator, + const std::pair>; + + struct EntryT { + EntryT(const KTYPE &k, const VTYPE &v) : key(k), value(v) {} + EntryT() = default; + KTYPE key; + VTYPE value; + }; + + using BuffersVector = typename impl::BuffersVector; + + /// @brief Create method. + /// + /// Creates a new multimap instance. + /// @param numEntries Expected number of entries. + /// @return A shared pointer to the newly created multimap instance. +#ifdef DOXYGEN_IS_RUNNING + static ShadMultimapPtr Create(const size_t numEntries); +#endif + + /// @brief Getter of the Global Identifier. + /// + /// @return The global identifier associated with the multimap instance. + ObjectID GetGlobalID() const { return oid_; } + + /// @brief Overall size of the multimap (number of entries). + /// @warning Calling the size method may result in one-to-all + /// communication among localities to retrieve consistent information. + /// @return the size of the multimap. + size_t Size() const; + + /// @brief Insert a key-value pair in the multimap. + /// @param[in] key the key. + /// @param[in] value the value to copy into the multimap. + /// @return an iterator either to the inserted value + std::pair Insert(const KTYPE &key, const VTYPE &value); + + /// @brief Asynchronously Insert a key-value pair in the multimap. + /// @warning Asynchronous operations are guaranteed to have completed + /// only after calling the rt::waitForCompletion(rt::Handle &handle) method. + /// @param[in,out] handle Reference to the handle to be used to wait for completion. + /// @param[in] key the key. + /// @param[in] value the value to copy into the multimap. + /// @return a pointer to the value if the the key-value was inserted + void AsyncInsert(rt::Handle &handle, const KTYPE &key, const VTYPE &value); + + /// @brief Buffered Insert method. + /// Inserts a key-value pair, using aggregation buffers. + /// @warning Insertions are finalized only after calling + /// the WaitForBufferedInsert() method. + /// @param[in] key The key. + /// @param[in] value The value. + void BufferedInsert(const KTYPE &key, const VTYPE &value); + + /// @brief Asynchronous Buffered Insert method. + /// Asynchronously inserts a key-value pair, using aggregation buffers. + /// @warning asynchronous buffered insertions are finalized only after + /// calling the rt::waitForCompletion(rt::Handle &handle) method AND + /// the WaitForBufferedInsert() method, in this order. + /// @param[in,out] handle Reference to the handle + /// @param[in] key The key. + /// @param[in] value The value. + void BufferedAsyncInsert(rt::Handle &handle, const KTYPE &key, const VTYPE &value); + + /// @brief Finalize method for buffered insertions. + void WaitForBufferedInsert() { + auto flushLambda_ = [](const ObjectID &oid) { + auto ptr = HmapT::GetPtr(oid); + ptr->buffers_.FlushAll(); + }; + rt::executeOnAll(flushLambda_, oid_); + } + /// @brief Remove a key and all associated values from the multimap. + /// @param[in] key the key. + void Erase(const KTYPE &key); + + /// @brief Asynchronously remove a key and associated values from the multimap. + /// @warning Asynchronous operations are guaranteed to have completed + /// only after calling the rt::waitForCompletion(rt::Handle &handle) method. + /// @param[in,out] handle Reference to the handle to be used to wait for completion. + /// @param[in] key the key. + void AsyncErase(rt::Handle &handle, const KTYPE &key); + + /// @brief Clear the content of the multimap. + void Clear() { + auto clearLambda = [](const ObjectID &oid) { + auto mapPtr = HmapT::GetPtr(oid); + mapPtr->localMultimap_.Clear(); + }; + rt::executeOnAll(clearLambda, oid_); + } + + using LookupResult = typename LocalMultimap::LookupResult; + using LookupRemoteResult = typename LocalMultimap::LookupRemoteResult; + + /// @brief Get all the values associated to a key. + /// @param[in] key the key. + /// @param[out] res a pointer to the values if the the key-value was found, otherwise NULL + /// @return true if the entry is found, otherwise false + bool Lookup(const KTYPE &key, LookupResult *res); + + /// @brief Asynchronous lookup method. + /// @warning Asynchronous operations are guaranteed to have completed. + /// only after calling the rt::waitForCompletion(rt::Handle &handle) method. + /// @param[in,out] handle Reference to the handle to be used to wait for completion. + /// @param[in] key the key. + /// @param[out] res the result of the lookup operation. + void AsyncLookup(rt::Handle &handle, const KTYPE &key, LookupResult *res); + + /// @brief Read records from multiple files + /// @param[in] prefix the file prefix + /// @param[in] lb the lower bound postscript + /// @param[in] ub the upper bound postscript + void readFromFiles(rt::Handle & handle, std::string prefix, uint64_t lb, uint64_t ub); + + /// @brief Apply a user-defined function to a key-value pair. + /// + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(const KTYPE&, VTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param key The key. + /// @param function The function to apply. + /// @param args The function arguments. + template + void Apply(const KTYPE &key, ApplyFunT &&function, Args &... args); + + /// @brief Asynchronously apply a user-defined function to a key-value pair. + /// + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(rt::Handle &handle, const KTYPE&, VTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in,out] handle Reference to the handle. + /// @param key The key. + /// @param function The function to apply. + /// @param args The function arguments. + template + void AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, + Args &... args); + + /// @brief Apply a user-defined function to each key-value pair. + /// + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(const KTYPE&, VTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param function The function to apply. + /// @param args The function arguments. + template + void ForEachEntry(ApplyFunT &&function, Args &... args); + + /// @brief Asynchronously apply a user-defined function to each key-value pair. + /// + /// @tparam ApplyFunT User-defined function type. he function prototype should be: + /// @code + /// void(shad::rt::Handle&, const KTYPE&, VTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in,out] handle Reference to the handle. + /// @param function The function to apply. + /// @param args The function arguments. + template + void AsyncForEachEntry(rt::Handle &handle, ApplyFunT &&function, Args &... args); + + /// @brief Apply a user-defined function to each key. + /// + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(const KTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param function The function to apply. + /// @param args The function arguments. + template + void ForEachKey(ApplyFunT &&function, Args &... args); + + /// @brief Asynchronously apply a user-defined function to each key. + /// + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(shad::rt::Handle&, const KTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in,out] handle Reference to the handle. + /// @param function The function to apply. + /// @param args The function arguments. + template + void AsyncForEachKey(rt::Handle &handle, ApplyFunT &&function, Args &... args); + + void PrintAllEntries() { + auto printLambda = [](const ObjectID & oid) { + auto mapPtr = HmapT::GetPtr(oid); + mapPtr->localMultimap_.PrintAllEntries(); + }; + + for (auto loc : rt::allLocalities()) rt::executeAt(loc, printLambda, oid_); + } + + void PrintAllKeys() { + auto printLambda = [](const ObjectID & oid) { + auto mapPtr = HmapT::GetPtr(oid); + mapPtr->localMultimap_.PrintAllKeys(); + }; + + for (auto loc : rt::allLocalities()) rt::executeAt(loc, printLambda, oid_); + } + + // FIXME it should be protected + void BufferEntryInsert(const EntryT &entry) { + localMultimap_.Insert(entry.key, entry.value); + } + + iterator begin() { return iterator::multimap_begin(this); } + iterator end() { return iterator::multimap_end(this); } + const_iterator cbegin() const { return const_iterator::multimap_begin(this); } + const_iterator cend() const { return const_iterator::multimap_end(this); } + const_iterator begin() const { return cbegin(); } + const_iterator end() const { return cend(); } + local_iterator local_begin() { + return local_iterator::lmultimap_begin(&localMultimap_); + } + local_iterator local_end() { return local_iterator::lmultimap_end(&localMultimap_); } + const_local_iterator clocal_begin() { + return const_local_iterator::lmultimap_begin(&localMultimap_); + } + const_local_iterator clocal_end() { + return const_local_iterator::lmultimap_end(&localMultimap_); + } + + std::pair insert(const value_type &value) { + return Insert(value.first, value.second); + } + + std::pair insert(const_iterator, const value_type &value) { + return insert(value); + } + + void buffered_async_insert(rt::Handle &h, const value_type &value) { + BufferedAsyncInsert(h, value.first, value.second); + } + + void buffered_async_wait(rt::Handle &h) { rt::waitForCompletion(h); } + + void buffered_async_flush() { WaitForBufferedInsert(); } + + private: + ObjectID oid_; + LocalMultimap localMultimap_; + BuffersVector buffers_; + + struct InsertArgs { + ObjectID oid; + KTYPE key; + VTYPE value; + }; + + struct LookupArgs { + ObjectID oid; + KTYPE key; + }; + + struct RFArgs { + uint64_t lb; + ObjectID oid; + char prefix[PREFIX_SIZE]; + }; + + protected: + Multimap(ObjectID oid, const size_t numEntries) + : oid_(oid), + localMultimap_(std::max(numEntries / (constants::kDefaultNumEntriesPerBucket * rt::numLocalities()), 1lu)), + buffers_(oid) {} +}; + +template +inline size_t Multimap::Size() const { + size_t size = localMultimap_.size_; + size_t remoteSize; + + auto sizeLambda = [](const ObjectID &oid, size_t *res) { + auto mapPtr = HmapT::GetPtr(oid); + *res = mapPtr->localMultimap_.size_; + }; + + for (auto tgtLoc : rt::allLocalities()) { + if (tgtLoc != rt::thisLocality()) { + rt::executeAtWithRet(tgtLoc, sizeLambda, oid_, &remoteSize); + size += remoteSize; + } + } + + return size; +} + +template +inline std::pair < typename Multimap::iterator, bool > +Multimap::Insert(const KTYPE &key, const VTYPE &value) { + using itr_traits = distributed_iterator_traits; + + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + std::pair res; + + if (targetLocality == rt::thisLocality()) { + auto lres = localMultimap_.Insert(key, value); + res.first = itr_traits::iterator_from_local(begin(), end(), lres.first); + res.second = lres.second; + } else { + auto insertLambda = + [](const std::tuple &args_, + std::pair *res_ptr) { + auto &args(std::get<2>(args_)); + auto mapPtr = HmapT::GetPtr(args.oid); + auto lres = mapPtr->localMultimap_.Insert(args.key, args.value); + res_ptr->first = itr_traits::iterator_from_local (std::get<0>(args_), std::get<1>(args_), lres.first); + res_ptr->second = lres.second; + }; + rt::executeAtWithRet( + targetLocality, insertLambda, + std::make_tuple(begin(), end(), InsertArgs{oid_, key, value}), &res); + } + + return res; +} + +template +inline void Multimap::AsyncInsert( + rt::Handle &handle, const KTYPE &key, const VTYPE &value) { + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + + if (targetLocality == rt::thisLocality()) { + localMultimap_.AsyncInsert(handle, key, value); + } else { + auto insertLambda = [](rt::Handle &handle, const InsertArgs &args) { + auto mapPtr = HmapT::GetPtr(args.oid); + mapPtr->localMultimap_.AsyncInsert(handle, args.key, args.value); + }; + InsertArgs args = {oid_, key, value}; + rt::asyncExecuteAt(handle, targetLocality, insertLambda, args); + } +} + +template +inline void Multimap::BufferedInsert(const KTYPE &key, const VTYPE &value) { + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + buffers_.Insert(EntryT(key, value), targetLocality); +} + +template +inline void Multimap:: + BufferedAsyncInsert(rt::Handle &handle, const KTYPE &key, const VTYPE &value) { + + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + buffers_.AsyncInsert(handle, EntryT(key, value), targetLocality); +} + +template +inline void Multimap::Erase(const KTYPE &key) { + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + + if (targetLocality == rt::thisLocality()) { + localMultimap_.Erase(key); + } else { + auto eraseLambda = [](const LookupArgs &args) { + auto mapPtr = HmapT::GetPtr(args.oid); + mapPtr->localMultimap_.Erase(args.key); + }; + LookupArgs args = {oid_, key}; + rt::executeAt(targetLocality, eraseLambda, args); + } +} + +template +inline void Multimap::AsyncErase(rt::Handle &handle, const KTYPE &key) { + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + + if (targetLocality == rt::thisLocality()) { + localMultimap_.AsyncErase(handle, key); + } else { + auto eraseLambda = [](rt::Handle &handle, const LookupArgs &args) { + auto mapPtr = HmapT::GetPtr(args.oid); + mapPtr->localMultimap_.AsyncErase(handle, args.key); + }; + LookupArgs args = {oid_, key}; + rt::asyncExecuteAt(handle, targetLocality, eraseLambda, args); + } +} + +template +inline bool Multimap::Lookup(const KTYPE &key, LookupResult *res) { + rt::Handle handle; + AsyncLookup(handle, key, res); + waitForCompletion(handle); + return res->found; +} + +template +inline void Multimap::AsyncLookup( + rt::Handle & handle, const KTYPE & key, LookupResult * result) { + + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + + if (targetLocality == rt::thisLocality()) { + localMultimap_.AsyncLookup(handle, key, result); + + } else { + + auto lookupLambda = [](const LookupArgs & args, LookupRemoteResult * ret) { + auto my_key = args.key; + auto my_map = HmapT::GetPtr(args.oid); + + LookupRemoteResult remote_result; + my_map->localMultimap_.LookupFromRemote(my_key, & remote_result); + + * ret = remote_result; + }; + + // executed at remote site, so meaning of remote and local are switched + auto lookupFetchLambda = [](rt::Handle & handle, const LookupRemoteResult & args) { + rt::dma(args.localLoc, args.local_elems, args.remote_elems, args.size); + free(args.remote_elems); + }; + + LookupArgs args = {oid_, key}; + LookupRemoteResult remote_result; + rt::executeAtWithRet(targetLocality, lookupLambda, args, & remote_result); + + (* result).found = remote_result.found; + (* result).size = remote_result.size; + + if (remote_result.found) { + (* result).value.resize(remote_result.size); + remote_result.localLoc = rt::thisLocality(); + remote_result.local_elems = (* result).value.data(); + rt::asyncExecuteAt(handle, targetLocality, lookupFetchLambda, remote_result); + } + } +} + +template +inline void Multimap::readFromFiles( + rt::Handle & handle, std::string prefix, uint64_t lb, uint64_t ub) { + + auto readFileLambda = [](rt::Handle & handle, const RFArgs & args, size_t it) { + std::string line; + auto my_map = HmapT::GetPtr(args.oid); + std::string filename = args.prefix + std::to_string(args.lb + it); + printf("reading file %s\n", filename.c_str()); + + std::ifstream file(filename); + if (! file.is_open()) { printf("Cannot open file %s\n", filename.c_str()); exit(-1); } + + while (getline(file, line)) { + if (line[0] == '#') continue; // skip comments + + VTYPE record = VTYPE(line); + // my_map->AsyncInsert(handle, record.key(), record); + my_map->BufferedAsyncInsert(handle, record.key(), record); + } }; + + RFArgs args = {lb, oid_}; + memcpy(args.prefix, prefix.c_str(), prefix.size() + 1); + rt::asyncForEachOnAll(handle, readFileLambda, args, ub - lb + 1); +} + + +template +template +void Multimap::ForEachEntry(ApplyFunT &&function, Args &... args) { + using FunctionTy = void (*)(const KTYPE &, VTYPE &, Args &...); + FunctionTy fn = std::forward(function); + + using feArgs = std::tuple>; + using LMapPtr = LocalMultimap *; + using ArgsTuple = std::tuple>; + + feArgs arguments(oid_, fn, std::tuple(args...)); + auto feLambda = [](const feArgs &args) { + auto mapPtr = HmapT::GetPtr(std::get<0>(args)); + ArgsTuple argsTuple(&mapPtr->localMultimap_, std::get<1>(args), std::get<2>(args)); + rt::forEachAt(rt::thisLocality(), + LMapT::template ForEachEntryFunWrapper, + argsTuple, mapPtr->localMultimap_.numBuckets_); + }; + rt::executeOnAll(feLambda, arguments); +} + +template +template +void Multimap::AsyncForEachEntry( + rt::Handle &handle, ApplyFunT &&function, Args &... args) { + + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, VTYPE &, Args &...); + FunctionTy fn = std::forward(function); + + using feArgs = std::tuple>; + using ArgsTuple = std::tuple>; + + feArgs arguments(oid_, fn, std::tuple(args...)); + auto feLambda = [](rt::Handle &handle, const feArgs &args) { + auto mapPtr = HmapT::GetPtr(std::get<0>(args)); + ArgsTuple argsTuple(&mapPtr->localMultimap_, std::get<1>(args), std::get<2>(args)); + + rt::asyncForEachAt( + handle, rt::thisLocality(), + LMapT::template AsyncForEachEntryFunWrapper, + argsTuple, mapPtr->localMultimap_.numBuckets_); + }; + rt::asyncExecuteOnAll(handle, feLambda, arguments); +} + +template +template +void Multimap::ForEachKey(ApplyFunT &&function, Args &... args) { + using FunctionTy = void (*)(const KTYPE &, Args &...); + FunctionTy fn = std::forward(function); + + using feArgs = std::tuple>; + using ArgsTuple = std::tuple>; + + feArgs arguments(oid_, fn, std::tuple(args...)); + auto feLambda = [](const feArgs &args) { + auto mapPtr = HmapT::GetPtr(std::get<0>(args)); + ArgsTuple argsTuple(&mapPtr->localMultimap_, std::get<1>(args), std::get<2>(args)); + rt::forEachAt(rt::thisLocality(), + LMapT::template ForEachKeyFunWrapper, + argsTuple, mapPtr->localMultimap_.numBuckets_); + }; + rt::executeOnAll(feLambda, arguments); +} + +template +template +void Multimap::AsyncForEachKey( + rt::Handle &handle, ApplyFunT &&function, Args &... args) { + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, Args &...); + FunctionTy fn = std::forward(function); + + using feArgs = std::tuple>; + using ArgsTuple = std::tuple>; + + feArgs arguments(oid_, fn, std::tuple(args...)); + auto feLambda = [](rt::Handle &handle, const feArgs &args) { + auto mapPtr = HmapT::GetPtr(std::get<0>(args)); + ArgsTuple argsTuple(&mapPtr->localMultimap_, std::get<1>(args), std::get<2>(args)); + rt::asyncForEachAt( + handle, rt::thisLocality(), + LMapT::template AsyncForEachKeyFunWrapper, + argsTuple, mapPtr->localMultimap_.numBuckets_); + }; + rt::asyncExecuteOnAll(handle, feLambda, arguments); +} + +template +template +void Multimap::Apply(const KTYPE &key, ApplyFunT &&function, Args &... args) { + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + + if (targetLocality == rt::thisLocality()) { + localMultimap_.Apply(key, function, args...); + + } else { + using FunctionTy = void (*)(const KTYPE &, VTYPE &, Args &...); + FunctionTy fn = std::forward(function); + + using ArgsTuple = std::tuple>; + ArgsTuple arguments(oid_, key, fn, std::tuple(args...)); + + auto feLambda = [](const ArgsTuple &args) { + constexpr auto Size = std::tuple_size(args))>::type>::value; + ArgsTuple &tuple = const_cast(args); + LMapT *mapPtr = &(HmapT::GetPtr(std::get<0>(tuple))->localMultimap_); + LMapT::CallApplyFun(mapPtr, std::get<1>(tuple), std::get<2>(tuple), + std::get<3>(tuple), std::make_index_sequence{}); + }; + rt::executeAt(targetLocality, feLambda, arguments); + } +} + +template +template +void Multimap::AsyncApply( + rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &... args) { + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + + if (targetLocality == rt::thisLocality()) { + localMultimap_.AsyncApply(handle, key, function, args...); + + } else { + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, VTYPE &, Args &...); + FunctionTy fn = std::forward(function); + using ArgsTuple = std::tuple>; + + ArgsTuple arguments(oid_, key, fn, std::tuple(args...)); + auto feLambda = [](rt::Handle &handle, const ArgsTuple &args) { + constexpr auto Size = std::tuple_size(args))>::type>::value; + ArgsTuple &tuple(const_cast(args)); + LMapT *mapPtr = &(HmapT::GetPtr(std::get<0>(tuple))->localMultimap_); + LMapT::AsyncCallApplyFun(handle, mapPtr, std::get<1>(tuple), + std::get<2>(tuple), std::get<3>(tuple), + std::make_index_sequence{}); + }; + rt::asyncExecuteAt(handle, targetLocality, feLambda, arguments); + } +} + +template +class multimap_iterator : public std::iterator { + public: + using OIDT = typename MapT::ObjectID; + using LMap = typename MapT::LMapT; + using inner_type = typename LMap::inner_type; + using local_iterator_type = lmultimap_iterator; + using value_type = NonConstT; + + multimap_iterator() {} + multimap_iterator(uint32_t locID, const OIDT mapOID, local_iterator_type &lit, T element) { + data_ = {locID, mapOID, lit, element}; + } + + multimap_iterator(uint32_t locID, const OIDT mapOID, local_iterator_type &lit) { + auto mapPtr = MapT::GetPtr(mapOID); + const LMap *lmapPtr = &(mapPtr->localMultimap_); + + if (lit != local_iterator_type::lmultimap_end(lmapPtr)) + data_ = itData(locID, mapOID, lit, *lit); + else + *this = multimap_end(mapPtr.get()); + } + + static multimap_iterator multimap_begin(const MapT *mapPtr) { + const LMap *lmapPtr = &(mapPtr->localMultimap_); + auto localEnd = local_iterator_type::lmultimap_end(lmapPtr); + + if (static_cast(rt::thisLocality()) == 0) { + auto localBegin = local_iterator_type::lmultimap_begin(lmapPtr); + if (localBegin != localEnd) { return multimap_iterator(0, mapPtr->oid_, localBegin); } + multimap_iterator beg(0, mapPtr->oid_, localEnd, T()); + return ++beg; + } + + auto getItLambda = [](const OIDT &mapOID, multimap_iterator *res) { + auto mapPtr = MapT::GetPtr(mapOID); + const LMap *lmapPtr = &(mapPtr->localMultimap_); + auto localEnd = local_iterator_type::lmultimap_end(lmapPtr); + auto localBegin = local_iterator_type::lmultimap_begin(lmapPtr); + if (localBegin != localEnd) { + *res = multimap_iterator(0, mapOID, localBegin); + } else { + multimap_iterator beg(0, mapOID, localEnd, T()); + *res = ++beg; + } + }; + + multimap_iterator beg(0, mapPtr->oid_, localEnd, T()); + rt::executeAtWithRet(rt::Locality(0), getItLambda, mapPtr->oid_, &beg); + return beg; + } + + static multimap_iterator multimap_end(const MapT *mapPtr) { + local_iterator_type lend = local_iterator_type::lmultimap_end(&(mapPtr->localMultimap_)); + multimap_iterator end(rt::numLocalities(), OIDT(0), lend, T()); + return end; + } + + bool operator==(const multimap_iterator &other) const { + return (data_ == other.data_); + } + + bool operator!=(const multimap_iterator &other) const { return !(*this == other); } + + T operator*() const { return data_.element_; } + + multimap_iterator &operator++() { + auto mapPtr = MapT::GetPtr(data_.oid_); + + if (static_cast(rt::thisLocality()) == data_.locId_) { + const LMap *lmapPtr = &(mapPtr->localMultimap_); + auto lend = local_iterator_type::lmultimap_end(lmapPtr); + if (data_.lmapIt_ != lend) { ++(data_.lmapIt_); } + + if (data_.lmapIt_ != lend) { + data_.element_ = *(data_.lmapIt_); + return *this; + + } else { // find the local begin on next localities + itData itd; + + for (uint32_t i = data_.locId_ + 1; i < rt::numLocalities(); ++i) { + rt::executeAtWithRet(rt::Locality(i), getLocBeginIt, data_.oid_, &itd); + // if Data is valid + if (itd.locId_ != rt::numLocalities()) { data_ = itd; return *this; } + } + + data_ = itData(rt::numLocalities(), OIDT(0), lend, T()); + return *this; + } + } + + itData itd; + rt::executeAtWithRet(rt::Locality(data_.locId_), getRemoteIt, data_, &itd); + data_ = itd; + return *this; + } + + multimap_iterator operator++(int) { + multimap_iterator tmp = *this; + operator++(); + return tmp; + } + + class local_iterator_range { + public: + local_iterator_range(local_iterator_type B, local_iterator_type E) + : begin_(B), end_(E) {} + local_iterator_type begin() { return begin_; } + local_iterator_type end() { return end_; } + + private: + local_iterator_type begin_; + local_iterator_type end_; + }; + + static local_iterator_range local_range(multimap_iterator &B, multimap_iterator &E) { + auto mapPtr = MapT::GetPtr(B.data_.oid_); + local_iterator_type lbeg, lend; + uint32_t thisLocId = static_cast(rt::thisLocality()); + + if (B.data_.locId_ == thisLocId) { + lbeg = B.data_.lmapIt_; + } else { + lbeg = local_iterator_type::lmultimap_begin(&(mapPtr->localMultimap_)); + } + + if (E.data_.locId_ == thisLocId) { + lend = E.data_.lmapIt_; + } else { + lend = local_iterator_type::lmultimap_end(&(mapPtr->localMultimap_)); + } + + return local_iterator_range(lbeg, lend); + } + + static rt::localities_range localities(multimap_iterator &B, multimap_iterator &E) { + return rt::localities_range(rt::Locality(B.data_.locId_), + rt::Locality(std::min(rt::numLocalities(), E.data_.locId_ + 1))); + } + + static multimap_iterator iterator_from_local(multimap_iterator &B, multimap_iterator &E, local_iterator_type itr) { + return multimap_iterator(static_cast(rt::thisLocality()), B.data_.oid_, itr); + } + + private: + struct itData { + itData() : oid_(0), lmapIt_(nullptr, 0, 0, nullptr, nullptr, typename std::vector::iterator()) {} + itData(uint32_t locId, OIDT oid, local_iterator_type lmapIt, T element) + : locId_(locId), oid_(oid), lmapIt_(lmapIt), element_(element) {} + + bool operator==(const itData &other) const { + return (locId_ == other.locId_) && (lmapIt_ == other.lmapIt_); + } + + bool operator!=(itData &other) const { return !(*this == other); } + uint32_t locId_; + OIDT oid_; + local_iterator_type lmapIt_; + NonConstT element_; + }; + + itData data_; + + static void getLocBeginIt(const OIDT &mapOID, itData *res) { + auto mapPtr = MapT::GetPtr(mapOID); + auto lmapPtr = &(mapPtr->localMultimap_); + auto localEnd = local_iterator_type::lmultimap_end(lmapPtr); + auto localBegin = local_iterator_type::lmultimap_begin(lmapPtr); + if (localBegin != localEnd) { + *res = itData(static_cast(rt::thisLocality()), mapOID, localBegin, *localBegin); + } else { + *res = itData(rt::numLocalities(), OIDT(0), localEnd, T()); + } + } + + static void getRemoteIt(const itData &itd, itData *res) { + auto mapPtr = MapT::GetPtr(itd.oid_); + auto lmapPtr = &(mapPtr->localMultimap_); + auto localEnd = local_iterator_type::lmultimap_end(lmapPtr); + local_iterator_type cit = itd.lmapIt_; + ++cit; + + if (cit != localEnd) { + *res = itData(static_cast(rt::thisLocality()), itd.oid_, cit, *cit); + return; + } else { + itData outitd; + for (uint32_t i = itd.locId_ + 1; i < rt::numLocalities(); ++i) { + rt::executeAtWithRet(rt::Locality(i), getLocBeginIt, itd.oid_, &outitd); + // It Data is valid + if (outitd.locId_ != rt::numLocalities()) { *res = outitd; return; } + } + *res = itData(rt::numLocalities(), OIDT(0), localEnd, T()); + } + } +}; + +} // namespace shad + +#endif // INCLUDE_SHAD_DATA_STRUCTURES_MULTIMAP_H_