Skip to content

Commit

Permalink
Make ex_used atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
tdjogi010 committed Mar 26, 2017
1 parent dece492 commit 755bd65
Showing 1 changed file with 15 additions and 19 deletions.
34 changes: 15 additions & 19 deletions src/shogun/io/streaming/ParseBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ template <class T> class CParseBuffer: public CSGObject
Example<T>* get_free_example()
{
int32_t current_write_index = ex_write_index.load(std::memory_order_relaxed); // only written from parser thread
int32_t current_read_index = ex_read_index.load(std::memory_order_acquire); // for synchronisation
while (ex_used[current_write_index] == E_NOT_USED)
current_read_index = ex_read_index.load(std::memory_order_acquire); // for synchronisation
E_IS_EXAMPLE_USED old_value;
while ((old_value=ex_used[current_write_index]->exchange(E_NOT_USED,std::memory_order_acq_rel) )== E_NOT_USED)
{}
Example<T>* ex=&ex_ring[ex_write_index];
ex_used[current_write_index]->store(old_value,std::memory_order_relaxed);

return ex;
}
Expand Down Expand Up @@ -205,7 +206,7 @@ template <class T> class CParseBuffer: public CSGObject
Example<T>* ex_ring;

/// Enum used for representing used/unused/empty state of example
E_IS_EXAMPLE_USED* ex_used;
std::vector<std::shared_ptr<std::atomic<E_IS_EXAMPLE_USED> > > ex_used;

/// Write position for next example
std::atomic<int32_t> ex_write_index;
Expand All @@ -232,7 +233,6 @@ template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
{
ring_size = size;
ex_ring = SG_CALLOC(Example<T>, ring_size);
ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);

SG_SINFO("Initialized with ring size: %d.\n", ring_size)

Expand All @@ -241,7 +241,8 @@ template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)

for (int32_t i=0; i<ring_size; i++)
{
ex_used[i] = E_EMPTY;
std::shared_ptr<std::atomic<E_IS_EXAMPLE_USED> > temp=std::make_shared<std::atomic<E_IS_EXAMPLE_USED> >(E_EMPTY);
ex_used.push_back(temp);
ex_ring[i].fv = NULL;
ex_ring[i].length = 1;
ex_ring[i].label = FLT_MAX;
Expand All @@ -262,7 +263,7 @@ template <class T> CParseBuffer<T>::~CParseBuffer()
}
}
SG_FREE(ex_ring);
SG_FREE(ex_used);
ex_used.clear();

}

Expand All @@ -272,7 +273,7 @@ int32_t CParseBuffer<T>::write_example(Example<T> *ex)
ex_ring[ex_write_index].label = ex->label;
ex_ring[ex_write_index].fv = ex->fv;
ex_ring[ex_write_index].length = ex->length;
ex_used[ex_write_index] = E_NOT_USED;
ex_used[ex_write_index]->store(E_NOT_USED,std::memory_order_release) ;
inc_write_index();

return 1;
Expand All @@ -292,14 +293,13 @@ Example<T>* CParseBuffer<T>::get_unused_example()
{
Example<T> *ex;
int32_t current_read_index = ex_read_index.load(std::memory_order_relaxed); // only written from streaming thread
// Because read index will change after return_example_to_read
int32_t current_write_index = ex_write_index.load(std::memory_order_acquire); //for synchronisation

if (ex_used[current_read_index] == E_NOT_USED) //indirectly compared with current_write_index for checking available example
E_IS_EXAMPLE_USED old_value;
if ((old_value=ex_used[current_read_index]->exchange(E_NOT_USED,std::memory_order_acq_rel)) == E_NOT_USED) //indirectly compared with current_write_index for checking available example
ex = return_example_to_read();
else
ex = NULL;

ex_used[current_read_index]->store(old_value,std::memory_order_relaxed);
return ex;
}

Expand All @@ -309,22 +309,17 @@ int32_t CParseBuffer<T>::copy_example(Example<T> *ex)
int32_t ret;
int32_t current_write_index = ex_write_index.load(std::memory_order_relaxed); // only written from parser thread
// Because write index will change after write_example
int32_t current_read_index = ex_read_index.load(std::memory_order_acquire); // for synchronisation
while (ex_used[current_write_index] == E_NOT_USED) //indirectly compared with current_read_index for checking full buffer
{
current_read_index = ex_read_index.load(std::memory_order_acquire); // for synchronisation
}
while (ex_used[current_write_index]->exchange(E_NOT_USED,std::memory_order_acq_rel) == E_NOT_USED) //indirectly compared with current_read_index for checking full buffer
{}

ret = write_example(ex);

return ret;
}

template <class T>
void CParseBuffer<T>::finalize_example(bool free_after_release)
{
int32_t current_read_index = ex_read_index.load(std::memory_order_relaxed); // only written from streaming thread
ex_used[current_read_index] = E_USED;

if (free_after_release)
{
Expand All @@ -335,6 +330,7 @@ void CParseBuffer<T>::finalize_example(bool free_after_release)
ex_ring[current_read_index].fv=NULL;
}

ex_used[current_read_index]->store(E_USED,std::memory_order_release);
inc_read_index();

}
Expand Down

0 comments on commit 755bd65

Please sign in to comment.