Enable scheduler to update its jobs list
Previously, the scheduler populated its jobs list only once, at start time. This commit enables the scheduler to receive notifications of updates (insert, update, delete) to the bgw_job table. Notifications are sent via the cache invalidation framework. Whenever the scheduler receives a notification, it re-reads the bgw_job table: for each job currently in bgw_job, it either instantiates new scheduler state (for new jobs) or copies over the existing scheduler state (for jobs that persist). For jobs that have disappeared from the bgw_job table, the scheduler deletes any local state it holds.
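The scheduler-side refresh logic itself is not part of the files shown below. The following is a minimal sketch of the behavior described above, assuming bgw_job_get_all() (see src/bgw/job.c below) returns a List * of BgwJob entries and that a job's id is reachable as job->fd.id; the names ScheduledBgwJob, scheduled_jobs, find_scheduled_job and on_bgw_job_table_changed are illustrative, not the actual scheduler code.

/*
 * Illustrative sketch only -- not the scheduler code from this commit.
 * Assumes bgw_job_get_all() returns a List * of BgwJob entries and that
 * a job's id is reachable as job->fd.id.
 */
#include "postgres.h"
#include "nodes/pg_list.h"

#include "bgw/job.h"			/* BgwJob, bgw_job_get_all() */

/* Hypothetical per-job scheduler state */
typedef struct ScheduledBgwJob
{
	BgwJob		job;				/* copy of the bgw_job row */
	bool		may_need_mark_end;	/* example of state worth preserving */
} ScheduledBgwJob;

static List *scheduled_jobs = NIL;

static ScheduledBgwJob *
find_scheduled_job(List *jobs, int32 job_id)
{
	ListCell   *lc;

	foreach(lc, jobs)
	{
		ScheduledBgwJob *sjob = lfirst(lc);

		if (sjob->job.fd.id == job_id)
			return sjob;
	}
	return NULL;
}

/* Called whenever a bgw_job invalidation notification arrives */
static void
on_bgw_job_table_changed(void)
{
	List	   *new_jobs = NIL;
	ListCell   *lc;

	/* Re-read the bgw_job table and rebuild local state from scratch. */
	foreach(lc, bgw_job_get_all(sizeof(BgwJob), CurrentMemoryContext))
	{
		BgwJob	   *job = lfirst(lc);
		ScheduledBgwJob *existing = find_scheduled_job(scheduled_jobs, job->fd.id);
		ScheduledBgwJob *sjob = palloc0(sizeof(ScheduledBgwJob));

		sjob->job = *job;
		if (existing != NULL)
			sjob->may_need_mark_end = existing->may_need_mark_end;	/* persisting job */
		new_jobs = lappend(new_jobs, sjob);
	}

	/* Jobs no longer present in bgw_job are simply not copied over. */
	list_free_deep(scheduled_jobs);
	scheduled_jobs = new_jobs;
}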

Note that any updates to the bgw_job table must now go through C code, so that the cache invalidation framework in catalog.c can run. To that end, this commit includes a rudimentary API for interacting with the bgw_job table, intended for testing purposes; this API will be rewritten in the future.
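The invalidation wiring in catalog.c and the scheduler's callback registration are likewise not shown below. The pattern rests on PostgreSQL's relcache invalidation machinery and the cache_inval_bgw_job proxy table added in sql/cache.sql; in this sketch, bgw_job_proxy_relid, jobs_list_needs_update, bgw_job_cache_invalidate, bgw_job_relcache_callback and register_bgw_job_invalidation_callback are illustrative names, not the actual catalog.c code.

/* Illustrative sketch only -- the real hooks live in catalog.c and the scheduler. */
#include "postgres.h"
#include "utils/inval.h"		/* CacheInvalidateRelcacheByRelid, CacheRegisterRelcacheCallback */

/* Oid of _timescaledb_cache.cache_inval_bgw_job, looked up at startup (illustrative) */
static Oid	bgw_job_proxy_relid = InvalidOid;
static bool jobs_list_needs_update = false;

/*
 * Writer side: after a C function such as ts_bgw_job_insert_relation or
 * ts_bgw_job_delete_by_id modifies bgw_job, the catalog code invalidates the
 * relcache entry of the proxy table. The shared-invalidation message reaches
 * other backends the next time they call AcceptInvalidationMessages(), e.g.
 * at transaction start.
 */
static void
bgw_job_cache_invalidate(void)
{
	CacheInvalidateRelcacheByRelid(bgw_job_proxy_relid);
}

/*
 * Scheduler side: register a relcache callback once; when it fires for the
 * proxy table, mark the jobs list dirty so the main loop re-reads bgw_job.
 */
static void
bgw_job_relcache_callback(Datum arg, Oid relid)
{
	/* relid == InvalidOid means "invalidate everything" */
	if (relid == bgw_job_proxy_relid || relid == InvalidOid)
		jobs_list_needs_update = true;
}

static void
register_bgw_job_invalidation_callback(void)
{
	CacheRegisterRelcacheCallback(bgw_job_relcache_callback, (Datum) 0);
}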
Amy Tai authored and amytai committed Nov 7, 2018
1 parent e5e8e2a commit b77b47a
Showing 20 changed files with 987 additions and 56 deletions.
10 changes: 10 additions & 0 deletions sql/bgw_scheduler.sql
@@ -21,3 +21,13 @@ LANGUAGE C VOLATILE;
INSERT INTO _timescaledb_config.bgw_job (id, application_name, job_type, schedule_INTERVAL, max_runtime, max_retries, retry_period) VALUES
(1, 'Telemetry Reporter', 'telemetry_and_version_check_if_enabled', INTERVAL '24h', INTERVAL '100s', -1, INTERVAL '1h')
ON CONFLICT (id) DO NOTHING;

CREATE OR REPLACE FUNCTION insert_job(application_name NAME, job_type NAME, schedule_interval INTERVAL, max_runtime INTERVAL, max_retries INTEGER, retry_period INTERVAL)
RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_bgw_job_insert_relation'
LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION delete_job(job_id INTEGER)
RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_bgw_job_delete_by_id'
LANGUAGE C VOLATILE;
4 changes: 4 additions & 0 deletions sql/cache.sql
@@ -8,6 +8,9 @@
-- description of how this works.
CREATE TABLE IF NOT EXISTS _timescaledb_cache.cache_inval_hypertable();

-- For notifying the scheduler of changes to the bgw_job table.
CREATE TABLE IF NOT EXISTS _timescaledb_cache.cache_inval_bgw_job();

-- This is pretty subtle. We create this dummy cache_inval_extension table
-- solely for the purpose of getting a relcache invalidation event when it is
-- deleted on DROP extension. It has no related triggers. When the table is
@@ -19,6 +22,7 @@ CREATE TABLE IF NOT EXISTS _timescaledb_cache.cache_inval_extension();
-- not actually strictly needed but good for sanity as all tables should be dumped.
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_cache.cache_inval_hypertable', '');
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_cache.cache_inval_extension', '');
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_cache.cache_inval_bgw_job', '');

GRANT SELECT ON ALL TABLES IN SCHEMA _timescaledb_cache TO PUBLIC;

104 changes: 103 additions & 1 deletion src/bgw/job.c
@@ -120,7 +120,7 @@ bgw_job_get_all(size_t alloc_size, MemoryContext mctx)
	};
	ScannerCtx	scanctx = {
		.table = catalog->tables[BGW_JOB].id,
-		.index = InvalidOid,
+		.index = CATALOG_INDEX(catalog, BGW_JOB, BGW_JOB_PKEY_IDX),
		.data = &list_data,
		.tuple_found = bgw_job_accum_tuple_found,
		.lockmode = AccessShareLock,
@@ -187,6 +187,108 @@ bgw_job_find(int32 bgw_job_id, MemoryContext mctx)
	return job_stat;
}

static bool
bgw_job_insert_relation(Name application_name, Name job_type, Interval *schedule_interval, Interval *max_runtime, int32 max_retries, Interval *retry_period)
{
	Catalog    *catalog = catalog_get();
	Relation	rel;
	TupleDesc	desc;
	Datum		values[Natts_bgw_job];
	CatalogSecurityContext sec_ctx;
	bool		nulls[Natts_bgw_job] = {false};

	rel = heap_open(catalog->tables[BGW_JOB].id, RowExclusiveLock);

	desc = RelationGetDescr(rel);

	values[AttrNumberGetAttrOffset(Anum_bgw_job_application_name)] = NameGetDatum(application_name);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_job_type)] = NameGetDatum(job_type);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] = IntervalPGetDatum(schedule_interval);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] = IntervalPGetDatum(max_runtime);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] = Int32GetDatum(max_retries);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] = IntervalPGetDatum(retry_period);

	/* Assign the next job id and insert while acting as the catalog owner */
	catalog_become_owner(catalog, &sec_ctx);
	values[AttrNumberGetAttrOffset(Anum_bgw_job_id)] = catalog_table_next_seq_id(catalog, BGW_JOB);
	catalog_insert_values(rel, desc, values, nulls);
	catalog_restore_user(&sec_ctx);
	heap_close(rel, RowExclusiveLock);

	return true;
}

TS_FUNCTION_INFO_V1(ts_bgw_job_insert_relation);

Datum
ts_bgw_job_insert_relation(PG_FUNCTION_ARGS)
{
	bgw_job_insert_relation(PG_GETARG_NAME(0), PG_GETARG_NAME(1), PG_GETARG_INTERVAL_P(2), PG_GETARG_INTERVAL_P(3), PG_GETARG_INT32(4), PG_GETARG_INTERVAL_P(5));

	PG_RETURN_NULL();
}

static bool
bgw_job_tuple_delete(TupleInfo *ti, void *data)
{
	CatalogSecurityContext sec_ctx;

	/* Also delete the bgw_stat entry */
	bgw_job_stat_delete(((FormData_bgw_job *) GETSTRUCT(ti->tuple))->id);

	catalog_become_owner(catalog_get(), &sec_ctx);
	catalog_delete(ti->scanrel, ti->tuple);
	catalog_restore_user(&sec_ctx);

	return true;
}

static bool
bgw_job_delete_scan(ScanKeyData *scankey)
{
	Catalog    *catalog = catalog_get();

	ScannerCtx	scanctx = {
		.table = catalog->tables[BGW_JOB].id,
		.index = CATALOG_INDEX(catalog, BGW_JOB, BGW_JOB_PKEY_IDX),
		.nkeys = 1,
		.scankey = scankey,
		.data = NULL,
		.limit = 1,
		.tuple_found = bgw_job_tuple_delete,
		.lockmode = RowExclusiveLock,
		.scandirection = ForwardScanDirection,
		.result_mctx = CurrentMemoryContext,
		.tuplock = {
			.waitpolicy = LockWaitBlock,
			.lockmode = LockTupleExclusive,
			.enabled = false,
		}
	};

	return scanner_scan(&scanctx);
}

static bool
bgw_job_delete_by_id(int32 job_id)
{
	ScanKeyData scankey[1];

	ScanKeyInit(&scankey[0], Anum_bgw_job_pkey_idx_id,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(job_id));

	return bgw_job_delete_scan(scankey);
}

TS_FUNCTION_INFO_V1(ts_bgw_job_delete_by_id);

Datum
ts_bgw_job_delete_by_id(PG_FUNCTION_ARGS)
{
	bgw_job_delete_by_id(PG_GETARG_INT32(0));
	PG_RETURN_NULL();
}

bool
bgw_job_execute(BgwJob *job)
{
13 changes: 13 additions & 0 deletions src/bgw/job_stat.c
@@ -89,6 +89,19 @@ bgw_job_stat_find(int32 bgw_job_id)
	return job_stat;
}

static bool
bgw_job_stat_tuple_delete(TupleInfo *ti, void *const data)
{
	catalog_delete(ti->scanrel, ti->tuple);
	return true;
}

void
bgw_job_stat_delete(int32 bgw_job_id)
{
	bgw_job_stat_scan_job_id(bgw_job_id, bgw_job_stat_tuple_delete, NULL, NULL, RowExclusiveLock);
}

/* Mark the start of a job. This should be done in a separate transaction by the scheduler
* before the bgw for a job is launched. This ensures that the job is counted as started
* before /any/ job specific code is executed. A job that has been started but never ended
1 change: 1 addition & 0 deletions src/bgw/job_stat.h
@@ -22,6 +22,7 @@ typedef enum JobResult
} JobResult;

extern BgwJobStat *bgw_job_stat_find(int job_id);
extern void bgw_job_stat_delete(int job_id);
extern void bgw_job_stat_mark_start(int32 bgw_job_id);
extern void bgw_job_stat_mark_end(BgwJob *job, JobResult result);
extern bool bgw_job_stat_end_was_marked(BgwJobStat *jobstat);
