Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix alter_job not updating new job fields #2404

Merged
merged 1 commit into from Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 53 additions & 25 deletions src/bgw/job.c
Expand Up @@ -901,31 +901,35 @@ ts_bgw_job_insert_relation(Name application_name, Name job_type, Interval *sched
return values[AttrNumberGetAttrOffset(Anum_bgw_job_id)];
}

/* This function only updates the fields modifiable with alter_job. */
static ScanTupleResult
bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
{
BgwJob *updated_job = (BgwJob *) data;
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
HeapTuple new_tuple = heap_copytuple(tuple);
FormData_bgw_job *fd = (FormData_bgw_job *) GETSTRUCT(new_tuple);
TimestampTz next_start;
HeapTuple new_tuple;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the following lines belong to the variable declaration block it can be valuable to remove this new line.

if (should_free)
heap_freetuple(tuple);
Datum values[Natts_bgw_job] = { 0 };
bool isnull[Natts_bgw_job] = { 0 };
bool repl[Natts_bgw_job] = { 0 };

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be no new line here.

Datum old_schedule_interval =
slot_getattr(ti->slot, Anum_bgw_job_schedule_interval, &isnull[0]);
Assert(!isnull[0]);
Comment on lines +918 to +919
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New line will be an advantage here:

Suggested change
slot_getattr(ti->slot, Anum_bgw_job_schedule_interval, &isnull[0]);
Assert(!isnull[0]);
slot_getattr(ti->slot, Anum_bgw_job_schedule_interval, &isnull[0]);
Assert(!isnull[0]);


ts_bgw_job_permission_check(updated_job);

/* when we update the schedule interval, modify the next start time as well*/
if (!DatumGetBool(DirectFunctionCall2(interval_eq,
IntervalPGetDatum(&fd->schedule_interval),
old_schedule_interval,
IntervalPGetDatum(&updated_job->fd.schedule_interval))))
{
BgwJobStat *stat = ts_bgw_job_stat_find(fd->id);
BgwJobStat *stat = ts_bgw_job_stat_find(updated_job->fd.id);

if (stat != NULL)
{
next_start = DatumGetTimestampTz(
TimestampTz next_start = DatumGetTimestampTz(
DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(stat->fd.last_finish),
IntervalPGetDatum(&updated_job->fd.schedule_interval)));
Expand All @@ -934,21 +938,54 @@ bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
* This means the value is counted as unset which is what we want */
ts_bgw_job_stat_update_next_start(updated_job->fd.id, next_start, true);
}
fd->schedule_interval = updated_job->fd.schedule_interval;
values[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] =
IntervalPGetDatum(&updated_job->fd.schedule_interval);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] = true;
}
fd->max_runtime = updated_job->fd.max_runtime;
fd->max_retries = updated_job->fd.max_retries;
fd->retry_period = updated_job->fd.retry_period;

values[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] =
IntervalPGetDatum(&updated_job->fd.max_runtime);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] =
Int32GetDatum(updated_job->fd.max_retries);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] =
IntervalPGetDatum(&updated_job->fd.retry_period);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] =
BoolGetDatum(updated_job->fd.scheduled);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] = true;

repl[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;
if (updated_job->fd.config)
values[AttrNumberGetAttrOffset(Anum_bgw_job_config)] =
JsonbPGetDatum(updated_job->fd.config);
else
isnull[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;

new_tuple = heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, isnull, repl);

ts_catalog_update(ti->scanrel, new_tuple);

heap_freetuple(new_tuple);
if (should_free)
heap_freetuple(tuple);

return SCAN_DONE;
}

static bool
bgw_job_update_scan(ScanKeyData *scankey, void *data)
/*
* Overwrite job with specified job_id with the given fields
*
* This function only updates the fields modifiable with alter_job.
*/
void
ts_bgw_job_update_by_id(int32 job_id, BgwJob *job)
{
ScanKeyData scankey[1];
Catalog *catalog = ts_catalog_get();
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
Expand All @@ -958,28 +995,19 @@ bgw_job_update_scan(ScanKeyData *scankey, void *data)
.index = catalog_get_index(catalog, BGW_JOB, BGW_JOB_PKEY_IDX),
.nkeys = 1,
.scankey = scankey,
.data = data,
.data = job,
.limit = 1,
.tuple_found = bgw_job_tuple_update_by_id,
.lockmode = RowExclusiveLock,
.scandirection = ForwardScanDirection,
.result_mctx = CurrentMemoryContext,
.tuplock = &scantuplock };

return ts_scanner_scan(&scanctx);
}

/* Overwrite job with specified job_id with the given fields */
void
ts_bgw_job_update_by_id(int32 job_id, BgwJob *job)
{
ScanKeyData scankey[1];

ScanKeyInit(&scankey[0],
Anum_bgw_job_pkey_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(job_id));

bgw_job_update_scan(scankey, (void *) job);
ts_scanner_scan(&scanctx);
}
43 changes: 43 additions & 0 deletions tsl/test/expected/bgw_custom.out
Expand Up @@ -116,3 +116,46 @@ SELECT job_id FROM alter_job(1,scheduled:=false);
1
(1 row)

SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config
----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+--------
1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | |
(1 row)

-- test updating job settings
SELECT job_id FROM alter_job(1,config:='{"test":"test"}');
job_id
--------
1
(1 row)

SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config
----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+------------------
1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | | {"test": "test"}
(1 row)

SELECT job_id FROM alter_job(1,scheduled:=true);
job_id
--------
1
(1 row)

SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config
----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+------------------
1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | t | | {"test": "test"}
(1 row)

SELECT job_id FROM alter_job(1,scheduled:=false);
job_id
--------
1
(1 row)

SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config
----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+------------------
1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | | {"test": "test"}
(1 row)

8 changes: 8 additions & 0 deletions tsl/test/sql/bgw_custom.sql
Expand Up @@ -59,4 +59,12 @@ SELECT * FROM _timescaledb_config.bgw_job WHERE id >= 1000;
\c :TEST_DBNAME :ROLE_SUPERUSER
-- test altering job with NULL config
SELECT job_id FROM alter_job(1,scheduled:=false);
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;

-- test updating job settings
SELECT job_id FROM alter_job(1,config:='{"test":"test"}');
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
SELECT job_id FROM alter_job(1,scheduled:=true);
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
SELECT job_id FROM alter_job(1,scheduled:=false);
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;