Skip to content
Permalink
Browse files
fixed #479 write lock at FLUSH RTINDEX to not write lock whole index …
…during save and on regular flush from rt_flush_period; fixed write lock at FLUSH RAMCHUNK to user regular commit path and do not write lock whole index during save
  • Loading branch information
tomatolog committed Nov 27, 2018
1 parent f484c41 commit 238bdea59bad89f097403f1c978658ce45f16c70
Showing with 41 additions and 36 deletions.
  1. +41 −36 src/sphinxrt.cpp
@@ -996,7 +996,7 @@ struct RtIndex_t : public ISphRtIndex, public ISphNoncopyable, public ISphWordli
virtual bool DeleteDocument ( const SphDocID_t * pDocs, int iDocs, CSphString & sError, ISphRtAccum * pAccExt );
virtual void Commit ( int * pDeleted, ISphRtAccum * pAccExt );
virtual void RollBack ( ISphRtAccum * pAccExt );
void CommitReplayable ( RtSegment_t * pNewSeg, CSphVector<SphDocID_t> & dAccKlist, int * pTotalKilled ); // FIXME? protect?
void CommitReplayable ( RtSegment_t * pNewSeg, CSphVector<SphDocID_t> & dAccKlist, int * pTotalKilled, bool bForceDump ); // FIXME? protect?
virtual void CheckRamFlush ();
virtual void ForceRamFlush ( bool bPeriodic=false );
virtual void ForceDiskChunk ();
@@ -1021,7 +1021,7 @@ struct RtIndex_t : public ISphRtIndex, public ISphNoncopyable, public ISphWordli
void SaveDiskChunk ( int64_t iTID, const SphChunkGuard_t & tGuard, const ChunkStats_t & tStats, bool bMoveRetired );
CSphIndex * LoadDiskChunk ( const char * sChunk, CSphString & sError ) const;
bool LoadRamChunk ( DWORD uVersion, bool bRebuildInfixes );
bool SaveRamChunk ();
bool SaveRamChunk ( const RtSegment_t ** ppSegments, int iSegCount );

virtual void GetPrefixedWords ( const char * sSubstring, int iSubLen, const char * sWildcard, Args_t & tArgs ) const;
virtual void GetInfixedWords ( const char * sSubstring, int iSubLen, const char * sWildcard, Args_t & tArgs ) const;
@@ -1089,6 +1089,7 @@ struct RtIndex_t : public ISphRtIndex, public ISphNoncopyable, public ISphWordli
virtual const CSphSchema & GetMatchSchema () const { return m_tSchema; }
virtual const CSphSchema & GetInternalSchema () const { return m_tSchema; }
int64_t GetUsedRam () const;
static int64_t GetUsedRam ( const SphChunkGuard_t & tGuard );

bool IsWordDict () const { return m_bKeywordDict; }
int GetWordCheckoint() const { return m_iWordsCheckpoint; }
@@ -1171,7 +1172,7 @@ RtIndex_t::~RtIndex_t ()

if ( bValid )
{
SaveRamChunk ();
SaveRamChunk ( (const RtSegment_t **)m_dRamChunks.Begin(), m_dRamChunks.GetLength() );
CSphFixedVector<int> dNames = GetIndexNames ( m_dDiskChunks, false );
SaveMeta ( m_iTID, dNames );
}
@@ -1230,26 +1231,28 @@ void RtIndex_t::ForceRamFlush ( bool bPeriodic )
if ( g_pRtBinlog->IsActive() && m_iTID<=m_iSavedTID )
return;

Verify ( m_tWriting.Lock() );

int64_t iUsedRam = GetUsedRam();
if ( !SaveRamChunk () )
int64_t iUsedRam = 0;
int64_t iSavedTID = m_iTID;
{
sphWarning ( "rt: index %s: ramchunk save FAILED! (error=%s)", m_sIndexName.cstr(), m_sLastError.cstr() );
Verify ( m_tWriting.Unlock() );
return;
SphChunkGuard_t tGuard;
GetReaderChunks ( tGuard );
iUsedRam = GetUsedRam ( tGuard );

if ( !SaveRamChunk ( tGuard.m_dRamChunks.Begin(), tGuard.m_dRamChunks.GetLength() ) )
{
sphWarning ( "rt: index %s: ramchunk save FAILED! (error=%s)", m_sIndexName.cstr(), m_sLastError.cstr() );
return;
}
CSphFixedVector<int> dNames = GetIndexNames ( tGuard.m_dDiskChunks, false );
SaveMeta ( iSavedTID, dNames );
}
CSphFixedVector<int> dNames = GetIndexNames ( m_dDiskChunks, false );
SaveMeta ( m_iTID, dNames );
g_pBinlog->NotifyIndexFlush ( m_sIndexName.cstr(), m_iTID, false );
g_pBinlog->NotifyIndexFlush ( m_sIndexName.cstr(), iSavedTID, false );

int64_t iWasTID = m_iSavedTID;
int64_t tmDelta = sphMicroTimer() - m_tmSaved;
m_iSavedTID = m_iTID;
m_iSavedTID = iSavedTID;
m_tmSaved = sphMicroTimer();

Verify ( m_tWriting.Unlock() );

tmSave = sphMicroTimer() - tmSave;
sphInfo ( "rt: index %s: ramchunk saved ok (mode=%s, last TID=" INT64_FMT ", current TID=" INT64_FMT ", "
"ram=%d.%03d Mb, time delta=%d sec, took=%d.%03d sec)"
@@ -1277,6 +1280,16 @@ int64_t RtIndex_t::GetUsedRam () const
return iTotal;
}


int64_t RtIndex_t::GetUsedRam ( const SphChunkGuard_t & tGuard )
{
int64_t iTotal = 0;
ARRAY_FOREACH ( i, tGuard.m_dRamChunks )
iTotal += tGuard.m_dRamChunks[i]->GetUsedRam();

return iTotal;
}

//////////////////////////////////////////////////////////////////////////
// INDEXING
//////////////////////////////////////////////////////////////////////////
@@ -2647,7 +2660,7 @@ void RtIndex_t::Commit ( int * pDeleted, ISphRtAccum * pAccExt )
pAcc->m_dAccumKlist.Uniq ();

// now on to the stuff that needs locking and recovery
CommitReplayable ( pNewSeg, pAcc->m_dAccumKlist, pDeleted );
CommitReplayable ( pNewSeg, pAcc->m_dAccumKlist, pDeleted, false );

// done; cleanup accum
pAcc->Cleanup ( RtAccum_t::ERest );
@@ -2656,7 +2669,7 @@ void RtIndex_t::Commit ( int * pDeleted, ISphRtAccum * pAccExt )
pAcc->GrabLastWarning ( sWarning );
}

void RtIndex_t::CommitReplayable ( RtSegment_t * pNewSeg, CSphVector<SphDocID_t> & dAccKlist, int * pTotalKilled )
void RtIndex_t::CommitReplayable ( RtSegment_t * pNewSeg, CSphVector<SphDocID_t> & dAccKlist, int * pTotalKilled, bool bForceDump )
{
// store statistics, because pNewSeg just might get merged
int iNewDocs = pNewSeg ? pNewSeg->m_iRows : 0;
@@ -2716,7 +2729,7 @@ void RtIndex_t::CommitReplayable ( RtSegment_t * pNewSeg, CSphVector<SphDocID_t>
iRamLeft = Max ( iRamLeft - dRetired->GetUsedRam(), 0 );

// skip merging if no rows were added or no memory left
bool bDump = ( iRamLeft==0 );
bool bDump = ( iRamLeft==0 || bForceDump );
const int MAX_SEGMENTS = 32;
const int MAX_PROGRESSION_SEGMENT = 8;
const int64_t MAX_SEGMENT_VECTOR_LEN = INT_MAX;
@@ -3070,17 +3083,8 @@ void RtIndex_t::ForceDiskChunk () NO_THREAD_SAFETY_ANALYSIS
if ( !m_dRamChunks.GetLength() )
return;

Verify ( m_tWriting.Lock() );

SphChunkGuard_t tGuard;
GetReaderChunks ( tGuard );

m_dDiskChunkKlist.Resize ( 0 );
m_tKlist.Flush ( m_dDiskChunkKlist );
Verify ( m_tWriting.Unlock() );

ChunkStats_t tStats ( m_tStats, m_dFieldLensRam );
SaveDiskChunk ( m_iTID, tGuard, tStats, true );
CSphVector<SphDocID_t> dTmp;
CommitReplayable ( nullptr, dTmp, nullptr, true );
}


@@ -4126,13 +4130,14 @@ static bool LoadVector ( BinlogReader_c & tReader, CSphVector < T, P > & tVector
}


bool RtIndex_t::SaveRamChunk ()
bool RtIndex_t::SaveRamChunk ( const RtSegment_t ** ppSegments, int iSegCount )
{
MEMORY ( MEM_INDEX_RT );

CSphString sChunk, sNewChunk;
sChunk.SetSprintf ( "%s.ram", m_sPath.cstr() );
sNewChunk.SetSprintf ( "%s.ram.new", m_sPath.cstr() );
// thread safe as kill-list w-locked itself during write
m_tKlist.SaveToFile ( m_sPath.cstr() );

CSphWriter wrChunk;
@@ -4141,12 +4146,12 @@ bool RtIndex_t::SaveRamChunk ()

wrChunk.PutDword ( 1 ); // was USE_64BIT
wrChunk.PutDword ( RtSegment_t::m_iSegments );
wrChunk.PutDword ( m_dRamChunks.GetLength() );
wrChunk.PutDword ( iSegCount );

// no locks here, because it's only intended to be called from dtor
ARRAY_FOREACH ( iSeg, m_dRamChunks )
for ( int iSeg=0; iSeg<iSegCount; iSeg++ )
{
const RtSegment_t * pSeg = m_dRamChunks[iSeg];
const RtSegment_t * pSeg = ppSegments[iSeg];
wrChunk.PutDword ( pSeg->m_iTag );
SaveVector ( wrChunk, pSeg->m_dWords );
if ( m_bKeywordDict )
@@ -7915,7 +7920,7 @@ bool RtIndex_t::AddRemoveAttribute ( bool bAdd, const CSphString & sAttrName, ES
}

// fixme: we can't rollback at this point
Verify ( SaveRamChunk () );
Verify ( SaveRamChunk ( (const RtSegment_t **)m_dRamChunks.Begin(), m_dRamChunks.GetLength() ) );

SaveMeta ( m_iTID, dChunkNames );

@@ -9796,7 +9801,7 @@ bool RtBinlog_c::ReplayCommit ( int iBinlog, DWORD uReplayFlags, BinlogReader_c
}

// actually replay
tIndex.m_pRT->CommitReplayable ( pSeg.LeakPtr(), dKlist, NULL );
tIndex.m_pRT->CommitReplayable ( pSeg.LeakPtr(), dKlist, NULL, false );

// update committed tid on replay in case of unexpected / mismatched tid
tIndex.m_pRT->m_iTID = iTID;

0 comments on commit 238bdea

Please sign in to comment.