Skip to content

Commit

Permalink
Fix alter_job not updating new job fields
Browse files Browse the repository at this point in the history
Fix bgw_job_tuple_update_by_id to also update scheduled and config
field of bgw_job.
  • Loading branch information
svenklemm committed Sep 18, 2020
1 parent 5179447 commit 86039f1
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 25 deletions.
78 changes: 53 additions & 25 deletions src/bgw/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -901,31 +901,35 @@ ts_bgw_job_insert_relation(Name application_name, Name job_type, Interval *sched
return values[AttrNumberGetAttrOffset(Anum_bgw_job_id)];
}

/* This function only updates the fields modifiable with alter_job. */
static ScanTupleResult
bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
{
BgwJob *updated_job = (BgwJob *) data;
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
HeapTuple new_tuple = heap_copytuple(tuple);
FormData_bgw_job *fd = (FormData_bgw_job *) GETSTRUCT(new_tuple);
TimestampTz next_start;
HeapTuple new_tuple;

if (should_free)
heap_freetuple(tuple);
Datum values[Natts_bgw_job] = { 0 };
bool isnull[Natts_bgw_job] = { 0 };
bool repl[Natts_bgw_job] = { 0 };

Datum old_schedule_interval =
slot_getattr(ti->slot, Anum_bgw_job_schedule_interval, &isnull[0]);
Assert(!isnull[0]);

ts_bgw_job_permission_check(updated_job);

/* when we update the schedule interval, modify the next start time as well*/
if (!DatumGetBool(DirectFunctionCall2(interval_eq,
IntervalPGetDatum(&fd->schedule_interval),
old_schedule_interval,
IntervalPGetDatum(&updated_job->fd.schedule_interval))))
{
BgwJobStat *stat = ts_bgw_job_stat_find(fd->id);
BgwJobStat *stat = ts_bgw_job_stat_find(updated_job->fd.id);

if (stat != NULL)
{
next_start = DatumGetTimestampTz(
TimestampTz next_start = DatumGetTimestampTz(
DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(stat->fd.last_finish),
IntervalPGetDatum(&updated_job->fd.schedule_interval)));
Expand All @@ -934,21 +938,54 @@ bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
* This means the value is counted as unset which is what we want */
ts_bgw_job_stat_update_next_start(updated_job->fd.id, next_start, true);
}
fd->schedule_interval = updated_job->fd.schedule_interval;
values[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] =
IntervalPGetDatum(&updated_job->fd.schedule_interval);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] = true;
}
fd->max_runtime = updated_job->fd.max_runtime;
fd->max_retries = updated_job->fd.max_retries;
fd->retry_period = updated_job->fd.retry_period;

values[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] =
IntervalPGetDatum(&updated_job->fd.max_runtime);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] =
Int32GetDatum(updated_job->fd.max_retries);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] =
IntervalPGetDatum(&updated_job->fd.retry_period);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] =
BoolGetDatum(updated_job->fd.scheduled);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] = true;

repl[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;
if (updated_job->fd.config)
values[AttrNumberGetAttrOffset(Anum_bgw_job_config)] =
JsonbPGetDatum(updated_job->fd.config);
else
isnull[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;

new_tuple = heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, isnull, repl);

ts_catalog_update(ti->scanrel, new_tuple);

heap_freetuple(new_tuple);
if (should_free)
heap_freetuple(tuple);

return SCAN_DONE;
}

static bool
bgw_job_update_scan(ScanKeyData *scankey, void *data)
/*
* Overwrite job with specified job_id with the given fields
*
* This function only updates the fields modifiable with alter_job.
*/
void
ts_bgw_job_update_by_id(int32 job_id, BgwJob *job)
{
ScanKeyData scankey[1];
Catalog *catalog = ts_catalog_get();
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
Expand All @@ -958,28 +995,19 @@ bgw_job_update_scan(ScanKeyData *scankey, void *data)
.index = catalog_get_index(catalog, BGW_JOB, BGW_JOB_PKEY_IDX),
.nkeys = 1,
.scankey = scankey,
.data = data,
.data = job,
.limit = 1,
.tuple_found = bgw_job_tuple_update_by_id,
.lockmode = RowExclusiveLock,
.scandirection = ForwardScanDirection,
.result_mctx = CurrentMemoryContext,
.tuplock = &scantuplock };

return ts_scanner_scan(&scanctx);
}

/* Overwrite job with specified job_id with the given fields */
void
ts_bgw_job_update_by_id(int32 job_id, BgwJob *job)
{
ScanKeyData scankey[1];

ScanKeyInit(&scankey[0],
Anum_bgw_job_pkey_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(job_id));

bgw_job_update_scan(scankey, (void *) job);
ts_scanner_scan(&scanctx);
}
19 changes: 19 additions & 0 deletions tsl/test/expected/bgw_custom.out
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,22 @@ SELECT job_id FROM alter_job(1,scheduled:=false);
1
(1 row)

SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config
----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+--------
1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | |
(1 row)

-- test updating config
SELECT job_id FROM alter_job(1,config:='{"test":"test"}');
job_id
--------
1
(1 row)

SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config
----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+------------------
1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | | {"test": "test"}
(1 row)

5 changes: 5 additions & 0 deletions tsl/test/sql/bgw_custom.sql
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ SELECT * FROM _timescaledb_config.bgw_job WHERE id >= 1000;
\c :TEST_DBNAME :ROLE_SUPERUSER
-- test altering job with NULL config
SELECT job_id FROM alter_job(1,scheduled:=false);
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;

-- test updating config
SELECT job_id FROM alter_job(1,config:='{"test":"test"}');
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;

0 comments on commit 86039f1

Please sign in to comment.