diff --git a/src/shogun/io/streaming/InputParser.h b/src/shogun/io/streaming/InputParser.h index 652d0db3a1d..c49e29b52f6 100644 --- a/src/shogun/io/streaming/InputParser.h +++ b/src/shogun/io/streaming/InputParser.h @@ -13,20 +13,14 @@ #include -#if defined(HAVE_CXX11) || defined(HAVE_PTHREAD) - #include #include #include #include -#ifdef HAVE_CXX11 #include #include #include #include -#elif defined(HAVE_PTHREAD) -#include -#endif #define PARSER_DEFAULT_BUFFSIZE 100 @@ -324,11 +318,7 @@ template class CInputParser CStreamingFile* input_source; /// Thread in which the parser runs -#ifdef HAVE_CXX11 - std::shared_ptr parse_thread; -#elif defined(HAVE_PTHREAD) - pthread_t parse_thread; -#endif + std::thread parse_thread; /// The ring of examples, stored as they are parsed CParseBuffer* examples_ring; @@ -361,18 +351,13 @@ template class CInputParser int32_t ring_size; /// Mutex which is used when getting/setting state of examples (whether a new example is ready) -#ifdef HAVE_CXX11 - std::shared_ptr examples_state_lock; -#elif defined(HAVE_PTHREAD) - pthread_mutex_t examples_state_lock; -#endif + std::mutex examples_state_lock; /// Condition variable to indicate change of state of examples -#ifdef HAVE_CXX11 - std::shared_ptr examples_state_changed; -#elif defined(HAVE_PTHREAD) - pthread_cond_t examples_state_changed; -#endif + std::condition_variable examples_state_changed; + + /// Flag that indicate that the parsing thread should continue reading + std::atomic keep_running; }; @@ -393,28 +378,15 @@ template template CInputParser::CInputParser() { - /* this line was commented out when I found it. However, the mutex locks - * have to be initialised. Otherwise uninitialised memory error */ - //init(NULL, true, PARSER_DEFAULT_BUFFSIZE); -#if HAVE_CXX11 - examples_state_lock = std::make_shared(); - examples_state_changed = std::make_shared(); -#elif defined(HAVE_PTHREAD) - pthread_mutex_init(&examples_state_lock, NULL); - pthread_cond_init(&examples_state_changed, NULL); -#endif - examples_ring=NULL; + examples_ring = nullptr; parsing_done=true; reading_done=true; + keep_running.store(false, std::memory_order_release); } template CInputParser::~CInputParser() { -#if !defined(HAVE_CXX11) && defined(HAVE_PTHREAD) - pthread_mutex_destroy(&examples_state_lock); - pthread_cond_destroy(&examples_state_changed); -#endif SG_UNREF(examples_ring); } @@ -469,11 +441,8 @@ template SG_SDEBUG("creating parse thread\n") if (examples_ring) examples_ring->init_vector(); -#ifdef HAVE_CXX11 - parse_thread.reset(new std::thread(&parse_loop_entry_point, this)); -#elif defined(HAVE_PTHREAD) - pthread_create(&parse_thread, NULL, parse_loop_entry_point, this); -#endif + keep_running.store(true, std::memory_order_release); + parse_thread = std::thread(&parse_loop_entry_point, this); SG_SDEBUG("leaving CInputParser::start_parser()\n") } @@ -491,11 +460,7 @@ template { SG_SDEBUG("entering CInputParser::is_running()\n") bool ret; -#ifdef HAVE_CXX11 - std::lock_guard lock(*examples_state_lock); -#elif defined(HAVE_PTHREAD) - pthread_mutex_lock(&examples_state_lock); -#endif + std::lock_guard lock(examples_state_lock); if (parsing_done) if (reading_done) @@ -504,9 +469,6 @@ template ret = true; else ret = false; -#if !defined(HAVE_CXX11) && defined(HAVE_PTHREAD) - pthread_mutex_unlock(&examples_state_lock); -#endif SG_SDEBUG("leaving CInputParser::is_running(), returning %d\n", ret) return ret; @@ -556,26 +518,15 @@ template void* CInputParser::main_parse_loop(void* params) CInputParser* this_obj = (CInputParser *) params; this->input_source = this_obj->input_source; - while (1) + while (keep_running.load(std::memory_order_acquire)) { -#ifdef HAVE_CXX11 - std::unique_lock lock(*examples_state_lock); -#elif defined(HAVE_PTHREAD) - pthread_mutex_lock(&examples_state_lock); -#endif + std::unique_lock lock(examples_state_lock); + if (parsing_done) { -#if !defined(HAVE_CXX11) && defined(HAVE_PTHREAD) - pthread_mutex_unlock(&examples_state_lock); -#endif return NULL; } -#ifdef HAVE_CXX11 lock.unlock(); -#elif defined(HAVE_PTHREAD) - pthread_mutex_unlock(&examples_state_lock); - pthread_testcancel(); -#endif current_example = examples_ring->get_free_example(); current_feature_vector = current_example->fv; @@ -589,16 +540,9 @@ template void* CInputParser::main_parse_loop(void* params) if (current_len < 0) { -#ifdef HAVE_CXX11 lock.lock(); parsing_done = true; - examples_state_changed->notify_one(); -#elif defined(HAVE_PTHREAD) - pthread_mutex_lock(&examples_state_lock); - parsing_done = true; - pthread_cond_signal(&examples_state_changed); - pthread_mutex_unlock(&examples_state_lock); -#endif + examples_state_changed.notify_one(); return NULL; } @@ -607,16 +551,9 @@ template void* CInputParser::main_parse_loop(void* params) current_example->length = current_len; examples_ring->copy_example(current_example); -#ifdef HAVE_CXX11 lock.lock(); number_of_vectors_parsed++; - examples_state_changed->notify_one(); -#elif defined(HAVE_PTHREAD) - pthread_mutex_lock(&examples_state_lock); - number_of_vectors_parsed++; - pthread_cond_signal(&examples_state_changed); - pthread_mutex_unlock(&examples_state_lock); -#endif + examples_state_changed.notify_one(); } return NULL; } @@ -632,11 +569,7 @@ template Example* CInputParser::retrieve_example() { reading_done = true; /* Signal to waiting threads that no more examples are left */ -#ifdef HAVE_CXX11 - examples_state_changed->notify_one(); -#elif defined(HAVE_PTHREAD) - pthread_cond_signal(&examples_state_changed); -#endif + examples_state_changed.notify_one(); return NULL; } } @@ -670,11 +603,7 @@ template int32_t CInputParser::get_next_example(T* &fv, if (reading_done) return 0; -#ifdef HAVE_CXX11 - std::unique_lock lock(*examples_state_lock); -#elif defined(HAVE_PTHREAD) - pthread_mutex_lock(&examples_state_lock); -#endif + std::unique_lock lock(examples_state_lock); ex = retrieve_example(); if (ex == NULL) @@ -682,29 +611,18 @@ template int32_t CInputParser::get_next_example(T* &fv, if (reading_done) { /* No more examples left, return */ -#if !defined(HAVE_CXX11) && defined(HAVE_PTHREAD) - pthread_mutex_unlock(&examples_state_lock); -#endif return 0; } else { /* Examples left, wait for one to become ready */ -#ifdef HAVE_CXX11 - examples_state_changed->wait(lock); -#elif defined(HAVE_PTHREAD) - pthread_cond_wait(&examples_state_changed, &examples_state_lock); - pthread_mutex_unlock(&examples_state_lock); -#endif + examples_state_changed.wait(lock); continue; } } else { /* Example ready, return the example */ -#if !defined(HAVE_CXX11) && defined(HAVE_PTHREAD) - pthread_mutex_unlock(&examples_state_lock); -#endif break; } } @@ -734,25 +652,15 @@ template void CInputParser::end_parser() { SG_SDEBUG("entering CInputParser::end_parser\n") SG_SDEBUG("joining parse thread\n") -#ifdef HAVE_CXX11 - parse_thread->join(); -#elif defined(HAVE_PTHREAD) - pthread_join(parse_thread, NULL); -#endif + parse_thread.join(); SG_SDEBUG("leaving CInputParser::end_parser\n") } template void CInputParser::exit_parser() { SG_SDEBUG("cancelling parse thread\n") -#ifdef HAVE_CXX11 - parse_thread.reset(); -#elif defined(HAVE_PTHREAD) - pthread_cancel(parse_thread); -#endif + keep_running.store(false, std::memory_order_release); } } -#endif /* defined(HAVE_CXX11) || defined(HAVE_PTHREAD) */ - #endif // __INPUTPARSER_H__ diff --git a/src/shogun/lib/RefCount.cpp b/src/shogun/lib/RefCount.cpp index e9517e28cbc..8f2dbedda04 100644 --- a/src/shogun/lib/RefCount.cpp +++ b/src/shogun/lib/RefCount.cpp @@ -2,41 +2,22 @@ using namespace shogun; -int32_t RefCount::ref() +RefCount::RefCount(int32_t ref_start) { -#ifdef HAVE_CXX11_ATOMIC - int32_t count = rc.fetch_add(1)+1; -#else - lock.lock(); - int32_t count = ++rc; - lock.unlock(); -#endif + rc.store(ref_start, std::memory_order_release); +} - return count; +int32_t RefCount::ref() +{ + return rc.fetch_add(1, std::memory_order_relaxed)+1; } int32_t RefCount::unref() { -#ifdef HAVE_CXX11_ATOMIC - int32_t count = rc.fetch_sub(1)-1; -#else - lock.lock(); - int32_t count = --rc; - lock.unlock(); -#endif - - return count; + return rc.fetch_sub(1, std::memory_order_acquire)-1; } int32_t RefCount::ref_count() { -#ifdef HAVE_CXX11_ATOMIC - int32_t count = rc.load(); -#else - lock.lock(); - int32_t count = rc; - lock.unlock(); -#endif - - return count; + return rc.load(std::memory_order_acquire); } diff --git a/src/shogun/lib/RefCount.h b/src/shogun/lib/RefCount.h index 137d04d7c0b..86c0678ae04 100644 --- a/src/shogun/lib/RefCount.h +++ b/src/shogun/lib/RefCount.h @@ -1,15 +1,10 @@ -#include - -#ifdef HAVE_CXX11_ATOMIC -#include -#endif - -#include -#include - #ifndef _REFCOUNT__H__ #define _REFCOUNT__H__ +#include +#include +#include + namespace shogun { /** brief This class implements a thread-safe counter used for @@ -22,7 +17,7 @@ class RefCount * * @param ref_start starting value for counter */ - RefCount(int32_t ref_start=0) : rc(ref_start) {} + RefCount(int32_t ref_start=0) : rc(ref_start) {}; /** Increase ref count * @@ -43,14 +38,7 @@ class RefCount int32_t ref_count(); /** reference count */ -#ifdef HAVE_CXX11_ATOMIC - volatile std::atomic rc; -#else - int32_t rc; - - /** the lock */ - CLock lock; -#endif + std::atomic rc; }; }