Skip to content

Commit

Permalink
[#139] local-parallel transform + decoupled wait/flush
Browse files Browse the repository at this point in the history
  • Loading branch information
Maurizio Drocco committed Apr 27, 2019
1 parent 21c0a77 commit 48bfaea
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 14 deletions.
23 changes: 22 additions & 1 deletion include/shad/core/impl/modifyng_sequence_ops.h
Expand Up @@ -305,6 +305,7 @@ ForwardIt2 dseq_kernel(std::false_type, ForwardIt1 first, ForwardIt1 last,
auto begin = local_range.begin();
auto end = local_range.end();
auto res = std::transform(begin, end, d_first, op);
wait_iterator(res);
flush_iterator(res);
return res;
}
Expand All @@ -314,7 +315,27 @@ ForwardIt2 dseq_kernel(std::false_type, ForwardIt1 first, ForwardIt1 last,
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 dpar_kernel(std::false_type, ForwardIt1 first, ForwardIt1 last,
ForwardIt2 d_first, UnaryOperation op) {
return dseq_kernel(std::false_type{}, first, last, d_first, op);
using itr_traits = distributed_iterator_traits<ForwardIt1>;
using local_iterator_t = typename itr_traits::local_iterator_type;

// local map
auto lrange = itr_traits::local_range(first, last);

auto map_res = local_map_init(
// range
lrange.begin(), lrange.end(),
// kernel
[&](local_iterator_t b, local_iterator_t e) {
auto res = std::transform(b, e, d_first, op);
wait_iterator(res);
return res;
},
// init value
d_first);

// local reduce
flush_iterator(map_res.back());
return map_res.back();
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
8 changes: 8 additions & 0 deletions include/shad/core/impl/utils.h
Expand Up @@ -46,6 +46,14 @@ void advance_output_iterator(shad::insert_iterator<T> &, It2, It2) {}
template <typename T, typename It2>
void advance_output_iterator(shad::buffered_insert_iterator<T>, It2, It2) {}

template <typename It>
void wait_iterator(It &) {}

template <typename T>
void wait_iterator(shad::buffered_insert_iterator<T> &it) {
it.wait();
}

template <typename It>
void flush_iterator(It &) {}

Expand Down
16 changes: 11 additions & 5 deletions include/shad/core/iterator.h
Expand Up @@ -84,7 +84,7 @@ class insert_iterator
typename internal_container_t::ObjectID global_id_;
Iterator iterator_;
internal_container_t* local_container_ptr_ = nullptr;
rt::Locality locality_;
rt::Locality locality_{rt::numLocalities()};
};

/// @brief Buffered insert iterator.
Expand Down Expand Up @@ -126,19 +126,25 @@ class buffered_insert_iterator : public insert_iterator<Container> {
if (!this->local_container_ptr_ || this->locality_ != rt::thisLocality()) {
this->locality_ = rt::thisLocality();
this->local_container_ptr_ = Container::from_global_id(this->global_id_);
rt::Handle h;
handle_ = h;
handle_ = rt::Handle{}; // reset
}
this->local_container_ptr_->buffered_async_insert(handle_, value);
return *this;
}

/// @brief Wait for pending insertions to the container.
void wait() {
if (this->local_container_ptr_ != nullptr &&
this->locality_ == rt::thisLocality()) {
this->local_container_ptr_->buffered_async_wait(handle_);
}
}

/// @brief Flushes pending insertions to the container.
void flush() {
if (this->local_container_ptr_ != nullptr &&
this->locality_ == rt::thisLocality()) {
// if(!handle_.IsNull()) FIXME
this->local_container_ptr_->buffered_async_flush(handle_);
this->local_container_ptr_->buffered_async_flush();
}
}

Expand Down
7 changes: 3 additions & 4 deletions include/shad/data_structures/hashmap.h
Expand Up @@ -334,10 +334,9 @@ class Hashmap : public AbstractDataStructure<
BufferedAsyncInsert(h, value.first, value.second);
}

void buffered_async_flush(rt::Handle &h) {
rt::waitForCompletion(h);
WaitForBufferedInsert();
}
void buffered_async_wait(rt::Handle &h) { rt::waitForCompletion(h); }

void buffered_async_flush() { WaitForBufferedInsert(); }

private:
ObjectID oid_;
Expand Down
7 changes: 3 additions & 4 deletions include/shad/data_structures/set.h
Expand Up @@ -238,10 +238,9 @@ class Set : public AbstractDataStructure<Set<T, ELEM_COMPARE>> {
BufferedAsyncInsert(h, value);
}

void buffered_async_flush(rt::Handle& h) {
rt::waitForCompletion(h);
WaitForBufferedInsert();
}
void buffered_async_wait(rt::Handle& h) { rt::waitForCompletion(h); }

void buffered_async_flush() { WaitForBufferedInsert(); }

private:
ObjectID oid_;
Expand Down
4 changes: 4 additions & 0 deletions test/unit_tests/core/common.h
Expand Up @@ -141,6 +141,7 @@ struct create_set_<shad::unordered_set<U>, even> {
auto res = std::make_shared<T>(size);
shad::buffered_insert_iterator<T> ins(*res, res->end());
for (size_t i = 0; i < size; ++i) ins = (2 * i + !even);
ins.wait();
ins.flush();
return res;
}
Expand All @@ -163,6 +164,7 @@ struct create_map_<shad::unordered_map<U, V>, even> {
auto res = std::make_shared<T>(size);
shad::buffered_insert_iterator<T> ins(*res, res->begin());
for (size_t i = 0; i < size; i++) ins = std::make_pair(i, 2 * i + !even);
ins.wait();
ins.flush();
return res;
}
Expand Down Expand Up @@ -236,6 +238,7 @@ struct subseq_from_<shad::unordered_set<U>> {
ins = *first;
++first;
}
ins.wait();
ins.flush();
return res;
}
Expand Down Expand Up @@ -274,6 +277,7 @@ struct subseq_from_<shad::unordered_map<U, V>> {
ins = std::make_pair((*first).first, (*first).second);
++first;
}
ins.wait();
ins.flush();
return res;
}
Expand Down
6 changes: 6 additions & 0 deletions test/unit_tests/core/iterator_test.cc
Expand Up @@ -105,6 +105,7 @@ TEST(shad_uset, buffered_insert_iterator) {
for (auto i = batch_size; i > 0; --i) {
ins_begin = i;
}
ins_begin.wait();
ins_begin.flush();
for (auto i = batch_size; i > 0; --i) {
ASSERT_TRUE(shad_test_stl::find_(cnt.begin(),
Expand All @@ -117,6 +118,7 @@ TEST(shad_uset, buffered_insert_iterator) {
for (auto i = batch_size; i > 0; --i) {
ins_begin_ = first + i;
}
ins_begin_.wait();
ins_begin_.flush();
for (auto i = batch_size; i > 0; --i) {
ASSERT_TRUE(shad_test_stl::find_(cnt.begin(),
Expand All @@ -129,6 +131,7 @@ TEST(shad_uset, buffered_insert_iterator) {
for (auto i = batch_size; i > 0; --i) {
ins_end = first + i;
}
ins_end.wait();
ins_end.flush();
for (auto i = batch_size; i > 0; --i) {
ASSERT_TRUE(shad_test_stl::find_(cnt.begin(),
Expand Down Expand Up @@ -183,6 +186,7 @@ TEST(shad_umap, buffered_insert_iterator) {
for (auto i = batch_size; i > 0; --i) {
ins_begin = kv(i);
}
ins_begin.wait();
ins_begin.flush();
for (auto i = batch_size; i > 0; --i)
ASSERT_TRUE(shad_test_stl::find_(cnt.begin(),
Expand All @@ -194,6 +198,7 @@ TEST(shad_umap, buffered_insert_iterator) {
for (auto i = batch_size; i > 0; --i) {
ins_begin_ = kv(first + i);
}
ins_begin_.wait();
ins_begin_.flush();
for (auto i = batch_size; i > 0; --i)
ASSERT_TRUE(shad_test_stl::find_(cnt.begin(),
Expand All @@ -205,6 +210,7 @@ TEST(shad_umap, buffered_insert_iterator) {
for (auto i = batch_size; i > 0; --i) {
ins_end = kv(first + i);
}
ins_end.wait();
ins_end.flush();
for (auto i = batch_size; i > 0; --i)
ASSERT_TRUE(shad_test_stl::find_(cnt.begin(), cnt.end(), kv(first + i)) !=
Expand Down

0 comments on commit 48bfaea

Please sign in to comment.