Commit
bypass check of max_packet_size for replication command via API; fixed #3571; added last cluster error to show status of the cluster
tomatolog committed Jun 6, 2023
1 parent 2eade1f commit 844b1ae
Showing 9 changed files with 38 additions and 15 deletions.
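The gist of the fix: instead of rejecting any incoming API packet larger than the global max_packet_size, the daemon keeps a per-connection limit that is bypassed (and then raised) for replication cluster commands, since the peer node may be configured with a different max_packet_size. Below is a minimal standalone sketch of that decision logic; Conn_t, AcceptPacket and kDefaultMaxPacket are illustrative stand-ins, not the daemon's real API.

#include <algorithm>
#include <cstdio>

// illustrative stand-ins for g_iMaxPacketSize and the API command ids
constexpr int kDefaultMaxPacket = 8*1024*1024;
enum Command_e { CMD_SEARCH, CMD_CLUSTER };

struct Conn_t
{
    int m_iMaxPacket = kDefaultMaxPacket;       // per-connection limit, starts at the configured default

    bool AcceptPacket ( Command_e eCmd, int iSize )
    {
        bool bCheckLen = ( eCmd!=CMD_CLUSTER ); // replication traffic skips the cap
        if ( iSize<0 || ( bCheckLen && iSize>m_iMaxPacket ) )
            return false;                       // oversized regular packet - reject
        if ( !bCheckLen )
            m_iMaxPacket = std::max ( m_iMaxPacket, iSize ); // grow the limit to fit the replication body
        return true;
    }
};

int main ()
{
    Conn_t tConn;
    std::printf ( "search 16M: %d\n", (int)tConn.AcceptPacket ( CMD_SEARCH, 16*1024*1024 ) );   // 0 - rejected
    std::printf ( "cluster 16M: %d\n", (int)tConn.AcceptPacket ( CMD_CLUSTER, 16*1024*1024 ) ); // 1 - accepted
    return 0;
}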
7 changes: 6 additions & 1 deletion src/netreceive_api.cpp
@@ -135,8 +135,10 @@ void ApiServe ( std::unique_ptr<AsyncNetBuffer_c> pBuf )
sphLogDebugv ( "read command %d, version %d, reply size %d", eCommand, uVer, iReplySize );


bool bCheckLen = ( eCommand!=SEARCHD_COMMAND_CLUSTERPQ );
bool bBadCommand = ( eCommand>=SEARCHD_COMMAND_WRONG );
bool bBadLength = ( iReplySize<0 || iReplySize>g_iMaxPacketSize );
// should not fail replication commands from other nodes as max_packet_size could be different between nodes
bool bBadLength = ( iReplySize<0 || ( bCheckLen && iReplySize>tIn.GetMaxPacketSize() ) );
if ( bBadCommand || bBadLength )
{
// unknown command, default response header
@@ -152,6 +154,9 @@ void ApiServe ( std::unique_ptr<AsyncNetBuffer_c> pBuf )
break;
}

if ( !bCheckLen )
tIn.SetMaxPacketSize ( tIn.GetBufferPos() + iReplySize );

if ( iReplySize && !tIn.ReadFrom ( iReplySize, true ))
{
sphWarning ( "failed to receive API body (client=%s(%d), exp=%d(%d), error='%s')",
7 changes: 4 additions & 3 deletions src/networking_daemon.cpp
@@ -914,7 +914,7 @@ Proto_e AsyncNetInputBuffer_c::Probe ( bool bLight )
int iRest = 0;
if ( !HasBytes() )
{
iRest = GetRoomForTail ( g_iMaxPacketSize );
iRest = GetRoomForTail();
if ( !iRest )
return eResult; // hard limit reached
AppendData ( 0, iRest, true );
@@ -1028,7 +1028,7 @@ int AsyncNetInputBuffer_c::ReadAny ()
{
m_bIntr = false;

auto iRest = GetRoomForTail ( g_iMaxPacketSize );
auto iRest = GetRoomForTail();
if ( !iRest )
return 0;

@@ -1056,8 +1056,9 @@ ByteBlob_t AsyncNetInputBuffer_c::PopTail ( int iSize )
return { nullptr, 0 };
}

int AsyncNetInputBuffer_c::GetRoomForTail ( int iHardLimit )
int AsyncNetInputBuffer_c::GetRoomForTail()
{
int iHardLimit = GetMaxPacketSize();
if ( iHardLimit-m_iLen<=0 )
DiscardProcessed ( -1 );

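With the limit stored on the buffer itself, GetRoomForTail() no longer needs g_iMaxPacketSize passed in: it reads the connection's own cap via GetMaxPacketSize() and, if the buffer already sits at that cap, discards the processed prefix before answering. A rough standalone sketch of that calculation follows; MiniBuf_t and its members are illustrative, not the real AsyncNetInputBuffer_c internals.

#include <cstdio>

// illustrative model of an input buffer with an already-processed prefix
struct MiniBuf_t
{
    int m_iMaxPacketSize = 128;    // per-connection cap (tiny, for the example)
    int m_iLen = 0;                // bytes currently held
    int m_iProcessed = 0;          // prefix already consumed by the parser

    void DiscardProcessed ()       { m_iLen -= m_iProcessed; m_iProcessed = 0; }

    int GetRoomForTail ()
    {
        if ( m_iMaxPacketSize-m_iLen<=0 )
            DiscardProcessed();    // try to free space first
        return ( m_iMaxPacketSize>m_iLen ) ? m_iMaxPacketSize-m_iLen : 0;    // 0 means the hard limit is reached
    }
};

int main ()
{
    MiniBuf_t tBuf;
    tBuf.m_iLen = 128;
    tBuf.m_iProcessed = 100;
    std::printf ( "room=%d\n", tBuf.GetRoomForTail() );    // prints room=100 after discarding the prefix
    return 0;
}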
4 changes: 2 additions & 2 deletions src/networking_daemon.h
@@ -139,8 +139,8 @@ class AsyncNetInputBuffer_c : protected LazyVector_T<BYTE>, public InputBuffer_c
/// iSpace limits max quantity of data (limited by g_iMaxPacketSize), however m.b. ignored by compressed backends
int AppendData ( int iNeed, int iSpace, bool bIntr );

/// internal - return place available to not exceed iHardLimit. Dispose consumed data, if necessary.
int GetRoomForTail ( int iHardLimit );
/// internal - return place available to not exceed m_iMaxPacketSize. Dispose consumed data, if necessary.
int GetRoomForTail();

/// internal - discard processed data, then ensure at least iSpace is available, and return blob for it.
/// ReadFromBackend on return will process results right way, nothing will be lost or ignored.
2 changes: 1 addition & 1 deletion src/searchd.cpp
@@ -15068,10 +15068,10 @@ void HandleMysqlSelectColumns ( RowBuffer_i & tOut, const SqlStmt_t & tStmt, Cli
}},
{ MYSQL_COL_LONG, "@@autocommit", [pSession] { return pSession->m_bAutoCommit ? "1" : "0"; } },
};
constexpr auto iSysvars = sizeof ( dSysvars ) / sizeof ( dSysvars[0] );

auto VarIdxByName = [&dSysvars] ( const CSphString& sName ) noexcept -> int
{
constexpr auto iSysvars = sizeof ( dSysvars ) / sizeof ( dSysvars[0] );
for ( int i = 1; i < iSysvars; ++i )
if ( sName == dSysvars[i].m_szName )
return i;
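The searchd.cpp hunk only changes where the compile-time element count of the dSysvars table is computed; the counting idiom itself is the usual sizeof ratio. A tiny isolated illustration (std::size would be the modern spelling; the variable names here are made up):

#include <cstddef>
#include <cstdio>

int main ()
{
    const char * dNames[] = { "@@autocommit", "@@max_allowed_packet", "@@session.last_insert_id" };
    constexpr auto iNames = sizeof ( dNames ) / sizeof ( dNames[0] );    // element count, computed at compile time
    for ( std::size_t i = 0; i < iNames; ++i )
        std::printf ( "%zu: %s\n", i, dNames[i] );
    return 0;
}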
9 changes: 7 additions & 2 deletions src/searchdaemon.cpp
@@ -993,9 +993,9 @@ bool InputBuffer_c::IsLessMaxPacket ( int iSize )
{
SetError( "negative data length %d", iSize );
return false;
} else if ( iSize>g_iMaxPacketSize )
} else if ( iSize>m_iMaxPacketSize )
{
SetError( "length out of bounds %d(%d)", iSize, g_iMaxPacketSize );
SetError( "length out of bounds %d(%d)", iSize, m_iMaxPacketSize );
return false;
}

@@ -1014,6 +1014,11 @@ void GenericOutputBuffer_c::ResetError()
m_sError = "";
}

void InputBuffer_c::SetMaxPacketSize( int iMaxPacketSize )
{
m_iMaxPacketSize = Max ( g_iMaxPacketSize, iMaxPacketSize );
}

/////////////////////////////////////////////////////////////////////////////
// SERVED INDEX DESCRIPTORS STUFF
/////////////////////////////////////////////////////////////////////////////
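SetMaxPacketSize() clamps with Max(), so the per-connection limit can only be raised above the configured g_iMaxPacketSize, never lowered below it. A minimal sketch of that clamp, with a stand-in constant for the global:

#include <algorithm>
#include <cassert>

constexpr int g_iMaxPacketSizeStub = 8*1024*1024;    // stand-in for the configured g_iMaxPacketSize

struct Buf_t
{
    int m_iMaxPacketSize = g_iMaxPacketSizeStub;
    void SetMaxPacketSize ( int iWanted ) { m_iMaxPacketSize = std::max ( g_iMaxPacketSizeStub, iWanted ); }
};

int main ()
{
    Buf_t tBuf;
    tBuf.SetMaxPacketSize ( 1024 );               // smaller than the default - limit stays at the default
    assert ( tBuf.m_iMaxPacketSize==g_iMaxPacketSizeStub );
    tBuf.SetMaxPacketSize ( 64*1024*1024 );       // large replication body - limit grows
    assert ( tBuf.m_iMaxPacketSize==64*1024*1024 );
    return 0;
}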
3 changes: 3 additions & 0 deletions src/searchdaemon.h
@@ -533,6 +533,8 @@ class InputBuffer_c
bool GetError() const { return m_bError; }
const CSphString & GetErrorMessage() const { return m_sError; }
void ResetError();
int GetMaxPacketSize() const { return m_iMaxPacketSize; }
void SetMaxPacketSize( int iMaxPacketSize );

protected:
const BYTE * m_pBuf;
@@ -557,6 +559,7 @@
private:
CSphString m_sError;
bool m_bError = false;
int m_iMaxPacketSize = g_iMaxPacketSize;
};

/// simple memory request buffer
11 changes: 7 additions & 4 deletions src/searchdha.cpp
@@ -2445,10 +2445,8 @@ bool AgentConn_t::ReceiveAnswer ( DWORD uRecv )

sphLogDebugA ( "%d Header (Status=%d, Version=%d, answer need %d bytes)", m_iStoreTag, uStat, uVer, iReplySize );

if ( iReplySize<0
|| iReplySize>g_iMaxPacketSize ) // FIXME! add reasonable max packet len too
return Fatal ( eWrongReplies, "invalid packet size (status=%d, len=%d, max_packet_size=%d)"
, uStat, iReplySize, g_iMaxPacketSize );
if ( iReplySize<0 || ( m_bReplyLimitSize && iReplySize>g_iMaxPacketSize ) ) // FIXME! add reasonable max packet len too
return Fatal ( eWrongReplies, "invalid packet size (status=%d, len=%d, max_packet_size=%d)", uStat, iReplySize, g_iMaxPacketSize );

// allocate buf for reply
InitReplyBuf ( iReplySize );
@@ -2478,6 +2476,11 @@ bool AgentConn_t::ReceiveAnswer ( DWORD uRecv )
return true;
}

void AgentConn_t::SetNoLimitReplySize()
{
m_bReplyLimitSize = false;
}

// when full blob with expected size is received...
// just a fine: parse the answer, collect results, dispose agent as one is done.
bool AgentConn_t::CommitResult ()
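On the outgoing (agent) side the same idea is a boolean flag: a connection created for replication calls SetNoLimitReplySize(), and ReceiveAnswer() then skips the g_iMaxPacketSize comparison. A reduced sketch of the guarded check; MiniAgent_t is illustrative, not the full AgentConn_t.

#include <cstdio>

constexpr int kMaxPacket = 8*1024*1024;            // stand-in for g_iMaxPacketSize

// illustrative: only the reply-length validation of an agent connection
struct MiniAgent_t
{
    bool m_bReplyLimitSize = true;                 // default: enforce the global cap

    void SetNoLimitReplySize () { m_bReplyLimitSize = false; }

    bool ValidReplySize ( int iReplySize ) const
    {
        if ( iReplySize<0 )
            return false;                          // negative length is always fatal
        return !m_bReplyLimitSize || iReplySize<=kMaxPacket;    // replication agents accept any size
    }
};

int main ()
{
    MiniAgent_t tRegular, tReplication;
    tReplication.SetNoLimitReplySize();            // what CreateAgent() in searchdreplication.cpp now does
    std::printf ( "regular 32M: %d\n", (int)tRegular.ValidReplySize ( 32*1024*1024 ) );          // 0
    std::printf ( "replication 32M: %d\n", (int)tReplication.ValidReplySize ( 32*1024*1024 ) );  // 1
    return 0;
}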
6 changes: 4 additions & 2 deletions src/searchdha.h
@@ -550,6 +550,7 @@ struct AgentConn_t : public ISphRefcountedMT
void TimeoutCallback ();
void AbortCallback();
bool CheckOrphaned();
void SetNoLimitReplySize();

#if _WIN32
// move recv buffer to dOut, reinit mine.
@@ -578,10 +579,11 @@

// receiving buffer stuff
CSphFixedVector<BYTE> m_dReplyBuf { 0 };
int m_iReplySize = -1; ///< how many reply bytes are there
int m_iReplySize = -1; ///< how many reply bytes are there
static const size_t REPLY_HEADER_SIZE = 12;
CSphFixedVector<BYTE> m_dReplyHeader { REPLY_HEADER_SIZE };
BYTE * m_pReplyCur = nullptr;
BYTE * m_pReplyCur = nullptr;
bool m_bReplyLimitSize = true;

// sending buffer stuff
SmartOutputBuffer_t m_tOutput; ///< chain of blobs we're sending to a host
4 changes: 4 additions & 0 deletions src/searchdreplication.cpp
@@ -1365,6 +1365,9 @@ static void ReplicateClusterStats ( ReplicationCluster_t * pCluster, VectorLike

dOut.Add( tBuf.cstr() );
}
// show last cluster error if any
if ( !pCluster->m_sError.IsEmpty() && dOut.MatchAddf ( "cluster_%s_last_error", sName ) )
dOut.Add ( pCluster->m_sError.cstr() );

// cluster status
// trick
@@ -3229,6 +3232,7 @@ static AgentConn_t * CreateAgent ( const AgentDesc_t & tDesc, const PQRemoteData
{
AgentConn_t * pAgent = new AgentConn_t;
pAgent->m_tDesc.CloneFrom ( tDesc );
pAgent->SetNoLimitReplySize();

HostDesc_t tHost;
pAgent->m_tDesc.m_pDash = new HostDashboard_t ( tHost );
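The replication change itself is small but useful for diagnostics: when a cluster has a recorded error, the ReplicateClusterStats() path now emits an extra cluster_<name>_last_error row, and the agents it creates get SetNoLimitReplySize(). The sketch below shows only the "add a status row if its name matches the filter" pattern; StatusRows_t is a hypothetical helper, not the daemon's VectorLike class.

#include <cstdio>
#include <string>
#include <utility>
#include <vector>

// hypothetical helper mimicking "append a name/value row only when the name matches a LIKE-style filter"
struct StatusRows_t
{
    std::string m_sLike;                                   // empty means "match everything"
    std::vector<std::pair<std::string,std::string>> m_dRows;

    bool MatchAdd ( const std::string & sName, const std::string & sValue )
    {
        if ( !m_sLike.empty() && sName.find ( m_sLike )==std::string::npos )
            return false;                                  // filtered out, nothing added
        m_dRows.emplace_back ( sName, sValue );
        return true;
    }
};

int main ()
{
    StatusRows_t tOut;
    tOut.m_sLike = "last_error";
    std::string sClusterError = "remote node went away";   // pretend pCluster->m_sError is set
    if ( !sClusterError.empty() )
        tOut.MatchAdd ( "cluster_posts_last_error", sClusterError );    // "posts" is a made-up cluster name
    for ( const auto & tRow : tOut.m_dRows )
        std::printf ( "%s: %s\n", tRow.first.c_str(), tRow.second.c_str() );
    return 0;
}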
