Permalink
Browse files

Fix #378 fix leak in search

  • Loading branch information...
klirichek committed Aug 8, 2018
1 parent 593ab0c commit 978d839641ad6040c22c9fc3b703b1a02685f266
Showing with 27 additions and 37 deletions.
  1. +13 −35 src/searchd.cpp
  2. +10 −0 src/searchdha.cpp
  3. +4 −2 src/searchdha.h
@@ -7841,19 +7841,20 @@ void SearchHandler_c::RunSubset ( int iStart, int iEnd )
// connect to remote agents and query them, if required
CSphScopedPtr<SearchRequestBuilder_t> tReqBuilder { nullptr };
CSphScopedPtr<IRemoteAgentsObserver> tReporter { nullptr };
CSphScopedPtr<IReplyParser_t> tParser { nullptr };
if ( !dRemotes.IsEmpty() )
{
if ( m_pProfile )
m_pProfile->Switch ( SPH_QSTATE_DIST_CONNECT );
tReqBuilder = new SearchRequestBuilder_t ( m_dQueries, iStart, iEnd, iDivideLimits );
tParser = new SearchReplyParser_c ( iStart, iEnd );
tReporter = GetObserver();
// run remote queries. tReporter will tell us when they're finished.
// also blackholes will be removed from this flow of remotes.
ScheduleDistrJobs ( dRemotes, tReqBuilder.Ptr (),
new SearchReplyParser_c( iStart, iEnd ),
tParser.Ptr (),
tReporter.Ptr(), tFirst.m_iRetryCount, tFirst.m_iRetryDelay );
}
@@ -10865,11 +10866,7 @@ void HandleCommandUpdate ( ISphOutputBuffer & tOut, int iVer, InputBuffer_c & tR
// connect to remote agents and query them
UpdateRequestBuilder_t tReqBuilder ( tUpd );
UpdateReplyParser_t tParser ( &iUpdated );
CSphScopedPtr<IRemoteAgentsObserver> tReporter { GetObserver () };
ScheduleDistrJobs ( dAgents, &tReqBuilder, &tParser, tReporter.Ptr ());
tReporter->Finish();
iSuccesses += tReporter->GetSucceeded ();
iSuccesses += PerformRemoteTasks ( dAgents, &tReqBuilder, &tParser );
}
}
}
@@ -13883,26 +13880,17 @@ void HandleMysqlCallKeywords ( SqlRowBuffer_c & tOut, SqlStmt_t & tStmt, CSphStr
pDistributed->GetAllHosts ( dAgents );
int iAgentsReply = 0;
if ( dAgents.GetLength() )
if ( !dAgents.IsEmpty() )
{
// connect to remote agents and query them
KeywordsRequestBuilder_t tReqBuilder ( tSettings, sTerm );
KeywordsReplyParser_t tParser ( tSettings.m_bStats, dKeywords );
CSphScopedPtr<IRemoteAgentsObserver> tReporter { GetObserver () };
iAgentsReply = PerformRemoteTasks ( dAgents, &tReqBuilder, &tParser );
ScheduleDistrJobs ( dAgents, &tReqBuilder, &tParser, tReporter.Ptr() );
tReporter->Finish();
iAgentsReply = ( int ) tReporter->GetSucceeded ();
ARRAY_FOREACH ( i, dAgents )
{
const AgentConn_t * pAgent = dAgents[i];
for ( const AgentConn_t * pAgent : dAgents )
if ( !pAgent->m_bSuccess && !pAgent->m_sFailure.IsEmpty() )
{
const char * sAgent = ( pAgent->m_tDesc.m_bBlackhole ? "blackhole" : "agent" );
tFailureLog.SubmitEx ( pAgent->m_tDesc.m_sIndexes.cstr(), sIndex.cstr(), "%s %s: %s", sAgent, pAgent->m_tDesc.GetMyUrl().cstr(), pAgent->m_sFailure.cstr() );
}
}
tFailureLog.SubmitEx ( pAgent->m_tDesc.m_sIndexes.cstr(), sIndex.cstr(),
"agent %s: %s", pAgent->m_tDesc.GetMyUrl().cstr(), pAgent->m_sFailure.cstr() );
}
// process result sets
@@ -14554,9 +14542,7 @@ static void CheckPing ( int64_t iNow )
// fixme! Let's rewrite ping proc another (async) way.
PingRequestBuilder_t tReqBuilder ( (int)iNow );
PingReplyParser_t tParser ( &iReplyCookie );
CSphScopedPtr<IRemoteAgentsObserver> tReporter { GetObserver () };
ScheduleDistrJobs ( dConnections, &tReqBuilder, &tParser, tReporter.Ptr() );
tReporter->Finish();
PerformRemoteTasks ( dConnections, &tReqBuilder, &tParser );
// connect to remote agents and query them
if ( g_bShutdown )
@@ -14685,9 +14671,7 @@ static bool SendUserVar ( const char * sIndex, const char * sUserVarName, CSphVe
{
UVarRequestBuilder_t tReqBuilder ( sUserVarName, dSetValues );
UVarReplyParser_t tParser;
CSphScopedPtr<IRemoteAgentsObserver> tReporter { GetObserver () };
ScheduleDistrJobs ( dAgents, &tReqBuilder, &tParser, tReporter.Ptr() );
tReporter->Finish ();
PerformRemoteTasks ( dAgents, &tReqBuilder, &tParser );
}
// should be at the end due to swap of dSetValues values
@@ -14932,10 +14916,7 @@ void sphHandleMysqlUpdate ( StmtErrorReporter_i & tOut, const QueryParserFactory
// connect to remote agents and query them
CSphScopedPtr<IRequestBuilder_t> pRequestBuilder ( tQueryParserFactory.CreateRequestBuilder ( sQuery, tStmt ) ) ;
CSphScopedPtr<IReplyParser_t> pReplyParser ( tQueryParserFactory.CreateReplyParser ( iUpdated, iWarns ) );
CSphScopedPtr<IRemoteAgentsObserver> tReporter { GetObserver () };
ScheduleDistrJobs ( dAgents, pRequestBuilder.Ptr (), pReplyParser.Ptr (), tReporter.Ptr() );
tReporter->Finish();
iSuccesses += tReporter->GetSucceeded ();
iSuccesses += PerformRemoteTasks ( dAgents, pRequestBuilder.Ptr (), pReplyParser.Ptr () );
}
}
@@ -15713,10 +15694,7 @@ void sphHandleMysqlDelete ( StmtErrorReporter_i & tOut, const QueryParserFactory
// connect to remote agents and query them
CSphScopedPtr<IRequestBuilder_t> pRequestBuilder ( tQueryParserFactory.CreateRequestBuilder ( sQuery, tStmt ) ) ;
CSphScopedPtr<IReplyParser_t> pReplyParser ( tQueryParserFactory.CreateReplyParser ( iGot, iWarns ) );
CSphScopedPtr<IRemoteAgentsObserver> tReporter { GetObserver () };
ScheduleDistrJobs ( dAgents, pRequestBuilder.Ptr (), pReplyParser.Ptr(), tReporter.Ptr () );
tReporter->Finish();
PerformRemoteTasks ( dAgents, pRequestBuilder.Ptr (), pReplyParser.Ptr () );
// FIXME!!! report error & warnings from agents
// FIXME? profile update time too?
@@ -2016,6 +2016,16 @@ int AgentConn_t::DoTFO ( struct sockaddr * pSs, int iLen )
#endif
}
//! Simplified wrapper for ScheduleDistrJobs, wait for finish and return succeeded
int PerformRemoteTasks ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery, IReplyParser_t * pParser )
{
CSphScopedPtr<IRemoteAgentsObserver> tReporter { GetObserver () };
ScheduleDistrJobs ( dRemotes, pQuery, pParser, tReporter.Ptr () );
tReporter->Finish ();
return (int)tReporter->GetSucceeded ();
}
/// Add set of works (dRemotes) to the queue.
/// jobs themselves are ref-counted and owned by nobody (they're just released on finish, so
/// if nobody waits them (say, blackhole), they just dissapeared).
@@ -604,9 +604,11 @@ class IRemoteAgentsObserver : public IReporter_t
IRemoteAgentsObserver * GetObserver ();
void ScheduleDistrJobs ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery,
IReplyParser_t * pParser, IRemoteAgentsObserver * pReporter=nullptr, int iQueryRetry = -1, int iQueryDelay = -1 );
void ScheduleDistrJobs ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery, IReplyParser_t * pParser,
IRemoteAgentsObserver * pReporter=nullptr, int iQueryRetry = -1, int iQueryDelay = -1 );
// simplified full task - schedule jobs, wait for complete, report num of succeeded
int PerformRemoteTasks ( VectorAgentConn_t &dRemotes, IRequestBuilder_t * pQuery, IReplyParser_t * pParser );
/////////////////////////////////////////////////////////////////////////////
// DISTRIBUTED QUERIES

0 comments on commit 978d839

Please sign in to comment.