Skip to content

Commit

Permalink
Switch priority queue to using std::merge for merging sequences rathe…
Browse files Browse the repository at this point in the history
…r than just sorting two sorted sequences
  • Loading branch information
Mortal committed Oct 18, 2012
1 parent cb8d651 commit 800c05f
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 37 deletions.
59 changes: 59 additions & 0 deletions tpie/priority_queue.h
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -198,6 +198,65 @@ class priority_queue {
* in memory. */ * in memory. */
tpie::array<T> gbuffer0; tpie::array<T> gbuffer0;


class cyclic_array_iterator : public boost::iterator_facade<cyclic_array_iterator, T, boost::random_access_traversal_tag> {
tpie::array<T> & arr;
memory_size_type idx;
memory_size_type first;

public:
cyclic_array_iterator(tpie::array<T> & arr, memory_size_type idx, memory_size_type first)
: arr(arr), idx(idx % arr.size()), first(first)
{
}

private:
friend class boost::iterator_core_access;
T & dereference() const {
return arr[idx];
}
void increment() {
if (idx+1 == arr.size()) idx = 0;
else ++idx;
}
void decrement() {
if (idx == 0) idx = arr.size() - 1;
else --idx;
}
void advance(memory_size_type n) {
idx = (idx + n) % arr.size();
}
memory_size_type from_beginning() const {
return (idx < first) ? (idx + arr.size() - first) : (idx - first);
}
memory_offset_type distance_to(const cyclic_array_iterator & other) const {
return static_cast<memory_offset_type>(other.from_beginning())
-static_cast<memory_offset_type>(from_beginning());
}
bool equal(const cyclic_array_iterator & other) const {
return idx == other.idx;
}
};

template <typename IT>
void assert_sorted(IT i, IT j, memory_size_type n) {
if (j-i != n) {
log_error() << "Bad distance" << std::endl;
}
IT k = i;
++k;
--n;
while (k != j) {
if (comp_(*k, *i)) {
log_error() << "Not a sorted sequence" << std::endl;
}
++k, ++i;
--n;
}
if (n != 0) {
log_error() << "Bad steps" << std::endl;
}
}

/** 2*(#slots) integers. Slot i contains its elements in cyclic ascending order, /** 2*(#slots) integers. Slot i contains its elements in cyclic ascending order,
* starting at index slot_state[2*i]. Slot i contains slot_state[2*i+1] elements. * starting at index slot_state[2*i]. Slot i contains slot_state[2*i+1] elements.
* Its data is in data file i. */ * Its data is in data file i. */
Expand Down
50 changes: 13 additions & 37 deletions tpie/priority_queue.inl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -246,15 +246,10 @@ void priority_queue<T, Comparator>::push(const T& x) {


// Bubble lesser elements down into deletion buffer // Bubble lesser elements down into deletion buffer
if(buffer_size > 0) { if(buffer_size > 0) {

// merge deletion and insertion buffer
// fetch insertion buffer std::merge(buffer.find(buffer_start), buffer.find(buffer_start + buffer_size),
std::copy(arr.begin(), arr.end(), mergebuffer.begin()); arr.begin(), arr.end(),

mergebuffer.begin(), comp_);
// fetch deletion buffer
std::copy(buffer.find(buffer_start), buffer.find(buffer_start+buffer_size), mergebuffer.find(setting_m));

// sort buffer elements
parallel_sort(mergebuffer.begin(), mergebuffer.find(buffer_size+setting_m), comp_);


// smaller elements go in deletion buffer // smaller elements go in deletion buffer
std::copy(mergebuffer.begin(), mergebuffer.find(buffer_size), buffer.find(buffer_start)); std::copy(mergebuffer.begin(), mergebuffer.find(buffer_size), buffer.find(buffer_start));
Expand All @@ -268,19 +263,11 @@ void priority_queue<T, Comparator>::push(const T& x) {


// Merge insertion buffer and group buffer 0 // Merge insertion buffer and group buffer 0
assert(group_state[0].size+setting_m <= setting_m*2); assert(group_state[0].size+setting_m <= setting_m*2);
memory_size_type j = 0;

// fetch gbuffer0
for(stream_size_type i = group_state[0].start; i < group_state[0].start+group_state[0].size; i++) {
mergebuffer[j] = gbuffer0[static_cast<memory_size_type>(i%setting_m)];
++j;
}


// fetch insertion buffer std::merge(cyclic_array_iterator(gbuffer0, group_state[0].start, group_state[0].start),
std::copy(arr.begin(), arr.find(setting_m), mergebuffer.find(j)); cyclic_array_iterator(gbuffer0, group_state[0].start + group_state[0].size, group_state[0].start),

arr.begin(), arr.end(),
// sort mergebuffer.begin(), comp_);
parallel_sort(mergebuffer.get(), mergebuffer.get()+(group_state[0].size+setting_m), comp_);


// smaller elements go in gbuffer0 // smaller elements go in gbuffer0
std::copy(mergebuffer.begin(), mergebuffer.find(group_state[0].size), gbuffer0.begin()); std::copy(mergebuffer.begin(), mergebuffer.find(group_state[0].size), gbuffer0.begin());
Expand Down Expand Up @@ -890,22 +877,11 @@ void priority_queue<T, Comparator>::remove_group_buffer(group_type group) {
// merge group buffer with group buffer 0 // merge group buffer with group buffer 0
array<T> mergebuffer(group_state[0].size + group_state[group].size); array<T> mergebuffer(group_state[0].size + group_state[group].size);


if (group_state[0].start + group_state[0].size <= setting_m) { std::merge(cyclic_array_iterator(gbuffer0, group_state[0].start, group_state[0].start),
std::copy(gbuffer0.find(group_state[0].start), cyclic_array_iterator(gbuffer0, group_state[0].start + group_state[0].size, group_state[0].start),
gbuffer0.find(group_state[0].start + group_state[0].size), arr.begin(), arr.find(group_state[group].size),
mergebuffer.begin()); mergebuffer.begin(), comp_);
} else {
memory_size_type first_read = setting_m - group_state[0].start;
memory_size_type second_read = group_state[0].size - first_read;
std::copy(gbuffer0.find(group_state[0].start),
gbuffer0.find(group_state[0].start + first_read),
mergebuffer.begin());
std::copy(gbuffer0.begin(),
gbuffer0.find(second_read),
mergebuffer.find(first_read));
}
std::copy(arr.begin(), arr.find(group_state[group].size), mergebuffer.find(group_state[0].size));
parallel_sort(mergebuffer.begin(), mergebuffer.find(group_state[0].size + group_state[group].size), comp_);
std::copy(mergebuffer.begin(), mergebuffer.find(group_state[0].size), gbuffer0.begin()); std::copy(mergebuffer.begin(), mergebuffer.find(group_state[0].size), gbuffer0.begin());
group_state[0].start = 0; group_state[0].start = 0;
std::copy(mergebuffer.find(group_state[0].size), mergebuffer.find(group_state[0].size + group_state[group].size), arr.begin()); std::copy(mergebuffer.find(group_state[0].size), mergebuffer.find(group_state[0].size + group_state[group].size), arr.begin());
Expand Down

0 comments on commit 800c05f

Please sign in to comment.