Skip to content

Commit

Permalink
supernova: clean up dsp_thread_queue
Browse files Browse the repository at this point in the history
  • Loading branch information
timblechmann committed Nov 30, 2015
1 parent a2e9d99 commit 3be9fc2
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 50 deletions.
98 changes: 63 additions & 35 deletions server/supernova/dsp_thread_queue/dsp_thread_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,55 @@ class dsp_thread_queue_item:
const activation_limit_t activation_limit; /**< number of precedessors */
};

template <typename T, typename Alloc>
class raw_vector:
Alloc
{
public:
explicit raw_vector( size_t elements, Alloc const & alloc = Alloc() ):
Alloc(alloc), capacity_(elements)
{
data = elements ? Alloc::allocate(capacity_ * sizeof(T))
: nullptr;
}

template< class... Args >
T * emplace_back( Args&&... args )
{
assert( size() != capacity_ );

T * element = data + size_;
Alloc::construct( element, std::forward<Args>(args)... );
size_ += 1;
return element;
}

T & operator[](std::size_t index)
{
assert( index < size_ );
return data[index];
}

T * begin() { return data; }
T * end() { return data + size_; }
bool empty() const { return size_ == 0; }
size_t size() const { return size_; }
size_t capacity() const { return capacity_; }

~raw_vector()
{
for (std::size_t i = 0; i != size_; ++i)
Alloc::destroy( data + i );
if (data)
Alloc::deallocate (data, capacity_ * sizeof(Alloc) );
}

private:
T * data = nullptr;
const size_t capacity_ = 0;
size_t size_ = 0;
};

template <typename runnable, typename Alloc = std::allocator<void*> >
class dsp_thread_queue
{
Expand Down Expand Up @@ -262,20 +311,13 @@ class dsp_thread_queue

/** preallocate node_count nodes */
dsp_thread_queue(std::size_t node_count, bool has_parallelism = true):
has_parallelism_(has_parallelism)
has_parallelism_(has_parallelism),
items( node_count )
{
initially_runnable_items.reserve(node_count);
queue_items = node_count ? item_allocator().allocate(node_count * sizeof(dsp_thread_queue_item))
: nullptr;
}

~dsp_thread_queue(void)
{
for (std::size_t i = 0; i != total_node_count; ++i)
queue_items[i].~dsp_thread_queue_item();
if (queue_items)
item_allocator().deallocate(queue_items, total_node_count * sizeof(dsp_thread_queue_item));
}
~dsp_thread_queue(void) = default;

void add_initially_runnable(dsp_thread_queue_item * item)
{
Expand All @@ -288,38 +330,26 @@ class dsp_thread_queue
typename dsp_thread_queue_item::successor_list const & successors,
typename dsp_thread_queue_item::activation_limit_t activation_limit)
{
assert(queue_items);
dsp_thread_queue_item * ret = queue_items + total_node_count;
++total_node_count;

assert (total_node_count <= initially_runnable_items.capacity());
new (ret) dsp_thread_queue_item(job, successors, activation_limit);
return ret;
return items.emplace_back( job, successors, activation_limit );
}

void reset_activation_counts(void)
{
for (node_count_t i = 0; i != total_node_count; ++i)
queue_items[i].reset_activation_count();
for( dsp_thread_queue_item & item : items)
item.reset_activation_count();
}

node_count_t get_total_node_count(void) const
{
return total_node_count;
}

bool has_parallelism(void) const
{
return has_parallelism_;
}
bool empty() const { return items.empty(); }
node_count_t total_node_count(void) const { return node_count_t(items.size()); }
bool has_parallelism(void) const { return has_parallelism_; }

private:
node_count_t total_node_count = 0; /* total number of nodes */
item_vector_t initially_runnable_items; /* nodes without precedessor */
dsp_thread_queue_item * queue_items; /* all nodes */
const bool has_parallelism_;

friend class dsp_queue_interpreter<runnable, Alloc>;

raw_vector<dsp_thread_queue_item, item_allocator> items;
};

template <typename runnable,
Expand Down Expand Up @@ -355,15 +385,13 @@ class dsp_queue_interpreter
*/
bool init_tick(void)
{
if (unlikely((queue.get() == nullptr) || /* no queue */
(queue->get_total_node_count() == 0) /* no nodes */
))
if (unlikely( !queue || queue->empty() ))
return false;

/* reset node count */
assert(node_count == 0);
assert(runnable_items.empty());
node_count.store(queue->get_total_node_count(), std::memory_order_release);
node_count.store(queue->total_node_count(), std::memory_order_release);

for( auto * item : queue->initially_runnable_items )
mark_as_runnable( item );
Expand Down Expand Up @@ -405,7 +433,7 @@ class dsp_queue_interpreter

node_count_t total_node_count(void) const
{
return queue->get_total_node_count();
return queue->total_node_count();
}

void set_thread_count(thread_count_t i)
Expand Down
16 changes: 9 additions & 7 deletions server/supernova/server/memory_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,14 @@ class rt_pool_allocator {
typedef rt_pool_allocator<U> other;
};

rt_pool_allocator() throw()
{}

rt_pool_allocator(const rt_pool_allocator&) throw()
{}
rt_pool_allocator() throw() = default;
rt_pool_allocator(const rt_pool_allocator&) throw() = default;
~rt_pool_allocator() throw() = default;

template <class U>
rt_pool_allocator(const rt_pool_allocator<U>&) throw()
{}

~rt_pool_allocator() throw()
{}

pointer address(reference x) const
{
Expand Down Expand Up @@ -95,6 +91,12 @@ class rt_pool_allocator {
new(p) T(val);
}

template< class U, class... Args >
void construct( U* p, Args&&... args )
{
::new((void *)p) U(std::forward<Args>(args)...);
}

void destroy(pointer p)
{
p->~T();
Expand Down
16 changes: 8 additions & 8 deletions testsuite/supernova/server_node_graph_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ BOOST_AUTO_TEST_CASE( queue_construction_test_2 )

n.add_node(g);
unique_ptr<node_graph::dsp_thread_queue> q = n.generate_dsp_queue();
BOOST_REQUIRE_EQUAL(q->get_total_node_count(), 0u);
BOOST_REQUIRE_EQUAL(q->total_node_count(), 0u);

n.remove_node(g);
}
Expand All @@ -258,13 +258,13 @@ BOOST_AUTO_TEST_CASE( queue_construction_test_3 )
n.add_node(s);

unique_ptr<node_graph::dsp_thread_queue> q1 = n.generate_dsp_queue();
BOOST_REQUIRE_EQUAL(q1->get_total_node_count(), 1u);
BOOST_REQUIRE_EQUAL(q1->total_node_count(), 1u);

test_synth * s2 = new test_synth(3, 0);
n.add_node(s2);

unique_ptr<node_graph::dsp_thread_queue> q2 = n.generate_dsp_queue();
BOOST_REQUIRE_EQUAL(q2->get_total_node_count(), 1u);
BOOST_REQUIRE_EQUAL(q2->total_node_count(), 1u);

n.remove_node(s);
n.remove_node(s2);
Expand All @@ -282,7 +282,7 @@ BOOST_AUTO_TEST_CASE( pgroup_test_1 )
BOOST_REQUIRE_EQUAL(n.find_group(1), g);

unique_ptr<node_graph::dsp_thread_queue> q = n.generate_dsp_queue();
BOOST_REQUIRE_EQUAL(q->get_total_node_count(), 0u);
BOOST_REQUIRE_EQUAL(q->total_node_count(), 0u);
n.remove_node(g);
}

Expand Down Expand Up @@ -337,7 +337,7 @@ BOOST_AUTO_TEST_CASE( pgroup_test_4 )
n.add_node(s, to_group);

unique_ptr<node_graph::dsp_thread_queue> q = n.generate_dsp_queue();
BOOST_REQUIRE_EQUAL(q->get_total_node_count(), 1u);
BOOST_REQUIRE_EQUAL(q->total_node_count(), 1u);

n.remove_node(s);
}
Expand All @@ -363,7 +363,7 @@ BOOST_AUTO_TEST_CASE( pgroup_test_5 )

unique_ptr<node_graph::dsp_thread_queue> q = n.generate_dsp_queue();

BOOST_REQUIRE_EQUAL(q->get_total_node_count(), 2u);
BOOST_REQUIRE_EQUAL(q->total_node_count(), 2u);

n.remove_node(s1);
n.remove_node(s2);
Expand All @@ -390,7 +390,7 @@ BOOST_AUTO_TEST_CASE( pgroup_test_6 )
n.add_node(s3, to_group);

unique_ptr<node_graph::dsp_thread_queue> q = n.generate_dsp_queue();
BOOST_REQUIRE_EQUAL(q->get_total_node_count(), 3u);
BOOST_REQUIRE_EQUAL(q->total_node_count(), 3u);
}
n.remove_node(g);
}
Expand Down Expand Up @@ -421,7 +421,7 @@ BOOST_AUTO_TEST_CASE( pgroup_test_7 )
n.add_node(s2, to_g2);

unique_ptr<node_graph::dsp_thread_queue> q = n.generate_dsp_queue();
BOOST_REQUIRE_EQUAL(q->get_total_node_count(), 2u);
BOOST_REQUIRE_EQUAL(q->total_node_count(), 2u);
}
n.remove_node(g);
}
Expand Down

0 comments on commit 3be9fc2

Please sign in to comment.