Skip to content

Commit

Permalink
fixed #158 blackhole loop got locked; added logs to net code
Browse files Browse the repository at this point in the history
  • Loading branch information
tomatolog committed Dec 25, 2017
1 parent c9f64d7 commit 37f574c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 77 deletions.
16 changes: 12 additions & 4 deletions src/searchd.cpp
Expand Up @@ -8188,7 +8188,7 @@ void SearchHandler_c::RunSubset ( int iStart, int iEnd )
tDistCtrl->WaitAgentsEvent();
bDistDone = tDistCtrl->IsDone();
// wait for remote queries to complete
if ( tDistCtrl->FetchReadyAgents() )
if ( tDistCtrl->FetchReadyAgents()>0 )
{
SearchReplyParser_c tParser ( iStart, iEnd );
int iMsecLeft = iAgentQueryTimeout - (int)( tmLocal/1000 );
Expand Down Expand Up @@ -14859,6 +14859,8 @@ static void BlackholeTick()
}
assert ( dAgents.GetLength() && tBlackholes.m_dPtrs.GetLength() );

sphLogDebugv ( "blackhole agents %d", dAgents.GetLength() );

BlackholeRequestBuilder_t tReq ( tBlackholes.m_dPtrs );
CSphScopedPtr<ISphRemoteAgentsController> tDistCtrl ( GetAgentsController ( 1, dAgents, tReq, g_iBlackholeTimeout, g_iBlackholeRetries ) );
if ( g_bShutdown )
Expand All @@ -14867,20 +14869,26 @@ static void BlackholeTick()
bool bDistDone = false;
while ( !bDistDone )
{
if ( !tDistCtrl->HasReadyAgents () )
if ( !tDistCtrl->HasReadyAgents () && !tDistCtrl->IsDone() )
tDistCtrl->WaitAgentsEvent();
bDistDone = tDistCtrl->IsDone();

if ( g_bShutdown )
return;

// need to fetch data to prevent network error reports at agent(blackhole)
if ( tDistCtrl->FetchReadyAgents() )
int iGot = tDistCtrl->FetchReadyAgents();
sphLogDebugv ( "blackhole got %d, done %d", iGot, (int)bDistDone );

if ( iGot>0 )
{
BlackholeReplyParser_t tParser;
RemoteWaitForAgents ( dAgents, g_iBlackholeTimeout, tParser );
if ( tDistCtrl->RetryFailed()>0 )
int iRetries = tDistCtrl->RetryFailed()>0;
if ( iRetries>0 )
bDistDone = false;

sphLogDebugv ( "blackhole %d retries", iRetries );
}
}
}
Expand Down
101 changes: 29 additions & 72 deletions src/searchdha.cpp
Expand Up @@ -608,7 +608,7 @@ AgentConn_t::~AgentConn_t ()

void AgentConn_t::State ( AgentState_e eState )
{
sphLogDebugv ( "state %d > %d, sock %d, order %d", m_eConnState, eState, m_iSock, m_iStoreTag );
sphLogDebugv ( "state %d > %d, sock %d, order %d, %p", m_eConnState, eState, m_iSock, m_iStoreTag, this );
m_eConnState = eState;
}

Expand Down Expand Up @@ -1524,9 +1524,6 @@ int RemoteWaitForAgents ( AgentsVector & dAgents, int iTimeout, IReplyParser_t &
ARRAY_FOREACH ( iAgent, dAgents )
{
AgentConn_t * pAgent = dAgents[iAgent];
if ( pAgent->m_tDesc.m_bBlackhole )
continue;

if ( pAgent->State()==AGENT_QUERYED || pAgent->State()==AGENT_REPLY || pAgent->State()==AGENT_PREREPLY )
{
assert ( !pAgent->m_tDesc.m_sPath.IsEmpty() || pAgent->m_tDesc.m_iPort>0 );
Expand Down Expand Up @@ -1560,8 +1557,6 @@ int RemoteWaitForAgents ( AgentsVector & dAgents, int iTimeout, IReplyParser_t &
{
NetEventsIterator_t & tEvent = pEvents->IterateGet();
AgentConn_t & tAgent = *(AgentConn_t *)tEvent.m_pData;
if ( tAgent.m_tDesc.m_bBlackhole )
continue;
if (!( tAgent.State()==AGENT_QUERYED || tAgent.State()==AGENT_REPLY || tAgent.State()==AGENT_PREREPLY ))
continue;

Expand Down Expand Up @@ -1727,9 +1722,7 @@ int RemoteWaitForAgents ( AgentsVector & dAgents, int iTimeout, IReplyParser_t &
ARRAY_FOREACH ( iAgent, dAgents )
{
AgentConn_t * pAgent = dAgents[iAgent];
if ( pAgent->m_tDesc.m_bBlackhole )
pAgent->Close ();
else if ( bTimeout && ( pAgent->State()==AGENT_QUERYED || pAgent->State()==AGENT_PREREPLY ||
if ( bTimeout && ( pAgent->State()==AGENT_QUERYED || pAgent->State()==AGENT_PREREPLY ||
( pAgent->State()==AGENT_REPLY && pAgent->m_iReplyRead!=pAgent->m_iReplySize ) ) )
{
assert ( !pAgent->m_dResults.GetLength() );
Expand Down Expand Up @@ -1768,12 +1761,9 @@ class ThdWorkPool_c : ISphNoncopyable
CSphAtomic m_iActiveThreads;

private:
CSphFixedVector<AgentWorkContext_t> m_dData; // works array
CircularBuffer_T<AgentWorkContext_t> m_dWork; // works array

int m_iHead; // ring buffer begin
int m_iTail; // ring buffer end

CSphAtomic m_iWorksCount; // count of works to be done
CSphAtomic m_iWorksCount; // count of works to be done
volatile int m_iAgentsDone; // count of agents that finished their works
volatile int m_iAgentsReported; // count of agents that reported of their work done
volatile bool m_bIsDestroying; // help to keep at least 1 worker thread active
Expand All @@ -1799,29 +1789,17 @@ class ThdWorkPool_c : ISphNoncopyable

bool HasIncompleteWorks () const
{
return ( m_iWorksCount.GetValue ()>0 );
}

void SetWorksCount ( int iWorkers )
{
m_iWorksCount.SetValue (iWorkers);
}

void AddWorksCount ( int iWorkers )
{
m_iWorksCount.Add ( iWorkers );
return ( m_iWorksCount.GetValue()>0 );
}

static void PoolThreadFunc ( void * pArg );
};

ThdWorkPool_c::ThdWorkPool_c ( int iLen )
: m_dData ( iLen + 1 )
: m_dWork ( iLen )
{
m_tCrashQuery = SphCrashLogger_c::GetQuery(); // transfer query info for crash logger to new thread

m_iTail = m_iHead = 0;
m_iWorksCount.SetValue (0);
m_iAgentsDone = m_iAgentsReported = 0;
m_bIsDestroying = false;

Expand All @@ -1839,20 +1817,16 @@ ThdWorkPool_c::~ThdWorkPool_c ()
void ThdWorkPool_c::Pop ( AgentWorkContext_t & tNext )
{
tNext = AgentWorkContext_t();
if ( m_iTail==m_iHead ) // quick path for empty pool
if ( !m_dWork.GetLength() ) // quick path for empty pool
return;

CSphScopedLock<CSphMutex> tData ( m_tDataLock ); // lock on create, unlock on destroy

if ( m_iTail==m_iHead ) // re-check under the lock: another thread may have taken the last work in the meantime
if ( !m_dWork.GetLength() ) // re-check under the lock: another thread may have taken the last work in the meantime
return;

tNext = m_dData[m_iHead];
tNext = m_dWork.Pop();
assert ( tNext.m_pfn );
#ifndef NDEBUG
m_dData[m_iHead] = AgentWorkContext_t(); // to make sure that we don't rewrite valid elements
#endif
m_iHead = ( m_iHead+1 ) % m_dData.GetLength();
}

void ThdWorkPool_c::Push ( const AgentWorkContext_t & tElem )
Expand All @@ -1861,14 +1835,14 @@ void ThdWorkPool_c::Push ( const AgentWorkContext_t & tElem )
return;

CSphScopedLock<CSphMutex> tData ( m_tDataLock );
RawPush ( tElem );
m_dWork.Push() = tElem;
m_iWorksCount.Inc();
}

void ThdWorkPool_c::RawPush ( const AgentWorkContext_t & tElem )
{
assert ( !m_dData[m_iTail].m_pfn ); // to make sure that we don't rewrite valid elements
m_dData[m_iTail] = tElem;
m_iTail = ( m_iTail+1 ) % m_dData.GetLength();
m_dWork.Push() = tElem;
m_iWorksCount.Inc();
}

int ThdWorkPool_c::FetchReadyCount ()
Expand All @@ -1895,53 +1869,38 @@ void ThdWorkPool_c::PoolThreadFunc ( void * pArg )
sphLogDebugv ("Thread func started for %p, now %d threads active", pArg, (DWORD)pPool->m_iActiveThreads);
SphCrashLogger_c::SetLastQuery ( pPool->m_tCrashQuery );

int iSpinCount = 0;
int iPopCount = 0;
AgentWorkContext_t tNext;
while (true)
for ( ; !pPool->m_bIsDestroying; )
{
if ( !tNext.m_pfn ) // pop new work if current is done
{
iSpinCount = 0;
++iPopCount;
pPool->Pop ( tNext );
if ( !tNext.m_pfn ) // if there is no work at queue - worker done
{
// this is last worker. Let us keep it while pool is alive.
if ( !pPool->m_bIsDestroying && pPool->m_iActiveThreads==1 )
// keep all threads alive, because a retry may add new works from outside
// FIXME!!! switch to common worker pool
if ( !pPool->m_bIsDestroying )
{
// this thread is 'virtually' inactive also
if ( !pPool->m_iWorksCount.GetValue () )
{
sphSleepMsec ( 1 );
continue;
}
sphSleepMsec ( 1 );
continue;
} else
{
break;
}
break;
}
}

tNext.m_pfn ( &tNext );
pPool->m_iWorksCount.Dec();
if ( tNext.m_iAgentsDone || !tNext.m_pfn )
{
CSphScopedLock<CSphMutex> tStat ( pPool->m_tStatLock );
pPool->m_iAgentsDone += tNext.m_iAgentsDone;
if (!tNext.m_pfn)
pPool->m_iWorksCount.Dec();
pPool->m_tChanged.SetEvent();
}

iSpinCount++;
if ( iSpinCount>1 && iSpinCount<4 ) // it could be better not to do the same work
{
pPool->Push ( tNext );
tNext = AgentWorkContext_t();
} else if ( pPool->m_iWorksCount.GetValue ()>1 && iPopCount>pPool->m_iWorksCount.GetValue () ) // should sleep on queue wrap
{
iPopCount = 0;
sphSleepMsec ( 1 );
}

pPool->Push ( tNext );
tNext = AgentWorkContext_t();
}
--pPool->m_iActiveThreads;
sphLogDebugv ( "Thread func finished for %p, now %d threads active", pArg, (DWORD)pPool->m_iActiveThreads );
Expand Down Expand Up @@ -2054,7 +2013,7 @@ class CSphRemoteAgentsController : public ISphRemoteAgentsController
// check that there are no works to do
virtual bool IsDone () const override
{
return m_pWorkerPool->HasIncompleteWorks()==0;
return !m_pWorkerPool->HasIncompleteWorks();
}

// block execution while there are works to do
Expand All @@ -2066,9 +2025,9 @@ class CSphRemoteAgentsController : public ISphRemoteAgentsController
return (m_pWorkerPool->GetReadyCount()>0 );
}

virtual bool FetchReadyAgents () override
virtual int FetchReadyAgents () override
{
return (m_pWorkerPool->FetchReadyCount ()>0);
return m_pWorkerPool->FetchReadyCount();
}

virtual void WaitAgentsEvent () override;
Expand Down Expand Up @@ -2109,7 +2068,6 @@ CSphRemoteAgentsController::CSphRemoteAgentsController ( int iThreads, AgentsVec

if ( iThreads>1 )
{
m_pWorkerPool->SetWorksCount ( dAgents.GetLength() );
ARRAY_FOREACH ( i, dAgents )
{
tCtx.m_ppAgents = dAgents.Begin()+i;
Expand All @@ -2118,7 +2076,6 @@ CSphRemoteAgentsController::CSphRemoteAgentsController ( int iThreads, AgentsVec
}
} else
{
m_pWorkerPool->SetWorksCount ( 1 );
tCtx.m_ppAgents = dAgents.Begin();
tCtx.m_iAgentCount = dAgents.GetLength();
for ( AgentConn_t * pAgent : dAgents )
Expand Down Expand Up @@ -2183,7 +2140,6 @@ int CSphRemoteAgentsController::RetryFailed ()
sphLogDebugv ( "Found %d agents in state AGENT_RETRY, reschedule them", iRetries );
tCtx.m_pfn = ThdWorkSequental;
m_pWorkerPool->Push ( tCtx );
m_pWorkerPool->AddWorksCount ( 1 );
return iRetries;
}

Expand Down Expand Up @@ -2324,6 +2280,7 @@ class EpollEvents_c : public IterableEvents_c
if ( m_iEFD==-1 )
sphDie ( "failed to create epoll main FD, errno=%d, %s", errno, strerror ( errno ) );

sphLogDebugv ( "epoll %d created", m_iEFD );
m_dReady.Reserve ( iSizeHint );
m_iIterEv = -1;
}
Expand Down
2 changes: 1 addition & 1 deletion src/searchdha.h
Expand Up @@ -535,7 +535,7 @@ class ISphRemoteAgentsController : ISphNoncopyable
virtual bool HasReadyAgents () const = 0;

// check that some agents are done, and reset counter of them
virtual bool FetchReadyAgents () = 0;
virtual int FetchReadyAgents () = 0;

// block execution until some works have finished
virtual void WaitAgentsEvent () = 0;
Expand Down

0 comments on commit 37f574c

Please sign in to comment.