Skip to content

Commit

Permalink
refactor to directly send heap message from shared_pool
Browse files Browse the repository at this point in the history
  • Loading branch information
bholt committed Oct 24, 2013
1 parent 6778655 commit df452fc
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 73 deletions.
4 changes: 2 additions & 2 deletions system/AsyncDelegate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace Grappa {
} else {
if (GCE) GCE->enroll();

pool.send_message(dest, [origin, remote_work] {
send_heap_message(dest, [origin, remote_work] {
delegate_stats.count_op_am();
remote_work();
if (GCE) complete(make_global(GCE,origin));
Expand Down Expand Up @@ -155,7 +155,7 @@ namespace Grappa {
} else {
start_time = Grappa_get_timestamp();

pool.send_message(dest, [origin, func, this] {
send_heap_message(dest, [origin, func, this] {
delegate_stats.count_op_am();
R val = func();

Expand Down
38 changes: 17 additions & 21 deletions system/SharedMessagePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,23 @@ namespace Grappa {
shared_pool = pool_stack->take();
}

void* _shared_message_pool_alloc(size_t sz) {
void* _shared_pool_alloc(size_t sz) {
#ifdef DEBUG
auto i = pool_stack->find(shared_pool);
if (i >= 0) VLOG(0) << "found: " << shared_pool << ": " << i << " / " << pool_stack->size();
// auto i = pool_stack->find(shared_pool);
// if (i >= 0) VLOG(0) << "found: " << shared_pool << ": " << i << " / " << pool_stack->size();
#endif
CHECK(shared_pool->next == nullptr);
CHECK(!shared_pool || shared_pool->next == nullptr);
if (shared_pool && !shared_pool->emptying && shared_pool->remaining() >= sz) {
return _shared_pool_alloc_message(sz);
} else {
// if not emptying already, do it
auto *o = shared_pool;
if (shared_pool && !shared_pool->emptying) {
CHECK_GT(shared_pool->allocated, 0);
shared_pool->emptying = true;
if (shared_pool->to_send == 0) {
shared_pool->on_empty();
shared_pool = nullptr;
if (o && !o->emptying) {
CHECK_GT(o->allocated, 0);
o->emptying = true;
if (o->to_send == 0) {
o->on_empty();
}
}
if (!pool_stack->empty()) CHECK(pool_stack->top->allocated == 0);
Expand All @@ -128,7 +129,7 @@ namespace Grappa {
// try to block until a pool frees up
do {
Grappa::wait(&blocked_senders);
if (!shared_pool->emptying && shared_pool->remaining() >= sz) {
if (shared_pool && !shared_pool->emptying && shared_pool->remaining() >= sz) {
p = shared_pool;
} else {
p = pool_stack->take();
Expand All @@ -142,12 +143,6 @@ namespace Grappa {
}
}

void* SharedMessagePool::allocate(size_t sz) {
CHECK_EQ(this, shared_pool) << "not allocating from current shared_pool!";
// CHECK(this->next == nullptr);
return _shared_message_pool_alloc(sz);
}

void SharedMessagePool::message_sent(impl::MessageBase* m) {
CHECK(this->next == nullptr);
validate_in_pool(m);
Expand All @@ -161,14 +156,15 @@ namespace Grappa {

void SharedMessagePool::on_empty() {
DCHECK_EQ(to_send, 0);
CHECK(this != shared_pool);
VLOG(3) << "empty and emptying, to_send:"<< to_send << ", allocated:" << allocated << "/" << buffer_size << " @ " << this << ", buf:" << (void*)buffer << " completions_received:" << completions_received << ", allocated_count:" << allocated_count;
// #ifdef DEBUG
// verify everything sent
this->iterate([](impl::MessageBase* m) {
if (!m->is_sent_ || !m->is_delivered_ || !m->is_enqueued_) {
CHECK(false) << "!! message(" << m << ", is_sent_:" << m->is_sent_ << ", is_delivered_:" << m->is_delivered_ << ", m->is_enqueued_:" << m->is_enqueued_ << ", extra:" << m->pool << ")";
}
});
// this->iterate([](impl::MessageBase* m) {
// if (!m->is_sent_ || !m->is_delivered_ || !m->is_enqueued_) {
// CHECK(false) << "!! message(" << m << ", is_sent_:" << m->is_sent_ << ", is_delivered_:" << m->is_delivered_ << ", m->is_enqueued_:" << m->is_enqueued_ << ", extra:" << m->pool << ")";
// }
// });

memset(buffer, (0x11*locale_mycore()) | 0xf0, buffer_size); // poison

Expand Down
70 changes: 20 additions & 50 deletions system/SharedMessagePool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ namespace Grappa {
/// @addtogroup Communication
/// @{

template<typename T> class PoolMessage;
template<typename T> class PoolPayloadMessage;
void* _shared_pool_alloc(size_t sz);

class SharedMessagePool;
extern SharedMessagePool * shared_pool;
Expand Down Expand Up @@ -43,7 +42,6 @@ class SharedMessagePool: public PoolAllocator<impl::MessageBase> {
next = nullptr;
}

virtual void * allocate(size_t size);
void message_sent(impl::MessageBase* m);
void on_empty();

Expand All @@ -53,47 +51,13 @@ class SharedMessagePool: public PoolAllocator<impl::MessageBase> {
<< "pool(" << this << ", buf:" << (void*)this->buffer << ", size:" << this->buffer_size << ")";
}

///
/// Templated message creating functions, all taken straight from Message.hpp
///

/// Send message, allocating from the pool. @see Grappa::message
template<typename T>
inline PoolMessage<T>* message(Core dest, T t) {
void* p = this->allocate(sizeof(PoolMessage<T>));
return new (p) PoolMessage<T>(shared_pool, dest, t);
}

/// Message with payload. @see Grappa::message
template< typename T >
inline PoolPayloadMessage<T>* message(Core dest, T t, void * payload, size_t payload_size) {
void* p = this->allocate(sizeof(PoolPayloadMessage<T>));
return new (p) PoolPayloadMessage<T>(shared_pool, dest, t, payload, payload_size);
}

/// Same as message, but immediately enqueued to be sent. @see Grappa::send_message
template< typename T >
inline PoolMessage<T> * send_message(Core dest, T t) {
auto* m = this->message(dest, t);
m->enqueue();
return m;
}

/// Message with payload, immediately enqueued to be sent. @see Grappa::send_message
template< typename T >
inline PoolPayloadMessage<T> * send_message(Core dest, T t, void * payload, size_t payload_size) {
auto* m = this->message(dest, t, payload, payload_size);
m->enqueue();
return m;
}

};

template<typename T>
class PoolMessage: public Message<T> {
public:
inline SharedMessagePool& get_pool() {
return *reinterpret_cast<SharedMessagePool*>(this->pool);
inline SharedMessagePool * shpool() {
return reinterpret_cast<SharedMessagePool*>(this->pool);
}
virtual void mark_sent() {
Message<T>::mark_sent();
Expand All @@ -105,38 +69,38 @@ class PoolMessage: public Message<T> {
// #endif
DCHECK_NE(this->destination_, 0x5555);
if (Grappa::mycore() == this->source_) {
if (get_pool().emptying) VLOG(5) << "emptying @ " << &get_pool() << "(buf:" << (void*)get_pool().buffer << ")" << " to_send:" << get_pool().to_send << ", completions_received:" << get_pool().completions_received << ", allocated_count:" << get_pool().allocated_count << ", sent message(" << this << ")";
// if (get_pool().emptying) VLOG(5) << "emptying @ " << &get_pool() << "(buf:" << (void*)get_pool().buffer << ")" << " to_send:" << get_pool().to_send << ", completions_received:" << get_pool().completions_received << ", allocated_count:" << get_pool().allocated_count << ", sent message(" << this << ")";

DCHECK_NE(this->source_, 0x5555) << " sent:" << this->is_sent_ << ", pool(" << &get_pool() << ")";
DCHECK_NE(this->source_, 0x5555) << " sent:" << this->is_sent_ << ", pool(" << shpool() << ")";
this->source_ = 0x5555;

get_pool().message_sent(this); // may trash things
shpool()->message_sent(this); // may trash things
}
}

inline PoolMessage(): Message<T>() {}
inline PoolMessage(SharedMessagePool * pool, Core dest, T t)
: Message<T>(dest, t)
{ this->pool = pool; get_pool().validate_in_pool(this); }
{ this->pool = pool; shpool()->validate_in_pool(this); }

virtual const size_t size() const { return sizeof(*this); }
} GRAPPA_BLOCK_ALIGNED;

template<typename T>
class PoolPayloadMessage: public PayloadMessage<T> {
public:
inline SharedMessagePool& get_pool() { return *reinterpret_cast<SharedMessagePool*>(this->pool); }
inline SharedMessagePool * shpool() { return reinterpret_cast<SharedMessagePool*>(this->pool); }

virtual void mark_sent() {
PayloadMessage<T>::mark_sent();
if (Grappa::mycore() == this->source_) {
get_pool().message_sent(this); // may delete the pool
shpool()->message_sent(this); // may delete the pool
}
}
inline PoolPayloadMessage(): PayloadMessage<T>() {}
inline PoolPayloadMessage(SharedMessagePool * pool, Core dest, T t, void * payload, size_t payload_size)
: PayloadMessage<T>(dest, t, payload, payload_size)
{ this->pool = pool; get_pool().validate_in_pool(this); }
{ this->pool = pool; shpool()->validate_in_pool(this); }
virtual const size_t size() const { return sizeof(*this); }
} GRAPPA_BLOCK_ALIGNED;

Expand All @@ -145,25 +109,31 @@ void init_shared_pool();
// Same as message, but allocated on heap
template< typename T >
inline PoolMessage<T> * heap_message(Core dest, T t) {
return shared_pool->message(dest, t);
return new (_shared_pool_alloc(sizeof(PoolMessage<T>))) PoolMessage<T>(shared_pool, dest, t);
}

/// Message with payload, allocated on heap
template< typename T >
inline PoolPayloadMessage<T> * heap_message(Core dest, T t, void * payload, size_t payload_size) {
return shared_pool->message(dest, t, payload, payload_size);
return new (_shared_pool_alloc(sizeof(PoolPayloadMessage<T>)))
PoolPayloadMessage<T>(shared_pool, dest, t, payload, payload_size);
}

/// Same as message, but allocated on heap and immediately enqueued to be sent.
template< typename T >
inline PoolMessage<T> * send_heap_message(Core dest, T t) {
return shared_pool->send_message(dest, t);
auto *m = new (_shared_pool_alloc(sizeof(PoolMessage<T>))) PoolMessage<T>(shared_pool, dest, t);
m->enqueue();
return m;
}

/// Message with payload, allocated on heap and immediately enqueued to be sent.
template< typename T >
inline PoolPayloadMessage<T> * send_heap_message(Core dest, T t, void * payload, size_t payload_size) {
return shared_pool->send_message(dest, t, payload, payload_size);
auto *m = new (_shared_pool_alloc(sizeof(PoolPayloadMessage<T>)))
PoolPayloadMessage<T>(shared_pool, dest, t, payload, payload_size);
m->enqueue();
return m;
}

} // namespace Grappa

0 comments on commit df452fc

Please sign in to comment.