[#22360] YSQL: PgCron StatefulService
Summary:
Adds a PG_CRON_LEADER StatefulService to ensure there is only one cron leader running in the universe at any time.
Yb-master creates the StatefulService when the `cron.job` table is created. Once created, it is never dropped: the same StatefulService is reused if the cron extension is dropped and recreated, even on another database.

The StatefulService is activated after its underlying Raft peer becomes the leader, and deactivated when the peer loses leadership. Deactivation on one node can overlap with activation on another; to protect against this, StatefulServices check their term around critical sections.
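As a rough illustration of that term-check pattern, here is a minimal, self-contained C++ sketch; `current_raft_term` and `RunCriticalSection` are hypothetical names, not the actual StatefulService API:

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>

// Illustrative stand-in; the real service tracks the term of its Raft peer.
std::atomic<int64_t> current_raft_term{5};

void DoLeaderOnlyWork() { std::cout << "doing leader-only work\n"; }

// Leader-only work guarded by term checks on both sides, so work started
// under a stale term is never acted upon.
bool RunCriticalSection(int64_t activation_term) {
  if (current_raft_term.load() != activation_term) {
    return false;  // Lost leadership before the critical section.
  }
  DoLeaderOnlyWork();
  // If the term advanced mid-way, another node may already be active and the
  // result must be discarded.
  return current_raft_term.load() == activation_term;
}

int main() {
  std::cout << (RunCriticalSection(5) ? "committed" : "aborted") << "\n";
}
```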
The cron leader StatefulService takes a 60s lease (`FLAGS_pg_cron_leader_lease_sec`) and refreshes it every 10s (`cron_leadership_refresh_sec`) after checking that its term is still valid. It records the lease expiry in the shared-memory field `cron_leader_lease_`, which indicates how long the lease remains valid. This ensures pg_cron runs only when both the tserver and PG are healthy, and that it stops safely if the StatefulService or Raft gets stuck (for example, under high CPU load).
When the StatefulService is activated on a new node, it first waits out the 60s lease period before setting its local shared memory.
The pg_cron launcher (a PG backend) verifies that the lease time recorded in shared memory has not expired before acting as the leader.
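The two halves of that protocol can be sketched as follows (illustrative C++ only; `cron_leader_lease_expiry_us`, `RefreshLeaseIfStillLeader`, and `MayActAsCronLeader` are assumed names, not the actual shared-memory layout):

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>

using Clock = std::chrono::steady_clock;

// Stand-in for the shared-memory field the commit calls cron_leader_lease_.
std::atomic<int64_t> cron_leader_lease_expiry_us{0};

constexpr auto kLeaseDuration = std::chrono::seconds(60);  // pg_cron_leader_lease_sec
constexpr auto kRefreshPeriod = std::chrono::seconds(10);  // cron_leadership_refresh_sec

// Tserver side: every refresh period, verify the term is still ours and only
// then push the lease expiry forward in shared memory. A newly activated node
// first sleeps a full kLeaseDuration so the previous leader's lease has
// lapsed everywhere before this is called.
void RefreshLeaseIfStillLeader(bool term_still_valid) {
  if (!term_still_valid) {
    return;  // Stop refreshing; the lease lapses on its own.
  }
  const auto expiry = Clock::now() + kLeaseDuration;
  cron_leader_lease_expiry_us.store(
      std::chrono::duration_cast<std::chrono::microseconds>(
          expiry.time_since_epoch()).count());
}

// PG side: the pg_cron launcher only acts as leader while the lease is live.
bool MayActAsCronLeader() {
  const auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(
      Clock::now().time_since_epoch()).count();
  return now_us < cron_leader_lease_expiry_us.load();
}
```

Because the refresher stops touching shared memory as soon as the term check fails, a stuck or deposed node simply lets its lease lapse, and the 60s wait on the newly activated node guarantees the old lease has expired everywhere before the new one is advertised.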

- We reset the catalog cache version in the cron background worker to handle cases where the job was scheduled from a different node.
- Converted `enable_pg_cron` to a preview flag.
- Using a generic (id int64, data jsonb) schema for the PG_CRON_LEADER StatefulService tablet. This is not currently used (see the illustrative row shape after this list).
- Added gFlag `ysql_cron_database_name`, which updates the `cron.database_name` GUC. This is NON_RUNTIME since changing it requires a restart of pg_cron to kill in-flight jobs.
- Cherry-picked commit 19f8ebf9349b6a3642e81a4d19dd0ea967d3f357 from pg_cron.
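For readers, the row shape of that service tablet is just a generic key/payload pair; a hypothetical C++ mirror of the (id int64, data jsonb) schema, not code from this commit:

```cpp
#include <cstdint>
#include <string>

// Mirrors the generic schema of the PG_CRON_LEADER service tablet: an int64
// key plus a jsonb payload (held here as serialized text). The tablet exists
// so the service has a Raft group to anchor leadership; its rows are
// currently unused.
struct PgCronLeaderRow {
  int64_t id;
  std::string data;  // jsonb, serialized
};
```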

**Upgrade/Downgrade safety**
The new service is only enabled if the preview flag `enable_pg_cron` is set. This flag guards proto changes, so it is not safe to enable it during an upgrade, or to roll it back once enabled (see the note in common_flags.cc below).

Fixes #22360
Jira: DB-11263

Test Plan:
PgCronTest.GracefulLeaderMove
PgCronTest.LeaderCrash
PgCronTest.TaskOnDifferentDB
PgCronTest.ChangeCronDB

Reviewers: tnayak, fizaa

Reviewed By: tnayak

Subscribers: jason, yql, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D35009
hari90 committed May 21, 2024
1 parent 5ca67e4 commit 560ebd3
Showing 18 changed files with 595 additions and 51 deletions.
@@ -27,10 +27,19 @@ public int getTestMethodTimeoutSec() {
     return 1800;
   }
 
+  @Override
+  protected Map<String, String> getMasterFlags() {
+    Map<String, String> flagMap = super.getMasterFlags();
+    flagMap.put("allowed_preview_flags_csv", "enable_pg_cron");
+    flagMap.put("enable_pg_cron", "true");
+    return flagMap;
+  }
+
   @Override
   protected Map<String, String> getTServerFlags() {
     Map<String, String> flagMap = super.getTServerFlags();
-    flagMap.put("TEST_enable_pg_cron", "true");
+    flagMap.put("allowed_preview_flags_csv", "enable_pg_cron");
+    flagMap.put("enable_pg_cron", "true");
     return flagMap;
   }
 
4 changes: 2 additions & 2 deletions src/postgres/third-party-extensions/pg_cron/pg_cron.sql
@@ -4,9 +4,9 @@ BEGIN
     RAISE EXCEPTION 'can only create extension in database %',
                     pg_catalog.current_setting('cron.database_name')
     USING DETAIL = 'Jobs must be scheduled from the database configured in 'OPERATOR(pg_catalog.||)
-                   'cron.database_name, since the pg_cron background worker 'OPERATOR(pg_catalog.||)
+                   'ysql_cron_database_name, since the pg_cron background worker 'OPERATOR(pg_catalog.||)
                    'reads job descriptions from this database.',
-          HINT = pg_catalog.format('Add cron.database_name = ''%s'' in postgresql.conf 'OPERATOR(pg_catalog.||)
+          HINT = pg_catalog.format('Set the flag ysql_cron_database_name to ''%s'' on yb-tservers 'OPERATOR(pg_catalog.||)
                    'to use the current database.', pg_catalog.current_database());
   END IF;
END;
13 changes: 11 additions & 2 deletions src/postgres/third-party-extensions/pg_cron/src/pg_cron.c
@@ -95,6 +95,7 @@
 
 /* YB includes */
 #include "pg_yb_utils.h"
+#include "catalog/yb_catalog_version.h"
 
 PG_MODULE_MAGIC;
 
@@ -236,7 +237,7 @@ _PG_init(void)
   gettext_noop("Log all cron statements prior to execution."),
   NULL,
   &CronLogStatement,
-  true, /* TODO(hari): false? */
+  true,
   PGC_POSTMASTER,
   GUC_SUPERUSER_ONLY,
   NULL, NULL, NULL);
@@ -308,7 +309,7 @@ _PG_init(void)
   "cron.max_running_jobs",
   gettext_noop("Maximum number of jobs that can run concurrently."),
   NULL,
-  &MaxRunningTasks, /* TODO(Hari): We need local and global limits */
+  &MaxRunningTasks,
   (max_worker_processes - 1 < 5) ? max_worker_processes - 1 : 5,
   0,
   max_worker_processes - 1,
@@ -663,6 +664,8 @@ PgCronLauncherMain(Datum arg)
   List *taskList = NIL;
   TimestampTz currentTime = 0;
 
+  CHECK_FOR_INTERRUPTS();
+
   AcceptInvalidationMessages();
 
   if (CronReloadConfig)
@@ -2163,6 +2166,12 @@ CronBackgroundWorker(Datum main_arg)
 #endif
 
   /* Prepare to execute the query. */
+  /* YB Note: Always read the latest entries in the catalog */
+  if (IsYugaByteEnabled())
+  {
+    YBCPgResetCatalogReadTime();
+    YbUpdateCatalogCacheVersion(YbGetMasterCatalogVersion());
+  }
   SetCurrentStatementStartTimestamp();
   debug_query_string = command;
   pgstat_report_activity(STATE_RUNNING, command);
7 changes: 7 additions & 0 deletions src/yb/common/common_flags.cc
@@ -189,6 +189,13 @@ DEFINE_RUNTIME_AUTO_bool(enable_xcluster_auto_flag_validation, kLocalPersisted,
 DEFINE_RUNTIME_AUTO_PG_FLAG(bool, yb_enable_ddl_atomicity_infra, kLocalPersisted, false, true,
     "Enables YSQL DDL atomicity");
 
+// NOTE: This flag guards proto changes and it is not safe to enable during an upgrade, or rollback
+// once enabled. If you want to change the default to true then you will have to make it a
+// kLocalPersisted AutoFlag.
+DEFINE_NON_RUNTIME_PREVIEW_bool(enable_pg_cron, false,
+    "Enables the pg_cron extension. Jobs will be run on a single tserver node. The node should be "
+    "assumed to be selected randomly.");
+
 namespace yb {
 
 void InitCommonFlags() {
1 change: 1 addition & 0 deletions src/yb/common/common_types.proto
@@ -166,6 +166,7 @@ enum StatefulServiceKind {
   // Test service.
   TEST_ECHO = 0;
   PG_AUTO_ANALYZE = 1;
+  PG_CRON_LEADER = 2;
 }
 
 // CDC SDK Consistent Snapshot Options
42 changes: 42 additions & 0 deletions src/yb/integration-tests/external_mini_cluster.cc
@@ -2103,13 +2103,55 @@ Result<pgwrapper::PGConn> ExternalMiniCluster::ConnectToDB(
   if (!node_index) {
     node_index = RandomUniformInt<size_t>(0, num_tablet_servers() - 1);
   }
+  LOG(INFO) << "Connecting to PG database " << db_name << " on tserver " << *node_index;
 
   auto* ts = tablet_server(*node_index);
   return pgwrapper::PGConnBuilder(
              {.host = ts->bind_host(), .port = ts->pgsql_rpc_port(), .dbname = db_name})
       .Connect(simple_query_protocol);
 }
 
+namespace {
+Result<itest::TabletServerMap> CreateTabletServerMap(ExternalMiniCluster& cluster) {
+  auto master = cluster.GetLeaderMaster();
+  SCHECK_NOTNULL(master);
+  return itest::CreateTabletServerMap(
+      cluster.GetProxy<master::MasterClusterProxy>(master), &cluster.proxy_cache());
+}
+}  // namespace
+
+Status ExternalMiniCluster::MoveTabletLeader(
+    const TabletId& tablet_id, std::optional<size_t> new_leader_idx, MonoDelta timeout) {
+  if (timeout == MonoDelta::kMin) {
+    timeout = MonoDelta::FromSeconds(10 * kTimeMultiplier);
+  }
+
+  const auto ts_map = VERIFY_RESULT(CreateTabletServerMap(*this));
+
+  itest::TServerDetails* leader_ts;
+  RETURN_NOT_OK(itest::FindTabletLeader(ts_map, tablet_id, timeout, &leader_ts));
+
+  itest::TServerDetails* new_leader_ts = nullptr;
+  if (new_leader_idx) {
+    new_leader_ts = ts_map.at(tablet_server(*new_leader_idx)->uuid()).get();
+  } else {
+    for (const auto& [ts_id, ts_details] : ts_map) {
+      if (ts_id != leader_ts->uuid()) {
+        new_leader_ts = ts_details.get();
+      }
+    }
+  }
+  SCHECK_NOTNULL(new_leader_ts);
+
+  // Step down the leader onto the second follower.
+  RETURN_NOT_OK(
+      (itest::WaitForAllPeersToCatchup(tablet_id, TServerDetailsVector(ts_map), timeout)));
+  RETURN_NOT_OK(
+      itest::LeaderStepDown(leader_ts, tablet_id, new_leader_ts, timeout, false, nullptr));
+
+  return itest::WaitUntilLeader(new_leader_ts, tablet_id, timeout);
+}
+
 //------------------------------------------------------------
 // ExternalDaemon
 //------------------------------------------------------------
10 changes: 4 additions & 6 deletions src/yb/integration-tests/external_mini_cluster.h
@@ -556,6 +556,10 @@ class ExternalMiniCluster : public MiniClusterBase {
       const std::string& db_name = "yugabyte", std::optional<size_t> node_index = std::nullopt,
       bool simple_query_protocol = false);
 
+  Status MoveTabletLeader(
+      const TabletId& tablet_id, std::optional<size_t> new_leader_idx = std::nullopt,
+      MonoDelta timeout = MonoDelta::kMin);
+
  protected:
   FRIEND_TEST(MasterFailoverTest, TestKillAnyMaster);
 
@@ -594,12 +598,6 @@
       std::vector<OpIdPB>* op_ids,
       const std::vector<ExternalMaster*>& masters);
 
-  // Ensure that the leader server is allowed to process a config change (by having at least one
-  // commit in the current term as leader).
-  Status WaitForLeaderToAllowChangeConfig(
-      const std::string& uuid,
-      consensus::ConsensusServiceProxy* leader_proxy);
-
   // Return master address for specified port.
   std::string MasterAddressForPort(uint16_t port) const;
 