Skip to content

Commit

Permalink
[XrdCl] Update PostMaster interfaces.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Apr 17, 2019
1 parent 42a0884 commit 61fa014
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 130 deletions.
76 changes: 3 additions & 73 deletions src/XrdCl/XrdClAsyncSocketHandler.cc
Expand Up @@ -381,7 +381,7 @@ namespace XrdCl
// Secure the message if necessary
//------------------------------------------------------------------------
delete pSignature; pSignature = 0;
XRootDStatus st = GetSignature( pOutgoing, pSignature );
XRootDStatus st = pTransport->GetSignature( pOutgoing, pSignature, *pChannelData );
if( !st.IsOK() )
{
OnFault( st );
Expand All @@ -390,13 +390,9 @@ namespace XrdCl
}

//--------------------------------------------------------------------------
// Try to write everything at once: signature, request and raw data
// (this is only supported if pOutHandler is an instance of XRootDMsgHandler)
// Write everything at once: signature, request and raw data
//--------------------------------------------------------------------------
Status st = WriteMessageAndRaw( pOutgoing, pSignature );
if( !st.IsOK() && st.code == errNotSupported ) //< this part should go away
st = WriteSeparately( pOutgoing, pSignature ); //< once we can add GetMsgBody
//< to OutgoingMsgHandler interface !!!
if( !st.IsOK() )
{
OnFault( st );
Expand Down Expand Up @@ -564,21 +560,12 @@ namespace XrdCl

Status AsyncSocketHandler::WriteMessageAndRaw( Message *toWrite, Message *&sign )
{
// once we can add 'GetMessageBody' to OutgoingMsghandler
// interface we can get rid of the ugly dynamic_cast
static XRootDMsgHandler *xrdHandler = 0;
ChunkList *chunks = 0;
uint32_t *asyncOffset = 0;

if( pOutHandler->IsRaw() )
{
if( xrdHandler != pOutHandler )
xrdHandler = dynamic_cast<XRootDMsgHandler*>( pOutHandler );

if( !xrdHandler )
return Status( stError, errNotSupported );

chunks = xrdHandler->GetMessageBody( asyncOffset );
chunks = pOutHandler->GetMessageBody( asyncOffset );
Log *log = DefaultEnv::GetLog();
log->Dump( AsyncSockMsg, "[%s] Will write the payload in one go with "
"the header for message: %s (0x%x).", pStreamName.c_str(),
Expand All @@ -596,50 +583,6 @@ namespace XrdCl
return st;
}

Status AsyncSocketHandler::WriteSeparately( Message *toWrite, Message *&sign )
{
//------------------------------------------------------------------------
// Write the message if not already written
//------------------------------------------------------------------------
Status st;
if( !pOutMsgDone )
{
if( !(st = WriteVMessage( toWrite, sign, 0, 0 )).IsOK() )
return st;

if( st.code == suRetry )
return st;

Log *log = DefaultEnv::GetLog();

if( pOutHandler && pOutHandler->IsRaw() )
{
log->Dump( AsyncSockMsg, "[%s] Will call raw handler to write payload "
"for message: %s (0x%x).", pStreamName.c_str(),
pOutgoing->GetDescription().c_str(), pOutgoing );
}

pOutMsgDone = true;
}

//------------------------------------------------------------------------
// Check if the handler needs to be called
//------------------------------------------------------------------------
if( pOutHandler && pOutHandler->IsRaw() )
{
uint32_t bytesWritten = 0;
st = pOutHandler->WriteMessageBody( pSocket->GetFD(), bytesWritten );
pOutMsgSize += bytesWritten;
if( !st.IsOK() )
return st;

if( st.code == suRetry )
return st;
}

return Status();
}

//----------------------------------------------------------------------------
// Got a read readiness event
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -964,19 +907,6 @@ namespace XrdCl
OnFaultWhileHandshaking( Status( stError, errSocketTimeout ) );
}

//------------------------------------------------------------------------
// Get signature for given message
//------------------------------------------------------------------------
Status AsyncSocketHandler::GetSignature( Message *toSign, Message *&sign )
{
// ideally the 'GetSignature' method should be in TransportHandler interface
// however due to ABI compatibility for the time being this workaround has to
// be employed
XRootDTransport *xrootdTransport = dynamic_cast<XRootDTransport*>( pTransport );
if( !xrootdTransport ) return Status( stError, errNotSupported );
return xrootdTransport->GetSignature( toSign, sign, *pChannelData );
}

//------------------------------------------------------------------------
// Initialize the iovec with given message
//------------------------------------------------------------------------
Expand Down
11 changes: 3 additions & 8 deletions src/XrdCl/XrdClAsyncSocketHandler.hh
Expand Up @@ -165,11 +165,11 @@ namespace XrdCl
//------------------------------------------------------------------------
virtual void OnWriteWhileHandshaking();


//------------------------------------------------------------------------
// Write the message and it's signature in one go with writev
//------------------------------------------------------------------------
Status WriteMessageAndRaw( Message *toWrite, Message *&sign );

Status WriteSeparately( Message *toWrite, Message *&sign );

//------------------------------------------------------------------------
// Write the current message
//------------------------------------------------------------------------
Expand Down Expand Up @@ -228,11 +228,6 @@ namespace XrdCl
//------------------------------------------------------------------------
void OnTimeoutWhileHandshaking();

//------------------------------------------------------------------------
// Get signature for given message
//------------------------------------------------------------------------
Status GetSignature( Message *toSign, Message *&sign );

//------------------------------------------------------------------------
// Initialize the iovec with given message
//------------------------------------------------------------------------
Expand Down
34 changes: 7 additions & 27 deletions src/XrdCl/XrdClAsyncTlsSocketHandler.cc
Expand Up @@ -39,8 +39,7 @@ namespace XrdCl
AnyObject *channelData,
uint16_t subStreamNum ):
AsyncSocketHandler( poller, transport, channelData, subStreamNum ),
pXrdTransport( dynamic_cast<XRootDTransport*>( transport ) ),
pXrdHandler( 0 ),
pTransport( transport ),
pCorked( false ),
pWrtHdrDone( false ),
pTlsHSRevert( None ),
Expand Down Expand Up @@ -148,20 +147,10 @@ namespace XrdCl

if( pOutHandler->IsRaw() )
{
if( pXrdHandler != pOutHandler )
{
pXrdHandler = dynamic_cast<XRootDMsgHandler*>( pOutHandler );
if( !pXrdHandler )
{
OnFault( Status( stError, errNotSupported ) );
return;
}
}

if( !pWrtBody )
{
uint32_t *asyncOffset = 0;
pWrtBody = pXrdHandler->GetMessageBody( asyncOffset );
pWrtBody = pOutHandler->GetMessageBody( asyncOffset );
pCurrentChunk = pWrtBody->begin();
}

Expand Down Expand Up @@ -356,7 +345,7 @@ namespace XrdCl
//--------------------------------------------------------------------------
if( !pHeaderDone )
{
st = pXrdTransport->GetHeader( pIncoming, pTls.get() );
st = pTransport->GetHeader( pIncoming, pTls.get() );
OnTlsRead( st );

if( !st.IsOK() )
Expand Down Expand Up @@ -386,17 +375,8 @@ namespace XrdCl
//--------------------------------------------------------------------------
if( pIncHandler.first )
{
if( pXrdHandler != pIncHandler.first )
pXrdHandler = dynamic_cast<XRootDMsgHandler*>( pIncHandler.first );

if( !pXrdHandler )
{
OnFault( Status( stError, errNotSupported ) );
return;
}

uint32_t bytesRead = 0;
st = pXrdHandler->ReadMessageBody( pIncoming, pTls.get(), bytesRead );
st = pIncHandler.first->ReadMessageBody( pIncoming, pTls.get(), bytesRead );
OnTlsRead( st );

if( !st.IsOK() )
Expand All @@ -413,7 +393,7 @@ namespace XrdCl
//--------------------------------------------------------------------------
else
{
st = pXrdTransport->GetBody( pIncoming, pTls.get() );
st = pTransport->GetBody( pIncoming, pTls.get() );
OnTlsRead( st );

if( !st.IsOK() )
Expand Down Expand Up @@ -485,7 +465,7 @@ namespace XrdCl
Log *log = DefaultEnv::GetLog();
if( !pHeaderDone )
{
st = pXrdTransport->GetHeader( toRead, pTls.get() );
st = pTransport->GetHeader( toRead, pTls.get() );
if( st.IsOK() && st.code == suDone )
{
log->Dump( AsyncSockMsg,
Expand All @@ -497,7 +477,7 @@ namespace XrdCl
return st;
}

st = pXrdTransport->GetBody( toRead, pTls.get() );
st = pTransport->GetBody( toRead, pTls.get() );
if( st.IsOK() && st.code == suDone )
{
log->Dump( AsyncSockMsg, "[%s] Received a message of %d bytes",
Expand Down
3 changes: 1 addition & 2 deletions src/XrdCl/XrdClAsyncTlsSocketHandler.hh
Expand Up @@ -159,8 +159,7 @@ namespace XrdCl
//------------------------------------------------------------------------
// Data members
//------------------------------------------------------------------------
XRootDTransport *pXrdTransport;
XRootDMsgHandler *pXrdHandler;
TransportHandler *pTransport;
std::unique_ptr<Tls> pTls;
bool pCorked;
bool pWrtHdrDone;
Expand Down
75 changes: 64 additions & 11 deletions src/XrdCl/XrdClPostMasterInterfaces.hh
Expand Up @@ -25,6 +25,8 @@
#ifndef __XRD_CL_POST_MASTER_INTERFACES_HH__
#define __XRD_CL_POST_MASTER_INTERFACES_HH__

#include "XrdCl/XrdClXRootDResponses.hh"

#include <stdint.h>
#include <ctime>

Expand All @@ -39,6 +41,7 @@ namespace XrdCl
class Channel;
class Message;
class URL;
class Tls;

//----------------------------------------------------------------------------
//! Message filter
Expand Down Expand Up @@ -144,6 +147,25 @@ namespace XrdCl
return Status( stOK, suDone );
};

//------------------------------------------------------------------------
//! Read message body directly from TLS layer - called if Examine returns
//! Raw flag - only TLS related errors may be returned here
//!
//! @param msg the corresponding message header
//! @param socket the socket to read from
//! @param bytesRead number of bytes read by the method
//! @return stOK & suDone if the whole body has been processed
//! stOK & suRetry if more data is needed
//! stError on failure
//------------------------------------------------------------------------
virtual Status ReadMessageBody( Message *msg,
Tls *tls,
uint32_t &bytesRead )
{
(void)msg; (void)tls; (void)bytesRead;
return Status( stOK, suDone );
}

//------------------------------------------------------------------------
//! Handle an event other that a message arrival
//!
Expand Down Expand Up @@ -197,20 +219,14 @@ namespace XrdCl
virtual bool IsRaw() const { return false; }

//------------------------------------------------------------------------
//! Write message body directly to a socket - called if IsRaw returns
//! true - only socket related errors may be returned here
//! Get message body - called if IsRaw returns true
//!
//! @param socket the socket to read from
//! @param bytesRead number of bytes read by the method
//! @return stOK & suDone if the whole body has been processed
//! stOK & suRetry if more data needs to be written
//! stError on failure
//! @param asyncOffset : the current async offset
//! @return : the list of chunks
//------------------------------------------------------------------------
virtual Status WriteMessageBody( int socket,
uint32_t &bytesRead )
virtual ChunkList* GetMessageBody( uint32_t *&asyncOffset )
{
(void)socket; (void)bytesRead;
return Status();
return 0;
}
};

Expand Down Expand Up @@ -339,6 +355,20 @@ namespace XrdCl
//------------------------------------------------------------------------
virtual Status GetHeader( Message *message, int socket ) = 0;

//------------------------------------------------------------------------
//! Read a message header from the TLS layer (non-blocking mode),
//! so if there is not enough data the function should return suRetry
//! in which case it will be called again when more data arrives, with
//! the data previously read stored in the message buffer
//!
//! @param message the message buffer
//! @param socket the socket
//! @return stOK & suDone if the whole message has been processed
//! stOK & suRetry if more data is needed
//! stError on failure
//------------------------------------------------------------------------
virtual Status GetHeader( Message *message, Tls *tls ) = 0;

//------------------------------------------------------------------------
//! Read the message body from the socket, the socket is non-blocking,
//! the method may be called multiple times - see GetHeader for details
Expand All @@ -351,6 +381,18 @@ namespace XrdCl
//------------------------------------------------------------------------
virtual Status GetBody( Message *message, int socket ) = 0;

//------------------------------------------------------------------------
//! Read the message body from the TLS layer (non-blocking mode),
//! the method may be called multiple times - see GetHeader for details
//!
//! @param message the message buffer containing the header
//! @param socket the socket
//! @return stOK & suDone if the whole message has been processed
//! stOK & suRetry if more data is needed
//! stError on failure
//------------------------------------------------------------------------
virtual Status GetBody( Message *message, Tls *tls ) = 0;

//------------------------------------------------------------------------
//! Initialize channel
//------------------------------------------------------------------------
Expand Down Expand Up @@ -445,6 +487,17 @@ namespace XrdCl
uint16_t subStream,
uint32_t bytesSent,
AnyObject &channelData ) = 0;

//------------------------------------------------------------------------
//! Wait before exit
//------------------------------------------------------------------------
virtual void WaitBeforeExit() = 0;

//------------------------------------------------------------------------
//! Get signature for given message
//------------------------------------------------------------------------
virtual Status GetSignature( Message *toSign, Message *&sign,
AnyObject &channelData ) = 0;
};
}

Expand Down
2 changes: 1 addition & 1 deletion src/XrdCl/XrdClXRootDMsgHandler.hh
Expand Up @@ -315,7 +315,7 @@ namespace XrdCl
//! @param asyncOffset : the current async offset
//! @return : the list of chunks
//------------------------------------------------------------------------
ChunkList* GetMessageBody( uint32_t *&asyncOffset )
virtual ChunkList* GetMessageBody( uint32_t *&asyncOffset )
{
asyncOffset = &pAsyncOffset;
return pChunkList;
Expand Down

0 comments on commit 61fa014

Please sign in to comment.