Skip to content

Commit

Permalink
added replay of the empty binlog files with the old binlog versions; …
Browse files Browse the repository at this point in the history
…fixed Github#1364
  • Loading branch information
tomatolog committed Aug 18, 2023
1 parent 4ae1651 commit 828d139
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions src/binlog.cpp
Expand Up @@ -151,6 +151,7 @@ class Binlog_c : public ISphNoncopyable
bool m_bDisabled = true;
bool m_bConfigless = false;
DWORD m_uReplayFlags = 0;
bool m_bWrongVersion = false;

int m_iRestartSize = 268435456; // binlog size restart threshold, 256M

Expand All @@ -169,7 +170,7 @@ class Binlog_c : public ISphNoncopyable
bool ReplayTxn ( Binlog::Blop_e eOp, int iBinlog, BinlogReader_c & tReader ) const;
bool ReplayUpdateAttributes ( int iBinlog, BinlogReader_c & tReader ) const;
bool ReplayIndexAdd ( int iBinlog, const SmallStringHash_T<CSphIndex*> & hIndexes, BinlogReader_c & tReader ) const;
bool ReplayCacheAdd ( int iBinlog, BinlogReader_c & tReader ) const;
bool ReplayCacheAdd ( int iBinlog, DWORD uVersion, BinlogReader_c & tReader ) const;
bool IsBinlogWritable ( int64_t * pTID = nullptr );

static bool CheckCrc ( const char * sOp, const CSphString & sIndex, int64_t iTID, int64_t iTxnPos, BinlogReader_c & tReader ) ;
Expand Down Expand Up @@ -663,11 +664,10 @@ void Binlog_c::LoadMeta ()
return;

// ok, so there is actual recovery data
// let's require that exact version and bitness, then
if ( uVersion!=BINLOG_VERSION )
sphDie ( "binlog meta file %s is v.%d, binary is v.%d; recovery requires previous binary version",
sMeta.cstr(), uVersion, BINLOG_VERSION );
// could be wrong version of the empty binlog
m_bWrongVersion = ( uVersion!=BINLOG_VERSION );

// let's require that bitness
if ( !bLoaded64bit )
sphDie ( "tables with 32-bit docids are no longer supported; recovery requires previous binary version" );

Expand Down Expand Up @@ -878,11 +878,11 @@ int Binlog_c::ReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, int
}

DWORD uVersion = tReader.GetDword();
if ( uVersion!=BINLOG_VERSION || tReader.GetErrorFlag() )
{
Log ( REPLAY_IGNORE_TRX_ERROR, "binlog: log %s is v.%d, binary is v.%d; recovery requires previous binary version", sLog.cstr(), uVersion, BINLOG_VERSION );
return -1;
}
if ( tReader.GetErrorFlag() )
sphWarning ( "binlog: log io error at pos=" INT64_FMT ": %s", tReader.GetPos(), sError.cstr() );

// could replay empty binlog of the old version
m_bWrongVersion = ( uVersion!=BINLOG_VERSION );

/////////////
// do replay
Expand Down Expand Up @@ -920,6 +920,12 @@ int Binlog_c::ReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, int
bReplayOK = false;
break;
}
if ( m_bWrongVersion && uOp!=ADD_CACHE )
{
Log ( REPLAY_IGNORE_TRX_ERROR, "binlog: log %s is v.%d, binary is v.%d; recovery requires previous binary version", sLog.cstr(), uVersion, BINLOG_VERSION );
bReplayOK = false;
break;
}

// FIXME! blop might be OK but skipped (eg. index that is no longer)
switch ( uOp )
Expand All @@ -936,7 +942,7 @@ int Binlog_c::ReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, int
break;
}
bHaveCacheOp = true;
bReplayOK = ReplayCacheAdd ( iBinlog, tReader );
bReplayOK = ReplayCacheAdd ( iBinlog, uVersion, tReader );
break;

case UPDATE_ATTRS:
Expand Down Expand Up @@ -1056,14 +1062,19 @@ bool Binlog_c::ReplayIndexAdd ( int iBinlog, const SmallStringHash_T<CSphIndex*>
return true;
}

bool Binlog_c::ReplayCacheAdd ( int iBinlog, BinlogReader_c & tReader ) const
bool Binlog_c::ReplayCacheAdd ( int iBinlog, DWORD uVersion, BinlogReader_c & tReader ) const
{
const int64_t iTxnPos = tReader.GetPos();
BinlogFileDesc_t & tLog = m_dLogFiles[iBinlog];

// load data
CSphVector<BinlogIndexInfo_t> dCache;
dCache.Resize ( (int) tReader.UnzipOffset() ); // FIXME! sanity check
if ( m_bWrongVersion && dCache.GetLength() )
{
Log ( REPLAY_IGNORE_TRX_ERROR, "binlog: log %s is v.%d, binary is v.%d; recovery requires previous binary version", tReader.GetFilename().cstr(), uVersion, BINLOG_VERSION );
return false;
}
ARRAY_FOREACH ( i, dCache )
{
dCache[i].m_sName = tReader.GetString();
Expand Down

0 comments on commit 828d139

Please sign in to comment.