Skip to content

Commit

Permalink
[XrdCl] Add possibility to not call the processing callback when rece…
Browse files Browse the repository at this point in the history
…iving partial messages
  • Loading branch information
ljanyst committed Apr 25, 2013
1 parent 6a80f7a commit b16a149
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 49 deletions.
2 changes: 1 addition & 1 deletion src/XrdCl/XrdClChannel.cc
Expand Up @@ -52,7 +52,7 @@ namespace
//------------------------------------------------------------------------
// Message handler
//------------------------------------------------------------------------
virtual uint8_t Examine( XrdCl::Message *msg )
virtual uint16_t Examine( XrdCl::Message *msg )
{
if( pFilter->Filter( msg ) )
return Take | RemoveHandler;
Expand Down
31 changes: 16 additions & 15 deletions src/XrdCl/XrdClInQueue.cc
Expand Up @@ -29,18 +29,17 @@ namespace XrdCl
pMutex.Lock();

HandlerList::iterator it;
uint8_t action = 0;
uint16_t action = 0;
IncomingMsgHandler *handler = 0;
for( it = pHandlers.begin(); it != pHandlers.end(); ++it )
for( it = pHandlers.begin(); it != pHandlers.end(); )
{
handler = it->first;
action = handler->Examine( msg );

if( action & IncomingMsgHandler::RemoveHandler )
{
it = pHandlers.erase( it );
--it;
}
else
++it;

if( action & IncomingMsgHandler::Take )
break;
Expand All @@ -52,7 +51,7 @@ namespace XrdCl
pMessages.push_front( msg );

pMutex.UnLock();
if( handler )
if( handler && !(action & IncomingMsgHandler::NoProcess) )
handler->Process( msg );

return true;
Expand All @@ -66,17 +65,19 @@ namespace XrdCl
XrdSysMutexHelper scopedLock( pMutex );

std::list<Message *>::iterator it;
uint8_t action = 0;
for( it = pMessages.begin(); it != pMessages.end(); ++it )
uint16_t action = 0;
for( it = pMessages.begin(); it != pMessages.end(); )
{
action = handler->Examine( *it );

if( action & IncomingMsgHandler::Take )
{
handler->Process( *it );
if( !(action & IncomingMsgHandler::NoProcess ) )
handler->Process( *it );
it = pMessages.erase( it );
--it;
}
else
++it;

if( action & IncomingMsgHandler::RemoveHandler )
break;
Expand All @@ -90,15 +91,15 @@ namespace XrdCl
// Get a message handler inerested in receiving message whose header
// is storead in msg
//----------------------------------------------------------------------------
IncomingMsgHandler *InQueue::GetHandlerForMessage( Message *msg,
time_t &expires,
uint8_t &action )
IncomingMsgHandler *InQueue::GetHandlerForMessage( Message *msg,
time_t &expires,
uint16_t &action )
{
XrdSysMutexHelper scopedLock( pMutex );
HandlerList::iterator it;
IncomingMsgHandler *handler = 0;
time_t exp = 0;
uint8_t act = 0;
time_t exp = 0;
uint16_t act = 0;
for( it = pHandlers.begin(); it != pHandlers.end(); ++it )
{
handler = it->first;
Expand Down
6 changes: 3 additions & 3 deletions src/XrdCl/XrdClInQueue.hh
Expand Up @@ -58,9 +58,9 @@ namespace XrdCl
//!
//! @return handler or 0 if none is interested
//------------------------------------------------------------------------
IncomingMsgHandler *GetHandlerForMessage( Message *msg,
time_t &expires,
uint8_t &action );
IncomingMsgHandler *GetHandlerForMessage( Message *msg,
time_t &expires,
uint16_t &action );

//------------------------------------------------------------------------
//! Re-insert the handler without scanning the cached messages
Expand Down
19 changes: 11 additions & 8 deletions src/XrdCl/XrdClPostMasterInterfaces.hh
Expand Up @@ -58,13 +58,16 @@ namespace XrdCl
//------------------------------------------------------------------------
enum Action
{
Take = 0x01, //!< Take ownership over the message
Ignore = 0x02, //!< Ignore the message
RemoveHandler = 0x04, //!< Remove the handler from the notification
//!< list
Raw = 0x08 //!< the handler is interested in reding
//!< the message body directly from the
//!< socket
Take = 0x0001, //!< Take ownership over the message
Ignore = 0x0002, //!< Ignore the message
RemoveHandler = 0x0004, //!< Remove the handler from the notification
//!< list
Raw = 0x0008, //!< the handler is interested in reding
//!< the message body directly from the
//!< socket
NoProcess = 0x0010 //!< don't call the processing callback
//!< even if the message belongs to this
//!< handler
};

//------------------------------------------------------------------------
Expand All @@ -91,7 +94,7 @@ namespace XrdCl
//! @return action type that needs to be take wrt the message and
//! the handler
//------------------------------------------------------------------------
virtual uint8_t Examine( Message *msg ) = 0;
virtual uint16_t Examine( Message *msg ) = 0;

//------------------------------------------------------------------------
//! Process the message if it was "taken" by the examine action
Expand Down
18 changes: 13 additions & 5 deletions src/XrdCl/XrdClStream.cc
Expand Up @@ -63,7 +63,7 @@ namespace XrdCl
InMessageHelper( Message *message = 0,
IncomingMsgHandler *hndlr = 0,
time_t expir = 0,
uint8_t actio = 0 ):
uint16_t actio = 0 ):
msg( message ), handler( hndlr ), expires( expir ), action( actio ) {}
void Reset()
{
Expand All @@ -72,7 +72,7 @@ namespace XrdCl
Message *msg;
IncomingMsgHandler *handler;
time_t expires;
uint8_t action;
uint16_t action;
};

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -396,7 +396,7 @@ namespace XrdCl
InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
if( !mh.handler )
{
log->Dump( PostMasterMsg, "[%s] Queuing received message.",
log->Dump( PostMasterMsg, "[%s] Queuing received message: %s.",
pStreamName.c_str(), msg->GetDescription().c_str() );

uint32_t streamAction = pTransport->StreamAction( msg, *pChannelData );
Expand All @@ -410,13 +410,21 @@ namespace XrdCl
//--------------------------------------------------------------------------
// We have a handler, so we call the callback
//--------------------------------------------------------------------------
log->Dump( PostMasterMsg, "[%s] Handling received message.",
log->Dump( PostMasterMsg, "[%s] Handling received message: %s.",
pStreamName.c_str(), msg->GetDescription().c_str() );
Job *job = new HandleIncMsgJob( mh.handler );

if( !(mh.action & IncomingMsgHandler::RemoveHandler) )
pIncomingQueue->ReAddMessageHandler( mh.handler, mh.expires );

if( mh.action & IncomingMsgHandler::NoProcess )
{
log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
pStreamName.c_str(), msg->GetDescription().c_str() );
mh.Reset();
return;
}

Job *job = new HandleIncMsgJob( mh.handler );
mh.Reset();
pJobManager->QueueJob( job, msg );
}
Expand Down
22 changes: 6 additions & 16 deletions src/XrdCl/XrdClXRootDMsgHandler.cc
Expand Up @@ -64,7 +64,7 @@ namespace XrdCl
//----------------------------------------------------------------------------
// Examine an incomming message, and decide on the action to be taken
//----------------------------------------------------------------------------
uint8_t XRootDMsgHandler::Examine( Message *msg )
uint16_t XRootDMsgHandler::Examine( Message *msg )
{
if( msg->GetSize() < 8 )
return Ignore;
Expand Down Expand Up @@ -198,12 +198,12 @@ namespace XrdCl
{
pReadRawStarted = false;
pAsyncMsgSize = dlen;
return Take | Raw;
return Take | Raw | NoProcess;
}
else
{
pReadRawCurrentOffset += dlen;
return Take;
return Take | NoProcess;
}
}

Expand All @@ -216,13 +216,13 @@ namespace XrdCl
{
pAsyncMsgSize = dlen;
pReadVRawMsgOffset = 0;
return Take | Raw;
return Take | Raw | NoProcess;
}
else
return Take;
return Take | NoProcess;
}

return Take;
return Take | NoProcess;
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -512,16 +512,6 @@ namespace XrdCl
return;
}

//------------------------------------------------------------------------
// We've got a partial answer. Wait for more
//------------------------------------------------------------------------
case kXR_oksofar:
{
// we do nothing here, we queued partials in examine to have them
// in the right order
return;
}

//------------------------------------------------------------------------
// Default - unrecognized/unsupported response, declare an error
//------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/XrdCl/XrdClXRootDMsgHandler.hh
Expand Up @@ -108,7 +108,7 @@ namespace XrdCl
//! @return action type that needs to be take wrt the message and
//! the handler
//------------------------------------------------------------------------
virtual uint8_t Examine( Message *msg );
virtual uint16_t Examine( Message *msg );

//------------------------------------------------------------------------
//! Process the message if it was "taken" by the examine action
Expand Down

0 comments on commit b16a149

Please sign in to comment.