Navigation Menu

Skip to content

Commit

Permalink
cjson wrapper in replication code
Browse files Browse the repository at this point in the history
  • Loading branch information
glookka committed Jan 30, 2019
1 parent 12492cd commit a5d41f9
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 272 deletions.
116 changes: 51 additions & 65 deletions src/searchd.cpp
Expand Up @@ -22,7 +22,6 @@
#include "sphinxplugin.h"
#include "sphinxqcache.h"
#include "sphinxrlp.h"
#include "json/cJSON.h"

extern "C"
{
Expand Down Expand Up @@ -830,7 +829,7 @@ void ServedStats_c::CalcStatsForInterval ( const QueryStatContainer_i * pContain
// want write lock to wipe out reader and not wait readers
// but only for RT and PQ indexes as these operations are rare there
ServedIndex_c::ServedIndex_c ( const ServedDesc_t & tDesc )
: m_tLock ( tDesc.m_eType==eITYPE::RT || tDesc.m_eType==eITYPE::PERCOLATE )
: m_tLock ( tDesc.m_eType==IndexType_e::RT || tDesc.m_eType==IndexType_e::PERCOLATE )
{
*(ServedDesc_t*)(this) = tDesc;
}
Expand Down Expand Up @@ -12322,7 +12321,7 @@ static void PercolateAddQuery ( const SqlStmt_t &tStmt, bool bReplace, ESphColla
return;
}

if ( pServed->m_eType!=eITYPE::PERCOLATE )
if ( pServed->m_eType!=IndexType_e::PERCOLATE )
{
sError.SetSprintf ( "index '%s' is not percolate", sIndex.cstr () );
tOut.Error ( tStmt.m_sStmt, sError.cstr () );
Expand Down Expand Up @@ -12405,7 +12404,6 @@ struct StringPtrTraits_t
CSphVector<BYTE> m_dPackedData;
CSphFixedVector<int> m_dOff { 0 };
CSphVector<BYTE> m_dParserBuf;
cJSON * m_pJsonStorage = nullptr;

// remap offsets to string pointers
void SavePointersTo ( VecTraits_T<const char *> &dStrings, bool bSkipInvalid=true ) const
Expand All @@ -12431,12 +12429,6 @@ struct StringPtrTraits_t
m_dPackedData.Resize ( 0 );
m_dParserBuf.Resize ( 0 );
m_dOff.Fill ( -1 );

if ( m_pJsonStorage )
{
cJSON_Delete ( m_pJsonStorage );
m_pJsonStorage = nullptr;
}
}

BYTE* ReserveBlob ( int iBlobSize, int iOffset )
Expand All @@ -12455,12 +12447,6 @@ struct StringPtrTraits_t
{
memcpy ( ReserveBlob ( dBlob.GetLength (), iOffset ), dBlob.begin (), dBlob.GetLength () );
}

~StringPtrTraits_t()
{
if ( m_pJsonStorage )
cJSON_Delete ( m_pJsonStorage );
}
};

static void BsonToSqlInsert ( const bson::Bson_c& dBson, SqlInsert_t& tAttr )
Expand Down Expand Up @@ -13025,7 +13011,7 @@ static void PQLocalMatch ( const BlobVec_t &dDocs, const CSphString& sIndex, con
return;
}

if ( pServed->m_eType!=eITYPE::PERCOLATE )
if ( pServed->m_eType!=IndexType_e::PERCOLATE )
{
sMsg.Err ( "index '%s' is not percolate", sIndex.cstr () );
return;
Expand Down Expand Up @@ -13546,7 +13532,7 @@ void sphHandleMysqlInsert ( StmtErrorReporter_i & tOut, SqlStmt_t & tStmt, bool
CSphString sError;

// need desc of that index first
eITYPE eType = eITYPE::ERROR_;
IndexType_e eType = IndexType_e::ERROR_;
{
ServedDescRPtr_c pServed ( GetServed ( tStmt.m_sIndex ) );

Expand All @@ -13560,13 +13546,13 @@ void sphHandleMysqlInsert ( StmtErrorReporter_i & tOut, SqlStmt_t & tStmt, bool
}

// percolate need unlocked index
if ( eType==eITYPE::PERCOLATE && tOut.GetBuffer () )
if ( eType==IndexType_e::PERCOLATE && tOut.GetBuffer () )
{
PercolateAddQuery ( tStmt, bReplace, eCollation, *tOut.GetBuffer (), sWarning );
return;
}

if ( eType!=eITYPE::RT )
if ( eType!=IndexType_e::RT )
{
sError.SetSprintf ( "index '%s' does not support INSERT", tStmt.m_sIndex.cstr() );
tOut.Error ( tStmt.m_sStmt, sError.cstr() );
Expand Down Expand Up @@ -14695,16 +14681,16 @@ void HandleMysqlShowTables ( SqlRowBuffer_c & tOut, SqlStmt_t & tStmt )
ServedDescRPtr_c pIdx ( it.Get () );
switch ( pIdx->m_eType )
{
case eITYPE::PLAIN:
case IndexType_e::PLAIN:
dIndexes.Add ( CSphNamedInt ( it.GetName (), 0 ) );
break;
case eITYPE::RT:
case IndexType_e::RT:
dIndexes.Add ( CSphNamedInt ( it.GetName (), 2 ) );
break;
case eITYPE::PERCOLATE:
case IndexType_e::PERCOLATE:
dIndexes.Add ( CSphNamedInt ( it.GetName (), 4 ) );
break;
case eITYPE::TEMPLATE:
case IndexType_e::TEMPLATE:
dIndexes.Add ( CSphNamedInt ( it.GetName (), 3 ) );
break;
default:
Expand Down Expand Up @@ -15851,7 +15837,7 @@ static int LocalIndexDoDeleteDocuments ( const CSphString & sName, const CSphStr
}

// goto to percolate path with unlocked index
if ( pLocked->m_eType==eITYPE::PERCOLATE )
if ( pLocked->m_eType==IndexType_e::PERCOLATE )
break;

CSphScopedPtr<SearchHandler_c> pHandler ( nullptr );
Expand Down Expand Up @@ -16405,16 +16391,16 @@ void HandleMysqlAttach ( SqlRowBuffer_c & tOut, const SqlStmt_t & tStmt )

if ( !pFrom
|| !pTo
|| pFrom->m_eType!=eITYPE::PLAIN
|| pTo->m_eType!=eITYPE::RT )
|| pFrom->m_eType!=IndexType_e::PLAIN
|| pTo->m_eType!=IndexType_e::RT )
{
if ( !pFrom )
tOut.ErrorEx ( MYSQL_ERR_PARSE_ERROR, "no such index '%s'", sFrom.cstr() );
else if ( !pTo )
tOut.ErrorEx ( MYSQL_ERR_PARSE_ERROR, "no such index '%s'", sTo.cstr() );
else if ( pFrom->m_eType!=eITYPE::PLAIN )
else if ( pFrom->m_eType!=IndexType_e::PLAIN )
tOut.Error ( tStmt.m_sStmt, "1st argument to ATTACH must be a plain index" );
else if ( pTo->m_eType!=eITYPE::RT )
else if ( pTo->m_eType!=IndexType_e::RT )
tOut.Error ( tStmt.m_sStmt, "2nd argument to ATTACH must be a RT index" );
return;
}
Expand Down Expand Up @@ -16955,15 +16941,15 @@ static void AddPlainIndexStatus ( SqlRowBuffer_c & tOut, const ServedIndex_c * p

switch ( pLocked->m_eType )
{
case eITYPE::PLAIN: tOut.DataTuplet ( "index_type", "disk" );
case IndexType_e::PLAIN: tOut.DataTuplet ( "index_type", "disk" );
break;
case eITYPE::RT: tOut.DataTuplet ( "index_type", "rt" );
case IndexType_e::RT: tOut.DataTuplet ( "index_type", "rt" );
break;
case eITYPE::PERCOLATE: tOut.DataTuplet ( "index_type", "percolate" );
case IndexType_e::PERCOLATE: tOut.DataTuplet ( "index_type", "percolate" );
break;
case eITYPE::TEMPLATE: tOut.DataTuplet ( "index_type", "template" );
case IndexType_e::TEMPLATE: tOut.DataTuplet ( "index_type", "template" );
break;
case eITYPE::DISTR: tOut.DataTuplet ( "index_type", "distributed" );
case IndexType_e::DISTR: tOut.DataTuplet ( "index_type", "distributed" );
break;
default:
break;
Expand Down Expand Up @@ -18882,7 +18868,7 @@ static void PrereadFunc ( void * )
continue;

ServedDescRPtr_c dReadLock ( pServed );
if ( dReadLock->m_eType==eITYPE::TEMPLATE )
if ( dReadLock->m_eType==IndexType_e::TEMPLATE )
continue;

int64_t tmReading = sphMicroTimer();
Expand Down Expand Up @@ -19172,7 +19158,7 @@ void OptimizeThreadFunc ( void * )
ThreadSystem_t tThdSystemDesc ( "OPTIMIZE" );

// FIXME: MVA update would wait w-lock here for a very long time
assert ( dReadLock->m_eType==eITYPE::RT );
assert ( dReadLock->m_eType==IndexType_e::RT );
static_cast<ISphRtIndex *>( dReadLock->m_pIndex )->Optimize ();
}
}
Expand Down Expand Up @@ -19390,7 +19376,7 @@ bool AddLocallyServedIndex ( const char * szIndexName, ServedDesc_t & tIdx, bool

// at this point only template indexes are production-ready
// so all no-templates we implicitly add to disabled hash
if ( tIdx.m_eType!=eITYPE::TEMPLATE )
if ( tIdx.m_eType!=IndexType_e::TEMPLATE )
{
g_pLocalIndexes->AddUniq ( nullptr, szIndexName );
if ( !g_pDisabledIndexes->AddUniq ( pIdx, szIndexName ) )
Expand Down Expand Up @@ -19526,7 +19512,7 @@ ESphAddIndex AddRTIndex ( const char * szIndexName, const CSphConfigSection &hIn
// index
ServedDesc_t tIdx;
tIdx.m_pIndex = sphCreateIndexRT ( tSchema, szIndexName, iRamSize, hIndex["path"].cstr (), bWordDict );
tIdx.m_eType = eITYPE::RT;
tIdx.m_eType = IndexType_e::RT;
return AddRTPercolate ( szIndexName, tIdx, hIndex, tSettings, bReplace );
}

Expand Down Expand Up @@ -19609,7 +19595,7 @@ ESphAddIndex AddPercolateIndex ( const char * szIndexName, const CSphConfigSecti
// index
ServedDesc_t tIdx;
tIdx.m_pIndex = CreateIndexPercolate ( tSchema, szIndexName, hIndex["path"].cstr () );
tIdx.m_eType = eITYPE::PERCOLATE;
tIdx.m_eType = IndexType_e::PERCOLATE;
return AddRTPercolate ( szIndexName, tIdx, hIndex, tSettings, bReplace );
}

Expand Down Expand Up @@ -19696,39 +19682,39 @@ ESphAddIndex AddTemplateIndex ( const char * szIndexName, const CSphConfigSectio
CSphIndexStatus tStatus;
tIdx.m_pIndex->GetStatus ( &tStatus );
tIdx.m_iMass = CalculateMass ( tStatus );
tIdx.m_eType = eITYPE::TEMPLATE;
tIdx.m_eType = IndexType_e::TEMPLATE;

// done
if ( AddLocallyServedIndex ( szIndexName, tIdx, bReplace ) )
return ADD_SERVED;
return ADD_ERROR;
}

eITYPE TypeOfIndexConfig ( const CSphString & sType )
IndexType_e TypeOfIndexConfig ( const CSphString & sType )
{
if ( sType=="distributed" )
return eITYPE::DISTR;
return IndexType_e::DISTR;

if ( sType=="rt" )
return eITYPE::RT;
return IndexType_e::RT;

if ( sType=="percolate" )
return eITYPE::PERCOLATE;
return IndexType_e::PERCOLATE;

if ( sType=="template" )
return eITYPE::TEMPLATE;
return IndexType_e::TEMPLATE;

if ( ( sType.IsEmpty () || sType=="plain" ) )
return eITYPE::PLAIN;
return IndexType_e::PLAIN;

return eITYPE::ERROR_;
return IndexType_e::ERROR_;
}

const char * g_sIndexTypeName[1+(int)eITYPE::ERROR_] = { "plain", "template", "rt", "percolate", "distributed", "invalid" };
const char * g_dIndexTypeName[1+(int)IndexType_e::ERROR_] = { "plain", "template", "rt", "percolate", "distributed", "invalid" };

CSphString GetTypeName ( eITYPE eType )
CSphString GetTypeName ( IndexType_e eType )
{
return g_sIndexTypeName[(int)eType];
return g_dIndexTypeName[(int)eType];
}

// ServiceMain() -> ConfigureAndPreload() -> AddIndex()
Expand All @@ -19737,17 +19723,17 @@ ESphAddIndex AddIndex ( const char * szIndexName, const CSphConfigSection & hInd
{
switch ( TypeOfIndexConfig ( hIndex.GetStr ( "type", nullptr ) ) )
{
case eITYPE::DISTR:
case IndexType_e::DISTR:
return AddDistributedIndex ( szIndexName, hIndex );
case eITYPE::RT:
case IndexType_e::RT:
return AddRTIndex ( szIndexName, hIndex, bReplace );
case eITYPE::PERCOLATE:
case IndexType_e::PERCOLATE:
return AddPercolateIndex ( szIndexName, hIndex, bReplace );
case eITYPE::TEMPLATE:
case IndexType_e::TEMPLATE:
return AddTemplateIndex ( szIndexName, hIndex, bReplace );
case eITYPE::PLAIN:
case IndexType_e::PLAIN:
return AddPlainIndex ( szIndexName, hIndex, bReplace );
case eITYPE::ERROR_:
case IndexType_e::ERROR_:
default:
break;
}
Expand Down Expand Up @@ -19861,9 +19847,9 @@ static void ReloadIndexSettings ( CSphConfigParser & tCP ) REQUIRES ( MainThread
{
const CSphConfigSection & hIndex = hConf["index"].IterateGet();
const auto & sIndexName = hConf["index"].IterateGetKey();
eITYPE eNewType = TypeOfIndexConfig ( hIndex.GetStr ( "type", nullptr ) );
IndexType_e eNewType = TypeOfIndexConfig ( hIndex.GetStr ( "type", nullptr ) );

if ( eNewType==eITYPE::ERROR_ )
if ( eNewType==IndexType_e::ERROR_ )
continue;

bool bReplaceLocal = false;
Expand All @@ -19889,7 +19875,7 @@ static void ReloadIndexSettings ( CSphConfigParser & tCP ) REQUIRES ( MainThread
tDesc.m_sGlobalIDFPath!=pServedRLocked->m_sGlobalIDFPath ||
tDesc.m_bOnDiskAttrs!=pServedRLocked->m_bOnDiskAttrs ||
tDesc.m_bOnDiskPools!=pServedRLocked->m_bOnDiskPools );
bReconfigure |= ( pServedRLocked->m_eType!=eITYPE::TEMPLATE && hIndex.Exists ( "path" ) && hIndex["path"].strval ()!=pServedRLocked->m_sIndexPath );
bReconfigure |= ( pServedRLocked->m_eType!=IndexType_e::TEMPLATE && hIndex.Exists ( "path" ) && hIndex["path"].strval ()!=pServedRLocked->m_sIndexPath );
}
}

Expand All @@ -19899,7 +19885,7 @@ static void ReloadIndexSettings ( CSphConfigParser & tCP ) REQUIRES ( MainThread
{
ServedDescWPtr_c pServedWLocked ( pServedIndex );
ConfigureLocalIndex ( pServedWLocked, hIndex );
if ( pServedWLocked->m_eType!=eITYPE::TEMPLATE && hIndex.Exists ( "path" ) && hIndex["path"].strval ()!=pServedWLocked->m_sIndexPath )
if ( pServedWLocked->m_eType!=IndexType_e::TEMPLATE && hIndex.Exists ( "path" ) && hIndex["path"].strval ()!=pServedWLocked->m_sIndexPath )
{
pServedWLocked->m_sNewPath = hIndex["path"].strval ();
g_pDisabledIndexes->AddUniq ( nullptr, sIndexName );
Expand All @@ -19914,7 +19900,7 @@ static void ReloadIndexSettings ( CSphConfigParser & tCP ) REQUIRES ( MainThread
}

auto pDistrIndex = GetDistr ( sIndexName );
if ( pDistrIndex && eNewType==eITYPE::DISTR )
if ( pDistrIndex && eNewType==IndexType_e::DISTR )
{
DistributedIndexRefPtr_t ptrIdx ( new DistributedIndex_t );
ConfigureDistributedIndex ( *ptrIdx, sIndexName.cstr (), hIndex );
Expand Down Expand Up @@ -19942,7 +19928,7 @@ static void ReloadIndexSettings ( CSphConfigParser & tCP ) REQUIRES ( MainThread
auto pAddedIndex = GetDisabled ( sIndexName.cstr() );
assert ( pAddedIndex );
ServedDescWPtr_c pWlockedDisabled ( pAddedIndex );
if ( pWlockedDisabled->m_eType==eITYPE::PLAIN )
if ( pWlockedDisabled->m_eType==IndexType_e::PLAIN )
pWlockedDisabled->m_bOnlyNew = true;
if ( pDistrIndex )
dDistrToDelete[sIndexName] = false;
Expand Down Expand Up @@ -20016,7 +20002,7 @@ void CheckRotate () REQUIRES ( MainThread )
ServedDescRPtr_c rLocked ( pIdx );
const CSphString &sIndex = it.GetName ();
assert ( rLocked->m_pIndex );
if ( rLocked->m_eType==eITYPE::PLAIN && ROTATE_FROM::NEW==CheckIndexHeaderRotate ( *rLocked ) )
if ( rLocked->m_eType==IndexType_e::PLAIN && ROTATE_FROM::NEW==CheckIndexHeaderRotate ( *rLocked ) )
g_pDisabledIndexes->AddUniq ( nullptr, sIndex );
}

Expand All @@ -20038,7 +20024,7 @@ void CheckRotate () REQUIRES ( MainThread )
auto pRotating = GetServed ( sIndex );
assert ( pRotating );
ServedDescWPtr_c pWRotating ( pRotating );
assert ( pWRotating->m_eType==eITYPE::PLAIN );
assert ( pWRotating->m_eType==IndexType_e::PLAIN );
if ( !RotateIndexGreedy ( pRotating, *pWRotating, sIndex, sError ) )
{
sphWarning ( "%s", sError.cstr () );
Expand Down Expand Up @@ -20066,7 +20052,7 @@ void CheckRotate () REQUIRES ( MainThread )
g_pDistIndexes->Delete ( sIndex ); // postponed delete of same-named distributed (if any)
}

if ( pWlockedServedPtr->m_eType!=eITYPE::PLAIN )
if ( pWlockedServedPtr->m_eType!=IndexType_e::PLAIN )
continue;

bool bWasAdded = pWlockedServedPtr->m_bOnlyNew;
Expand Down

0 comments on commit a5d41f9

Please sign in to comment.