Skip to content

Commit

Permalink
Implement new parallel sort (broken at the moment)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mortal committed Oct 13, 2011
1 parent efc2360 commit bc632a9
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 119 deletions.
52 changes: 41 additions & 11 deletions test/unit/test_parallel_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,43 @@
#include <tpie/parallel_sort.h>
#include <boost/random/linear_congruential.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <tpie/progress_indicator_arrow.h>

bool basic1() {
template<size_t min_size>
bool basic1(size_t elements = 1024*1024) {
boost::rand48 prng(42);
std::vector<int> v1;
std::vector<int> v2;
for (size_t i=0; i < 1234567; ++i) {
int x = prng();
v1.push_back(x);
v2.push_back(x);
std::vector<int> v1(elements);
std::vector<int> v2(elements);

tpie::progress_indicator_arrow pi("Parallel sort", elements);
tpie::fractional_progress fp(&pi);

tpie::fractional_subindicator gen_p(fp, "Generate", TPIE_FSI, elements, "Generate");
//tpie::fractional_subindicator std_p(fp, "std::sort", TPIE_FSI, elements, "std::sort");
tpie::fractional_subindicator par_p(fp, "parallel_sort", TPIE_FSI, elements, "parallel_sort");

fp.init();

gen_p.init(elements);
for (size_t i = 0; i < elements; ++i) {
gen_p.step();
v1[i] = v2[i] = prng();
}
std::sort(v1.begin(), v1.end());
tpie::parallel_sort_impl<std::vector<int>::iterator, std::less<int>, 42> s(0);
gen_p.done();

tpie::parallel_sort_impl<std::vector<int>::iterator, std::less<int>, min_size > s(&par_p);
s(v2.begin(), v2.end());
if(v1 != v2) {std::cerr << "Failed" << std::endl; return false;}

//std_p.init(1);
std::sort(v1.begin(), v1.end());
//std_p.done();

fp.done();

if(v1 != v2) {
std::cerr << "std::sort and parallel_sort disagree" << std::endl;
return false;
}
return true;
}

Expand Down Expand Up @@ -102,9 +125,16 @@ void stress_test() {

int main(int argc, char **argv) {
if(argc != 2) return 1;
tpie_initer _;
std::string test(argv[1]);
if (test == "basic1") {
exit(basic1()?EXIT_SUCCESS:EXIT_FAILURE);
return basic1<2>(1024*1024) ? EXIT_SUCCESS : EXIT_FAILURE;
} else if (test == "basic2") {
return basic1<8>(8*8) ? EXIT_SUCCESS : EXIT_FAILURE;
} else if (test == "medium") {
return basic1<1024*1024>(1024*1024*24) ? EXIT_SUCCESS : EXIT_FAILURE;
} else if (test == "large") {
return basic1<1024*1024>(1024*1024*256) ? EXIT_SUCCESS : EXIT_FAILURE;
} else if (test == "equal_elements") {
exit(equal_elements()?EXIT_SUCCESS:EXIT_FAILURE);
} else if (test == "stress_test") {
Expand Down
2 changes: 2 additions & 0 deletions tpie/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ set (OTHER_HEADERS
internal_stack.h
internal_vector.h
internal_stack_vector_base.h
job_manager.h
loglevel.h
logstream.h
mergeheap.h
Expand Down Expand Up @@ -157,6 +158,7 @@ set (OTHER_SOURCES
file_count.cpp
execution_time_predictor.cpp
fractional_progress.cpp
job_manager.cpp
logstream.cpp
memory.cpp
portability.cpp
Expand Down
4 changes: 4 additions & 0 deletions tpie/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,9 @@ struct end_of_stream_exception: public stream_exception {
end_of_stream_exception(): stream_exception("") {};
};

struct job_manager_exception: public exception {
job_manager_exception(): exception("") {};
};

}
#endif //__TPIE_EXCEPTION_H__
99 changes: 99 additions & 0 deletions tpie/job_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#include <tpie/job_manager.h>
#include <tpie/array.h>
#include <tpie/internal_queue.h>
#include <tpie/exception.h>

namespace tpie {

class job_manager * the_job_manager = 0;

class job_manager {
public:
job_manager() : m_done(false) {
m_jobs.resize(2048);
}
~job_manager() {
boost::mutex::scoped_lock lock(jobs_mutex);
m_done = true;
m_has_data.notify_all();
lock.unlock();
for (size_t i = 0; i < m_thread_pool.size(); ++i) {
m_thread_pool[i].join();
}
}
void init_pool(size_t threads) {
m_thread_pool.resize(threads);
for (size_t i = 0; i < threads; ++i) {
boost::thread t(worker);
// thread is move-constructable
m_thread_pool[i].swap(t);
}
}
boost::mutex jobs_mutex;
private:
tpie::internal_queue<tpie::job *> m_jobs;
boost::condition_variable m_has_data;
tpie::array<boost::thread> m_thread_pool;
bool m_done;
static void worker() {
for (;;) {
boost::mutex::scoped_lock lock(the_job_manager->jobs_mutex);
while (the_job_manager->m_jobs.empty() && !the_job_manager->m_done) the_job_manager->m_has_data.wait(lock);
if (the_job_manager->m_done) break;
tpie::job * j = the_job_manager->m_jobs.front();
the_job_manager->m_jobs.pop();
lock.unlock();
j->run();
}
};
friend class tpie::job;
};

void init_job() {
the_job_manager = tpie_new<job_manager>();
the_job_manager->init_pool(boost::thread::hardware_concurrency());
}

void finish_job() {
tpie_delete(the_job_manager);
the_job_manager = 0;
}

void job::join() {
boost::mutex::scoped_lock lock(the_job_manager->jobs_mutex);
while (m_dependencies) {
m_done.wait(lock);
}
}

void job::enqueue(job * parent) {
boost::mutex::scoped_lock lock(the_job_manager->jobs_mutex);
if (the_job_manager->m_done) throw job_manager_exception();
m_parent = parent;
++m_dependencies;
if (m_parent) ++m_parent->m_dependencies;
if (the_job_manager->m_jobs.full()) {
lock.unlock();
run();
return;
}
the_job_manager->m_jobs.push(this);
the_job_manager->m_has_data.notify_one();
}

void job::run() {
(*this)();
done();
}

void job::done() {
boost::mutex::scoped_lock lock(the_job_manager->jobs_mutex);
--m_dependencies;
if (m_parent) m_parent->done();
if (m_dependencies) return;
m_done.notify_all();
on_done();
}

} // namespace tpie

32 changes: 32 additions & 0 deletions tpie/job_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#ifndef __TPIE_JOB_MANAGER_H
#define __TPIE_JOB_MANAGER_H

#include <stddef.h>
#include <boost/thread.hpp>

namespace tpie {

class job {
public:
inline job() : m_dependencies(0) {}
virtual void operator()() = 0;
virtual ~job() {}
void join();
void enqueue(job * parent = 0);
protected:
virtual void on_done() {}
private:
size_t m_dependencies;
boost::condition_variable m_done;
job * m_parent;
void run();
void done();
friend class job_manager;
};

void init_job();
void finish_job();

} // namespace tpie

#endif
Loading

0 comments on commit bc632a9

Please sign in to comment.