Skip to content

Commit

Permalink
[XrdCl] Pass stream and substream ids to the transport handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ljanyst committed Mar 10, 2014
1 parent 6bf3425 commit 804f6de
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
5 changes: 5 additions & 0 deletions src/XrdCl/XrdClPostMasterInterfaces.hh
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ namespace XrdCl
//! Check if the stream should be disconnected
//------------------------------------------------------------------------
virtual bool IsStreamTTLElapsed( time_t inactiveTime,
uint16_t streamId,
AnyObject &channelData ) = 0;

//------------------------------------------------------------------------
Expand Down Expand Up @@ -375,6 +376,7 @@ namespace XrdCl
//! the answer will be returned via the hinted stream.
//------------------------------------------------------------------------
virtual PathID MultiplexSubStream( Message *msg,
uint16_t streamId,
AnyObject &channelData,
PathID *hint = 0 ) = 0;

Expand Down Expand Up @@ -406,12 +408,15 @@ namespace XrdCl
//! Check if the message invokes a stream action
//------------------------------------------------------------------------
virtual uint32_t MessageReceived( Message *msg,
uint16_t streamId,
uint16_t subStream,
AnyObject &channelData ) = 0;

//------------------------------------------------------------------------
//! Notify the transport about a message having been sent
//------------------------------------------------------------------------
virtual void MessageSent( Message *msg,
uint16_t streamId,
uint16_t subStream,
uint32_t bytesSent,
AnyObject &channelData ) = 0;
Expand Down
13 changes: 9 additions & 4 deletions src/XrdCl/XrdClStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ namespace XrdCl
//--------------------------------------------------------------------------
// Decide on the path to send the message
//--------------------------------------------------------------------------
PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
PathID path = pTransport->MultiplexSubStream( msg, pStreamNum,
*pChannelData );
if( pSubStreams.size() <= path.up )
{
log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
Expand All @@ -318,7 +319,7 @@ namespace XrdCl
Status st = EnableLink( path );
if( st.IsOK() )
{
pTransport->MultiplexSubStream( msg, *pChannelData, &path );
pTransport->MultiplexSubStream( msg, pStreamNum, *pChannelData, &path );
pSubStreams[path.up]->outQueue->PushBack( msg, handler,
expires, stateful );
}
Expand Down Expand Up @@ -420,7 +421,9 @@ namespace XrdCl
msg->SetSessionId( pSessionId );
pBytesReceived += bytesReceived;

uint32_t streamAction = pTransport->MessageReceived( msg, *pChannelData );
uint32_t streamAction = pTransport->MessageReceived( msg, pStreamNum,
subStream,
*pChannelData );
if( streamAction & TransportHandler::DigestMsg )
return;

Expand Down Expand Up @@ -495,7 +498,8 @@ namespace XrdCl
Message *msg,
uint32_t bytesSent )
{
pTransport->MessageSent( msg, subStream, bytesSent, *pChannelData );
pTransport->MessageSent( msg, pStreamNum, subStream, bytesSent,
*pChannelData );
OutMessageHelper &h = pSubStreams[subStream]->outMsgHelper;
pBytesSent += bytesSent;
if( h.handler )
Expand Down Expand Up @@ -871,6 +875,7 @@ namespace XrdCl
if( !outgoingMessages )
{
bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
pStreamNum,
*pChannelData );
if( disconnect )
{
Expand Down
5 changes: 5 additions & 0 deletions src/XrdCl/XrdClXRootDTransport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ namespace XrdCl
// Check if the stream should be disconnected
//----------------------------------------------------------------------------
bool XRootDTransport::IsStreamTTLElapsed( time_t inactiveTime,
uint16_t streamId,
AnyObject &channelData )
{
XRootDChannelInfo *info = 0;
Expand Down Expand Up @@ -523,6 +524,7 @@ namespace XrdCl
// Multiplex
//----------------------------------------------------------------------------
PathID XRootDTransport::MultiplexSubStream( Message *msg,
uint16_t streamId,
AnyObject &channelData,
PathID *hint )
{
Expand Down Expand Up @@ -939,6 +941,8 @@ namespace XrdCl
// Check whether the transport can hijack the message
//----------------------------------------------------------------------------
uint32_t XRootDTransport::MessageReceived( Message *msg,
uint16_t streamId,
uint16_t subStream,
AnyObject &channelData )
{
XRootDChannelInfo *info = 0;
Expand Down Expand Up @@ -1018,6 +1022,7 @@ namespace XrdCl
// Notify the transport about a message having been sent
//----------------------------------------------------------------------------
void XRootDTransport::MessageSent( Message *msg,
uint16_t streamId,
uint16_t subStream,
uint32_t bytesSent,
AnyObject &channelData )
Expand Down
5 changes: 5 additions & 0 deletions src/XrdCl/XrdClXRootDTransport.hh
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ namespace XrdCl
//! Check if the stream should be disconnected
//------------------------------------------------------------------------
virtual bool IsStreamTTLElapsed( time_t time,
uint16_t streamId,
AnyObject &channelData );

//------------------------------------------------------------------------
Expand Down Expand Up @@ -135,6 +136,7 @@ namespace XrdCl
//! the answer will be returned via the hinted stream.
//------------------------------------------------------------------------
virtual PathID MultiplexSubStream( Message *msg,
uint16_t streamId,
AnyObject &channelData,
PathID *hint = 0 );

Expand Down Expand Up @@ -206,12 +208,15 @@ namespace XrdCl
//! Check if the message invokes a stream action
//------------------------------------------------------------------------
virtual uint32_t MessageReceived( Message *msg,
uint16_t streamId,
uint16_t subStream,
AnyObject &channelData );

//------------------------------------------------------------------------
//! Notify the transport about a message having been sent
//------------------------------------------------------------------------
virtual void MessageSent( Message *msg,
uint16_t streamId,
uint16_t subStream,
uint32_t bytesSent,
AnyObject &channelData );
Expand Down

0 comments on commit 804f6de

Please sign in to comment.