Skip to content

Commit

Permalink
[XrdCl] Send signed request using writev.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Oct 13, 2016
1 parent 44161c2 commit aeda84c
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 47 deletions.
106 changes: 99 additions & 7 deletions src/XrdCl/XrdClAsyncSocketHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace XrdCl
pIncoming( 0 ),
pHSIncoming( 0 ),
pOutgoing( 0 ),
pSignature( 0 ),
pHSOutgoing( 0 ),
pHandShakeData( 0 ),
pHandShakeDone( false ),
Expand Down Expand Up @@ -72,6 +73,7 @@ namespace XrdCl
{
Close();
delete pSocket;
delete pSignature;
}

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -376,7 +378,14 @@ namespace XrdCl
//------------------------------------------------------------------------
// Secure the message if necessary
//------------------------------------------------------------------------
SecureMsg( pOutgoing );
XRootDStatus st = GetSignature( pOutgoing, pSignature );
if( !st.IsOK() )
{
Log *log = DefaultEnv::GetLog();
log->Error( AsyncSockMsg, "[%s] Failed to sign the request: "
"%s (0x%x).", pStreamName.c_str(),
pOutgoing->GetDescription().c_str(), pOutgoing );
}
}

//--------------------------------------------------------------------------
Expand All @@ -385,7 +394,7 @@ namespace XrdCl
Status st;
if( !pOutMsgDone )
{
if( !(st = WriteCurrentMessage( pOutgoing )).IsOK() )
if( !(st = WriteSignedMessage( pOutgoing, pSignature )).IsOK() )
{
OnFault( st );
return;
Expand Down Expand Up @@ -511,6 +520,56 @@ namespace XrdCl
return Status();
}

//----------------------------------------------------------------------------
// Write the message and its signature
//----------------------------------------------------------------------------
Status AsyncSocketHandler::WriteSignedMessage( Message *toWrite, Message *sign )
{
if( !sign ) return WriteCurrentMessage( toWrite );

Log *log = DefaultEnv::GetLog();

const size_t iovcnt = 2;
iovec iov[iovcnt];
ToIov( *sign, iov[0] );
ToIov( *toWrite, iov[1] );

uint32_t leftToBeWritten = iov[0].iov_len + iov[1].iov_len;

while( leftToBeWritten )
{
int bytesRead = pSocket->WriteV( iov, iovcnt );
if( bytesRead <= 0 )
{
//----------------------------------------------------------------------
// Writing operation would block! So we are done for now, but we will
// return
//----------------------------------------------------------------------
if( errno == EAGAIN || errno == EWOULDBLOCK )
return Status( stOK, suRetry );

//----------------------------------------------------------------------
// Actual socket error error!
//----------------------------------------------------------------------
sign->SetCursor( 0 );
toWrite->SetCursor( 0 );
return Status( stError, errSocketError, errno );
}

leftToBeWritten -= bytesRead;
UpdateAfterWrite( *sign, iov[0], bytesRead );
UpdateAfterWrite( *toWrite, iov[1], bytesRead );
}

//--------------------------------------------------------------------------
// We have written the message successfully
//--------------------------------------------------------------------------
log->Dump( AsyncSockMsg, "[%s] Wrote a message: %s (0x%x), %d bytes",
pStreamName.c_str(), toWrite->GetDescription().c_str(),
toWrite, toWrite->GetSize() );
return Status();
}

//----------------------------------------------------------------------------
// Got a read readiness event
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -775,11 +834,21 @@ namespace XrdCl
OnFaultWhileHandshaking( Status( stError, errSocketTimeout ) );
}

Status AsyncSocketHandler::SecureMsg( Message *toSign )
//------------------------------------------------------------------------
// Get signature for given message
//------------------------------------------------------------------------
Status AsyncSocketHandler::GetSignature( Message *toSign, Message *&sign )
{
ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
kXR_unt16 reqid = ntohs( thereq->header.requestid );
if( reqid == kXR_sigver ) return Status(); // the message is already signed

if( sign )
{
SecurityRequest *sec = reinterpret_cast<SecurityRequest*>( sign->GetBuffer() );
kXR_unt16 reqid = ntohs( thereq->header.requestid );
kXR_unt16 expid = ntohs( sec->sigver.expectrid );
if( expid == reqid ) return Status(); // it's the correct signature for the request
delete sign; sign = 0; // otherwise delete the signature
}

XRootDChannelInfo *info = 0;
pChannelData->Get( info );
Expand All @@ -794,9 +863,32 @@ namespace XrdCl
if( rc < 0 )
return Status( stError, errInternal, -rc );

toSign->Free();
toSign->Grab( reinterpret_cast<char*>( newreq ), rc );
sign = new Message();
sign->Grab( reinterpret_cast<char*>( newreq ), rc );
}
else
return Status( stError, errInternal );

return Status();
}

//------------------------------------------------------------------------
// Initialize the iovec with given message
//------------------------------------------------------------------------
void AsyncSocketHandler::ToIov( Message &msg, iovec &iov )
{
iov.iov_base = msg.GetBufferAtCursor();
iov.iov_len = msg.GetSize() - msg.GetCursor();
}

//------------------------------------------------------------------------
// Update iovec after write
//------------------------------------------------------------------------
void AsyncSocketHandler::UpdateAfterWrite( Message &msg, iovec &iov, int &bytesRead )
{
size_t advance = ( bytesRead < (int)iov.iov_len ) ? bytesRead : iov.iov_len;
bytesRead -= advance;
msg.AdvanceCursor( advance );
ToIov( msg, iov );
}
}
20 changes: 18 additions & 2 deletions src/XrdCl/XrdClAsyncSocketHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ namespace XrdCl
//------------------------------------------------------------------------
Status WriteCurrentMessage( Message *toWrite );

//------------------------------------------------------------------------
// Write the message and its signature
//------------------------------------------------------------------------
Status WriteSignedMessage( Message *toWrite, Message *sign );

//------------------------------------------------------------------------
// Got a read readiness event
//------------------------------------------------------------------------
Expand Down Expand Up @@ -187,9 +192,19 @@ namespace XrdCl
void OnTimeoutWhileHandshaking();

//------------------------------------------------------------------------
// Sign a message
// Get signature for given message
//------------------------------------------------------------------------
Status GetSignature( Message *toSign, Message *&sign );

//------------------------------------------------------------------------
// Initialize the iovec with given message
//------------------------------------------------------------------------
inline void ToIov( Message &msg, iovec &iov );

//------------------------------------------------------------------------
// Update iovec after write
//------------------------------------------------------------------------
Status SecureMsg( Message *toSign );
inline void UpdateAfterWrite( Message &msg, iovec &iov, int &bytesRead );

//------------------------------------------------------------------------
// Data members
Expand All @@ -204,6 +219,7 @@ namespace XrdCl
Message *pIncoming;
Message *pHSIncoming;
Message *pOutgoing;
Message *pSignature;
Message *pHSOutgoing;
XrdNetAddr pSockAddr;
HandShakeData *pHandShakeData;
Expand Down
9 changes: 9 additions & 0 deletions src/XrdCl/XrdClSocket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <signal.h>
#include <cstdlib>
#include <cstring>
#include <sys/uio.h>

namespace XrdCl
{
Expand Down Expand Up @@ -433,6 +434,14 @@ namespace XrdCl
#endif
}

//------------------------------------------------------------------------
// Wrapper around writev
//------------------------------------------------------------------------
ssize_t Socket::WriteV( iovec *iov, int iovcnt )
{
return ::writev( pSocket, iov, iovcnt );
}

//----------------------------------------------------------------------------
// Poll the descriptor
//----------------------------------------------------------------------------
Expand Down
15 changes: 12 additions & 3 deletions src/XrdCl/XrdClSocket.hh
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,21 @@ namespace XrdCl
//------------------------------------------------------------------------
//! Portable wrapper around SIGPIPE free send
//!
//! @param buffer data to be written
//! @param size size of the data buffer
//! @param bytesWritten the amount of data actually written
//! @param buffer : data to be written
//! @param size : size of the data buffer
//! @return : the amount of data actually written
//------------------------------------------------------------------------
ssize_t Send( void *buffer, uint32_t size );

//------------------------------------------------------------------------
//! Wrapper around writev
//!
//! @param iov : buffers to be written
//! @param iovcnt : number of buffers
//! @return : the amount of data actually written
//------------------------------------------------------------------------
ssize_t WriteV( iovec *iov, int iovcnt );

//------------------------------------------------------------------------
//! Get the file descriptor
//------------------------------------------------------------------------
Expand Down
5 changes: 0 additions & 5 deletions src/XrdCl/XrdClXRootDChannelInfo.hh
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ namespace XrdCl
openFiles(0),
waitBarrier(0),
protection(0),
signprot(0),
protRespBody(0),
protRespSize(0)
{
Expand All @@ -93,9 +92,6 @@ namespace XrdCl

if( protection )
protection->Delete();

if( signprot )
signprot->Delete();
}

typedef std::vector<XRootDStreamInfo> StreamInfoVector;
Expand All @@ -121,7 +117,6 @@ namespace XrdCl
uint32_t openFiles;
time_t waitBarrier;
XrdSecProtect *protection;
XrdSecProtocol *signprot;
ServerResponseBody_Protocol *protRespBody;
unsigned int protRespSize;
XrdSysMutex mutex;
Expand Down
63 changes: 33 additions & 30 deletions src/XrdCl/XrdClXRootDTransport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,9 @@ namespace XrdCl
XRootDChannelInfo *info = 0;
channelData.Get( info );
XrdSysMutexHelper scopedLock( info->mutex );

CleanUpProtection( info );

if( !info->stream.empty() )
{
XRootDStreamInfo &sInfo = info->stream[subStreamId];
Expand All @@ -877,8 +880,6 @@ namespace XrdCl
info->openFiles = 0;
info->waitBarrier = 0;
}

CleanUpProtection( info );
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -1168,8 +1169,11 @@ namespace XrdCl
if( rsp->body.protocol.pval >= 0x297 )
info->serverFlags = rsp->body.protocol.flags;

info->protRespBody = new ServerResponseBody_Protocol( rsp->body.protocol );
info->protRespSize = rsp->hdr.dlen;
if( rsp->hdr.dlen > 8 )
{
info->protRespBody = new ServerResponseBody_Protocol( rsp->body.protocol );
info->protRespSize = rsp->hdr.dlen;
}

log->Debug( XRootDTransportMsg,
"[%s] kXR_protocol successful (%s, protocol version %x)",
Expand Down Expand Up @@ -1508,30 +1512,33 @@ namespace XrdCl
{
info->authProtocolName = info->authProtocol->Entity.prot;

int rc = XrdSecGetProtection( info->protection, *info->authProtocol, *info->protRespBody, info->protRespSize );
if( rc > 0 )
{
log->Debug( XRootDTransportMsg,
"[%s] XrdSecProtect loaded.", hsData->streamName.c_str() );
info->signprot = info->authProtocol;
info->authProtocol = 0;
}
else if( rc == 0 )
{
log->Debug( XRootDTransportMsg,
"[%s] XrdSecProtect: no protection needed.",
hsData->streamName.c_str() );
CleanUpProtection( info );
}
else
//----------------------------------------------------------------------
// Do we need protection?
//----------------------------------------------------------------------
if( info->protRespBody )
{
log->Debug( XRootDTransportMsg,
"[%s] Failed to load XrdSecProtect: %s",
hsData->streamName.c_str(), strerror( -rc ) );
CleanUpProtection( info );
int rc = XrdSecGetProtection( info->protection, *info->authProtocol, *info->protRespBody, info->protRespSize );
if( rc > 0 )
{
log->Debug( XRootDTransportMsg,
"[%s] XrdSecProtect loaded.", hsData->streamName.c_str() );
}
else if( rc == 0 )
{
log->Debug( XRootDTransportMsg,
"[%s] XrdSecProtect: no protection needed.",
hsData->streamName.c_str() );
}
else
{
log->Debug( XRootDTransportMsg,
"[%s] Failed to load XrdSecProtect: %s",
hsData->streamName.c_str(), strerror( -rc ) ); // TODO probably we need to fail more dramatically
}
}

CleanUpAuthentication( info );
if( !info->protection )
CleanUpAuthentication( info );

log->Debug( XRootDTransportMsg,
"[%s] Authenticated with %s.", hsData->streamName.c_str(),
Expand Down Expand Up @@ -1682,12 +1689,8 @@ namespace XrdCl
{
info->protection->Delete();
info->protection = 0;
}

if( info->signprot )
{
info->signprot->Delete();
info->signprot = 0;
CleanUpAuthentication( info );
}

if( info->protRespBody )
Expand Down

0 comments on commit aeda84c

Please sign in to comment.