Skip to content

Commit

Permalink
fixed #459 queue_max_length bad reply for SphinxQL and API at thread …
Browse files Browse the repository at this point in the history
…pool worker mode; added sleep command to debug SphinxQL statement
  • Loading branch information
tomatolog committed Sep 18, 2018
1 parent ae4b320 commit 34b0324
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 24 deletions.
59 changes: 38 additions & 21 deletions src/searchd.cpp
Expand Up @@ -16351,7 +16351,17 @@ void HandleMysqlDebug ( SqlRowBuffer_c &tOut, const SqlStmt_t &tStmt, bool bVipC
tOut.DataTuplet ( "malloc_trim", sResult.cstr () );
}
#endif
else
else if ( sCommand=="sleep" )
{
int64_t tmStart = sphMicroTimer();
sphSleepMsec ( Max ( tStmt.m_iIntParam, 1 )*1000 );
int64_t tmDelta = sphMicroTimer() - tmStart;

tOut.HeadTuplet ( "command", "result" );
CSphString sResult;
sResult.SetSprintf ( "%.3f", (float)tmDelta/1000000.0f );
tOut.DataTuplet ( "sleep", sResult.cstr () );
} else
{
// no known command; provide short help.
tOut.HeadTuplet ( "command", "meaning" );
Expand All @@ -16367,6 +16377,7 @@ void HandleMysqlDebug ( SqlRowBuffer_c &tOut, const SqlStmt_t &tStmt, bool bVipC
tOut.DataTuplet ( "malloc_stats", "perform 'malloc_stats', result in searchd.log" );
tOut.DataTuplet ( "malloc_trim", "pefrorm 'malloc_trim' call" );
#endif
tOut.DataTuplet ( "sleep Nsec", "sleep for N seconds" );
}
// done
tOut.Eof ();
Expand Down Expand Up @@ -22046,8 +22057,7 @@ NetEvent_e NetReceiveDataAPI_t::Tick ( DWORD uGotEvents, CSphVector<ISphNetActio
m_tState->m_iLeft = NetBufGetInt ( m_tState->m_dBuf.Begin() + 4 );
bool bBadCommand = ( m_eCommand>=SEARCHD_COMMAND_WRONG );
bool bBadLength = ( m_tState->m_iLeft<0 || m_tState->m_iLeft>g_iMaxPacketSize );
bool bMaxedOut = ( g_iThdQueueMax && !m_tState->m_bVIP && g_pThdPool->GetQueueLength()>=g_iThdQueueMax );
if ( bBadCommand || bBadLength || bMaxedOut )
if ( bBadCommand || bBadLength )
{
m_tState->m_bKeepSocket = false;

Expand All @@ -22057,26 +22067,11 @@ NetEvent_e NetReceiveDataAPI_t::Tick ( DWORD uGotEvents, CSphVector<ISphNetActio
// if command is insane, low level comm is broken, so we bail out
if ( bBadCommand )
sphWarning ( "ill-formed client request (command=%d, SEARCHD_COMMAND_TOTAL=%d)", m_eCommand, SEARCHD_COMMAND_TOTAL );
// warning on thread pool work maxed out
if ( bMaxedOut )
sphWarning ( "%s", g_sMaxedOutMessage );

m_tState->m_dBuf.Resize ( 0 );
ISphOutputBuffer tOut;
tOut.SwapData ( m_tState->m_dBuf );
if ( !bMaxedOut )
{
SendErrorReply ( tOut, "invalid command (code=%d, len=%d)", m_eCommand, m_tState->m_iLeft );
} else
{
int iRespLen = 4 + strlen(g_sMaxedOutMessage);

tOut.SendInt ( SPHINX_CLIENT_VERSION );
tOut.SendWord ( (WORD)SEARCHD_RETRY );
tOut.SendWord ( 0 ); // version doesn't matter
tOut.SendInt ( iRespLen );
tOut.SendString ( g_sMaxedOutMessage );
}
SendErrorReply ( tOut, "invalid command (code=%d, len=%d)", m_eCommand, m_tState->m_iLeft );

tOut.SwapData ( m_tState->m_dBuf );
NetSendData_t * pSend = new NetSendData_t ( m_tState.LeakPtr(), PROTO_SPHINX );
Expand All @@ -22095,6 +22090,8 @@ NetEvent_e NetReceiveDataAPI_t::Tick ( DWORD uGotEvents, CSphVector<ISphNetActio

case AAPI_BODY:
{
const bool bMaxedOut = ( g_iThdQueueMax && !m_tState->m_bVIP && g_pThdPool->GetQueueLength()>=g_iThdQueueMax );

if ( m_eCommand==SEARCHD_COMMAND_PING )
{
bool bGotError = false;
Expand Down Expand Up @@ -22128,6 +22125,27 @@ NetEvent_e NetReceiveDataAPI_t::Tick ( DWORD uGotEvents, CSphVector<ISphNetActio
tOut.SwapData ( m_tState->m_dBuf );
NetSendData_t * pSend = new NetSendData_t ( m_tState.LeakPtr (), PROTO_SPHINX );
dNextTick.Add ( pSend );

} else if ( bMaxedOut )
{
m_tState->m_bKeepSocket = false;
sphWarning ( "%s", g_sMaxedOutMessage );

m_tState->m_dBuf.Resize ( 0 );
ISphOutputBuffer tOut;
tOut.SwapData ( m_tState->m_dBuf );

int iRespLen = 4 + strlen(g_sMaxedOutMessage);

tOut.SendWord ( (WORD)SEARCHD_RETRY );
tOut.SendWord ( 0 ); // version doesn't matter
tOut.SendInt ( iRespLen );
tOut.SendString ( g_sMaxedOutMessage );

tOut.SwapData ( m_tState->m_dBuf );
NetSendData_t * pSend = new NetSendData_t ( m_tState.LeakPtr(), PROTO_SPHINX );
dNextTick.Add ( pSend );

} else
{
AddJobAPI ( pLoop );
Expand Down Expand Up @@ -22332,8 +22350,7 @@ NetEvent_e NetReceiveDataQL_t::Tick ( DWORD uGotEvents, CSphVector<ISphNetAction
SendMysqlErrorPacket ( tOut, m_tState->m_uPacketID, "", "unknown command with size 0", m_tState->m_iConnID, MYSQL_ERR_UNKNOWN_COM_ERROR );
} else
{
LogSphinxqlError ( "", g_sMaxedOutMessage, m_tState->m_iConnID );
tOut.SendBytes ( g_dSphinxQLMaxedOutPacket, g_iSphinxQLMaxedOutLen );
SendMysqlErrorPacket ( tOut, m_tState->m_uPacketID, "", "Too many connections", m_tState->m_iConnID, MYSQL_ERR_TOO_MANY_USER_CONNECTIONS );
}

tOut.SwapData ( dBuf );
Expand Down
3 changes: 2 additions & 1 deletion src/searchdaemon.h
Expand Up @@ -1151,7 +1151,8 @@ enum MysqlErrors_e
MYSQL_ERR_SERVER_SHUTDOWN = 1053,
MYSQL_ERR_PARSE_ERROR = 1064,
MYSQL_ERR_FIELD_SPECIFIED_TWICE = 1110,
MYSQL_ERR_NO_SUCH_TABLE = 1146
MYSQL_ERR_NO_SUCH_TABLE = 1146,
MYSQL_ERR_TOO_MANY_USER_CONNECTIONS = 1203
};

class SqlRowBuffer_c;
Expand Down
2 changes: 0 additions & 2 deletions src/searchdha.cpp
Expand Up @@ -4137,8 +4137,6 @@ class EpollEvents_c : public IterableEvents_c
// need positive timeout for communicate threads back and shutdown
m_iReady = epoll_wait ( m_iEFD, m_dReady.Begin (), m_dReady.GetLength (), timeoutMs );

sphLogDebugv ( "%d epoll wait returned %d events (timeout %d)", m_iEFD, m_iReady, timeoutMs );

if ( m_iReady<0 )
{
int iErrno = sphSockGetErrno ();
Expand Down
5 changes: 5 additions & 0 deletions src/sphinxql.y
Expand Up @@ -1824,6 +1824,11 @@ opt_par:
pParser->ToString ( pParser->m_pStmt->m_sIndex, $1 );
pParser->ToString ( pParser->m_pStmt->m_sStringParam, $2 );
}
| ident TOK_CONST_INT
{
pParser->ToString ( pParser->m_pStmt->m_sIndex, $1 );
pParser->m_pStmt->m_iIntParam = $2.m_iValue;
}
%%

#if USE_WINDOWS
Expand Down

0 comments on commit 34b0324

Please sign in to comment.