Skip to content

Commit

Permalink
Add terminate atomic flag for parsing thread in InputParser
Browse files Browse the repository at this point in the history
this allows graceful stopping of the parsing thread
Remove pthread implementation from InputParser
finetune the memory ordering of the atomic counter in RefCount
  • Loading branch information
vigsterkr committed Feb 28, 2017
1 parent fafad1d commit 63a8e07
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 158 deletions.
134 changes: 21 additions & 113 deletions src/shogun/io/streaming/InputParser.h
Expand Up @@ -13,20 +13,14 @@

#include <shogun/lib/config.h>

#if defined(HAVE_CXX11) || defined(HAVE_PTHREAD)

#include <shogun/lib/common.h>
#include <shogun/io/SGIO.h>
#include <shogun/io/streaming/StreamingFile.h>
#include <shogun/io/streaming/ParseBuffer.h>
#ifdef HAVE_CXX11
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#elif defined(HAVE_PTHREAD)
#include <pthread.h>
#endif

#define PARSER_DEFAULT_BUFFSIZE 100

Expand Down Expand Up @@ -324,11 +318,7 @@ template <class T> class CInputParser
CStreamingFile* input_source;

/// Thread in which the parser runs
#ifdef HAVE_CXX11
std::shared_ptr<std::thread> parse_thread;
#elif defined(HAVE_PTHREAD)
pthread_t parse_thread;
#endif
std::thread parse_thread;

/// The ring of examples, stored as they are parsed
CParseBuffer<T>* examples_ring;
Expand Down Expand Up @@ -361,18 +351,13 @@ template <class T> class CInputParser
int32_t ring_size;

/// Mutex which is used when getting/setting state of examples (whether a new example is ready)
#ifdef HAVE_CXX11
std::shared_ptr<std::mutex> examples_state_lock;
#elif defined(HAVE_PTHREAD)
pthread_mutex_t examples_state_lock;
#endif
std::mutex examples_state_lock;

/// Condition variable to indicate change of state of examples
#ifdef HAVE_CXX11
std::shared_ptr<std::condition_variable> examples_state_changed;
#elif defined(HAVE_PTHREAD)
pthread_cond_t examples_state_changed;
#endif
std::condition_variable examples_state_changed;

/// Flag that indicate that the parsing thread should continue reading
std::atomic<bool> keep_running;

};

Expand All @@ -393,28 +378,15 @@ template <class T>
template <class T>
CInputParser<T>::CInputParser()
{
/* this line was commented out when I found it. However, the mutex locks
* have to be initialised. Otherwise uninitialised memory error */
//init(NULL, true, PARSER_DEFAULT_BUFFSIZE);
#if HAVE_CXX11
examples_state_lock = std::make_shared<std::mutex>();
examples_state_changed = std::make_shared<std::condition_variable>();
#elif defined(HAVE_PTHREAD)
pthread_mutex_init(&examples_state_lock, NULL);
pthread_cond_init(&examples_state_changed, NULL);
#endif
examples_ring=NULL;
examples_ring = nullptr;
parsing_done=true;
reading_done=true;
keep_running.store(false, std::memory_order_release);
}

template <class T>
CInputParser<T>::~CInputParser()
{
#if !defined(HAVE_CXX11) && defined(HAVE_PTHREAD)
pthread_mutex_destroy(&examples_state_lock);
pthread_cond_destroy(&examples_state_changed);
#endif
SG_UNREF(examples_ring);
}

Expand Down Expand Up @@ -469,11 +441,8 @@ template <class T>
SG_SDEBUG("creating parse thread\n")
if (examples_ring)
examples_ring->init_vector();
#ifdef HAVE_CXX11
parse_thread.reset(new std::thread(&parse_loop_entry_point, this));
#elif defined(HAVE_PTHREAD)
pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
#endif
keep_running.store(true, std::memory_order_release);
parse_thread = std::thread(&parse_loop_entry_point, this);

SG_SDEBUG("leaving CInputParser::start_parser()\n")
}
Expand All @@ -491,11 +460,7 @@ template <class T>
{
SG_SDEBUG("entering CInputParser::is_running()\n")
bool ret;
#ifdef HAVE_CXX11
std::lock_guard<std::mutex> lock(*examples_state_lock);
#elif defined(HAVE_PTHREAD)
pthread_mutex_lock(&examples_state_lock);
#endif
std::lock_guard<std::mutex> lock(examples_state_lock);

if (parsing_done)
if (reading_done)
Expand All @@ -504,9 +469,6 @@ template <class T>
ret = true;
else
ret = false;
#if !defined(HAVE_CXX11) && defined(HAVE_PTHREAD)
pthread_mutex_unlock(&examples_state_lock);
#endif

SG_SDEBUG("leaving CInputParser::is_running(), returning %d\n", ret)
return ret;
Expand Down Expand Up @@ -556,26 +518,15 @@ template <class T> void* CInputParser<T>::main_parse_loop(void* params)
CInputParser* this_obj = (CInputParser *) params;
this->input_source = this_obj->input_source;

while (1)
while (keep_running.load(std::memory_order_acquire))
{
#ifdef HAVE_CXX11
std::unique_lock<std::mutex> lock(*examples_state_lock);
#elif defined(HAVE_PTHREAD)
pthread_mutex_lock(&examples_state_lock);
#endif
std::unique_lock<std::mutex> lock(examples_state_lock);

if (parsing_done)
{
#if !defined(HAVE_CXX11) && defined(HAVE_PTHREAD)
pthread_mutex_unlock(&examples_state_lock);
#endif
return NULL;
}
#ifdef HAVE_CXX11
lock.unlock();
#elif defined(HAVE_PTHREAD)
pthread_mutex_unlock(&examples_state_lock);
pthread_testcancel();
#endif

current_example = examples_ring->get_free_example();
current_feature_vector = current_example->fv;
Expand All @@ -589,16 +540,9 @@ template <class T> void* CInputParser<T>::main_parse_loop(void* params)

if (current_len < 0)
{
#ifdef HAVE_CXX11
lock.lock();
parsing_done = true;
examples_state_changed->notify_one();
#elif defined(HAVE_PTHREAD)
pthread_mutex_lock(&examples_state_lock);
parsing_done = true;
pthread_cond_signal(&examples_state_changed);
pthread_mutex_unlock(&examples_state_lock);
#endif
examples_state_changed.notify_one();
return NULL;
}

Expand All @@ -607,16 +551,9 @@ template <class T> void* CInputParser<T>::main_parse_loop(void* params)
current_example->length = current_len;

examples_ring->copy_example(current_example);
#ifdef HAVE_CXX11
lock.lock();
number_of_vectors_parsed++;
examples_state_changed->notify_one();
#elif defined(HAVE_PTHREAD)
pthread_mutex_lock(&examples_state_lock);
number_of_vectors_parsed++;
pthread_cond_signal(&examples_state_changed);
pthread_mutex_unlock(&examples_state_lock);
#endif
examples_state_changed.notify_one();
}
return NULL;
}
Expand All @@ -632,11 +569,7 @@ template <class T> Example<T>* CInputParser<T>::retrieve_example()
{
reading_done = true;
/* Signal to waiting threads that no more examples are left */
#ifdef HAVE_CXX11
examples_state_changed->notify_one();
#elif defined(HAVE_PTHREAD)
pthread_cond_signal(&examples_state_changed);
#endif
examples_state_changed.notify_one();
return NULL;
}
}
Expand Down Expand Up @@ -670,41 +603,26 @@ template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
if (reading_done)
return 0;

#ifdef HAVE_CXX11
std::unique_lock<std::mutex> lock(*examples_state_lock);
#elif defined(HAVE_PTHREAD)
pthread_mutex_lock(&examples_state_lock);
#endif
std::unique_lock<std::mutex> lock(examples_state_lock);
ex = retrieve_example();

if (ex == NULL)
{
if (reading_done)
{
/* No more examples left, return */
#if !defined(HAVE_CXX11) && defined(HAVE_PTHREAD)
pthread_mutex_unlock(&examples_state_lock);
#endif
return 0;
}
else
{
/* Examples left, wait for one to become ready */
#ifdef HAVE_CXX11
examples_state_changed->wait(lock);
#elif defined(HAVE_PTHREAD)
pthread_cond_wait(&examples_state_changed, &examples_state_lock);
pthread_mutex_unlock(&examples_state_lock);
#endif
examples_state_changed.wait(lock);
continue;
}
}
else
{
/* Example ready, return the example */
#if !defined(HAVE_CXX11) && defined(HAVE_PTHREAD)
pthread_mutex_unlock(&examples_state_lock);
#endif
break;
}
}
Expand Down Expand Up @@ -734,25 +652,15 @@ template <class T> void CInputParser<T>::end_parser()
{
SG_SDEBUG("entering CInputParser::end_parser\n")
SG_SDEBUG("joining parse thread\n")
#ifdef HAVE_CXX11
parse_thread->join();
#elif defined(HAVE_PTHREAD)
pthread_join(parse_thread, NULL);
#endif
parse_thread.join();
SG_SDEBUG("leaving CInputParser::end_parser\n")
}

template <class T> void CInputParser<T>::exit_parser()
{
SG_SDEBUG("cancelling parse thread\n")
#ifdef HAVE_CXX11
parse_thread.reset();
#elif defined(HAVE_PTHREAD)
pthread_cancel(parse_thread);
#endif
keep_running.store(false, std::memory_order_release);
}
}

#endif /* defined(HAVE_CXX11) || defined(HAVE_PTHREAD) */

#endif // __INPUTPARSER_H__
35 changes: 8 additions & 27 deletions src/shogun/lib/RefCount.cpp
Expand Up @@ -2,41 +2,22 @@

using namespace shogun;

int32_t RefCount::ref()
RefCount::RefCount(int32_t ref_start)
{
#ifdef HAVE_CXX11_ATOMIC
int32_t count = rc.fetch_add(1)+1;
#else
lock.lock();
int32_t count = ++rc;
lock.unlock();
#endif
rc.store(ref_start, std::memory_order_release);
}

return count;
int32_t RefCount::ref()
{
return rc.fetch_add(1, std::memory_order_relaxed)+1;
}

int32_t RefCount::unref()
{
#ifdef HAVE_CXX11_ATOMIC
int32_t count = rc.fetch_sub(1)-1;
#else
lock.lock();
int32_t count = --rc;
lock.unlock();
#endif

return count;
return rc.fetch_sub(1, std::memory_order_acquire)-1;
}

int32_t RefCount::ref_count()
{
#ifdef HAVE_CXX11_ATOMIC
int32_t count = rc.load();
#else
lock.lock();
int32_t count = rc;
lock.unlock();
#endif

return count;
return rc.load(std::memory_order_acquire);
}
24 changes: 6 additions & 18 deletions src/shogun/lib/RefCount.h
@@ -1,15 +1,10 @@
#include <shogun/lib/config.h>

#ifdef HAVE_CXX11_ATOMIC
#include <atomic>
#endif

#include <shogun/lib/common.h>
#include <shogun/lib/Lock.h>

#ifndef _REFCOUNT__H__
#define _REFCOUNT__H__

#include <shogun/lib/config.h>
#include <shogun/lib/common.h>
#include <atomic>

namespace shogun
{
/** brief This class implements a thread-safe counter used for
Expand All @@ -22,7 +17,7 @@ class RefCount
*
* @param ref_start starting value for counter
*/
RefCount(int32_t ref_start=0) : rc(ref_start) {}
RefCount(int32_t ref_start=0) : rc(ref_start) {};

/** Increase ref count
*
Expand All @@ -43,14 +38,7 @@ class RefCount
int32_t ref_count();

/** reference count */
#ifdef HAVE_CXX11_ATOMIC
volatile std::atomic<int> rc;
#else
int32_t rc;

/** the lock */
CLock lock;
#endif
std::atomic<int32_t> rc;
};
}

Expand Down

0 comments on commit 63a8e07

Please sign in to comment.