Skip to content

Commit

Permalink
fixed replication nodes retry after some nodes are missed and name re…
Browse files Browse the repository at this point in the history
…solution of these nodes failed (should fix Kubernetes and Docker node issues with replication); fixed the error message on replication start failure; fixed the model in test 376; added an error code and a readable message on name resolution failure; fixed dev#530
  • Loading branch information
tomatolog authored and sanikolaev committed Aug 21, 2023
1 parent 0871070 commit 71f81d8
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 75 deletions.
6 changes: 3 additions & 3 deletions src/searchdaemon.cpp
Expand Up @@ -677,11 +677,11 @@ DWORD sphGetAddress ( const char * sHost, bool bFatal, bool bIP, CSphString * pF
if ( iResult!=0 || !pResult )
{
if ( pFatal )
pFatal->SetSprintf ( "no AF_INET address found for: %s", sHost );
pFatal->SetSprintf ( "no AF_INET address found for: %s, error %d: %s", sHost, iResult, gai_strerror(iResult) );
else if ( bFatal )
sphFatal( "no AF_INET address found for: %s", sHost );
sphFatal( "no AF_INET address found for: %s, error %d: %s", sHost, iResult, gai_strerror(iResult) );
else
sphLogDebugv( "no AF_INET address found for: %s", sHost );
sphLogDebugv( "no AF_INET address found for: %s, error %d: %s", sHost, iResult, gai_strerror(iResult) );
return 0;
}

Expand Down
163 changes: 92 additions & 71 deletions src/searchdreplication.cpp
Expand Up @@ -294,9 +294,6 @@ static bool ClusterGetNodes ( const CSphString & sClusterNodes, const CSphString
// callback at remote node for CLUSTER_GET_NODES to return actual nodes list at cluster
static bool RemoteClusterGetNodes ( const CSphString & sCluster, const CSphString & sGTID, CSphString & sError, CSphString & sNodes );

// utility function to filter nodes list provided at string by specific protocol
static bool ClusterFilterNodes ( Str_t sSrcNodes, Proto_e eProto, CSphString & sDstNodes, CSphString & sError );

// callback at remote node for CLUSTER_UPDATE_NODES to update nodes list at cluster from actual nodes list
static bool RemoteClusterUpdateNodes ( const CSphString & sCluster, CSphString * pNodes, bool bSaveConf, bool bMergeNodes, CSphString & sError );

Expand Down Expand Up @@ -3133,49 +3130,45 @@ struct VecAgentDesc_t : public ISphNoncopyable, public CSphVector<AgentDesc_t *>
// get nodes of specific type from string
class ISphDescIterator
{
protected:
CSphString & m_sError;
virtual void ResetNodes () = 0;

public:
ISphDescIterator ( CSphString & sError )
: m_sError ( sError )
{}

bool ProcessNodes ( const CSphString & sNodes )
bool ProcessNodes ( const char * sNodes, CSphString & sError, CSphString & sWarning )
{
bool bOk = false;
int iRetry = 0;
const int iRetryMax = g_iNodeRetry;
while ( true )
if ( !sNodes || !*sNodes )
{
bOk = SplitNodes ( sNodes );
iRetry++;
m_sError += "empty nodes list";
return false;
}

if ( !bOk && iRetry<iRetryMax )
{
m_sError = "";
ResetNodes();
// should wait and retry for DNS set
Threads::Coro::SleepMsec ( g_iAnyNodesTimeout );
continue;
for ( int iRetry=0; iRetry<g_iNodeRetry; iRetry++ )
{
if ( SplitNodes ( sNodes ) )
return true;

} else
if ( iRetry+1>=g_iNodeRetry )
break;
}

return bOk;
}
m_sError.Clear();
ResetNodes();
// should wait and retry for DNS set
Threads::Coro::SleepMsec ( g_iAnyNodesTimeout );
}

private:
bool SplitNodes ( const CSphString & sNodes )
{
if ( sNodes.IsEmpty () )
if ( IsValid() )
{
sWarning = m_sError.cstr();
return true;
} else
{
m_sError = "empty nodes list";
sError = m_sError.cstr();
return false;
}
}

private:
bool SplitNodes ( const char * sNodes )
{
bool bOk = true;
CSphString sTmp = sNodes;
char * sCur = const_cast<char *>( sTmp.cstr() );
while ( *sCur )
Expand All @@ -3195,14 +3188,25 @@ class ISphDescIterator
sCur++;
}

if ( *sAddrs && !SetNode ( sAddrs ) )
return false;
if ( !*sAddrs )
continue;

CSphString sError;
if ( !SetNode ( sAddrs, sError ) )
{
bOk = false;
m_sError += sError.cstr();
}
}

return true;
return bOk;
}

virtual bool SetNode ( const char * sAddrs ) = 0;
virtual bool SetNode ( const char * sAddrs, CSphString & sError ) = 0;
virtual void ResetNodes () = 0;
virtual bool IsValid() const = 0;

StringBuilder_c m_sError { ";" };
};

CSphString GetAddr ( const ListenerDesc_t & tListen )
Expand All @@ -3225,16 +3229,20 @@ class AgentDescIterator_t : public ISphDescIterator
VecAgentDesc_t & m_dNodes;

void ResetNodes () override { m_dNodes.Reset(); }
bool IsValid() const override { return !m_dNodes.IsEmpty(); }

public:
AgentDescIterator_t ( VecAgentDesc_t & dNodes, CSphString & sError )
: ISphDescIterator ( sError )
, m_dNodes ( dNodes )
AgentDescIterator_t ( VecAgentDesc_t & dNodes )
: m_dNodes ( dNodes )
{}

bool SetNode ( const char * sListen ) override
bool SetNode ( const char * sListen, CSphString & sError ) override
{
ListenerDesc_t tListen = ParseListener ( sListen, &m_sError );
// filter out own address to do not query itself
if ( g_sIncomingProto.Begins ( sListen ) )
return true;

ListenerDesc_t tListen = ParseListener ( sListen, &sError );

if ( tListen.m_eProto==Proto_e::UNKNOWN )
return false;
Expand All @@ -3245,10 +3253,6 @@ class AgentDescIterator_t : public ISphDescIterator
if ( tListen.m_uIP==0 )
return true;

// filter out own address to do not query itself
if ( g_sIncomingProto.Begins ( sListen ) )
return true;

AgentDesc_t * pDesc = new AgentDesc_t;
m_dNodes.Add( pDesc );
pDesc->m_sAddr = GetAddr ( tListen );
Expand All @@ -3262,10 +3266,16 @@ class AgentDescIterator_t : public ISphDescIterator
}
};

static bool GetNodes ( const CSphString & sNodes, VecAgentDesc_t & dNodes, CSphString & sError )
static bool GetNodes ( const char * sNodes, VecAgentDesc_t & dNodes, CSphString & sError )
{
AgentDescIterator_t tIt ( dNodes, sError );
return tIt.ProcessNodes ( sNodes );
CSphString sWarning;
AgentDescIterator_t tIt ( dNodes );
bool bOk = tIt.ProcessNodes ( sNodes, sError, sWarning );

if ( bOk && !sWarning.IsEmpty() )
sphLogDebugRpl ( "node %s parse error: %s", sNodes, sWarning.cstr() );

return bOk;
}

static AgentConn_t * CreateAgent ( const AgentDesc_t & tDesc, const PQRemoteData_t & tReq, int64_t iTimeoutMs )
Expand All @@ -3285,7 +3295,7 @@ static AgentConn_t * CreateAgent ( const AgentDesc_t & tDesc, const PQRemoteData
return pAgent;
}

static bool GetNodes ( const CSphString & sNodes, VecRefPtrs_t<AgentConn_t *> & dNodes, const PQRemoteData_t & tReq, CSphString & sError )
static bool GetNodes ( const char * sNodes, VecRefPtrs_t<AgentConn_t *> & dNodes, const PQRemoteData_t & tReq, CSphString & sError )
{
VecAgentDesc_t dDesc;
if ( !GetNodes ( sNodes, dDesc, sError ) )
Expand All @@ -3312,16 +3322,16 @@ class ListenerProtocolIterator_t : public ISphDescIterator
sph::StringSet m_hNodes;

void ResetNodes () override { m_hNodes.Reset(); }
bool IsValid() const override { return !m_hNodes.IsEmpty(); }

public:
ListenerProtocolIterator_t ( Proto_e eProto, CSphString & sError )
: ISphDescIterator ( sError )
, m_eProto ( eProto )
ListenerProtocolIterator_t ( Proto_e eProto )
: m_eProto ( eProto )
{}

bool SetNode ( const char * sListen ) override
bool SetNode ( const char * sListen, CSphString & sError ) override
{
ListenerDesc_t tListen = ParseListener ( sListen, &m_sError );
ListenerDesc_t tListen = ParseListener ( sListen, &sError );
if ( tListen.m_eProto==Proto_e::UNKNOWN )
return false;

Expand All @@ -3348,15 +3358,20 @@ class ListenerProtocolIterator_t : public ISphDescIterator
};

// utility function to filter a nodes list, provided as a string, by a specific protocol
bool ClusterFilterNodes ( Str_t sSrcNodes, Proto_e eProto, CSphString & sDstNodes, CSphString & sError )
static bool ClusterFilterNodes ( const char * sSrcNodes, Proto_e eProto, CSphString & sDstNodes, CSphString & sError )
{
ListenerProtocolIterator_t tIt ( eProto, sError );
if ( !tIt.ProcessNodes ( sSrcNodes ) )
return false;
CSphString sWarning;
ListenerProtocolIterator_t tIt ( eProto );
bool bOk = tIt.ProcessNodes ( sSrcNodes, sError, sWarning );

sDstNodes = tIt.DumpNodes();
if ( bOk )
{
sDstNodes = tIt.DumpNodes();
if ( !sWarning.IsEmpty() )
sphLogDebugRpl ( "node %s parse error: %s", sSrcNodes, sWarning.cstr() );
}

return ( sError.IsEmpty() );
return bOk;
}

// base of API commands request and reply builders
Expand Down Expand Up @@ -3894,7 +3909,7 @@ bool ClusterDelete ( const CSphString & sCluster, CSphString & sError, CSphStrin
PQRemoteData_t tCmd;
tCmd.m_sCluster = sCluster;
VecRefPtrs_t<AgentConn_t *> dNodes;
if ( !GetNodes ( sNodes, dNodes, tCmd, sError ) )
if ( !GetNodes ( sNodes.cstr(), dNodes, tCmd, sError ) )
return false;

PQRemoteDelete_c tReq;
Expand Down Expand Up @@ -5371,7 +5386,7 @@ static bool ClusterAlterAdd ( const CSphString & sCluster, const CSphString & sI
if ( !sNodes.IsEmpty() )
{
VecAgentDesc_t dDesc;
if ( !GetNodes ( sNodes, dDesc, sError ) )
if ( !GetNodes ( sNodes.cstr(), dDesc, sError ) )
return false;

if ( dDesc.GetLength() && !NodesReplicateIndex ( sCluster, sIndex, dDesc, pServed, sError ) )
Expand Down Expand Up @@ -5500,9 +5515,12 @@ bool SendClusterIndexes ( const ReplicationCluster_t * pCluster, const CSphStrin
const wsrep_gtid_t & tStateID, CSphString & sError ) EXCLUDES ( g_tClustersLock )
{
VecAgentDesc_t dDesc;
if ( !GetNodes ( sNode, dDesc, sError ) || !dDesc.GetLength() )
if ( !GetNodes ( sNode.cstr(), dDesc, sError ) || !dDesc.GetLength() )
{
sError.SetSprintf ( "%s invalid node", sNode.cstr() );
if ( sError.IsEmpty() )
sError.SetSprintf ( "%s invalid node", sNode.cstr() );
else
sError.SetSprintf ( "%s invalid node, error: %s", sNode.cstr(), sError.cstr() );
return false;
}

Expand Down Expand Up @@ -5660,9 +5678,12 @@ std::optional<CSphString> IsPartOfCluster ( const ServedDesc_t * pDesc )
bool ClusterGetNodes ( const CSphString & sClusterNodes, const CSphString & sCluster, const CSphString & sGTID, Proto_e eProto, CSphString & sError, CSphString & sNodes )
{
VecAgentDesc_t dDesc;
if ( !GetNodes ( sClusterNodes, dDesc, sError ) || !dDesc.GetLength() || !sError.IsEmpty() )
if ( !GetNodes ( sClusterNodes.cstr(), dDesc, sError ) || !dDesc.GetLength() || !sError.IsEmpty() )
{
sError.SetSprintf ( "%s invalid node, %s", sClusterNodes.cstr(), sError.cstr() );
if ( sError.IsEmpty() )
sError.SetSprintf ( "%s invalid node", sClusterNodes.cstr() );
else
sError.SetSprintf ( "%s invalid node, error: %s", sClusterNodes.cstr(), sError.cstr() );
return false;
}

Expand Down Expand Up @@ -5720,7 +5741,7 @@ bool ClusterGetNodes ( const CSphString & sClusterNodes, const CSphString & sClu
return false;
}

if ( !ClusterFilterNodes ( Str_t ( sAllNodes ), eProto, sNodes, sError ) )
if ( !ClusterFilterNodes ( sAllNodes.cstr(), eProto, sNodes, sError ) )
return false;

if ( sNodes.IsEmpty() )
Expand Down Expand Up @@ -5815,7 +5836,7 @@ bool DoClusterAlterUpdate ( const CSphString & sCluster, const CSphString & sUpd
tReqData.m_bJoinUpdate = bJoinUpdate;

VecRefPtrs_t<AgentConn_t *> dNodes;
if ( !GetNodes ( sNodes, dNodes, tReqData, sError ) )
if ( !GetNodes ( sNodes.cstr(), dNodes, tReqData, sError ) )
return false;

PQRemoteClusterUpdateNodes_c tReq;
Expand Down Expand Up @@ -5863,9 +5884,9 @@ bool RemoteClusterUpdateNodes ( const CSphString & sCluster, CSphString * pNodes
sNodes << pCluster->m_sViewNodes.cstr();
}

if ( !ClusterFilterNodes ( Str_t ( sNodes ), Proto_e::SPHINX, pCluster->m_sClusterNodes, sError ) )
if ( !ClusterFilterNodes ( sNodes.cstr(), Proto_e::SPHINX, pCluster->m_sClusterNodes, sError ) )
{
sError.SetSprintf ( "cluster '%s', invalid nodes(%s), error: %s", sCluster.cstr(), sNodes.cstr(), sError.cstr() );
sError.SetSprintf ( "cluster '%s', invalid nodes(%s), error: %s", sCluster.cstr(), sNodes.cstr(), sError.scstr() );
return false;
}
bClusterChanged = ( uWasNodes!=sphFNV64 ( pCluster->m_sClusterNodes.cstr() ) );
Expand Down

0 comments on commit 71f81d8

Please sign in to comment.