Skip to content
Permalink
Browse files
do not crash if failed to prealloc a new disk chunk
  • Loading branch information
glookka committed Jun 23, 2019
1 parent 74ffc13 commit 4cd85afaf76eef0ce9fa1cd073f3fed160279890
Showing with 114 additions and 54 deletions.
  1. +23 −5 src/searchd.cpp
  2. +10 −8 src/searchdreplication.cpp
  3. +1 −1 src/sphinxint.h
  4. +13 −8 src/sphinxpq.cpp
  5. +63 −28 src/sphinxrt.cpp
  6. +4 −4 src/sphinxrt.h
@@ -16277,8 +16277,12 @@ void HandleMysqlAttach ( SqlRowBuffer_c & tOut, const SqlStmt_t & tStmt )

auto * pRtTo = ( RtIndex_i * ) pTo->m_pIndex;

if ( !pRtTo->AttachDiskIndex ( pFrom->m_pIndex, bTruncate, sError ) )
bool bFatal = false;
if ( !pRtTo->AttachDiskIndex ( pFrom->m_pIndex, bTruncate, bFatal, sError ) )
{
if ( bFatal )
g_pLocalIndexes->Delete(sFrom);

tOut.Error ( tStmt.m_sStmt, sError.cstr () );
return;
}
@@ -16309,17 +16313,23 @@ void HandleMysqlFlushRtindex ( SqlRowBuffer_c & tOut, const SqlStmt_t & tStmt )

void HandleMysqlFlushRamchunk ( SqlRowBuffer_c & tOut, const SqlStmt_t & tStmt )
{
CSphString sError;
ServedDescRPtr_c pIndex ( GetServed ( tStmt.m_sIndex ) );

if ( !ServedDesc_t::IsMutable ( pIndex ) )
{
tOut.Error ( tStmt.m_sStmt, "FLUSH RAMCHUNK requires an existing RT index" );
return;
}

auto * pRt = (RtIndex_i*)pIndex->m_pIndex;
pRt->ForceDiskChunk();
if ( !pRt->ForceDiskChunk() )
{
CSphString sError;
sError.SetSprintf ( "index '%s': FLUSH RAMCHUNK failed; INDEX UNUSABLE (%s)", tStmt.m_sIndex.cstr(), pRt->GetLastError().cstr() );
tOut.Error ( tStmt.m_sStmt, sError.cstr () );
g_pLocalIndexes->Delete ( tStmt.m_sIndex );
return;
}

tOut.Ok();
}

@@ -17446,7 +17456,15 @@ static void HandleMysqlReconfigure ( SqlRowBuffer_c & tOut, const SqlStmt_t & tS

bool bSame = ( (const RtIndex_i *) dWLocked->m_pIndex )->IsSameSettings ( tSettings, tSetup, sError );
if ( !bSame && sError.IsEmpty() )
( (RtIndex_i *) dWLocked->m_pIndex )->Reconfigure(tSetup);
{
auto pRT = (RtIndex_i *) dWLocked->m_pIndex;
bool bOk = pRT->Reconfigure(tSetup);
if ( !bOk )
{
sError.SetSprintf ( "index '%s': reconfigure failed; INDEX UNUSABLE (%s)", tStmt.m_sIndex.cstr(), pRT->GetLastError().cstr() );
g_pLocalIndexes->Delete ( tStmt.m_sIndex );
}
}

if ( sError.IsEmpty() )
tOut.Ok();
@@ -1616,7 +1616,9 @@ bool HandleCmdReplicated ( RtAccum_t & tAcc )
sphLogDebugRpl ( "pq-commit, index '%s', uid " INT64_FMT ", queries %d, tags %s",
tCmd.m_sIndex.cstr(), ( tCmd.m_pStored ? tCmd.m_pStored->m_iQUID : int64_t(0) ),
tCmd.m_dDeleteQueries.GetLength(), tCmd.m_sDeleteTags.scstr() );
pIndex->Commit ( nullptr, &tAcc );

if ( !pIndex->Commit ( nullptr, &tAcc ) )
return false;

return true;
}
@@ -1770,7 +1772,9 @@ bool CommitMonitor_c::Commit ( CSphString& sError )
if ( !pIndex )
return false;

pIndex->Commit ( m_pDeletedCount, &m_tAcc );
if ( !pIndex->Commit ( m_pDeletedCount, &m_tAcc ) )
return false;

return true;
}

@@ -1813,10 +1817,7 @@ bool CommitMonitor_c::CommitNonEmptyCmds ( RtIndex_i* pIndex, const ReplicationC
{
assert ( pIndex );
if ( !bOnlyTruncate )
{
pIndex->Commit ( m_pDeletedCount, &m_tAcc );
return true;
}
return pIndex->Commit ( m_pDeletedCount, &m_tAcc );

if ( !pIndex->Truncate ( sError ))
return false;
@@ -1827,8 +1828,9 @@ bool CommitMonitor_c::CommitNonEmptyCmds ( RtIndex_i* pIndex, const ReplicationC
assert ( tCmd.m_tReconfigure.Ptr ());
CSphReconfigureSetup tSetup;
bool bSame = pIndex->IsSameSettings ( *tCmd.m_tReconfigure.Ptr (), tSetup, sError );
if ( !bSame && sError.IsEmpty ())
pIndex->Reconfigure ( tSetup );
if ( !bSame && sError.IsEmpty() && !pIndex->Reconfigure ( tSetup ) )
return false;

return sError.IsEmpty ();
}

@@ -47,7 +47,7 @@
inline const char * strerrorm ( int errnum )
{
if (errnum==EMFILE)
return "Too many open files (on linux see /etc/security/limits.conf, also 'ulimit -n')";
return "Too many open files (on linux see /etc/security/limits.conf, 'ulimit -n', max_open_files config option)";
return strerror (errnum);
}

@@ -78,7 +78,7 @@ class PercolateIndex_c : public PercolateIndex_i
bool bReplace, const CSphString & sTokenFilterOptions, const char ** ppStr, const VecTraits_T<int64_t> & dMvas,
CSphString & sError, CSphString & sWarning, RtAccum_t * pAccExt ) override;
bool MatchDocuments ( RtAccum_t * pAccExt, PercolateMatchResult_t &tRes ) override;
void Commit ( int * pDeleted, RtAccum_t * pAccExt ) override;
bool Commit ( int * pDeleted, RtAccum_t * pAccExt ) override;
void RollBack ( RtAccum_t * pAccExt ) override;

StoredQuery_i * AddQuery ( const PercolateQueryArgs_t & tArgs, const ISphTokenizer * pTokenizer, CSphDict * pDict, CSphString & sError )
@@ -101,11 +101,11 @@ class PercolateIndex_c : public PercolateIndex_i
bool DeleteDocument ( const DocID_t * , int , CSphString & , RtAccum_t * pAccExt ) override { RollBack ( pAccExt ); return true; }
void CheckRamFlush () override;
void ForceRamFlush ( bool bPeriodic ) override;
void ForceDiskChunk () override;
bool AttachDiskIndex ( CSphIndex * , bool, CSphString & ) override { return true; }
bool ForceDiskChunk () override;
bool AttachDiskIndex ( CSphIndex * , bool, bool &, CSphString & ) override { return true; }
void Optimize () override {}
bool IsSameSettings ( CSphReconfigureSettings & tSettings, CSphReconfigureSetup & tSetup, CSphString & sError ) const override;
void Reconfigure ( CSphReconfigureSetup & tSetup ) override REQUIRES ( !m_tLock );
bool Reconfigure ( CSphReconfigureSetup & tSetup ) override REQUIRES ( !m_tLock );
CSphIndex * GetDiskChunk ( int ) override { return NULL; } // NOLINT
int64_t GetFlushAge() const override { return 0; }

@@ -1668,13 +1668,13 @@ int PercolateIndex_c::ReplayDeleteQueries ( const char * sTags )
return iDeleted;
}

void PercolateIndex_c::Commit ( int * pDeleted, RtAccum_t * pAccExt )
bool PercolateIndex_c::Commit ( int * pDeleted, RtAccum_t * pAccExt )
{
assert ( g_bRTChangesAllowed );

RtAccum_t * pAcc = (RtAccum_t *)AcquireAccum ( m_pDict, pAccExt );
if ( !pAcc )
return;
return true;

int iDeleted = 0;
for ( ReplicationCommand_t * pCmd : pAcc->m_dCmd )
@@ -1701,6 +1701,8 @@ void PercolateIndex_c::Commit ( int * pDeleted, RtAccum_t * pAccExt )

if ( pDeleted )
*pDeleted = iDeleted;

return true;
}

struct PqMatchProcessor_t : ISphMatchProcessor, ISphNoncopyable
@@ -2343,7 +2345,7 @@ bool PercolateIndex_c::IsSameSettings ( CSphReconfigureSettings & tSettings, CSp
m_pTokenizer->GetSettingsFNV(), m_pDict->GetSettingsFNV(), m_pTokenizer->GetMaxCodepointLength(), bSameSchema, tSettings, tSetup, sError );
}

void PercolateIndex_c::Reconfigure ( CSphReconfigureSetup & tSetup )
bool PercolateIndex_c::Reconfigure ( CSphReconfigureSetup & tSetup )
{
if ( GetBinlog() )
GetBinlog()->BinlogReconfigure ( &m_iTID, m_sIndexName.cstr(), tSetup );
@@ -2378,6 +2380,8 @@ void PercolateIndex_c::Reconfigure ( CSphReconfigureSetup & tSetup )
}

PostSetup();

return true;
}

void SetPercolateThreads ( int iThreads )
@@ -2411,9 +2415,10 @@ void PercolateIndex_c::RamFlush ( bool bPeriodic )
, (int) (tmAge/1000000), (int)(tmSave/1000000), (int)((tmSave/1000)%1000) );
}

void PercolateIndex_c::ForceDiskChunk ()
bool PercolateIndex_c::ForceDiskChunk()
{
ForceRamFlush ( false );
return true;
}

void PercolateIndex_c::CheckRamFlush ()

0 comments on commit 4cd85af

Please sign in to comment.