Skip to content

Commit

Permalink
[XrdCl] Delegate all callbacks to the thread-pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jul 13, 2017
1 parent e4281e2 commit 830a5d5
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
9 changes: 9 additions & 0 deletions src/XrdCl/XrdClJobManager.hh
Expand Up @@ -21,6 +21,7 @@

#include <stdint.h>
#include <vector>
#include <algorithm>
#include <pthread.h>
#include "XrdCl/XrdClSyncQueue.hh"

Expand Down Expand Up @@ -98,6 +99,14 @@ namespace XrdCl
//------------------------------------------------------------------------
void RunJobs();

bool IsWorker()
{
pthread_t thread = pthread_self();
std::vector<pthread_t>::iterator itr =
std::find( pWorkers.begin(), pWorkers.end(), thread );
return itr != pWorkers.end();
}

private:
//------------------------------------------------------------------------
//! Stop all workers up to n'th
Expand Down
47 changes: 44 additions & 3 deletions src/XrdCl/XrdClXRootDMsgHandler.cc
Expand Up @@ -32,6 +32,7 @@
#include "XrdCl/XrdClUglyHacks.hh"
#include "XrdCl/XrdClUtils.hh"
#include "XrdCl/XrdClTaskManager.hh"
#include "XrdCl/XrdClJobManager.hh"
#include "XrdCl/XrdClSIDManager.hh"
#include "XrdCl/XrdClMessageUtils.hh"

Expand Down Expand Up @@ -64,10 +65,37 @@ namespace
private:
XrdCl::XRootDMsgHandler *pHandler;
};

};

namespace XrdCl
{

//----------------------------------------------------------------------------
// Delegate the response handling to the thread-pool
//----------------------------------------------------------------------------
class HandleRspJob: public XrdCl::Job
{
public:
HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
{

}

virtual ~HandleRspJob()
{

}

virtual void Run( void *arg )
{
pHandler->HandleResponse();
delete this;
}
private:
XrdCl::XRootDMsgHandler *pHandler;
};

//----------------------------------------------------------------------------
// Examine an incoming message, and decide on the action to be taken
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -1835,7 +1863,7 @@ namespace XrdCl
else
{
pStatus = status;
HandleResponse();
HandleRspOrQueue();
return;
}
}
Expand All @@ -1853,7 +1881,7 @@ namespace XrdCl
pUrl.GetHostId().c_str(),
pRequest->GetDescription().c_str() );
pStatus = status;
HandleResponse();
HandleRspOrQueue();
return;
}

Expand All @@ -1877,7 +1905,7 @@ namespace XrdCl
return;
}
pStatus = status;
HandleResponse();
HandleRspOrQueue();
return;
}
}
Expand Down Expand Up @@ -1996,4 +2024,17 @@ namespace XrdCl
XRootDTransport::SetDescription( pRequest );
XRootDTransport::MarshallRequest( pRequest );
}

//------------------------------------------------------------------------
// If the current thread is a worker thread from our thread-pool
// handle the response, otherwise submit a new task to the thread-pool
//------------------------------------------------------------------------
void XRootDMsgHandler::HandleRspOrQueue()
{
JobManager *jobMgr = pPostMaster->GetJobManager();
if( jobMgr->IsWorker() )
HandleResponse();
else
jobMgr->QueueJob( new HandleRspJob( this ), 0 );
}
}
8 changes: 8 additions & 0 deletions src/XrdCl/XrdClXRootDMsgHandler.hh
Expand Up @@ -43,6 +43,8 @@ namespace XrdCl
class XRootDMsgHandler: public IncomingMsgHandler,
public OutgoingMsgHandler
{
friend class HandleRspJob;

public:
//------------------------------------------------------------------------
//! Constructor
Expand Down Expand Up @@ -338,6 +340,12 @@ namespace XrdCl
//------------------------------------------------------------------------
void SwitchOnRefreshFlag();

//------------------------------------------------------------------------
//! If the current thread is a worker thread from our thread-pool
//! handle the response, otherwise submit a new task to the thread-pool
//------------------------------------------------------------------------
void HandleRspOrQueue();

//------------------------------------------------------------------------
// Helper struct for async reading of chunks
//------------------------------------------------------------------------
Expand Down

0 comments on commit 830a5d5

Please sign in to comment.