Skip to content

Commit

Permalink
Add hooks for hypertable drops
Browse files Browse the repository at this point in the history
To properly clean up the OSM catalog we need a way to reliably track
hypertable deletion (including internal hypertables for CAGGS).
  • Loading branch information
zilder committed Mar 6, 2023
1 parent 474b09b commit 4c00750
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(SOURCES
init.c
jsonb_utils.c
license_guc.c
osm_callbacks.c
partitioning.c
process_utility.c
scanner.c
Expand Down
22 changes: 7 additions & 15 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "hypercube.h"
#include "hypertable.h"
#include "hypertable_cache.h"
#include "osm_callbacks.h"
#include "partitioning.h"
#include "process_utility.h"
#include "scan_iterator.h"
Expand Down Expand Up @@ -1156,27 +1157,15 @@ ts_chunk_create_only_table(Hypertable *ht, Hypercube *cube, const char *schema_n
return chunk;
}

#if PG14_GE
#define OSM_CHUNK_INSERT_CHECK_HOOK "osm_chunk_insert_check_hook"
typedef int (*ts_osm_chunk_insert_hook_type)(Oid ht_oid, int64 range_start, int64 range_end);
static ts_osm_chunk_insert_hook_type
get_osm_chunk_insert_hook()
{
ts_osm_chunk_insert_hook_type *func_ptr =
(ts_osm_chunk_insert_hook_type *) find_rendezvous_variable(OSM_CHUNK_INSERT_CHECK_HOOK);
return *func_ptr;
}
#endif

static Chunk *
chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
const char *schema_name, const char *table_name,
const char *prefix)
{
#if PG14_GE
ts_osm_chunk_insert_hook_type insert_func_ptr = get_osm_chunk_insert_hook();
OsmCallbacks *callbacks = ts_get_osm_callbacks();

if (insert_func_ptr)
if (callbacks)
{
/* OSM only uses first dimension . doesn't work with multinode tables yet*/
Dimension *dim = &ht->space->dimensions[0];
Expand All @@ -1185,7 +1174,10 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
ts_internal_to_time_int64(cube->slices[0]->fd.range_start, dim->fd.column_type);
int64 range_end =
ts_internal_to_time_int64(cube->slices[0]->fd.range_end, dim->fd.column_type);
int chunk_exists = insert_func_ptr(ht->main_table_relid, range_start, range_end);

int chunk_exists =
callbacks->chunk_insert_check_hook(ht->main_table_relid, range_start, range_end);

if (chunk_exists)
{
Oid outfuncid = InvalidOid;
Expand Down
15 changes: 15 additions & 0 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
#include "cross_module_fn.h"
#include "scan_iterator.h"
#include "debug_assert.h"
#include "osm_callbacks.h"

Oid
ts_rel_get_owner(Oid relid)
Expand Down Expand Up @@ -625,6 +626,20 @@ hypertable_tuple_delete(TupleInfo *ti, void *data)
ts_hypertable_drop(compressed_hypertable, DROP_RESTRICT);
}

#if PG14_GE
OsmCallbacks *callbacks = ts_get_osm_callbacks();

/* Invoke the OSM callback if set */
if (callbacks)
{
Name schema_name =
DatumGetName(slot_getattr(ti->slot, Anum_hypertable_schema_name, &isnull));
Name table_name = DatumGetName(slot_getattr(ti->slot, Anum_hypertable_table_name, &isnull));

callbacks->hypertable_drop_hook(NameStr(*schema_name), NameStr(*table_name));
}
#endif

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
ts_catalog_restore_user(&sec_ctx);
Expand Down
18 changes: 18 additions & 0 deletions src/osm_callbacks.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#include "osm_callbacks.h"

#include <fmgr.h>

#define OSM_CALLBACKS_VAR_NAME "osm_callbacks"

OsmCallbacks *
ts_get_osm_callbacks(void)
{
OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable(OSM_CALLBACKS_VAR_NAME);

return *ptr;
}
29 changes: 29 additions & 0 deletions src/osm_callbacks.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#ifndef TIMESCALEDB_OSM_CALLBACKS_H
#define TIMESCALEDB_OSM_CALLBACKS_H

#include <postgres.h>
#include <catalog/objectaddress.h>

typedef int (*chunk_insert_check_hook_type)(Oid ht_oid, int64 range_start, int64 range_end);
typedef void (*hypertable_drop_hook_type)(const char *schema_name, const char *table_name);

/*
* Object Storage Manager callbacks.
*
* chunk_insert_check_hook - checks whether the specified range is managed by OSM
* hypertable_drop_hook - used for OSM catalog cleanups
*/
typedef struct
{
chunk_insert_check_hook_type chunk_insert_check_hook;
hypertable_drop_hook_type hypertable_drop_hook;
} OsmCallbacks;

extern OsmCallbacks *ts_get_osm_callbacks(void);

#endif /* TIMESCALEDB_OSM_CALLBACKS_H */
28 changes: 28 additions & 0 deletions tsl/test/expected/chunk_utils_internal.out
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,33 @@ SELECT * FROM hyper1_cagg ORDER BY 1;
30 | 2
(1 row)

-- check that dropping cagg triggers OSM callback
SELECT ts_setup_osm_hook();
ts_setup_osm_hook
-------------------

(1 row)

BEGIN;
DROP MATERIALIZED VIEW hyper1_cagg CASCADE;
NOTICE: drop cascades to table _timescaledb_internal._hyper_4_9_chunk
NOTICE: hypertable_drop_hook
DROP TABLE test1.hyper1;
NOTICE: hypertable_drop_hook
ROLLBACK;
BEGIN;
DROP TABLE test1.hyper1 CASCADE;
NOTICE: drop cascades to 3 other objects
NOTICE: drop cascades to table _timescaledb_internal._hyper_4_9_chunk
NOTICE: hypertable_drop_hook
NOTICE: hypertable_drop_hook
ROLLBACK;
SELECT ts_undo_osm_hook();
ts_undo_osm_hook
------------------

(1 row)

--TEST error case (un)freeze a non-chunk
CREATE TABLE nochunk_tab( a timestamp, b integer);
\set ON_ERROR_STOP 0
Expand Down Expand Up @@ -629,6 +656,7 @@ SELECT ts_setup_osm_hook();
\set ON_ERROR_STOP 0
--the mock hook returns true always. so cannot create a new chunk on the hypertable
INSERT INTO ht_try VALUES ('2022-06-05 01:00', 222, 222);
NOTICE: chunk_insert_check_hook
ERROR: Cannot insert into tiered chunk range of public.ht_try - attempt to create new chunk with range [Sat Jun 04 17:00:00 2022 PDT Sun Jun 05 17:00:00 2022 PDT] failed
\set ON_ERROR_STOP 1
SELECT ts_undo_osm_hook();
Expand Down
11 changes: 11 additions & 0 deletions tsl/test/sql/chunk_utils_internal.sql
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,17 @@ SELECT _timescaledb_internal.drop_chunk( :'CHNAME1');
SELECT * from test1.hyper1 ORDER BY 1;
SELECT * FROM hyper1_cagg ORDER BY 1;

-- check that dropping cagg triggers OSM callback
SELECT ts_setup_osm_hook();
BEGIN;
DROP MATERIALIZED VIEW hyper1_cagg CASCADE;
DROP TABLE test1.hyper1;
ROLLBACK;
BEGIN;
DROP TABLE test1.hyper1 CASCADE;
ROLLBACK;
SELECT ts_undo_osm_hook();

--TEST error case (un)freeze a non-chunk
CREATE TABLE nochunk_tab( a timestamp, b integer);
\set ON_ERROR_STOP 0
Expand Down
28 changes: 20 additions & 8 deletions tsl/test/src/test_chunk_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "bgw/job.h"
#include "export.h"
#include "bgw_policy/chunk_stats.h"
#include "osm_callbacks.h"

TS_FUNCTION_INFO_V1(ts_test_chunk_stats_insert);

Expand All @@ -35,34 +36,45 @@ ts_test_chunk_stats_insert(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}

typedef int (*chunk_insert_check_hook_type)(Oid, int64, int64);
typedef void (*hypertable_drop_hook_type)(const char *, const char *);

static int
osm_insert_hook_mock(Oid ht_oid, int64 range_start, int64 range_end)
{
/* always return true */
elog(NOTICE, "chunk_insert_check_hook");
return 1;
}

static void
osm_ht_drop_hook_mock(const char *schema_name, const char *table_name)
{
elog(NOTICE, "hypertable_drop_hook");
}

OsmCallbacks fake_osm_callbacks = { .chunk_insert_check_hook = osm_insert_hook_mock,
.hypertable_drop_hook = osm_ht_drop_hook_mock };

/*
* Dummy function to mock OSM_INSERT hook called at chunk creation for tiered data
*/
TS_FUNCTION_INFO_V1(ts_setup_osm_hook);
Datum
ts_setup_osm_hook(PG_FUNCTION_ARGS)
{
typedef int (*MOCK_OSM_INSERT_HOOK)(Oid, int64, int64);
MOCK_OSM_INSERT_HOOK *var =
(MOCK_OSM_INSERT_HOOK *) find_rendezvous_variable("osm_chunk_insert_check_hook");
*var = osm_insert_hook_mock;
OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable("osm_callbacks");
*ptr = &fake_osm_callbacks;

PG_RETURN_NULL();
}

TS_FUNCTION_INFO_V1(ts_undo_osm_hook);
Datum
ts_undo_osm_hook(PG_FUNCTION_ARGS)
{
typedef int (*MOCK_OSM_INSERT_HOOK)(Oid, int64, int64);
MOCK_OSM_INSERT_HOOK *var =
(MOCK_OSM_INSERT_HOOK *) find_rendezvous_variable("osm_chunk_insert_check_hook");
*var = NULL;
OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable("osm_callbacks");
*ptr = NULL;

PG_RETURN_NULL();
}

0 comments on commit 4c00750

Please sign in to comment.