Permalink
Browse files

Catch of blended (orphaned) network tasks

  • Loading branch information...
klirichek authored and adriannuta committed Aug 17, 2018
1 parent 14c874a commit f22ae34b623906b3c5a05a06198dad2e548dc541
Showing with 85 additions and 50 deletions.
  1. +77 −45 src/searchdha.cpp
  2. +8 −5 src/searchdha.h
@@ -1725,12 +1725,15 @@ bool AgentConn_t::StartNextRetry ()
sphLogDebugA ( "%d StartNextRetry() retries=%d, ref=%d", m_iStoreTag, m_iRetries, ( int ) GetRefcount () );
m_iSock = -1;
if ( m_pMultiAgent && !IsBlackhole () && m_iRetries>=0 )
{
m_tDesc.CloneFrom ( m_pMultiAgent->ChooseAgent () );
SwitchBlackhole ();
}
if ( m_iRetries--<0 )
return m_bManyTries ? Fail ( "retries limit exceeded" ) : false;
if ( m_pMultiAgent )
m_tDesc.CloneFrom ( m_pMultiAgent->ChooseAgent () );
sphLogDebugA ( "%d Connection %p, host %s, pers=%d", m_iStoreTag, this, m_tDesc.GetMyUrl().cstr(), m_tDesc.m_bPersistent );
if ( IsPersistent() )
@@ -1745,16 +1748,24 @@ bool AgentConn_t::StartNextRetry ()
}
}
if ( IsBlackhole() )
return true;
}
// if we're blackhole, drop retries, parser, reporter and return true
// 'onetry' must be set when initialization of non-multiagent to allow only one retry
bool AgentConn_t::SwitchBlackhole ()
{
if ( IsBlackhole () )
{
sphLogDebugA ( "%d Connection %p is blackhole (no retries, no parser, no reporter)", m_iStoreTag, this );
if ( m_iRetries>0 )
m_iRetries = 0;
m_bManyTries = false;
m_iRetries = -1;
m_pParser = nullptr;
m_pReporter = nullptr;
return true;
}
return true;
return false;
}
// set up ll the stuff about async query. Namely - add timeout callback,
@@ -1766,14 +1777,45 @@ void AgentConn_t::ScheduleCallbacks ()
LazyTask ( m_iPoolerTimeout, true, BYTE ( m_dIOVec.HasUnsent () ? 1 : 2 ) );
}
void AgentConn_t::HardTimeoutCallback ()
void AgentConn_t::TimeoutCallback ()
{
if ( StateIs ( Agent_e::CONNECTING ) )
Fatal ( eTimeoutsConnect, "connect timed out" );
else
Fatal ( eTimeoutsQuery, "query timed out" );
StartRemoteLoopTry ();
sphLogDebugA ( "%d <- hard timeout (ref=%d)", m_iStoreTag, ( int ) GetRefcount () );
SetNetLoop ();
auto ePrevKind = m_eTimeoutKind;
m_eTimeoutKind = TIMEOUT_UNKNOWN;
// check if we accidentally orphaned (that is bug!)
if ( IsLast () && !IsBlackhole () )
{
sphWarning ( "Orphaned (last) connection detected!" );
return;
}
if ( m_pReporter && m_pReporter->IsDone () )
{
sphWarning ( "Orphaned (kind of done) connection detected!" );
return;
}
switch ( ePrevKind )
{
case TIMEOUT_RETRY:
if ( !DoQuery () )
StartRemoteLoopTry ();
FirePoller (); // fixme? M.b. no more necessary, since processing queue will restart on fired timeout.
sphLogDebugA ( "%d finished retry timeout ref=%d", m_iStoreTag, ( int ) GetRefcount () );
break;
case TIMEOUT_HARD:
if ( StateIs ( Agent_e::CONNECTING ) )
Fatal ( eTimeoutsConnect, "connect timed out" );
else
Fatal ( eTimeoutsQuery, "query timed out" );
StartRemoteLoopTry ();
sphLogDebugA ( "%d <- hard timeout (ref=%d)", m_iStoreTag, ( int ) GetRefcount () );
break;
case TIMEOUT_UNKNOWN:
default:
sphLogDebugA ("%d Unknown kind of timeout invoked. No action", m_iStoreTag );
}
}
void AgentConn_t::AbortCallback()
@@ -2098,7 +2140,7 @@ void ScheduleDistrJobs ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery
void AgentConn_t::GenericInit ( IRequestBuilder_t * pQuery, IReplyParser_t * pParser, IReporter_t * pReporter
, int iQueryRetry, int iQueryDelay )
{
sphLogDebugA ( "%d GenericInit() pBuilder %p, parser %p, retries %d, delay %d, ref=%d", m_iStoreTag, pQuery, pParser, m_iRetries, iQueryDelay, ( int ) GetRefcount ());
sphLogDebugA ( "%d GenericInit() pBuilder %p, parser %p, retries %d, delay %d, ref=%d", m_iStoreTag, pQuery, pParser, iQueryRetry, iQueryDelay, ( int ) GetRefcount ());
if ( iQueryDelay>=0 )
m_iDelay = iQueryDelay;
@@ -2107,13 +2149,16 @@ void AgentConn_t::GenericInit ( IRequestBuilder_t * pQuery, IReplyParser_t * pPa
m_iWaited = 0;
m_bNeedKick = false;
m_pPollerTask = nullptr;
m_pReporter = pReporter;
if ( pReporter )
pReporter->AddRef();
m_pParser = pParser;
m_iRetries = iQueryRetry>=0 ? iQueryRetry * m_iMirrorsCount :
( m_pMultiAgent ? m_pMultiAgent->GetRetryLimit () : 0 );
m_bManyTries = m_iRetries>0;;
if ( m_pMultiAgent || !SwitchBlackhole() )
{
m_pReporter = pReporter;
SafeAddRef ( pReporter );
m_pParser = pParser;
if ( iQueryRetry>=0 )
m_iRetries = iQueryRetry * m_iMirrorsCount;
m_bManyTries = m_iRetries>0;
}
SetNetLoop ( false );
State ( Agent_e::HEALTHY );
}
@@ -2157,14 +2202,6 @@ void AgentConn_t::StartRemoteLoopTry ()
sphLogDebugA ( "%d StartRemoteLoopTry() finished ref=%d", m_iStoreTag, ( int ) GetRefcount () );
}
void AgentConn_t::SoftTimeoutCallback ()
{
if ( !DoQuery () )
StartRemoteLoopTry ();
FirePoller (); // fixme? M.b. no more necessary, since processing queue will restart on fired timeout.
sphLogDebugA ( "%d finished retry timeout ref=%d", m_iStoreTag, ( int ) GetRefcount () );
}
// do oneshot query. Return true on any success
bool AgentConn_t::DoQuery()
{
@@ -2611,7 +2648,6 @@ struct Task_t
int m_iTimeoutIdx = -1; // idx inside timeouts bin heap (or -1 if not there)
int m_ifd = -1;
int m_iStoredfd = -1; //
bool m_bHardTimeout = false;
BYTE m_uIOActive = NO; // active IO callbacks: 0-none, 1-r+w, 2-r
BYTE m_uIOChanged = NO; // need IO changes: dequeue (if !m_uIOActive), 1-set to rw, 2-set to ro
};
@@ -3551,10 +3587,9 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
sphLogDebugL ( "L timeout happens for %p task", pTask );
m_dTimeouts.Pop ();
// adopt timeout action, keep connection
bool bHardTimeout = pTask->m_bHardTimeout;
// Delete task, adopt connection.
// Invoke Timeoutcallback for it
CSphRefcountedPtr<AgentConn_t> pKeepConn ( DeleteTask ( pTask, false ) );
sphLogDebugL ( "%s", m_dTimeouts.DebugDump ( "L heap:" ).cstr () );
if ( pKeepConn )
{
@@ -3565,11 +3600,7 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
* signal it, and we catch the events on the next round.
*/
sphLogDebugL ( "L timeout action started %d", bHardTimeout );
pKeepConn->SetNetLoop ();
if ( bHardTimeout )
pKeepConn->HardTimeoutCallback ();
else
pKeepConn->SoftTimeoutCallback ();
pKeepConn->TimeoutCallback ();
sphLogDebugL ( "L timeout action finished" );
}
}
@@ -3734,7 +3765,7 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
/// Enqueue or perform a timer functor
/// caller guarantee that interhal task exists
bool EnqueueNewTask ( AgentConn_t * pConnection, int64_t iTimeoutMS, bool bTimeoutKind, BYTE uActivateIO )
bool EnqueueNewTask ( AgentConn_t * pConnection, int64_t iTimeoutMS, BYTE uActivateIO )
{
if ( pConnection->m_pPollerTask )
{
@@ -3752,8 +3783,6 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
, pConnection->m_iStoreTag, pConnection, iTimeoutMS, uActivateIO );
pTask->m_iPlannedTimeout = iTimeoutMS;
pTask->m_bHardTimeout = bTimeoutKind;
if ( uActivateIO )
pTask->m_uIOChanged = uActivateIO;
@@ -3838,7 +3867,8 @@ void AgentConn_t::LazyTask ( int64_t iTimeoutMS, bool bHardTimeout, BYTE uActiva
assert ( iTimeoutMS>0 );
m_bNeedKick = !InNetLoop();
LazyPoller ().EnqueueNewTask ( this, iTimeoutMS, bHardTimeout, uActivateIO );
m_eTimeoutKind = bHardTimeout ? TIMEOUT_HARD : TIMEOUT_RETRY;
LazyPoller ().EnqueueNewTask ( this, iTimeoutMS, uActivateIO );
}
void AgentConn_t::LazyDeleteOrChange ( int64_t iTimeoutMS )
@@ -3885,7 +3915,9 @@ class CRemoteAgentsObserver : public IRemoteAgentsObserver
// check that there are no works to do
bool IsDone () const final
{
return m_iFinished==m_iTasks;
if ( m_iTasks>=0 )
return m_iFinished==m_iTasks;
return false;
}
// block execution untill all tasks are finished
@@ -3916,7 +3948,7 @@ class CRemoteAgentsObserver : public IRemoteAgentsObserver
CSphAutoEvent m_tChanged; ///< the signaller
CSphAtomic m_iSucceeded { 0 }; //< num of tasks finished successfully
CSphAtomic m_iFinished { 0 }; //< num of tasks finished.
volatile int m_iTasks = 0; //< total num of tasks
volatile int m_iTasks = -1; //< total num of tasks
};
@@ -372,6 +372,7 @@ struct IReporter_t : ISphRefcountedMT
{
virtual void SetTotal ( int iTasks ) = 0;
virtual void Report ( bool bSuccess ) = 0;
virtual bool IsDone () const = 0;
protected:
virtual ~IReporter_t () {};
};
@@ -459,6 +460,8 @@ class IOVec_c
/// remote agent connection (local per-query state)
struct AgentConn_t : public ISphRefcountedMT
{
enum ETimeoutKind { TIMEOUT_UNKNOWN, TIMEOUT_RETRY, TIMEOUT_HARD, };
public:
AgentDesc_t m_tDesc; ///< desc of my host // fixme! turn to ref to MultiAgent mirror?
int m_iSock = -1;
@@ -495,8 +498,7 @@ struct AgentConn_t : public ISphRefcountedMT
void ErrorCallback ( int64_t iWaited );
void SendCallback ( int64_t iWaited, DWORD uSent );
void RecvCallback ( int64_t iWaited, DWORD uReceived );
void SoftTimeoutCallback();
void HardTimeoutCallback();
void TimeoutCallback ();
void AbortCallback();
#if USE_WINDOWS
@@ -521,7 +523,8 @@ struct AgentConn_t : public ISphRefcountedMT
int m_iDelay { g_iAgentRetryDelay }; ///< delay between retries
// active timeout (directly used by poller)
int64_t m_iPoolerTimeout = -1; ///< m.b. query, or connect+query when TCP_FASTOPEN
int64_t m_iPoolerTimeout = -1; ///< m.b. query, or connect+query when TCP_FASTOPEN
ETimeoutKind m_eTimeoutKind { TIMEOUT_UNKNOWN };
// receiving buffer stuff
CSphFixedVector<BYTE> m_dReplyBuf { 0 };
@@ -582,15 +585,15 @@ struct AgentConn_t : public ISphRefcountedMT
bool SendQuery (DWORD uSent = 0);
bool ReceiveAnswer (DWORD uReceived = 0);
bool CommitResult ();
bool SwitchBlackhole ();
};
using VectorAgentConn_t = CSphVector<AgentConn_t *>;
using VecRefPtrsAgentConn_t = VecRefPtrs_t<AgentConn_t *>;
class IRemoteAgentsObserver : public IReporter_t
{
public:
// check that there are no works to do
virtual bool IsDone () const = 0;
// get num of succeeded agents
virtual long GetSucceeded () const = 0;

0 comments on commit f22ae34

Please sign in to comment.