Skip to content

Commit

Permalink
[XrdCl] Remove sync PostMaster::Send/Receive.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal authored and gganis committed Nov 23, 2021
1 parent cdb003e commit 4593de2
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 378 deletions.
242 changes: 0 additions & 242 deletions src/XrdCl/XrdClChannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,211 +30,10 @@
#include "XrdCl/XrdClLog.hh"
#include "XrdCl/XrdClRedirectorRegistry.hh"
#include "XrdCl/XrdClXRootDTransport.hh"

#include "XrdSys/XrdSysPthread.hh"

#include <ctime>

namespace
{
//----------------------------------------------------------------------------
// Filter handler
//----------------------------------------------------------------------------
class FilterHandler: public XrdCl::MsgHandler
{
public:
//------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------
FilterHandler( XrdCl::MessageFilter *filter ):
pSem( new XrdSysSemaphore(0) ), pFilter( filter ), pMsg( 0 )
{
}

//------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------
virtual ~FilterHandler()
{
delete pSem;
}

//------------------------------------------------------------------------
// Message handler
//------------------------------------------------------------------------
virtual uint16_t Examine( std::shared_ptr<XrdCl::Message> &msg )
{
if( pFilter->Filter( msg ) )
{
pMsg = msg;
return RemoveHandler;
}
return Ignore;
}

//------------------------------------------------------------------------
// Reexamine the incoming message, and decide on the action to be taken
//------------------------------------------------------------------------
virtual uint16_t InspectStatusRsp()
{
return 0;
}

virtual void Process()
{
pSem->Post();
}

//------------------------------------------------------------------------
// Handle a fault
//------------------------------------------------------------------------
virtual uint8_t OnStreamEvent( StreamEvent event,
XrdCl::XRootDStatus status )
{
if( event == Ready )
return 0;
pStatus = status;
pSem->Post();
return RemoveHandler;
}

//------------------------------------------------------------------------
// Wait for a status of the message
//------------------------------------------------------------------------
XrdCl::XRootDStatus WaitForStatus()
{
pSem->Wait();
return pStatus;
}

//------------------------------------------------------------------------
// Wait for the arrival of the message
//------------------------------------------------------------------------
std::shared_ptr<XrdCl::Message> GetMessage()
{
return pMsg;
}

//------------------------------------------------------------------------
// Get underlying message filter sid
//------------------------------------------------------------------------
uint16_t GetSid() const
{
if (pFilter)
return pFilter->GetSid();

return 0;
}

//------------------------------------------------------------------------
// The requested action has been performed and the status is available
//------------------------------------------------------------------------
virtual void OnStatusReady( const XrdCl::Message *message,
XrdCl::XRootDStatus status )
{
}

//------------------------------------------------------------------------
// Get a timestamp after which we give up
//------------------------------------------------------------------------
virtual time_t GetExpiration()
{
return 0;
}

private:
FilterHandler(const FilterHandler &other);
FilterHandler &operator = (const FilterHandler &other);

XrdSysSemaphore *pSem;
XrdCl::MessageFilter *pFilter;
std::shared_ptr<XrdCl::Message> pMsg;
XrdCl::XRootDStatus pStatus;
};

//----------------------------------------------------------------------------
// Status handler
//----------------------------------------------------------------------------
class StatusHandler: public XrdCl::MsgHandler
{
public:
//------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------
StatusHandler( XrdCl::Message *msg ):
pSem( new XrdSysSemaphore(0) ),
pMsg( msg ) {}

//------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------
virtual ~StatusHandler()
{
delete pSem;
}

virtual uint16_t Examine( std::shared_ptr<XrdCl::Message> &msg )
{
return MsgHandler::Action::None;
}

//------------------------------------------------------------------------
// Reexamine the incoming message, and decide on the action to be taken
//------------------------------------------------------------------------
virtual uint16_t InspectStatusRsp()
{
return MsgHandler::Action::None;
}

//------------------------------------------------------------------------
//! Get handler sid
//!
//! return sid of the corresponding request, otherwise 0
//------------------------------------------------------------------------
virtual uint16_t GetSid() const
{
return 0;
}

//------------------------------------------------------------------------
// Handle the status information
//------------------------------------------------------------------------
void OnStatusReady( const XrdCl::Message *message,
XrdCl::XRootDStatus status )
{
if( pMsg == message )
pStatus = status;
pSem->Post();
}

//------------------------------------------------------------------------
// Wait for the status to be ready
//------------------------------------------------------------------------
XrdCl::XRootDStatus WaitForStatus()
{
pSem->Wait();
return pStatus;
}

//------------------------------------------------------------------------
// Get a timestamp after which we give up
//------------------------------------------------------------------------
virtual time_t GetExpiration()
{
return 0;
}

private:
StatusHandler(const StatusHandler &other);
StatusHandler &operator = (const StatusHandler &other);

XrdSysSemaphore *pSem;
XrdCl::XRootDStatus pStatus;
XrdCl::Message *pMsg;
};

}

namespace XrdCl
{
class TickGeneratorTask: public XrdCl::Task
Expand Down Expand Up @@ -338,19 +137,6 @@ namespace XrdCl
pTransport->FinalizeChannel( pChannelData );
}

//----------------------------------------------------------------------------
// Send a message synchronously
//----------------------------------------------------------------------------
XRootDStatus Channel::Send( Message *msg, bool stateful, time_t expires )
{
StatusHandler sh( msg );
XRootDStatus sc = Send( msg, &sh, stateful, expires );
if( !sc.IsOK() )
return sc;
sc = sh.WaitForStatus();
return sc;
}

//----------------------------------------------------------------------------
// Send the message asynchronously
//----------------------------------------------------------------------------
Expand All @@ -363,34 +149,6 @@ namespace XrdCl
return pStream->Send( msg, handler, stateful, expires );
}

//----------------------------------------------------------------------------
// Synchronously receive a message - blocks until a message matching
//----------------------------------------------------------------------------
Status Channel::Receive( std::shared_ptr<Message> &msg,
MessageFilter *filter,
time_t expires )
{
FilterHandler fh( filter );
Status sc = Receive( &fh, expires );
if( !sc.IsOK() )
return sc;

sc = fh.WaitForStatus();
if( sc.IsOK() )
msg = fh.GetMessage();
return sc;
}

//----------------------------------------------------------------------------
// Listen to incoming messages
//----------------------------------------------------------------------------
Status Channel::Receive( MsgHandler *handler, time_t expires )
{
bool rmMsg;
pIncoming.AddMessageHandler( handler, expires, rmMsg );
return Status();
}

//----------------------------------------------------------------------------
// Handle a time event
//----------------------------------------------------------------------------
Expand Down
36 changes: 0 additions & 36 deletions src/XrdCl/XrdClChannel.hh
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,6 @@ namespace XrdCl
return pUrl;
}

//------------------------------------------------------------------------
//! Send a message synchronously - synchronously means that
//! it will block until the message is written to a socket
//!
//! @param msg message to be sent
//! @param expires expiration timestamp after which a failure should be
//! reported if sending was unsuccessful
//! @param stateful physical stream disconnection causes an error
//! @return success if the message has been pushed through the wire,
//! failure otherwise
//------------------------------------------------------------------------
XRootDStatus Send( Message *msg, bool stateful, time_t expires );

//------------------------------------------------------------------------
//! Send the message asynchronously - the message is inserted into the
//! send queue and a listener is called when the message is successfully
Expand All @@ -109,29 +96,6 @@ namespace XrdCl
bool stateful,
time_t expires );

//------------------------------------------------------------------------
//! Synchronously receive a message - blocks until a message matching
//! a filter is found in the incoming queue or the timeout passes
//!
//! @param msg reference to a message pointer, the pointer will
//! point to the received message
//! @param filter filter object defining what to look for
//! @param expires expiration timestamp
//! @return success when the message has been received
//! successfully, failure otherwise
//------------------------------------------------------------------------
Status Receive( std::shared_ptr<Message> &msg, MessageFilter *filter, time_t expires );

//------------------------------------------------------------------------
//! Listen to incoming messages, the listener is notified when a new
//! message arrives and when the timeout passes
//!
//! @param handler handler to be notified about new messages
//! @param expires expiration timestamp
//! @return success when the handler has been registered correctly
//------------------------------------------------------------------------
Status Receive( MsgHandler *handler, time_t expires );

//------------------------------------------------------------------------
//! Query the transport handler
//!
Expand Down
50 changes: 0 additions & 50 deletions src/XrdCl/XrdClPostMaster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,23 +208,6 @@ namespace XrdCl
return true;
}

//----------------------------------------------------------------------------
// Send a message synchronously
//----------------------------------------------------------------------------
XRootDStatus PostMaster::Send( const URL &url,
Message *msg,
bool stateful,
time_t expires )
{
XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
Channel *channel = GetChannel( url );

if( !channel )
return XRootDStatus( stError, errNotSupported );

return channel->Send( msg, stateful, expires );
}

//----------------------------------------------------------------------------
// Send the message asynchronously
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -254,39 +237,6 @@ namespace XrdCl
return redirector->HandleRequest( msg, inHandler );
}

//----------------------------------------------------------------------------
// Synchronously receive a message
//----------------------------------------------------------------------------
Status PostMaster::Receive( const URL &url,
std::shared_ptr<Message> &msg,
MessageFilter *filter,
time_t expires )
{
XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
Channel *channel = GetChannel( url );

if( !channel )
return Status( stError, errNotSupported );

return channel->Receive( msg, filter, expires );
}

//----------------------------------------------------------------------------
// Listen to incoming messages
//----------------------------------------------------------------------------
Status PostMaster::Receive( const URL &url,
MsgHandler *handler,
time_t expires )
{
XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
Channel *channel = GetChannel( url );

if( !channel )
return Status( stError, errNotSupported );

return channel->Receive( handler, expires );
}

//----------------------------------------------------------------------------
// Query the transport handler
//----------------------------------------------------------------------------
Expand Down

0 comments on commit 4593de2

Please sign in to comment.