Skip to content

Commit

Permalink
drop HAVE_CXX11 in ParseBuffer.h
Browse files Browse the repository at this point in the history
part of #4284
  • Loading branch information
vigsterkr committed May 17, 2018
1 parent fb66888 commit ca3d1af
Showing 1 changed file with 2 additions and 103 deletions.
105 changes: 2 additions & 103 deletions src/shogun/io/streaming/ParseBuffer.h
@@ -1,27 +1,21 @@
/*
* This software is distributed under BSD 3-clause license (see LICENSE file).
*
* Authors: Heiko Strathmann, Viktor Gal, Soeren Sonnenburg, Yuyu Zhang,
* Authors: Heiko Strathmann, Viktor Gal, Soeren Sonnenburg, Yuyu Zhang,
* Sergey Lisitsyn, Wu Lin
*/
#ifndef __PARSEBUFFER_H__
#define __PARSEBUFFER_H__

#include <shogun/lib/config.h>

#if defined(HAVE_CXX11) || defined(HAVE_PTHREAD)

#include <shogun/lib/common.h>
#include <shogun/base/SGObject.h>
#include <shogun/lib/DataType.h>
#ifdef HAVE_CXX11
#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>
#elif HAVE_PTHREAD
#include <pthread.h>
#endif

namespace shogun
{
Expand Down Expand Up @@ -95,23 +89,12 @@ template <class T> class CParseBuffer: public CSGObject
*/
Example<T>* get_free_example()
{
#ifdef HAVE_CXX11
std::unique_lock<std::mutex> write_lk(*write_mutex, std::defer_lock);
std::unique_lock<std::mutex> current_ex_lock(*ex_in_use_mutex[ex_write_index], std::defer_lock);
std::lock(write_lk, current_ex_lock);
while (ex_used[ex_write_index] == E_NOT_USED)
ex_in_use_cond[ex_write_index]->wait(current_ex_lock);
Example<T>* ex=&ex_ring[ex_write_index];
#elif HAVE_PTHREAD
pthread_mutex_lock(write_lock);
pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]);
while (ex_used[ex_write_index] == E_NOT_USED)
pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
Example<T>* ex=&ex_ring[ex_write_index];
pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]);
pthread_mutex_unlock(write_lock);
#endif

return ex;
}

Expand Down Expand Up @@ -221,7 +204,6 @@ template <class T> class CParseBuffer: public CSGObject

/// Enum used for representing used/unused/empty state of example
E_IS_EXAMPLE_USED* ex_used;
#ifdef HAVE_CXX11
/// Lock on state of example - used or unused
std::vector<std::shared_ptr<std::mutex> > ex_in_use_mutex;
/// Condition variable triggered when example is being/not being used
Expand All @@ -230,16 +212,6 @@ template <class T> class CParseBuffer: public CSGObject
std::shared_ptr<std::mutex> read_mutex;
/// Lock for writing new examples
std::shared_ptr<std::mutex> write_mutex;
#elif HAVE_PTHREAD
/// Lock on state of example - used or unused
pthread_mutex_t* ex_in_use_mutex;
/// Condition variable triggered when example is being/not being used
pthread_cond_t* ex_in_use_cond;
/// Lock for reading examples from the ring
pthread_mutex_t* read_lock;
/// Lock for writing new examples
pthread_mutex_t* write_lock;
#endif

/// Write position for next example
int32_t ex_write_index;
Expand Down Expand Up @@ -267,16 +239,8 @@ template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
ring_size = size;
ex_ring = SG_CALLOC(Example<T>, ring_size);
ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);
#ifdef HAVE_CXX11
read_mutex = std::make_shared<std::mutex>();
write_mutex = std::make_shared<std::mutex>();
#elif HAVE_PTHREAD
ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size);
ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size);
read_lock = SG_MALLOC(pthread_mutex_t, 1);
write_lock = SG_MALLOC(pthread_mutex_t, 1);
#endif

SG_SINFO("Initialized with ring size: %d.\n", ring_size)

ex_write_index = 0;
Expand All @@ -290,19 +254,9 @@ template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
ex_ring[i].length = 1;
ex_ring[i].label = FLT_MAX;

#ifdef HAVE_CXX11
ex_in_use_mutex.push_back(std::make_shared<std::mutex>());
ex_in_use_cond.push_back(std::make_shared<std::condition_variable>());
#elif defined(HAVE_PTHREAD)
pthread_cond_init(&ex_in_use_cond[i], NULL);
pthread_mutex_init(&ex_in_use_mutex[i], NULL);
#endif
}
#if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11)
pthread_mutex_init(read_lock, NULL);
pthread_mutex_init(write_lock, NULL);
#endif

free_vectors_on_destruct = true;
}

Expand All @@ -316,25 +270,14 @@ template <class T> CParseBuffer<T>::~CParseBuffer()
get_name(), get_name(), i, ex_ring[i].fv);
delete ex_ring[i].fv;
}
#if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11)
pthread_mutex_destroy(&ex_in_use_mutex[i]);
pthread_cond_destroy(&ex_in_use_cond[i]);
#endif
}
SG_FREE(ex_ring);
SG_FREE(ex_used);
#ifdef HAVE_CXX11

ex_in_use_mutex.clear();
ex_in_use_cond.clear();
read_mutex.reset();
write_mutex.reset();
#elif HAVE_PTHREAD
SG_FREE(ex_in_use_mutex);
SG_FREE(ex_in_use_cond);

SG_FREE(read_lock);
SG_FREE(write_lock);
#endif
}

template <class T>
Expand All @@ -361,79 +304,45 @@ Example<T>* CParseBuffer<T>::return_example_to_read()
template <class T>
Example<T>* CParseBuffer<T>::get_unused_example()
{
#ifdef HAVE_CXX11
std::lock_guard<std::mutex> read_lk(*read_mutex);
#elif HAVE_PTHREAD
pthread_mutex_lock(read_lock);
#endif

Example<T> *ex;
int32_t current_index = ex_read_index;
// Because read index will change after return_example_to_read

#ifdef HAVE_CXX11
std::lock_guard<std::mutex> current_ex_lk(*ex_in_use_mutex[current_index]);
#elif HAVE_PTHREAD
pthread_mutex_lock(&ex_in_use_mutex[current_index]);
#endif

if (ex_used[current_index] == E_NOT_USED)
ex = return_example_to_read();
else
ex = NULL;

#if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11)
pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
pthread_mutex_unlock(read_lock);
#endif
return ex;
}

template <class T>
int32_t CParseBuffer<T>::copy_example(Example<T> *ex)
{
#ifdef HAVE_CXX11
std::lock_guard<std::mutex> write_lk(*write_mutex);
#elif HAVE_PTHREAD
pthread_mutex_lock(write_lock);
#endif
int32_t ret;
int32_t current_index = ex_write_index;

#ifdef HAVE_CXX11
std::unique_lock<std::mutex> current_ex_lock(*ex_in_use_mutex[current_index]);
#elif HAVE_PTHREAD
pthread_mutex_lock(&ex_in_use_mutex[current_index]);
#endif
while (ex_used[ex_write_index] == E_NOT_USED)
{
#ifdef HAVE_CXX11
ex_in_use_cond[ex_write_index]->wait(current_ex_lock);
#elif HAVE_PTHREAD
pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
#endif
}

ret = write_example(ex);

#if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11)
pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
pthread_mutex_unlock(write_lock);
#endif

return ret;
}

template <class T>
void CParseBuffer<T>::finalize_example(bool free_after_release)
{
#ifdef HAVE_CXX11
std::lock_guard<std::mutex> read_lk(*read_mutex);
std::unique_lock<std::mutex> current_ex_lock(*ex_in_use_mutex[ex_read_index]);
#elif HAVE_PTHREAD
pthread_mutex_lock(read_lock);
pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]);
#endif
ex_used[ex_read_index] = E_USED;

if (free_after_release)
Expand All @@ -445,20 +354,10 @@ void CParseBuffer<T>::finalize_example(bool free_after_release)
ex_ring[ex_read_index].fv=NULL;
}

#ifdef HAVE_CXX11
ex_in_use_cond[ex_read_index]->notify_one();
current_ex_lock.unlock();
#elif HAVE_PTHREAD
pthread_cond_signal(&ex_in_use_cond[ex_read_index]);
pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]);
#endif
inc_read_index();

#if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11)
pthread_mutex_unlock(read_lock);
#endif
}

}
#endif // defined(HAVE_CXX11) || defined(HAVE_PTHREAD)
#endif // __PARSEBUFFER_H__

0 comments on commit ca3d1af

Please sign in to comment.