Skip to content

Commit

Permalink
[XrdCl] Stop handling socket events if the handler get unregistered b…
Browse files Browse the repository at this point in the history
…y one of the events
  • Loading branch information
smithdh committed Mar 8, 2023
1 parent d1e178b commit 08c80c7
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 27 deletions.
13 changes: 13 additions & 0 deletions src/XrdCl/XrdClAsyncSocketHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ namespace XrdCl
//--------------------------------------------------------------------------
type = pSocket->MapEvent( type );

//--------------------------------------------------------------------------
// We need to use poller later, when current object might have been deleted
//--------------------------------------------------------------------------
Poller *poller = pPoller;

//--------------------------------------------------------------------------
// Read event
//--------------------------------------------------------------------------
Expand Down Expand Up @@ -241,6 +246,14 @@ namespace XrdCl
OnTimeoutWhileHandshaking();
}

//--------------------------------------------------------------------------
// Check to see if we got unregistered by any of the handlers above. If so
// return, because the AsyncSocketHandler may have been closed and possibly
// re-connected or destroyed.
//--------------------------------------------------------------------------
if( !poller->IsRegistered( nullptr ) )
return;

//--------------------------------------------------------------------------
// Write event
//--------------------------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions src/XrdCl/XrdClPoller.hh
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ namespace XrdCl

//------------------------------------------------------------------------
//! Check whether the socket is registered with the poller
//!
//! If socket is nullptr the method must be called from a registered
//! SocketHandler's Event() method during a callback. In that case it is
//! checked if the registation that made the currently executing callback
//! still exists.
//------------------------------------------------------------------------
virtual bool IsRegistered( Socket *socket ) = 0;

Expand Down
96 changes: 74 additions & 22 deletions src/XrdCl/XrdClPollerBuiltIn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@
namespace
{
//----------------------------------------------------------------------------
// A helper struct passed to the callback as a custom arg
// A helper struct associated to a socket and passed to the callback
//----------------------------------------------------------------------------
struct PollerHelper
{
PollerHelper():
channel(0), callBack(0), readEnabled(false), writeEnabled(false),
readTimeout(0), writeTimeout(0)
channel(0), callBack(0), pollerData(0), readEnabled(false),
writeEnabled(false), readTimeout(0), writeTimeout(0)
{}
XrdSys::IOEvents::Channel *channel;
XrdSys::IOEvents::CallBack *callBack;
XrdCl::PollerBuiltIn::PollerData *pollerData;
bool readEnabled;
bool writeEnabled;
uint16_t readTimeout;
Expand All @@ -66,6 +67,8 @@ namespace
{
using namespace XrdCl;
uint8_t ev = 0;
XrdCl::PollerBuiltIn::PollerData *pd =
(XrdCl::PollerBuiltIn::PollerData*)cbArg;

if( evFlags & ReadyToRead ) ev |= SocketHandler::ReadyToRead;
if( evFlags & ReadTimeOut ) ev |= SocketHandler::ReadTimeOut;
Expand All @@ -80,6 +83,12 @@ namespace
SocketHandler::EventTypeToString( ev ).c_str() );
}

pd->chDead = false;
if( !pd->haveTid )
{
pd->tid = XrdSysThread::ID();
pd->haveTid = true;
}
pHandler->Event( ev, pSocket );
return true;
}
Expand Down Expand Up @@ -146,7 +155,8 @@ namespace XrdCl
"%s (%s)", XrdSysE2T( errno ), errMsg );
return false;
}
pPollerPool.push_back( poller );
pPollerPool.push_back(
PollerPoolItem( poller, std::make_unique<PollerData>() ) );
}

pNext = pPollerPool.begin();
Expand All @@ -162,8 +172,12 @@ namespace XrdCl
{
PollerHelper *helper = (PollerHelper*)it->second;
Socket *socket = it->first;
helper->channel = new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
helper->callBack );
PollerAndData pad = RegisterAndGetPollerAndData( socket );
helper->pollerData = pad.second;
helper->channel = new IOEvents::Channel( pad.first,
socket->GetFD(),
helper->callBack,
helper->pollerData );
if( helper->readEnabled )
{
bool status = helper->channel->Enable( IOEvents::Channel::readEvents,
Expand Down Expand Up @@ -213,8 +227,9 @@ namespace XrdCl

while( !pPollerPool.empty() )
{
XrdSys::IOEvents::Poller *poller = pPollerPool.back();
PollerPoolItem pi = std::move(pPollerPool.back());
pPollerPool.pop_back();
XrdSys::IOEvents::Poller *poller = pi.first;

if( !poller ) continue;

Expand All @@ -232,6 +247,7 @@ namespace XrdCl
for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
{
PollerHelper *helper = (PollerHelper*)it->second;
helper->pollerData = 0;
if( !helper->channel ) continue;
bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
if( !status )
Expand Down Expand Up @@ -285,16 +301,19 @@ namespace XrdCl
//--------------------------------------------------------------------------
// Create the socket helper
//--------------------------------------------------------------------------
XrdSys::IOEvents::Poller* poller = RegisterAndGetPoller( socket );
PollerAndData pad = RegisterAndGetPollerAndData( socket );
XrdSys::IOEvents::Poller* poller = pad.first;

PollerHelper *helper = new PollerHelper();
helper->callBack = new ::SocketCallBack( socket, handler );
helper->callBack = new ::SocketCallBack( socket, handler );
helper->pollerData = pad.second;

if( poller )
{
helper->channel = new XrdSys::IOEvents::Channel( poller,
socket->GetFD(),
helper->callBack );
helper->callBack,
helper->pollerData );
}

handler->Initialize( this );
Expand Down Expand Up @@ -343,6 +362,15 @@ namespace XrdCl
}
helper->channel->Delete();
}
if( helper->pollerData )
{
if ( helper->pollerData->haveTid &&
XrdSysThread::Same( XrdSysThread::ID(), helper->pollerData->tid ) )
{
helper->pollerData->chDead = true;
}
}

delete helper->callBack;
delete helper;
return true;
Expand Down Expand Up @@ -524,48 +552,71 @@ namespace XrdCl
bool PollerBuiltIn::IsRegistered( Socket *socket )
{
XrdSysMutexHelper scopedLock( pMutex );
if( socket == nullptr )
{
for( PollerMap::iterator itr = pPollerMap.begin();
itr != pPollerMap.end(); ++itr )
{
const PollerMapValue &v = itr->second;
const PollerAndData pad = v.first;
const PollerData *pd = pad.second;
if( pd->haveTid && XrdSysThread::Same( XrdSysThread::ID(), pd->tid ) )
{
return !pd->chDead;
}
}
return false;
}
SocketMap::iterator it = pSocketMap.find( socket );
return it != pSocketMap.end();
}

//----------------------------------------------------------------------------
// Return poller threads in round-robin fashion
//----------------------------------------------------------------------------
XrdSys::IOEvents::Poller* PollerBuiltIn::GetNextPoller()
XrdCl::PollerBuiltIn::PollerAndData PollerBuiltIn::GetNextPollerAndData()
{
if( pPollerPool.empty() ) return 0;
if( pPollerPool.empty() ) return PollerAndData( 0, 0 );

PollerPool::iterator ret = pNext;
++pNext;
if( pNext == pPollerPool.end() )
pNext = pPollerPool.begin();
return *ret;
return PollerAndData( ret->first, ret->second.get() );
}

//----------------------------------------------------------------------------
// Return the poller associated with the respective channel
//----------------------------------------------------------------------------
XrdSys::IOEvents::Poller* PollerBuiltIn::RegisterAndGetPoller(const Socket * socket)
XrdCl::PollerBuiltIn::PollerAndData
PollerBuiltIn::RegisterAndGetPollerAndData(const Socket * socket)
{
PollerMap::iterator itr = pPollerMap.find( socket->GetChannelID() );
if( itr == pPollerMap.end() )
{
XrdSys::IOEvents::Poller* poller = GetNextPoller();
PollerAndData pad = GetNextPollerAndData();
XrdSys::IOEvents::Poller* poller = pad.first;
if( poller )
pPollerMap[socket->GetChannelID()] = std::make_pair( poller, size_t( 1 ) );
return poller;
{
pPollerMap[socket->GetChannelID()] =
std::make_pair( pad, size_t( 1 ) );
}

return pad;
}

++( itr->second.second );
return itr->second.first;
PollerMapValue &v = itr->second;
++( v.second );
return v.first;
}

void PollerBuiltIn::UnregisterFromPoller( const Socket *socket )
{
PollerMap::iterator itr = pPollerMap.find( socket->GetChannelID() );
if( itr == pPollerMap.end() ) return;
--itr->second.second;
if( itr->second.second == 0 )
PollerMapValue &v = itr->second;
--( v.second );
if( v.second == 0 )
pPollerMap.erase( itr );

}
Expand All @@ -574,7 +625,8 @@ namespace XrdCl
{
PollerMap::iterator itr = pPollerMap.find( socket->GetChannelID() );
if( itr == pPollerMap.end() ) return 0;
return itr->second.first;
const PollerMapValue &v = itr->second;
return v.first.first;
}

//----------------------------------------------------------------------------
Expand Down
39 changes: 34 additions & 5 deletions src/XrdCl/XrdClPollerBuiltIn.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "XrdSys/XrdSysPthread.hh"
#include "XrdCl/XrdClPoller.hh"
#include <map>
#include <memory>
#include <utility>
#include <vector>


Expand All @@ -40,6 +42,21 @@ namespace XrdCl
class PollerBuiltIn: public Poller
{
public:
//------------------------------------------------------------------------
// Extra state associated with a poller
//------------------------------------------------------------------------
struct PollerData
{
PollerData():
chDead( false ), haveTid( false )
{}
bool chDead;
bool haveTid;
pthread_t tid;
};

typedef std::pair<XrdSys::IOEvents::Poller *, PollerData *> PollerAndData;

//------------------------------------------------------------------------
//! Constructor
//------------------------------------------------------------------------
Expand Down Expand Up @@ -108,6 +125,11 @@ namespace XrdCl

//------------------------------------------------------------------------
//! Check whether the socket is registered with the poller
//!
//! If socket is nullptr the method must be called from a registered
//! SocketHandler's Event() method during a callback. In that case it is
//! checked if the registation that made the currently executing callback
//! still exists.
//------------------------------------------------------------------------
virtual bool IsRegistered( Socket *socket );

Expand All @@ -124,12 +146,12 @@ namespace XrdCl
//------------------------------------------------------------------------
//! Goes over poller threads in round robin fashion
//------------------------------------------------------------------------
XrdSys::IOEvents::Poller* GetNextPoller();
PollerAndData GetNextPollerAndData();

//------------------------------------------------------------------------
//! Registers given socket as a poller user and returns the poller object
//------------------------------------------------------------------------
XrdSys::IOEvents::Poller* RegisterAndGetPoller(const Socket *socket);
PollerAndData RegisterAndGetPollerAndData(const Socket *socket);

//------------------------------------------------------------------------
//! Unregisters given socket from poller object
Expand All @@ -146,11 +168,18 @@ namespace XrdCl
//------------------------------------------------------------------------
static int GetNbPollerInit();

// associates channel ID to a pair: poller and count (how many sockets where mapped to this poller)
typedef std::map<const AnyObject *, std::pair<XrdSys::IOEvents::Poller *, size_t> > PollerMap;
typedef std::pair<PollerAndData, size_t> PollerMapValue;

// associates channel ID to the pair: poller/data and count
// (count follows how many sockets where mapped to this poller)
typedef std::map<const AnyObject *, PollerMapValue> PollerMap;

typedef std::map<Socket *, void *> SocketMap;
typedef std::vector<XrdSys::IOEvents::Poller *> PollerPool;

typedef std::pair<XrdSys::IOEvents::Poller *,
std::unique_ptr<PollerData>>
PollerPoolItem;
typedef std::vector<PollerPoolItem> PollerPool;

SocketMap pSocketMap;
PollerMap pPollerMap;
Expand Down

0 comments on commit 08c80c7

Please sign in to comment.