Skip to content

Commit

Permalink
Merge branch 'poller'
Browse files Browse the repository at this point in the history
  • Loading branch information
ljanyst committed Nov 18, 2013
2 parents 95a4f1f + 429f1c8 commit c55a61b
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 186 deletions.
2 changes: 2 additions & 0 deletions src/XrdCl/XrdClCheckSumManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ namespace XrdCl
log->Error( UtilityMsg, "Unable read from %s: %s", filePath.c_str(),
strerror( errno ) );
close( fd );
delete [] buffer;
return false;
}
calc->Update( buffer, bytesRead );
Expand All @@ -150,6 +151,7 @@ namespace XrdCl
//--------------------------------------------------------------------------
// Clean up
//--------------------------------------------------------------------------
delete [] buffer;
close( fd );
return true;
}
Expand Down
3 changes: 2 additions & 1 deletion src/XrdCl/XrdClCopy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class ProgressDisplay: public XrdCl::CopyProgressHandler
//--------------------------------------------------------------------------
//! Constructor
//--------------------------------------------------------------------------
ProgressDisplay(): pBytesProcessed(0), pBytesTotal(0), pPrevious(0) {}
ProgressDisplay(): pBytesProcessed(0), pBytesTotal(0), pPrevious(0),
pStarted(0) {}

//--------------------------------------------------------------------------
//! Begin job
Expand Down
4 changes: 2 additions & 2 deletions src/XrdCl/XrdClCopyProcess.hh
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ namespace XrdCl
struct JobDescriptor
{
JobDescriptor(): sourceLimit(1), force(false), posc(false), coerce(false),
makedir(false), thirdParty(false), checkSumPrint(false),
chunkSize( 4194304 ), parallelChunks(8)
makedir(false), thirdParty(false), thirdPartyFallBack(true),
checkSumPrint(false), chunkSize( 4194304 ), parallelChunks(8)
{}

URL source; //!< [in] original source URL
Expand Down
11 changes: 1 addition & 10 deletions src/XrdCl/XrdClFileStateHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,7 @@ namespace
//----------------------------------------------------------------------
OpenInfo *openInfo = 0;
if( status->IsOK() )
{
if( !response )
{
XRootDStatus st( stError, errInternal );
pStateHandler->OnOpen( &st, 0, hostList );
}
response->Get( openInfo );
}

//----------------------------------------------------------------------
// Notify the state handler and the client and say bye bye
Expand Down Expand Up @@ -884,6 +877,7 @@ namespace XrdCl
pDataServer = new URL( hostList->back().url );
pDataServer->SetParams( pFileUrl->GetParams() );
pDataServer->SetPath( pFileUrl->GetPath() );
lastServer = pDataServer->GetHostId();
HostList::const_iterator itC;
URL::ParamsMap params = pDataServer->GetParams();
for( itC = hostList->begin(); itC != hostList->end(); ++itC )
Expand All @@ -903,9 +897,6 @@ namespace XrdCl
}
}

if( pDataServer )
lastServer = pDataServer->GetHostId();

log->Debug( FileMsg, "[0x%x@%s] Open has returned with status %s",
this, pFileUrl->GetURL().c_str(), status->ToStr().c_str() );

Expand Down
6 changes: 4 additions & 2 deletions src/XrdCl/XrdClInQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,11 @@ namespace XrdCl
{
XrdSysMutexHelper scopedLock( pMutex );
HandlerList::iterator it;
for( it = pHandlers.begin(); it != pHandlers.end(); ++it )
for( it = pHandlers.begin(); it != pHandlers.end(); )
if( it->first == handler )
pHandlers.erase( it );
it = pHandlers.erase( it );
else
++it;
}

//----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/XrdCl/XrdClMonitor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ namespace XrdCl
ErrUnc //!< Unclassified operation
};

ErrorInfo(): file(0), opCode( ErrUnc ) {}
ErrorInfo(): file(0), status(0), opCode( ErrUnc ) {}
const URL *file; //!< The file in question
const XRootDStatus *status; //!< Status code
Operation opCode; //!< The associated operation
Expand Down
196 changes: 132 additions & 64 deletions src/XrdCl/XrdClPollerBuiltIn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,6 @@ namespace XrdCl
//----------------------------------------------------------------------------
bool PollerBuiltIn::Initialize()
{
//--------------------------------------------------------------------------
// Start the poller
//--------------------------------------------------------------------------
using namespace XrdSys;

Log *log = DefaultEnv::GetLog();
log->Debug( PollerMsg, "Creating the built-in poller..." );
int errNum = 0;
const char *errMsg = 0;
pPoller = IOEvents::Poller::Create( errNum, &errMsg );
if( !pPoller )
{
log->Error( PollerMsg, "Unable to create the internal poller object: ",
"%s (%s)", strerror( errno ), errMsg );
return false;
}
pPoller->Pause();
return true;
}

Expand All @@ -115,13 +98,6 @@ namespace XrdCl
//----------------------------------------------------------------------------
bool PollerBuiltIn::Finalize()
{
//--------------------------------------------------------------------------
// Destroy the poller
//--------------------------------------------------------------------------
pPoller->Stop();
delete pPoller;
pPoller = 0;

//--------------------------------------------------------------------------
// Clean up the channels
//--------------------------------------------------------------------------
Expand Down Expand Up @@ -149,8 +125,55 @@ namespace XrdCl
using namespace XrdSys;

Log *log = DefaultEnv::GetLog();
log->Debug( PollerMsg, "Starting the built-in poller..." );
pPoller->Pause(false);
log->Debug( PollerMsg, "Creating and starting the built-in poller..." );
XrdSysMutexHelper scopedLock( pMutex );
int errNum = 0;
const char *errMsg = 0;
pPoller = IOEvents::Poller::Create( errNum, &errMsg );
if( !pPoller )
{
log->Error( PollerMsg, "Unable to create the internal poller object: ",
"%s (%s)", strerror( errno ), errMsg );
return false;
}

//--------------------------------------------------------------------------
// Check if we have any descriptors to reinsert from the last time we
// were started
//--------------------------------------------------------------------------
SocketMap::iterator it;
for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
{
PollerHelper *helper = (PollerHelper*)it->second;
Socket *socket = it->first;
helper->channel = new IOEvents::Channel( pPoller, socket->GetFD(),
helper->callBack );
if( helper->readEnabled )
{
bool status = helper->channel->Enable( IOEvents::Channel::readEvents,
helper->readTimeout, &errMsg );
if( !status )
{
log->Error( PollerMsg, "Unable to enable read notifications ",
"while re-starting %s (%s)", strerror( errno ), errMsg );

return false;
}
}

if( helper->writeEnabled )
{
bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
helper->writeTimeout, &errMsg );
if( !status )
{
log->Error( PollerMsg, "Unable to enable write notifications ",
"while re-starting %s (%s)", strerror( errno ), errMsg );

return false;
}
}
}
return true;
}

Expand All @@ -159,15 +182,39 @@ namespace XrdCl
//------------------------------------------------------------------------
bool PollerBuiltIn::Stop()
{
using namespace XrdSys::IOEvents;

Log *log = DefaultEnv::GetLog();
log->Debug( PollerMsg, "Stopping the poller..." );
XrdSysMutexHelper scopedLock( pMutex );

SocketMap::iterator it;
const char *errMsg = 0;

for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
{
PollerHelper *helper = (PollerHelper*)it->second;
Socket *socket = it->first;
bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
if( !status )
{
log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
socket->GetName().c_str(), errMsg );
}
delete helper->channel;
helper->channel = 0;
}

if( !pPoller )
{
log->Debug( PollerMsg, "Stopping a poller that has not been started" );
return true;
}

pPoller->Pause();
pPoller->Stop();
delete pPoller;
pPoller = 0;

return true;
}

Expand Down Expand Up @@ -211,9 +258,14 @@ namespace XrdCl
//--------------------------------------------------------------------------
PollerHelper *helper = new PollerHelper();
helper->callBack = new ::SocketCallBack( socket, handler );
helper->channel = new XrdSys::IOEvents::Channel( pPoller,
socket->GetFD(),
helper->callBack );

if( pPoller )
{
helper->channel = new XrdSys::IOEvents::Channel( pPoller,
socket->GetFD(),
helper->callBack );
}

handler->Initialize( this );
pSocketMap[socket] = helper;
return true;
Expand Down Expand Up @@ -242,15 +294,18 @@ namespace XrdCl
// Remove the socket
//--------------------------------------------------------------------------
PollerHelper *helper = (PollerHelper*)it->second;
const char *errMsg;
bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
if( !status )
if( pPoller )
{
log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
socket->GetName().c_str(), errMsg );
return false;
const char *errMsg;
bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
if( !status )
{
log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
socket->GetName().c_str(), errMsg );
return false;
}
delete helper->channel;
}
delete helper->channel;
delete helper->callBack;
delete helper;
pSocketMap.erase( it );
Expand Down Expand Up @@ -298,16 +353,19 @@ namespace XrdCl

log->Dump( PollerMsg, "%s Enable read notifications, timeout: %d",
socket->GetName().c_str(), timeout );
const char *errMsg;
bool status = helper->channel->Enable( Channel::readEvents, timeout,
&errMsg );
if( !status )

if( pPoller )
{
log->Error( PollerMsg, "%s Unable to enable read notifications: %s",
socket->GetName().c_str(), errMsg );
return false;
const char *errMsg;
bool status = helper->channel->Enable( Channel::readEvents, timeout,
&errMsg );
if( !status )
{
log->Error( PollerMsg, "%s Unable to enable read notifications: %s",
socket->GetName().c_str(), errMsg );
return false;
}
}

helper->readEnabled = true;
}

Expand All @@ -321,13 +379,17 @@ namespace XrdCl

log->Dump( PollerMsg, "%s Disable read notifications",
socket->GetName().c_str() );
const char *errMsg;
bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
if( !status )

if( pPoller )
{
log->Error( PollerMsg, "%s Unable to disable read notifications: %s",
socket->GetName().c_str(), errMsg );
return false;
const char *errMsg;
bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
if( !status )
{
log->Error( PollerMsg, "%s Unable to disable read notifications: %s",
socket->GetName().c_str(), errMsg );
return false;
}
}
helper->readEnabled = false;
}
Expand Down Expand Up @@ -377,14 +439,17 @@ namespace XrdCl
log->Dump( PollerMsg, "%s Enable write notifications, timeout: %d",
socket->GetName().c_str(), timeout );

const char *errMsg;
bool status = helper->channel->Enable( Channel::writeEvents, timeout,
&errMsg );
if( !status )
if( pPoller )
{
log->Error( PollerMsg, "%s Unable to enable write notifications: %s",
socket->GetName().c_str(), errMsg );
return false;
const char *errMsg;
bool status = helper->channel->Enable( Channel::writeEvents, timeout,
&errMsg );
if( !status )
{
log->Error( PollerMsg, "%s Unable to enable write notifications: %s",
socket->GetName().c_str(), errMsg );
return false;
}
}
helper->writeEnabled = true;
}
Expand All @@ -399,13 +464,16 @@ namespace XrdCl

log->Dump( PollerMsg, "%s Disable write notifications",
socket->GetName().c_str() );
const char *errMsg;
bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
if( !status )
if( pPoller )
{
log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
socket->GetName().c_str(), errMsg );
return false;
const char *errMsg;
bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
if( !status )
{
log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
socket->GetName().c_str(), errMsg );
return false;
}
}
helper->writeEnabled = false;
}
Expand Down
3 changes: 2 additions & 1 deletion src/XrdCl/XrdClSocket.hh
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ namespace XrdCl
//! @param status status of a socket if available
//------------------------------------------------------------------------
Socket( int socket = -1, SocketStatus status = Disconnected ):
pSocket(socket), pStatus( status ), pServerAddr( 0 )
pSocket(socket), pStatus( status ), pServerAddr( 0 ),
pProtocolFamily( AF_INET )
{
};

Expand Down
1 change: 1 addition & 0 deletions src/XrdCl/XrdClStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ namespace XrdCl
pTransport( 0 ),
pPoller( 0 ),
pTaskManager( 0 ),
pJobManager( 0 ),
pIncomingQueue( 0 ),
pChannelData( 0 ),
pLastStreamError( 0 ),
Expand Down
1 change: 1 addition & 0 deletions src/XrdCl/XrdClXRootDMsgHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ namespace XrdCl
if( !st.IsOK() )
{
delete status;
delete response;
status = new XRootDStatus( st );
response = 0;
}
Expand Down

0 comments on commit c55a61b

Please sign in to comment.