Skip to content

Commit

Permalink
fixed #290; added flush statement handlers to PQ index
Browse files Browse the repository at this point in the history
  • Loading branch information
tomatolog committed May 17, 2018
1 parent 0c296bf commit 8ae0e59
Showing 1 changed file with 45 additions and 2 deletions.
47 changes: 45 additions & 2 deletions src/sphinxrt.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -10729,8 +10729,8 @@ class PercolateIndex_c : public PercolateIndex_i
void Commit ( int * , ISphRtAccum * pAccExt ) override { RollBack ( pAccExt ); } void Commit ( int * , ISphRtAccum * pAccExt ) override { RollBack ( pAccExt ); }
bool DeleteDocument ( const SphDocID_t * , int , CSphString & , ISphRtAccum * pAccExt ) override { RollBack ( pAccExt ); return true; } bool DeleteDocument ( const SphDocID_t * , int , CSphString & , ISphRtAccum * pAccExt ) override { RollBack ( pAccExt ); return true; }
void CheckRamFlush () override {} void CheckRamFlush () override {}
void ForceRamFlush ( bool ) override {} void ForceRamFlush ( bool bPeriodic ) override;
void ForceDiskChunk () override {} void ForceDiskChunk () override;
bool AttachDiskIndex ( CSphIndex * , CSphString & ) override { return true; } bool AttachDiskIndex ( CSphIndex * , CSphString & ) override { return true; }
void Optimize ( volatile bool * , ThrottleState_t * ) override {} void Optimize ( volatile bool * , ThrottleState_t * ) override {}
bool IsSameSettings ( CSphReconfigureSettings & tSettings, CSphReconfigureSetup & tSetup, CSphString & sError ) const override; bool IsSameSettings ( CSphReconfigureSettings & tSettings, CSphReconfigureSetup & tSetup, CSphString & sError ) const override;
Expand Down Expand Up @@ -10775,6 +10775,8 @@ class PercolateIndex_c : public PercolateIndex_i
CSphSourceStats m_tStat; CSphSourceStats m_tStat;
ISphTokenizer * m_pTokenizerIndexing = nullptr; ISphTokenizer * m_pTokenizerIndexing = nullptr;
int m_iMaxCodepointLength = 0; int m_iMaxCodepointLength = 0;
int64_t m_iSavedTID = 1;
int64_t m_tmSaved = 0;


CSphVector<StoredQueryKey_t> m_dStored; CSphVector<StoredQueryKey_t> m_dStored;
CSphRwlock m_tLock; CSphRwlock m_tLock;
Expand Down Expand Up @@ -12214,6 +12216,8 @@ bool PercolateIndex_c::AddQuery ( const char * sQuery, const char * sTags, const
m_dStored.Insert ( iPos+1, tItem ); m_dStored.Insert ( iPos+1, tItem );
} }
} }
if ( bAdded )
m_iTID++;


m_tLock.Unlock(); m_tLock.Unlock();


Expand All @@ -12238,6 +12242,8 @@ int PercolateIndex_c::DeleteQueries ( const uint64_t * pQueries, int iCount )
iDeleted++; iDeleted++;
} }
} }
if ( iDeleted )
m_iTID++;


m_tLock.Unlock(); m_tLock.Unlock();


Expand Down Expand Up @@ -12269,6 +12275,8 @@ int PercolateIndex_c::DeleteQueries ( const char * sTags )
iDeleted++; iDeleted++;
} }
} }
if ( iDeleted )
m_iTID++;


m_tLock.Unlock(); m_tLock.Unlock();


Expand Down Expand Up @@ -12338,6 +12346,8 @@ void PercolateIndex_c::PostSetup()
} }


m_dLoadedQueries.Reset ( 0 ); m_dLoadedQueries.Reset ( 0 );
m_tmSaved = sphMicroTimer();
m_iSavedTID = m_iTID;
} }


bool PercolateIndex_c::Prealloc ( bool bStripPath ) bool PercolateIndex_c::Prealloc ( bool bStripPath )
Expand Down Expand Up @@ -12475,6 +12485,8 @@ bool PercolateIndex_c::Prealloc ( bool bStripPath )
tItem.m_bOr = ( rdMeta.GetDword()!=0 ); tItem.m_bOr = ( rdMeta.GetDword()!=0 );
} }
} }
m_tmSaved = sphMicroTimer();
m_iTID = m_iSavedTID = 1;


return true; return true;
} }
Expand Down Expand Up @@ -12547,6 +12559,9 @@ void PercolateIndex_c::SaveMeta()
} }
} }


m_iSavedTID = m_iTID;
m_tmSaved = sphMicroTimer();

m_tLock.Unlock(); m_tLock.Unlock();


wrMeta.CloseFile(); wrMeta.CloseFile();
Expand Down Expand Up @@ -12617,6 +12632,7 @@ bool PercolateIndex_c::Truncate ( CSphString & )
ARRAY_FOREACH ( i, m_dStored ) ARRAY_FOREACH ( i, m_dStored )
SafeDelete ( m_dStored[i].m_pQuery ); SafeDelete ( m_dStored[i].m_pQuery );
m_dStored.Reset(); m_dStored.Reset();
m_iTID++;
m_tLock.Unlock(); m_tLock.Unlock();


SaveMeta(); SaveMeta();
Expand Down Expand Up @@ -12717,6 +12733,7 @@ void PercolateIndex_c::Reconfigure ( CSphReconfigureSetup & tSetup )
SafeDelete ( pStored ); SafeDelete ( pStored );
} }
m_dStored.Resize ( 0 ); m_dStored.Resize ( 0 );
m_iTID++;


PostSetup(); PostSetup();


Expand All @@ -12732,6 +12749,32 @@ void SetPercolateThreads ( int iThreads )
g_iPercolateThreads = Max ( 1, iThreads ); g_iPercolateThreads = Max ( 1, iThreads );
} }


void PercolateIndex_c::ForceRamFlush ( bool bPeriodic )
{
if ( m_iTID<=m_iSavedTID )
return;

int64_t tmStart = sphMicroTimer();
int64_t iWasTID = m_iSavedTID;
int64_t tmWas = m_tmSaved;
SaveMeta ();

int64_t tmNow = sphMicroTimer();
int64_t tmAge = tmNow - tmWas;
int64_t tmSave = tmNow - tmStart;

sphInfo ( "percolate: index %s: saved ok (mode=%s, last TID=" INT64_FMT ", current TID=" INT64_FMT ", "
"time delta=%d sec, took=%d.%03d sec)"
, m_sIndexName.cstr(), bPeriodic ? "periodic" : "forced"
, iWasTID, m_iTID
, (int) (tmAge/1000000), (int)(tmSave/1000000), (int)((tmSave/1000)%1000) );
}

void PercolateIndex_c::ForceDiskChunk ()
{
ForceRamFlush ( false );
}



// //
// $Id$ // $Id$
Expand Down

0 comments on commit 8ae0e59

Please sign in to comment.