Skip to content
Permalink
Browse files

fixed #983 #985 flush got rescheduled for indexes wo activity

  • Loading branch information...
tomatolog committed Aug 12, 2019
1 parent c0af34b commit 4adc075294ac823289f745e2cc419f18c7dcf2e2
Showing with 34 additions and 13 deletions.
  1. +13 −3 src/sphinxpq.cpp
  2. +13 −3 src/sphinxrt.cpp
  3. +2 −0 src/sphinxrt.h
  4. +6 −7 src/taskflushmutable.cpp
@@ -100,6 +100,7 @@ class PercolateIndex_c : public PercolateIndex_i
virtual bool AddDocument ( ISphHits * , const CSphMatch & , const char ** , const CSphVector<DWORD> & , CSphString & , CSphString & ) { return true; }
bool DeleteDocument ( const DocID_t * , int , CSphString & , RtAccum_t * pAccExt ) override { RollBack ( pAccExt ); return true; }
void ForceRamFlush ( bool bPeriodic ) override;
bool IsFlushNeed() const override;
bool ForceDiskChunk () override;
bool AttachDiskIndex ( CSphIndex * , bool, bool &, CSphString & ) override { return true; }
void Optimize () override {}
@@ -2392,12 +2393,21 @@ void SetPercolateThreads ( int iThreads )
g_iPercolateThreads = Max ( 1, iThreads );
}

void PercolateIndex_c::ForceRamFlush ( bool bPeriodic )
bool PercolateIndex_c::IsFlushNeed() const
{
if ( GetBinlog ()->IsActive () && m_iTID<=m_iSavedTID )
return;
// m_iTID get managed by binlog that is why wo binlog there is no need to compare it
if ( GetBinlog() && GetBinlog()->IsActive() && m_iTID<=m_iSavedTID )
return false;

if ( m_bSaveDisabled )
return false;

return true;
}

void PercolateIndex_c::ForceRamFlush ( bool bPeriodic )
{
if ( !IsFlushNeed() )
return;

RamFlush ( bPeriodic );
@@ -1035,6 +1035,7 @@ class RtIndex_c : public RtIndex_i, public ISphNoncopyable, public ISphWordlist,
void RollBack ( RtAccum_t * pAccExt ) final;
bool CommitReplayable ( RtSegment_t * pNewSeg, CSphVector<DocID_t> & dAccKlist, int * pTotalKilled, bool bForceDump ); // FIXME? protect?
void ForceRamFlush ( bool bPeriodic=false ) final;
bool IsFlushNeed() const final;
bool ForceDiskChunk() final;
bool AttachDiskIndex ( CSphIndex * pIndex, bool bTruncate, bool & bFatal, CSphString & sError ) final;
bool Truncate ( CSphString & sError ) final;
@@ -1289,12 +1290,21 @@ RtIndex_c::~RtIndex_c ()
}
}

void RtIndex_c::ForceRamFlush ( bool bPeriodic ) REQUIRES (!this->m_tFlushLock)
bool RtIndex_c::IsFlushNeed() const
{
if ( g_pRtBinlog->IsActive () && m_iTID<=m_iSavedTID )
return;
// m_iTID get managed by binlog that is why wo binlog there is no need to compare it
if ( g_pBinlog && g_pBinlog->IsActive() && m_iTID<=m_iSavedTID )
return false;

if ( m_bSaveDisabled )
return false;

return true;
}

void RtIndex_c::ForceRamFlush ( bool bPeriodic ) REQUIRES (!this->m_tFlushLock)
{
if ( !IsFlushNeed() )
return;

int64_t tmSave = sphMicroTimer();
@@ -53,6 +53,8 @@ class RtIndex_i : public CSphIndex
/// forcibly flush RAM chunk to disk
virtual void ForceRamFlush ( bool bPeriodic=false ) = 0;

virtual bool IsFlushNeed() const = 0;

/// get time of last flush happened
virtual int64_t GetLastFlushTimestamp() const = 0;

@@ -108,17 +108,16 @@ static void ScheduleFlushTask ( void* pName, int64_t iNextTimestamp=-1 )
// check timeout, schedule or run immediately.
auto iLastTimestamp = pRT->GetLastFlushTimestamp ();
auto iPlannedTimestamp = iLastTimestamp+g_iRtFlushPeriod;
if (( iPlannedTimestamp-1000 )<=sphMicroTimer ())
bool bNeedFlush = pRT->IsFlushNeed();
if ( bNeedFlush && ( iPlannedTimestamp-1000 )<=sphMicroTimer() )
{
pRT->ForceRamFlush ( true );
// if flush not happened (by any reason) - stamp not updated
if ( iLastTimestamp==pRT->GetLastFlushTimestamp ()) {
sphInfo ("Scheduled flush of index %s didn't happened; fallback", sName.cstr());
iPlannedTimestamp = sphMicroTimer() + FALLBACK_FLUSH_PERIOD;
} else
iPlannedTimestamp = pRT->GetLastFlushTimestamp ()+g_iRtFlushPeriod;
iPlannedTimestamp = pRT->GetLastFlushTimestamp()+g_iRtFlushPeriod;
}

if ( !bNeedFlush )
iPlannedTimestamp = sphMicroTimer() + g_iRtFlushPeriod;

// once more check for disabled - since ForceRamFlush may be long
if ( g_Flushable.IsDisabled ())
return;

0 comments on commit 4adc075

Please sign in to comment.
You can’t perform that action at this time.