Permalink
Browse files

Merge branch '158-blackhole-agent-disconnects' into 'master'

Fixed #158 blackhole loop got locked and blackhole agent seems disconnected; added logs to net code

Closes #158
  • Loading branch information...
tomatolog committed Jan 4, 2018
2 parents 56bfc38 + 37f574c commit f2f53757db45bcfb1544263ce0817e856656a621
Showing with 42 additions and 77 deletions.
  1. +12 −4 src/searchd.cpp
  2. +29 −72 src/searchdha.cpp
  3. +1 −1 src/searchdha.h
@@ -8188,7 +8188,7 @@ void SearchHandler_c::RunSubset ( int iStart, int iEnd )
tDistCtrl->WaitAgentsEvent();
bDistDone = tDistCtrl->IsDone();
// wait for remote queries to complete
if ( tDistCtrl->FetchReadyAgents() )
if ( tDistCtrl->FetchReadyAgents()>0 )
{
SearchReplyParser_c tParser ( iStart, iEnd );
int iMsecLeft = iAgentQueryTimeout - (int)( tmLocal/1000 );
@@ -14859,6 +14859,8 @@ static void BlackholeTick()
}
assert ( dAgents.GetLength() && tBlackholes.m_dPtrs.GetLength() );
sphLogDebugv ( "blackhole agents %d", dAgents.GetLength() );
BlackholeRequestBuilder_t tReq ( tBlackholes.m_dPtrs );
CSphScopedPtr<ISphRemoteAgentsController> tDistCtrl ( GetAgentsController ( 1, dAgents, tReq, g_iBlackholeTimeout, g_iBlackholeRetries ) );
if ( g_bShutdown )
@@ -14867,20 +14869,26 @@ static void BlackholeTick()
bool bDistDone = false;
while ( !bDistDone )
{
if ( !tDistCtrl->HasReadyAgents () )
if ( !tDistCtrl->HasReadyAgents () && !tDistCtrl->IsDone() )
tDistCtrl->WaitAgentsEvent();
bDistDone = tDistCtrl->IsDone();
if ( g_bShutdown )
return;
// need to fetch data to prevent network error reports at agent(blackhole)
if ( tDistCtrl->FetchReadyAgents() )
int iGot = tDistCtrl->FetchReadyAgents();
sphLogDebugv ( "blackhole got %d, done %d", iGot, (int)bDistDone );
if ( iGot>0 )
{
BlackholeReplyParser_t tParser;
RemoteWaitForAgents ( dAgents, g_iBlackholeTimeout, tParser );
if ( tDistCtrl->RetryFailed()>0 )
int iRetries = tDistCtrl->RetryFailed()>0;
if ( iRetries>0 )
bDistDone = false;
sphLogDebugv ( "blackhole %d retries", iRetries );
}
}
}
@@ -608,7 +608,7 @@ AgentConn_t::~AgentConn_t ()
void AgentConn_t::State ( AgentState_e eState )
{
sphLogDebugv ( "state %d > %d, sock %d, order %d", m_eConnState, eState, m_iSock, m_iStoreTag );
sphLogDebugv ( "state %d > %d, sock %d, order %d, %p", m_eConnState, eState, m_iSock, m_iStoreTag, this );
m_eConnState = eState;
}
@@ -1524,9 +1524,6 @@ int RemoteWaitForAgents ( AgentsVector & dAgents, int iTimeout, IReplyParser_t &
ARRAY_FOREACH ( iAgent, dAgents )
{
AgentConn_t * pAgent = dAgents[iAgent];
if ( pAgent->m_tDesc.m_bBlackhole )
continue;
if ( pAgent->State()==AGENT_QUERYED || pAgent->State()==AGENT_REPLY || pAgent->State()==AGENT_PREREPLY )
{
assert ( !pAgent->m_tDesc.m_sPath.IsEmpty() || pAgent->m_tDesc.m_iPort>0 );
@@ -1560,8 +1557,6 @@ int RemoteWaitForAgents ( AgentsVector & dAgents, int iTimeout, IReplyParser_t &
{
NetEventsIterator_t & tEvent = pEvents->IterateGet();
AgentConn_t & tAgent = *(AgentConn_t *)tEvent.m_pData;
if ( tAgent.m_tDesc.m_bBlackhole )
continue;
if (!( tAgent.State()==AGENT_QUERYED || tAgent.State()==AGENT_REPLY || tAgent.State()==AGENT_PREREPLY ))
continue;
@@ -1727,9 +1722,7 @@ int RemoteWaitForAgents ( AgentsVector & dAgents, int iTimeout, IReplyParser_t &
ARRAY_FOREACH ( iAgent, dAgents )
{
AgentConn_t * pAgent = dAgents[iAgent];
if ( pAgent->m_tDesc.m_bBlackhole )
pAgent->Close ();
else if ( bTimeout && ( pAgent->State()==AGENT_QUERYED || pAgent->State()==AGENT_PREREPLY ||
if ( bTimeout && ( pAgent->State()==AGENT_QUERYED || pAgent->State()==AGENT_PREREPLY ||
( pAgent->State()==AGENT_REPLY && pAgent->m_iReplyRead!=pAgent->m_iReplySize ) ) )
{
assert ( !pAgent->m_dResults.GetLength() );
@@ -1768,12 +1761,9 @@ class ThdWorkPool_c : ISphNoncopyable
CSphAtomic m_iActiveThreads;
private:
CSphFixedVector<AgentWorkContext_t> m_dData; // works array
CircularBuffer_T<AgentWorkContext_t> m_dWork; // works array
int m_iHead; // ring buffer begin
int m_iTail; // ring buffer end
CSphAtomic m_iWorksCount; // count of works to be done
CSphAtomic m_iWorksCount; // count of works to be done
volatile int m_iAgentsDone; // count of agents that finished their works
volatile int m_iAgentsReported; // count of agents that reported of their work done
volatile bool m_bIsDestroying; // help to keep at least 1 worker thread active
@@ -1799,29 +1789,17 @@ class ThdWorkPool_c : ISphNoncopyable
bool HasIncompleteWorks () const
{
return ( m_iWorksCount.GetValue ()>0 );
}
void SetWorksCount ( int iWorkers )
{
m_iWorksCount.SetValue (iWorkers);
}
void AddWorksCount ( int iWorkers )
{
m_iWorksCount.Add ( iWorkers );
return ( m_iWorksCount.GetValue()>0 );
}
static void PoolThreadFunc ( void * pArg );
};
ThdWorkPool_c::ThdWorkPool_c ( int iLen )
: m_dData ( iLen + 1 )
: m_dWork ( iLen )
{
m_tCrashQuery = SphCrashLogger_c::GetQuery(); // transfer query info for crash logger to new thread
m_iTail = m_iHead = 0;
m_iWorksCount.SetValue (0);
m_iAgentsDone = m_iAgentsReported = 0;
m_bIsDestroying = false;
@@ -1839,20 +1817,16 @@ ThdWorkPool_c::~ThdWorkPool_c ()
void ThdWorkPool_c::Pop ( AgentWorkContext_t & tNext )
{
tNext = AgentWorkContext_t();
if ( m_iTail==m_iHead ) // quick path for empty pool
if ( !m_dWork.GetLength() ) // quick path for empty pool
return;
CSphScopedLock<CSphMutex> tData ( m_tDataLock ); // lock on create, unlock on destroy
if ( m_iTail==m_iHead ) // it might be empty now as another thread could steal work till that moment
if ( !m_dWork.GetLength() ) // it might be empty now as another thread could steal work till that moment
return;
tNext = m_dData[m_iHead];
tNext = m_dWork.Pop();
assert ( tNext.m_pfn );
#ifndef NDEBUG
m_dData[m_iHead] = AgentWorkContext_t(); // to make sure that we don't rewrite valid elements
#endif
m_iHead = ( m_iHead+1 ) % m_dData.GetLength();
}
void ThdWorkPool_c::Push ( const AgentWorkContext_t & tElem )
@@ -1861,14 +1835,14 @@ void ThdWorkPool_c::Push ( const AgentWorkContext_t & tElem )
return;
CSphScopedLock<CSphMutex> tData ( m_tDataLock );
RawPush ( tElem );
m_dWork.Push() = tElem;
m_iWorksCount.Inc();
}
void ThdWorkPool_c::RawPush ( const AgentWorkContext_t & tElem )
{
assert ( !m_dData[m_iTail].m_pfn ); // to make sure that we don't rewrite valid elements
m_dData[m_iTail] = tElem;
m_iTail = ( m_iTail+1 ) % m_dData.GetLength();
m_dWork.Push() = tElem;
m_iWorksCount.Inc();
}
int ThdWorkPool_c::FetchReadyCount ()
@@ -1895,53 +1869,38 @@ void ThdWorkPool_c::PoolThreadFunc ( void * pArg )
sphLogDebugv ("Thread func started for %p, now %d threads active", pArg, (DWORD)pPool->m_iActiveThreads);
SphCrashLogger_c::SetLastQuery ( pPool->m_tCrashQuery );
int iSpinCount = 0;
int iPopCount = 0;
AgentWorkContext_t tNext;
while (true)
for ( ; !pPool->m_bIsDestroying; )
{
if ( !tNext.m_pfn ) // pop new work if current is done
{
iSpinCount = 0;
++iPopCount;
pPool->Pop ( tNext );
if ( !tNext.m_pfn ) // if there is no work at queue - worker done
{
// this is last worker. Let us keep it while pool is alive.
if ( !pPool->m_bIsDestroying && pPool->m_iActiveThreads==1 )
// keep all threads alive due to retry will add works from outside
// FIXME!!! switch to common worker pool
if ( !pPool->m_bIsDestroying )
{
// this thread is 'virtually' inactive also
if ( !pPool->m_iWorksCount.GetValue () )
{
sphSleepMsec ( 1 );
continue;
}
sphSleepMsec ( 1 );
continue;
} else
{
break;
}
break;
}
}
tNext.m_pfn ( &tNext );
pPool->m_iWorksCount.Dec();
if ( tNext.m_iAgentsDone || !tNext.m_pfn )
{
CSphScopedLock<CSphMutex> tStat ( pPool->m_tStatLock );
pPool->m_iAgentsDone += tNext.m_iAgentsDone;
if (!tNext.m_pfn)
pPool->m_iWorksCount.Dec();
pPool->m_tChanged.SetEvent();
}
iSpinCount++;
if ( iSpinCount>1 && iSpinCount<4 ) // it could be better not to do the same work
{
pPool->Push ( tNext );
tNext = AgentWorkContext_t();
} else if ( pPool->m_iWorksCount.GetValue ()>1 && iPopCount>pPool->m_iWorksCount.GetValue () ) // should sleep on queue wrap
{
iPopCount = 0;
sphSleepMsec ( 1 );
}
pPool->Push ( tNext );
tNext = AgentWorkContext_t();
}
--pPool->m_iActiveThreads;
sphLogDebugv ( "Thread func finished for %p, now %d threads active", pArg, (DWORD)pPool->m_iActiveThreads );
@@ -2054,7 +2013,7 @@ class CSphRemoteAgentsController : public ISphRemoteAgentsController
// check that there are no works to do
virtual bool IsDone () const override
{
return m_pWorkerPool->HasIncompleteWorks()==0;
return !m_pWorkerPool->HasIncompleteWorks();
}
// block execution while there are works to do
@@ -2066,9 +2025,9 @@ class CSphRemoteAgentsController : public ISphRemoteAgentsController
return (m_pWorkerPool->GetReadyCount()>0 );
}
virtual bool FetchReadyAgents () override
virtual int FetchReadyAgents () override
{
return (m_pWorkerPool->FetchReadyCount ()>0);
return m_pWorkerPool->FetchReadyCount();
}
virtual void WaitAgentsEvent () override;
@@ -2109,7 +2068,6 @@ CSphRemoteAgentsController::CSphRemoteAgentsController ( int iThreads, AgentsVec
if ( iThreads>1 )
{
m_pWorkerPool->SetWorksCount ( dAgents.GetLength() );
ARRAY_FOREACH ( i, dAgents )
{
tCtx.m_ppAgents = dAgents.Begin()+i;
@@ -2118,7 +2076,6 @@ CSphRemoteAgentsController::CSphRemoteAgentsController ( int iThreads, AgentsVec
}
} else
{
m_pWorkerPool->SetWorksCount ( 1 );
tCtx.m_ppAgents = dAgents.Begin();
tCtx.m_iAgentCount = dAgents.GetLength();
for ( AgentConn_t * pAgent : dAgents )
@@ -2183,7 +2140,6 @@ int CSphRemoteAgentsController::RetryFailed ()
sphLogDebugv ( "Found %d agents in state AGENT_RETRY, reschedule them", iRetries );
tCtx.m_pfn = ThdWorkSequental;
m_pWorkerPool->Push ( tCtx );
m_pWorkerPool->AddWorksCount ( 1 );
return iRetries;
}
@@ -2324,6 +2280,7 @@ class EpollEvents_c : public IterableEvents_c
if ( m_iEFD==-1 )
sphDie ( "failed to create epoll main FD, errno=%d, %s", errno, strerror ( errno ) );
sphLogDebugv ( "epoll %d created", m_iEFD );
m_dReady.Reserve ( iSizeHint );
m_iIterEv = -1;
}
@@ -535,7 +535,7 @@ class ISphRemoteAgentsController : ISphNoncopyable
virtual bool HasReadyAgents () const = 0;
// check that some agents are done, and reset counter of them
virtual bool FetchReadyAgents () = 0;
virtual int FetchReadyAgents () = 0;
// block execution while some works finished
virtual void WaitAgentsEvent () = 0;

0 comments on commit f2f5375

Please sign in to comment.