Skip to content

Commit

Permalink
[XrdEc] Move sync_queue to utilities.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jan 19, 2021
1 parent 2efaba2 commit 26cab7d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 89 deletions.
91 changes: 2 additions & 89 deletions src/XrdEc/XrdEcStrmWriter.hh
Expand Up @@ -27,102 +27,15 @@
#include <memory>
#include <vector>
#include <thread>
#include <queue>
#include <condition_variable>
#include <mutex>
#include <iterator>

#include <sys/stat.h>

namespace XrdEc
{
//---------------------------------------------------------------------------
// A class implementing synchronous queue
//---------------------------------------------------------------------------
template<typename Element>
struct sync_queue
{
//-------------------------------------------------------------------------
// An internal exception used for interrupting the `dequeue` method
//-------------------------------------------------------------------------
struct wait_interrupted{ };

//-------------------------------------------------------------------------
// Default constructor
//-------------------------------------------------------------------------
sync_queue() : interrupted( false )
{
}

//-------------------------------------------------------------------------
// Enqueue new element into the queue
//-------------------------------------------------------------------------
inline void enqueue( Element && element )
{
std::unique_lock<std::mutex> lck( mtx );
elements.push( std::move( element ) );
cv.notify_all();
}

//-------------------------------------------------------------------------
// Dequeue an element from the front of the queue
// Note: if the queue is empty blocks until a new element is enqueued
//-------------------------------------------------------------------------
inline Element dequeue()
{
std::unique_lock<std::mutex> lck( mtx );
while( elements.empty() )
{
cv.wait( lck );
if( interrupted ) throw wait_interrupted();
}
Element element = std::move( elements.front() );
elements.pop();
return std::move( element );
}

//-------------------------------------------------------------------------
// Dequeue an element from the front of the queue
// Note: if the queue is empty returns false, true otherwise
//-------------------------------------------------------------------------
inline bool dequeue( Element &e )
{
std::unique_lock<std::mutex> lck( mtx );
if( elements.empty() ) return false;
e = std::move( elements.front() );
elements.pop();
return true;
}

//-------------------------------------------------------------------------
// Checks if the queue is empty
//-------------------------------------------------------------------------
bool empty()
{
std::unique_lock<std::mutex> lck( mtx );
return elements.empty();
}

//-------------------------------------------------------------------------
// Interrupt all waiting `dequeue` routines
//-------------------------------------------------------------------------
inline void interrupt()
{
interrupted = true;
cv.notify_all();
}

private:
std::queue<Element> elements; //< the queue itself
std::mutex mtx; //< mutex guarding the queue
std::condition_variable cv;
std::atomic<bool> interrupted; //< a flag, true if all `dequeue` routines
//< should be interrupted
};

//---------------------------------------------------------------------------
// The Stream Writer objects, responsible for writing erasure coded data
// into selected placement group.
//! The Stream Writer objects, responsible for writing erasure coded data
//! into selected placement group.
//---------------------------------------------------------------------------
class StrmWriter
{
Expand Down
88 changes: 88 additions & 0 deletions src/XrdEc/XrdEcUtilities.hh
Expand Up @@ -38,6 +38,9 @@
#include <exception>
#include <memory>
#include <random>
#include <queue>
#include <mutex>
#include <condition_variable>

namespace XrdEc
{
Expand Down Expand Up @@ -161,6 +164,91 @@ namespace XrdEc
//! @param st : operation status
//---------------------------------------------------------------------------
void ScheduleHandler( XrdCl::ResponseHandler *handler, const XrdCl::XRootDStatus &st = XrdCl::XRootDStatus() );


//---------------------------------------------------------------------------
// A class implementing synchronous queue
//---------------------------------------------------------------------------
template<typename Element>
struct sync_queue
{
//-------------------------------------------------------------------------
// An internal exception used for interrupting the `dequeue` method
//-------------------------------------------------------------------------
struct wait_interrupted{ };

//-------------------------------------------------------------------------
// Default constructor
//-------------------------------------------------------------------------
sync_queue() : interrupted( false )
{
}

//-------------------------------------------------------------------------
// Enqueue new element into the queue
//-------------------------------------------------------------------------
inline void enqueue( Element && element )
{
std::unique_lock<std::mutex> lck( mtx );
elements.push( std::move( element ) );
cv.notify_all();
}

//-------------------------------------------------------------------------
// Dequeue an element from the front of the queue
// Note: if the queue is empty blocks until a new element is enqueued
//-------------------------------------------------------------------------
inline Element dequeue()
{
std::unique_lock<std::mutex> lck( mtx );
while( elements.empty() )
{
cv.wait( lck );
if( interrupted ) throw wait_interrupted();
}
Element element = std::move( elements.front() );
elements.pop();
return std::move( element );
}

//-------------------------------------------------------------------------
// Dequeue an element from the front of the queue
// Note: if the queue is empty returns false, true otherwise
//-------------------------------------------------------------------------
inline bool dequeue( Element &e )
{
std::unique_lock<std::mutex> lck( mtx );
if( elements.empty() ) return false;
e = std::move( elements.front() );
elements.pop();
return true;
}

//-------------------------------------------------------------------------
// Checks if the queue is empty
//-------------------------------------------------------------------------
bool empty()
{
std::unique_lock<std::mutex> lck( mtx );
return elements.empty();
}

//-------------------------------------------------------------------------
// Interrupt all waiting `dequeue` routines
//-------------------------------------------------------------------------
inline void interrupt()
{
interrupted = true;
cv.notify_all();
}

private:
std::queue<Element> elements; //< the queue itself
std::mutex mtx; //< mutex guarding the queue
std::condition_variable cv;
std::atomic<bool> interrupted; //< a flag, true if all `dequeue` routines
//< should be interrupted
};
}

#endif /* SRC_XRDEC_XRDECUTILITIES_HH_ */

0 comments on commit 26cab7d

Please sign in to comment.