Skip to content

Commit

Permalink
add load metrics
Browse files Browse the repository at this point in the history
Process the following values:
 * N - the number of CPU threads currently running (in the threadpool)
 * Length of the primary threadpool queue
 * Length of the secondary threadpool queue.

These elements make up 3 metrics: primary queue, secondary queue, and
total queue, which is the sum of pri + sec + N.

These metrics are sampled the same way Linux does for its load average -
that is, a sliding exponential window over 1, 5 and 15 minutes, with one
sample taken every 5 seconds.

You can see 'load', 'load_primary', and 'load_secondary' in 'show
status' output.

You can see 'Queue now+pri+sec=total' and the 'Load average' line in
'status' output.

Also, setting the env variable MANTICORE_TRACE_LOAD to true will trace the
actual values as they are updated (every 5s) in search.log with 'info'
priority (this is simple and useful when playing with settings while
running searchd with the '--console' key).
  • Loading branch information
klirichek committed Jun 29, 2023
1 parent 43bde62 commit f3ae8be
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 33 deletions.
1 change: 1 addition & 0 deletions api/libsphinxclient/test.c
Expand Up @@ -373,6 +373,7 @@ void test_status ( sphinx_client * client )
&& strstr ( status[k], "workers_total" )==NULL
&& strstr ( status[k], "workers_active" )==NULL
&& strstr ( status[k], "agent_tfo")==NULL
&& strstr ( status[k], "load") == NULL
&& strstr ( status[k], "connect_count" )==NULL ))
{
for ( j=0; j<num_cols; j++, k++ )
Expand Down
3 changes: 2 additions & 1 deletion src/CMakeLists.txt
Expand Up @@ -150,7 +150,8 @@ set ( HEADERS sphinxexcerpt.h sphinxfilter.h sphinxint.h sphinxjsonquery.h sphin
indexsettings.h columnarlib.h fileio.h memio.h memio_impl.h queryprofile.h columnarfilter.h columnargrouper.h fileutils.h
libutils.h conversion.h columnarsort.h sortcomp.h binlog_defs.h binlog.h ${MANTICORE_BINARY_DIR}/config/config.h
chunksearchctx.h indexfilebase.h indexfiles.h attrindex_builder.h queryfilter.h aggregate.h secondarylib.h
costestimate.h docidlookup.h tracer.h attrindex_merge.h columnarmisc.h distinct.h hyperloglog.h pseudosharding.h detail/indexlink.h )
costestimate.h docidlookup.h tracer.h attrindex_merge.h columnarmisc.h distinct.h hyperloglog.h pseudosharding.h detail/indexlink.h
detail/expmeter.h )

set ( SEARCHD_H searchdaemon.h searchdconfig.h searchdddl.h searchdexpr.h searchdha.h searchdreplication.h searchdsql.h
searchdtask.h client_task_info.h taskflushattrs.h taskflushbinlog.h taskflushmutable.h taskglobalidf.h
Expand Down
50 changes: 50 additions & 0 deletions src/detail/expmeter.h
@@ -0,0 +1,50 @@
//
// Copyright (c) 2023, Manticore Software LTD (https://manticoresearch.com)
// All rights reserved
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License. You should have
// received a copy of the GPL license along with this program; if you
// did not, you can find it at http://www.gnu.org
//

#pragma once

#include "std/ints.h"

/// Sliding average meter, fed with one sample per Tick().
///
/// Until `uTimeConstant` samples have been collected the value is the plain arithmetic
/// mean of everything seen so far. From then on every tick removes one "average share"
/// from the running sum before averaging, which is the same as
///     value += ( sample - value ) / uTimeConstant
/// i.e. an exponential moving average with smoothing factor 1/uTimeConstant.
///
/// The class performs no synchronization of its own.
class ExpMeter_c
{
	double m_fSum = 0.0;		///< running sum the average is derived from
	double m_fVal = 0.0;		///< average computed by the last Tick()
	BYTE m_uSamples = 0;		///< samples collected so far; saturates at m_uTimeConstant
	const BYTE m_uTimeConstant;	///< window length, in samples; never 0

public:
	/// @param uTimeConstant window length, in samples. 0 is treated as 1: a zero window
	/// would keep m_uSamples at 0 forever and make Tick() divide by zero.
	explicit ExpMeter_c ( BYTE uTimeConstant )
		: m_uTimeConstant ( uTimeConstant ? uTimeConstant : 1 )
	{}

	/// @return the average as of the last Tick(); 0.0 before the first sample and after Reset().
	[[nodiscard]] double Value() const noexcept { return m_fVal; }

	/// Forget all collected samples; the window length is kept.
	void Reset() noexcept
	{
		m_fSum = 0.0;
		m_fVal = 0.0;
		m_uSamples = 0;
	}

	/// Feed one sample and recompute the average.
	/// @param tSample any value that can be added to a double.
	template<typename NUM>
	void Tick ( NUM tSample ) noexcept
	{
		m_fSum += tSample;
		if ( m_uSamples < m_uTimeConstant )
			++m_uSamples;		// warming up: still a plain mean
		else
			m_fSum -= m_fVal;	// window is full: drop one average share

		// m_uSamples is at least 1 here, since the window length is never 0
		m_fVal = m_fSum / m_uSamples;
	}
};
48 changes: 45 additions & 3 deletions src/searchd.cpp
Expand Up @@ -68,6 +68,7 @@
#include "dynamic_idx.h"
#include "searchdbuddy.h"
#include "detail/indexlink.h"
#include "detail/expmeter.h"

extern "C"
{
Expand Down Expand Up @@ -261,6 +262,18 @@ static volatile bool g_bReloadForced = false; // true in case reload issued
static WorkerSharedPtr_t g_pTickPoolThread;
static CSphVector<CSphNetLoop*> g_dNetLoops;

// Load metering.
// The work pool is sampled once per g_iExpMeterPeriod; each sample is fed into three meters
// per metric, with windows of 1, 5 and 15 minutes' worth of samples.
constexpr int g_iExpMeterPeriod = 5000000; // sampling period: once per 5s
constexpr int g_iExpSamplesPerMinute = 60 * 1000000 / g_iExpMeterPeriod; // 12 samples with the 5s period
static_assert ( g_iExpSamplesPerMinute >= 1 && g_iExpSamplesPerMinute * 15 <= 255, "ExpMeter_c takes its window length as a BYTE" );

// total load: primary queue + secondary queue + tasks running right now
static ExpMeter_c g_tStat1m { g_iExpSamplesPerMinute }; // 1 minute window
static ExpMeter_c g_tStat5m { g_iExpSamplesPerMinute * 5 }; // 5 minutes window
static ExpMeter_c g_tStat15m { g_iExpSamplesPerMinute * 15 }; // 15 minutes window

// primary queue length
static ExpMeter_c g_tPriStat1m { g_iExpSamplesPerMinute }; // 1 minute window
static ExpMeter_c g_tPriStat5m { g_iExpSamplesPerMinute * 5 }; // 5 minutes window
static ExpMeter_c g_tPriStat15m { g_iExpSamplesPerMinute * 15 }; // 15 minutes window

// secondary queue length
static ExpMeter_c g_tSecStat1m { g_iExpSamplesPerMinute }; // 1 minute window
static ExpMeter_c g_tSecStat5m { g_iExpSamplesPerMinute * 5 }; // 5 minutes window
static ExpMeter_c g_tSecStat15m { g_iExpSamplesPerMinute * 15 }; // 15 minutes window

// sphMicroTimer() value after which the next sample is due
int64_t g_iNextExpMeterTimestamp = sphMicroTimer() + g_iExpMeterPeriod;

/// command names
static const char * g_dApiCommands[] =
{
Expand Down Expand Up @@ -8945,6 +8958,9 @@ void BuildStatus ( VectorLike & dStatus )
dStatus.MatchTupletf ( "workers_clients", "%d", myinfo::CountClients () );
dStatus.MatchTupletf ( "workers_clients_vip", "%u", session::GetVips() );
dStatus.MatchTupletf ( "work_queue_length", "%d", GlobalWorkPool ()->Works () );
dStatus.MatchTupletf ( "load", "%0.2f %0.2f %0.2f", g_tStat1m.Value(), g_tStat5m.Value(), g_tStat15m.Value() );
dStatus.MatchTupletf ( "load_primary", "%0.2f %0.2f %0.2f", g_tPriStat1m.Value(), g_tPriStat5m.Value(), g_tPriStat15m.Value() );
dStatus.MatchTupletf ( "load_secondary", "%0.2f %0.2f %0.2f", g_tSecStat1m.Value(), g_tSecStat5m.Value(), g_tSecStat15m.Value() );

assert ( g_pDistIndexes );
auto pDistSnapshot = g_pDistIndexes->GetHash();
Expand Down Expand Up @@ -9047,14 +9063,17 @@ void BuildStatus ( VectorLike & dStatus )
void BuildStatusOneline ( StringBuilder_c & sOut )
{
auto iThreads = GlobalWorkPool ()->WorkingThreads ();
auto iQueue = GlobalWorkPool ()->Works ();
auto tSample = GlobalWorkPool()->Tasks();
auto tCurrent = GlobalWorkPool()->CurTasks();
auto iQueue = tSample.iPri + tSample.iSec + tCurrent;
auto iTasks = myinfo::CountTasks ();
auto & g_tStats = gStats ();
sOut.StartBlock ( " " );
sOut
<< "Uptime:" << (DWORD) time ( NULL )-g_tStats.m_uStarted
<< " Threads:" << iThreads
<< " Queue:" << iQueue
<< " Threads:" << iThreads;
sOut.Sprintf (" Queue now+pri+sec=total: %d+%d+%d=%d", tCurrent, tSample.iPri, tSample.iSec, iQueue );
sOut
<< " Clients:" << myinfo::CountClients()
<< " Vip clients:" << session::GetVips()
<< " Tasks:" << iTasks
Expand All @@ -9063,6 +9082,7 @@ void BuildStatusOneline ( StringBuilder_c & sOut )
sOut.Sprintf ( " CPU: %t", (int64_t)g_tStats.m_iQueryCpuTime.load ( std::memory_order_relaxed ) );
sOut.Sprintf ( "\nQueue/Th: %0.1F%", iQueue * 10 / iThreads );
sOut.Sprintf ( " Tasks/Th: %0.1F%", iTasks * 10 / iThreads );
sOut.Sprintf ( "\nLoad average: %0.2f, %0.2f, %0.2f", g_tStat1m.Value(), g_tStat5m.Value(), g_tStat15m.Value() );
}

void BuildOneAgentStatus ( VectorLike & dStatus, HostDashboardRefPtr_t pDash, const char * sPrefix="agent" )
Expand Down Expand Up @@ -19260,6 +19280,9 @@ void CheckSignals () REQUIRES ( MainThread )
#endif
}

// When set, TickHead() logs every load sample together with the resulting averages via sphInfo.
// Read from env MANTICORE_TRACE_LOAD; `false` is the second argument to val_from_env (presumably the default - confirm).
static bool g_bLoadInfo = val_from_env ( "MANTICORE_TRACE_LOAD", false );


void TickHead () REQUIRES ( MainThread )
{
CheckSignals ();
Expand All @@ -19279,6 +19302,25 @@ void TickHead () REQUIRES ( MainThread )
int tmSleep = 500;
#endif
sphSleepMsec ( tmSleep );

if ( sphMicroTimer() > g_iNextExpMeterTimestamp )
{
g_iNextExpMeterTimestamp += g_iExpMeterPeriod;
auto tSample = GlobalWorkPool()->Tasks();
auto tCurrent = GlobalWorkPool()->CurTasks();
g_tPriStat1m.Tick ( tSample.iPri );
g_tPriStat5m.Tick ( tSample.iPri );
g_tPriStat15m.Tick ( tSample.iPri );
g_tSecStat1m.Tick ( tSample.iSec );
g_tSecStat5m.Tick ( tSample.iSec );
g_tSecStat15m.Tick ( tSample.iSec );

g_tStat1m.Tick ( tSample.iPri + tSample.iSec + tCurrent);
g_tStat5m.Tick ( tSample.iPri + tSample.iSec + tCurrent);
g_tStat15m.Tick ( tSample.iPri + tSample.iSec + tCurrent);
if ( g_bLoadInfo )
sphInfo("Sample: %d, %d, %d; Load average: %0.2f, %0.2f, %0.2f, sec: %0.2f, %0.2f, %0.2f, pri: %0.2f, %0.2f, %0.2f", tCurrent, tSample.iSec, tSample.iPri, g_tStat1m.Value(), g_tStat5m.Value(), g_tStat15m.Value(), g_tSecStat1m.Value(), g_tSecStat5m.Value(), g_tSecStat15m.Value(), g_tPriStat1m.Value(), g_tPriStat5m.Value(), g_tPriStat15m.Value());
}
}

bool g_bVtune = false;
Expand Down
51 changes: 43 additions & 8 deletions src/threadutils.cpp
Expand Up @@ -181,7 +181,7 @@ class CallStack_c
}

// Find the next context with the same key.
Value * NextByKey () const
Value * NextByKey () const noexcept
{
for ( auto* pElem = m_pNext; pElem!=nullptr; pElem = pElem->m_pNext )
if ( pElem->m_pService==m_pService )
Expand All @@ -194,7 +194,7 @@ class CallStack_c

// Determine whether the specified owner is on the stack.
// Returns address of key, if present, nullptr otherwise.
static Value * Contains ( Key * pKey )
static Value * Contains ( Key * pKey ) noexcept
{
for ( auto* pElem = m_pTop; pElem!=nullptr; pElem = pElem->m_pNext )
if ( pElem->m_pService==pKey )
Expand All @@ -203,7 +203,7 @@ class CallStack_c
}

// Obtain the value at the top of the stack.
static Value * Top ()
static Value * Top () noexcept
{
Context_c * pElem = m_pTop;
return pElem ? pElem->m_pThisContext : nullptr;
Expand Down Expand Up @@ -240,8 +240,9 @@ struct Service_t : public TaskService_t//, public Service_i
bool m_bStopped = false; /// dispatcher has been stopped.
bool m_bOneThread; /// optimize for single-threaded use case
sph::Event_c m_tWakeupEvent; /// event to wake up blocked threads
OpSchedule_t m_OpQueue; /// The queue of handlers that are ready to be delivered
OpSchedule_t m_OpVipQueue; /// The queue of handlers that have to be delivered BEFORE OpQueue
OpSchedule_t m_OpQueue GUARDED_BY ( m_dMutex ); /// The queue of handlers that are ready to be delivered
OpSchedule_t m_OpVipQueue GUARDED_BY ( m_dMutex ); /// The queue of handlers that have to be delivered BEFORE OpQueue
std::atomic<int> m_iWorkingNow {0};

// Per-thread call stack to track the state of each thread in the service.
using ThreadCallStack_c = CallStack_c<Service_t, TaskServiceThreadInfo_t>;
Expand Down Expand Up @@ -352,8 +353,10 @@ struct Service_t : public TaskService_t//, public Service_i
else
dLock.Unlock ();

m_iWorkingNow.fetch_add ( 1, std::memory_order_relaxed );
boost::context::detail::prefetch_range ( pOp, sizeof ( Operation_t ) );
pOp->Complete (this);
m_iWorkingNow.fetch_sub ( 1, std::memory_order_relaxed );

LOG ( SERVICE, MT ) << "completed & unlocked";
if ( this_thread.m_iPrivateOutstandingWork>1 )
Expand Down Expand Up @@ -425,10 +428,21 @@ struct Service_t : public TaskService_t//, public Service_i
dLock.Unlock ();
}

long works() const
long works() const noexcept
{
return m_iOutstandingWork;
}

/// Current value of the in-flight operations counter (relaxed atomic read).
int curtasks() const noexcept
{
	const int iRunningNow = m_iWorkingNow.load ( std::memory_order_relaxed );
	return iRunningNow;
}

/// Snapshot of both queue lengths, taken while holding m_dMutex.
/// iPri receives the length of m_OpVipQueue, iSec the length of m_OpQueue.
NTasks_t tasks() const
{
	ScopedMutex_t dLock ( m_dMutex );
	NTasks_t tLengths;
	tLengths.iPri = (int)m_OpVipQueue.GetLength();
	tLengths.iSec = (int)m_OpQueue.GetLength();
	return tLengths;
}
};

/// helper to hold the service running
Expand Down Expand Up @@ -825,6 +839,16 @@ class ThreadPool_c final : public Worker_i
return (int)m_tService.works ();
}

/// Queue lengths of the pool; forwards to the service's tasks().
NTasks_t Tasks() const noexcept final
{
	const NTasks_t tSnapshot = m_tService.tasks();
	return tSnapshot;
}

/// Number of operations the pool is executing right now; forwards to the service's curtasks().
int CurTasks() const noexcept final
{
	const int iRunningNow = m_tService.curtasks();
	return iRunningNow;
}

void IterateChildren ( ThreadFN& fnHandler ) final
{
ScRL_t _ ( m_dChildGuard );
Expand Down Expand Up @@ -941,6 +965,16 @@ class AloneThread_c final : public Worker_i
return GetRunners ();
}

/// Queue lengths of this worker; forwards to the service's tasks().
NTasks_t Tasks() const noexcept final
{
	const NTasks_t tSnapshot = m_tService.tasks();
	return tSnapshot;
}

/// Number of operations this worker is executing right now; forwards to the service's curtasks().
int CurTasks() const noexcept final
{
	const int iRunningNow = m_tService.curtasks();
	return iRunningNow;
}

const char* Name() const override
{
return m_sName.cstr();
Expand Down Expand Up @@ -1081,10 +1115,11 @@ static int g_iMaxChildrenThreads = 1;


namespace {
static WorkerSharedPtr_t pGlobalPool;

WorkerSharedPtr_t& GlobalPoolSingletone ()
{
static WorkerSharedPtr_t pPool;
return pPool;
return pGlobalPool;
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/threadutils.h
Expand Up @@ -148,20 +148,27 @@ struct Scheduler_i
}

template<typename HANDLER>
void Schedule ( HANDLER handler, bool bVip );
void Schedule ( HANDLER handler, bool bVip ) noexcept;

template<typename HANDLER>
void ScheduleContinuation ( HANDLER handler );
void ScheduleContinuation ( HANDLER handler ) noexcept;
};

/// Scheduler interface extended with a way to attach another scheduler as its backend.
struct SchedulerWithBackend_i: public Scheduler_i
{
/// Set the backend scheduler.
/// NOTE(review): the meaning of the returned bool is not visible here - confirm against implementations.
virtual bool SetBackend ( Scheduler_i* pBackend ) = 0;
};

/// Point-in-time lengths of the Op queues.
struct NTasks_t
{
	int iPri;	///< length of the primary queue
	int iSec;	///< length of the secondary queue
};

struct Worker_i: public Scheduler_i
{
virtual int Works() const = 0;
virtual NTasks_t Tasks() const noexcept = 0;
virtual int CurTasks() const noexcept = 0;
virtual void StopAll () = 0;
virtual void DiscardOnFork() {}
virtual void IterateChildren ( ThreadFN & fnHandler ) {}
Expand Down

0 comments on commit f3ae8be

Please sign in to comment.