Skip to content

Commit

Permalink
. stmpdo.cpp, tcpport.cpp, tcpport.h: updated the CRpcTcpBusPort's On…
Browse files Browse the repository at this point in the history
…PreStop to

shutdown the listening socket gracefully, to eliminate the potental race
condition on the listening socket, during stopping.
. bridge.cpp, reqfwdr.cpp, tcportex.h, tcpfido.cpp: made some small changes.
  • Loading branch information
zhiming99 committed Jan 3, 2024
1 parent 1440e04 commit 414611e
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 31 deletions.
24 changes: 21 additions & 3 deletions rpc/bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3449,9 +3449,27 @@ gint32 CRpcInterfaceServer::CloneInvTask(
gint32 ret = 0;
do{
CParamList oParams;
oParams.CopyProp( propIfPtr, pCallback );
oParams.CopyProp( propMsgPtr, pCallback );
oParams.CopyProp( propMatchPtr, pCallback );
CIfInvokeMethodTask* pInv =
ObjPtr( pCallback );

if( pInv == nullptr )
{
ret = -EFAULT;
break;
}

CStdRTMutex oLock( pInv->GetLock() );
if( pInv->IsStopped(
pInv->GetTaskState() ) )
{
ret = ERROR_STATE;
break;
}
IConfigDb* pInvCfg = pInv->GetConfig();
oParams.CopyProp( propIfPtr, pInvCfg );
oParams.CopyProp( propMsgPtr, pInvCfg );
oParams.CopyProp( propMatchPtr, pInvCfg );
oLock.Unlock();

ret = pTask.NewObj(
clsid( CIfInvokeMethodTask ),
Expand Down
2 changes: 0 additions & 2 deletions rpc/reqfwdr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4881,8 +4881,6 @@ gint32 CRpcReqForwarderProxy::AddInterface(

gint32 ret = 0;
do{
MatchPtr ptrMatch( pMatch );

MatchPtr ptrCopy;
ret = ptrCopy.NewObj(
clsid( CRouterRemoteMatch ) );
Expand Down
171 changes: 151 additions & 20 deletions rpc/stmpdo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ gint32 CRpcTcpBusPort::BuildPdoPortName(
std::to_string( dwPortId );
break;
}
else if( pCfg->exist( propConnParams ) )
else
{
// proxy side
// ip addr must exist
Expand All @@ -200,10 +200,10 @@ gint32 CRpcTcpBusPort::BuildPdoPortName(
CStdRMutex oPortLock( GetLock() );
CConnParams ocps( pcp );
PDOADDR oAddr( ocps );
std::map< PDOADDR, guint32 >::iterator
std::map< PDOADDR, guint32 >::const_iterator
itr = m_mapAddrToId.find( oAddr );

if( itr != m_mapAddrToId.end() )
if( itr != m_mapAddrToId.cend() )
{
dwPortId = itr->second;
}
Expand All @@ -215,10 +215,6 @@ gint32 CRpcTcpBusPort::BuildPdoPortName(
strPortName = strClass + "_" +
std::to_string( dwPortId );
}
else
{
ret = -EINVAL;
}

}while( 0 );

Expand Down Expand Up @@ -435,23 +431,155 @@ gint32 CRpcTcpBusPort::PostStart(
return ret;
}

extern void SetPnpState(
IRP* pIrp, guint32 state );

gint32 CRpcTcpBusPort::PreStop(
IRP* pIrp )
{
//
// NOTE: this routine could be repeated many
// times till it returns
// STATUS_MORE_PROCESS_NEEDED
//
gint32 ret = super::PreStop( pIrp );
if( ret != STATUS_MORE_PROCESS_NEEDED )
{
for( auto elem : m_vecListenSocks )
gint32 ret = 0;
do{
CIoManager* pMgr = GetIoMgr();

if( pIrp == nullptr )
{
ret = elem->Stop();
elem.Clear();
ret = -EINVAL;
break;
}
}

IrpCtxPtr& pCtx = pIrp->GetCurCtx();

BufPtr pBuf;
pCtx->GetExtBuf( pBuf );
CGenBusPnpExt* pExt =
( CGenBusPnpExt* )( *pBuf );

if( pExt->m_dwExtState == 0 )
{
pExt->m_dwExtState = 1;
if( ret == STATUS_PENDING )
{
break;
}
// fall through
}

if( pExt->m_dwExtState == 1 )
{
ret = super::PreStop( pIrp );

if( ERROR( ret ) )
break;

pExt->m_dwExtState = 2;

if( ret ==
STATUS_MORE_PROCESS_NEEDED )
break;
// fall through
}

if( pExt->m_dwExtState == 2 )
{
gint32 (*func)( IEventSink*,
CPort*, CRpcListeningSock* ) =
([]( IEventSink* pCb, CPort* pBus,
CRpcListeningSock* pSock )->gint32
{
gint32 (*pStopSock)( IEventSink*,
CRpcListeningSock* ) =
([]( IEventSink* pCb,
CRpcListeningSock* pSock )
{
pSock->OnEvent( eventStop,
0, 0, nullptr );

pCb->OnEvent( eventTaskComp,
0, 0, nullptr );
return 0;
});

TaskletPtr pTask;
gint32 ret = NEW_FUNCCALL_TASKEX2(
0, pTask, pBus->GetIoMgr(),
pStopSock, pCb, pSock );
if( ERROR( ret ) )
return ret;

CIfRetryTask* pRetry = pTask;
pRetry->SetClientNotify( pCb );
auto pLoop = pSock->GetMainLoop();
pLoop->AddTask( pTask );
return STATUS_PENDING;
});

TaskGrpPtr pGrp;
CParamList oParams;

oParams.SetPointer(
propIoMgr, pMgr );

ret = pGrp.NewObj(
clsid( CIfTaskGroup ),
oParams.GetCfg() );
if( ERROR( ret ) )
break;

pGrp->SetRelation( logicNONE );

for( auto elem : m_vecListenSocks )
{
TaskletPtr pStopChild;
ret = NEW_FUNCCALL_TASKEX2( 0,
pStopChild, this->GetIoMgr(),
func, nullptr, this,
( CRpcListeningSock* )elem );

if( ERROR( ret ) )
break;
pGrp->AppendTask( pStopChild );
}
m_vecListenSocks.clear();

TaskletPtr pCompIrp;
ret = DEFER_OBJCALLEX_NOSCHED(
pCompIrp, this->GetIoMgr(),
&CIoManager::CompleteIrp,
pIrp );
if( ERROR( ret ) )
{
( *pGrp )( eventCancelTask );
break;
}
pGrp->AppendTask( pCompIrp );
TaskletPtr pTask = pGrp;
ret = pMgr->RescheduleTask( pTask );
if( SUCCEEDED( ret ) )
{
ret = STATUS_MORE_PROCESS_NEEDED;
break;
}
( *pGrp )( eventCancelTask );
}

if( pExt->m_dwExtState == 3 )
{
SetPnpState( pIrp,
PNP_STATE_STOP_PRE );
ret = 0;
break;
}

// we don't expect the PreStop
// will be entered again
ret = ERROR_STATE;
break;

}while( 0 );

if( ret == STATUS_PENDING )
ret = STATUS_MORE_PROCESS_NEEDED;

return ret;
}

Expand Down Expand Up @@ -2139,7 +2267,10 @@ gint32 CTcpStreamPdo::OnPortReady(
if( ERROR( ret ) )
break;

CConnParams oConnParams( pcp );
CParamList oConnCpy;
oConnCpy.Append( pcp );
CConnParams oConnParams(
oConnCpy.GetCfg() );
if( m_pBusPort == nullptr )
{
ret = -EFAULT;
Expand Down
4 changes: 3 additions & 1 deletion rpc/tcpfido.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,9 @@ gint32 CRpcTcpFido::HandleSendReq(
{
// the message arrives before
// the irp arrives here
DebugPrint( ret, "respmsg arrives before reqmsg" );
DebugPrint( ret, "respmsg arrives before "
"reqmsg ctrlcode = %d",
dwCtrlCode );
DMsgPtr pRespMsg = itr->second;
m_mapRespMsgs.erase( itr );

Expand Down
2 changes: 1 addition & 1 deletion rpc/tcportex.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ class CRpcConnSock :
virtual gint32 OnSendReady();
virtual gint32 OnReceive();
virtual gint32 OnConnected();
virtual CMainIoLoop* GetMainLoop() const override;
CMainIoLoop* GetMainLoop() const override;

gint32 OnDisconnected();

Expand Down
26 changes: 24 additions & 2 deletions rpc/tcpport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3195,8 +3195,9 @@ gint32 CRpcStream::GetReadIrpsToComp(
guint32 dwResidual =
pBuf->size() - dwRecvSize;

memmove( pBuf->ptr() + dwRecvSize,
pBuf->ptr(), dwResidual );
memmove( pBuf->ptr(),
pBuf->ptr() + dwRecvSize,
dwResidual );

pBuf->Resize( dwResidual );
pPacket->SetPayload( pBuf );
Expand Down Expand Up @@ -4458,11 +4459,19 @@ gint32 CRpcListeningSock::Connect()
gint32 CRpcListeningSock::Start()
{
gint32 ret = 0;
auto pBus = static_cast
< CRpcTcpBusPort* >( m_pParentPort );
if( pBus == nullptr )
return -EFAULT;

CStdRMutex oSockLock( GetLock() );
if( GetState() != sockInit )
return ERROR_STATE;

ret = pBus->AllocMainLoop( m_pLoop );
if( ERROR( ret ) )
return ret;

// start listening to the port
ret = Connect();
if( ERROR( ret ) )
Expand All @@ -4472,6 +4481,19 @@ gint32 CRpcListeningSock::Start()
return 0;
}

gint32 CRpcListeningSock::Stop()
{
gint32 ret = super::Stop();
CStdRMutex oSockLock( GetLock() );
auto pBus = static_cast
< CRpcTcpBusPort* >( m_pParentPort );
if( pBus == nullptr )
return -EFAULT;

pBus->ReleaseMainLoop( m_pLoop );
m_pLoop.Clear();
}

gint32 CRpcListeningSock::OnConnected()
{
// a new connection coming
Expand Down
11 changes: 9 additions & 2 deletions rpc/tcpport.h
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ typedef CAutoPtr< Clsid_Invalid, CRpcSocketBase > SockPtr;
class CRpcListeningSock :
public CRpcSocketBase
{

MloopPtr m_pLoop;
virtual gint32 DispatchSockEvent(
GIOCondition );

Expand All @@ -685,12 +685,19 @@ class CRpcListeningSock :
CRpcListeningSock( const IConfigDb* pCfg );

// commands
gint32 Start();
gint32 Start() override;
gint32 Stop() override;

// new connection comes
virtual gint32 OnConnected();
virtual gint32 OnError( gint32 ret );
gint32 Connect();

CMainIoLoop* GetMainLoop() const override
{
CMainIoLoop* pLoop = m_pLoop;
return pLoop;
}
};

class CRpcStreamSock :
Expand Down

0 comments on commit 414611e

Please sign in to comment.