diff --git a/rpc/bridge.cpp b/rpc/bridge.cpp index f2fb405b..50ac047e 100755 --- a/rpc/bridge.cpp +++ b/rpc/bridge.cpp @@ -3449,9 +3449,27 @@ gint32 CRpcInterfaceServer::CloneInvTask( gint32 ret = 0; do{ CParamList oParams; - oParams.CopyProp( propIfPtr, pCallback ); - oParams.CopyProp( propMsgPtr, pCallback ); - oParams.CopyProp( propMatchPtr, pCallback ); + CIfInvokeMethodTask* pInv = + ObjPtr( pCallback ); + + if( pInv == nullptr ) + { + ret = -EFAULT; + break; + } + + CStdRTMutex oLock( pInv->GetLock() ); + if( pInv->IsStopped( + pInv->GetTaskState() ) ) + { + ret = ERROR_STATE; + break; + } + IConfigDb* pInvCfg = pInv->GetConfig(); + oParams.CopyProp( propIfPtr, pInvCfg ); + oParams.CopyProp( propMsgPtr, pInvCfg ); + oParams.CopyProp( propMatchPtr, pInvCfg ); + oLock.Unlock(); ret = pTask.NewObj( clsid( CIfInvokeMethodTask ), diff --git a/rpc/reqfwdr.cpp b/rpc/reqfwdr.cpp index 51376630..6ecc2c76 100755 --- a/rpc/reqfwdr.cpp +++ b/rpc/reqfwdr.cpp @@ -4881,8 +4881,6 @@ gint32 CRpcReqForwarderProxy::AddInterface( gint32 ret = 0; do{ - MatchPtr ptrMatch( pMatch ); - MatchPtr ptrCopy; ret = ptrCopy.NewObj( clsid( CRouterRemoteMatch ) ); diff --git a/rpc/stmpdo.cpp b/rpc/stmpdo.cpp index 44aad7a9..7786e9fa 100644 --- a/rpc/stmpdo.cpp +++ b/rpc/stmpdo.cpp @@ -187,7 +187,7 @@ gint32 CRpcTcpBusPort::BuildPdoPortName( std::to_string( dwPortId ); break; } - else if( pCfg->exist( propConnParams ) ) + else { // proxy side // ip addr must exist @@ -200,10 +200,10 @@ gint32 CRpcTcpBusPort::BuildPdoPortName( CStdRMutex oPortLock( GetLock() ); CConnParams ocps( pcp ); PDOADDR oAddr( ocps ); - std::map< PDOADDR, guint32 >::iterator + std::map< PDOADDR, guint32 >::const_iterator itr = m_mapAddrToId.find( oAddr ); - if( itr != m_mapAddrToId.end() ) + if( itr != m_mapAddrToId.cend() ) { dwPortId = itr->second; } @@ -215,10 +215,6 @@ gint32 CRpcTcpBusPort::BuildPdoPortName( strPortName = strClass + "_" + std::to_string( dwPortId ); } - else - { - ret = -EINVAL; - } }while( 0 ); @@ -435,23 +431,155 @@ gint32 CRpcTcpBusPort::PostStart( return ret; } +extern void SetPnpState( + IRP* pIrp, guint32 state ); + gint32 CRpcTcpBusPort::PreStop( IRP* pIrp ) { - // - // NOTE: this routine could be repeated many - // times till it returns - // STATUS_MORE_PROCESS_NEEDED - // - gint32 ret = super::PreStop( pIrp ); - if( ret != STATUS_MORE_PROCESS_NEEDED ) - { - for( auto elem : m_vecListenSocks ) + gint32 ret = 0; + do{ + CIoManager* pMgr = GetIoMgr(); + + if( pIrp == nullptr ) { - ret = elem->Stop(); - elem.Clear(); + ret = -EINVAL; + break; } - } + + IrpCtxPtr& pCtx = pIrp->GetCurCtx(); + + BufPtr pBuf; + pCtx->GetExtBuf( pBuf ); + CGenBusPnpExt* pExt = + ( CGenBusPnpExt* )( *pBuf ); + + if( pExt->m_dwExtState == 0 ) + { + pExt->m_dwExtState = 1; + if( ret == STATUS_PENDING ) + { + break; + } + // fall through + } + + if( pExt->m_dwExtState == 1 ) + { + ret = super::PreStop( pIrp ); + + if( ERROR( ret ) ) + break; + + pExt->m_dwExtState = 2; + + if( ret == + STATUS_MORE_PROCESS_NEEDED ) + break; + // fall through + } + + if( pExt->m_dwExtState == 2 ) + { + gint32 (*func)( IEventSink*, + CPort*, CRpcListeningSock* ) = + ([]( IEventSink* pCb, CPort* pBus, + CRpcListeningSock* pSock )->gint32 + { + gint32 (*pStopSock)( IEventSink*, + CRpcListeningSock* ) = + ([]( IEventSink* pCb, + CRpcListeningSock* pSock ) + { + pSock->OnEvent( eventStop, + 0, 0, nullptr ); + + pCb->OnEvent( eventTaskComp, + 0, 0, nullptr ); + return 0; + }); + + TaskletPtr pTask; + gint32 ret = NEW_FUNCCALL_TASKEX2( + 0, pTask, pBus->GetIoMgr(), + pStopSock, pCb, pSock ); + if( ERROR( ret ) ) + return ret; + + CIfRetryTask* pRetry = pTask; + pRetry->SetClientNotify( pCb ); + auto pLoop = pSock->GetMainLoop(); + pLoop->AddTask( pTask ); + return STATUS_PENDING; + }); + + TaskGrpPtr pGrp; + CParamList oParams; + + oParams.SetPointer( + propIoMgr, pMgr ); + + ret = pGrp.NewObj( + clsid( CIfTaskGroup ), + oParams.GetCfg() ); + if( ERROR( ret ) ) + break; + + pGrp->SetRelation( logicNONE ); + + for( auto elem : m_vecListenSocks ) + { + TaskletPtr pStopChild; + ret = NEW_FUNCCALL_TASKEX2( 0, + pStopChild, this->GetIoMgr(), + func, nullptr, this, + ( CRpcListeningSock* )elem ); + + if( ERROR( ret ) ) + break; + pGrp->AppendTask( pStopChild ); + } + m_vecListenSocks.clear(); + + TaskletPtr pCompIrp; + ret = DEFER_OBJCALLEX_NOSCHED( + pCompIrp, this->GetIoMgr(), + &CIoManager::CompleteIrp, + pIrp ); + if( ERROR( ret ) ) + { + ( *pGrp )( eventCancelTask ); + break; + } + pGrp->AppendTask( pCompIrp ); + TaskletPtr pTask = pGrp; + ret = pMgr->RescheduleTask( pTask ); + if( SUCCEEDED( ret ) ) + { + ret = STATUS_MORE_PROCESS_NEEDED; + break; + } + ( *pGrp )( eventCancelTask ); + } + + if( pExt->m_dwExtState == 3 ) + { + SetPnpState( pIrp, + PNP_STATE_STOP_PRE ); + ret = 0; + break; + } + + // we don't expect the PreStop + // will be entered again + ret = ERROR_STATE; + break; + + }while( 0 ); + + if( ret == STATUS_PENDING ) + ret = STATUS_MORE_PROCESS_NEEDED; + return ret; } @@ -2139,7 +2267,10 @@ gint32 CTcpStreamPdo::OnPortReady( if( ERROR( ret ) ) break; - CConnParams oConnParams( pcp ); + CParamList oConnCpy; + oConnCpy.Append( pcp ); + CConnParams oConnParams( + oConnCpy.GetCfg() ); if( m_pBusPort == nullptr ) { ret = -EFAULT; diff --git a/rpc/tcpfido.cpp b/rpc/tcpfido.cpp index d3adec8e..19fce0ff 100755 --- a/rpc/tcpfido.cpp +++ b/rpc/tcpfido.cpp @@ -472,7 +472,9 @@ gint32 CRpcTcpFido::HandleSendReq( { // the message arrives before // the irp arrives here - DebugPrint( ret, "respmsg arrives before reqmsg" ); + DebugPrint( ret, "respmsg arrives before " + "reqmsg ctrlcode = %d", + dwCtrlCode ); DMsgPtr pRespMsg = itr->second; m_mapRespMsgs.erase( itr ); diff --git a/rpc/tcportex.h b/rpc/tcportex.h index df64985a..446f566b 100644 --- a/rpc/tcportex.h +++ b/rpc/tcportex.h @@ -486,7 +486,7 @@ class CRpcConnSock : virtual gint32 OnSendReady(); virtual gint32 OnReceive(); virtual gint32 OnConnected(); - virtual CMainIoLoop* GetMainLoop() const override; + CMainIoLoop* GetMainLoop() const override; gint32 OnDisconnected(); diff --git a/rpc/tcpport.cpp b/rpc/tcpport.cpp index 381c1c15..7311aa05 100755 --- a/rpc/tcpport.cpp +++ b/rpc/tcpport.cpp @@ -3195,8 +3195,9 @@ gint32 CRpcStream::GetReadIrpsToComp( guint32 dwResidual = pBuf->size() - dwRecvSize; - memmove( pBuf->ptr() + dwRecvSize, - pBuf->ptr(), dwResidual ); + memmove( pBuf->ptr(), + pBuf->ptr() + dwRecvSize, + dwResidual ); pBuf->Resize( dwResidual ); pPacket->SetPayload( pBuf ); @@ -4458,11 +4459,19 @@ gint32 CRpcListeningSock::Connect() gint32 CRpcListeningSock::Start() { gint32 ret = 0; + auto pBus = static_cast + < CRpcTcpBusPort* >( m_pParentPort ); + if( pBus == nullptr ) + return -EFAULT; CStdRMutex oSockLock( GetLock() ); if( GetState() != sockInit ) return ERROR_STATE; + ret = pBus->AllocMainLoop( m_pLoop ); + if( ERROR( ret ) ) + return ret; + // start listening to the port ret = Connect(); if( ERROR( ret ) ) @@ -4472,6 +4481,19 @@ gint32 CRpcListeningSock::Start() return 0; } +gint32 CRpcListeningSock::Stop() +{ + gint32 ret = super::Stop(); + CStdRMutex oSockLock( GetLock() ); + auto pBus = static_cast + < CRpcTcpBusPort* >( m_pParentPort ); + if( pBus == nullptr ) + return -EFAULT; + + pBus->ReleaseMainLoop( m_pLoop ); + m_pLoop.Clear(); +} + gint32 CRpcListeningSock::OnConnected() { // a new connection coming diff --git a/rpc/tcpport.h b/rpc/tcpport.h index b1321714..8ee38dee 100644 --- a/rpc/tcpport.h +++ b/rpc/tcpport.h @@ -675,7 +675,7 @@ typedef CAutoPtr< Clsid_Invalid, CRpcSocketBase > SockPtr; class CRpcListeningSock : public CRpcSocketBase { - + MloopPtr m_pLoop; virtual gint32 DispatchSockEvent( GIOCondition ); @@ -685,12 +685,19 @@ class CRpcListeningSock : CRpcListeningSock( const IConfigDb* pCfg ); // commands - gint32 Start(); + gint32 Start() override; + gint32 Stop() override; // new connection comes virtual gint32 OnConnected(); virtual gint32 OnError( gint32 ret ); gint32 Connect(); + + CMainIoLoop* GetMainLoop() const override + { + CMainIoLoop* pLoop = m_pLoop; + return pLoop; + } }; class CRpcStreamSock :