
Commit

fixed #160, github#29 RT issues: crash on searching RAM segments; deadlock on saving a disk chunk with double buffer; deadlock on saving a disk chunk during optimize
tomatolog committed Jan 29, 2018
1 parent f9921a9 commit 72dcf66
Showing 2 changed files with 44 additions and 16 deletions.
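
The two deadlocks fixed here are lock-order inversions: after this change, Optimize and ProgressiveMerge take m_tWriting before m_tReading, the same order the commit path uses. The snippet below is a minimal standalone sketch of that principle, not Sphinx code; the globals, the function names and std::mutex (standing in for the actual rwlock types) are invented for illustration.

#include <mutex>

std::mutex g_tWriting;	// stands in for RtIndex_t::m_tWriting
std::mutex g_tReading;	// stands in for RtIndex_t::m_tReading (a rwlock in the real code)

void CommitPathSketch ()
{
	// writer lock first, then reader lock
	std::lock_guard<std::mutex> tWriteLock ( g_tWriting );
	std::lock_guard<std::mutex> tReadLock ( g_tReading );
	// ... flush the kill-list, retire RAM segments ...
}

void OptimizePathSketch ()
{
	// same order as the commit path; if this function locked g_tReading first,
	// two threads could each hold one mutex and wait forever for the other
	std::lock_guard<std::mutex> tWriteLock ( g_tWriting );
	std::lock_guard<std::mutex> tReadLock ( g_tReading );
	// ... delete the merged-away disk chunks ...
}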
src/sphinx.h: 2 changes (1 addition & 1 deletion)
@@ -3414,7 +3414,7 @@ class CSphIndex : public ISphKeywordsStat
virtual void SetDebugCheck () {}

/// getter for name
const char * GetName () { return m_sIndexName.cstr(); }
const char * GetName () const { return m_sIndexName.cstr(); }

void SetName ( const char * sName ) { m_sIndexName = sName; }

src/sphinxrt.cpp: 58 changes (43 additions & 15 deletions)
@@ -1228,7 +1228,7 @@ struct RtIndex_t : public ISphRtIndex, public ISphNoncopyable, public ISphWordli
void SaveMeta ( int64_t iTID, const CSphFixedVector<int> & dChunkNames );
void SaveDiskHeader ( const char * sFilename, SphDocID_t iMinDocID, int iCheckpoints, SphOffset_t iCheckpointsPosition, DWORD iInfixBlocksOffset, int iInfixCheckpointWordsSize, DWORD uKillListSize, uint64_t uMinMaxSize, const ChunkStats_t & tStats ) const;
void SaveDiskDataImpl ( const char * sFilename, const SphChunkGuard_t & tGuard, const ChunkStats_t & tStats ) const;
void SaveDiskChunk ( int64_t iTID, const SphChunkGuard_t & tGuard, const ChunkStats_t & tStats );
void SaveDiskChunk ( int64_t iTID, const SphChunkGuard_t & tGuard, const ChunkStats_t & tStats, bool bMoveRetired );
CSphIndex * LoadDiskChunk ( const char * sChunk, CSphString & sError ) const;
bool LoadRamChunk ( DWORD uVersion, bool bRebuildInfixes );
bool SaveRamChunk ();
@@ -3296,11 +3296,22 @@ void RtIndex_t::CommitReplayable ( RtSegment_t * pNewSeg, CSphVector<SphDocID_t>
m_dDiskChunkKlist.Resize ( 0 );
m_tKlist.Flush ( m_dDiskChunkKlist );

// need to release the m_tReading lock to prevent a deadlock - commit vs SaveDiskChunk
// chunks are kept alive till the end of this scope and
// will be freed after it, on the next commit, at FreeRetired under the writer lock
ARRAY_FOREACH ( i, tGuard.m_dRamChunks )
m_dRetired.Add ( tGuard.m_dRamChunks[i] );
tGuard.m_pReading->Unlock();
tGuard.m_pReading = nullptr;

Verify ( m_tWriting.Unlock() );

SaveDiskChunk ( iTID, tGuard, tStat2Dump );
SaveDiskChunk ( iTID, tGuard, tStat2Dump, false );
g_pBinlog->NotifyIndexFlush ( m_sIndexName.cstr(), iTID, false );
}

// TODO - try to call FreeRetired after taking the writer lock again
// to free more memory
}
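
This hunk is the double-buffer deadlock fix: the commit path now moves the guarded RAM segments to m_dRetired and drops the m_tReading lock held by the chunk guard before calling SaveDiskChunk, and the new bMoveRetired=false argument tells SaveDiskChunk not to queue the same segments again. A simplified sketch of that shape follows; the types and the stub are illustrative stand-ins, not the real Sphinx declarations.

#include <shared_mutex>
#include <vector>

struct RtSegment_t {};

struct SphChunkGuard_t
{
	std::vector<RtSegment_t*>	m_dRamChunks;
	std::shared_mutex *			m_pReading = nullptr;	// held shared while the guard is alive
};

void SaveDiskChunkStub ( const SphChunkGuard_t &, bool ) {}	// stand-in for RtIndex_t::SaveDiskChunk

void CommitSaveSketch ( SphChunkGuard_t & tGuard, std::vector<RtSegment_t*> & dRetired )
{
	// keep the segments alive for concurrent readers; they get freed later,
	// on the next commit, at FreeRetired under the writer lock
	for ( RtSegment_t * pSeg : tGuard.m_dRamChunks )
		dRetired.push_back ( pSeg );

	// release the read lock before saving the disk chunk - holding it here
	// is what produced the commit vs SaveDiskChunk deadlock
	tGuard.m_pReading->unlock_shared();
	tGuard.m_pReading = nullptr;

	// bMoveRetired=false: this caller has already moved the segments to the retired list
	SaveDiskChunkStub ( tGuard, false );
}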


@@ -3389,8 +3400,8 @@ void RtIndex_t::ForceDiskChunk () NO_THREAD_SAFETY_ANALYSIS
m_tKlist.Flush ( m_dDiskChunkKlist );
Verify ( m_tWriting.Unlock() );

ChunkStats_t s ( m_tStats, m_dFieldLensRam );
SaveDiskChunk ( m_iTID, tGuard, s );
ChunkStats_t tStats ( m_tStats, m_dFieldLensRam );
SaveDiskChunk ( m_iTID, tGuard, tStats, true );
}


@@ -4041,7 +4052,7 @@ void RtIndex_t::SaveMeta ( int64_t iTID, const CSphFixedVector<int> & dChunkName
}


void RtIndex_t::SaveDiskChunk ( int64_t iTID, const SphChunkGuard_t & tGuard, const ChunkStats_t & tStats )
void RtIndex_t::SaveDiskChunk ( int64_t iTID, const SphChunkGuard_t & tGuard, const ChunkStats_t & tStats, bool bMoveRetired )
{
if ( !tGuard.m_dRamChunks.GetLength() )
return;
@@ -4094,8 +4105,11 @@ void RtIndex_t::SaveDiskChunk ( int64_t iTID, const SphChunkGuard_t & tGuard, co

Verify ( m_tChunkLock.Unlock() );

ARRAY_FOREACH ( i, tGuard.m_dRamChunks )
m_dRetired.Add ( tGuard.m_dRamChunks[i] );
if ( bMoveRetired )
{
ARRAY_FOREACH ( i, tGuard.m_dRamChunks )
m_dRetired.Add ( tGuard.m_dRamChunks[i] );
}

// abandon .ram file
CSphString sChunk;
@@ -7036,8 +7050,8 @@ bool RtIndex_t::MultiQuery ( const CSphQuery * pQuery, CSphQueryResult * pResult
{
*pDst = *pSrc1++;
// handle duplicates
while ( *pDst==*pSrc1 ) pSrc1++;
while ( *pDst==*pSrc2 ) pSrc2++;
while ( pSrc1!=pEnd1 && *pDst==*pSrc1 ) pSrc1++;
while ( pSrc2!=pEnd2 && *pDst==*pSrc2 ) pSrc2++;
}
pDst++;
}
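
The MultiQuery change above is the crash-on-searching-RAM-segments fix: the duplicate-skipping loops in the docid merge now check the end pointers, so an exhausted source list is never dereferenced past its last element. Below is a standalone sketch of the same bounds-checked merge-with-dedup, with illustrative names and uint64_t standing in for SphDocID_t.

#include <cstdint>
#include <vector>

std::vector<uint64_t> MergeUniqueSketch ( const std::vector<uint64_t> & dA, const std::vector<uint64_t> & dB )
{
	std::vector<uint64_t> dOut;
	const uint64_t * pSrc1 = dA.data(), * pEnd1 = dA.data() + dA.size();
	const uint64_t * pSrc2 = dB.data(), * pEnd2 = dB.data() + dB.size();

	while ( pSrc1!=pEnd1 || pSrc2!=pEnd2 )
	{
		// take the smaller head element; an exhausted side is never dereferenced
		uint64_t uVal;
		if ( pSrc2==pEnd2 || ( pSrc1!=pEnd1 && *pSrc1<=*pSrc2 ) )
			uVal = *pSrc1++;
		else
			uVal = *pSrc2++;

		// handle duplicates - the end checks are what the fix above adds
		while ( pSrc1!=pEnd1 && *pSrc1==uVal ) pSrc1++;
		while ( pSrc2!=pEnd2 && *pSrc2==uVal ) pSrc2++;

		dOut.push_back ( uVal );
	}
	return dOut;
}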
@@ -8325,8 +8339,8 @@ bool RtIndex_t::AttachDiskIndex ( CSphIndex * pIndex, CSphString & sError )
SphChunkGuard_t tGuard;
GetReaderChunks ( tGuard );

ChunkStats_t s ( m_tStats, m_dFieldLensRam );
SaveDiskChunk ( m_iTID, tGuard, s );
ChunkStats_t tStats ( m_tStats, m_dFieldLensRam );
SaveDiskChunk ( m_iTID, tGuard, tStats, true );

int64_t iKeep = 0;

@@ -8617,6 +8631,8 @@ void RtIndex_t::Optimize ( volatile bool * pForceTerminate, ThrottleState_t * pT
Verify ( m_tWriting.Lock() );
Verify ( m_tChunkLock.WriteLock() );

sphLogDebug ( "optimized 0=%s, 1=%s, new=%s", m_dDiskChunks[0]->GetName(), m_dDiskChunks[1]->GetName(), pMerged->GetName() );

m_dDiskChunks[1] = pMerged.LeakPtr();
m_dDiskChunks.Remove ( 0 );
CSphFixedVector<int> dChunkNames = GetIndexNames ( m_dDiskChunks, false );
@@ -8633,14 +8649,15 @@ void RtIndex_t::Optimize ( volatile bool * pForceTerminate, ThrottleState_t * pT
}

// exclusive reader (to make sure that disk chunks not used any more) and writer lock here
Verify ( m_tReading.WriteLock() );
// write lock goes first as with commit
Verify ( m_tWriting.Lock() );
Verify ( m_tReading.WriteLock() );

SafeDelete ( pOlder );
SafeDelete ( pOldest );

Verify ( m_tWriting.Unlock() );
Verify ( m_tReading.Unlock() );
Verify ( m_tWriting.Unlock() );

// we might remove old index files
sphUnlinkIndex ( sRename.cstr(), true );
@@ -8854,16 +8871,25 @@ void RtIndex_t::ProgressiveMerge ( volatile bool * pForceTerminate, ThrottleStat
// oldest chunk got deleted
// next after oldest keeps klist from oldest

// Writing lock - to wipe out writers
// Reading wlock - to wipe out searches as we are replacing the klist
// Chunk wlock - to lock the chunks as we are going to modify the chunks vector
// same lock order as in GetReaderChunks and SaveDiskChunk to prevent deadlock

Verify ( m_tWriting.Lock() );
Verify ( m_tReading.WriteLock() );
Verify ( m_tChunkLock.WriteLock() );

sphLogDebug ( "optimized (progressive) a=%s, b=%s, new=%s", pOldest->GetName(), pOlder->GetName(), pMerged->GetName() );

m_dDiskChunks[iB] = pMerged.LeakPtr();
// move merged klist to next after oldest disk chunk
m_dDiskChunks[iA+1]->ReplaceKillList ( dMergedKlist.Begin(), dMergedKlist.GetLength() );
m_dDiskChunks.Remove ( iA );
CSphFixedVector<int> dChunkNames = GetIndexNames ( m_dDiskChunks, false );

Verify ( m_tChunkLock.Unlock() );
Verify ( m_tReading.Unlock() );
SaveMeta ( m_iTID, dChunkNames );
Verify ( m_tWriting.Unlock() );

@@ -8875,14 +8901,16 @@ void RtIndex_t::ProgressiveMerge ( volatile bool * pForceTerminate, ThrottleStat
}

// exclusive reader (to make sure that disk chunks not used any more) and writer lock here
Verify ( m_tReading.WriteLock() );
// wipe out the writer, then wait till all readers get out - to delete the indexes,
// as readers might keep a copy of the chunks vector
Verify ( m_tWriting.Lock() );
Verify ( m_tReading.WriteLock() );

SafeDelete ( pOlder );
SafeDelete ( pOldest );

Verify ( m_tWriting.Unlock() );
Verify ( m_tReading.Unlock() );
Verify ( m_tWriting.Unlock() );

// we might remove old index files
sphUnlinkIndex ( sRename.cstr(), true );
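
This last hunk shows why the exclusive m_tReading lock is taken (now after m_tWriting) before pOlder and pOldest are deleted: searches run against their own snapshot of the chunk pointers, so the merged-away chunk objects may only be destroyed once every such reader has finished. A minimal sketch of that reader-snapshot idea, using std::shared_mutex and invented names:

#include <mutex>
#include <shared_mutex>
#include <vector>

struct DiskChunk_t {};

std::shared_mutex			g_tReading;		// stands in for m_tReading
std::vector<DiskChunk_t*>	g_dDiskChunks;	// stands in for m_dDiskChunks

void SearchSketch ()
{
	std::shared_lock<std::shared_mutex> tReader ( g_tReading );
	std::vector<DiskChunk_t*> dLocal = g_dDiskChunks;	// a reader keeps its own copy of the chunks vector
	// ... run the query against dLocal ...
}

void DeleteMergedChunksSketch ( DiskChunk_t * pOlder, DiskChunk_t * pOldest )
{
	// exclusive lock: waits until every reader holding a snapshot has finished,
	// so deleting the old chunk objects cannot pull them out from under a search
	std::lock_guard<std::shared_mutex> tWriter ( g_tReading );
	delete pOlder;
	delete pOldest;
}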
