
fixed manticoresearch/craigslist#12: optimize with progressive option enabled merges kill-lists in the wrong order; added test 341
tomatolog committed Dec 6, 2017
1 parent 0d7b34f commit 324291efe664f15b3e0624fa530a640027e45668
Showing with 202 additions and 68 deletions.
  1. +1 −1 src/gtests_rtstuff.cpp
  2. +1 −1 src/searchd.cpp
  3. +0 −1 src/sphinx.cpp
  4. +0 −1 src/sphinx.h
  5. +59 −60 src/sphinxrt.cpp
  6. +1 −1 src/sphinxrt.h
  7. +0 −2 src/sphinxutils.cpp
  8. +1 −1 src/testrt.cpp
  9. +18 −0 test/test_341/model.bin
  10. +121 −0 test/test_341/test.xml
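The failure mode is easiest to see from the test side. Judging by the expected output in test/test_341/model.bin below, the test populates an RT index, flushes several disk chunks, runs OPTIMIZE, and checks that query results are unchanged afterwards. A hedged SphinxQL sketch of that scenario (reconstructed for illustration only; the index name rt and the ids are assumptions, not the literal test script):

DELETE FROM rt WHERE id=1;
FLUSH RAMCHUNK rt;    -- each flush adds one more disk chunk carrying a kill-list
DELETE FROM rt WHERE id=2;
FLUSH RAMCHUNK rt;
OPTIMIZE INDEX rt;    -- progressive merge picks the two smallest chunks; before
                      -- this fix their kill-lists could be applied out of
                      -- chronological order and deleted rows could resurface
SELECT * FROM rt;     -- results must match the pre-OPTIMIZE snapshot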
@@ -31,7 +31,7 @@ void TestRTInit ()
{
CSphConfigSection tRTConfig;
-sphRTInit ( tRTConfig, true );
+sphRTInit ( tRTConfig, true, nullptr );
sphRTConfigure ( tRTConfig, true );
SmallStringHash_T<CSphIndex *> hIndexes;
@@ -23433,7 +23433,7 @@ int WINAPI ServiceMain ( int argc, char **argv )
// startup
///////////
-sphRTInit ( hSearchd, bTestMode );
+sphRTInit ( hSearchd, bTestMode, hConf("common") ? hConf["common"]("common") : nullptr );
if ( hSearchd.Exists ( "snippets_file_prefix" ) )
g_sSnippetsFilePrefix = hSearchd["snippets_file_prefix"].cstr();
@@ -186,7 +186,6 @@ static int g_iReadUnhinted = DEFAULT_READ_UNHINTED;
#endif
CSphString g_sLemmatizerBase = SHAREDIR;
-bool g_bProgressiveMerge = false;
// quick hack for indexer crash reporting
// one day, these might turn into a callback or something
@@ -3618,7 +3618,6 @@ bool sphIsDataPtrAttr ( ESphAttr eAttrType );
//////////////////////////////////////////////////////////////////////////
extern CSphString g_sLemmatizerBase;
-extern bool g_bProgressiveMerge;
/////////////////////////////////////////////////////////////////////////////
@@ -71,6 +71,9 @@ static RtBinlog_c * g_pRtBinlog = NULL;
/// protection from concurrent changes during binlog replay
static bool g_bRTChangesAllowed = false;
+// optimize mode for disk chunks merge
+static bool g_bProgressiveMerge = true;
//////////////////////////////////////////////////////////////////////////
// !COMMIT cleanup extern ref to sphinx.cpp
@@ -8666,15 +8669,15 @@ void RtIndex_t::Optimize ( volatile bool * pForceTerminate, ThrottleState_t * pT
// PROGRESSIVE MERGE
//////////////////////////////////////////////////////////////////////////
-int64_t GetChunkSize ( const CSphVector<CSphIndex*> & dDiskChunks, int iIndex )
+static int64_t GetChunkSize ( const CSphVector<CSphIndex*> & dDiskChunks, int iIndex )
{
CSphIndexStatus tDisk;
dDiskChunks[iIndex]->GetStatus(&tDisk);
return tDisk.m_iDiskUse;
}
-int GetNextSmallestChunk ( const CSphVector<CSphIndex*> & dDiskChunks, int iIndex=-1 )
+static int GetNextSmallestChunk ( const CSphVector<CSphIndex*> & dDiskChunks, int iIndex )
{
int iRes = -1;
int64_t iLastSize = INT64_MAX;
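The hunk cuts GetNextSmallestChunk() off after its two locals. A plausible completion (a sketch, not the verbatim body), consistent with the locals shown and with the new call sites, where iIndex names a chunk index to skip:

static int GetNextSmallestChunk ( const CSphVector<CSphIndex*> & dDiskChunks, int iIndex )
{
	// scan all disk chunks and remember the smallest one,
	// skipping the chunk at iIndex (eg. the one already picked as A)
	int iRes = -1;
	int64_t iLastSize = INT64_MAX;
	ARRAY_FOREACH ( i, dDiskChunks )
	{
		int64_t iSize = GetChunkSize ( dDiskChunks, i );
		if ( iSize<iLastSize && iIndex!=i )
		{
			iLastSize = iSize;
			iRes = i;
		}
	}
	return iRes;
}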
@@ -8699,9 +8702,9 @@ void RtIndex_t::ProgressiveMerge ( volatile bool * pForceTerminate, ThrottleStat
// Applying kill-lists is where it all gets complicated (kill-lists must take the chronology into account)
// 1) On every step, select two smallest chunks, A and B (A also should be older than B).
// 2) collect all kill-lists from A to B (inclusive)
-// 3) check if A+1 isn't B, merge A and A+1 kill-lists, write them to A+1
+// 3) merge A and A+1 kill-lists, write them to A+1
// 4) merge A and B chunk data to A, apply all kill lists collected on step 2
-// the timeline is: [older chunks], ..., A (iDst), A+1, ..., B (iSrc), ..., [younger chunks]
+// the timeline is: [older chunks], ..., A, A+1, ..., B, ..., [younger chunks]
// this also needs meta v.12 (chunk list with possible skips, instead of a base chunk + length as in meta v.11)
assert ( pForceTerminate && pThrottle );
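The subtlety behind step 3 is that a kill-list only ever applies to chunks older than its own; if chunk A disappears into the merge and its kill-list goes with it, rows deleted in chunks older than A silently resurrect. A minimal standalone sketch of the bookkeeping (plain STL and made-up data, not Manticore code):

#include <algorithm>
#include <cstdio>
#include <vector>

using Klist = std::vector<unsigned>; // sorted doc ids killed by a chunk

int main()
{
	// chunk 0 is oldest; a chunk's kill-list kills docs living in OLDER chunks
	std::vector<Klist> dKlists = { {}, { 10 }, { 20 }, { 30 } };
	int iA = 0, iB = 2; // step 1: the two smallest chunks, A older than B

	// step 2: kill-lists of chunks A+1..B may kill documents inside A,
	// so they must be applied while merging A and B
	Klist dKlist;
	for ( int i = iA+1; i<=iB; ++i )
		dKlist.insert ( dKlist.end(), dKlists[i].begin(), dKlists[i].end() );

	// step 3: A's own kill-list targets chunks OLDER than A; if it vanished
	// with A, rows deleted in those older chunks would resurrect, so it is
	// folded into A+1's kill-list instead
	Klist dMerged = dKlists[iA];
	dMerged.insert ( dMerged.end(), dKlists[iA+1].begin(), dKlists[iA+1].end() );
	std::sort ( dMerged.begin(), dMerged.end() );
	dMerged.erase ( std::unique ( dMerged.begin(), dMerged.end() ), dMerged.end() );
	dKlists[iA+1] = dMerged;

	// step 4 would now merge the row data of A and B, applying dKlist
	printf ( "%zu kills to apply, %zu kills preserved at A+1\n",
		dKlist.size(), dMerged.size() );
	return 0;
}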
@@ -8717,6 +8720,7 @@ void RtIndex_t::ProgressiveMerge ( volatile bool * pForceTerminate, ThrottleStat
while ( m_dDiskChunks.GetLength()>1 && !*pForceTerminate && !m_bOptimizeStop )
{
CSphVector<SphDocID_t> dKlist;
+CSphVector<SphDocID_t> dMergedKlist;
// make kill-list
// initially add RAM kill-list
@@ -8725,30 +8729,27 @@ void RtIndex_t::ProgressiveMerge ( volatile bool * pForceTerminate, ThrottleStat
Verify ( m_tChunkLock.ReadLock () );
-// merge 'smallest'(pSrc) to 'smaller'(pDst) and get 'merged' that names like 'dst'+.tmp
-// to get rid of keeping actual kill-list
-// however 'merged' got placed at 'src' position and 'merged' renamed to 'src' name
+// merge 'smallest' to 'smaller' and get 'merged' that names like 'A'+.tmp
+// however 'merged' got placed at 'B' position and 'merged' renamed to 'B' name
// TODO: presort a list of chunks by size before the main loop?
-int iSrc = GetNextSmallestChunk ( m_dDiskChunks );
-int iDst = GetNextSmallestChunk ( m_dDiskChunks, iSrc );
+int iA = GetNextSmallestChunk ( m_dDiskChunks, 0 );
+int iB = GetNextSmallestChunk ( m_dDiskChunks, iA );
-// in order to merge kill-lists correctly we need to make sure that iDst is the oldest one
-// indexes go from oldest to newest so iDst must go before iSrc (iDst is always older than iSrc)
-if ( iDst > iSrc )
-Swap ( iSrc, iDst );
+// in order to merge kill-lists correctly we need to make sure that A is the oldest one
+// indexes go from oldest to newest so A must go before B (A is always older than B)
+if ( iA > iB )
+Swap ( iB, iA );
sphLogDebug ( "Progressive merge - merging %d (%d kb) with %d (%d kb)", iDst, (int)(GetChunkSize ( m_dDiskChunks, iDst )/1024), iSrc, (int)(GetChunkSize ( m_dDiskChunks, iSrc )/1024) );
sphLogDebug ( "progressive merge - merging %d (%d kb) with %d (%d kb)", iA, (int)(GetChunkSize ( m_dDiskChunks, iA )/1024), iB, (int)(GetChunkSize ( m_dDiskChunks, iB )/1024) );
-const CSphIndex * pSrc = m_dDiskChunks[iSrc];
-const CSphIndex * pDst = m_dDiskChunks[iDst];
+const CSphIndex * pOldest = m_dDiskChunks[iA];
+const CSphIndex * pOlder = m_dDiskChunks[iB];
// so we're merging chunk A (older) with the (younger) B, so A < B
-// (chunks A and B are respectively iDst and iSrc)
+// we have to merge chunk's A kill-list to chunk A+1 to maintain the integrity
// collect all kill-lists from A to B (inclusive)
-for ( int iChunk=iDst; iChunk<=iSrc; iChunk++ )
+for ( int iChunk=iA+1; iChunk<=iB; iChunk++ )
{
if ( *pForceTerminate || m_bOptimizeStop )
break;
@@ -8762,49 +8763,42 @@ void RtIndex_t::ProgressiveMerge ( volatile bool * pForceTerminate, ThrottleStat
memcpy ( dKlist.Begin()+iOff, pIndex->GetKillList(), sizeof(SphDocID_t)*pIndex->GetKillListSize() );
}
-// check if A+1 isn't B, merge A and A+1 kill-lists, write to A+1
-CSphVector<SphDocID_t> dMergedKlist;
-if ( iDst+1!=iSrc )
-{
-for ( int iChunk=iDst; iChunk<=iDst+1; iChunk++ )
-{
-const CSphIndex * pIndex = m_dDiskChunks[iChunk];
-if ( !pIndex->GetKillListSize() )
-continue;
-int iOff = dMergedKlist.GetLength();
-dMergedKlist.Resize ( iOff+pIndex->GetKillListSize() );
-memcpy ( dMergedKlist.Begin()+iOff, pIndex->GetKillList(), sizeof(SphDocID_t)*pIndex->GetKillListSize() );
-}
-}
+// merge klist from oldest disk chunk to next disk chunk
+// that might be either A and A+1 or A and B
+int iOff = pOldest->GetKillListSize();
+CSphIndex * pNextChunk = m_dDiskChunks[iA+1];
+dMergedKlist.Resize ( iOff + pNextChunk->GetKillListSize() );
+memcpy ( dMergedKlist.Begin(), pOldest->GetKillList(), sizeof(SphDocID_t) * pOldest->GetKillListSize() );
+memcpy ( dMergedKlist.Begin() + iOff, pNextChunk->GetKillList(), sizeof(SphDocID_t) * pNextChunk->GetKillListSize() );
Verify ( m_tChunkLock.Unlock() );
// for filtering have to set bounds
dKlist.Add ( 0 );
dKlist.Add ( DOCID_MAX );
dKlist.Uniq();
// got rid of duplicates at A+1 klist
dMergedKlist.Uniq();
-CSphString sSrc, sDst, sRename, sMerged;
-sSrc.SetSprintf ( "%s", pSrc->GetFilename() );
-sDst.SetSprintf ( "%s", pDst->GetFilename() );
-sRename.SetSprintf ( "%s.old", pSrc->GetFilename() );
-sMerged.SetSprintf ( "%s.tmp", pDst->GetFilename() );
+CSphString sOlder, sOldest, sRename, sMerged;
+sOlder.SetSprintf ( "%s", pOlder->GetFilename() );
+sOldest.SetSprintf ( "%s", pOldest->GetFilename() );
+sRename.SetSprintf ( "%s.old", pOlder->GetFilename() );
+sMerged.SetSprintf ( "%s.tmp", pOldest->GetFilename() );
// check forced exit after long operation
if ( *pForceTerminate || m_bOptimizeStop )
break;
// merge data to disk ( data is constant during that phase )
CSphIndexProgress tProgress;
-bool bMerged = sphMerge ( pDst, pSrc, dKlist, sError, tProgress, pThrottle, pForceTerminate, &m_bOptimizeStop, true );
+bool bMerged = sphMerge ( pOldest, pOlder, dKlist, sError, tProgress, pThrottle, pForceTerminate, &m_bOptimizeStop, true );
if ( !bMerged )
{
sphWarning ( "rt optimize: index %s: failed to merge %s to %s (error %s)",
-m_sIndexName.cstr(), sSrc.cstr(), sDst.cstr(), sError.cstr() );
+m_sIndexName.cstr(), sOlder.cstr(), sOldest.cstr(), sError.cstr() );
break;
}
// check forced exit after long operation
if ( *pForceTerminate || m_bOptimizeStop )
break;
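For readers unfamiliar with CSphVector: the Resize() + memcpy() pairs above are plain concatenation, and Uniq() sorts and deduplicates in place. An STL equivalent of the klist merge, for reference (a sketch, assuming Uniq() amounts to sort-plus-unique):

#include <algorithm>
#include <vector>

using SphDocID_t = unsigned long long; // stand-in for the real typedef

// append dNext to dOldest and dedupe - what Resize/memcpy + Uniq() do above
static void MergeKlist ( std::vector<SphDocID_t> & dOldest, const std::vector<SphDocID_t> & dNext )
{
	dOldest.insert ( dOldest.end(), dNext.begin(), dNext.end() );
	std::sort ( dOldest.begin(), dOldest.end() );
	dOldest.erase ( std::unique ( dOldest.begin(), dOldest.end() ), dOldest.end() );
}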
@@ -8816,44 +8810,46 @@ void RtIndex_t::ProgressiveMerge ( volatile bool * pForceTerminate, ThrottleStat
m_sIndexName.cstr(), sError.cstr() );
break;
}
// check forced exit after long operation
if ( *pForceTerminate || m_bOptimizeStop )
break;
// lets rotate indexes
// rename older disk chunk to 'old'
-if ( !const_cast<CSphIndex *>( pSrc )->Rename ( sRename.cstr() ) )
+if ( !const_cast<CSphIndex *>( pOlder )->Rename ( sRename.cstr() ) )
{
sphWarning ( "rt optimize: index %s: cur to old rename failed (error %s)",
-m_sIndexName.cstr(), pSrc->GetLastError().cstr() );
+m_sIndexName.cstr(), pOlder->GetLastError().cstr() );
break;
}
-// rename merged disk chunk to 0
-if ( !pMerged->Rename ( sSrc.cstr() ) )
+// rename merged disk chunk to B
+if ( !pMerged->Rename ( sOlder.cstr() ) )
{
sphWarning ( "rt optimize: index %s: merged to cur rename failed (error %s)",
m_sIndexName.cstr(), pMerged->GetLastError().cstr() );
-if ( !const_cast<CSphIndex *>( pSrc )->Rename ( sSrc.cstr() ) )
+if ( !const_cast<CSphIndex *>( pOlder )->Rename ( sOlder.cstr() ) )
{
sphWarning ( "rt optimize: index %s: old to cur rename failed (error %s)",
-m_sIndexName.cstr(), pSrc->GetLastError().cstr() );
+m_sIndexName.cstr(), pOlder->GetLastError().cstr() );
}
break;
}
if ( *pForceTerminate || m_bOptimizeStop ) // protection
break;
-// merged replaces recent chunk
+// oldest chunk got deleted
+// next after oldest keeps klist from oldest
Verify ( m_tWriting.Lock() );
Verify ( m_tChunkLock.WriteLock() );
-if ( dMergedKlist.GetLength() )
-m_dDiskChunks[iDst + 1]->ReplaceKillList ( dMergedKlist.Begin(), dMergedKlist.GetLength() );
-m_dDiskChunks[iDst] = pMerged.LeakPtr();
-m_dDiskChunks.Remove ( iSrc );
+m_dDiskChunks[iB] = pMerged.LeakPtr();
+// move merged klist to next after oldest disk chunk
+m_dDiskChunks[iA+1]->ReplaceKillList ( dMergedKlist.Begin(), dMergedKlist.GetLength() );
+m_dDiskChunks.Remove ( iA );
CSphFixedVector<int> dChunkNames = GetIndexNames ( m_dDiskChunks, false );
Verify ( m_tChunkLock.Unlock() );
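The rename sequence above is a swap with rollback: the older chunk B is parked at B.old, the merged chunk takes B's name, and if that second rename fails B.old is put back. A generic standalone sketch of the pattern (std::filesystem stand-in for illustration; the real code goes through CSphIndex::Rename and defers unlinking until the locks are released):

#include <filesystem>
namespace fs = std::filesystem;

// put the merged chunk in place of chunk B; on failure restore B and bail out
bool SwapInMerged ( const fs::path & sOlder, const fs::path & sMerged )
{
	fs::path sRename = sOlder;
	sRename += ".old";

	std::error_code tEc;
	fs::rename ( sOlder, sRename, tEc );      // B -> B.old
	if ( tEc )
		return false;

	fs::rename ( sMerged, sOlder, tEc );      // merged -> B
	if ( tEc )
	{
		fs::rename ( sRename, sOlder, tEc );  // rollback: B.old -> B
		return false;
	}

	fs::remove ( sRename, tEc );              // drop B.old once all is well
	return true;
}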
@@ -8863,23 +8859,23 @@ void RtIndex_t::ProgressiveMerge ( volatile bool * pForceTerminate, ThrottleStat
if ( *pForceTerminate || m_bOptimizeStop )
{
sphWarning ( "rt optimize: index %s: forced to shutdown, remove old index files manually '%s', '%s'",
-m_sIndexName.cstr(), sRename.cstr(), sDst.cstr() );
+m_sIndexName.cstr(), sRename.cstr(), sOldest.cstr() );
break;
}
// exclusive reader (to make sure that disk chunks not used any more) and writer lock here
Verify ( m_tReading.WriteLock() );
Verify ( m_tWriting.Lock() );
-SafeDelete ( pSrc );
-SafeDelete ( pDst );
+SafeDelete ( pOlder );
+SafeDelete ( pOldest );
Verify ( m_tWriting.Unlock() );
Verify ( m_tReading.Unlock() );
// we might remove old index files
sphUnlinkIndex ( sRename.cstr(), true );
-sphUnlinkIndex ( sDst.cstr(), true );
+sphUnlinkIndex ( sOldest.cstr(), true );
// FIXEME: wipe out 'merged' index files in case of error
}
@@ -8892,7 +8888,7 @@ void RtIndex_t::ProgressiveMerge ( volatile bool * pForceTerminate, ThrottleStat
m_sIndexName.cstr(), iChunks-m_dDiskChunks.GetLength(), iChunks, (int)(tmPass/1000000), (int)((tmPass/1000)%1000) );
} else
{
sphInfo ( "rt: index %s: optimized chunk(s) %d ( of %d ) in %d.%03d sec",
sphInfo ( "rt: index %s: optimized (progressive) chunk(s) %d ( of %d ) in %d.%03d sec",
m_sIndexName.cstr(), iChunks-m_dDiskChunks.GetLength(), iChunks, (int)(tmPass/1000000), (int)((tmPass/1000)%1000) );
}
}
@@ -10435,7 +10431,7 @@ ISphRtIndex * sphCreateIndexRT ( const CSphSchema & tSchema, const char * sIndex
return new RtIndex_t ( tSchema, sIndexName, iRamSize, sPath, bKeywordDict );
}
-void sphRTInit ( const CSphConfigSection & hSearchd, bool bTestMode )
+void sphRTInit ( const CSphConfigSection & hSearchd, bool bTestMode, const CSphConfigSection * pCommon )
{
MEMORY ( MEM_BINLOG );
@@ -10449,6 +10445,9 @@ void sphRTInit ( const CSphConfigSection & hSearchd, bool bTestMode )
// check binlog path before detaching from the console
g_pRtBinlog->CheckPath ( hSearchd, bTestMode );
+if ( pCommon )
+g_bProgressiveMerge = ( pCommon->GetInt ( "progressive_merge", 1 )!=0 );
}
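Note the option is still read from the common section of sphinx.conf, just at sphRTInit() time instead of sphConfigureCommon() (see the sphinxutils.cpp hunk below), and it defaults to enabled. A hedged config sketch for turning it off:

common
{
	# progressive_merge defaults to 1 (enabled);
	# 0 falls back to the non-progressive OPTIMIZE path
	progressive_merge = 0
}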
@@ -83,7 +83,7 @@ class ISphRtIndex : public CSphIndex
/// initialize subsystem
class CSphConfigSection;
-void sphRTInit ( const CSphConfigSection & hSearchd, bool bTestMode );
+void sphRTInit ( const CSphConfigSection & hSearchd, bool bTestMode, const CSphConfigSection * pCommon );
void sphRTConfigure ( const CSphConfigSection & hSearchd, bool bTestMode );
bool sphRTSchemaConfigure ( const CSphConfigSection & hIndex, CSphSchema * pSchema, CSphString * pError );
void sphRTSetTestMode ();
@@ -2459,8 +2459,6 @@ void sphConfigureCommon ( const CSphConfig & hConf )
g_sLemmatizerBase = hCommon.GetStr ( "lemmatizer_base" );
sphConfigureRLP ( hCommon );
-g_bProgressiveMerge = ( hCommon.GetInt ( "progressive_merge", 1 )!=0 );
bool bJsonStrict = false;
bool bJsonAutoconvNumbers;
bool bJsonKeynamesToLowercase = false;
@@ -211,7 +211,7 @@ int main ( int argc, char ** argv )
g_iFieldsCount = tSrcSchema.GetFieldsCount();
CSphConfigSection tRTConfig;
-sphRTInit ( tRTConfig, true );
+sphRTInit ( tRTConfig, true, nullptr );
sphRTConfigure ( tRTConfig, true );
SmallStringHash_T< CSphIndex * > dTemp;
BinlogFlushInfo_t tBinlogFlush;
@@ -0,0 +1,18 @@
+a:1:{i:0;a:1:{i:0;a:26:{i:0;s:15:"populate commit";i:1;s:2:"OK";i:2;s:2:"OK";i:3;s:11:"
+flushed 61";i:4;s:2:"OK";i:5;s:2:"OK";i:6;s:12:"
+flushed 561";i:7;s:2:"OK";i:8;s:2:"OK";i:9;s:12:"
+flushed 661";i:10;s:2:"OK";i:11;s:2:"OK";i:12;s:12:"
+flushed 811";i:13;s:3:"
+OK";i:14;s:2:"OK";i:15;s:2:"OK";i:16;s:11:"
+flushed 80";i:17;s:2:"OK";i:18;s:2:"OK";i:19;s:12:"
+flushed 330";i:20;s:2:"OK";i:21;s:2:"OK";i:22;s:12:"
+flushed 180";i:23;s:28:"
+31 | 3
+32 | 3
+33 | 3
+3 rows";i:24;s:3:"
+OK";i:25;s:28:"
+31 | 3
+32 | 3
+33 | 3
+3 rows";}}}