Skip to content

Commit

Permalink
Fixed collecting of global stats
Browse files Browse the repository at this point in the history
Previously there were no counts for global connection tries or for total/max/avg query time.
  • Loading branch information
klirichek committed Sep 30, 2016
1 parent 7d22350 commit 31eda5f
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 53 deletions.
4 changes: 1 addition & 3 deletions api/libsphinxclient/smoke_ref.txt
Expand Up @@ -217,9 +217,7 @@ ag_dist_1_1_wrong_replies: 0
ag_dist_1_1_unexpected_closings: 0
ag_dist_1_1_warnings: 0
ag_dist_1_1_succeeded_queries: 1
ag_dist_1_1_connect_count: 0
ag_dist_1_1_connect_avg: 0
ag_dist_1_1_connect_max: 0
ag_dist_1_1_connect_count: 1
query_cpu: OFF
dist_local: 0.000
query_reads: OFF
Expand Down
7 changes: 6 additions & 1 deletion api/libsphinxclient/test.c
Expand Up @@ -360,7 +360,12 @@ void test_status ( sphinx_client * client )
k = 0;
for ( i=0; i<num_rows; i++ )
{
if ( !g_smoke || ( strstr ( status[k], "time" )==NULL && strstr ( status[k], "wall" )==NULL && strstr ( status[k], "wait" )==NULL ) )
if ( !g_smoke ||
( strstr ( status[k], "time" )==NULL
&& strstr ( status[k], "wall" )==NULL
&& strstr ( status[k], "wait" )==NULL
&& strstr ( status[k], "connect_avg" )==NULL
&& strstr ( status[k], "connect_max")==NULL ) )
{
for ( j=0; j<num_cols; j++, k++ )
printf ( ( j==0 ) ? "%s:" : " %s", status[k] );
Expand Down
81 changes: 44 additions & 37 deletions src/searchd.cpp
Expand Up @@ -11420,6 +11420,7 @@ static inline void FormatMsec ( CSphString & sOut, int64_t tmTime )
void BuildStatus ( VectorLike & dStatus )
{
const char * FMT64 = INT64_FMT;
const char * FLOAT = "%.2f";
const char * OFF = "OFF";

const int64_t iQueriesDiv = Max ( g_tStats.m_iQueries.GetValue(), 1 );
Expand Down Expand Up @@ -11479,13 +11480,22 @@ void BuildStatus ( VectorLike & dStatus )
ARRAY_FOREACH ( i, dAgents )
ARRAY_FOREACH ( j, dAgents[i].GetAgents() )
{
AgentStats_t * pStats = dAgents[i].GetAgents ()[j].m_pStats;
AgentDash_t * pStats = dAgents[i].GetAgents ()[j].m_pStats;
if ( !pStats )
continue;

for ( int k=0; k<eMaxAgentStat; ++k )
if ( dStatus.MatchAddVa ( "ag_%s_%d_%d_%s", sIdx, i+1, j+1, sAgentStatsNames[k] ) )
dStatus.Add().SetSprintf ( FMT64, (int64_t) pStats->m_dStats[k] );
for ( int k = 0; k<ehMaxStat; ++k )
if ( dStatus.MatchAddVa ( "ag_%s_%d_%d_%s", sIdx, i + 1, j + 1, sAgentStatsNames[eMaxAgentStat+k] ) )
{
if ( k==ehTotalMsecs || k==ehAverageMsecs || k==ehMaxMsecs )
dStatus.Add ().SetSprintf ( FLOAT, (float) pStats->m_dHostStats[k] / 1000.0 );
else
dStatus.Add ().SetSprintf ( FMT64, (int64_t) pStats->m_dHostStats[k] );
}

}
}
g_tDistLock.Unlock();
Expand Down Expand Up @@ -11591,53 +11601,50 @@ void BuildOneAgentStatus ( VectorLike & dStatus, HostDashboard_t* pDash, const c
const char * FMT64 = UINT64_FMT;
const char * FLOAT = "%.2f";

pDash->m_dDataLock.ReadLock ();
if ( dStatus.MatchAddVa ( "%s_hostname", sPrefix ) )
dStatus.Add().SetSprintf ( "%s", pDash->m_tDescriptor.GetMyUrl().cstr() );

if ( dStatus.MatchAddVa ( "%s_references", sPrefix ) )
dStatus.Add().SetSprintf ( "%d", (int) pDash->GetRefcount()-1 ); // -1 since we currently also 'use' the agent, reading it's stats
uint64_t iCur = sphMicroTimer();
uint64_t iLastAccess = iCur - pDash->m_iLastQueryTime;
float fPeriod = (float)iLastAccess/1000000.0f;
if ( dStatus.MatchAddVa ( "%s_lastquery", sPrefix ) )
dStatus.Add().SetSprintf ( FLOAT, fPeriod );
iLastAccess = iCur - pDash->m_iLastAnswerTime;
fPeriod = (float)iLastAccess/1000000.0f;
if ( dStatus.MatchAddVa ( "%s_lastanswer", sPrefix ) )
dStatus.Add().SetSprintf ( FLOAT, fPeriod );
uint64_t iLastTimer = pDash->m_iLastAnswerTime-pDash->m_iLastQueryTime;
if ( dStatus.MatchAddVa ( "%s_lastperiodmsec", sPrefix ) )
dStatus.Add().SetSprintf ( FMT64, iLastTimer/1000 );
if ( dStatus.MatchAddVa ( "%s_errorsarow", sPrefix ) )
dStatus.Add().SetSprintf ( FMT64, pDash->m_iErrorsARow );
pDash->m_dDataLock.Unlock ();
{
CSphScopedRLock tGuard ( pDash->m_dDataLock );
if ( dStatus.MatchAddVa ( "%s_hostname", sPrefix ) )
dStatus.Add().SetSprintf ( "%s", pDash->m_tDescriptor.GetMyUrl().cstr() );

if ( dStatus.MatchAddVa ( "%s_references", sPrefix ) )
dStatus.Add().SetSprintf ( "%d", (int) pDash->GetRefcount()-1 ); // -1 since we currently also 'use' the agent, reading it's stats
uint64_t iCur = sphMicroTimer();
uint64_t iLastAccess = iCur - pDash->m_iLastQueryTime;
float fPeriod = (float)iLastAccess/1000000.0f;
if ( dStatus.MatchAddVa ( "%s_lastquery", sPrefix ) )
dStatus.Add().SetSprintf ( FLOAT, fPeriod );
iLastAccess = iCur - pDash->m_iLastAnswerTime;
fPeriod = (float)iLastAccess/1000000.0f;
if ( dStatus.MatchAddVa ( "%s_lastanswer", sPrefix ) )
dStatus.Add().SetSprintf ( FLOAT, fPeriod );
uint64_t iLastTimer = pDash->m_iLastAnswerTime-pDash->m_iLastQueryTime;
if ( dStatus.MatchAddVa ( "%s_lastperiodmsec", sPrefix ) )
dStatus.Add().SetSprintf ( FMT64, iLastTimer/1000 );
if ( dStatus.MatchAddVa ( "%s_errorsarow", sPrefix ) )
dStatus.Add().SetSprintf ( FMT64, pDash->m_iErrorsARow );
}
int iPeriods = 1;

while ( iPeriods>0 )
{
HostStatSnapshot_t dDashStat;
pDash->GetCollectedStat ( dDashStat, iPeriods );
uint64_t uQueries = 0;

{
for ( int j = 0; j<ehMaxStat+eMaxAgentStat; ++j )
if ( j==ehTotalMsecs ) // hack. Avoid microseconds in human-readable statistic
// hack. Avoid microseconds in human-readable statistic
if ( j==ehTotalMsecs && dStatus.MatchAddVa ( "%s_%dperiods_msecsperqueryy", sPrefix, iPeriods ) )
{
if ( dStatus.MatchAddVa ( "%s_%dperiods_msecsperqueryy", sPrefix, iPeriods ) )
{
if ( uQueries>0 )
{
float fAverageLatency = (float) ((dDashStat[ehTotalMsecs] / 1000.0) / uQueries);
dStatus.Add ().SetSprintf ( FLOAT, fAverageLatency );
} else
dStatus.Add ( "n/a" );
}
} else
if ( dDashStat[ehConnTries]>0 )
dStatus.Add ().SetSprintf ( FLOAT, (float) ((dDashStat[ehTotalMsecs] / 1000.0)
/ dDashStat[ehConnTries]) );
else
dStatus.Add ( "n/a" );
} else if ( dStatus.MatchAddVa ( "%s_%dperiods_%s", sPrefix, iPeriods, sAgentStatsNames[j] ) )
{
if ( dStatus.MatchAddVa ( "%s_%dperiods_%s", sPrefix, iPeriods, sAgentStatsNames[j] ) )
if ( j==ehMaxMsecs || j==ehAverageMsecs )
dStatus.Add ().SetSprintf ( FLOAT, (float) dDashStat[j] / 1000.0);
else
dStatus.Add ().SetSprintf ( FMT64, dDashStat[j] );
uQueries += dDashStat[j];
}
}

Expand Down
30 changes: 24 additions & 6 deletions src/searchdha.cpp
Expand Up @@ -767,25 +767,43 @@ void agent_stats_inc ( AgentConn_t & tAgent, AgentStats_e iCounter )
// do not count query time for pings
// only count errors
if ( !tAgent.m_bPing )
{
tAgentDash.m_dHostStats[ehTotalMsecs]+=tAgent.m_iEndQuery-tAgent.m_iStartQuery;
if ( tAgent.m_pStats )
tAgent.m_pStats->m_dHostStats[ehTotalMsecs] += tAgent.m_iEndQuery - tAgent.m_iStartQuery;
}
}

// special case of stats - all is ok, just need to track the time in dashboard.
void track_processing_time ( AgentConn_t & tAgent )
{
// first we count temporary statistic (into dashboard)
assert ( tAgent.m_pDash );
CSphScopedWLock tWguard ( tAgent.m_pDash->m_dDataLock );
uint64_t* pCurStat = tAgent.m_pDash->GetCurrentStat ()->m_dHostStats;
uint64_t uConnTime = (uint64_t) sphMicroTimer () - tAgent.m_iStartQuery;

++pCurStat[ehConnTries];
int64_t iConnTime = sphMicroTimer () - tAgent.m_iStartQuery;
if ( uint64_t ( iConnTime )>pCurStat[ehMaxMsecs] )
pCurStat[ehMaxMsecs] = iConnTime;
if ( uint64_t ( uConnTime )>pCurStat[ehMaxMsecs] )
pCurStat[ehMaxMsecs] = uConnTime;

if ( pCurStat[ehConnTries]>1 )
pCurStat[ehAverageMsecs] = ( pCurStat[ehAverageMsecs]*( pCurStat[ehConnTries]-1 )+iConnTime )/pCurStat[ehConnTries];
pCurStat[ehAverageMsecs] = ( pCurStat[ehAverageMsecs]*( pCurStat[ehConnTries]-1 )+uConnTime )/pCurStat[ehConnTries];
else
pCurStat[ehAverageMsecs] = iConnTime;
pCurStat[ehAverageMsecs] = uConnTime;

// then we count permanent statistic (for show status)
if ( tAgent.m_pStats )
{
uint64_t * pHStat = tAgent.m_pStats->m_dHostStats;
++pHStat[ehConnTries];
if ( uint64_t ( uConnTime )>pHStat[ehMaxMsecs] )
pHStat[ehMaxMsecs] = uConnTime;
if ( pHStat[ehConnTries]>1 )
pHStat[ehAverageMsecs] = ( pHStat[ehAverageMsecs] * ( pHStat[ehConnTries] - 1 ) + uConnTime ) / pHStat[ehConnTries];
else
pHStat[ehAverageMsecs] = uConnTime;
}
}

// try to parse hostname/ip/port or unixsocket on current pConfigLine.
Expand Down Expand Up @@ -939,7 +957,7 @@ bool ValidateAndAddDashboard ( AgentDesc_c * pNewAgent, WarnInfo_t* pInfo=nullpt
}
}

pNewAgent->m_pStats = new AgentStats_t;
pNewAgent->m_pStats = new AgentDash_t;
g_tDashes.AddAgent ( pNewAgent );

assert ( pNewAgent->m_pStats );
Expand Down
9 changes: 3 additions & 6 deletions src/searchdha.h
Expand Up @@ -37,9 +37,6 @@ extern int g_iAgentConnectTimeout;
extern int g_iAgentQueryTimeout; // global (default). May be override by index-scope values, if one specified
extern bool g_bHostnameLookup;


const int STATS_MAX_AGENTS = 8192; ///< we'll track stats for this much remote agents
const int STATS_MAX_DASH = STATS_MAX_AGENTS / 4; ///< we'll track stats for RR of this much remote agents
const int STATS_DASH_TIME = 15; ///< store the history for last periods

/////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -145,13 +142,13 @@ struct AgentStats_t : public ISphRefcountedMT
}
/// Accumulates every counter of rhs into the corresponding counter of this object.
void Add ( const AgentStats_t& rhs )
{
	// eMaxAgentStat is the exclusive upper bound of the counter indexes, so a single pass
	// over [0, eMaxAgentStat) adds each counter exactly once and never touches the slot past the last one.
	for ( int i = 0; i<eMaxAgentStat; ++i )
		m_dStats[i] += rhs.m_dStats[i];
}
};

struct HostDashboard_t;

struct AgentDash_t;
class HostUrl_c
{
public:
Expand All @@ -177,7 +174,7 @@ class AgentDesc_c : public HostUrl_c
CSphString m_sIndexes; ///< remote index names to query
bool m_bBlackhole; ///< blackhole agent flag
DWORD m_uAddr; ///< IP address
mutable AgentStats_t* m_pStats; /// global agent stats
mutable AgentDash_t* m_pStats; /// global agent stats
mutable HostDashboard_t* m_pDash; /// ha dashboard of the host
bool m_bPersistent; ///< whether to keep the persistent connection to the agent.

Expand Down

0 comments on commit 31eda5f

Please sign in to comment.