Permalink
Browse files

Fix #348, ref-counted ptr fix

  • Loading branch information...
klirichek committed Aug 13, 2018
1 parent 32c40ea commit 511a3cf2c1f1858f6e94fe9f8175b7216db3cbd6
Showing with 20 additions and 13 deletions.
  1. +4 −4 src/searchd.cpp
  2. +8 −5 src/searchdha.cpp
  3. +8 −4 src/searchdha.h
@@ -7845,7 +7845,7 @@ void SearchHandler_c::RunSubset ( int iStart, int iEnd )
// connect to remote agents and query them, if required
CSphScopedPtr<SearchRequestBuilder_t> tReqBuilder { nullptr };
CSphScopedPtr<IRemoteAgentsObserver> tReporter { nullptr };
CSphRefcountedPtr<IRemoteAgentsObserver> tReporter { nullptr };
CSphScopedPtr<IReplyParser_t> tParser { nullptr };
if ( !dRemotes.IsEmpty() )
{
@@ -7860,7 +7860,7 @@ void SearchHandler_c::RunSubset ( int iStart, int iEnd )
// also blackholes will be removed from this flow of remotes.
ScheduleDistrJobs ( dRemotes, tReqBuilder.Ptr (),
tParser.Ptr (),
tReporter.Ptr(), tFirst.m_iRetryCount, tFirst.m_iRetryDelay );
tReporter, tFirst.m_iRetryCount, tFirst.m_iRetryDelay );
}
/////////////////////
@@ -10314,8 +10314,8 @@ bool MakeSnippets ( CSphString sIndex, CSphVector<ExcerptQueryChained_t> & dQuer
// connect to remote agents and query them
SnippetRequestBuilder_t tReqBuilder ( &dRemoteSnippets );
SnippetReplyParser_t tParser ( &dRemoteSnippets );
CSphScopedPtr<IRemoteAgentsObserver> tReporter ( GetObserver() );
ScheduleDistrJobs ( dRemoteSnippets.m_dAgents, &tReqBuilder, &tParser, tReporter.Ptr() );
CSphRefcountedPtr<IRemoteAgentsObserver> tReporter ( GetObserver() );
ScheduleDistrJobs ( dRemoteSnippets.m_dAgents, &tReqBuilder, &tParser, tReporter );
// run local worker in current thread also
SnippetThreadFunc ( &dThreads[0] );
@@ -1301,7 +1301,7 @@ HostDashboardPtr_t cDashStorage::FindAgent ( const CSphString & sAgent ) const
return HostDashboardPtr_t(); // not found
}
void cDashStorage::GetActiveDashes ( VecRefPtrs_t<HostDashboard_t *> & dAgents ) const
void cDashStorage::GetActiveDashes ( CSphVector<HostDashboard_t *> & dAgents ) const
{
assert ( dAgents.IsEmpty ());
CSphScopedRLock tRguard ( m_tDashLock );
@@ -2038,8 +2038,8 @@ int AgentConn_t::DoTFO ( struct sockaddr * pSs, int iLen )
//! Simplified wrapper for ScheduleDistrJobs, wait for finish and return succeeded
int PerformRemoteTasks ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery, IReplyParser_t * pParser )
{
CSphScopedPtr<IRemoteAgentsObserver> tReporter { GetObserver () };
ScheduleDistrJobs ( dRemotes, pQuery, pParser, tReporter.Ptr () );
CSphRefcountedPtr<IRemoteAgentsObserver> tReporter { GetObserver () };
ScheduleDistrJobs ( dRemotes, pQuery, pParser, tReporter );
tReporter->Finish ();
return (int)tReporter->GetSucceeded ();
}
@@ -2105,6 +2105,8 @@ void AgentConn_t::GenericInit ( IRequestBuilder_t * pQuery, IReplyParser_t * pPa
m_bNeedKick = false;
m_pPollerTask = nullptr;
m_pReporter = pReporter;
if ( pReporter )
pReporter->AddRef();
m_pParser = pParser;
m_iRetries = iQueryRetry>=0 ? iQueryRetry * m_iMirrorsCount :
( m_pMultiAgent ? m_pMultiAgent->GetRetryLimit () : 0 );
@@ -3888,12 +3890,13 @@ class CRemoteAgentsObserver : public IRemoteAgentsObserver
{
return m_iFinished;
}
protected:
~CRemoteAgentsObserver() final {}
private:
CSphAutoEvent m_tChanged; ///< the signaller
CSphAtomic m_iSucceeded { 0 }; //< num of tasks finished successfully
CSphAtomic m_iFinished { 0 }; //< num of tasks finished.
int m_iTasks = 0; //< total num of tasks
volatile int m_iTasks = 0; //< total num of tasks
};
@@ -368,11 +368,12 @@ using MultiAgentDescRefPtr_c = CSphRefcountedPtr<MultiAgentDesc_c>;
extern int g_iAgentRetryCount;
extern int g_iAgentRetryDelay;
struct IReporter_t : ISphNoncopyable
struct IReporter_t : ISphRefcountedMT
{
virtual ~IReporter_t () {};
virtual void SetTotal ( int iTasks ) = 0;
virtual void Report ( bool bSuccess ) = 0;
protected:
virtual ~IReporter_t () {};
};
#if USE_WINDOWS
@@ -475,7 +476,7 @@ struct AgentConn_t : public ISphRefcountedMT
mutable int m_iStoreTag = -1; ///< cookie, m.b. used to 'glue' to concrete connection
int m_iWeight = -1; ///< weight of the index, will be send with query to remote host
IReporter_t * m_pReporter = nullptr; ///< used to report back when we're finished
CSphRefcountedPtr<IReporter_t> m_pReporter { nullptr }; ///< used to report back when we're finished
LPKEY m_pPollerTask = nullptr; ///< internal for poller. fixme! privatize?
bool m_bSuccess = false; ///< agent got processed, no need to retry
@@ -602,6 +603,9 @@ class IRemoteAgentsObserver : public IReporter_t
// block execution while some works finished
virtual void WaitChanges () = 0;
protected:
~IRemoteAgentsObserver() override {}
};
IRemoteAgentsObserver * GetObserver ();
@@ -705,7 +709,7 @@ class cDashStorage : public ISphNoncopyable
public:
void LinkHost ( HostDesc_t &dHost ); ///< put host into dashboard and init link to it
HostDashboardPtr_t FindAgent ( const CSphString& sAgent ) const;
void GetActiveDashes ( VecRefPtrs_t<HostDashboard_t *> & dAgents ) const;
void GetActiveDashes ( CSphVector<HostDashboard_t *> & dAgents ) const;
void CleanupOrphaned();
};

0 comments on commit 511a3cf

Please sign in to comment.