Skip to content

Commit

Permalink
handle write errors in binlog
Browse files Browse the repository at this point in the history
  • Loading branch information
glookka committed Mar 7, 2023
1 parent 46b2784 commit 3be4503
Show file tree
Hide file tree
Showing 25 changed files with 277 additions and 219 deletions.
294 changes: 161 additions & 133 deletions src/binlog.cpp

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/binlog.h
Expand Up @@ -28,10 +28,10 @@ namespace Binlog {
REPLAY_IGNORE_ALL_ERRORS = 0xFF
};

using FnWriteCommit = std::function<void (CSphWriter&)>;
using FnWriteCommit = std::function<void (Writer_i&)>;

template < typename T >
static void SaveVector ( CSphWriter &tWriter, const VecTraits_T<T> &tVector )
static void SaveVector ( Writer_i & tWriter, const VecTraits_T<T> &tVector )
{
STATIC_ASSERT ( IS_TRIVIALLY_COPYABLE (T), NON_TRIVIAL_VECTORS_ARE_UNSERIALIZABLE );
tWriter.ZipOffset ( tVector.GetLength() );
Expand Down Expand Up @@ -59,7 +59,7 @@ namespace Binlog {
int64_t NextFlushTimestamp();

// bIncTID require increasing *pTID even if binlog is disabled, used in pq
void Commit (Blop_e eOp, int64_t * pTID, const char * sIndexName, bool bIncTID, FnWriteCommit&& fnSaver);
bool Commit ( Blop_e eOp, int64_t * pTID, const char * sIndexName, bool bIncTID, CSphString & sError, FnWriteCommit && fnSaver );

/// replay stored binlog
void Replay ( const SmallStringHash_T<CSphIndex*> & hIndexes, ProgressCallbackSimple_t * pfnProgressCallback = nullptr );
Expand Down
12 changes: 6 additions & 6 deletions src/columnarrt.cpp
Expand Up @@ -51,7 +51,7 @@ class ColumnarAttrRT_i

virtual void Kill ( const CSphVector<RowID_t> & dKilled ) = 0;
virtual void Save ( MemoryWriter_c & tWriter ) const = 0;
virtual void Save ( CSphWriter & tWriter ) const = 0;
virtual void Save ( Writer_i & tWriter ) const = 0;
virtual void Load ( MemoryReader_c & tReader ) = 0;
virtual void Load ( CSphReader & tReader ) = 0;
virtual int64_t AllocatedBytes() const = 0;
Expand Down Expand Up @@ -132,7 +132,7 @@ class ColumnarAttr_Int_T : public ColumnarAttrRT_c

void Kill ( const CSphVector<RowID_t> & dKilled ) override;
void Save ( MemoryWriter_c & tWriter ) const override { SaveData(tWriter); }
void Save ( CSphWriter & tWriter ) const override { SaveData(tWriter); }
void Save ( Writer_i & tWriter ) const override { SaveData(tWriter); }
void Load ( MemoryReader_c & tReader ) override { LoadData(tReader); }
void Load ( CSphReader & tReader ) override { LoadData(tReader); }
int64_t AllocatedBytes() const override { return m_dValues.GetLengthBytes64(); }
Expand Down Expand Up @@ -246,7 +246,7 @@ class ColumnarAttr_String_c : public ColumnarAttrRT_c

void Kill ( const CSphVector<RowID_t> & dKilled ) override;
void Save ( MemoryWriter_c & tWriter ) const override { SaveData(tWriter); }
void Save ( CSphWriter & tWriter ) const override { SaveData(tWriter); }
void Save ( Writer_i & tWriter ) const override { SaveData(tWriter); }
void Load ( MemoryReader_c & tReader ) override { LoadData(tReader); }
void Load ( CSphReader & tReader ) override { LoadData(tReader); }
int64_t AllocatedBytes() const override { return m_dData.GetLengthBytes64() + m_dLengths.GetLengthBytes64(); }
Expand Down Expand Up @@ -366,7 +366,7 @@ class ColumnarAttr_MVA_T : public ColumnarAttrRT_c

void Kill ( const CSphVector<RowID_t> & dKilled ) override;
void Save ( MemoryWriter_c & tWriter ) const override { SaveData(tWriter); }
void Save ( CSphWriter & tWriter ) const override { SaveData(tWriter); }
void Save ( Writer_i & tWriter ) const override { SaveData(tWriter); }
int64_t AllocatedBytes() const override { return m_dData.GetLengthBytes64() + m_dLengths.GetLengthBytes64(); }
void Load ( MemoryReader_c & tReader ) override { LoadData(tReader); }
void Load ( CSphReader & tReader ) override { LoadData(tReader); }
Expand Down Expand Up @@ -533,7 +533,7 @@ class ColumnarRT_c : public ColumnarRT_i
bool EarlyReject ( const std::vector<common::Filter_t> & dFilters, const columnar::BlockTester_i & tBlockTester ) const override { return false; }
bool IsFilterDegenerate ( const common::Filter_t & tFilter ) const override { return false; }

void Save ( CSphWriter & tWriter ) override;
void Save ( Writer_i & tWriter ) override;
int64_t AllocatedBytes() const override;

protected:
Expand Down Expand Up @@ -610,7 +610,7 @@ bool ColumnarRT_c::GetAttrInfo ( const std::string & sName, columnar::AttrInfo_t
}


void ColumnarRT_c::Save ( CSphWriter & tWriter )
void ColumnarRT_c::Save ( Writer_i & tWriter )
{
tWriter.PutDword ( m_dAttrs.GetLength() );
m_dAttrs.for_each ( [&tWriter]( const auto& pAttr ){ pAttr->Save(tWriter); } );
Expand Down
2 changes: 1 addition & 1 deletion src/columnarrt.h
Expand Up @@ -23,7 +23,7 @@ class ColumnarRT_i : public columnar::Columnar_i
{
public:
virtual int64_t AllocatedBytes() const = 0;
virtual void Save ( CSphWriter & tWriter ) = 0;
virtual void Save ( Writer_i & tWriter ) = 0;
};

class MemoryReader_c;
Expand Down
8 changes: 4 additions & 4 deletions src/dict/dict_base.h
Expand Up @@ -78,14 +78,14 @@ class CSphDict: public ISphRefcountedMT
virtual void LoadStopwords ( const CSphVector<SphWordID_t>& dStopwords ) = 0;

/// write stopwords to a file
virtual void WriteStopwords ( CSphWriter& tWriter ) const = 0;
virtual void WriteStopwords ( Writer_i & tWriter ) const = 0;
virtual void WriteStopwords ( JsonEscapedBuilder& tOut ) const = 0;

/// load wordforms from a given list of files
virtual bool LoadWordforms ( const StrVec_t&, const CSphEmbeddedFiles* pEmbedded, const TokenizerRefPtr_c& pTokenizer, const char* szIndex ) = 0;

/// write wordforms to a file
virtual void WriteWordforms ( CSphWriter& tWriter ) const = 0;
virtual void WriteWordforms ( Writer_i & tWriter ) const = 0;
virtual void WriteWordforms ( JsonEscapedBuilder& tOut ) const = 0;

/// get wordforms
Expand Down Expand Up @@ -185,10 +185,10 @@ class DictStub_c: public CSphDict
SphWordID_t GetWordID ( const BYTE*, int, bool ) override { return 0; };
void LoadStopwords ( const char*, const TokenizerRefPtr_c&, bool ) override {};
void LoadStopwords ( const CSphVector<SphWordID_t>& ) override {};
void WriteStopwords ( CSphWriter& ) const override {};
void WriteStopwords ( Writer_i & ) const override {};
void WriteStopwords ( JsonEscapedBuilder& ) const override {};
bool LoadWordforms ( const StrVec_t&, const CSphEmbeddedFiles*, const TokenizerRefPtr_c&, const char* ) override { return false; };
void WriteWordforms ( CSphWriter& ) const override {};
void WriteWordforms ( Writer_i & ) const override {};
void WriteWordforms ( JsonEscapedBuilder& ) const override {};
int SetMorphology ( const char*, CSphString& ) override { return ST_OK; }
void Setup ( const CSphDictSettings& tSettings ) override { m_tSettings = tSettings; };
Expand Down
4 changes: 2 additions & 2 deletions src/dict/dict_proxy.h
Expand Up @@ -24,10 +24,10 @@ class DictProxy_c: public CSphDict

void LoadStopwords ( const char* sFiles, const TokenizerRefPtr_c& pTokenizer, bool bStripFile ) final { m_pDict->LoadStopwords ( sFiles, pTokenizer, bStripFile ); }
void LoadStopwords ( const CSphVector<SphWordID_t>& dStopwords ) final { m_pDict->LoadStopwords ( dStopwords ); }
void WriteStopwords ( CSphWriter& tWriter ) const final { m_pDict->WriteStopwords ( tWriter ); }
void WriteStopwords ( Writer_i & tWriter ) const final { m_pDict->WriteStopwords ( tWriter ); }
void WriteStopwords ( JsonEscapedBuilder& tOut ) const final { m_pDict->WriteStopwords ( tOut ); }
bool LoadWordforms ( const StrVec_t& dFiles, const CSphEmbeddedFiles* pEmbedded, const TokenizerRefPtr_c& pTokenizer, const char* szIndex ) final { return m_pDict->LoadWordforms ( dFiles, pEmbedded, pTokenizer, szIndex ); }
void WriteWordforms ( CSphWriter& tWriter ) const final { m_pDict->WriteWordforms ( tWriter ); }
void WriteWordforms ( Writer_i & tWriter ) const final { m_pDict->WriteWordforms ( tWriter ); }
void WriteWordforms ( JsonEscapedBuilder& tOut ) const final { m_pDict->WriteWordforms ( tOut ); }
int SetMorphology ( const char* szMorph, CSphString& sMessage ) final { return m_pDict->SetMorphology ( szMorph, sMessage ); }

Expand Down
4 changes: 2 additions & 2 deletions src/dict/template_dict_traits.cpp
Expand Up @@ -584,7 +584,7 @@ void TemplateDictTraits_c::LoadStopwords ( const CSphVector<SphWordID_t>& dStopw
}


void TemplateDictTraits_c::WriteStopwords ( CSphWriter& tWriter ) const
void TemplateDictTraits_c::WriteStopwords ( Writer_i & tWriter ) const
{
tWriter.PutDword ( (DWORD)m_iStopwords );
for ( int i = 0; i < m_iStopwords; ++i )
Expand Down Expand Up @@ -1031,7 +1031,7 @@ bool TemplateDictTraits_c::LoadWordforms ( const StrVec_t& dFiles, const CSphEmb
}


void TemplateDictTraits_c::WriteWordforms ( CSphWriter& tWriter ) const
void TemplateDictTraits_c::WriteWordforms ( Writer_i & tWriter ) const
{
if ( !m_pWordforms )
{
Expand Down
4 changes: 2 additions & 2 deletions src/dict/template_dict_traits.h
Expand Up @@ -26,10 +26,10 @@ struct TemplateDictTraits_c: DictStub_c
public:
void LoadStopwords ( const char* sFiles, const TokenizerRefPtr_c& pTokenizer, bool bStripFile ) final;
void LoadStopwords ( const CSphVector<SphWordID_t>& dStopwords ) final;
void WriteStopwords ( CSphWriter& tWriter ) const final;
void WriteStopwords ( Writer_i & tWriter ) const final;
void WriteStopwords ( JsonEscapedBuilder& tOut ) const final;
bool LoadWordforms ( const StrVec_t& dFiles, const CSphEmbeddedFiles* pEmbedded, const TokenizerRefPtr_c& pTokenizer, const char* szIndex ) final;
void WriteWordforms ( CSphWriter& tWriter ) const final;
void WriteWordforms ( Writer_i & tWriter ) const final;
void WriteWordforms ( JsonEscapedBuilder& tOut ) const final;
const CSphWordforms* GetWordforms() final { return m_pWordforms; }
void DisableWordforms() final { m_bDisableWordforms = true; }
Expand Down
6 changes: 3 additions & 3 deletions src/docstore.cpp
Expand Up @@ -1330,7 +1330,7 @@ class DocstoreRT_c : public DocstoreRT_i
void CreateReader ( int64_t iSessionId ) const final {}

bool Load ( CSphReader & tReader ) final;
void Save ( CSphWriter & tWriter ) final;
void Save ( Writer_i & tWriter ) final;
void Load ( MemoryReader_c & tReader ) final;
void Save ( MemoryWriter_c & tWriter ) final;

Expand Down Expand Up @@ -1509,9 +1509,9 @@ bool DocstoreRT_c::Load ( CSphReader & tReader )
return !tReader.GetErrorFlag();
}

void DocstoreRT_c::Save ( CSphWriter & tWriter )
void DocstoreRT_c::Save ( Writer_i & tWriter )
{
DocstoreSave_T<CSphWriter> ( m_dDocs, m_tFields.GetNumFields(), tWriter );
DocstoreSave_T<Writer_i> ( m_dDocs, m_tFields.GetNumFields(), tWriter );
}

void DocstoreRT_c::Load ( MemoryReader_c & tReader )
Expand Down
2 changes: 1 addition & 1 deletion src/docstore.h
Expand Up @@ -74,7 +74,7 @@ class DocstoreRT_i : public Docstore_i, public DocstoreBuilder_i
{
public:
virtual bool Load ( CSphReader & tReader ) = 0;
virtual void Save ( CSphWriter & tWriter ) = 0;
virtual void Save ( Writer_i & tWriter ) = 0;
virtual void Load ( MemoryReader_c & tReader ) = 0;
virtual void Save ( MemoryWriter_c & tWriter ) = 0;

Expand Down
38 changes: 27 additions & 11 deletions src/fileio.h
Expand Up @@ -151,8 +151,24 @@ class CSphAutoreader : public FileReader_c
};


class Writer_i : ISphNoncopyable
{
public:
virtual void PutByte ( BYTE uValue ) = 0;
virtual void PutBytes ( const void * pData, int64_t iSize ) = 0;
virtual void PutWord ( WORD uValue ) = 0;
virtual void PutDword ( DWORD uValue ) = 0;
virtual void PutOffset ( SphOffset_t uValue ) = 0;
virtual void PutString ( const char * szString ) = 0;
virtual void PutString ( const CSphString & sString ) = 0;

virtual void ZipInt ( DWORD uValue ) = 0;
virtual void ZipOffset ( uint64_t uValue ) = 0;
};


/// file writer with write buffering and int encoder
class CSphWriter : ISphNoncopyable
class CSphWriter : public Writer_i
{
public:
virtual ~CSphWriter (); ///< if error flag is set, or if file is not closed by CloseFile, it will be automatically deleted (unlinked).
Expand All @@ -164,20 +180,20 @@ class CSphWriter : ISphNoncopyable
void SetFile ( CSphAutofile & tAuto, SphOffset_t * pSharedOffset, CSphString & sError );
void CloseFile ( bool bTruncate = false ); ///< note: calls Flush(), ie. IsError() might get true after this call

void PutByte ( BYTE uValue );
void PutBytes ( const void * pData, int64_t iSize );
void PutWord ( WORD uValue ) { PutBytes ( &uValue, sizeof(WORD) ); }
void PutDword ( DWORD uValue ) { PutBytes ( &uValue, sizeof(DWORD) ); }
void PutOffset ( SphOffset_t uValue ) { PutBytes ( &uValue, sizeof(SphOffset_t) ); }
void PutString ( const char * szString );
void PutString ( const CSphString & sString );
void PutByte ( BYTE uValue ) override;
void PutBytes ( const void * pData, int64_t iSize ) override;
void PutWord ( WORD uValue ) override { PutBytes ( &uValue, sizeof(WORD) ); }
void PutDword ( DWORD uValue ) override { PutBytes ( &uValue, sizeof(DWORD) ); }
void PutOffset ( SphOffset_t uValue ) override { PutBytes ( &uValue, sizeof(SphOffset_t) ); }
void PutString ( const char * szString ) override;
void PutString ( const CSphString & sString ) override;
void PutString ( Str_t tString ) { PutBytes ( tString.first, tString.second ); };
void Tag ( const char * sTag );

void SeekTo ( SphOffset_t iPos, bool bTruncate = false );

void ZipInt ( DWORD uValue );
void ZipOffset ( uint64_t uValue );
void ZipInt ( DWORD uValue ) override;
void ZipOffset ( uint64_t uValue ) override;

bool IsError () const { return m_bError; }
SphOffset_t GetPos () const { return m_iPos; }
Expand Down Expand Up @@ -216,7 +232,7 @@ int sphPread ( int iFD, void * pBuf, int iBytes, SphOffset_t iOffset );
void sphSetThrottling ( int iMaxIOps, int iMaxIOSize );

/// write blob to file honoring throttling
bool sphWriteThrottled ( int iFD, const void* pBuf, int64_t iCount, const char* sName, CSphString& sError );
bool sphWriteThrottled ( int iFD, const void * pBuf, int64_t iCount, const char * szName, CSphString & sError );

/// read blob from file honoring throttling
size_t sphReadThrottled ( int iFD, void* pBuf, size_t iCount );
Expand Down
10 changes: 5 additions & 5 deletions src/indexsettings.cpp
Expand Up @@ -599,7 +599,7 @@ void CSphFieldFilterSettings::Load ( CSphReader & tReader )
}


void CSphFieldFilterSettings::Save ( CSphWriter & tWriter ) const
void CSphFieldFilterSettings::Save ( Writer_i & tWriter ) const
{
tWriter.PutDword ( m_dRegexps.GetLength() );
for ( const auto & i : m_dRegexps )
Expand Down Expand Up @@ -1376,15 +1376,15 @@ bool IndexSettingsContainer_c::CheckPaths()

//////////////////////////////////////////////////////////////////////////

static void WriteFileInfo ( CSphWriter & tWriter, const CSphSavedFile & tInfo )
static void WriteFileInfo ( Writer_i & tWriter, const CSphSavedFile & tInfo )
{
tWriter.PutOffset ( tInfo.m_uSize );
tWriter.PutOffset ( tInfo.m_uCTime );
tWriter.PutOffset ( tInfo.m_uMTime );
tWriter.PutDword ( tInfo.m_uCRC32 );
}

void operator<< ( JsonEscapedBuilder& tOut, const CSphSavedFile & tInfo )
void operator<< ( JsonEscapedBuilder & tOut, const CSphSavedFile & tInfo )
{
auto _ = tOut.Object ();
tOut.NamedValNonDefault ( "size", tInfo.m_uSize );
Expand All @@ -1395,7 +1395,7 @@ void operator<< ( JsonEscapedBuilder& tOut, const CSphSavedFile & tInfo )

/// gets called from and MUST be in sync with RtIndex_c::SaveDiskHeader()!
/// note that SaveDiskHeader() occasionaly uses some PREVIOUS format version!
void SaveTokenizerSettings ( CSphWriter & tWriter, const TokenizerRefPtr_c& pTokenizer, int iEmbeddedLimit )
void SaveTokenizerSettings ( Writer_i & tWriter, const TokenizerRefPtr_c & pTokenizer, int iEmbeddedLimit )
{
assert ( pTokenizer );

Expand Down Expand Up @@ -1454,7 +1454,7 @@ void operator<< ( JsonEscapedBuilder& tOut, const CSphFieldFilterSettings& tFiel

/// gets called from and MUST be in sync with RtIndex_c::SaveDiskHeader()!
/// note that SaveDiskHeader() occasionaly uses some PREVIOUS format version!
void SaveDictionarySettings ( CSphWriter & tWriter, const DictRefPtr_c& pDict, bool bForceWordDict, int iEmbeddedLimit )
void SaveDictionarySettings ( Writer_i & tWriter, const DictRefPtr_c & pDict, bool bForceWordDict, int iEmbeddedLimit )
{
assert ( pDict );
const CSphDictSettings & tSettings = pDict->GetSettings ();
Expand Down
9 changes: 5 additions & 4 deletions src/indexsettings.h
Expand Up @@ -116,15 +116,15 @@ class CSphDictSettings : public SettingsWriter_c
void Format ( SettingsFormatter_c & tOut, FilenameBuilder_i * pFilenameBuilder ) const override;
};


class Writer_i;
class CSphFieldFilterSettings : public SettingsWriter_c
{
public:
StrVec_t m_dRegexps;

bool Setup ( const CSphConfigSection & hIndex, CSphString & sWarning );
void Load ( CSphReader & tReader );
void Save ( CSphWriter & tWriter ) const;
void Save ( Writer_i & tWriter ) const;
void Format ( SettingsFormatter_c & tOut, FilenameBuilder_i * pFilenameBuilder ) const override;
};

Expand Down Expand Up @@ -422,9 +422,10 @@ class IndexSettingsContainer_c
class ISphTokenizer;
class CSphDict;
class CSphIndex;
class Writer_i;

void SaveTokenizerSettings ( CSphWriter & tWriter, const TokenizerRefPtr_c& pTokenizer, int iEmbeddedLimit );
void SaveDictionarySettings ( CSphWriter & tWriter, const DictRefPtr_c& pDict, bool bForceWordDict, int iEmbeddedLimit );
void SaveTokenizerSettings ( Writer_i & tWriter, const TokenizerRefPtr_c& pTokenizer, int iEmbeddedLimit );
void SaveDictionarySettings ( Writer_i & tWriter, const DictRefPtr_c& pDict, bool bForceWordDict, int iEmbeddedLimit );

void DumpSettings ( StringBuilder_c & tBuf, const CSphIndex & tIndex, FilenameBuilder_i * pFilenameBuilder );
void DumpSettingsCfg ( FILE * fp, const CSphIndex & tIndex, FilenameBuilder_i * pFilenameBuilder );
Expand Down
2 changes: 1 addition & 1 deletion src/memio.cpp
Expand Up @@ -126,7 +126,7 @@ void MemoryWriter_c::PutString ( const char * sVal )
}


void MemoryWriter_c::PutBytes ( const void * pData, int iLen )
void MemoryWriter_c::PutBytes ( const void * pData, int64_t iLen )
{
if ( !iLen )
return;
Expand Down
25 changes: 13 additions & 12 deletions src/memio.h
Expand Up @@ -14,6 +14,7 @@
#define _memio_

#include "sphinxstd.h"
#include "fileio.h"

class MemoryReader_c
{
Expand Down Expand Up @@ -52,21 +53,21 @@ T GetVal( MemoryReader_c& tReader );
template<typename VECTOR>
void GetArray ( VECTOR& dBuf, MemoryReader_c& tIn );

class MemoryWriter_c
class MemoryWriter_c : public Writer_i
{
public:
MemoryWriter_c ( CSphVector<BYTE> & dBuf );

int GetPos();
void ZipOffset ( uint64_t uVal );
void ZipInt ( DWORD uVal );
void PutString ( const CSphString & sVal );
void PutString ( const char * szVal );
void PutDword ( DWORD uVal );
void PutOffset ( SphOffset_t uValue );
void PutWord ( WORD uVal );
void PutBytes ( const void * pData, int iLen );
void PutByte ( BYTE uVal );
void ZipOffset ( uint64_t uVal ) override;
void ZipInt ( DWORD uVal ) override;
void PutString ( const CSphString & sVal ) override;
void PutString ( const char * szVal ) override;
void PutDword ( DWORD uVal ) override;
void PutOffset ( SphOffset_t uValue ) override;
void PutWord ( WORD uVal ) override;
void PutBytes ( const void * pData, int64_t iLen ) override;
void PutByte ( BYTE uVal ) override;
void PutUint64 ( uint64_t uVal );

template<typename T>
Expand Down Expand Up @@ -100,8 +101,8 @@ class MemoryWriter2_c : public MemoryWriter_c
public:
MemoryWriter2_c ( CSphVector<BYTE> & dBuf );

void ZipOffset ( uint64_t uVal );
void ZipInt ( DWORD uVal );
void ZipOffset ( uint64_t uVal ) override;
void ZipInt ( DWORD uVal ) override;
};

#include "memio_impl.h"
Expand Down

0 comments on commit 3be4503

Please sign in to comment.