From b16a149cada6c80f06b4fed8fccfac9ba3cc7f5d Mon Sep 17 00:00:00 2001 From: Lukasz Janyst Date: Thu, 25 Apr 2013 15:01:31 +0200 Subject: [PATCH] [XrdCl] Add possibility to not call the processing callback when receiving partial messages --- src/XrdCl/XrdClChannel.cc | 2 +- src/XrdCl/XrdClInQueue.cc | 31 +++++++++++++------------- src/XrdCl/XrdClInQueue.hh | 6 ++--- src/XrdCl/XrdClPostMasterInterfaces.hh | 19 +++++++++------- src/XrdCl/XrdClStream.cc | 18 ++++++++++----- src/XrdCl/XrdClXRootDMsgHandler.cc | 22 +++++------------- src/XrdCl/XrdClXRootDMsgHandler.hh | 2 +- 7 files changed, 51 insertions(+), 49 deletions(-) diff --git a/src/XrdCl/XrdClChannel.cc b/src/XrdCl/XrdClChannel.cc index 90a595a6a1e..cb984df42bf 100644 --- a/src/XrdCl/XrdClChannel.cc +++ b/src/XrdCl/XrdClChannel.cc @@ -52,7 +52,7 @@ namespace //------------------------------------------------------------------------ // Message handler //------------------------------------------------------------------------ - virtual uint8_t Examine( XrdCl::Message *msg ) + virtual uint16_t Examine( XrdCl::Message *msg ) { if( pFilter->Filter( msg ) ) return Take | RemoveHandler; diff --git a/src/XrdCl/XrdClInQueue.cc b/src/XrdCl/XrdClInQueue.cc index 749b4e3575c..9c6e593e413 100644 --- a/src/XrdCl/XrdClInQueue.cc +++ b/src/XrdCl/XrdClInQueue.cc @@ -29,18 +29,17 @@ namespace XrdCl pMutex.Lock(); HandlerList::iterator it; - uint8_t action = 0; + uint16_t action = 0; IncomingMsgHandler *handler = 0; - for( it = pHandlers.begin(); it != pHandlers.end(); ++it ) + for( it = pHandlers.begin(); it != pHandlers.end(); ) { handler = it->first; action = handler->Examine( msg ); if( action & IncomingMsgHandler::RemoveHandler ) - { it = pHandlers.erase( it ); - --it; - } + else + ++it; if( action & IncomingMsgHandler::Take ) break; @@ -52,7 +51,7 @@ namespace XrdCl pMessages.push_front( msg ); pMutex.UnLock(); - if( handler ) + if( handler && !(action & IncomingMsgHandler::NoProcess) ) handler->Process( msg ); return true; @@ -66,17 +65,19 @@ namespace XrdCl XrdSysMutexHelper scopedLock( pMutex ); std::list::iterator it; - uint8_t action = 0; - for( it = pMessages.begin(); it != pMessages.end(); ++it ) + uint16_t action = 0; + for( it = pMessages.begin(); it != pMessages.end(); ) { action = handler->Examine( *it ); if( action & IncomingMsgHandler::Take ) { - handler->Process( *it ); + if( !(action & IncomingMsgHandler::NoProcess ) ) + handler->Process( *it ); it = pMessages.erase( it ); - --it; } + else + ++it; if( action & IncomingMsgHandler::RemoveHandler ) break; @@ -90,15 +91,15 @@ namespace XrdCl // Get a message handler inerested in receiving message whose header // is storead in msg //---------------------------------------------------------------------------- - IncomingMsgHandler *InQueue::GetHandlerForMessage( Message *msg, - time_t &expires, - uint8_t &action ) + IncomingMsgHandler *InQueue::GetHandlerForMessage( Message *msg, + time_t &expires, + uint16_t &action ) { XrdSysMutexHelper scopedLock( pMutex ); HandlerList::iterator it; IncomingMsgHandler *handler = 0; - time_t exp = 0; - uint8_t act = 0; + time_t exp = 0; + uint16_t act = 0; for( it = pHandlers.begin(); it != pHandlers.end(); ++it ) { handler = it->first; diff --git a/src/XrdCl/XrdClInQueue.hh b/src/XrdCl/XrdClInQueue.hh index 2f352104c0b..f8a413bff11 100644 --- a/src/XrdCl/XrdClInQueue.hh +++ b/src/XrdCl/XrdClInQueue.hh @@ -58,9 +58,9 @@ namespace XrdCl //! //! @return handler or 0 if none is interested //------------------------------------------------------------------------ - IncomingMsgHandler *GetHandlerForMessage( Message *msg, - time_t &expires, - uint8_t &action ); + IncomingMsgHandler *GetHandlerForMessage( Message *msg, + time_t &expires, + uint16_t &action ); //------------------------------------------------------------------------ //! Re-insert the handler without scanning the cached messages diff --git a/src/XrdCl/XrdClPostMasterInterfaces.hh b/src/XrdCl/XrdClPostMasterInterfaces.hh index 844ff4c0fc3..8408d33a1db 100644 --- a/src/XrdCl/XrdClPostMasterInterfaces.hh +++ b/src/XrdCl/XrdClPostMasterInterfaces.hh @@ -58,13 +58,16 @@ namespace XrdCl //------------------------------------------------------------------------ enum Action { - Take = 0x01, //!< Take ownership over the message - Ignore = 0x02, //!< Ignore the message - RemoveHandler = 0x04, //!< Remove the handler from the notification - //!< list - Raw = 0x08 //!< the handler is interested in reding - //!< the message body directly from the - //!< socket + Take = 0x0001, //!< Take ownership over the message + Ignore = 0x0002, //!< Ignore the message + RemoveHandler = 0x0004, //!< Remove the handler from the notification + //!< list + Raw = 0x0008, //!< the handler is interested in reding + //!< the message body directly from the + //!< socket + NoProcess = 0x0010 //!< don't call the processing callback + //!< even if the message belongs to this + //!< handler }; //------------------------------------------------------------------------ @@ -91,7 +94,7 @@ namespace XrdCl //! @return action type that needs to be take wrt the message and //! the handler //------------------------------------------------------------------------ - virtual uint8_t Examine( Message *msg ) = 0; + virtual uint16_t Examine( Message *msg ) = 0; //------------------------------------------------------------------------ //! Process the message if it was "taken" by the examine action diff --git a/src/XrdCl/XrdClStream.cc b/src/XrdCl/XrdClStream.cc index 92040d56b3a..98f2c44b582 100644 --- a/src/XrdCl/XrdClStream.cc +++ b/src/XrdCl/XrdClStream.cc @@ -63,7 +63,7 @@ namespace XrdCl InMessageHelper( Message *message = 0, IncomingMsgHandler *hndlr = 0, time_t expir = 0, - uint8_t actio = 0 ): + uint16_t actio = 0 ): msg( message ), handler( hndlr ), expires( expir ), action( actio ) {} void Reset() { @@ -72,7 +72,7 @@ namespace XrdCl Message *msg; IncomingMsgHandler *handler; time_t expires; - uint8_t action; + uint16_t action; }; //---------------------------------------------------------------------------- @@ -396,7 +396,7 @@ namespace XrdCl InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper; if( !mh.handler ) { - log->Dump( PostMasterMsg, "[%s] Queuing received message.", + log->Dump( PostMasterMsg, "[%s] Queuing received message: %s.", pStreamName.c_str(), msg->GetDescription().c_str() ); uint32_t streamAction = pTransport->StreamAction( msg, *pChannelData ); @@ -410,13 +410,21 @@ namespace XrdCl //-------------------------------------------------------------------------- // We have a handler, so we call the callback //-------------------------------------------------------------------------- - log->Dump( PostMasterMsg, "[%s] Handling received message.", + log->Dump( PostMasterMsg, "[%s] Handling received message: %s.", pStreamName.c_str(), msg->GetDescription().c_str() ); - Job *job = new HandleIncMsgJob( mh.handler ); if( !(mh.action & IncomingMsgHandler::RemoveHandler) ) pIncomingQueue->ReAddMessageHandler( mh.handler, mh.expires ); + if( mh.action & IncomingMsgHandler::NoProcess ) + { + log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.", + pStreamName.c_str(), msg->GetDescription().c_str() ); + mh.Reset(); + return; + } + + Job *job = new HandleIncMsgJob( mh.handler ); mh.Reset(); pJobManager->QueueJob( job, msg ); } diff --git a/src/XrdCl/XrdClXRootDMsgHandler.cc b/src/XrdCl/XrdClXRootDMsgHandler.cc index 86fcd0a0e56..f49a6dd1db6 100644 --- a/src/XrdCl/XrdClXRootDMsgHandler.cc +++ b/src/XrdCl/XrdClXRootDMsgHandler.cc @@ -64,7 +64,7 @@ namespace XrdCl //---------------------------------------------------------------------------- // Examine an incomming message, and decide on the action to be taken //---------------------------------------------------------------------------- - uint8_t XRootDMsgHandler::Examine( Message *msg ) + uint16_t XRootDMsgHandler::Examine( Message *msg ) { if( msg->GetSize() < 8 ) return Ignore; @@ -198,12 +198,12 @@ namespace XrdCl { pReadRawStarted = false; pAsyncMsgSize = dlen; - return Take | Raw; + return Take | Raw | NoProcess; } else { pReadRawCurrentOffset += dlen; - return Take; + return Take | NoProcess; } } @@ -216,13 +216,13 @@ namespace XrdCl { pAsyncMsgSize = dlen; pReadVRawMsgOffset = 0; - return Take | Raw; + return Take | Raw | NoProcess; } else - return Take; + return Take | NoProcess; } - return Take; + return Take | NoProcess; } //------------------------------------------------------------------------ @@ -512,16 +512,6 @@ namespace XrdCl return; } - //------------------------------------------------------------------------ - // We've got a partial answer. Wait for more - //------------------------------------------------------------------------ - case kXR_oksofar: - { - // we do nothing here, we queued partials in examine to have them - // in the right order - return; - } - //------------------------------------------------------------------------ // Default - unrecognized/unsupported response, declare an error //------------------------------------------------------------------------ diff --git a/src/XrdCl/XrdClXRootDMsgHandler.hh b/src/XrdCl/XrdClXRootDMsgHandler.hh index 7b6509e654d..0ce16884396 100644 --- a/src/XrdCl/XrdClXRootDMsgHandler.hh +++ b/src/XrdCl/XrdClXRootDMsgHandler.hh @@ -108,7 +108,7 @@ namespace XrdCl //! @return action type that needs to be take wrt the message and //! the handler //------------------------------------------------------------------------ - virtual uint8_t Examine( Message *msg ); + virtual uint16_t Examine( Message *msg ); //------------------------------------------------------------------------ //! Process the message if it was "taken" by the examine action