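// NOTE: the following lines are residue from a web-page scrape of this file, not part of the source:
// Skip to content
// Permalink
// Branch: master
// Find file Copy path
// Find file Copy path
// Fetching contributors…
// Cannot retrieve contributors at this time
// 22950 lines (19264 sloc) 642 KB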
//
// $Id$
//
//
// Copyright (c) 2001-2016, Andrew Aksyonoff
// Copyright (c) 2008-2016, Sphinx Technologies Inc
// All rights reserved
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License. You should have
// received a copy of the GPL license along with this program; if you
// did not, you can find it at http://www.gnu.org/
//
#include "sphinx.h"
#include "sphinxutils.h"
#include "sphinxexcerpt.h"
#include "sphinxrt.h"
#include "sphinxint.h"
#include "sphinxquery.h"
#include "sphinxjson.h"
#include "sphinxplugin.h"
#include "sphinxqcache.h"
#include "sphinxrlp.h"
extern "C"
{
#include "sphinxudf.h"
}
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <stdarg.h>
#include <limits.h>
#include <locale.h>
#include <math.h>
#include "searchdaemon.h"
#include "searchdha.h"
// default listen() backlog; copied into g_iBacklog at startup
#define SEARCHD_BACKLOG 5
// presumably the default ports for the native API and the MySQL-wire protocol (usage not visible here)
#define SPHINXAPI_PORT 9312
#define SPHINXQL_PORT 9306
// buffer sizes (including the trailing NUL) for a dotted IPv4 address and for "address:port"
#define SPH_ADDRESS_SIZE sizeof("000.000.000.000")
#define SPH_ADDRPORT_SIZE sizeof("000.000.000.000:00000")
// NOTE(review): units of the next three are not visible in this chunk (presumably bytes, bytes, msec) — confirm at use sites
#define MVA_UPDATES_POOL 1048576
#define NETOUTBUF 8192
#define PING_INTERVAL 1000
// SphinxQL state flush period, in milliseconds (per the name)
#define QLSTATE_FLUSH_MSEC 50
// don't shutdown on SIGKILL (debug purposes)
// 1 - SIGKILL will shut down the whole daemon; 0 - watchdog will reincarnate the daemon
#define WATCHDOG_SIGKILL 1
#define SPH_MYSQL_FLAG_MORE_RESULTS 8 // mysql.h: SERVER_MORE_RESULTS_EXISTS
/////////////////////////////////////////////////////////////////////////////
#if USE_WINDOWS
// Win-specific headers and calls
#include <io.h>
#include <tlhelp32.h>
#define sphSeek _lseeki64
#define stat _stat
#else
// UNIX-specific headers and calls
#include <unistd.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <netdb.h>
#include <sys/syscall.h>
// for thr_self()
#ifdef __FreeBSD__
#include <sys/thr.h>
#endif
#define sphSeek lseek
#if HAVE_EVENTFD
#include <sys/eventfd.h>
#endif
#endif
#if USE_SYSLOG
#include <syslog.h>
#endif
/////////////////////////////////////////////////////////////////////////////
/// wire protocols a listener or client thread can speak; PROTO_TOTAL is the count, not a protocol
enum ProtocolType_e
{
PROTO_SPHINX = 0,
PROTO_MYSQL41,
PROTO_HTTP,
PROTO_TOTAL
};
/// printable protocol names, indexed by ProtocolType_e (order must match the enum)
static const char * g_dProtoNames[PROTO_TOTAL] =
{
"sphinxapi", "sphinxql", "http"
};
// presumably set when running as a Windows service; sphLogEntry()/sphLog() consult it when routing log output
static bool g_bService = false;
#if USE_WINDOWS
static bool g_bServiceStop = false;
// service name; sphLogEntry() also uses it as the event-log source name
static const char * g_sServiceName = "searchd";
HANDLE g_hPipe = INVALID_HANDLE_VALUE;
#endif
// presumably the daemon's command-line arguments (filled outside this chunk)
static CSphVector<CSphString> g_dArgs;
// query-log line formats
enum LogFormat_e
{
LOG_FORMAT_PLAIN,
LOG_FORMAT_SPHINXQL
};
#define LOG_COMPACT_IN 128 // upto this many IN(..) values allowed in query_log
ESphLogLevel g_eLogLevel = SPH_LOG_INFO; // current log level, can be changed on the fly
static int g_iLogFile = STDOUT_FILENO; // log file descriptor
// when set (and built with USE_SYSLOG), sphLog() sends messages to syslog instead of g_iLogFile
static bool g_bLogSyslog = false;
static bool g_bQuerySyslog = false;
static CSphString g_sLogFile; // log file name
static bool g_bLogTty = false; // cached isatty(g_iLogFile)
static bool g_bLogStdout = true; // extra copy of startup log messages to stdout; true until around "accepting connections", then MUST be false
static LogFormat_e g_eLogFormat = LOG_FORMAT_PLAIN;
static bool g_bLogCompactIn = false; // whether to cut list in IN() clauses.
static int g_iQueryLogMinMsec = 0; // log 'slow' threshold for query
// sphLog() drops messages above INFO level whose text does not start with the first g_iLogFilterLen chars of this
static char g_sLogFilter[SPH_MAX_FILENAME_LEN] = "\0";
static int g_iLogFilterLen = 0;
// NOTE(review): despite the name, the value is 1,000,000, i.e. microseconds per second (not milliseconds)
static const int64_t MS2SEC = I64C ( 1000000 );
int g_iReadTimeout = 5; // sec
// presumably also in seconds, like the neighbouring timeouts
static int g_iWriteTimeout = 5;
static int g_iClientTimeout = 300;
static int g_iClientQlTimeout = 900; // sec
static int g_iMaxChildren = 0;
// index preopening defaults: on everywhere except Windows
#if !USE_WINDOWS
static bool g_bPreopenIndexes = true;
#else
static bool g_bPreopenIndexes = false;
#endif
static bool g_bWatchdog = true;
static int g_iExpansionLimit = 0;
static bool g_bOnDiskAttrs = false;
static bool g_bOnDiskPools = false;
static int g_iShutdownTimeout = 3000000; // default timeout on daemon shutdown and stopwait is 3 seconds
static int g_iBacklog = SEARCHD_BACKLOG;
static int g_iThdPoolCount = 2;
static int g_iThdQueueMax = 0;
static int g_tmWait = 1;
/// one listening endpoint
struct Listener_t
{
int m_iSock; // presumably the listening socket descriptor
bool m_bTcp;
ProtocolType_e m_eProto; // protocol served on this endpoint
bool m_bVIP; // NOTE(review): meaning not visible in this chunk
};
static CSphVector<Listener_t> g_dListeners;
static int g_iQueryLogFile = -1;
static CSphString g_sQueryLogFile;
static CSphString g_sPidFile;
static int g_iPidFD = -1;
static int g_iMaxCachedDocs = 0; // in bytes
static int g_iMaxCachedHits = 0; // in bytes
static int g_iAttrFlushPeriod = 0; // in seconds; 0 means "do not flush"
int g_iMaxPacketSize = 8*1024*1024; // in bytes; for both query packets from clients and response packets from agents
static int g_iMaxFilters = 256;
static int g_iMaxFilterValues = 4096;
static int g_iMaxBatchQueries = 32;
static ESphCollation g_eCollation = SPH_COLLATION_DEFAULT;
static CSphString g_sSnippetsFilePrefix;
static ISphThdPool * g_pThdPool = NULL;
int g_iDistThreads = 0;
int g_iAgentConnectTimeout = 1000;
int g_iAgentQueryTimeout = 3000; // global (default). May be override by index-scope values, if one specified
// upper bounds for the agent retry settings below
const int MAX_RETRY_COUNT = 8;
const int MAX_RETRY_DELAY = 1000;
static int g_iAgentRetryCount = 0;
static int g_iAgentRetryDelay = MAX_RETRY_DELAY/2; // global (default) values. May be override by the query options 'retry_count' and 'retry_timeout'
bool g_bHostnameLookup = false;
/// coarse state of a client thread (stored in ThdDesc_t::m_eThdState)
enum ThdState_e
{
THD_HANDSHAKE = 0,
THD_NET_READ,
THD_NET_WRITE,
THD_QUERY,
THD_NET_IDLE,
THD_STATE_TOTAL
};
/// printable state names, indexed by ThdState_e (order must match the enum)
static const char * g_dThdStates[THD_STATE_TOTAL] = {
"handshake", "net_read", "net_write", "query", "net_idle"
};
/// Descriptor of one client-serving thread: protocol, socket, state, and a short
/// human-readable description of the current request (reported by SHOW THREADS).
/// Derives from ListNode_t, presumably so it can be linked into g_dThd.
struct ThdDesc_t : public ListNode_t
{
	SphThread_t		m_tThd;
	ProtocolType_e	m_eProto;		///< client protocol (initialized to PROTO_MYSQL41)
	int				m_iClientSock;	///< client socket (initialized to 0)
	CSphString		m_sClientName;
	ThdState_e		m_eThdState;	///< current state; printable names live in g_dThdStates
	const char *	m_sCommand;		///< current command name (initialized to NULL)
	int				m_iConnID;		///< current conn-id for this thread

	// stuff for SHOW THREADS
	int				m_iTid;			///< OS thread id, or 0 if unknown
	int64_t			m_tmConnect;	///< when did the client connect?
	int64_t			m_tmStart;		///< when did the current request start?
	CSphFixedVector<char> m_dBuf;	///< current request description

	ThdDesc_t ()
		: m_eProto ( PROTO_MYSQL41 )
		, m_iClientSock ( 0 )
		, m_eThdState ( THD_HANDSHAKE )
		, m_sCommand ( NULL )
		, m_iConnID ( -1 )
		, m_iTid ( 0 )
		, m_tmConnect ( 0 )
		, m_tmStart ( 0 )
		, m_dBuf ( 512 )
	{
		// start with an empty description; the last byte stays NUL as a hard terminator
		m_dBuf[0] = '\0';
		m_dBuf.Last() = '\0';
	}

	/// Describe an API snippets request: total source size (in thousands of bytes, one decimal)
	/// and the query words of the first snippet. An empty batch reports an empty query
	/// instead of reading dSnippets[0] out of bounds.
	void SetSnippetThreadInfo ( const CSphVector<ExcerptQuery_t> & dSnippets )
	{
		int64_t iSize = 0;
		ARRAY_FOREACH ( i, dSnippets )
		{
			if ( dSnippets[i].m_iSize )
				iSize -= dSnippets[i].m_iSize; // because iSize negative for sorting purpose
			else
				iSize += dSnippets[i].m_sSource.Length();
		}

		iSize /= 100; // now in units of 100 bytes; printed below as <iSize/10>.<iSize%10>k
		const char * sWords = dSnippets.GetLength() ? dSnippets[0].m_sWords.scstr() : "";
		SetThreadInfo ( "api-snippet datasize=%d.%d""k query=\"%s\"", int ( iSize/10 ), int ( iSize%10 ), sWords );
	}

	/// printf-style: format the request description into m_dBuf, truncating to fit the buffer
	void SetThreadInfo ( const char * sTemplate, ... )
	{
		va_list ap;
		va_start ( ap, sTemplate );
		int iPrinted = vsnprintf ( m_dBuf.Begin(), m_dBuf.GetLength()-1, sTemplate, ap );
		va_end ( ap );

		if ( iPrinted>0 )
			m_dBuf[Min ( iPrinted, m_dBuf.GetLength()-1 )] = '\0';
	}
};
// presumably guards g_dThd (and the conn-id counter) — TODO confirm at lock sites
static CSphMutex g_tThdMutex;
static List_t g_dThd; ///< existing threads table
static int g_iConnectionID = 0; ///< global conn-id
// handshake
// presumably a prebuilt MySQL handshake packet and its length in bytes
static char g_sMysqlHandshake[128];
static int g_iMysqlHandshake = 0;
//////////////////////////////////////////////////////////////////////////
static CSphString g_sConfigFile;
// presumably a CRC32 and stat() snapshot of the config file, used to detect changes — TODO confirm at use sites
static DWORD g_uCfgCRC32 = 0;
static struct stat g_tCfgStat;
// NOTE(review): despite the 'p' prefix this is a parser object, not a pointer
static CSphConfigParser g_pCfg;
// seamless rotation defaults: off on Windows, on elsewhere
#if USE_WINDOWS
static bool g_bSeamlessRotate = false;
#else
static bool g_bSeamlessRotate = true;
#endif
static bool g_bIOStats = false;
static bool g_bCpuStats = false;
static bool g_bOptNoDetach = false;
static bool g_bOptNoLock = false;
static bool g_bSafeTrace = false;
static bool g_bStripPath = false;
static bool g_bCoreDump = false;
static volatile bool g_bDoDelete = false; // do we need to delete any indexes?
// the signal flags below are sig_atomic_t so they can be written from a signal handler
static volatile sig_atomic_t g_bGotSighup = 0; // we just received SIGHUP; need to log
static volatile sig_atomic_t g_bGotSigterm = 0; // we just received SIGTERM; need to shutdown
static volatile sig_atomic_t g_bGotSigusr1 = 0; // we just received SIGUSR1; need to reopen logs
// pipe to watchdog to inform that daemon is going to close, so no need to restart it in case of crash
static CSphLargeBuffer<DWORD, true> g_bDaemonAtShutdown;
// NOTE(review): plain volatile bools (here and below) are not a thread-synchronization primitive;
// if they are shared between threads, std::atomic would be the portable choice
volatile bool g_bShutdown = false;
// flag word; ReleaseTTYFlag() sets it to 1
static CSphLargeBuffer<DWORD, true> g_bHaveTTY;
// both hashes are heap-allocated during static initialization (their ctor calls sphDie on lock-init failure)
IndexHash_c * g_pLocalIndexes = new IndexHash_c(); // served (local) indexes hash
IndexHash_c * g_pTemplateIndexes = new IndexHash_c(); // template (tokenizer) indexes hash
static CSphMutex g_tRotateQueueMutex;
static CSphVector<CSphString> g_dRotateQueue;
static CSphMutex g_tRotateConfigMutex;
static SphThread_t g_tRotateThread;
static SphThread_t g_tRotationServiceThread;
static volatile bool g_bInvokeRotationService = false;
static volatile bool g_bNeedRotate = false; // true if there were pending HUPs to handle (they could fly in during previous rotate)
static volatile bool g_bInRotate = false; // true while we are rotating
static SphThread_t g_tPingThread;
static CSphVector<SphThread_t> g_dTickPoolThread;
/// flush parameters of rt indexes
static SphThread_t g_tRtFlushThread;
// optimize thread
static SphThread_t g_tOptimizeThread;
static CSphMutex g_tOptimizeQueueMutex;
static CSphVector<CSphString> g_dOptimizeQueue;
static ThrottleState_t g_tRtThrottle;
static CSphMutex g_tDistLock;
static CSphMutex g_tPersLock;
static CSphAtomic g_iPersistentInUse;
/// master-agent API protocol extensions version
enum
{
VER_MASTER = 14
};
/// command names, indexed by the SEARCHD_COMMAND_* enum (declared outside this chunk)
/// NOTE(review): "query" appears twice (positions 6 and 8), so the two commands are indistinguishable
/// wherever these names are printed; the strings are left as-is because output may depend on them
static const char * g_dApiCommands[SEARCHD_COMMAND_TOTAL] =
{
"search", "excerpt", "update", "keywords", "persist", "status", "query", "flushattrs", "query", "ping", "delete", "uvar"
};
//////////////////////////////////////////////////////////////////////////
// printable names of per-agent statistics; 12 initializers are given, so eMaxAgentStat+ehMaxStat must be
// at least 12 (a larger bound would leave the trailing entries NULL). External linkage, presumably used elsewhere.
const char * sAgentStatsNames[eMaxAgentStat+ehMaxStat]=
{ "query_timeouts", "connect_timeouts", "connect_failures",
"network_errors", "wrong_replies", "unexpected_closings",
"warnings", "succeeded_queries", "total_query_time",
"connect_count", "connect_avg", "connect_max" };
// presumably the meta of the last query plus the mutex guarding it — TODO confirm at use sites
static CSphQueryResultMeta g_tLastMeta;
static CSphMutex g_tLastMetaMutex;
//////////////////////////////////////////////////////////////////////////
/// attribute-flush bookkeeping
struct FlushState_t
{
int m_bFlushing; ///< update flushing in progress (NOTE: declared int despite the 'b' prefix)
int m_iFlushTag; ///< last flushed tag
bool m_bForceCheck; ///< forced check/flush flag
};
static FlushState_t g_tFlush;
//////////////////////////////////////////////////////////////////////////
/// available uservar types
enum Uservar_e
{
USERVAR_INT_SET
};
/// uservar name to value binding
struct Uservar_t
{
Uservar_e m_eType;
UservarIntSet_c * m_pVal; // value; ownership is not visible in this chunk
Uservar_t ()
: m_eType ( USERVAR_INT_SET )
, m_pVal ( NULL )
{}
};
// presumably g_tUservarsMutex guards g_hUservars — TODO confirm at lock sites
static CSphMutex g_tUservarsMutex;
static SmallStringHash_T<Uservar_t> g_hUservars;
static volatile int64_t g_tmSphinxqlState; // last state (uservars+udfs+...) update timestamp
static SphThread_t g_tSphinxqlStateFlushThread;
static CSphString g_sSphinxqlState;
/////////////////////////////////////////////////////////////////////////////
// MISC
/////////////////////////////////////////////////////////////////////////////
/// Set the g_bHaveTTY flag word to 1; does nothing when that buffer was never allocated.
void ReleaseTTYFlag()
{
	if ( g_bHaveTTY.IsEmpty() )
		return;

	auto * pFlag = g_bHaveTTY.GetWritePtr();
	*pFlag = 1;
}
//////////////////////////////////////////////////////////////////////////
/// Empty statistics element: every counter is zero, except MIN, which starts at UINT64_MAX
/// so the first real sample always replaces it.
QueryStatElement_t::QueryStatElement_t()
	: m_uTotalQueries ( 0 )
{
	for ( auto & uValue : m_dData )
		uValue = 0;
	m_dData[QUERY_STATS_TYPE_MIN] = UINT64_MAX;
}
//////////////////////////////////////////////////////////////////////////
void QueryStatContainer_c::Add ( uint64_t uFoundRows, uint64_t uQueryTime, uint64_t uTimestamp )
{
if ( !m_dRecords.IsEmpty() )
{
QueryStatRecord_t & tLast = m_dRecords.Last();
const uint64_t BUCKET_TIME_DELTA = 100000;
if ( uTimestamp-tLast.m_uTimestamp<=BUCKET_TIME_DELTA )
{
tLast.m_uFoundRowsMin = Min ( uFoundRows, tLast.m_uFoundRowsMin );
tLast.m_uFoundRowsMax = Max ( uFoundRows, tLast.m_uFoundRowsMax );
tLast.m_uFoundRowsSum += uFoundRows;
tLast.m_uQueryTimeMin = Min ( uQueryTime, tLast.m_uQueryTimeMin );
tLast.m_uQueryTimeMax = Max ( uQueryTime, tLast.m_uQueryTimeMax );
tLast.m_uQueryTimeSum += uQueryTime;
tLast.m_iCount++;
return;
}
}
const uint64_t MAX_TIME_DELTA = 15*60*1000000;
while ( !m_dRecords.IsEmpty() && ( uTimestamp-m_dRecords[0].m_uTimestamp ) > MAX_TIME_DELTA )
m_dRecords.Pop();
QueryStatRecord_t & tRecord = m_dRecords.Push();
tRecord.m_uFoundRowsMin = uFoundRows;
tRecord.m_uFoundRowsMax = uFoundRows;
tRecord.m_uFoundRowsSum = uFoundRows;
tRecord.m_uQueryTimeMin = uQueryTime;
tRecord.m_uQueryTimeMax = uQueryTime;
tRecord.m_uQueryTimeSum = uQueryTime;
tRecord.m_uTimestamp = uTimestamp;
tRecord.m_iCount = 1;
}
void QueryStatContainer_c::GetRecord ( int iRecord, QueryStatRecord_t & tRecord ) const
{
tRecord = m_dRecords[iRecord];
}
int QueryStatContainer_c::GetNumRecords() const
{
return m_dRecords.GetLength();
}
QueryStatContainer_c::QueryStatContainer_c() = default;

/// Move-construct by taking over the other container's buckets.
QueryStatContainer_c::QueryStatContainer_c ( QueryStatContainer_c && tOther )
	: m_dRecords ( std::move ( tOther.m_dRecords ) )
{}

/// Move-assign; self-assignment leaves the container untouched.
QueryStatContainer_c & QueryStatContainer_c::operator = ( QueryStatContainer_c && tOther )
{
	if ( this==&tOther )
		return *this;

	m_dRecords = std::move ( tOther.m_dRecords );
	return *this;
}
//////////////////////////////////////////////////////////////////////////
#ifndef NDEBUG
void QueryStatContainerExact_c::Add ( uint64_t uFoundRows, uint64_t uQueryTime, uint64_t uTimestamp )
{
const uint64_t MAX_TIME_DELTA = 15*60*1000000;
while ( !m_dRecords.IsEmpty() && ( uTimestamp-m_dRecords[0].m_uTimestamp ) > MAX_TIME_DELTA )
m_dRecords.Pop();
QueryStatRecordExact_t & tRecord = m_dRecords.Push();
tRecord.m_uFoundRows = uFoundRows;
tRecord.m_uQueryTime = uQueryTime;
tRecord.m_uTimestamp = uTimestamp;
}
int QueryStatContainerExact_c::GetNumRecords() const
{
return m_dRecords.GetLength();
}
void QueryStatContainerExact_c::GetRecord ( int iRecord, QueryStatRecord_t & tRecord ) const
{
const QueryStatRecordExact_t & tExact = m_dRecords[iRecord];
tRecord.m_uQueryTimeMin = tExact.m_uQueryTime;
tRecord.m_uQueryTimeMax = tExact.m_uQueryTime;
tRecord.m_uQueryTimeSum = tExact.m_uQueryTime;
tRecord.m_uFoundRowsMin = tExact.m_uFoundRows;
tRecord.m_uFoundRowsMax = tExact.m_uFoundRows;
tRecord.m_uFoundRowsSum = tExact.m_uFoundRows;
tRecord.m_uTimestamp = tExact.m_uTimestamp;
tRecord.m_iCount = 1;
}
QueryStatContainerExact_c::QueryStatContainerExact_c() = default;
QueryStatContainerExact_c::QueryStatContainerExact_c ( QueryStatContainerExact_c && tOther ) : m_dRecords { std::move ( tOther.m_dRecords ) }
{
}
QueryStatContainerExact_c & QueryStatContainerExact_c::operator = ( QueryStatContainerExact_c && tOther )
{
if ( &tOther!=this )
m_dRecords = std::move ( tOther.m_dRecords );
return *this;
}
#endif
//////////////////////////////////////////////////////////////////////////
/// Default descriptor: no index attached, enabled, every optional flag off, zero mass.
ServedDesc_t::ServedDesc_t ()
	: m_pIndex ( nullptr )
	, m_bEnabled ( true )
	, m_bMlock ( false )
	, m_bPreopen ( false )
	, m_bExpand ( false )
	, m_bToDelete ( false )
	, m_bOnlyNew ( false )
	, m_bRT ( false )
	, m_bOnDiskAttrs ( false )
	, m_bOnDiskPools ( false )
	, m_iMass ( 0 )
{}

/// The descriptor frees whatever index m_pIndex points to.
ServedDesc_t::~ServedDesc_t ()
{
	SafeDelete ( m_pIndex );
}
//////////////////////////////////////////////////////////////////////////
/// Digest pointers start null so that Reset() can SafeDelete them unconditionally before allocating fresh ones.
ServedStats_c::ServedStats_c()
	: m_pQueryTimeDigest ( nullptr )
	, m_pRowsFoundDigest ( nullptr )
{
	Reset();
}

/// Release both percentile digests.
ServedStats_c::~ServedStats_c()
{
	SafeDelete ( m_pRowsFoundDigest );
	SafeDelete ( m_pQueryTimeDigest );
}
/// Account one finished query in the percentile digests, the rolling record container(s),
/// and the all-time totals. Everything is updated between LockStats(false) and UnlockStats();
/// bReader=false requests the writer lock (see ServedIndex_c::LockStats).
void ServedStats_c::AddQueryStat ( uint64_t uFoundRows, uint64_t uQueryTime )
{
	LockStats ( false );

	// the digests operate on doubles
	m_pRowsFoundDigest->Add ( static_cast<double> ( uFoundRows ) );
	m_pQueryTimeDigest->Add ( static_cast<double> ( uQueryTime ) );

	const uint64_t uNow = sphMicroTimer();
	m_tQueryStatRecords.Add ( uFoundRows, uQueryTime, uNow );
#ifndef NDEBUG
	m_tQueryStatRecordsExact.Add ( uFoundRows, uQueryTime, uNow );
#endif

	m_uTotalFoundRowsMin = Min ( uFoundRows, m_uTotalFoundRowsMin );
	m_uTotalFoundRowsMax = Max ( uFoundRows, m_uTotalFoundRowsMax );
	m_uTotalFoundRowsSum += uFoundRows;

	m_uTotalQueryTimeMin = Min ( uQueryTime, m_uTotalQueryTimeMin );
	m_uTotalQueryTimeMax = Max ( uQueryTime, m_uTotalQueryTimeMax );
	m_uTotalQueryTimeSum += uQueryTime;

	++m_uTotalQueries;
	UnlockStats();
}
// rolling statistics windows in microseconds: 1, 5 and 15 minutes (each product fits in a 32-bit int).
// DoStatCalcStats() indexes this with QUERY_STATS_INTERVAL_1MIN..15MIN, so it assumes those enum values are 0..2 — TODO confirm
static const uint64_t g_dStatsIntervals[]=
{
1*60*1000000,
5*60*1000000,
15*60*1000000
};
void ServedStats_c::DoStatCalcStats ( const QueryStatContainer_i * pContainer, QueryStats_t & tRowsFoundStats, QueryStats_t & tQueryTimeStats ) const
{
assert ( pContainer );
uint64_t uTimestamp = sphMicroTimer();
LockStats(true);
for ( int i = QUERY_STATS_INTERVAL_1MIN; i<=QUERY_STATS_INTERVAL_15MIN; i++ )
CalcStatsForInterval ( pContainer, tRowsFoundStats.m_dStats[i], tQueryTimeStats.m_dStats[i], uTimestamp, g_dStatsIntervals[i] );
tRowsFoundStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_dData[QUERY_STATS_TYPE_AVG] = m_uTotalQueries ? m_uTotalFoundRowsSum/m_uTotalQueries : 0;
tRowsFoundStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_dData[QUERY_STATS_TYPE_MIN] = m_uTotalFoundRowsMin;
tRowsFoundStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_dData[QUERY_STATS_TYPE_MAX] = m_uTotalFoundRowsMax;
tRowsFoundStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_dData[QUERY_STATS_TYPE_95] = (uint64_t)m_pRowsFoundDigest->Percentile(95);
tRowsFoundStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_dData[QUERY_STATS_TYPE_99] = (uint64_t)m_pRowsFoundDigest->Percentile(99);
tRowsFoundStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_uTotalQueries = m_uTotalQueries;
tQueryTimeStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_dData[QUERY_STATS_TYPE_AVG] = m_uTotalQueries ? m_uTotalQueryTimeSum/m_uTotalQueries : 0;
tQueryTimeStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_dData[QUERY_STATS_TYPE_MIN] = m_uTotalQueryTimeMin;
tQueryTimeStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_dData[QUERY_STATS_TYPE_MAX] = m_uTotalQueryTimeMax;
tQueryTimeStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_dData[QUERY_STATS_TYPE_95] = (uint64_t)m_pQueryTimeDigest->Percentile(95);
tQueryTimeStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_dData[QUERY_STATS_TYPE_99] = (uint64_t)m_pQueryTimeDigest->Percentile(99);
tQueryTimeStats.m_dStats[QUERY_STATS_INTERVAL_ALLTIME].m_uTotalQueries = m_uTotalQueries;
UnlockStats();
}
/// Query statistics computed from the bucketed record container.
void ServedStats_c::CalculateQueryStats ( QueryStats_t & tRowsFoundStats, QueryStats_t & tQueryTimeStats ) const
{
	const QueryStatContainer_i * pBucketed = &m_tQueryStatRecords;
	DoStatCalcStats ( pBucketed, tRowsFoundStats, tQueryTimeStats );
}

#ifndef NDEBUG
/// Debug-only: the same statistics computed from the unbucketed per-query container.
void ServedStats_c::CalculateQueryStatsExact ( QueryStats_t & tRowsFoundStats, QueryStats_t & tQueryTimeStats ) const
{
	const QueryStatContainer_i * pExact = &m_tQueryStatRecordsExact;
	DoStatCalcStats ( pExact, tRowsFoundStats, tQueryTimeStats );
}
#endif // !NDEBUG
/// Copy-assignment deliberately does NOT copy statistics: the target is reset to a fresh, empty state.
ServedStats_c & ServedStats_c::operator = ( const ServedStats_c & /*rhs*/ )
{
	Reset();
	return *this;
}

/// Move-assignment: take over rhs's record containers, digests and running totals.
/// Our own digests are released before rhs's are adopted; previously they were simply
/// overwritten and leaked. rhs is left with null digest pointers, so it must be Reset()
/// (e.g. via copy-assignment) or destroyed before it records statistics again.
ServedStats_c & ServedStats_c::operator = ( ServedStats_c && rhs )
{
	if ( this!=&rhs )
	{
		m_tQueryStatRecords = std::move ( rhs.m_tQueryStatRecords );
#ifndef NDEBUG
		m_tQueryStatRecordsExact = std::move ( rhs.m_tQueryStatRecordsExact );
#endif

		// free our current digests before adopting rhs's, otherwise they would leak
		SafeDelete ( m_pQueryTimeDigest );
		SafeDelete ( m_pRowsFoundDigest );

		m_pQueryTimeDigest = rhs.m_pQueryTimeDigest;
		rhs.m_pQueryTimeDigest = nullptr;
		m_pRowsFoundDigest = rhs.m_pRowsFoundDigest;
		rhs.m_pRowsFoundDigest = nullptr;

		m_uTotalFoundRowsMin = rhs.m_uTotalFoundRowsMin;
		m_uTotalFoundRowsMax = rhs.m_uTotalFoundRowsMax;
		m_uTotalFoundRowsSum = rhs.m_uTotalFoundRowsSum;
		m_uTotalQueryTimeMin = rhs.m_uTotalQueryTimeMin;
		m_uTotalQueryTimeMax = rhs.m_uTotalQueryTimeMax;
		m_uTotalQueryTimeSum = rhs.m_uTotalQueryTimeSum;
		m_uTotalQueries = rhs.m_uTotalQueries;
	}
	return *this;
}
/// Forget the all-time statistics and replace both percentile digests with fresh ones.
/// MIN trackers restart at UINT64_MAX so the next sample always wins. The rolling record
/// containers are not touched here.
void ServedStats_c::Reset()
{
	m_uTotalFoundRowsMin = UINT64_MAX;
	m_uTotalQueryTimeMin = UINT64_MAX;
	m_uTotalFoundRowsMax = 0;
	m_uTotalQueryTimeMax = 0;
	m_uTotalFoundRowsSum = 0;
	m_uTotalQueryTimeSum = 0;
	m_uTotalQueries = 0;

	SafeDelete ( m_pQueryTimeDigest );
	m_pQueryTimeDigest = sphCreateTDigest();

	SafeDelete ( m_pRowsFoundDigest );
	m_pRowsFoundDigest = sphCreateTDigest();

	assert ( m_pQueryTimeDigest && m_pRowsFoundDigest );
}
void ServedStats_c::CalcStatsForInterval ( const QueryStatContainer_i * pContainer, QueryStatElement_t & tRowResult, QueryStatElement_t & tTimeResult, uint64_t uTimestamp, uint64_t uInterval ) const
{
assert ( pContainer );
tRowResult.m_dData[QUERY_STATS_TYPE_AVG] = 0;
tRowResult.m_dData[QUERY_STATS_TYPE_MIN] = UINT64_MAX;
tRowResult.m_dData[QUERY_STATS_TYPE_MAX] = 0;
tTimeResult.m_dData[QUERY_STATS_TYPE_AVG] = 0;
tTimeResult.m_dData[QUERY_STATS_TYPE_MIN] = UINT64_MAX;
tTimeResult.m_dData[QUERY_STATS_TYPE_MAX] = 0;
CSphTightVector<uint64_t> dFound, dTime;
dFound.Reserve ( m_tQueryStatRecords.GetNumRecords() );
dTime.Reserve ( m_tQueryStatRecords.GetNumRecords() );
DWORD uTotalQueries = 0;
QueryStatRecord_t tRecord;
for ( int i = 0; i < pContainer->GetNumRecords(); i++ )
{
pContainer->GetRecord ( i, tRecord );
if ( uTimestamp-tRecord.m_uTimestamp<=uInterval )
{
tRowResult.m_dData[QUERY_STATS_TYPE_MIN] = Min ( tRecord.m_uFoundRowsMin, tRowResult.m_dData[QUERY_STATS_TYPE_MIN] );
tRowResult.m_dData[QUERY_STATS_TYPE_MAX] = Max ( tRecord.m_uFoundRowsMax, tRowResult.m_dData[QUERY_STATS_TYPE_MAX] );
tTimeResult.m_dData[QUERY_STATS_TYPE_MIN] = Min ( tRecord.m_uQueryTimeMin, tTimeResult.m_dData[QUERY_STATS_TYPE_MIN] );
tTimeResult.m_dData[QUERY_STATS_TYPE_MAX] = Max ( tRecord.m_uQueryTimeMax, tTimeResult.m_dData[QUERY_STATS_TYPE_MAX] );
dFound.Add ( tRecord.m_uFoundRowsSum/tRecord.m_iCount );
dTime.Add ( tRecord.m_uQueryTimeSum/tRecord.m_iCount );
tRowResult.m_dData[QUERY_STATS_TYPE_AVG] += tRecord.m_uFoundRowsSum;
tTimeResult.m_dData[QUERY_STATS_TYPE_AVG] += tRecord.m_uQueryTimeSum;
uTotalQueries += tRecord.m_iCount;
}
}
dFound.Sort();
dTime.Sort();
tRowResult.m_uTotalQueries = uTotalQueries;
tTimeResult.m_uTotalQueries = uTotalQueries;
if ( !dFound.GetLength() )
return;
tRowResult.m_dData[QUERY_STATS_TYPE_AVG]/= uTotalQueries;
tTimeResult.m_dData[QUERY_STATS_TYPE_AVG]/= uTotalQueries;
int u95 = Max ( 0, Min ( int ( ceilf ( dFound.GetLength()*0.95f ) + 0.5f )-1, dFound.GetLength()-1 ) );
int u99 = Max ( 0, Min ( int ( ceilf ( dFound.GetLength()*0.99f ) + 0.5f )-1, dFound.GetLength()-1 ) );
tRowResult.m_dData[QUERY_STATS_TYPE_95] = dFound[u95];
tRowResult.m_dData[QUERY_STATS_TYPE_99] = dFound[u99];
tTimeResult.m_dData[QUERY_STATS_TYPE_95] = dTime[u95];
tTimeResult.m_dData[QUERY_STATS_TYPE_99] = dTime[u99];
}
//////////////////////////////////////////////////////////////////////////
/// Tear down both rwlocks (the index lock and the stats lock).
ServedIndex_c::~ServedIndex_c ()
{
	Verify ( m_tLock.Done() );
	Verify ( m_tStatsLock.Done() );
}

/// Take the index lock in shared mode. Failure is logged and asserts in debug builds.
void ServedIndex_c::ReadLock () const
{
	if ( !m_tLock.ReadLock() )
	{
		sphLogDebug ( "ReadLock %p failed", this );
		assert ( false );
		return;
	}
	sphLogDebugvv ( "ReadLock %p", this );
}

/// Take the index lock in exclusive mode. Failure is logged and asserts in debug builds.
void ServedIndex_c::WriteLock () const
{
	sphLogDebugvv ( "WriteLock %p wait", this );
	if ( !m_tLock.WriteLock() )
	{
		sphLogDebug ( "WriteLock %p failed", this );
		assert ( false );
		return;
	}
	sphLogDebugvv ( "WriteLock %p", this );
}

/// Initialize both rwlocks; the stats lock is only attempted once the index lock succeeded.
bool ServedIndex_c::InitLock() const
{
	if ( !m_tLock.Init ( true ) )
		return false;
	return m_tStatsLock.Init ( true );
}

/// Release the index lock. Failure is logged and asserts in debug builds.
void ServedIndex_c::Unlock () const
{
	if ( !m_tLock.Unlock() )
	{
		sphLogDebug ( "Unlock %p failed", this );
		assert ( false );
		return;
	}
	sphLogDebugvv ( "Unlock %p", this );
}

/// Stats lock: shared when bReader, exclusive otherwise. The lock result is not checked.
void ServedIndex_c::LockStats ( bool bReader ) const
{
	if ( !bReader )
	{
		m_tStatsLock.WriteLock();
		return;
	}
	m_tStatsLock.ReadLock();
}

/// Release the stats lock taken by LockStats().
void ServedIndex_c::UnlockStats() const
{
	m_tStatsLock.Unlock();
}
//////////////////////////////////////////////////////////////////////////
/// Lock the whole hash for the iterator's lifetime: exclusively when bWrite, shared otherwise.
IndexHashIterator_c::IndexHashIterator_c ( const IndexHash_c * pHash, bool bWrite )
	: m_pHash ( pHash )
	, m_pIterator ( nullptr )
{
	if ( bWrite )
		m_pHash->Wlock();
	else
		m_pHash->Rlock();
}

/// Release the hash lock taken by the constructor.
IndexHashIterator_c::~IndexHashIterator_c ()
{
	m_pHash->Unlock();
}

/// Advance along the hash's m_pFirstByOrder / m_pNextByOrder chain. The first call yields the first entry.
/// Returns false once the chain is exhausted; a further call restarts from the first entry.
bool IndexHashIterator_c::Next ()
{
	if ( m_pIterator )
		m_pIterator = m_pIterator->m_pNextByOrder;
	else
		m_pIterator = m_pHash->m_pFirstByOrder;
	return m_pIterator!=nullptr;
}

/// Value of the current entry; only valid after Next() returned true.
ServedIndex_c & IndexHashIterator_c::Get ()
{
	assert ( m_pIterator );
	return m_pIterator->m_tValue;
}

/// Key of the current entry; only valid after Next() returned true.
const CSphString & IndexHashIterator_c::GetKey ()
{
	assert ( m_pIterator );
	return m_pIterator->m_tKey;
}
//////////////////////////////////////////////////////////////////////////
IndexHash_c::IndexHash_c ()
{
if ( !m_tLock.Init() )
sphDie ( "failed to init hash indexes rwlock" );
}
IndexHash_c::~IndexHash_c()
{
Verify ( m_tLock.Done() );
}
void IndexHash_c::Rlock () const
{
Verify ( m_tLock.ReadLock() );
}
void IndexHash_c::Wlock () const
{
Verify ( m_tLock.WriteLock() );
}
void IndexHash_c::Unlock () const
{
Verify ( m_tLock.Unlock() );
}
bool IndexHash_c::Add ( const ServedDesc_t & tDesc, const CSphString & tKey )
{
Wlock();
int iPrevSize = GetLength ();
ServedIndex_c & tVal = BASE::AddUnique ( tKey );
bool bAdded = ( iPrevSize<GetLength() );
if ( bAdded )
{
*( (ServedDesc_t *)&tVal ) = tDesc;
Verify ( tVal.InitLock() );
}
Unlock();
return bAdded;
}
bool IndexHash_c::Delete ( const CSphString & tKey )
{
// tricky part
// hash itself might be unlocked, but entry (!) might still be locked
// hence, we also need to acquire a lock on entry, and an exclusive one
Wlock();
bool bRes = false;
ServedIndex_c * pEntry = BASE::operator() ( tKey );
if ( pEntry )
{
pEntry->WriteLock();
pEntry->Unlock();
bRes = BASE::Delete ( tKey );
}
Unlock();
return bRes;
}
ServedIndex_c * IndexHash_c::GetRlockedEntry ( const CSphString & tKey ) const
{
Rlock();
ServedIndex_c * pEntry = BASE::operator() ( tKey );
if ( pEntry )
pEntry->ReadLock();
Unlock();
return pEntry;
}
ServedIndex_c * IndexHash_c::GetWlockedEntry ( const CSphString & tKey ) const
{
Rlock();
ServedIndex_c * pEntry = BASE::operator() ( tKey );
if ( pEntry )
pEntry->WriteLock();
Unlock();
return pEntry;
}
ServedIndex_c & IndexHash_c::GetUnlockedEntry ( const CSphString & tKey ) const
{
Rlock();
ServedIndex_c & tRes = BASE::operator[] ( tKey );
Unlock();
return tRes;
}
ServedIndex_c * IndexHash_c::GetUnlockedEntryPtr ( const CSphString & tKey ) const
{
Rlock();
ServedIndex_c * pRes = BASE::operator() ( tKey );
Unlock();
return pRes;
}
bool IndexHash_c::Exists ( const CSphString & tKey ) const
{
Rlock();
bool bRes = BASE::Exists ( tKey );
Unlock();
return bRes;
}
/////////////////////////////////////////////////////////////////////////////
// LOGGING
/////////////////////////////////////////////////////////////////////////////
void Shutdown (); // forward ref for sphFatal()
int GetOsThreadId ();

/// Format the current local time (from sphMicroTimer()) as "Www Mmm dd hh:mm:ss.mmm yyyy",
/// with the day of month space-padded to width 3, into sTimeBuf.
/// Returns snprintf()'s result: the number of characters that would have been written, excluding NUL.
int sphFormatCurrentTime ( char * sTimeBuf, int iBufLen )
{
	static const char * sWeekday[7] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
	static const char * sMonth[12] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };

	const int64_t iNowUs = sphMicroTimer ();
	// on some systems (eg. FreeBSD 6.2), tv.tv_sec has another type and we can't just pass it
	time_t tSeconds = (time_t) ( iNowUs/1000000 );

	struct tm tLocal;
#if !USE_WINDOWS
	localtime_r ( &tSeconds, &tLocal );
#else
	tLocal = *localtime ( &tSeconds );
#endif

	const int iMillis = (int)( ( iNowUs%1000000 )/1000 );
	return snprintf ( sTimeBuf, iBufLen, "%.3s %.3s%3d %.2d:%.2d:%.2d.%.3d %d",
		sWeekday [ tLocal.tm_wday ],
		sMonth [ tLocal.tm_mon ],
		tLocal.tm_mday,
		tLocal.tm_hour, tLocal.tm_min, tLocal.tm_sec, iMillis,
		1900+tLocal.tm_year );
}
/// physically emit log entry
/// buffer must have 1 extra byte for linefeed
/// sBuf is the complete line. Callers in this chunk (sphLog) pass sTtyBuf pointing inside sBuf, just past the
/// "[time] [tid] " prefix, so appending "\n" to sBuf also terminates sTtyBuf.
/// On Windows, when running as a service with no log file (g_iLogFile==STDOUT_FILENO), the line is reported to
/// the Windows event log instead; eLevel is only used on that path.
/// Otherwise: append "\n", seek g_iLogFile to its end, and write sTtyBuf (when g_bLogTty) or sBuf; while
/// g_bLogStdout is set and the log is not stdout, also echo sTtyBuf to stdout. Write results are not checked.
#if USE_WINDOWS
void sphLogEntry ( ESphLogLevel eLevel, char * sBuf, char * sTtyBuf )
#else
void sphLogEntry ( ESphLogLevel , char * sBuf, char * sTtyBuf )
#endif
{
#if USE_WINDOWS
if ( g_bService && g_iLogFile==STDOUT_FILENO )
{
HANDLE hEventSource;
LPCTSTR lpszStrings[2];
hEventSource = RegisterEventSource ( NULL, g_sServiceName );
if ( hEventSource )
{
lpszStrings[0] = g_sServiceName;
lpszStrings[1] = sBuf;
// levels not listed (the debug ones) keep EVENTLOG_INFORMATION_TYPE
WORD eType = EVENTLOG_INFORMATION_TYPE;
switch ( eLevel )
{
case SPH_LOG_FATAL: eType = EVENTLOG_ERROR_TYPE; break;
case SPH_LOG_WARNING: eType = EVENTLOG_WARNING_TYPE; break;
case SPH_LOG_INFO: eType = EVENTLOG_INFORMATION_TYPE; break;
}
ReportEvent ( hEventSource, // event log handle
eType, // event type
0, // event category
0, // event identifier
NULL, // no security identifier
2, // size of lpszStrings array
0, // no binary data
lpszStrings, // array of strings
NULL ); // no binary data
DeregisterEventSource ( hEventSource );
}
} else
#endif
{
// this is the "extra byte for linefeed" the caller must reserve
strcat ( sBuf, "\n" ); // NOLINT
sphSeek ( g_iLogFile, 0, SEEK_END );
if ( g_bLogTty )
sphWrite ( g_iLogFile, sTtyBuf, strlen(sTtyBuf) );
else
sphWrite ( g_iLogFile, sBuf, strlen(sBuf) );
// during startup, mirror messages to the console as well
if ( g_bLogStdout && g_iLogFile!=STDOUT_FILENO )
sphWrite ( STDOUT_FILENO, sTtyBuf, strlen(sTtyBuf) );
}
}
/// log entry (with log levels, dupe catching, etc)
/// call with NULL format for dupe flushing
/// Messages above g_eLogLevel are dropped. With USE_SYSLOG and g_bLogSyslog set, real messages go to vsyslog()
/// and skip everything below. Otherwise a line "[time] [tid] BANNER: message" is built in a 1024-byte buffer
/// (longer messages are truncated). Messages above INFO level are filtered by the g_sLogFilter prefix.
/// Identical consecutive messages (same level and CRC32, each within FLUSH_THRESH_TIME of the previous one,
/// at most FLUSH_THRESH_COUNT of them) are counted instead of printed, and later reported as
/// "last message repeated N times".
/// NOTE(review): the dupe-catcher state is function-static and no locking is visible here; concurrent callers could race on it.
void sphLog ( ESphLogLevel eLevel, const char * sFmt, va_list ap )
{
// dupe catcher state
static const int FLUSH_THRESH_TIME = 1000000; // in microseconds
static const int FLUSH_THRESH_COUNT = 100;
static ESphLogLevel eLastLevel = SPH_LOG_INFO;
static DWORD uLastEntry = 0;
static int64_t tmLastStamp = -1000000-FLUSH_THRESH_TIME;
static int iLastRepeats = 0;
// only if we can
if ( sFmt && eLevel>g_eLogLevel )
return;
#if USE_SYSLOG
if ( g_bLogSyslog && sFmt )
{
// indexed by eLevel; assumes ESphLogLevel has at most 6 values starting at FATAL — TODO confirm
const int levels[] = { LOG_EMERG, LOG_WARNING, LOG_INFO, LOG_DEBUG, LOG_DEBUG, LOG_DEBUG };
vsyslog ( levels[eLevel], sFmt, ap );
return;
}
#endif
// nowhere to write: no log file and not a Windows service
if ( g_iLogFile<0 && !g_bService )
return;
// format the banner
char sTimeBuf[128];
sphFormatCurrentTime ( sTimeBuf, sizeof(sTimeBuf) );
const char * sBanner = "";
// a flush-only call reuses the level of the message being flushed
if ( sFmt==NULL ) eLevel = eLastLevel;
if ( eLevel==SPH_LOG_FATAL ) sBanner = "FATAL: ";
if ( eLevel==SPH_LOG_WARNING ) sBanner = "WARNING: ";
if ( eLevel>=SPH_LOG_DEBUG ) sBanner = "DEBUG: ";
char sBuf [ 1024 ];
snprintf ( sBuf, sizeof(sBuf)-1, "[%s] [%d] ", sTimeBuf, GetOsThreadId() );
// sTtyBuf: the line without the "[time] [tid] " prefix, used for tty/stdout output
char * sTtyBuf = sBuf + strlen(sBuf);
strncpy ( sTtyBuf, sBanner, 32 ); // 32 is arbitrary; just something that is enough and keeps lint happy
// iLen: length of prefix + banner; the message text starts at sBuf+iLen
int iLen = strlen(sBuf);
// format the message
if ( sFmt )
{
// need more space for tail zero and "\n" that added at sphLogEntry
int iSafeGap = 4;
int iBufSize = sizeof(sBuf)-iLen-iSafeGap;
vsnprintf ( sBuf+iLen, iBufSize, sFmt, ap );
sBuf[ sizeof(sBuf)-iSafeGap ] = '\0';
}
// debug-level filter: drop messages whose text does not start with g_sLogFilter
if ( sFmt && eLevel>SPH_LOG_INFO && g_iLogFilterLen )
{
if ( strncmp ( sBuf+iLen, g_sLogFilter, g_iLogFilterLen )!=0 )
return;
}
// catch dupes
// messages are compared by CRC32 of the text only (prefix excluded, since it has the timestamp)
DWORD uEntry = sFmt ? sphCRC32 ( sBuf+iLen ) : 0;
int64_t tmNow = sphMicroTimer();
// accumulate while possible
if ( sFmt && eLevel==eLastLevel && uEntry==uLastEntry && iLastRepeats<FLUSH_THRESH_COUNT && tmNow<tmLastStamp+FLUSH_THRESH_TIME )
{
tmLastStamp = tmNow;
iLastRepeats++;
return;
}
// flush if needed
if ( iLastRepeats!=0 && ( sFmt || tmNow>=tmLastStamp+FLUSH_THRESH_TIME ) )
{
// flush if we actually have something to flush, and
// case 1: got a message we can't accumulate
// case 2: got a periodic flush and been otherwise idle for a thresh period
// NOTE(review): the copied prefix carries the banner of the *current* eLevel, so in case 1 the
// "repeated" line may show a different banner than the eLastLevel it is logged at
char sLast[256];
strncpy ( sLast, sBuf, iLen );
snprintf ( sLast+iLen, sizeof(sLast)-iLen, "last message repeated %d times", iLastRepeats );
sphLogEntry ( eLastLevel, sLast, sLast + ( sTtyBuf-sBuf ) );
tmLastStamp = tmNow;
iLastRepeats = 0;
eLastLevel = SPH_LOG_INFO;
uLastEntry = 0;
}
// was that a flush-only call?
if ( !sFmt )
return;
// remember this message as the new dupe candidate
tmLastStamp = tmNow;
iLastRepeats = 0;
eLastLevel = eLevel;
uLastEntry = uEntry;
// do the logging
sphLogEntry ( eLevel, sBuf, sTtyBuf );
}
void sphFatal ( const char * sFmt, ... ) __attribute__ ( ( format ( printf, 1, 2 ) ) );

/// Log a printf-style FATAL message, call Shutdown(), and exit(1). Never returns.
void sphFatal ( const char * sFmt, ... )
{
	va_list tArgs;
	va_start ( tArgs, sFmt );
	sphLog ( SPH_LOG_FATAL, sFmt, tArgs );
	va_end ( tArgs );

	Shutdown ();
	exit ( 1 );
}

void sphFatalLog ( const char * sFmt, ... ) __attribute__ ( ( format ( printf, 1, 2 ) ) );

/// Log a printf-style FATAL message only; unlike sphFatal(), the process keeps running.
void sphFatalLog ( const char * sFmt, ... )
{
	va_list tArgs;
	va_start ( tArgs, sFmt );
	sphLog ( SPH_LOG_FATAL, sFmt, tArgs );
	va_end ( tArgs );
}
#if !USE_WINDOWS
/// Path of the named pipe associated with daemon process iPid: "/tmp/searchd_<pid>".
static CSphString GetNamedPipeName ( int iPid )
{
	CSphString sPipePath;
	sPipePath.SetSprintf ( "/tmp/searchd_%d", iPid );
	return sPipePath;
}
#endif
/// Write an already formatted warning text to the daemon log.
/// The text is passed through "%s", so any '%' inside it is printed verbatim.
void LogWarning ( const char * sWarning )
{
	const char * sText = sWarning;
	sphWarning ( "%s", sText );
}
/////////////////////////////////////////////////////////////////////////////
/// Three-way compare of two strings that may be NULL.
/// Two NULLs are equal; a non-NULL string orders before a NULL one;
/// otherwise this is plain strcmp() order.
static int CmpString ( const CSphString & a, const CSphString & b )
{
	const char * sA = a.cstr();
	const char * sB = b.cstr();

	if ( sA && sB )
		return strcmp ( sA, sB );

	if ( !sA && !sB )
		return 0;

	// exactly one side is NULL
	return sA ? -1 : 1;
}
/// One failed search against one index: where it happened and the error text.
struct SearchFailure_t
{
public:
	CSphString	m_sParentIndex;	///< index this one was reached through (presumably a distributed index; may be empty)
	CSphString	m_sIndex;		///< searched index name
	CSphString	m_sError;		///< search error message

public:
	SearchFailure_t () {}

public:
	/// entries are equal when index, parent index and error text all match
	bool operator == ( const SearchFailure_t & r ) const
	{
		return m_sIndex==r.m_sIndex && m_sError==r.m_sError && m_sParentIndex==r.m_sParentIndex;
	}

	/// order by error text first, then parent index, then index name,
	/// so that entries sharing an error end up adjacent after sorting
	bool operator < ( const SearchFailure_t & r ) const
	{
		// compare the members themselves: passing cstr() to CmpString() would
		// construct a temporary CSphString (a heap copy) for every comparison
		int iRes = CmpString ( m_sError, r.m_sError );
		if ( !iRes )
			iRes = CmpString ( m_sParentIndex, r.m_sParentIndex );
		if ( !iRes )
			iRes = CmpString ( m_sIndex, r.m_sIndex );
		return iRes<0;
	}

	const SearchFailure_t & operator = ( const SearchFailure_t & r )
	{
		if ( this!=&r )
		{
			m_sParentIndex = r.m_sParentIndex;
			m_sIndex = r.m_sIndex;
			m_sError = r.m_sError;
		}
		return *this;
	}
};
static void ReportIndexesName ( int iSpanStart, int iSpandEnd, const CSphVector<SearchFailure_t> & dLog, CSphStringBuilder & sOut );
class SearchFailuresLog_c
{
protected:
CSphVector<SearchFailure_t> m_dLog;
public:
void Submit ( const char * sIndex, const char * sParentIndex , const char * sError )
{
SearchFailure_t & tEntry = m_dLog.Add ();
tEntry.m_sIndex = sIndex;
tEntry.m_sError = sError;
tEntry.m_sParentIndex = sParentIndex;
}
void SubmitEx ( const char * sIndex, const char * sParentIndex, const char * sTemplate, ... ) __attribute__ ( ( format ( printf, 4, 5 ) ) )
{
SearchFailure_t & tEntry = m_dLog.Add ();
va_list ap;
va_start ( ap, sTemplate );
tEntry.m_sIndex = sIndex;
tEntry.m_sError.SetSprintfVa ( sTemplate, ap );
va_end ( ap );
tEntry.m_sParentIndex = sParentIndex;
}
public:
bool IsEmpty ()
{
return m_dLog.GetLength()==0;
}
void BuildReport ( CSphStringBuilder & sReport )
{
if ( IsEmpty() )
return;
// collapse same messages
m_dLog.Uniq ();
int iSpanStart = 0;
for ( int i=1; i<=m_dLog.GetLength(); i++ )
{
// keep scanning while error text is the same
if ( i!=m_dLog.GetLength() )
if ( m_dLog[i].m_sError==m_dLog[i-1].m_sError )
continue;
if ( iSpanStart )
sReport += ";\n";
sReport += "index ";
ReportIndexesName ( iSpanStart, i, m_dLog, sReport );
sReport += m_dLog[iSpanStart].m_sError.cstr();
// done
iSpanStart = i;
}
}
};
/////////////////////////////////////////////////////////////////////////////
// SIGNAL HANDLERS
/////////////////////////////////////////////////////////////////////////////
static bool SaveIndexes ()
{
CSphString sError;
bool bAllSaved = true;
for ( IndexHashIterator_c it ( g_pLocalIndexes ); it.Next(); )
{
const ServedIndex_c & tServed = it.Get();
if ( !tServed.m_bEnabled )
continue;
tServed.ReadLock();
if ( !tServed.m_pIndex->SaveAttributes ( sError ) )
{
sphWarning ( "index %s: attrs save failed: %s", it.GetKey().cstr(), sError.cstr() );
bAllSaved = false;
}
tServed.Unlock();
}
return bAllSaved;
}
void Shutdown ()
{
#if !USE_WINDOWS
int fdStopwait = -1;
#endif
bool bAttrsSaveOk = true;
g_bShutdown = true;
if ( !g_bDaemonAtShutdown.IsEmpty() )
{
*g_bDaemonAtShutdown.GetWritePtr() = 1;
}
#if !USE_WINDOWS
// stopwait handshake
CSphString sPipeName = GetNamedPipeName ( getpid() );
fdStopwait = ::open ( sPipeName.cstr(), O_WRONLY | O_NONBLOCK );
if ( fdStopwait>=0 )
{
DWORD uHandshakeOk = 0;
int iDummy; // to avoid gcc unused result warning
iDummy = ::write ( fdStopwait, &uHandshakeOk, sizeof(DWORD) );
iDummy++; // to avoid gcc set but not used variable warning
}
#endif
sphThreadJoin ( &g_tRotationServiceThread );
sphThreadJoin ( &g_tPingThread );
// force even long time searches to shut
sphInterruptNow();
// tell flush-rt thread to shutdown, and wait until it does
sphThreadJoin ( &g_tRtFlushThread );
// tell rotation thread to shutdown, and wait until it does
if ( g_bSeamlessRotate )
{
sphThreadJoin ( &g_tRotateThread );
}
// tell uservars flush thread to shutdown, and wait until it does
if ( !g_sSphinxqlState.IsEmpty() )
sphThreadJoin ( &g_tSphinxqlStateFlushThread );
sphThreadJoin ( &g_tOptimizeThread );
int64_t tmShutStarted = sphMicroTimer();
// stop search threads; up to shutdown_timeout seconds
while ( g_dThd.GetLength() > 0 && ( sphMicroTimer()-tmShutStarted )<g_iShutdownTimeout )
sphSleepMsec ( 50 );
if ( g_pThdPool )
{
g_pThdPool->Shutdown();
SafeDelete ( g_pThdPool );
ARRAY_FOREACH ( i, g_dTickPoolThread )
sphThreadJoin ( g_dTickPoolThread.Begin() + i );
}
CSphString sError;
// save attribute updates for all local indexes
bAttrsSaveOk = SaveIndexes();
// unlock indexes and release locks if needed
for ( IndexHashIterator_c it ( g_pLocalIndexes ); it.Next(); )
if ( it.Get().m_pIndex )
it.Get().m_pIndex->Unlock();
g_pLocalIndexes->Reset();
// unlock indexes and release locks if needed
for ( IndexHashIterator_c it ( g_pTemplateIndexes ); it.Next(); )
if ( it.Get().m_pIndex )
it.Get().m_pIndex->Unlock();
g_pTemplateIndexes->Reset();
// clear shut down of rt indexes + binlog
SafeDelete ( g_pLocalIndexes );
SafeDelete ( g_pTemplateIndexes );
sphDoneIOStats();
sphRTDone();
sphShutdownWordforms ();
sphShutdownGlobalIDFs ();
sphAotShutdown ();
sphRLPDone();
ARRAY_FOREACH ( i, g_dListeners )
if ( g_dListeners[i].m_iSock>=0 )
sphSockClose ( g_dListeners[i].m_iSock );
ClosePersistentSockets();
// remove pid
if ( !g_sPidFile.IsEmpty() )
{
::close ( g_iPidFD );
::unlink ( g_sPidFile.cstr() );
}
sphInfo ( "shutdown complete" );
SphCrashLogger_c::Done();
sphThreadDone ( g_iLogFile );
#if USE_WINDOWS
CloseHandle ( g_hPipe );
#else
if ( fdStopwait>=0 )
{
DWORD uStatus = bAttrsSaveOk;
int iDummy; // to avoid gcc unused result warning
iDummy = ::write ( fdStopwait, &uStatus, sizeof(DWORD) );
iDummy++; // to avoid gcc set but not used variable warning
::close ( fdStopwait );
}
#endif
}
#if !USE_WINDOWS
/// SIGHUP handler: only raises g_bGotSighup (presumably polled and acted upon by the main loop).
void sighup ( int )
{
g_bGotSighup = 1;
}
/// SIGTERM handler (also installed for SIGINT unless Ctrl-C is allowed, see SetSignalHandlers):
/// raises g_bGotSigterm and interrupts running searches via sphInterruptNow().
void sigterm ( int )
{
// tricky bit
// we can't call exit() here because malloc()/free() are not re-entrant
// we could call _exit() but let's try to die gracefully on TERM
// and let signal sender wait and send KILL as needed
g_bGotSigterm = 1;
sphInterruptNow();
}
/// SIGUSR1 handler: only raises g_bGotSigusr1.
void sigusr1 ( int )
{
g_bGotSigusr1 = 1;
}
#endif // !USE_WINDOWS
/// Cursor state for the crash-dump copy helpers (sphCopyEncodedBase64, sphCopySphinxQL):
/// bytes are copied from [m_pSrc, m_pSrcEnd) into [m_pDst, m_pDstEnd), and both
/// m_pSrc and m_pDst are advanced to reflect what was consumed/produced.
struct QueryCopyState_t
{
// next output byte
BYTE * m_pDst;
// end of the output buffer (exclusive)
BYTE * m_pDstEnd;
// next input byte
const BYTE * m_pSrc;
// end of the input (exclusive)
const BYTE * m_pSrcEnd;
};
// crash query handler
// target line length for the dumped query text
static const int g_iQueryLineLen = 80;
// standard BASE64 alphabet, indexed by a 6-bit value
static const char g_dEncodeBase64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
/// Encode source bytes as BASE64 into the destination buffer (used for crash dumps).
/// Full 3-byte groups become 4 output characters. A '\n' is inserted whenever the next
/// group would cross a g_iQueryLineLen boundary; offsets are counted from m_pDst at entry.
/// A trailing 1- or 2-byte remainder is emitted with '=' padding once it is all that is left.
/// Advances both cursors. Returns true while source bytes remain (destination full), so the
/// caller can flush the output buffer, reset m_pDst and call again.
bool sphCopyEncodedBase64 ( QueryCopyState_t & tEnc )
{
BYTE * pDst = tEnc.m_pDst;
const BYTE * pDstBase = tEnc.m_pDst;
const BYTE * pSrc = tEnc.m_pSrc;
// keep 5 bytes of headroom: an optional '\n' plus one 4-char group
const BYTE * pDstEnd = tEnc.m_pDstEnd-5;
// last position at which a full 3-byte group is still available
const BYTE * pSrcEnd = tEnc.m_pSrcEnd-3;
while ( pDst<=pDstEnd && pSrc<=pSrcEnd )
{
// put line delimiter at max line length
// (true when offset..offset+4 crosses a multiple of g_iQueryLineLen)
if ( ( ( pDst-pDstBase ) % g_iQueryLineLen )>( ( pDst-pDstBase+4 ) % g_iQueryLineLen ) )
*pDst++ = '\n';
// Convert to big endian
DWORD uSrc = ( pSrc[0] << 16 ) | ( pSrc[1] << 8 ) | ( pSrc[2] );
pSrc += 3;
// emit the four 6-bit groups, most significant first
*pDst++ = g_dEncodeBase64 [ ( uSrc & 0x00FC0000 ) >> 18 ];
*pDst++ = g_dEncodeBase64 [ ( uSrc & 0x0003F000 ) >> 12 ];
*pDst++ = g_dEncodeBase64 [ ( uSrc & 0x00000FC0 ) >> 6 ];
*pDst++ = g_dEncodeBase64 [ ( uSrc & 0x0000003F ) ];
}
// there is a tail in source data and a room for it at destination buffer
// (the room check is conservative: it demands 9 free bytes for a 4-byte tail)
if ( pSrc<tEnc.m_pSrcEnd && ( tEnc.m_pSrcEnd-pSrc<3 ) && ( pDst<=pDstEnd-4 ) )
{
int iLeft = ( tEnc.m_pSrcEnd - pSrc ) % 3;
if ( iLeft==1 )
{
// one byte left: two characters plus "=="
DWORD uSrc = pSrc[0]<<16;
pSrc += 1;
*pDst++ = g_dEncodeBase64 [ ( uSrc & 0x00FC0000 ) >> 18 ];
*pDst++ = g_dEncodeBase64 [ ( uSrc & 0x0003F000 ) >> 12 ];
*pDst++ = '=';
*pDst++ = '=';
} else if ( iLeft==2 )
{
// two bytes left: three characters plus "="
DWORD uSrc = ( pSrc[0]<<16 ) | ( pSrc[1] << 8 );
pSrc += 2;
*pDst++ = g_dEncodeBase64 [ ( uSrc & 0x00FC0000 ) >> 18 ];
*pDst++ = g_dEncodeBase64 [ ( uSrc & 0x0003F000 ) >> 12 ];
*pDst++ = g_dEncodeBase64 [ ( uSrc & 0x00000FC0 ) >> 6 ];
*pDst++ = '=';
}
}
tEnc.m_pDst = pDst;
tEnc.m_pSrc = pSrc;
// true means "destination is full, flush and call again"
return ( tEnc.m_pSrc<tEnc.m_pSrcEnd );
}
/// Copy a SphinxQL query verbatim into the crash-dump buffer.
/// Once more than g_iQueryLineLen bytes have been written since the last break,
/// the next whitespace or comma is followed by an inserted '\n' (space permitting).
/// Advances both cursors. Returns true while source bytes remain (destination full).
static bool sphCopySphinxQL ( QueryCopyState_t & tState )
{
	BYTE * pOut = tState.m_pDst;
	const BYTE * pIn = tState.m_pSrc;
	BYTE * pWrapAt = pOut + g_iQueryLineLen;

	while ( pOut<tState.m_pDstEnd && pIn<tState.m_pSrcEnd )
	{
		// break only when there is room for both the character and the '\n'
		const bool bWrap = pOut>pWrapAt && pOut+1<tState.m_pDstEnd && ( sphIsSpace ( *pIn ) || *pIn==',' );
		*pOut++ = *pIn++;
		if ( bWrap )
		{
			*pOut++ = '\n';
			pWrapAt = pOut + g_iQueryLineLen;
		}
	}

	tState.m_pDst = pOut;
	tState.m_pSrc = pIn;
	return pIn<tState.m_pSrcEnd;
}
/// signature shared by sphCopyEncodedBase64 / sphCopySphinxQL
typedef bool CopyQuery_fn ( QueryCopyState_t & tState );

#define SPH_TIME_PID_MAX_SIZE 256
const char		g_sCrashedBannerAPI[] = "\n--- crashed SphinxAPI request dump ---\n";
const char		g_sCrashedBannerMySQL[] = "\n--- crashed SphinxQL request dump ---\n";
const char		g_sCrashedBannerTail[] = "\n--- request dump end ---\n";
// HandleCrash() prints this under SPH_ALLOCS_PROFILER on every platform,
// so it must not be confined to the Windows-only section below
const char		g_sMemoryStatBanner[] = "\n--- memory statistics ---\n";
#if USE_WINDOWS
const char		g_sMinidumpBanner[] = "minidump located at: ";
#endif

// static, pre-allocated storage used by the crash handler
static BYTE		g_dCrashQueryBuff [4096];
static char		g_sCrashInfo [SPH_TIME_PID_MAX_SIZE] = "[][]\n";
static int		g_iCrashInfoLen = 0;

#if USE_WINDOWS
static char		g_sMinidump[SPH_TIME_PID_MAX_SIZE] = "";
#endif

SphThreadKey_t SphCrashLogger_c::m_tTLS = SphThreadKey_t ();

// query reported for threads that have no crash logger registered in TLS
static CrashQuery_t g_tUnhandled;
// Clear the TLS slot when this instance goes out of scope, so that a programming error
// yields an immediate crash instead of a reference to a stale stack frame.
SphCrashLogger_c::~SphCrashLogger_c ()
{
	sphThreadSet ( m_tTLS, NULL );
}
/// Create the thread-local key that holds each thread's crash logger.
void SphCrashLogger_c::Init ()
{
	Verify ( sphThreadKeyCreate ( &m_tTLS ) );
}
/// Release the thread-local key created by Init().
void SphCrashLogger_c::Done ()
{
	sphThreadKeyDelete ( m_tTLS );
}
// Crash handler: a fatal signal on UNIX, an unhandled SEH exception on Windows.
// It appends the following to the daemon log:
//   - the pre-formatted time/pid banner (SetupTimePID);
//   - the request the crashing thread was serving: SphinxAPI as BASE64 with a
//     cmd/ver/len header, SphinxQL as text;
//   - the version and a backtrace;
//   - the active threads table;
//   - memory statistics, with SPH_ALLOCS_PROFILER.
// It then exits via CRASH_EXIT_CORE or CRASH_EXIT depending on g_bCoreDump.
// Output is produced from pre-allocated static buffers (g_sCrashInfo, g_dCrashQueryBuff).
#if !USE_WINDOWS
void SphCrashLogger_c::HandleCrash ( int sig )
#else
LONG WINAPI SphCrashLogger_c::HandleCrash ( EXCEPTION_POINTERS * pExc )
#endif // !USE_WINDOWS
{
// no log file to write to: just exit
if ( g_iLogFile<0 )
{
if ( g_bCoreDump )
{
CRASH_EXIT_CORE;
} else
{
CRASH_EXIT;
}
}
// log [time][pid]
sphSeek ( g_iLogFile, 0, SEEK_END );
sphWrite ( g_iLogFile, g_sCrashInfo, g_iCrashInfoLen );
// log query
CrashQuery_t tQuery = SphCrashLogger_c::GetQuery();
// request dump banner
int iBannerLen = ( tQuery.m_bMySQL ? sizeof(g_sCrashedBannerMySQL) : sizeof(g_sCrashedBannerAPI) ) - 1;
const char * pBanner = tQuery.m_bMySQL ? g_sCrashedBannerMySQL : g_sCrashedBannerAPI;
sphWrite ( g_iLogFile, pBanner, iBannerLen );
// query
if ( tQuery.m_iSize )
{
QueryCopyState_t tCopyState;
tCopyState.m_pDst = g_dCrashQueryBuff;
tCopyState.m_pDstEnd = g_dCrashQueryBuff + sizeof(g_dCrashQueryBuff);
tCopyState.m_pSrc = tQuery.m_pQuery;
tCopyState.m_pSrcEnd = tQuery.m_pQuery + tQuery.m_iSize;
CopyQuery_fn * pfnCopy = NULL;
if ( !tQuery.m_bMySQL )
{
pfnCopy = &sphCopyEncodedBase64;
// header size must be a multiple of 3 to seamlessly convert to BASE64:
// 2 bytes command, 2 bytes version, 4 bytes size, plus the first query byte
BYTE dHeader[] = {
(BYTE)( ( tQuery.m_uCMD>>8 ) & 0xff ),
(BYTE)( tQuery.m_uCMD & 0xff ),
(BYTE)( ( tQuery.m_uVer>>8 ) & 0xff ),
(BYTE)( tQuery.m_uVer & 0xff ),
(BYTE)( ( tQuery.m_iSize>>24 ) & 0xff ),
(BYTE)( ( tQuery.m_iSize>>16 ) & 0xff ),
(BYTE)( ( tQuery.m_iSize>>8 ) & 0xff ),
(BYTE)( tQuery.m_iSize & 0xff ),
*tQuery.m_pQuery
};
QueryCopyState_t tHeaderState;
tHeaderState.m_pDst = g_dCrashQueryBuff;
tHeaderState.m_pDstEnd = g_dCrashQueryBuff + sizeof(g_dCrashQueryBuff);
tHeaderState.m_pSrc = dHeader;
tHeaderState.m_pSrcEnd = dHeader + sizeof(dHeader);
pfnCopy ( tHeaderState );
assert ( tHeaderState.m_pSrc==tHeaderState.m_pSrcEnd );
// continue the body right after the encoded header; the first query byte was already encoded
tCopyState.m_pDst = tHeaderState.m_pDst;
tCopyState.m_pSrc++;
} else
{
pfnCopy = &sphCopySphinxQL;
}
// copy in chunks, flushing the static buffer whenever it fills up
while ( pfnCopy ( tCopyState ) )
{
sphWrite ( g_iLogFile, g_dCrashQueryBuff, tCopyState.m_pDst-g_dCrashQueryBuff );
tCopyState.m_pDst = g_dCrashQueryBuff; // reset the destination buffer
}
assert ( tCopyState.m_pSrc==tCopyState.m_pSrcEnd );
int iLeft = tCopyState.m_pDst-g_dCrashQueryBuff;
if ( iLeft>0 )
{
sphWrite ( g_iLogFile, g_dCrashQueryBuff, iLeft );
}
}
// tail
sphWrite ( g_iLogFile, g_sCrashedBannerTail, sizeof(g_sCrashedBannerTail)-1 );
sphSafeInfo ( g_iLogFile, "Sphinx " SPHINX_VERSION );
#if USE_WINDOWS
// mini-dump reference
int iMiniDumpLen = snprintf ( (char *)g_dCrashQueryBuff, sizeof(g_dCrashQueryBuff),
"%s %s.%p.mdmp\n", g_sMinidumpBanner, g_sMinidump, tQuery.m_pQuery );
sphWrite ( g_iLogFile, g_dCrashQueryBuff, iMiniDumpLen );
// leave the bare minidump file name in the buffer for sphBacktrace() below
snprintf ( (char *)g_dCrashQueryBuff, sizeof(g_dCrashQueryBuff), "%s.%p.mdmp",
g_sMinidump, tQuery.m_pQuery );
#endif
// log trace
#if !USE_WINDOWS
sphSafeInfo ( g_iLogFile, "Handling signal %d", sig );
// print message to stdout during daemon start
if ( g_bLogStdout && g_iLogFile!=STDOUT_FILENO )
sphSafeInfo ( STDOUT_FILENO, "Crash!!! Handling signal %d", sig );
sphBacktrace ( g_iLogFile, g_bSafeTrace );
#else
sphBacktrace ( pExc, (char *)g_dCrashQueryBuff );
#endif
// threads table
// FIXME? should we try to lock threads table somehow?
sphSafeInfo ( g_iLogFile, "--- %d active threads ---", g_dThd.GetLength() );
const ListNode_t * pIt = g_dThd.Begin();
int iThd = 0;
while ( pIt!=g_dThd.End() )
{
ThdDesc_t * pThd = (ThdDesc_t *)pIt;
sphSafeInfo ( g_iLogFile, "thd %d, proto %s, state %s, command %s",
iThd,
g_dProtoNames[pThd->m_eProto],
g_dThdStates[pThd->m_eThdState],
pThd->m_sCommand ? pThd->m_sCommand : "-" );
pIt = pIt->m_pNext;
iThd++;
}
// memory info
// NOTE(review): g_sMemoryStatBanner must be visible on every platform for this to compile with SPH_ALLOCS_PROFILER
#if SPH_ALLOCS_PROFILER
sphWrite ( g_iLogFile, g_sMemoryStatBanner, sizeof ( g_sMemoryStatBanner )-1 );
sphMemStatDump ( g_iLogFile );
#endif
sphSafeInfo ( g_iLogFile, "------- CRASH DUMP END -------" );
if ( g_bCoreDump )
{
CRASH_EXIT_CORE;
} else
{
CRASH_EXIT;
}
}
void SphCrashLogger_c::SetLastQuery ( const CrashQuery_t & tQuery )
{
SphCrashLogger_c * pCrashLogger = (SphCrashLogger_c *)sphThreadGet ( m_tTLS );
assert ( pCrashLogger );
pCrashLogger->m_tQuery = tQuery;
}
/// Pre-format the crash banner with the current time and pid into g_sCrashInfo,
/// so HandleCrash() only has to write it out.
void SphCrashLogger_c::SetupTimePID ()
{
	char sTimeBuf[SPH_TIME_PID_MAX_SIZE];
	sphFormatCurrentTime ( sTimeBuf, sizeof(sTimeBuf) );

	snprintf ( g_sCrashInfo, SPH_TIME_PID_MAX_SIZE-1,
		"------- FATAL: CRASH DUMP -------\n[%s] [%5d]\n", sTimeBuf, (int)getpid() );

	// snprintf() returns the would-be (untruncated) length, and on some platforms -1 on
	// truncation without terminating the string; HandleCrash writes exactly g_iCrashInfoLen
	// bytes of g_sCrashInfo, so measure what was really stored to never read past the array
	g_sCrashInfo[SPH_TIME_PID_MAX_SIZE-1] = '\0';
	g_iCrashInfoLen = (int) strlen ( g_sCrashInfo );
}
/// Register this instance as the calling thread's crash logger (TLS key from Init()).
void SphCrashLogger_c::SetupTLS ()
{
	Verify ( sphThreadSet ( m_tTLS, this ) );
}
/// Return (a copy of) the query last recorded for the calling thread.
/// Threads without a registered logger get the static g_tUnhandled placeholder,
/// so the crash handler still works for them.
CrashQuery_t SphCrashLogger_c::GetQuery()
{
	// FIXME!!! some service threads use raw threads instead ThreadCreate
	SphCrashLogger_c * pLogger = (SphCrashLogger_c *)sphThreadGet ( m_tTLS );
	return pLogger ? pLogger->m_tQuery : g_tUnhandled;
}
/// Start a thread running pCall(pArg) wrapped by ThreadWrapper, which registers
/// a per-thread crash logger first. Returns whether the thread was created.
bool SphCrashLogger_c::ThreadCreate ( SphThread_t * pThread, void (*pCall)(void*), void * pArg, bool bDetached )
{
	CallArgPair_t * pWrapped = new CallArgPair_t ( pCall, pArg );
	if ( sphThreadCreate ( pThread, ThreadWrapper, pWrapped, bDetached ) )
		return true;

	// the thread never started, so the wrapper would never free its argument
	delete pWrapped;
	return false;
}
void SphCrashLogger_c::ThreadWrapper ( void * pArg )
{
CallArgPair_t * pPair = static_cast<CallArgPair_t *> ( pArg );
SphCrashLogger_c tQueryTLS;
tQueryTLS.SetupTLS();
pPair->m_pCall ( pPair->m_pArg );
delete pPair;
}
#if USE_WINDOWS
/// Windows: install the crash logger as the unhandled-exception filter and prepare
/// the minidump name prefix "<pidfile>.<pid>".
void SetSignalHandlers ( bool )
{
	SphCrashLogger_c::Init();
	snprintf ( g_sMinidump, SPH_TIME_PID_MAX_SIZE-1, "%s.%d", g_sPidFile.scstr(), (int)getpid() );
	SetUnhandledExceptionFilter ( SphCrashLogger_c::HandleCrash );
}
#else
/// install pHandler for iSignal using the shared sigaction template;
/// returns false (errno set by sigaction) on failure
static bool SetDaemonSigaction ( struct sigaction & tAction, int iSignal, void (*pHandler)(int) )
{
	tAction.sa_handler = pHandler;
	return sigaction ( iSignal, &tAction, NULL )==0;
}

/// UNIX: install the daemon signal handlers.
/// - TERM (and INT unless bAllowCtrlC) go to sigterm; HUP and USR1 go to their flag-setting handlers.
/// - PIPE is ignored.
/// - SEGV, BUS, ABRT, ILL and FPE go to the crash logger with SA_RESETHAND, so the handler is
///   reset to the default action on entry.
/// Any sigaction() failure is fatal.
void SetSignalHandlers ( bool bAllowCtrlC=false )
{
	SphCrashLogger_c::Init();

	struct sigaction sa;
	sigfillset ( &sa.sa_mask );
	sa.sa_flags = SA_NOCLDSTOP;

	// control signals; evaluation stops at the first failure, leaving errno intact
	bool bSignalsSet =
		SetDaemonSigaction ( sa, SIGTERM, sigterm )
		&& ( bAllowCtrlC || SetDaemonSigaction ( sa, SIGINT, sigterm ) )
		&& SetDaemonSigaction ( sa, SIGHUP, sighup )
		&& SetDaemonSigaction ( sa, SIGUSR1, sigusr1 )
		&& SetDaemonSigaction ( sa, SIGPIPE, SIG_IGN );

	// crash signals
	if ( bSignalsSet )
	{
		sa.sa_flags |= SA_RESETHAND;
		bSignalsSet =
			SetDaemonSigaction ( sa, SIGSEGV, SphCrashLogger_c::HandleCrash )
			&& SetDaemonSigaction ( sa, SIGBUS, SphCrashLogger_c::HandleCrash )
			&& SetDaemonSigaction ( sa, SIGABRT, SphCrashLogger_c::HandleCrash )
			&& SetDaemonSigaction ( sa, SIGILL, SphCrashLogger_c::HandleCrash )
			&& SetDaemonSigaction ( sa, SIGFPE, SphCrashLogger_c::HandleCrash );
	}

	if ( !bSignalsSet )
		sphFatal ( "sigaction(): %s", strerror(errno) );
}
#endif
/////////////////////////////////////////////////////////////////////////////
// NETWORK STUFF
/////////////////////////////////////////////////////////////////////////////
#if USE_WINDOWS
const int WIN32_PIPE_BUFSIZE = 32;

/// on Windows, the wrapper just prevents the warnings
#pragma warning(push) // store current warning values
#pragma warning(disable:4127) // conditional expr is const
#pragma warning(disable:4389) // signed/unsigned mismatch

/// add fd to fdset
void sphFDSet ( int fd, fd_set * fdset )
{
	FD_SET ( fd, fdset );
}

/// remove fd from fdset
void sphFDClr ( int fd, fd_set * fdset )
{
	// this used to call FD_SET, which added the descriptor instead of removing it
	FD_CLR ( fd, fdset );
}

#pragma warning(pop) // restore warnings

#else // !USE_WINDOWS

#define SPH_FDSET_OVERFLOW(_fd) ( (_fd)<0 || (_fd)>=(int)FD_SETSIZE )

/// on UNIX, we also check that the descriptor won't corrupt the stack
/// (an fd_set only holds descriptors below FD_SETSIZE); out-of-range fds are fatal
void sphFDSet ( int fd, fd_set * set )
{
	if ( SPH_FDSET_OVERFLOW(fd) )
		sphFatal ( "sphFDSet() failed fd=%d, FD_SETSIZE=%d", fd, FD_SETSIZE );
	else
		FD_SET ( fd, set );
}

/// remove fd from set, with the same range check as sphFDSet()
void sphFDClr ( int fd, fd_set * set )
{
	if ( SPH_FDSET_OVERFLOW( fd ) )
		sphFatal ( "sphFDClr() failed fd=%d, FD_SETSIZE=%d", fd, FD_SETSIZE );
	else
		FD_CLR ( fd, set );
}

#endif // USE_WINDOWS
#if USE_WINDOWS
/// Describe a WinSock error; iErr==0 means "use WSAGetLastError()".
/// The result lives in a static buffer overwritten by the next call.
const char * sphSockError ( int iErr )
{
	static char sBuf [ 256 ];
	const int iCode = iErr ? iErr : WSAGetLastError ();
	_snprintf ( sBuf, sizeof(sBuf), "WSA error %d", iCode );
	return sBuf;
}
#else
/// Describe the current socket error; on UNIX the argument is ignored and errno is used.
const char * sphSockError ( int )
{
	const int iCode = errno;
	return strerror ( iCode );
}
#endif
/// Last socket error code: WSAGetLastError() on Windows, errno elsewhere.
int sphSockGetErrno ()
{
	int iCode;
#if USE_WINDOWS
	iCode = WSAGetLastError();
#else
	iCode = errno;
#endif
	return iCode;
}
/// Overwrite the last socket error code (WSASetLastError on Windows, errno elsewhere).
void sphSockSetErrno ( int iErr )
{
#if !USE_WINDOWS
	errno = iErr;
#else
	WSASetLastError ( iErr );
#endif
}
/// Read the last socket error code and write it back unchanged, so that a later
/// sphSockGetErrno() still observes the same value.
int sphSockPeekErrno ()
{
	const int iCode = sphSockGetErrno();
	sphSockSetErrno ( iCode );
	return iCode;
}
/// formats IP address given in network byte order into sBuffer
/// returns the buffer
char * sphFormatIP ( char * sBuffer, int iBufferSize, DWORD uAddress )
{
const BYTE *a = (const BYTE *)&uAddress;
snprintf ( sBuffer, iBufferSize, "%u.%u.%u.%u", a[0], a[1], a[2], a[3] );
return sBuffer;
}
static const bool GETADDR_STRICT = true; ///< strict check, will die with sphFatal() on failure

/// Resolve sHost to its first IPv4 address, in network byte order.
/// If no AF_INET address is found, dies via sphFatal() when bFatal is set; otherwise
/// it logs at debug level and returns 0. Several addresses trigger a warning, and the first is used.
DWORD sphGetAddress ( const char * sHost, bool bFatal )
{
	struct hostent * pHost = gethostbyname ( sHost );
	const bool bResolved = ( pHost!=NULL && pHost->h_addrtype==AF_INET );
	if ( !bResolved )
	{
		if ( !bFatal )
		{
			sphLogDebugv ( "no AF_INET address found for: %s", sHost );
			return 0;
		}
		sphFatal ( "no AF_INET address found for: %s", sHost );
		return 0;
	}

	struct in_addr ** ppAddrList = (struct in_addr **)pHost->h_addr_list;
	assert ( ppAddrList[0] );
	assert ( sizeof(DWORD)==pHost->h_length );

	DWORD uResult;
	memcpy ( &uResult, ppAddrList[0], sizeof(DWORD) );

	const bool bAmbiguous = ( ppAddrList[1]!=NULL );
	if ( bAmbiguous )
	{
		char sIP [ SPH_ADDRESS_SIZE ];
		sphWarning ( "multiple addresses found for '%s', using the first one (ip=%s)",
			sHost, sphFormatIP ( sIP, sizeof(sIP), uResult ) );
	}
	return uResult;
}
#if !USE_WINDOWS
/// Create a UNIX-domain stream socket bound to sPath and return its descriptor.
/// A stale file at sPath is unlinked first. The umask is cleared while binding, so the
/// socket file's permissions are not narrowed by the daemon's umask. Every failure is fatal.
int sphCreateUnixSocket ( const char * sPath )
{
	static struct sockaddr_un uaddr;
	const size_t iPathLen = strlen ( sPath );
	// the path plus its terminating zero must fit into sun_path
	if ( iPathLen>=sizeof ( uaddr.sun_path ) )
		sphFatal ( "UNIX socket path is too long (len=%d)", (int)iPathLen );

	sphInfo ( "listening on UNIX socket %s", sPath );

	memset ( &uaddr, 0, sizeof(uaddr) );
	uaddr.sun_family = AF_UNIX;
	memcpy ( uaddr.sun_path, sPath, iPathLen + 1 );

	const int iSock = socket ( AF_UNIX, SOCK_STREAM, 0 );
	if ( iSock==-1 )
		sphFatal ( "failed to create UNIX socket: %s", sphSockError() );

	if ( unlink ( sPath )==-1 && errno!=ENOENT )
		sphFatal ( "unlink() on UNIX socket file failed: %s", sphSockError() );

	const int iOldMask = umask ( 0 );
	if ( bind ( iSock, (struct sockaddr *)&uaddr, sizeof(uaddr) )!=0 )
		sphFatal ( "bind() on UNIX socket failed: %s", sphSockError() );
	umask ( iOldMask );

	return iSock;
}
#endif // !USE_WINDOWS
/// Create a TCP socket bound to uAddr:iPort (uAddr in network byte order) and return it.
/// SO_REUSEADDR and (where available) TCP_NODELAY are enabled; failures to set them
/// are only warnings. bind() is attempted up to 12 times, 3 seconds apart. Failing
/// socket creation, or failing every bind attempt, is fatal.
int sphCreateInetSocket ( DWORD uAddr, int iPort )
{
	char sAddress[SPH_ADDRESS_SIZE];
	sphFormatIP ( sAddress, SPH_ADDRESS_SIZE, uAddr );

	if ( uAddr==htonl ( INADDR_ANY ) )
		sphInfo ( "listening on all interfaces, port=%d", iPort );
	else
		sphInfo ( "listening on %s:%d", sAddress, iPort );

	static struct sockaddr_in iaddr;
	memset ( &iaddr, 0, sizeof(iaddr) );
	iaddr.sin_family = AF_INET;
	iaddr.sin_addr.s_addr = uAddr;
	iaddr.sin_port = htons ( (unsigned short)iPort ); // ports above 32767 do not fit a signed short

	int iSock = socket ( AF_INET, SOCK_STREAM, 0 );
	if ( iSock==-1 )
		sphFatal ( "failed to create TCP socket: %s", sphSockError() );

	int iOn = 1;
	if ( setsockopt ( iSock, SOL_SOCKET, SO_REUSEADDR, (char*)&iOn, sizeof(iOn) ) )
		sphWarning ( "setsockopt() failed: %s", sphSockError() );
#ifdef TCP_NODELAY
	if ( setsockopt ( iSock, IPPROTO_TCP, TCP_NODELAY, (char*)&iOn, sizeof(iOn) ) )
		sphWarning ( "setsockopt() failed: %s", sphSockError() );
#endif

	const int iMaxTries = 12;
	int iRes = -1;
	int iBindErr = 0;
	for ( int iTry=1; iTry<=iMaxTries; iTry++ )
	{
		iRes = bind ( iSock, (struct sockaddr *)&iaddr, sizeof(iaddr) );
		if ( iRes==0 )
			break;

		// save the error right away: the logging and sleeping below may clobber errno
		iBindErr = sphSockGetErrno();

		// no point in announcing a retry (and sleeping) after the last attempt
		if ( iTry<iMaxTries )
		{
			sphInfo ( "bind() failed on %s, retrying...", sAddress );
			sphSleepMsec ( 3000 );
		}
	}

	if ( iRes )
	{
		sphSockSetErrno ( iBindErr );
		sphFatal ( "bind() failed on %s: %s", sAddress, sphSockError() );
	}

	return iSock;
}
/// True if iPort is a usable TCP port number, i.e. within 1..65535.
bool IsPortInRange ( int iPort )
{
	if ( iPort<1 )
		return false;
	return iPort<=65535;
}
/// Die via sphFatal() unless iPort is within 1..65535.
void CheckPort ( int iPort )
{
	if ( IsPortInRange ( iPort ) )
		return;
	sphFatal ( "port %d is out of range", iPort );
}
/// Parsed form of one "listen" directive (produced by ParseListener).
struct ListenerDesc_t
{
// wire protocol to speak on this listener
ProtocolType_e m_eProto;
// UNIX socket path; empty for TCP listeners
CSphString m_sUnix;
// IPv4 address to bind, network byte order
DWORD m_uIP;
// TCP port
int m_iPort;
// set by the *_vip protocol names
bool m_bVIP;
};
void ProtoByName ( const CSphString & sProto, ListenerDesc_t & tDesc )
{
if ( sProto=="sphinx" ) tDesc.m_eProto = PROTO_SPHINX;
else if ( sProto=="mysql41" ) tDesc.m_eProto = PROTO_MYSQL41;
else if ( sProto=="http" ) tDesc.m_eProto = PROTO_HTTP;
else if ( sProto=="sphinx_vip" )
{
tDesc.m_eProto = PROTO_SPHINX;
tDesc.m_bVIP = true;
} else if ( sProto=="mysql41_vip" )
{
tDesc.m_eProto = PROTO_MYSQL41;
tDesc.m_bVIP = true;
} else
{
sphFatal ( "unknown listen protocol type '%s'", sProto.cstr() ? sProto.cstr() : "(NULL)" );
}
}
/// Parse a "listen" spec into a ListenerDesc_t. Accepted forms:
///   /path[:proto]        UNIX socket (fatal on Windows)
///   port                 port on all interfaces
///   host                 host on the default SphinxAPI port
///   port:proto
///   host:port[:proto]    an empty host (":port[:proto]") means all interfaces
/// proto is any name accepted by ProtoByName(). Malformed specs are fatal.
ListenerDesc_t ParseListener ( const char * sSpec )
{
	ListenerDesc_t tRes;
	tRes.m_eProto = PROTO_SPHINX;
	tRes.m_sUnix = "";
	tRes.m_uIP = htonl ( INADDR_ANY );
	tRes.m_iPort = SPHINXAPI_PORT;
	tRes.m_bVIP = false;

	// split by colon
	int iParts = 0;
	CSphString sParts[3];

	const char * sPart = sSpec;
	for ( const char * p = sSpec; ; p++ )
		if ( *p=='\0' || *p==':' )
		{
			if ( iParts==3 )
				sphFatal ( "invalid listen format (too many fields)" );

			sParts[iParts++].SetBinary ( sPart, p-sPart );
			if ( !*p )
				break; // bail out on zero

			sPart = p+1;
		}
	assert ( iParts>=1 && iParts<=3 );

	// handle UNIX socket case
	// might be either name on itself (1 part), or name+protocol (2 parts)
	sPart = sParts[0].cstr();
	if ( sPart[0]=='/' )
	{
		if ( iParts>2 )
			sphFatal ( "invalid listen format (too many fields)" );

		if ( iParts==2 )
			ProtoByName ( sParts[1], tRes );

#if USE_WINDOWS
		sphFatal ( "UNIX sockets are not supported on Windows" );
#else
		tRes.m_sUnix = sPart;
		return tRes;
#endif
	}

	// check if it all starts with a valid port number
	// an empty first field is a host-less spec (":port"), not a port; it used to count as
	// "all digits", so atol("")==0 made CheckPort() die before the empty-host branch below
	sPart = sParts[0].cstr();
	int iLen = strlen(sPart);

	bool bAllDigits = ( iLen>0 );
	for ( int i=0; i<iLen && bAllDigits; i++ )
		if ( !isdigit ( (unsigned char)sPart[i] ) ) // isdigit() needs a non-negative value
			bAllDigits = false;

	int iPort = 0;
	if ( bAllDigits && iLen<=5 )
	{
		iPort = atol(sPart);
		CheckPort ( iPort ); // lets forbid ambiguous magic like 0:sphinx or 99999:mysql41
	}

	// handle TCP port case
	// one part. might be either port number, or host name (UNIX sockets were handled above)
	if ( iParts==1 )
	{
		if ( iPort )
		{
			// port name on itself
			tRes.m_uIP = htonl ( INADDR_ANY );
			tRes.m_iPort = iPort;
		} else
		{
			// host name on itself
			tRes.m_uIP = sphGetAddress ( sSpec, GETADDR_STRICT );
			tRes.m_iPort = SPHINXAPI_PORT;
		}
		return tRes;
	}

	// two or three parts
	if ( iPort )
	{
		// 1st part is a valid port number; must be port:proto
		if ( iParts!=2 )
			sphFatal ( "invalid listen format (expected port:proto, got extra trailing part in listen=%s)", sSpec );

		tRes.m_uIP = htonl ( INADDR_ANY );
		tRes.m_iPort = iPort;
		ProtoByName ( sParts[1], tRes );

	} else
	{
		// 1st part must be a host name; must be host:port[:proto]
		if ( iParts==3 )
			ProtoByName ( sParts[2], tRes );

		tRes.m_iPort = atol ( sParts[1].cstr() );
		CheckPort ( tRes.m_iPort );

		tRes.m_uIP = sParts[0].IsEmpty()
			? htonl ( INADDR_ANY )
			: sphGetAddress ( sParts[0].cstr(), GETADDR_STRICT );
	}
	return tRes;
}
/// Parse one "listen" directive, open the matching listening socket and register it in
/// g_dListeners. HTTP listeners are skipped with a warning unless bHttpAllowed
/// (per the warning text, they require workers=thread_pool).
void AddListener ( const CSphString & sListen, bool bHttpAllowed )
{
	ListenerDesc_t tDesc = ParseListener ( sListen.cstr() );

	if ( tDesc.m_eProto==PROTO_HTTP && !bHttpAllowed )
	{
		sphWarning ( "thread_pool disabled, can not listen for http interface, port=%d, use workers=thread_pool", tDesc.m_iPort );
		return;
	}

	Listener_t tListener;
	tListener.m_eProto = tDesc.m_eProto;
	tListener.m_bVIP = tDesc.m_bVIP;
	tListener.m_bTcp = true;

#if !USE_WINDOWS
	if ( !tDesc.m_sUnix.IsEmpty() )
	{
		tListener.m_iSock = sphCreateUnixSocket ( tDesc.m_sUnix.cstr() );
		tListener.m_bTcp = false;
	} else
#endif
	{
		tListener.m_iSock = sphCreateInetSocket ( tDesc.m_uIP, tDesc.m_iPort );
	}

	g_dListeners.Add ( tListener );
}
/// Switch iSock to non-blocking mode.
/// Returns 0 on success, or a negative/-1 error result from the underlying call.
int sphSetSockNB ( int iSock )
{
#if USE_WINDOWS
	u_long uMode = 1;
	return ioctlsocket ( iSock, FIONBIO, &uMode );
#else
	// F_SETFL replaces all file status flags, so read the current ones and only add
	// O_NONBLOCK; the old code passed O_NONBLOCK alone and cleared e.g. O_APPEND/O_ASYNC
	int iFlags = fcntl ( iSock, F_GETFL, 0 );
	if ( iFlags<0 )
		return iFlags;
	return fcntl ( iSock, F_SETFL, iFlags | O_NONBLOCK );
#endif
}
/// Wait until iSock is readable (or writable when bWrite is set), for at most
/// tmTimeout microseconds. Returns the poll()/select() result: a positive value
/// when ready, 0 on timeout, -1 on error.
int sphPoll ( int iSock, int64_t tmTimeout, bool bWrite=false )
{
	// a single descriptor is checked, so plain poll()/select() is enough (no epoll/kqueue)
#if HAVE_POLL
	struct pollfd tPoll;
	tPoll.fd = iSock;
	tPoll.events = POLLIN;
	if ( bWrite )
		tPoll.events = POLLOUT;
	const int iTimeoutMs = int ( tmTimeout/1000 );
	return ::poll ( &tPoll, 1, iTimeoutMs );
#else
	fd_set dReady;
	FD_ZERO ( &dReady );
	sphFDSet ( iSock, &dReady );

	struct timeval tWait;
	tWait.tv_sec = (int)( tmTimeout / 1000000 );
	tWait.tv_usec = (int)( tmTimeout % 1000000 );

	fd_set * pReadSet = bWrite ? NULL : &dReady;
	fd_set * pWriteSet = bWrite ? &dReady : NULL;
	return ::select ( iSock+1, pReadSet, pWriteSet, NULL, &tWait );
#endif
}
/// Read exactly iLen bytes from iSock into buf, waiting at most iReadTimeout seconds
/// in total (at least 1 second).
/// Returns iLen on success, or -1 with the socket errno set:
///   - ETIMEDOUT on timeout;
///   - ECONNRESET when the peer closed the connection;
///   - EINTR when interrupted by SIGTERM and bIntr allows it;
///   - otherwise the poll/recv error.
/// bIntr only applies until the first chunk arrives, so a signal cannot discard partially read data.
int sphSockRead ( int iSock, void * buf, int iLen, int iReadTimeout, bool bIntr )
{
assert ( iLen>0 );
int64_t tmMaxTimer = sphMicroTimer() + I64C(1000000)*Max ( 1, iReadTimeout ); // in microseconds
int iLeftBytes = iLen; // bytes to read left
char * pBuf = (char*) buf;
int iRes = -1, iErr = 0;
while ( iLeftBytes>0 )
{
int64_t tmMicroLeft = tmMaxTimer - sphMicroTimer();
if ( tmMicroLeft<=0 )
break; // timed out
#if USE_WINDOWS
// Windows EINTR emulation
// Ctrl-C will not interrupt select on Windows, so let's handle that manually
// forcibly limit select() to 100 ms, and check flag afterwards
if ( bIntr )
tmMicroLeft = Min ( tmMicroLeft, 100000 );
#endif
// wait until there is data
iRes = sphPoll ( iSock, tmMicroLeft );
// if there was EINTR, retry
// if any other error, bail
if ( iRes==-1 )
{
// only let SIGTERM (of all them) to interrupt, and only if explicitly allowed
iErr = sphSockGetErrno();
if ( iErr==EINTR && !( g_bGotSigterm && bIntr ))
continue;
if ( iErr==EINTR )
sphLogDebug ( "sphSockRead: select got SIGTERM, exit -1" );
sphSockSetErrno ( iErr );
return -1;
}
// if there was a timeout, report it as an error
if ( iRes==0 )
{
#if USE_WINDOWS
// Windows EINTR emulation
if ( bIntr )
{
// got that SIGTERM
if ( g_bGotSigterm )
{
sphLogDebug ( "sphSockRead: got SIGTERM emulation on Windows, exit -1" );
sphSockSetErrno ( EINTR );
return -1;
}
// timeout might not be fully over just yet, so re-loop
continue;
}
#endif
sphSockSetErrno ( ETIMEDOUT );
return -1;
}
// try to receive next chunk
iRes = sphSockRecv ( iSock, pBuf, iLeftBytes );
// if there was eof, we're done
// (reported as ECONNRESET since fewer than iLen bytes arrived)
if ( iRes==0 )
{
sphSockSetErrno ( ECONNRESET );
return -1;
}
// if there was EINTR, retry
// if any other error, bail
if ( iRes==-1 )
{
// only let SIGTERM (of all them) to interrupt, and only if explicitly allowed
iErr = sphSockGetErrno();
if ( iErr==EINTR && !( g_bGotSigterm && bIntr ))
continue;
// NOTE(review): this message says "select" but the interruption happened in recv
if ( iErr==EINTR )
sphLogDebug ( "sphSockRead: select got SIGTERM, exit -1" );
sphSockSetErrno ( iErr );
return -1;
}
// update
pBuf += iRes;
iLeftBytes -= iRes;
// avoid partial buffer loss in case of signal during the 2nd (!) read
bIntr = false;
}
// if there was a timeout, report it as an error
if ( iLeftBytes!=0 )
{
sphSockSetErrno ( ETIMEDOUT );
return -1;
}
return iLen;
}
/////////////////////////////////////////////////////////////////////////////
/// Empty output buffer with NETOUTBUF bytes of capacity reserved up front.
ISphOutputBuffer::ISphOutputBuffer ()
{
	const int iInitialCapacity = NETOUTBUF;
	m_dBuf.Reserve ( iInitialCapacity );
}
/// Output buffer that takes over the contents of dBuf without copying;
/// dBuf is left holding this buffer's previous (freshly constructed) storage.
ISphOutputBuffer::ISphOutputBuffer ( CSphVector<BYTE> & dBuf )
{
	m_dBuf.SwapData ( dBuf );
}
/// Append a string as an int length prefix (via SendInt) followed by its bytes,
/// without the terminating zero. A NULL string is sent as length 0.
void ISphOutputBuffer::SendString ( const char * sStr )
{
	int iLen = 0;
	if ( sStr )
		iLen = strlen ( sStr );

	SendInt ( iLen );
	SendBytes ( sStr, iLen );
}
/// Size in bytes of iLen encoded as a MySQL length-coded binary, matching what
/// MysqlPack() emits for non-negative values: 1 byte below 251, 0xFC+2 bytes up to
/// 0xFFFF, 0xFD+3 bytes up to 0xFFFFFF, 0xFE+8 bytes otherwise.
int MysqlPackedLen ( int iLen )
{
	if ( iLen>0xffffff )
		return 9;
	if ( iLen>0xffff )
		return 4;
	if ( iLen>=251 )
		return 3;
	return 1;
}

/// Size of sStr sent as a MySQL length-coded string: length prefix plus the string bytes.
int MysqlPackedLen ( const char * sStr )
{
	const int iStrLen = strlen ( sStr );
	return iStrLen + MysqlPackedLen ( iStrLen );
}
/// Encode iValue as a MySQL length-coded binary into pBuffer and return the position
/// just past the written bytes. The encoding is:
///   - below 251: one byte;
///   - up to 0xFFFF: 0xFC + 2 bytes;
///   - up to 0xFFFFFF: 0xFD + 3 bytes;
///   - otherwise: 0xFE + 8 bytes (little-endian, upper 4 bytes zero).
/// Negative values write nothing. pBuffer needs room for up to 9 bytes.
void * MysqlPack ( void * pBuffer, int iValue )
{
	char * pOut = (char*)pBuffer;
	if ( iValue<0 )
		return pOut;

	if ( iValue<251 )
	{
		*pOut++ = (char)iValue;
		return pOut;
	}

	int iBytes; // little-endian payload bytes following the prefix
	if ( iValue<=0xFFFF )
	{
		*pOut++ = '\xFC';
		iBytes = 2;
	} else if ( iValue<=0xFFFFFF )
	{
		*pOut++ = '\xFD';
		iBytes = 3;
	} else
	{
		*pOut++ = '\xFE';
		iBytes = 8;
	}

	// an int only has 4 bytes; the rest of the 8-byte form is zero
	for ( int i=0; i<iBytes; i++ )
		*pOut++ = ( i<4 ) ? (char)( iValue>>( 8*i ) ) : (char)0;

	return pOut;
}
/// Decode a MySQL length-coded binary from tReq and decrement *pSize by the number
/// of bytes consumed.
///   - below 251: the value itself (1 byte);
///   - 0xFC: 2 little-endian bytes; 0xFD: 3 bytes;
///   - 0xFE: 8 bytes, of which only the low 32 bits are returned;
///   - 0xFB (NULL marker) and 0xFF carry no payload and are returned as-is after 1 byte.
int MysqlUnpack ( InputBuffer_c & tReq, DWORD * pSize )
{
	assert ( pSize );

	int iRes = tReq.GetByte();
	--*pSize;
	if ( iRes < 251 )
		return iRes;

	int iPayload;
	switch ( iRes )
	{
	case 0xFC:	iPayload = 2; break;
	case 0xFD:	iPayload = 3; break;
	case 0xFE:	iPayload = 8; break;
	default:	return iRes; // no payload follows; previously 4 bytes were read and 8 subtracted here
	}

	// read bytes one statement at a time: the evaluation order of "GetByte() + (GetByte()<<8)"
	// is unspecified in C++, so the old one-expression form could assemble bytes out of order
	DWORD uValue = 0;
	for ( int i=0; i<iPayload; i++ )
	{
		DWORD uByte = (DWORD)tReq.GetByte();
		if ( i<4 ) // only the low 32 bits fit into the int result
			uValue |= uByte << ( 8*i );
	}

	*pSize -= iPayload;
	return (int)uValue;
}
void ISphOutputBuffer::SendMysqlInt ( int iVal )
{
BYTE dBuf[12];
BYTE * pBuf = (BYTE*) MysqlPack ( dBuf, iVal );
SendBytes ( dBuf, (int)( pBuf-dBuf ) );
}
void ISphOutputBuffer::SendMysqlString ( const char * sStr )
{
int iLen = strlen(sStr);
BYTE dBuf[12];
BYTE * pBuf = (BYTE*) MysqlPack ( dBuf, iLen );
SendBytes ( dBuf, (int)( pBuf-dBuf ) );
SendBytes ( sStr, iLen );
}
/// Append iLen raw bytes from pBuf to the buffer.
/// A zero length is a no-op. That keeps memcpy() away from the NULL pointers it
/// would otherwise receive, e.g. from SendString(NULL) or an empty, unallocated buffer.
void ISphOutputBuffer::SendBytes ( const void * pBuf, int iLen )
{
	assert ( iLen>=0 );
	if ( iLen<=0 )
		return;

	int iOff = m_dBuf.GetLength();
	m_dBuf.Resize ( iOff + iLen );
	memcpy ( m_dBuf.Begin() + iOff, pBuf, iLen );
}
/// Append the contents of another buffer, prefixed with its length (via SendInt).
void ISphOutputBuffer::SendOutput ( const ISphOutputBuffer & tOut )
{
	const int iBytes = tOut.m_dBuf.GetLength();
	SendInt ( iBytes );
	if ( iBytes!=0 )
		SendBytes ( tOut.m_dBuf.Begin(), iBytes );
}
/// Buffered writer for socket iSock: no profiler attached, nothing sent yet, no error.
NetOutputBuffer_c::NetOutputBuffer_c ( int iSock )
	: m_pProfile ( NULL )
	, m_iSock ( iSock )
	, m_iSent ( 0 )
	, m_bError ( false )
{
	assert ( iSock>0 );
}
const char* NetOutputBuffer_c::GetErrorMsg () const
{
return m_sError.cstr ();
}
/// Send the whole buffered data to m_iSock, then empty the buffer.
/// Partial sends and EAGAIN/EWOULDBLOCK wait for writability via sphPoll(); EINTR is retried.
/// The overall deadline is g_iWriteTimeout (scaled by MS2SEC into sphMicroTimer() units).
/// Any other failure or the timeout sets m_bError/m_sError and logs a warning. Once in the
/// error state, later calls do nothing (and the buffer is not cleared).
/// m_iSent accumulates the number of bytes actually sent.
void NetOutputBuffer_c::Flush ()
{
if ( m_bError )
return;
int iLen = m_dBuf.GetLength();
if ( !iLen )
return;
if ( g_bGotSigterm )
sphLogDebug ( "SIGTERM in NetOutputBuffer::Flush" );
const char * pBuffer = (const char *)m_dBuf.Begin();
CSphScopedProfile tProf ( m_pProfile, SPH_QSTATE_NET_WRITE );
const int64_t tmMaxTimer = sphMicroTimer() + MS2SEC * g_iWriteTimeout; // in microseconds
while ( !m_bError )
{
int iRes = sphSockSend ( m_iSock, pBuffer, iLen );
if ( iRes < 0 )
{
int iErrno = sphSockGetErrno();
if ( iErrno==EINTR ) // interrupted before any data was sent; just loop
continue;
if ( iErrno!=EAGAIN && iErrno!=EWOULDBLOCK )
{
m_sError.SetSprintf ( "send() failed: %d: %s", iErrno, sphSockError ( iErrno ) );
sphWarning ( "%s", m_sError.cstr () );
m_bError = true;
break;
}
} else
{
m_iSent += iRes;
pBuffer += iRes;
iLen -= iRes;
if ( iLen==0 )
break;
}
// wait until we can write
int64_t tmMicroLeft = tmMaxTimer - sphMicroTimer();
if ( tmMicroLeft>0 )
iRes = sphPoll ( m_iSock, tmMicroLeft, true );
else
iRes = 0; // time out
switch ( iRes )
{
case 1: // ready for writing
break;
case 0: // timed out
{
m_sError.SetSprintf ( "timed out while trying to flush network buffers" );
sphWarning ( "%s", m_sError.cstr () );
m_bError = true;
break;
}
case -1: // error
{
int iErrno = sphSockGetErrno();
// interrupted wait: loop and try sending again
if ( iErrno==EINTR )
break;
m_sError.SetSprintf ( "select() failed: %d: %s", iErrno, sphSockError(iErrno) );
sphWarning ( "%s", m_sError.cstr () );
m_bError = true;
break;
}
}
}
m_dBuf.Resize ( 0 );
}
/////////////////////////////////////////////////////////////////////////////
InputBuffer_c::InputBuffer_c ( const BYTE * pBuf, int iLen )
: m_pBuf ( pBuf )
, m_pCur ( pBuf )
, m_bError ( !pBuf || iLen<0 )
, m_iLen ( iLen )
{}
/// Read an int length followed by that many bytes and return them as a string.
/// Returns an empty string and raises the error flag when the reader is already
/// in error, or the length is negative, above g_iMaxPacketSize, or exceeds the
/// bytes left.
CSphString InputBuffer_c::GetString ()
{
	CSphString sRes;
	int iLen = GetInt ();
	// Compare against the bytes actually remaining instead of forming
	// m_pCur+iLen, which is undefined once it points past the end of the buffer.
	if ( m_bError || iLen<0 || iLen>g_iMaxPacketSize || iLen > ( m_pBuf+m_iLen-m_pCur ) )
	{
		SetError ( true );
		return sRes;
	}
	if ( iLen )
		sRes.SetBinary ( (char*)m_pCur, iLen );
	m_pCur += iLen;
	return sRes;
}
/// Read exactly iLen bytes (no length prefix) and return them as a string.
/// Returns an empty string and raises the error flag when the reader is already
/// in error, or iLen is negative, above g_iMaxPacketSize, or exceeds the bytes
/// left.
CSphString InputBuffer_c::GetRawString ( int iLen )
{
	CSphString sRes;
	// Remaining-bytes comparison avoids forming an out-of-range pointer.
	if ( m_bError || iLen<0 || iLen>g_iMaxPacketSize || iLen > ( m_pBuf+m_iLen-m_pCur ) )
	{
		SetError ( true );
		return sRes;
	}
	if ( iLen )
		sRes.SetBinary ( (char*)m_pCur, iLen );
	m_pCur += iLen;
	return sRes;
}
/// Read an int-length-prefixed string and append its bytes to dBuffer,
/// followed by a '\0' terminator.
/// Returns false (and raises the error flag) on a bad length or short read.
/// A zero-length string appends nothing and returns true.
bool InputBuffer_c::GetString ( CSphVector<BYTE> & dBuffer )
{
	int iLen = GetInt ();
	// Remaining-bytes comparison avoids forming an out-of-range pointer.
	if ( m_bError || iLen<0 || iLen>g_iMaxPacketSize || iLen > ( m_pBuf+m_iLen-m_pCur ) )
	{
		SetError ( true );
		return false;
	}
	if ( !iLen )
		return true;
	int iSize = dBuffer.GetLength();
	dBuffer.Resize ( iSize + iLen + 1 );
	dBuffer[iSize+iLen] = '\0';
	return GetBytes ( dBuffer.Begin()+iSize, iLen );
}
/// Copy the next iLen bytes into pBuf and advance the read cursor.
/// Returns false (and raises the error flag) when the reader is already in
/// error, iLen is negative, or fewer than iLen bytes remain.
bool InputBuffer_c::GetBytes ( void * pBuf, int iLen )
{
	assert ( pBuf );
	assert ( iLen>0 && iLen<=g_iMaxPacketSize );
	// The asserts vanish in release builds, and callers pass lengths taken off
	// the wire. Reject negative lengths explicitly so memcpy() never receives a
	// huge size_t. Compare against the remaining bytes rather than forming
	// m_pCur+iLen.
	if ( m_bError || iLen<0 || iLen > ( m_pBuf+m_iLen-m_pCur ) )
	{
		SetError ( true );
		return false;
	}
	memcpy ( pBuf, m_pCur, iLen );
	m_pCur += iLen;
	return true;
}
/// Read an int count followed by that many DWORDs into dBuffer.
/// iGot receives the count as read, even when rejected, so callers can report it.
/// A count outside 0..iMax raises the error flag and fails. If a read error
/// occurs while reading the values, dBuffer is emptied.
/// Returns true on success.
template < typename T > bool InputBuffer_c::GetDwords ( CSphVector<T> & dBuffer, int & iGot, int iMax )
{
	iGot = GetInt ();
	const bool bCountOk = ( iGot>=0 && iGot<=iMax );
	if ( !bCountOk )
	{
		SetError ( true );
		return false;
	}

	dBuffer.Resize ( iGot );
	for ( int iItem=0; iItem<iGot; ++iItem )
		dBuffer[iItem] = GetDword ();

	if ( !m_bError )
		return true;

	dBuffer.Reset (); // don't hand back partially-read values
	return false;
}
/// Read an int count followed by that many 64-bit values into dBuffer.
/// iGot receives the count as read, even when rejected, so callers can report it.
/// A count outside 0..iMax raises the error flag and fails. If a read error
/// occurs while reading the values, dBuffer is emptied.
/// Returns true on success.
template < typename T > bool InputBuffer_c::GetQwords ( CSphVector<T> & dBuffer, int & iGot, int iMax )
{
	iGot = GetInt ();
	const bool bCountOk = ( iGot>=0 && iGot<=iMax );
	if ( !bCountOk )
	{
		SetError ( true );
		return false;
	}

	dBuffer.Resize ( iGot );
	for ( int iItem=0; iItem<iGot; ++iItem )
		dBuffer[iItem] = GetUint64 ();

	if ( !m_bError )
		return true;

	dBuffer.Reset (); // don't hand back partially-read values
	return false;
}
/////////////////////////////////////////////////////////////////////////////
/// Create a socket reader on iSock. Reads initially target the built-in
/// mini-buffer; no heap maxi-buffer is allocated until a larger read needs one.
NetInputBuffer_c::NetInputBuffer_c ( int iSock )
	: InputBuffer_c ( m_dMinibufer, sizeof(m_dMinibufer) ) // only the address/size are used here
	, m_iSock ( iSock )
	, m_bIntr ( false )
	, m_iMaxibuffer ( 0 )
	, m_pMaxibuffer ( nullptr )
{}
/// Release the heap maxi-buffer. The mini-buffer is a member array and needs
/// no cleanup.
NetInputBuffer_c::~NetInputBuffer_c ()
{
	delete [] m_pMaxibuffer; // delete[] of NULL is a no-op
	m_pMaxibuffer = NULL;
}
/// Read exactly iLen bytes from m_iSock into the input buffer.
///
/// Data goes into the fixed mini-buffer when the total fits NET_MINIBUFFER_SIZE,
/// otherwise into the heap maxi-buffer, which is grown on demand. With bAppend,
/// the m_iLen bytes already held (starting at m_pBuf) are kept in front of the
/// new data; otherwise the buffer is refilled from scratch. In both cases the
/// read cursor is rewound to the start of the buffer.
///
/// Returns false on invalid arguments, a short read, or SIGTERM. m_bIntr tells
/// whether the failure was an interruption (EINTR or SIGTERM).
bool NetInputBuffer_c::ReadFrom ( int iLen, int iTimeout, bool bIntr, bool bAppend )
{
	// Appending is only allowed when the current data is unread, or when it lives in the maxi-buffer.
	assert (!( bAppend && m_pCur!=m_pBuf && m_pBuf!=m_pMaxibuffer ));
	int iCur = bAppend ? m_iLen : 0;

	m_bIntr = false;
	if ( iLen<=0 || iLen>g_iMaxPacketSize || m_iSock<0 )
		return false;

	// Pick the destination buffer. The iCur pending bytes (at m_pBuf) must end
	// up at its very start, and the new data is read right after them.
	// (Previously an append that fitted the mini-buffer started the buffer at
	// m_dMinibufer+iCur and then read at a further +iCur, overrunning it.)
	BYTE * pBuf = m_dMinibufer;
	if ( ( iCur+iLen )>NET_MINIBUFFER_SIZE )
	{
		if ( ( iCur+iLen )>m_iMaxibuffer )
		{
			// Grow the maxi-buffer, carrying the pending bytes over before the old block is freed.
			BYTE * pNew = new BYTE [ iCur+iLen ];
			if ( iCur )
				memcpy ( pNew, m_pBuf, iCur );
			SafeDeleteArray ( m_pMaxibuffer );
			m_pMaxibuffer = pNew;
			m_iMaxibuffer = iCur+iLen;
		} else if ( iCur && m_pBuf!=m_pMaxibuffer )
		{
			// The maxi-buffer is already big enough, but the pending bytes are still in the mini-buffer.
			memcpy ( m_pMaxibuffer, m_pBuf, iCur );
		}
		pBuf = m_pMaxibuffer;
	} else if ( iCur && m_pBuf!=m_dMinibufer )
	{
		// Defensive: the pending bytes are outside the mini-buffer although everything fits into it.
		memcpy ( m_dMinibufer, m_pBuf, iCur );
	}

	m_pCur = m_pBuf = pBuf;
	int iGot = sphSockRead ( m_iSock, pBuf + iCur, iLen, iTimeout, bIntr );
	if ( g_bGotSigterm )
	{
		sphLogDebug ( "NetInputBuffer_c::ReadFrom: got SIGTERM, return false" );
		m_bError = true;
		m_bIntr = true;
		return false;
	}

	m_bError = ( iGot!=iLen );
	m_bIntr = m_bError && ( sphSockPeekErrno()==EINTR );
	m_iLen = m_bError ? 0 : iCur+iLen;
	return !m_bError;
}
/// Format a printf-style message and send it as a SEARCHD_ERROR reply, then
/// flush tOut. The message is also echoed via sphInfo() when g_bOptNoDetach is
/// set and the log format is not LOG_FORMAT_SPHINXQL ("--console logging").
void SendErrorReply ( ISphOutputBuffer & tOut, const char * sTemplate, ... )
{
	// format first: the reply header needs the message length
	CSphString sMessage;
	va_list ap;
	va_start ( ap, sTemplate );
	sMessage.SetSprintfVa ( sTemplate, ap );
	va_end ( ap );

	// header: status, version 0, body length (the body is one string:
	// 4-byte length prefix plus the text)
	const int iBodyLen = sMessage.Length() + 4;
	tOut.SendWord ( SEARCHD_ERROR );
	tOut.SendWord ( 0 );
	tOut.SendInt ( iBodyLen );
	tOut.SendString ( sMessage.cstr() );

	tOut.Flush();

	const bool bEchoToConsole = g_bOptNoDetach && g_eLogFormat!=LOG_FORMAT_SPHINXQL;
	if ( bEchoToConsole )
		sphInfo ( "query error: %s", sMessage.cstr() );
}
// work around MSVC 2005 non-standard for-loop variable scoping (force standard for-scope rules)
#if USE_WINDOWS
#pragma conform(forScope,on)
#endif
/////////////////////////////////////////////////////////////////////////////
// DISTRIBUTED QUERIES
/////////////////////////////////////////////////////////////////////////////
/// distributed index
/// Settings of one distributed index: its remote agents, the names of its local
/// indexes, the agent connect/query timeouts (defaulting to the daemon-wide
/// g_iAgentConnectTimeout / g_iAgentQueryTimeout) and the HA strategy.
/// NOTE: the out-of-line move/copy operations transfer only the members declared
/// here; the ServedStats_c base part is not copied or moved.
struct DistributedIndex_t : public ServedStats_c
{
CSphVector<MultiAgentDesc_t> m_dAgents; ///< remote agents
CSphVector<CSphString> m_dLocal; ///< local indexes
CSphBitvec m_dKillBreak; ///< bit vector; meaning not visible in this section (presumably per local index — TODO confirm)
int m_iAgentConnectTimeout; ///< in msec
int m_iAgentQueryTimeout; ///< in msec
bool m_bToDelete; ///< should be deleted
bool m_bDivideRemoteRanges; ///< whether we divide big range onto agents or not
HAStrategies_e m_eHaStrategy; ///< how to select the best of my agents
public:
DistributedIndex_t ()
: m_iAgentConnectTimeout ( g_iAgentConnectTimeout )
, m_iAgentQueryTimeout ( g_iAgentQueryTimeout )
, m_bToDelete ( false )
, m_bDivideRemoteRanges ( false )
, m_eHaStrategy ( HA_DEFAULT )
{}
~DistributedIndex_t()
{
// m_pHAStorage has to be freed separately.
// NOTE(review): no m_pHAStorage member is declared in this struct; this comment may be stale.
}
DistributedIndex_t ( DistributedIndex_t && rhs );
DistributedIndex_t& operator= ( DistributedIndex_t &&rhs );
DistributedIndex_t& operator= ( const DistributedIndex_t & rhs );
/// append a copy of every agent of every multi-agent entry to *pTarget
void GetAllAgents ( CSphVector<AgentConn_t> * pTarget ) const;
};
/// Move-construct from rhs: the agent/local/kill-break containers are moved and
/// the scalar settings copied. The ServedStats_c base is default-constructed.
DistributedIndex_t::DistributedIndex_t ( DistributedIndex_t && rhs )
	: m_dAgents ( std::move ( rhs.m_dAgents ) )
	, m_dLocal ( std::move ( rhs.m_dLocal ) )
	, m_dKillBreak ( std::move ( rhs.m_dKillBreak ) )
	, m_iAgentConnectTimeout ( rhs.m_iAgentConnectTimeout )
	, m_iAgentQueryTimeout ( rhs.m_iAgentQueryTimeout )
	, m_bToDelete ( rhs.m_bToDelete )
	, m_bDivideRemoteRanges ( rhs.m_bDivideRemoteRanges )
	, m_eHaStrategy ( rhs.m_eHaStrategy )
{}
/// Move-assign from rhs (self-assignment is a no-op). Containers are moved and
/// scalar settings copied. The ServedStats_c base part is left untouched.
DistributedIndex_t& DistributedIndex_t::operator= ( DistributedIndex_t &&rhs )
{
	if ( this==&rhs )
		return *this;

	// plain settings
	m_iAgentConnectTimeout = rhs.m_iAgentConnectTimeout;
	m_iAgentQueryTimeout = rhs.m_iAgentQueryTimeout;
	m_bToDelete = rhs.m_bToDelete;
	m_bDivideRemoteRanges = rhs.m_bDivideRemoteRanges;
	m_eHaStrategy = rhs.m_eHaStrategy;

	// containers
	m_dAgents = std::move ( rhs.m_dAgents );
	m_dLocal = std::move ( rhs.m_dLocal );
	m_dKillBreak = std::move ( rhs.m_dKillBreak );

	return *this;
}
/// Copy-assign from rhs (self-assignment is a no-op). The ServedStats_c base
/// part is left untouched.
DistributedIndex_t& DistributedIndex_t::operator= ( const DistributedIndex_t &rhs )
{
	if ( this==&rhs )
		return *this;

	// plain settings
	m_iAgentConnectTimeout = rhs.m_iAgentConnectTimeout;
	m_iAgentQueryTimeout = rhs.m_iAgentQueryTimeout;
	m_bToDelete = rhs.m_bToDelete;
	m_bDivideRemoteRanges = rhs.m_bDivideRemoteRanges;
	m_eHaStrategy = rhs.m_eHaStrategy;

	// containers
	m_dAgents = rhs.m_dAgents;
	m_dLocal = rhs.m_dLocal;
	m_dKillBreak = rhs.m_dKillBreak;

	return *this;
}
/// Flatten every mirror of every multi-agent entry into *pTarget. Each agent is
/// appended as a new AgentConn_t, and only its AgentDesc_c part is assigned.
void DistributedIndex_t::GetAllAgents ( CSphVector<AgentConn_t> * pTarget ) const
{
	assert ( pTarget );
	for ( int iMulti=0; iMulti<m_dAgents.GetLength(); ++iMulti )
	{
		const MultiAgentDesc_t & tMulti = m_dAgents[iMulti];
		for ( int iHost=0; iHost<tMulti.GetAgents().GetLength(); ++iHost )
		{
			AgentDesc_c & tDst = pTarget->Add();
			tDst = tMulti.GetAgents()[iHost];
		}
	}
}
/// global distributed index definitions hash
/// (string-keyed; presumably by distributed index name — TODO confirm)
static SmallStringHash_T < DistributedIndex_t > g_hDistIndexes;
/////////////////////////////////////////////////////////////////////////////
// SEARCH HANDLER
/////////////////////////////////////////////////////////////////////////////
/// Builds the SEARCHD_COMMAND_SEARCH request sent to a remote agent for the
/// queries m_dQueries[m_iStart..m_iEnd] (inclusive).
/// dQueries is held by reference and must outlive the builder. When
/// iDivideLimits is greater than 1, the per-agent limit and max_matches are
/// divided by it (see SendQuery).
struct SearchRequestBuilder_t : public IRequestBuilder_t
{
SearchRequestBuilder_t ( const CSphVector<CSphQuery> & dQueries, int iStart, int iEnd, int iDivideLimits )
: m_dQueries ( dQueries ), m_iStart ( iStart ), m_iEnd ( iEnd ), m_iDivideLimits ( iDivideLimits )
{}
virtual void BuildRequest ( AgentConn_t & tAgent, NetOutputBuffer_c & tOut ) const;
protected:
/// serialized byte size of one query; must match what SendQuery() writes
int CalcQueryLen ( const char * sIndexes, const CSphQuery & q, bool bAgentWeight ) const;
/// serialize one query into tOut
void SendQuery ( const char * sIndexes, NetOutputBuffer_c & tOut, const CSphQuery & q, bool bAgentWeight, int iWeight ) const;
protected:
const CSphVector<CSphQuery> & m_dQueries;
const int m_iStart;
const int m_iEnd;
const int m_iDivideLimits;
};
/// Parses an agent's search reply for queries m_iStart..m_iEnd (inclusive).
/// MVA values and string/JSON payloads are appended to the caller-owned storage
/// vectors, which are held by reference and must outlive the parser. Matches
/// store offsets into those vectors.
struct SearchReplyParser_t : public IReplyParser_t, public ISphNoncopyable
{
SearchReplyParser_t ( int iStart, int iEnd, CSphVector<DWORD> & dMvaStorage, CSphVector<BYTE> & dStringsStorage )
: m_iStart ( iStart )
, m_iEnd ( iEnd )
, m_dMvaStorage ( dMvaStorage )
, m_dStringsStorage ( dStringsStorage )
{}
virtual bool ParseReply ( MemInputBuffer_c & tReq, AgentConn_t & tAgent ) const;
protected:
int m_iStart;
int m_iEnd;
CSphVector<DWORD> & m_dMvaStorage;
CSphVector<BYTE> & m_dStringsStorage;
};
/////////////////////////////////////////////////////////////////////////////
/// Byte size of one query as serialized by SendQuery(). The two functions must
/// be kept in sync field by field.
/// The constant 156 is 39 four-byte items: every fixed-size int/DWORD field plus
/// the 4-byte length prefix of every always-sent string (flags, offset, limit,
/// ..., query token filter opts). The id range adds 2*sizeof(SphDocID_t) and
/// the field weights add 4 bytes each. Variable parts (string bodies, filters,
/// index/field weights, overrides, optional fields) are added below.
int SearchRequestBuilder_t::CalcQueryLen ( const char * sIndexes, const CSphQuery & q, bool bAgentWeight ) const
{
int iReqSize = 156 + 2*sizeof(SphDocID_t) + 4*q.m_dWeights.GetLength()
+ q.m_sSortBy.Length()
+ strlen ( sIndexes )
+ q.m_sGroupBy.Length()
+ q.m_sGroupSortBy.Length()
+ q.m_sGroupDistinct.Length()
+ q.m_sComment.Length()
+ q.m_sSelect.Length()
+ q.m_sOuterOrderBy.Length()
+ q.m_sUDRanker.Length()
+ q.m_sUDRankerOpts.Length()
+ q.m_sQueryTokenFilterLib.Length()
+ q.m_sQueryTokenFilterName.Length()
+ q.m_sQueryTokenFilterOpts.Length();
// SendQuery() sends the raw query when present, the processed one otherwise
iReqSize += q.m_sRawQuery.IsEmpty()
? q.m_sQuery.Length()
: q.m_sRawQuery.Length();
if ( q.m_eRanker==SPH_RANK_EXPR || q.m_eRanker==SPH_RANK_EXPORT )
iReqSize += q.m_sRankerExpr.Length() + 4; // string ranker-expr (only sent for these rankers)
ARRAY_FOREACH ( j, q.m_dFilters )
{
const CSphFilterSettings & tFilter = q.m_dFilters[j];
iReqSize += 20 + tFilter.m_sAttrName.Length(); // string attr-name; int type; int exclude-flag; int equal-flag; int mva-func
switch ( tFilter.m_eType )
{
case SPH_FILTER_VALUES: iReqSize += 4 + 8*tFilter.GetNumValues (); break; // int values-count; uint64[] values
case SPH_FILTER_RANGE: iReqSize += 16; break; // uint64 min-val, max-val
case SPH_FILTER_FLOATRANGE: iReqSize += 8; break; // int/float min-val,max-val
case SPH_FILTER_USERVAR:
case SPH_FILTER_STRING: iReqSize += 4 + ( tFilter.m_dStrings.GetLength()==1 ? tFilter.m_dStrings[0].Length() : 0 ); break;
case SPH_FILTER_NULL: iReqSize += 1; break; // boolean value
case SPH_FILTER_STRING_LIST: // int values-count; string[] values
iReqSize += 4;
ARRAY_FOREACH ( iString, tFilter.m_dStrings )
iReqSize += 4 + tFilter.m_dStrings[iString].Length();
break;
}
}
if ( q.m_bGeoAnchor )
iReqSize += 16 + q.m_sGeoLatAttr.Length() + q.m_sGeoLongAttr.Length(); // string lat-attr, long-attr; float lat, long
if ( bAgentWeight )
{
iReqSize += 9; // "*" (length=1) + length itself + weight (the count int is part of the 156)
} else
{
ARRAY_FOREACH ( i, q.m_dIndexWeights )
iReqSize += 8 + q.m_dIndexWeights[i].m_sName.Length(); // string index-name; int index-weight
}
ARRAY_FOREACH ( i, q.m_dFieldWeights )
iReqSize += 8 + q.m_dFieldWeights[i].m_sName.Length(); // string field-name; int field-weight
ARRAY_FOREACH ( i, q.m_dOverrides )
iReqSize += 12 + q.m_dOverrides[i].m_sAttr.Length() + // string attr-name; int type; int values-count
( q.m_dOverrides[i].m_eAttrType==SPH_ATTR_BIGINT ? 16 : 12 )*q.m_dOverrides[i].m_dValues.GetLength(); // ( bigint id; int/float/bigint value )[] values
if ( q.m_bHasOuter )
iReqSize += 4; // outer limit
if ( q.m_iMaxPredictedMsec>0 )
iReqSize += 4; // int max-predicted-msec
return iReqSize;
}
/// qflag means Query Flag
/// names are internal to searchd and may be changed for clarity
/// values are communicated over network between searchds and APIs and MUST NOT CHANGE
/// These are the bits of the leading flags DWORD written by SendQuery(); the
/// CSphQuery member each bit mirrors is noted per value.
enum
{
QFLAG_REVERSE_SCAN = 1UL << 0, ///< q.m_bReverseScan
QFLAG_SORT_KBUFFER = 1UL << 1, ///< q.m_bSortKbuffer
QFLAG_MAX_PREDICTED_TIME = 1UL << 2, ///< q.m_iMaxPredictedMsec>0 (the value itself follows later in the request)
QFLAG_SIMPLIFY = 1UL << 3, ///< q.m_bSimplify
QFLAG_PLAIN_IDF = 1UL << 4, ///< q.m_bPlainIDF
QFLAG_GLOBAL_IDF = 1UL << 5, ///< q.m_bGlobalIDF
QFLAG_NORMALIZED_TF = 1UL << 6, ///< q.m_bNormalizedTFIDF
QFLAG_LOCAL_DF = 1UL << 7, ///< q.m_bLocalDF
QFLAG_LOW_PRIORITY = 1UL << 8 ///< q.m_bLowPriority
};
/// Serialize one query into tOut in the agent search wire format: the flags
/// DWORD first, then the legacy 1.x fields, then the master-agent extensions.
/// The byte layout must match CalcQueryLen(), which precomputes its size.
/// sIndexes replaces the query's own index list. When bAgentWeight is set, a
/// single "*" index weight of iWeight is sent instead of q.m_dIndexWeights.
/// The offset is always 0. Without an outer order by, the limit is max_matches
/// (m_iDivideLimits==1) or (offset+limit)/m_iDivideLimits + 1.
void SearchRequestBuilder_t::SendQuery ( const char * sIndexes, NetOutputBuffer_c & tOut, const CSphQuery & q, bool bAgentWeight, int iWeight ) const
{
// starting with command version 1.27, flags go first
// reason being, i might add flags that affect *any* of the subsequent data (eg. qflag_pack_ints)
DWORD uFlags = 0;
uFlags |= QFLAG_REVERSE_SCAN * q.m_bReverseScan;
uFlags |= QFLAG_SORT_KBUFFER * q.m_bSortKbuffer;
uFlags |= QFLAG_MAX_PREDICTED_TIME * ( q.m_iMaxPredictedMsec > 0 );
uFlags |= QFLAG_SIMPLIFY * q.m_bSimplify;
uFlags |= QFLAG_PLAIN_IDF * q.m_bPlainIDF;
uFlags |= QFLAG_GLOBAL_IDF * q.m_bGlobalIDF;
uFlags |= QFLAG_NORMALIZED_TF * q.m_bNormalizedTFIDF;
uFlags |= QFLAG_LOCAL_DF * q.m_bLocalDF;
uFlags |= QFLAG_LOW_PRIORITY * q.m_bLowPriority;
tOut.SendDword ( uFlags );
// The Search Legacy
tOut.SendInt ( 0 ); // offset is 0
if ( !q.m_bHasOuter )
{
if ( m_iDivideLimits==1 )
tOut.SendInt ( q.m_iMaxMatches ); // OPTIMIZE? normally, agent limit is max_matches, even if master limit is less
else // FIXME!!! that is broken with offset + limit
tOut.SendInt ( 1 + ( ( q.m_iOffset + q.m_iLimit )/m_iDivideLimits) );
} else
{
// with outer order by, inner limit must match between agent and master
tOut.SendInt ( q.m_iLimit );
}
tOut.SendInt ( (DWORD)q.m_eMode ); // match mode
tOut.SendInt ( (DWORD)q.m_eRanker ); // ranking mode
if ( q.m_eRanker==SPH_RANK_EXPR || q.m_eRanker==SPH_RANK_EXPORT )
tOut.SendString ( q.m_sRankerExpr.cstr() );
tOut.SendInt ( q.m_eSort ); // sort mode
tOut.SendString ( q.m_sSortBy.cstr() ); // sort attr
// prefer the raw (client-supplied) query text when present
if ( q.m_sRawQuery.IsEmpty() )
tOut.SendString ( q.m_sQuery.cstr() );
else
tOut.SendString ( q.m_sRawQuery.cstr() ); // query
tOut.SendInt ( q.m_dWeights.GetLength() );
ARRAY_FOREACH ( j, q.m_dWeights )
tOut.SendInt ( q.m_dWeights[j] ); // weights
tOut.SendString ( sIndexes ); // indexes
tOut.SendInt ( USE_64BIT ); // id range bits
tOut.SendDocid ( 0 ); // default full id range (any client range must be in filters at this stage)
tOut.SendDocid ( DOCID_MAX );
// filters: name, type, type-specific payload, then exclude/equal/mva-func
tOut.SendInt ( q.m_dFilters.GetLength() );
ARRAY_FOREACH ( j, q.m_dFilters )
{
const CSphFilterSettings & tFilter = q.m_dFilters[j];
tOut.SendString ( tFilter.m_sAttrName.cstr() );
tOut.SendInt ( tFilter.m_eType );
switch ( tFilter.m_eType )
{
case SPH_FILTER_VALUES:
tOut.SendInt ( tFilter.GetNumValues () );
for ( int k = 0; k < tFilter.GetNumValues (); k++ )
tOut.SendUint64 ( tFilter.GetValue ( k ) );
break;
case SPH_FILTER_RANGE:
tOut.SendUint64 ( tFilter.m_iMinValue );
tOut.SendUint64 ( tFilter.m_iMaxValue );
break;
case SPH_FILTER_FLOATRANGE:
tOut.SendFloat ( tFilter.m_fMinValue );
tOut.SendFloat ( tFilter.m_fMaxValue );
break;
case SPH_FILTER_USERVAR:
case SPH_FILTER_STRING:
tOut.SendString ( tFilter.m_dStrings.GetLength()==1 ? tFilter.m_dStrings[0].cstr() : NULL );
break;
case SPH_FILTER_NULL:
tOut.SendByte ( tFilter.m_bHasEqual );
break;
case SPH_FILTER_STRING_LIST:
tOut.SendInt ( tFilter.m_dStrings.GetLength() );
ARRAY_FOREACH ( iString, tFilter.m_dStrings )
tOut.SendString ( tFilter.m_dStrings[iString].cstr() );
break;
}
tOut.SendInt ( tFilter.m_bExclude );
tOut.SendInt ( tFilter.m_bHasEqual );
tOut.SendInt ( tFilter.m_eMvaFunc );
}
tOut.SendInt ( q.m_eGroupFunc );
tOut.SendString ( q.m_sGroupBy.cstr() );
if ( m_iDivideLimits==1 )
tOut.SendInt ( q.m_iMaxMatches );
else
tOut.SendInt ( 1+(q.m_iMaxMatches/m_iDivideLimits) ); // Reduce the max_matches also.
tOut.SendString ( q.m_sGroupSortBy.cstr() );
tOut.SendInt ( q.m_iCutoff );
tOut.SendInt ( q.m_iRetryCount );
tOut.SendInt ( q.m_iRetryDelay );
tOut.SendString ( q.m_sGroupDistinct.cstr() );
tOut.SendInt ( q.m_bGeoAnchor );
if ( q.m_bGeoAnchor )
{
tOut.SendString ( q.m_sGeoLatAttr.cstr() );
tOut.SendString ( q.m_sGeoLongAttr.cstr() );
tOut.SendFloat ( q.m_fGeoLatitude );
tOut.SendFloat ( q.m_fGeoLongitude );
}
// index weights: either a single "*" entry with the agent weight, or the query's own list
if ( bAgentWeight )
{
tOut.SendInt ( 1 );
tOut.SendString ( "*" );
tOut.SendInt ( iWeight );
} else
{
tOut.SendInt ( q.m_dIndexWeights.GetLength() );
ARRAY_FOREACH ( i, q.m_dIndexWeights )
{
tOut.SendString ( q.m_dIndexWeights[i].m_sName.cstr() );
tOut.SendInt ( q.m_dIndexWeights[i].m_iValue );
}
}
tOut.SendDword ( q.m_uMaxQueryMsec );
tOut.SendInt ( q.m_dFieldWeights.GetLength() );
ARRAY_FOREACH ( i, q.m_dFieldWeights )
{
tOut.SendString ( q.m_dFieldWeights[i].m_sName.cstr() );
tOut.SendInt ( q.m_dFieldWeights[i].m_iValue );
}
tOut.SendString ( q.m_sComment.cstr() );
// attribute overrides: (docid, value) pairs; value width depends on attr type
tOut.SendInt ( q.m_dOverrides.GetLength() );
ARRAY_FOREACH ( i, q.m_dOverrides )
{
const CSphAttrOverride & tEntry = q.m_dOverrides[i];
tOut.SendString ( tEntry.m_sAttr.cstr() );
tOut.SendDword ( tEntry.m_eAttrType );
tOut.SendInt ( tEntry.m_dValues.GetLength() );
ARRAY_FOREACH ( j, tEntry.m_dValues )
{
tOut.SendUint64 ( tEntry.m_dValues[j].m_uDocID );
switch ( tEntry.m_eAttrType )
{
case SPH_ATTR_FLOAT: tOut.SendFloat ( tEntry.m_dValues[j].m_fValue ); break;
case SPH_ATTR_BIGINT: tOut.SendUint64 ( tEntry.m_dValues[j].m_uValue ); break;
default: tOut.SendDword ( (DWORD)tEntry.m_dValues[j].m_uValue ); break;
}
}
}
tOut.SendString ( q.m_sSelect.cstr() );
if ( q.m_iMaxPredictedMsec>0 )
tOut.SendInt ( q.m_iMaxPredictedMsec );
// emulate empty sub-select for agent (client ver 1.29) as master sends fixed outer offset+limits
tOut.SendString ( NULL );
tOut.SendInt ( 0 );
tOut.SendInt ( 0 );
tOut.SendInt ( q.m_bHasOuter );
// master-agent extensions
tOut.SendDword ( q.m_eCollation ); // v.1
tOut.SendString ( q.m_sOuterOrderBy.cstr() ); // v.2
if ( q.m_bHasOuter )
tOut.SendInt ( q.m_iOuterOffset + q.m_iOuterLimit );
tOut.SendInt ( q.m_iGroupbyLimit );
tOut.SendString ( q.m_sUDRanker.cstr() );
tOut.SendString ( q.m_sUDRankerOpts.cstr() );
tOut.SendString ( q.m_sQueryTokenFilterLib.cstr() );
tOut.SendString ( q.m_sQueryTokenFilterName.cstr() );
tOut.SendString ( q.m_sQueryTokenFilterOpts.cstr() );
}
/// Write the complete search request for tAgent: command header, body length,
/// master version, query count, then every query from m_iStart to m_iEnd.
/// An agent weight other than -1 replaces the per-index weights (see SendQuery).
void SearchRequestBuilder_t::BuildRequest ( AgentConn_t & tAgent, NetOutputBuffer_c & tOut ) const
{
	const char * sIndexes = tAgent.m_sIndexes.cstr();
	const bool bAgentWeight = ( tAgent.m_iWeight!=-1 );
	const int iQueries = m_iEnd - m_iStart + 1;

	// body = VER_MASTER (int) + query count (int) + every serialized query
	int iBodyLen = 8;
	for ( int iQuery=m_iStart; iQuery<=m_iEnd; ++iQuery )
		iBodyLen += CalcQueryLen ( sIndexes, m_dQueries[iQuery], bAgentWeight );

	tOut.SendWord ( SEARCHD_COMMAND_SEARCH ); // command id
	tOut.SendWord ( VER_COMMAND_SEARCH ); // command version
	tOut.SendInt ( iBodyLen ); // request body length
	tOut.SendInt ( VER_MASTER );
	tOut.SendInt ( iQueries );

	for ( int iQuery=m_iStart; iQuery<=m_iEnd; ++iQuery )
		SendQuery ( sIndexes, tOut, m_dQueries[iQuery], bAgentWeight, tAgent.m_iWeight );
}
/////////////////////////////////////////////////////////////////////////////
/// Parse an agent's search reply for queries m_iStart..m_iEnd (inclusive) into
/// tAgent.m_dResults.
///
/// For each query the reply carries:
///  - a status DWORD, plus a message string unless the status is SEARCHD_OK;
///  - the schema: field names, then attribute names and types;
///  - the matches: docid, weight and attribute values;
///  - totals and optional IO/CPU/predicted-time stats;
///  - fetched docs/hits/skips counters and per-word stats.
/// MVA values go to m_dMvaStorage and string/JSON payloads to m_dStringsStorage;
/// the match attributes store offsets into those vectors.
///
/// Returns false, with tAgent.m_sFailure set, on:
///  - a negative match count;
///  - a retrieved/match count mismatch;
///  - an impossible attribute length or count, which a buggy or hostile agent
///    could otherwise use to corrupt memory or hang the loop.
/// Buffer overrun checks are left to the caller.
bool SearchReplyParser_t::ParseReply ( MemInputBuffer_c & tReq, AgentConn_t & tAgent ) const
{
	int iResults = m_iEnd-m_iStart+1;
	assert ( iResults>0 );

	tAgent.m_dResults.Resize ( iResults );
	for ( int iRes=0; iRes<iResults; iRes++ )
		tAgent.m_dResults[iRes].m_iSuccesses = 0;

	for ( int iRes=0; iRes<iResults; iRes++ )
	{
		CSphQueryResult & tRes = tAgent.m_dResults [ iRes ];
		tRes.m_sError = "";
		tRes.m_sWarning = "";

		// get status and message
		DWORD eStatus = tReq.GetDword ();
		if ( eStatus!=SEARCHD_OK )
		{
			CSphString sMessage = tReq.GetString ();
			switch ( eStatus )
			{
				case SEARCHD_ERROR:		tRes.m_sError = sMessage; continue; // nothing else follows for this query
				case SEARCHD_RETRY:		tRes.m_sError = sMessage; break;
				case SEARCHD_WARNING:	tRes.m_sWarning = sMessage; break;
				default:				tAgent.m_sFailure.SetSprintf ( "internal error: unknown status %d", eStatus ); break;
			}
		}

		// get schema
		CSphRsetSchema & tSchema = tRes.m_tSchema;
		tSchema.Reset ();

		tSchema.m_dFields.Resize ( tReq.GetInt() ); // FIXME! add a sanity check
		ARRAY_FOREACH ( j, tSchema.m_dFields )
			tSchema.m_dFields[j].m_sName = tReq.GetString ();

		int iNumAttrs = tReq.GetInt(); // FIXME! add a sanity check
		for ( int j=0; j<iNumAttrs; j++ )
		{
			CSphColumnInfo tCol;
			tCol.m_sName = tReq.GetString ();
			tCol.m_eAttrType = (ESphAttr) tReq.GetDword (); // FIXME! add a sanity check
			tSchema.AddDynamicAttr ( tCol ); // all attributes received from agents are dynamic
		}

		// get matches
		int iMatches = tReq.GetInt ();
		if ( iMatches<0 )
		{
			tAgent.m_sFailure.SetSprintf ( "invalid match count received (count=%d)", iMatches );
			return false;
		}

		int bAgent64 = tReq.GetInt ();
#if !USE_64BIT
		if ( bAgent64 )
			tAgent.m_sFailure.SetSprintf ( "id64 agent, id32 master, docids might be wrapped" );
#endif

		// Set when an attribute carries an impossible length or count. Such an
		// attribute is stored as empty/NULL so that every match stays fully
		// initialized, and the reply is rejected once this result's matches
		// have been read.
		bool bMalformed = false;

		assert ( !tRes.m_dMatches.GetLength() );
		if ( iMatches )
		{
			tRes.m_dMatches.Resize ( iMatches );
			ARRAY_FOREACH ( i, tRes.m_dMatches )
			{
				CSphMatch & tMatch = tRes.m_dMatches[i];
				tMatch.Reset ( tSchema.GetRowSize() );
				tMatch.m_uDocID = bAgent64 ? (SphDocID_t)tReq.GetUint64() : tReq.GetDword();
				tMatch.m_iWeight = tReq.GetInt ();
				for ( int j=0; j<tSchema.GetAttrsCount(); j++ )
				{
					const CSphColumnInfo & tAttr = tSchema.GetAttr(j);
					if ( tAttr.m_eAttrType==SPH_ATTR_UINT32SET || tAttr.m_eAttrType==SPH_ATTR_INT64SET )
					{
						// MVA: stored as a DWORD count followed by the values (64-bit ones as DWORD pairs)
						tMatch.SetAttr ( tAttr.m_tLocator, m_dMvaStorage.GetLength() );

						int iValues = tReq.GetDword ();
						// Each value takes at least 4 bytes on the wire. 64-bit values are
						// counted in DWORDs, so their count must be even; otherwise the
						// loop below would never reach zero.
						const bool bBadCount = ( iValues<0 || iValues>g_iMaxPacketSize/(int)sizeof(DWORD)
							|| ( tAttr.m_eAttrType==SPH_ATTR_INT64SET && ( iValues%2 )!=0 ) );
						if ( bBadCount )
						{
							tAgent.m_sFailure.SetSprintf ( "invalid MVA value count received (count=%d)", iValues );
							bMalformed = true;
							iValues = 0; // store an empty MVA
						}
						m_dMvaStorage.Add ( iValues );

						if ( tAttr.m_eAttrType==SPH_ATTR_UINT32SET )
						{
							while ( iValues-- )
								m_dMvaStorage.Add ( tReq.GetDword() );
						} else
						{
							for ( ; iValues; iValues -= 2 )
							{
								uint64_t uMva = tReq.GetUint64();
								m_dMvaStorage.Add ( (DWORD)uMva );
								m_dMvaStorage.Add ( (DWORD)( uMva>>32 ) );
							}
						}

					} else if ( tAttr.m_eAttrType==SPH_ATTR_FLOAT )
					{
						float fRes = tReq.GetFloat();
						tMatch.SetAttr ( tAttr.m_tLocator, sphF2DW(fRes) );

					} else if ( tAttr.m_eAttrType==SPH_ATTR_BIGINT )
					{
						tMatch.SetAttr ( tAttr.m_tLocator, tReq.GetUint64() );

					} else if ( tAttr.m_eAttrType==SPH_ATTR_STRING || tAttr.m_eAttrType==SPH_ATTR_JSON )
					{
						int iLen = tReq.GetDword();
						// A negative length would shrink the storage and then write into it.
						if ( iLen<0 || iLen>g_iMaxPacketSize )
						{
							tAgent.m_sFailure.SetSprintf ( "invalid string length received (length=%d)", iLen );
							bMalformed = true;
							iLen = 0; // store an empty string
						}

						if ( !iLen )
						{
							tMatch.SetAttr ( tAttr.m_tLocator, 0 );
						} else
						{
							// stored as packed length + bytes; the attribute holds the offset
							int iOff = m_dStringsStorage.GetLength();
							tMatch.SetAttr ( tAttr.m_tLocator, iOff );
							m_dStringsStorage.Resize ( iOff+4+iLen );
							int iPackedLen = sphPackStrlen ( m_dStringsStorage.Begin() + iOff, iLen );
							tReq.GetBytes ( m_dStringsStorage.Begin() + iOff + iPackedLen, iLen );
							m_dStringsStorage.Resize ( iOff+iPackedLen+iLen );
						}

					} else if ( tAttr.m_eAttrType==SPH_ATTR_STRINGPTR )
					{
						CSphString sValue = tReq.GetString();
						tMatch.SetAttr ( tAttr.m_tLocator, (SphAttr_t) sValue.Leak() );

					} else if ( tAttr.m_eAttrType==SPH_ATTR_FACTORS || tAttr.m_eAttrType==SPH_ATTR_FACTORS_JSON )
					{
						DWORD uLength = tReq.GetDword();
						if ( !uLength )
						{
							// no factors blob: keep a NULL pointer instead of writing a DWORD into a 0-byte block
							tMatch.SetAttr ( tAttr.m_tLocator, 0 );
						} else if ( uLength<sizeof(DWORD) || uLength>(DWORD)g_iMaxPacketSize )
						{
							// The blob starts with its own DWORD length, so anything shorter
							// would be written past the end of the allocation.
							tAgent.m_sFailure.SetSprintf ( "invalid factors length received (length=%u)", uLength );
							bMalformed = true;
							tMatch.SetAttr ( tAttr.m_tLocator, 0 );
						} else
						{
							BYTE * pData = new BYTE[uLength];
							*(DWORD *)pData = uLength;
							if ( uLength>sizeof(DWORD) )
								tReq.GetBytes ( pData+sizeof(DWORD), uLength-sizeof(DWORD) );
							tMatch.SetAttr ( tAttr.m_tLocator, (SphAttr_t) pData );
						}

					} else if ( tAttr.m_eAttrType==SPH_ATTR_JSON_FIELD )
					{
						ESphJsonType eJson = (ESphJsonType)tReq.GetByte();
						if ( eJson==JSON_EOF )
						{
							tMatch.SetAttr ( tAttr.m_tLocator, 0 );
						} else
						{
							int iOff = m_dStringsStorage.GetLength();

							// read node length if needed
							int iLen = sphJsonNodeSize ( eJson, NULL );
							if ( iLen<0 )
								iLen = tReq.GetDword ();

							if ( iLen<0 || iLen>g_iMaxPacketSize )
							{
								tAgent.m_sFailure.SetSprintf ( "invalid JSON node length received (length=%d)", iLen );
								bMalformed = true;
								tMatch.SetAttr ( tAttr.m_tLocator, 0 ); // as if JSON_EOF
							} else
							{
								// attribute packs the node type into the high 32 bits and the storage offset into the low ones
								int64_t iTypeOffset = ( ( (int64_t)iOff ) | ( ( (int64_t)eJson )<<32 ) );
								tMatch.SetAttr ( tAttr.m_tLocator, iTypeOffset );

								m_dStringsStorage.Resize ( iOff+iLen );
								if ( iLen>0 ) // some node types have no payload at all
									tReq.GetBytes ( m_dStringsStorage.Begin()+iOff, iLen );
							}
						}

					} else
					{
						tMatch.SetAttr ( tAttr.m_tLocator, tReq.GetDword() );
					}
				}
			}
		}

		// every match is fully initialized at this point, so it is safe to bail out
		if ( bMalformed )
			return false;

		// read totals (retrieved count, total count, query time, word count)
		int iRetrieved = tReq.GetInt ();
		tRes.m_iTotalMatches = (unsigned int)tReq.GetInt ();
		tRes.m_iQueryTime = tReq.GetInt ();

		// agents always send IO/CPU stats to master
		BYTE uStatMask = tReq.GetByte();
		if ( uStatMask & 1 )
		{
			tRes.m_tIOStats.m_iReadTime = tReq.GetUint64();
			tRes.m_tIOStats.m_iReadOps = tReq.GetDword();
			tRes.m_tIOStats.m_iReadBytes = tReq.GetUint64();
			tRes.m_tIOStats.m_iWriteTime = tReq.GetUint64();
			tRes.m_tIOStats.m_iWriteOps = tReq.GetDword();
			tRes.m_tIOStats.m_iWriteBytes = tReq.GetUint64();
		}

		if ( uStatMask & 2 )
			tRes.m_iCpuTime = tReq.GetUint64();

		if ( uStatMask & 4 )
			tRes.m_iPredictedTime = tReq.GetUint64();

		tRes.m_iAgentFetchedDocs = tReq.GetDword();
		tRes.m_iAgentFetchedHits = tReq.GetDword();
		tRes.m_iAgentFetchedSkips = tReq.GetDword();

		const int iWordsCount = tReq.GetInt (); // FIXME! sanity check?
		if ( iRetrieved!=iMatches )
		{
			tAgent.m_sFailure.SetSprintf ( "expected %d retrieved documents, got %d", iMatches, iRetrieved );
			return false;
		}

		// read per-word stats
		for ( int i=0; i<iWordsCount; i++ )
		{
			const CSphString sWord = tReq.GetString ();
			const int64_t iDocs = (unsigned int)tReq.GetInt ();
			const int64_t iHits = (unsigned int)tReq.GetInt ();
			tReq.GetByte(); // statistics have no expanded terms for now

			tRes.AddStat ( sWord, iDocs, iHits );
		}

		// mark this result as ok
		tRes.m_iSuccesses = 1;
	}

	// all seems OK (and buffer length checks are performed by caller)
	return true;
}
/////////////////////////////////////////////////////////////////////////////
// returns true if incoming schema (src) is equal to existing (dst); false otherwise
/// Reduce tDst's attributes to those also present in tSrc (matched by name)
/// with a compatible type. Bool/float, and any mix of bool/int/bigint, count
/// as compatible. Where bit counts differ, the wider one (and its type) wins.
/// If anything differs (position, type, bit count, bit offset, or
/// static/dynamic placement), tDst is rebuilt with all-dynamic attributes and
/// its fields are preserved.
bool MinimizeSchema ( CSphRsetSchema & tDst, const ISphSchema & tSrc )
{
	// an empty destination stays empty; it only "matches" an empty source
	if ( tDst.GetAttrsCount()==0 )
		return tSrc.GetAttrsCount()==0;

	// work on a copy of the destination attributes
	CSphVector<CSphColumnInfo> dKept;
	for ( int iAttr=0; iAttr<tDst.GetAttrsCount(); ++iAttr )
		dKept.Add ( tDst.GetAttr ( iAttr ) );

	bool bEqual = ( tDst.GetAttrsCount()==tSrc.GetAttrsCount() );

	int i = 0;
	while ( i<dKept.GetLength() )
	{
		CSphColumnInfo & tCol = dKept[i];
		int iSrcIdx = tSrc.GetAttrIndex ( tCol.m_sName.cstr() );

		// same attribute at a different position
		if ( iSrcIdx!=i )
			bEqual = false;

		if ( iSrcIdx>=0 )
		{
			const CSphColumnInfo & tSrcAttr = tSrc.GetAttr ( iSrcIdx );
			const ESphAttr eDst = tCol.m_eAttrType;
			const ESphAttr eSrc = tSrcAttr.m_eAttrType;

			const bool bBoolFloat = ( eDst==SPH_ATTR_FLOAT && eSrc==SPH_ATTR_BOOL ) || ( eDst==SPH_ATTR_BOOL && eSrc==SPH_ATTR_FLOAT );
			const bool bDstIntLike = ( eDst==SPH_ATTR_BOOL || eDst==SPH_ATTR_INTEGER || eDst==SPH_ATTR_BIGINT );
			const bool bSrcIntLike = ( eSrc==SPH_ATTR_BOOL || eSrc==SPH_ATTR_INTEGER || eSrc==SPH_ATTR_BIGINT );
			const bool bCompatible = ( eDst==eSrc ) || bBoolFloat || ( bDstIntLike && bSrcIntLike );

			const int iDstBits = tCol.m_tLocator.m_iBitCount;
			const int iSrcBits = tSrcAttr.m_tLocator.m_iBitCount;

			if ( !bCompatible )
			{
				iSrcIdx = -1; // incompatible types: drop the attribute
				bEqual = false;
			} else if ( iDstBits!=iSrcBits )
			{
				tCol.m_tLocator.m_iBitCount = Max ( iDstBits, iSrcBits );
				bEqual = false;
				if ( iDstBits<iSrcBits )
					tCol.m_eAttrType = eSrc;
			}

			// a single locator can't serve all matches if offset or storage kind differ
			if ( tSrcAttr.m_tLocator.m_iBitOffset!=tCol.m_tLocator.m_iBitOffset
				|| tSrcAttr.m_tLocator.m_bDynamic!=tCol.m_tLocator.m_bDynamic )
				bEqual = false;
		}

		if ( iSrcIdx<0 )
			dKept.Remove ( i ); // the next attribute slides into slot i
		else
			++i;
	}

	if ( bEqual )
	{
		tDst.SwapAttrs ( dKept );
		return true;
	}

	// rebuild as all-dynamic attributes, keeping the full-text fields intact
	CSphVector<CSphColumnInfo> dFields;
	Swap ( dFields, tDst.m_dFields );
	tDst.Reset();
	for ( int iAttr=0; iAttr<dKept.GetLength(); ++iAttr )
		tDst.AddDynamicAttr ( dKept[iAttr] );
	Swap ( dFields, tDst.m_dFields );
	return false;
}
/// Split sIndexes into index names and append them to dOut. A name is a maximal
/// run of letters, digits and '_' (per isalpha/isdigit); every other character
/// acts as a separator.
static void ParseIndexList ( const CSphString & sIndexes, CSphVector<CSphString> & dOut )
{
	CSphString sSplit = sIndexes;
	char * p = (char*)sSplit.cstr();
	if ( !p )
		return; // guard against a string that holds no buffer at all

	// <ctype.h> classifiers take an unsigned char value (or EOF). Passing a
	// plain char >= 0x80 (negative where char is signed) is undefined.
	auto IsNameChar = [] ( char c )
	{
		const unsigned char u = (unsigned char)c;
		return isalpha ( u ) || isdigit ( u ) || c=='_';
	};

	while ( *p )
	{
		// skip separators
		while ( *p && !IsNameChar ( *p ) )
			p++;
		if ( !*p )
			break;

		// FIXME?
		// We no not check that index name shouldn't start with '_'.
		// That means it's de facto allowed for API queries.
		// But not for SphinxQL ones.

		// this is my next index name
		const char * sNext = p;
		while ( IsNameChar ( *p ) )
			p++;

		assert ( sNext!=p );
		if ( *p )
			*p++ = '\0'; // terminate the name in place; continue from the next char

		dOut.Add ( sNext );
	}
}
/// Validate the settings of a query. sError is cleared first; on the first
/// problem found it receives a message and the check stops.
/// Retry count/delay equal to the daemon-wide defaults (g_iAgentRetryCount /
/// g_iAgentRetryDelay) are exempt from the range checks.
static void CheckQuery ( const CSphQuery & tQuery, CSphString & sError )
{
#define LOC_ERROR(_msg) { sError.SetSprintf ( _msg ); return; }
#define LOC_ERROR1(_msg,_arg1) { sError.SetSprintf ( _msg, _arg1 ); return; }
#define LOC_ERROR2(_msg,_arg1,_arg2) { sError.SetSprintf ( _msg, _arg1, _arg2 ); return; }

	sError = NULL;

	// SPH_MATCH_TOTAL / SPH_RANK_TOTAL are the enum counts (one past the last
	// valid value), so they must be rejected too
	if ( tQuery.m_eMode<0 || tQuery.m_eMode>=SPH_MATCH_TOTAL )
		LOC_ERROR1 ( "invalid match mode %d", tQuery.m_eMode );

	if ( tQuery.m_eRanker<0 || tQuery.m_eRanker>=SPH_RANK_TOTAL )
		LOC_ERROR1 ( "invalid ranking mode %d", tQuery.m_eRanker );

	if ( tQuery.m_iMaxMatches<1 )
		LOC_ERROR ( "max_matches can not be less than one" );

	if ( tQuery.m_iOffset<0 || tQuery.m_iOffset>=tQuery.m_iMaxMatches )
		LOC_ERROR2 ( "offset out of bounds (offset=%d, max_matches=%d)",
			tQuery.m_iOffset, tQuery.m_iMaxMatches );

	if ( tQuery.m_iLimit<0 )
		LOC_ERROR1 ( "limit out of bounds (limit=%d)", tQuery.m_iLimit );

	if ( tQuery.m_iCutoff<0 )
		LOC_ERROR1 ( "cutoff out of bounds (cutoff=%d)", tQuery.m_iCutoff );

	if ( ( tQuery.m_iRetryCount!=g_iAgentRetryCount )
		&& ( tQuery.m_iRetryCount<0 || tQuery.m_iRetryCount>MAX_RETRY_COUNT ) )
		LOC_ERROR1 ( "retry count out of bounds (count=%d)", tQuery.m_iRetryCount );

	// was comparing m_iRetryCount against the delay default (copy-paste slip)
	if ( ( tQuery.m_iRetryDelay!=g_iAgentRetryDelay )
		&& ( tQuery.m_iRetryDelay<0 || tQuery.m_iRetryDelay>MAX_RETRY_DELAY ) )
		LOC_ERROR1 ( "retry delay out of bounds (delay=%d)", tQuery.m_iRetryDelay );

	if ( tQuery.m_iOffset>0 && tQuery.m_bHasOuter )
		LOC_ERROR1 ( "inner offset must be 0 when using outer order by (offset=%d)", tQuery.m_iOffset );

#undef LOC_ERROR
#undef LOC_ERROR1
#undef LOC_ERROR2
}
/// Normalize a query received through the API before execution:
///  - sort filter and override values;
///  - copy the raw query into m_sQuery;
///  - for the legacy modes, pick the emulating ranker.
/// BOOLEAN mode uses SPH_RANK_NONE and FULLSCAN clears the query text.
/// ALL, ANY and PHRASE are rewritten into extended syntax: operator characters
/// are escaped with a backslash, ANY becomes "..."/1 and PHRASE becomes "...".
/// For those three modes, the outer order by/offset/limit are also reset when
/// m_bHasOuter is not set.
void PrepareQueryEmulation ( CSphQuery * pQuery )
{
	assert ( pQuery && pQuery->m_sRawQuery.cstr() );
	CSphQuery & tQuery = *pQuery;

	for ( int i=0; i<tQuery.m_dFilters.GetLength(); ++i )
		tQuery.m_dFilters[i].m_dValues.Sort();

	for ( int i=0; i<tQuery.m_dOverrides.GetLength(); ++i )
		tQuery.m_dOverrides[i].m_dValues.Sort ();

	// fixup query
	tQuery.m_sQuery = tQuery.m_sRawQuery;

	if ( tQuery.m_eMode==SPH_MATCH_BOOLEAN )
		tQuery.m_eRanker = SPH_RANK_NONE;

	if ( tQuery.m_eMode==SPH_MATCH_FULLSCAN )
		tQuery.m_sQuery = "";

	const bool bAll = ( tQuery.m_eMode==SPH_MATCH_ALL );
	const bool bAny = ( tQuery.m_eMode==SPH_MATCH_ANY );
	const bool bPhrase = ( tQuery.m_eMode==SPH_MATCH_PHRASE );
	if ( !bAll && !bAny && !bPhrase )
		return;

	// Worst case, every character gets a backslash; the extra room covers the
	// surrounding quotes and the "/1 suffix.
	const char * sSrc = tQuery.m_sRawQuery.cstr ();
	const int iSrcLen = strlen ( sSrc );
	tQuery.m_sQuery.Reserve ( iSrcLen*2+8 );
	char * sDst = (char*) tQuery.m_sQuery.cstr ();

	if ( bAny || bPhrase )
		*sDst++ = '\"';

	// must be in sync with EscapeString (php api)
	const char sMagics[] = "<\\()|-!@~\"&/^$=";
	for ( char c = *sSrc; c!='\0'; c = *++sSrc )
	{
		bool bMagic = false;
		for ( const char * pMagic = sMagics; *pMagic && !bMagic; ++pMagic )
			bMagic = ( *pMagic==c );
		if ( bMagic )
			*sDst++ = '\\';
		*sDst++ = c;
	}

	if ( bAll )
	{
		tQuery.m_eRanker = SPH_RANK_PROXIMITY;
		*sDst = '\0';
	} else if ( bAny )
	{
		tQuery.m_eRanker = SPH_RANK_MATCHANY;
		strncpy ( sDst, "\"/1", 8 );
	} else
	{
		tQuery.m_eRanker = SPH_RANK_PROXIMITY;
		*sDst++ = '\"';
		*sDst = '\0';
	}

	if ( !tQuery.m_bHasOuter )
	{
		tQuery.m_sOuterOrderBy = "";
		tQuery.m_iOuterOffset = 0;
		tQuery.m_iOuterLimit = 0;
	}
}
bool ParseSearchQuery ( InputBuffer_c & tReq, ISphOutputBuffer & tOut, CSphQuery & tQuery, int iVer, int iMasterVer )
{
// daemon-level defaults
tQuery.m_iRetryCount = g_iAgentRetryCount;
tQuery.m_iRetryDelay = g_iAgentRetryDelay;
tQuery.m_iAgentQueryTimeout = g_iAgentQueryTimeout;
// v.1.27+ flags come first
DWORD uFlags = 0;
if ( iVer>=0x11B )
uFlags = tReq.GetDword();
// v.1.0. mode, limits, weights, ID/TS ranges
tQuery.m_iOffset = tReq.GetInt ();
tQuery.m_iLimit = tReq.GetInt ();
tQuery.m_eMode = (ESphMatchMode) tReq.GetInt ();
if ( iVer>=0x110 )
{
tQuery.m_eRanker = (ESphRankMode) tReq.GetInt ();
if ( tQuery.m_eRanker==SPH_RANK_EXPR || tQuery.m_eRanker==SPH_RANK_EXPORT )
tQuery.m_sRankerExpr = tReq.GetString();
}
tQuery.m_eSort = (ESphSortOrder) tReq.GetInt ();
if ( iVer>=0x102 )
{
tQuery.m_sSortBy = tReq.GetString ();
sphColumnToLowercase ( const_cast<char *>( tQuery.m_sSortBy.cstr() ) );
}
tQuery.m_sRawQuery = tReq.GetString ();
int iGot = 0;
if ( !tReq.GetDwords ( tQuery.m_dWeights, iGot, SPH_MAX_FIELDS ) )
{
SendErrorReply ( tOut, "invalid weight count %d (should be in 0..%d range)", iGot, SPH_MAX_FIELDS );
return false;
}
tQuery.m_sIndexes = tReq.GetString ();
bool bIdrange64 = false;
if ( iVer>=0x108 )
bIdrange64 = ( tReq.GetInt()!=0 );
SphDocID_t uMinID = 0;
SphDocID_t uMaxID = DOCID_MAX;
if ( bIdrange64 )
{
uMinID = (SphDocID_t)tReq.GetUint64 ();
uMaxID = (SphDocID_t)tReq.GetUint64 ();
// FIXME? could report clamp here if I'm id32 and client passed id64 range,
// but frequently this won't affect anything at all
} else
{
uMinID = tReq.GetDword ();
uMaxID = tReq.GetDword ();
}
if ( iVer<0x108 && uMaxID==0xffffffffUL )
uMaxID = 0; // fixup older clients which send 32-bit UINT_MAX by default
if ( uMaxID==0 )
uMaxID = DOCID_MAX;
// v.1.2
if ( iVer>=0x102 )
{
int iAttrFilters = tReq.GetInt ();
if ( iAttrFilters>g_iMaxFilters )
{
SendErrorReply ( tOut, "too much attribute filters (req=%d, max=%d)", iAttrFilters, g_iMaxFilters );
return false;
}
tQuery.m_dFilters.Resize ( iAttrFilters );
ARRAY_FOREACH ( iFilter, tQuery.m_dFilters )
{
CSphFilterSettings & tFilter = tQuery.m_dFilters[iFilter];
tFilter.m_sAttrName = tReq.GetString ();
sphColumnToLowercase ( const_cast<char *>( tFilter.m_sAttrName.cstr() ) );
if ( iVer>=0x10E )
{
// v.1.14+
tFilter.m_eType = (ESphFilter) tReq.GetDword ();
switch ( tFilter.m_eType )
{
case SPH_FILTER_RANGE:
tFilter.m_iMinValue = ( iVer>=0x114 ) ? tReq.GetUint64() : tReq.GetDword ();
tFilter.m_iMaxValue = ( iVer>=0x114 ) ? tReq.GetUint64() : tReq.GetDword ();
break;
case SPH_FILTER_FLOATRANGE:
tFilter.m_fMinValue = tReq.GetFloat ();
tFilter.m_fMaxValue = tReq.GetFloat ();
break;
case SPH_FILTER_VALUES:
{
int iGot = 0;
bool bRes = ( iVer>=0x114 )
? tReq.GetQwords ( tFilter.m_dValues, iGot, g_iMaxFilterValues )
: tReq.GetDwords ( tFilter.m_dValues, iGot, g_iMaxFilterValues );
if ( !bRes )
{
SendErrorReply ( tOut, "invalid attribute '%s'(%d) set length %d (should be in 0..%d range)", tFilter.m_sAttrName.cstr(), iFilter, iGot, g_iMaxFilterValues );
return false;
}
}
break;
case SPH_FILTER_STRING:
tFilter.m_dStrings.Add ( tReq.GetString() );
break;
case SPH_FILTER_NULL:
tFilter.m_bHasEqual = tReq.GetByte()!=0;
break;
case SPH_FILTER_USERVAR:
tFilter.m_dStrings.Add ( tReq.GetString() );
break;
case SPH_FILTER_STRING_LIST:
{
int iCount = tReq.GetDword();
if ( iCount<0 || iCount>g_iMaxFilterValues )
{
SendErrorReply ( tOut, "invalid attribute '%s'(%d) set length %d (should be in 0..%d range)", tFilter.m_sAttrName.cstr(), iFilter, iCount, g_iMaxFilterValues );
return false;
}
tFilter.m_dStrings.Resize ( iCount );
ARRAY_FOREACH ( iString, tFilter.m_dStrings )
tFilter.m_dStrings[iString] = tReq.GetString();
}
break;
default:
SendErrorReply ( tOut, "unknown filter type (type-id=%d)", tFilter.m_eType );
return false;
}
} else
{
// pre-1.14
int iGot = 0;
if ( !tReq.GetDwords ( tFilter.m_dValues, iGot, g_iMaxFilterValues ) )
{
SendErrorReply ( tOut, "invalid attribute '%s'(%d) set length %d (should be in 0..%d range)", tFilter.m_sAttrName.cstr(), iFilter, iGot, g_iMaxFilterValues );
return false;
}
if ( !tFilter.m_dValues.GetLength() )
{
// 0 length means this is range, not set
tFilter.m_iMinValue = tReq.GetDword ();
tFilter.m_iMaxValue = tReq.GetDword ();
}
tFilter.m_eType = tFilter.m_dValues.GetLength() ? SPH_FILTER_VALUES : SPH_FILTER_RANGE;
}
if ( iVer>=0x106 )
tFilter.m_bExclude = !!tReq.GetDword ();
if ( iMasterVer>=5 )
tFilter.m_bHasEqual = !!tReq.GetDword();
tFilter.m_eMvaFunc = SPH_MVAFUNC_ANY;
if ( iMasterVer>=13 )
tFilter.m_eMvaFunc = (ESphMvaFunc)tReq.GetDword();
}
}
// now add id range filter
if ( uMinID!=0 || uMaxID!=DOCID_MAX )
{
CSphFilterSettings & tFilter = tQuery.m_dFilters.Add();
tFilter.m_sAttrName = "@id";
tFilter.m_eType = SPH_FILTER_RANGE;
tFilter.m_iMinValue = uMinID;
tFilter.m_iMaxValue = uMaxID;
}
// v.1.3
if ( iVer>=0x103 )
{
tQuery.m_eGroupFunc = (ESphGroupBy) tReq.GetDword ();
tQuery.m_sGroupBy = tReq.GetString ();
sphColumnToLowercase ( const_cast<char *>( tQuery.m_sGroupBy.cstr() ) );
}
// v.1.4
tQuery.m_iMaxMatches = DEFAULT_MAX_MATCHES;
if ( iVer>=0x104 )
tQuery.m_iMaxMatches = tReq.GetInt ();
// v.1.5, v.1.7
if ( iVer>=0x107 )
{
tQuery.m_sGroupSortBy = tReq.GetString ();
} else if ( iVer>=0x105 )
{
bool bSortByGroup = ( tReq.GetInt()!=0 );
if ( !bSortByGroup )
{
char sBuf[256];
switch ( tQuery.m_eSort )
{
case SPH_SORT_RELEVANCE:
tQuery.m_sGroupSortBy = "@weight desc";
break;
case SPH_SORT_ATTR_DESC:
case SPH_SORT_ATTR_ASC:
snprintf ( sBuf, sizeof(sBuf), "%s %s", tQuery.m_sSortBy.cstr(),
tQuery.m_eSort==SPH_SORT_ATTR_ASC ? "asc" : "desc" );