Skip to content

Commit

Permalink
Enable FTS for mirrorless cluster
Browse files Browse the repository at this point in the history
This commit enables FTS for mirrorless clusters and records a status message
in gp_configuration_history whenever a segment goes down or comes back up in
a mirrorless cluster.

Output message in gp_configuration_history:
'FTS: content id 0 dbid 2 is down'
'FTS: content id 0 dbid 2 is now up'
  • Loading branch information
divyeshddv committed Feb 21, 2024
1 parent b744273 commit 897ead2
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 78 deletions.
59 changes: 53 additions & 6 deletions src/backend/fts/fts.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,57 @@ probeWalRepUpdateConfig(int16 dbid, int16 segindex, char role,
}
}

/*
 * probeUpdateConfHistory
 *
 * Insert one row into gp_configuration_history describing an FTS status
 * change for the given primary segment.  The row carries the current
 * timestamp, the primary's dbid and a description selected by the
 * (hasMirrors, isSegmentAlive) combination:
 *
 *   hasMirrors && isSegmentAlive   -> content is out of double fault
 *   hasMirrors && !isSegmentAlive  -> double fault detected
 *   !hasMirrors && isSegmentAlive  -> segment is now up
 *   !hasMirrors && !isSegmentAlive -> segment is down
 *
 * No transaction is started or committed here.
 */
void
probeUpdateConfHistory(const CdbComponentDatabaseInfo *primary,
					   bool isSegmentAlive,
					   bool hasMirrors)
{
	const int	contentId = primary->config->segindex;
	const int	dbId = primary->config->dbid;
	char		message[64];
	Datum		values[Natts_gp_configuration_history];
	bool		isnull[Natts_gp_configuration_history] = {false};
	Relation	historyRel;
	HeapTuple	newRow;

	historyRel = table_open(GpConfigHistoryRelationId, RowExclusiveLock);

	values[Anum_gp_configuration_history_time - 1] =
		TimestampTzGetDatum(GetCurrentTimestamp());
	values[Anum_gp_configuration_history_dbid - 1] = Int16GetDatum(dbId);

	/* Pick the description text; snprintf bounds it to the local buffer. */
	if (hasMirrors && isSegmentAlive)
	{
		snprintf(message, sizeof(message),
				 "FTS: content id %d is out of double fault, dbid %d is up",
				 contentId, dbId);
	}
	else if (hasMirrors)
	{
		snprintf(message, sizeof(message),
				 "FTS: double fault detected for content id %d",
				 contentId);
	}
	else if (isSegmentAlive)
	{
		snprintf(message, sizeof(message),
				 "FTS: content id %d dbid %d is now up",
				 contentId, dbId);
	}
	else
	{
		snprintf(message, sizeof(message),
				 "FTS: content id %d dbid %d is down",
				 contentId, dbId);
	}
	values[Anum_gp_configuration_history_description - 1] =
		CStringGetTextDatum(message);

	/* Build the tuple, insert it into the catalog, then release our copy. */
	newRow = heap_form_tuple(RelationGetDescr(historyRel), values, isnull);
	CatalogTupleInsert(historyRel, newRow);
	heap_freetuple(newRow);

	/* Named fault point, reached after the insert and before the close. */
	SIMPLE_FAULT_INJECTOR("fts_update_config_hist");

	table_close(historyRel, RowExclusiveLock);
}

static
void FtsLoop()
{
Expand All @@ -283,7 +334,6 @@ void FtsLoop()

while (true)
{
bool has_mirrors;
int rc;

if (got_SIGHUP)
Expand All @@ -308,7 +358,6 @@ void FtsLoop()
cdbs = readCdbComponentInfoAndUpdateStatus();

/* Check here gp_segment_configuration if has mirror's */
has_mirrors = gp_segment_config_has_mirrors();

/* close the transaction we started above */
CommitTransactionCommand();
Expand All @@ -322,12 +371,10 @@ void FtsLoop()
skip_fts_probe = true;
#endif

if (skip_fts_probe || !has_mirrors)
if (skip_fts_probe)
{
elogif(gp_log_fts >= GPVARS_VERBOSITY_VERBOSE, LOG,
"skipping FTS probes due to %s",
!has_mirrors ? "no mirrors" : "fts_probe fault");

"skipping FTS probes due to fts_probe fault");
}
else
{
Expand Down
135 changes: 63 additions & 72 deletions src/backend/fts/ftsprobe.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@


static struct pollfd *PollFds;
static Bitmapset *doubleFaultContentIds;
static Bitmapset *failedContentIds;

static CdbComponentDatabaseInfo *
FtsGetPeerSegment(CdbComponentDatabases *cdbs,
Expand Down Expand Up @@ -856,7 +856,7 @@ processRetry(fts_context *context)
* mirror as down prematurely. If mirror is already marked
* down in configuration, there is no need to retry.
*/
if (!(ftsInfo->result.retryRequested &&
if (!(ftsInfo->result.retryRequested && context->has_mirrors &&
SEGMENT_IS_ALIVE(ftsInfo->mirror_cdbinfo)))
break;
/* else, fallthrough */
Expand Down Expand Up @@ -980,60 +980,45 @@ updateConfiguration(CdbComponentDatabaseInfo *primary,
/*
* This function updates gp_configuration_history whenever a segment pair
* goes into or comes out of double fault, or whenever a segment goes
* down/up in a mirrorless cluster.
* Since the FTS probe doesn't have its own transaction, we need to create one
* to update the catalog and commit it once it is done.
* We cannot update gp_segment_configuration in these cases, since the status of
* the last standing primary has to be 'u' to enable gpstop/gpstart to
* start/restart that segment. We only log a special message describing this
* state (mirrored double fault / mirrorless single fault) into
* gp_configuration_history for utilities to detect it.
*/
static void
updateDoubleFaultStatus(CdbComponentDatabaseInfo *primary, bool isSegmentAlive)
updateSegmentDownStatus(CdbComponentDatabaseInfo *primary,
bool isSegmentAlive,
bool hasMirrors)
{
StartTransactionCommand();
GetTransactionSnapshot();

/* Nothing to update if segment is alive, but content isn't down */
if (isSegmentAlive && !bms_is_member(primary->config->segindex, failedContentIds))
return;
/* Nothing to update if segment is dead, and we have already reported */
if (!isSegmentAlive && bms_is_member(primary->config->segindex, failedContentIds))
return;
/*
* Insert new tuple into gp_configuration_history catalog.
*/
Relation histrel;
HeapTuple histtuple;
Datum histvals[Natts_gp_configuration_history];
bool histnulls[Natts_gp_configuration_history] = { false };
char desc[SQL_CMD_BUF_SIZE];

histrel = table_open(GpConfigHistoryRelationId,
RowExclusiveLock);

histvals[Anum_gp_configuration_history_time-1] =
TimestampTzGetDatum(GetCurrentTimestamp());
histvals[Anum_gp_configuration_history_dbid-1] =
Int16GetDatum(primary->config->dbid);
if (isSegmentAlive)
snprintf(desc, sizeof(desc),
"FTS: content id %d is out of double fault, dbid %d is up",
primary->config->segindex, primary->config->dbid);
else
snprintf(desc, sizeof(desc),
"FTS: double fault detected for content id %d",
primary->config->segindex);
histvals[Anum_gp_configuration_history_description-1] =
CStringGetTextDatum(desc);
histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls);
CatalogTupleInsert(histrel, histtuple);

SIMPLE_FAULT_INJECTOR("fts_update_config_hist");

table_close(histrel, RowExclusiveLock);
StartTransactionCommand();
GetTransactionSnapshot();
probeUpdateConfHistory(primary, isSegmentAlive, hasMirrors);

CommitTransactionCommand();
if (isSegmentAlive)
doubleFaultContentIds = bms_del_member(doubleFaultContentIds, primary->config->segindex);
failedContentIds = bms_del_member(failedContentIds, primary->config->segindex);
else
doubleFaultContentIds = bms_add_member(doubleFaultContentIds, primary->config->segindex);
failedContentIds = bms_add_member(failedContentIds, primary->config->segindex);
}

/*
* Process responses from primary segments:
* (a) Transition internal state so that segments can be messaged subsequently
* (e.g. promotion and turning off syncrep).
* (b) Update gp_segment_configuration catalog table, if needed.
* (b) Update gp_segment_configuration, gp_configuration_history catalog table, if needed.
*/
static bool
processResponse(fts_context *context)
Expand Down Expand Up @@ -1064,7 +1049,7 @@ processResponse(fts_context *context)
bool IsPrimaryAlive = ftsInfo->result.isPrimaryAlive;
/* Trust a response from primary only if it's alive. */
bool IsMirrorAlive = IsPrimaryAlive ?
ftsInfo->result.isMirrorAlive : SEGMENT_IS_ALIVE(mirror);
ftsInfo->result.isMirrorAlive : (context->has_mirrors && SEGMENT_IS_ALIVE(mirror));
bool IsInSync = IsPrimaryAlive ?
ftsInfo->result.isInSync : false;

Expand All @@ -1076,11 +1061,16 @@ processResponse(fts_context *context)
case FTS_PROBE_SUCCESS:
Assert(IsPrimaryAlive);

if (bms_is_member(primary->config->segindex, doubleFaultContentIds))
updateDoubleFaultStatus(primary,
true);
updateSegmentDownStatus(primary,true, context->has_mirrors);

if (ftsInfo->result.isSyncRepEnabled && !IsMirrorAlive)
if (!context->has_mirrors)
{
elogif(gp_log_fts >= GPVARS_VERBOSITY_VERBOSE, LOG,
"FTS skipping mirror down update for (content=%d) as mirrorless",
primary->config->segindex);
ftsInfo->state = FTS_RESPONSE_PROCESSED;
}
else if (ftsInfo->result.isSyncRepEnabled && !IsMirrorAlive)
{
if (!ftsInfo->result.retryRequested)
{
Expand Down Expand Up @@ -1179,7 +1169,7 @@ processResponse(fts_context *context)

Assert(!IsPrimaryAlive);
/* See if mirror can be promoted. */
if (SEGMENT_IS_IN_SYNC(mirror))
if (context->has_mirrors && SEGMENT_IS_IN_SYNC(mirror))
{
/*
* Primary and mirror must have been recorded as in-sync
Expand Down Expand Up @@ -1220,14 +1210,19 @@ processResponse(fts_context *context)
* Only log here, will handle it later, having an "ERROR"
* keyword here for customer convenience
*/
elog(WARNING, "ERROR: FTS double fault detected (content=%d) "
"primary dbid=%d, mirror dbid=%d",
primary->config->segindex, primary->config->dbid, mirror->config->dbid);

if (!bms_is_member(primary->config->segindex, doubleFaultContentIds))
updateDoubleFaultStatus(primary,
false);

if (context->has_mirrors)
{
elog(WARNING, "ERROR: FTS double fault detected (content=%d) "
"primary dbid=%d, mirror dbid=%d",
primary->config->segindex, primary->config->dbid, mirror->config->dbid);
}
else
{
elog(WARNING, "ERROR: FTS detected segment down (content=%d) "
"primary dbid=%d",
primary->config->segindex, primary->config->dbid);
}
updateSegmentDownStatus(primary,false, context->has_mirrors);
ftsInfo->state = FTS_RESPONSE_PROCESSED;
}
break;
Expand All @@ -1249,23 +1244,15 @@ processResponse(fts_context *context)
elog(WARNING, "ERROR: FTS double fault detected (content=%d) "
"primary dbid=%d, mirror dbid=%d",
primary->config->segindex, primary->config->dbid, mirror->config->dbid);

if (!bms_is_member(primary->config->segindex, doubleFaultContentIds))
updateDoubleFaultStatus(primary,
false);

updateSegmentDownStatus(primary,false, context->has_mirrors);
ftsInfo->state = FTS_RESPONSE_PROCESSED;
break;
case FTS_PROMOTE_SUCCESS:
elogif(gp_log_fts >= GPVARS_VERBOSITY_VERBOSE, LOG,
"FTS mirror (content=%d, dbid=%d) promotion "
"triggered successfully",
primary->config->segindex, primary->config->dbid);

if (bms_is_member(primary->config->segindex, doubleFaultContentIds))
updateDoubleFaultStatus(primary,
true);

updateSegmentDownStatus(primary,true, context->has_mirrors);
ftsInfo->state = FTS_RESPONSE_PROCESSED;
break;
case FTS_SYNCREP_OFF_SUCCESS:
Expand All @@ -1275,9 +1262,10 @@ processResponse(fts_context *context)
ftsInfo->state = FTS_RESPONSE_PROCESSED;
break;
default:
elog(ERROR, "FTS invalid internal state %d for (content=%d)"
"primary dbid=%d, mirror dbid=%d", ftsInfo->state,
primary->config->segindex, primary->config->dbid, mirror->config->dbid);
elog(ERROR, "FTS invalid internal state %d for (content=%d) "
"primary dbid=%d, mirror dbid=%d", ftsInfo->state,
primary->config->segindex, primary->config->dbid,
context->has_mirrors ? mirror->config->dbid : -1);
break;
}
/* Close connection and reset result for next message, if any. */
Expand Down Expand Up @@ -1316,22 +1304,25 @@ FtsWalRepInitProbeContext(CdbComponentDatabases *cdbs, fts_context *context)
context->num_pairs = cdbs->total_segments;
context->perSegInfos = (fts_segment_info *) palloc0(
context->num_pairs * sizeof(fts_segment_info));
context->has_mirrors = !(cdbs->total_segment_dbs == cdbs->total_segments);

int fts_index = 0;
int cdb_index = 0;
CdbComponentDatabaseInfo *mirror = NULL;

for(; cdb_index < cdbs->total_segment_dbs; cdb_index++)
{
CdbComponentDatabaseInfo *primary = &(cdbs->segment_db_info[cdb_index]);
if (!SEGMENT_IS_ACTIVE_PRIMARY(primary))
continue;
CdbComponentDatabaseInfo *mirror = FtsGetPeerSegment(cdbs,
primary->config->segindex,
primary->config->dbid);
/*
* If there is no mirror under this primary, no need to probe.
*/
if (!mirror)
mirror = FtsGetPeerSegment(cdbs,
primary->config->segindex,
primary->config->dbid);
if (context->has_mirrors && !mirror)
{
/*
* If there is no mirror under this primary, no need to probe.
*/
context->num_pairs--;
continue;
}
Expand Down
24 changes: 24 additions & 0 deletions src/backend/fts/test/ftsprobe_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,14 @@ test_PrimayUpMirrorUpNotInSync_to_PrimaryDown(void **state)
expect_value(PQfinish, conn, context.perSegInfos[1].conn);
will_be_called(PQfinish);

will_be_called(StartTransactionCommand);
will_be_called(GetTransactionSnapshot);
expect_value(probeUpdateConfHistory, primary, context.perSegInfos[0].primary_cdbinfo);
expect_value(probeUpdateConfHistory, isSegmentAlive, false);
expect_value(probeUpdateConfHistory, hasMirrors, true);
will_be_called(probeUpdateConfHistory);
will_be_called(CommitTransactionCommand);

/* No update must happen */
bool is_updated = processResponse(&context);

Expand Down Expand Up @@ -1218,6 +1226,14 @@ test_PrimaryUpMirrorDownNotInSync_to_PrimayUpMirrorUpSync(void **state)
expect_value(PQfinish, conn, context.perSegInfos[0].conn);
will_be_called(PQfinish);

will_be_called(StartTransactionCommand);
will_be_called(GetTransactionSnapshot);
expect_value(probeUpdateConfHistory, primary, context.perSegInfos[0].primary_cdbinfo);
expect_value(probeUpdateConfHistory, isSegmentAlive, true);
expect_value(probeUpdateConfHistory, hasMirrors, true);
will_be_called(probeUpdateConfHistory);
will_be_called(CommitTransactionCommand);

bool is_updated = processResponse(&context);

assert_true(is_updated);
Expand Down Expand Up @@ -1298,6 +1314,14 @@ test_PrimaryUpMirrorDownNotInSync_to_PrimaryDown(void **state)
expect_value(PQfinish, conn, context.perSegInfos[1].conn);
will_be_called(PQfinish);

will_be_called(StartTransactionCommand);
will_be_called(GetTransactionSnapshot);
expect_value(probeUpdateConfHistory, primary, context.perSegInfos[0].primary_cdbinfo);
expect_value(probeUpdateConfHistory, isSegmentAlive, false);
expect_value(probeUpdateConfHistory, hasMirrors, true);
will_be_called(probeUpdateConfHistory);
will_be_called(CommitTransactionCommand);

bool is_updated = processResponse(&context);

assert_false(is_updated);
Expand Down
3 changes: 3 additions & 0 deletions src/include/postmaster/fts.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ extern bool FtsIsActive(void);
extern void HandleFtsMessage(const char* query_string);
extern void probeWalRepUpdateConfig(int16 dbid, int16 segindex, char role,
bool IsSegmentAlive, bool IsInSync);
extern void probeUpdateConfHistory(const CdbComponentDatabaseInfo *primary,
bool isSegmentAlive,
bool hasMirrors);
extern bool FtsProbeStartRule(Datum main_arg);
extern void FtsProbeMain (Datum main_arg);
extern pid_t FtsProbePID(void);
Expand Down
1 change: 1 addition & 0 deletions src/include/postmaster/ftsprobe.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ typedef struct

typedef struct
{
bool has_mirrors; /* mirrored or mirrorless cluster */
int num_pairs; /* number of primary-mirror pairs FTS wants to probe */
fts_segment_info *perSegInfos;
} fts_context;
Expand Down
Loading

0 comments on commit 897ead2

Please sign in to comment.