Skip to content
Permalink
Browse files
Fix race in netloop reporting
When scheduling several network tasks we first added them to queue, and
then provide the num of queues to observer, having in mind that queue
is not run, and so, it is safe to gap between tasks and reporting.
But really queue may be in run by timeout, or even kicked by another
thread, so it became real race. So, the flow of executiong is rearranged
to avoid the race.

It resolves #915
  • Loading branch information
klirichek committed Jun 19, 2019
1 parent 2d19dbd commit 3dd8278e747d534f69fe9c3d360eef188ab7b6d5
Showing with 57 additions and 32 deletions.
  1. +23 −11 src/searchd.cpp
  2. +23 −19 src/searchdha.cpp
  3. +11 −2 src/searchdha.h
@@ -13115,24 +13115,36 @@ void PercolateMatchDocuments ( const BlobVec_t & dDocs, const PercolateOptions_t
int iStart = 0;
int iStep = iChunks>1 ? ( ( dDocs.GetLength () - 1 ) / iChunks + 1 ) : 0;

PqRequestBuilder_c tReqBuilder ( dDocs, tOpts, iStart, iStep );
iStart += iStep * dAgents.GetLength();
PqReplyParser_t tParser;
CSphRefcountedPtr<IRemoteAgentsObserver> tReporter ( GetObserver () );
ScheduleDistrJobs ( dAgents, &tReqBuilder, &tParser, tReporter );

bool bHaveRemotes = !dAgents.IsEmpty ();
int iSuccesses = 0;
int iAgentsDone = 0;
CSphScopedPtr<PqRequestBuilder_c> pReqBuilder { nullptr };
CSphScopedPtr<IReplyParser_t> pParser { nullptr };
CSphRefcountedPtr<IRemoteAgentsObserver> pReporter { nullptr };
if ( bHaveRemotes )
{
pReqBuilder = new PqRequestBuilder_c ( dDocs, tOpts, iStart, iStep );
iStart += iStep * dAgents.GetLength ();
pParser = new PqReplyParser_t;
pReporter = GetObserver();
ScheduleDistrJobs ( dAgents, pReqBuilder.Ptr(), pParser.Ptr(), pReporter );
}

LazyVector_T <CPqResult> dLocalResults;
for ( const auto & sPqIndex : *pLocalIndexes )
{
auto & dResult = dLocalResults.Add();
PQLocalMatch ( dDocs, sPqIndex, tOpts, tAcc, dResult, iStart, iStep );
iStart += iStep;
}

tReporter->Finish ();

auto iSuccesses = ( int ) tReporter->GetSucceeded ();
auto iAgentsDone = ( int ) tReporter->GetFinished ();

if ( bHaveRemotes )
{
assert ( pReporter );
pReporter->Finish ();
iSuccesses = ( int ) pReporter->GetSucceeded ();
iAgentsDone = ( int ) pReporter->GetFinished ();
}

LazyVector_T<CPqResult*> dAllResults;
for ( auto & dLocalRes : dLocalResults )
@@ -1378,6 +1378,8 @@ void SmartOutputBuffer_t::LeakTo ( CSphVector<ISphOutputBuffer *> dOut )
#define UIO_MAXIOV (1024)
#endif

// makes vector of chunks suitable to direct using in Send() or WSASend()
// returns federated size of the chunks
size_t SmartOutputBuffer_t::GetIOVec ( CSphVector<sphIovec> &dOut ) const
{
size_t iOutSize = 0;
@@ -2085,6 +2087,9 @@ int AgentConn_t::DoTFO ( struct sockaddr * pSs, int iLen )
//! Simplified wrapper for ScheduleDistrJobs, wait for finish and return succeeded
int PerformRemoteTasks ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery, IReplyParser_t * pParser )
{
if ( dRemotes.IsEmpty() )
return 0;

CSphRefcountedPtr<IRemoteAgentsObserver> tReporter { GetObserver () };
ScheduleDistrJobs ( dRemotes, pQuery, pParser, tReporter );
tReporter->Finish ();
@@ -2096,15 +2101,21 @@ int PerformRemoteTasks ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery
/// jobs themselves are ref-counted and owned by nobody (they're just released on finish, so
/// if nobody waits them (say, blackhole), they just dissapeared).
/// on return blackholes removed from dRemotes
void ScheduleDistrJobs ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery, IReplyParser_t * pParser
, IRemoteAgentsObserver * pReporter, int iQueryRetry, int iQueryDelay )
void ScheduleDistrJobs ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery, IReplyParser_t * pParser,
IReporter_t * pReporter, int iQueryRetry, int iQueryDelay )
{
// sphLogSupress ( "L ", SPH_LOG_VERBOSE_DEBUG );
// sphLogSupress ( "- ", SPH_LOG_VERBOSE_DEBUG );
// TimePrefixed::TimeStart();
assert ( pReporter );
sphLogDebugv ( "S ==========> ScheduleDistrJobs() for %d remotes", dRemotes.GetLength () );

if ( dRemotes.IsEmpty () )
{
sphWarning ("Empty remotes provided to ScheduleDistrJobs. Consider to save resources and avoid it");
return;
}

bool bNeedKick = false; // if some of connections falled to waiting and need to kick the poller.
ARRAY_FOREACH ( i, dRemotes )
{
@@ -2114,6 +2125,7 @@ void ScheduleDistrJobs ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery
// start the actual job.
// It might lucky be completed immediately. Or, it will be acquired by async network
// (and addreffed there in the loop)
pReporter->Add ( 1 );
pConnection->StartRemoteLoopTry ();
bNeedKick |= pConnection->FireKick ();

@@ -2123,12 +2135,10 @@ void ScheduleDistrJobs ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery
sphLogDebugv ( "S Remove blackhole()" );
SafeRelease ( pConnection );
dRemotes.RemoveFast ( i-- );
pReporter->Add ( -1 );
}
}

if ( pReporter )
pReporter->Add ( dRemotes.GetLength () );

if ( bNeedKick )
{
sphLogDebugA ( "A Events need fire. Do it..." );
@@ -3761,20 +3771,16 @@ class CRemoteAgentsObserver : public IRemoteAgentsObserver

void Add ( int iTasks ) final
{
m_iTasks.Add ( iTasks );
m_bGotTasks = true;
m_iTasks += iTasks;
}

// check that there are no works to do
bool IsDone () const final
{
if ( m_bGotTasks )
{
if ( m_iFinished > m_iTasks )
sphWarning ( "Orphaned chain detected (expected %d, got %d)", (int)m_iTasks.GetValue(), (int)m_iFinished.GetValue() );
return m_iFinished>=m_iTasks;
}
return false;
if ( m_iFinished > m_iTasks )
sphWarning ( "Orphaned chain detected (expected %d, got %d)", (int)m_iTasks, (int)m_iFinished );

return m_iFinished>=m_iTasks;
}

// block execution until all tasks are finished
@@ -3802,11 +3808,9 @@ class CRemoteAgentsObserver : public IRemoteAgentsObserver

private:
CSphAutoEvent m_tChanged; ///< the signaller
CSphAtomic m_iSucceeded { 0 }; //< num of tasks finished successfully
CSphAtomic m_iFinished { 0 }; //< num of tasks finished.
CSphAtomic m_iTasks { 0 }; //< total num of tasks
bool m_bGotTasks = false;

CSphAtomic m_iSucceeded; //< num of tasks finished successfully
CSphAtomic m_iFinished; //< num of tasks finished.
CSphAtomic m_iTasks; //< total num of tasks
};

IRemoteAgentsObserver * GetObserver ()
@@ -239,7 +239,7 @@ struct HostDashboard_t : public ISphRefcountedMT
PersistentConnectionsPool_c * m_pPersPool = nullptr; // persistence pool also lives here, one per dashboard

mutable RwLock_t m_dMetricsLock; // guards everything essential (see thread annotations)
int64_t m_iLastAnswerTime GUARDED_BY ( m_dMetricsLock ) = 0; // updated when we get an answer from the host
CSphAtomicL m_iLastAnswerTime GUARDED_BY ( m_dMetricsLock ); // updated when we get an answer from the host
int64_t m_iLastQueryTime GUARDED_BY ( m_dMetricsLock ) = 0; // updated when we send a query to a host
int64_t m_iErrorsARow GUARDED_BY (
m_dMetricsLock ) = 0; // num of errors a row, updated when we update the general statistic.
@@ -370,8 +370,17 @@ extern int g_iAgentRetryDelay;

struct IReporter_t : ISphRefcountedMT
{
// called by netloop - initially feeds reporter with tasks
// For every task just before start querying it calls Add(1).
// If task is not to be traced (blackhole), it then calls Add(-1).
virtual void Add ( int iTasks ) = 0;

// called by netloop - when one of the task is finished (and tells, success or not). Good point for callback!
// false returned in case of permanent error (dead; retries limit exceeded) and when aborting due to shutdown.
virtual void Report ( bool bSuccess ) = 0;

// called by outline observer, or by netloop checking for orphaned
// must return 'true' if reporter is abandoned - i.e. if all expected connections are finished.
virtual bool IsDone () const = 0;
protected:
virtual ~IReporter_t () {};
@@ -618,7 +627,7 @@ class IRemoteAgentsObserver : public IReporter_t
IRemoteAgentsObserver * GetObserver ();

void ScheduleDistrJobs ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery, IReplyParser_t * pParser,
IRemoteAgentsObserver * pReporter=nullptr, int iQueryRetry = -1, int iQueryDelay = -1 );
IReporter_t * pReporter, int iQueryRetry = -1, int iQueryDelay = -1 );

// simplified full task - schedule jobs, wait for complete, report num of succeeded
int PerformRemoteTasks ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery, IReplyParser_t * pParser );

0 comments on commit 3dd8278

Please sign in to comment.