Skip to content

Commit

Permalink
Introduce drop_stale_chunks() function
Browse files Browse the repository at this point in the history
This function drops chunks on a specified data node if those chunks are
not known by the access node.

Call drop_stale_chunks() automatically when a data node becomes
available again.

Fix #4848
  • Loading branch information
pmwkaa authored and SachinSetiya committed Nov 28, 2022
1 parent a2841ca commit 5b29050
Show file tree
Hide file tree
Showing 15 changed files with 966 additions and 1 deletion.
6 changes: 6 additions & 0 deletions sql/ddl_internal.sql
Expand Up @@ -54,3 +54,9 @@ $BODY$ SET search_path TO pg_catalog, pg_temp;
CREATE OR REPLACE FUNCTION _timescaledb_internal.health() RETURNS
TABLE (node_name NAME, healthy BOOL, in_recovery BOOL, error TEXT)
AS '@MODULE_PATHNAME@', 'ts_health_check' LANGUAGE C VOLATILE;

-- Drop chunks on a data node that are not known by the access node.
-- On an access node, pass node_name and leave chunks NULL.
-- On a data node, node_name must be NULL and chunks holds the chunk ids
-- the access node knows for this node; every other local chunk that belongs
-- to a distributed hypertable member is dropped.
CREATE OR REPLACE FUNCTION _timescaledb_internal.drop_stale_chunks(
node_name NAME,
chunks integer[] = NULL
) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_chunks_drop_stale' LANGUAGE C VOLATILE;
7 changes: 7 additions & 0 deletions sql/updates/latest-dev.sql
Expand Up @@ -496,3 +496,10 @@ ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan
REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id);

ANALYZE _timescaledb_catalog.continuous_agg;

-- changes related to drop_stale_chunks()
-- Adds the function on upgrade; the signature matches the definition in
-- sql/ddl_internal.sql (node_name NAME, chunks integer[] defaulting to NULL).
CREATE FUNCTION _timescaledb_internal.drop_stale_chunks(
node_name NAME,
chunks integer[] = NULL
) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_chunks_drop_stale' LANGUAGE C VOLATILE;
3 changes: 3 additions & 0 deletions sql/updates/reverse-dev.sql
Expand Up @@ -596,3 +596,6 @@ ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan
REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id);

ANALYZE _timescaledb_catalog.continuous_agg;

-- changes related to drop_stale_chunks()
-- Downgrade removes the function again. No argument list is given, so this
-- relies on the function name being unique within _timescaledb_internal.
DROP FUNCTION _timescaledb_internal.drop_stale_chunks;
22 changes: 22 additions & 0 deletions src/chunk.c
Expand Up @@ -3261,6 +3261,28 @@ ts_chunk_get_chunk_ids_by_hypertable_id(int32 hypertable_id)
return chunkids;
}

/*
 * Collect the ids of all chunks stored in the chunk catalog table.
 *
 * The catalog is read through the chunk id index using the scan key
 * "id >= 0". `lockmode` is the lock mode handed to the scan iterator for
 * the CHUNK catalog table.
 *
 * Returns an integer list of chunk ids, or NIL if no tuple matched.
 * NOTE(review): callers appear to rely on the index scan returning the ids
 * in ascending order -- confirm before changing the access path.
 */
List *
ts_chunk_get_all_chunk_ids(LOCKMODE lockmode)
{
	List *chunkids = NIL;
	ScanIterator iterator = ts_scan_iterator_create(CHUNK, lockmode, CurrentMemoryContext);

	ts_scan_iterator_set_index(&iterator, CHUNK, CHUNK_ID_INDEX);

	/*
	 * The strategy number has to agree with the comparison procedure:
	 * F_INT4GE implements ">=", so use the greater-or-equal strategy rather
	 * than the equality strategy.
	 */
	ts_scan_iterator_scan_key_init(&iterator,
								   Anum_chunk_idx_id,
								   BTGreaterEqualStrategyNumber,
								   F_INT4GE,
								   Int32GetDatum(0));

	ts_scanner_foreach(&iterator)
	{
		bool isnull;
		Datum id = slot_getattr(ts_scan_iterator_slot(&iterator), Anum_chunk_id, &isnull);

		/* skip tuples without an id value */
		if (!isnull)
			chunkids = lappend_int(chunkids, DatumGetInt32(id));
	}

	return chunkids;
}

static ChunkResult
chunk_recreate_constraint(ChunkScanCtx *ctx, ChunkStub *stub)
{
Expand Down
2 changes: 2 additions & 0 deletions src/chunk.h
Expand Up @@ -215,7 +215,9 @@ extern TSDLLEXPORT bool ts_chunk_contains_compressed_data(const Chunk *chunk);
extern TSDLLEXPORT ChunkCompressionStatus ts_chunk_get_compression_status(int32 chunk_id);
extern TSDLLEXPORT Datum ts_chunk_id_from_relid(PG_FUNCTION_ARGS);
extern TSDLLEXPORT List *ts_chunk_get_chunk_ids_by_hypertable_id(int32 hypertable_id);
/* Returns the ids of all chunks in the chunk catalog as an integer list;
 * lockmode is the lock mode used for the catalog scan. */
extern TSDLLEXPORT List *ts_chunk_get_all_chunk_ids(LOCKMODE lockmode);
extern TSDLLEXPORT List *ts_chunk_get_data_node_name_list(const Chunk *chunk);

extern bool TSDLLEXPORT ts_chunk_has_data_node(const Chunk *chunk, const char *node_name);
extern List *ts_chunk_data_nodes_copy(const Chunk *chunk);
extern TSDLLEXPORT Chunk *ts_chunk_create_only_table(Hypertable *ht, Hypercube *cube,
Expand Down
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Expand Up @@ -106,6 +106,7 @@ CROSSMODULE_WRAPPER(data_node_alter);
CROSSMODULE_WRAPPER(chunk_drop_replica);
CROSSMODULE_WRAPPER(chunk_freeze_chunk);
CROSSMODULE_WRAPPER(chunk_unfreeze_chunk);
/* NOTE(review): presumably generates the ts_chunks_drop_stale entry point
 * referenced by the SQL function drop_stale_chunks() and forwards to the
 * chunks_drop_stale cross-module function -- confirm against the
 * CROSSMODULE_WRAPPER definition. */
CROSSMODULE_WRAPPER(chunks_drop_stale);

CROSSMODULE_WRAPPER(chunk_set_default_data_node);
CROSSMODULE_WRAPPER(chunk_get_relstats);
Expand Down Expand Up @@ -520,6 +521,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.chunk_drop_replica = error_no_default_fn_pg_community,
.chunk_freeze_chunk = error_no_default_fn_pg_community,
.chunk_unfreeze_chunk = error_no_default_fn_pg_community,
.chunks_drop_stale = error_no_default_fn_pg_community,
.hypertable_make_distributed = hypertable_make_distributed_default_fn,
.get_and_validate_data_node_list = get_and_validate_data_node_list_default_fn,
.timescaledb_fdw_handler = error_no_default_fn_pg_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Expand Up @@ -200,6 +200,7 @@ typedef struct CrossModuleFunctions
PGFunction chunk_drop_replica;
PGFunction chunk_freeze_chunk;
PGFunction chunk_unfreeze_chunk;
PGFunction chunks_drop_stale;
void (*update_compressed_chunk_relstats)(Oid uncompressed_relid, Oid compressed_relid);
CompressSingleRowState *(*compress_row_init)(int srcht_id, Relation in_rel, Relation out_rel);
TupleTableSlot *(*compress_row_exec)(CompressSingleRowState *cr, TupleTableSlot *slot);
Expand Down
2 changes: 1 addition & 1 deletion src/hypertable.h
Expand Up @@ -143,7 +143,7 @@ extern Tablespace *ts_hypertable_select_tablespace(const Hypertable *ht, const C
extern const char *ts_hypertable_select_tablespace_name(const Hypertable *ht, const Chunk *chunk);
extern Tablespace *ts_hypertable_get_tablespace_at_offset_from(int32 hypertable_id,
Oid tablespace_oid, int16 offset);
extern bool ts_hypertable_has_chunks(Oid table_relid, LOCKMODE lockmode);
extern TSDLLEXPORT bool ts_hypertable_has_chunks(Oid table_relid, LOCKMODE lockmode);
extern void ts_hypertables_rename_schema_name(const char *old_name, const char *new_name);
extern bool ts_is_partitioning_column(const Hypertable *ht, AttrNumber column_attno);
extern TSDLLEXPORT bool ts_hypertable_set_compressed(Hypertable *ht,
Expand Down
204 changes: 204 additions & 0 deletions tsl/src/chunk.c
Expand Up @@ -25,6 +25,7 @@
#include <utils/snapmgr.h>
#include <executor/executor.h>
#include <parser/parse_func.h>
#include <storage/lmgr.h>
#include <funcapi.h>
#include <miscadmin.h>
#include <fmgr.h>
Expand Down Expand Up @@ -561,3 +562,206 @@ chunk_unfreeze_chunk(PG_FUNCTION_ARGS)
bool ret = ts_chunk_unset_frozen(chunk);
PG_RETURN_BOOL(ret);
}

/*
 * Build an integer list of chunk ids from the elements of `array` and sort
 * it with list_int_cmp_compat.
 *
 * Raises an error (ERRCODE_INVALID_PARAMETER_VALUE) if any array element is
 * NULL.
 */
static List *
chunk_id_list_create(ArrayType *array)
{
	List *chunk_ids = NIL;
	ArrayIterator iter = array_create_iterator(array, 0, NULL);

	for (;;)
	{
		Datum elem;
		bool elem_isnull;

		/* stop once the array has been fully consumed */
		if (!array_iterate(iter, &elem, &elem_isnull))
			break;

		if (elem_isnull)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("chunks array arguments cannot be NULL")));

		chunk_ids = lappend_int(chunk_ids, DatumGetInt32(elem));
	}
	array_free_iterator(iter);

	/*
	 * NOTE(review): the result of list_sort_compat is discarded, which
	 * assumes the sort happens in place -- confirm this holds for every
	 * supported PostgreSQL version.
	 */
	(void) list_sort_compat(chunk_ids, list_int_cmp_compat);

	return chunk_ids;
}

static List *
chunk_id_list_exclusive_right_merge_join(const List *an_list, const List *dn_list)
{
/*
* merge join two sorted list and return only values which exclusively
* exists in the right target (dn_list list)
*/
List *result = NIL;
const ListCell *l = list_head(an_list);
const ListCell *r = list_head(dn_list);
for (;;)
{
if (l && r)
{
int compare = list_int_cmp_compat(l, r);
if (compare == 0)
{
/* l = r */
l = lnext_compat(an_list, l);
r = lnext_compat(dn_list, r);
}
else if (compare < 0)
{
/* l < r */
/* chunk exists only on the access node */
l = lnext_compat(an_list, l);
}
else
{
/* l > r */
/* chunk exists only on the data node */
result = lappend_int(result, lfirst_int(r));
r = lnext_compat(dn_list, r);
}
}
else if (l)
{
/* chunk exists only on the access node */
l = lnext_compat(an_list, l);
}
else if (r)
{
/* chunk exists only on the data node */
result = lappend_int(result, lfirst_int(r));
r = lnext_compat(dn_list, r);
}
else
{
break;
}
}
return result;
}

/*
 * Access node part of ts_chunk_drop_stale_chunks().
 *
 * Collects the node_chunk_id of every chunk that references `node_name` and
 * sends them to that data node as the `chunks` argument of
 * _timescaledb_internal.drop_stale_chunks().
 *
 * Errors out if `node_name` is NULL or if `chunks_array` is supplied.
 */
static void
chunk_drop_stale_chunks_on_access_node(const char *node_name, ArrayType *chunks_array)
{
	StringInfo cmd;
	ScanIterator it;
	bool first = true;

	if (node_name == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("node_name argument cannot be NULL")));
	if (chunks_array != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("chunks argument cannot be used on the access node")));

	/* get an exclusive lock on the chunks catalog table to prevent new chunk
	 * creation during this operation */
	LockRelationOid(ts_catalog_get()->tables[CHUNK].id, AccessExclusiveLock);

	/* generate query to execute drop_stale_chunks() on the data node */
	cmd = makeStringInfo();
	appendStringInfoString(cmd, "SELECT _timescaledb_internal.drop_stale_chunks(NULL, array[");

	/* scan for chunks that reference the given data node */
	it = ts_chunk_data_nodes_scan_iterator_create(CurrentMemoryContext);
	ts_chunk_data_nodes_scan_iterator_set_node_name(&it, node_name);
	ts_scanner_foreach(&it)
	{
		TupleTableSlot *slot = ts_scan_iterator_slot(&it);
		bool PG_USED_FOR_ASSERTS_ONLY isnull = false;
		int32 node_chunk_id;

		node_chunk_id =
			DatumGetInt32(slot_getattr(slot, Anum_chunk_data_node_node_chunk_id, &isnull));
		Assert(!isnull);

		/* comma-separate every id except the first one */
		appendStringInfo(cmd, "%s%d", first ? "" : ",", node_chunk_id);
		first = false;
	}
	ts_scan_iterator_close(&it);

	/* the cast keeps an empty array[] literal well-typed */
	appendStringInfoString(cmd, "]::integer[])");

	/* execute command on the data node */
	ts_dist_cmd_run_on_data_nodes(cmd->data, list_make1((char *) node_name), true);
}

/*
 * Data node part of ts_chunk_drop_stale_chunks().
 *
 * Compares the chunk ids supplied by the access node (`chunks_array`) with
 * the local chunk ids and drops every local chunk that is not in the
 * supplied array, as long as it belongs to a distributed hypertable member.
 *
 * Errors out if `node_name` is supplied or if `chunks_array` is NULL.
 */
static void
chunk_drop_stale_chunks_on_data_node(const char *node_name, ArrayType *chunks_array)
{
	List *an_chunk_id_list;
	List *dn_chunk_id_list;
	List *dn_chunk_id_list_stale;
	ListCell *lc;
	Cache *htcache;

	if (node_name != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("node_name argument cannot be used on the data node")));

	if (chunks_array == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("chunks argument cannot be NULL")));

	/* get a sorted list of chunk ids from the supplied chunks id argument array */
	an_chunk_id_list = chunk_id_list_create(chunks_array);

	/* get the list of local chunk ids; the merge join below expects it sorted */
	dn_chunk_id_list = ts_chunk_get_all_chunk_ids(RowExclusiveLock);

	/* merge join the two sorted lists and keep only the chunk ids that exist
	 * locally but are unknown to the access node */
	dn_chunk_id_list_stale =
		chunk_id_list_exclusive_right_merge_join(an_chunk_id_list, dn_chunk_id_list);

	/* drop stale chunks */
	htcache = ts_hypertable_cache_pin();
	foreach (lc, dn_chunk_id_list_stale)
	{
		const Chunk *chunk = ts_chunk_get_by_id(lfirst_int(lc), false);
		Hypertable *ht;

		/* chunk might be already dropped by previous drop, if the chunk was compressed */
		if (chunk == NULL)
			continue;

		/* ensure that we drop only chunks related to distributed hypertables */
		ht = ts_hypertable_cache_get_entry(htcache, chunk->hypertable_relid, CACHE_FLAG_NONE);
		if (hypertable_is_distributed_member(ht))
			ts_chunk_drop(chunk, DROP_RESTRICT, DEBUG1);
	}
	ts_cache_release(htcache);
}

/*
 * ts_chunk_drop_stale_chunks:
 *
 * This function drops chunks on a specified data node if those chunks are
 * not known by the access node (chunks array).
 *
 * This function is intended to be used on the access node and data node:
 *
 * - on an access node, `node_name` names the data node to clean up and
 *   `chunks_array` must be NULL;
 * - on a data node, `node_name` must be NULL and `chunks_array` holds the
 *   chunk ids known by the access node.
 *
 * Raises an error (ERRCODE_INVALID_PARAMETER_VALUE) for an invalid argument
 * combination, or when the current server is neither an access node nor a
 * data node.
 */
void
ts_chunk_drop_stale_chunks(const char *node_name, ArrayType *chunks_array)
{
	/* execute according to the node membership */
	DistUtilMembershipStatus membership = dist_util_membership();

	if (membership == DIST_MEMBER_ACCESS_NODE)
		chunk_drop_stale_chunks_on_access_node(node_name, chunks_array);
	else if (membership == DIST_MEMBER_DATA_NODE)
		chunk_drop_stale_chunks_on_data_node(node_name, chunks_array);
	else
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("current server is not an access node or data node")));
}

/*
 * SQL-callable entry point for drop_stale_chunks(node_name, chunks).
 *
 * Translates SQL NULL arguments into NULL pointers and delegates to
 * ts_chunk_drop_stale_chunks(). Returns void.
 */
Datum
chunk_drop_stale_chunks(PG_FUNCTION_ARGS)
{
	char *node_name = NULL;
	ArrayType *chunks_array = NULL;

	/* argument 0: data node name */
	if (!PG_ARGISNULL(0))
		node_name = NameStr(*PG_GETARG_NAME(0));

	/* argument 1: array of chunk ids */
	if (!PG_ARGISNULL(1))
		chunks_array = PG_GETARG_ARRAYTYPE_P(1);

	TS_PREVENT_FUNC_IF_READ_ONLY();

	ts_chunk_drop_stale_chunks(node_name, chunks_array);

	PG_RETURN_VOID();
}
2 changes: 2 additions & 0 deletions tsl/src/chunk.h
Expand Up @@ -16,6 +16,8 @@ extern Datum chunk_set_default_data_node(PG_FUNCTION_ARGS);
extern Datum chunk_drop_replica(PG_FUNCTION_ARGS);
extern Datum chunk_freeze_chunk(PG_FUNCTION_ARGS);
extern Datum chunk_unfreeze_chunk(PG_FUNCTION_ARGS);
/* SQL-callable wrapper: arguments are (node_name NAME, chunks integer[]) */
extern Datum chunk_drop_stale_chunks(PG_FUNCTION_ARGS);
/* Drops chunks on a data node that are not known by the access node.
 * Access node: pass node_name, chunks_array must be NULL.
 * Data node: node_name must be NULL, chunks_array must be non-NULL. */
extern void ts_chunk_drop_stale_chunks(const char *node_name, ArrayType *chunks_array);
extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type);
extern Datum chunk_create_replica_table(PG_FUNCTION_ARGS);

Expand Down
5 changes: 5 additions & 0 deletions tsl/src/data_node.c
Expand Up @@ -1647,6 +1647,11 @@ data_node_alter(PG_FUNCTION_ARGS)
alter_server_stmt.options = options;
AlterForeignServer(&alter_server_stmt);

/* Drop stale chunks on the unavailable data node, if we are going to
* make it available again */
if (!available_is_null && available && !ts_data_node_is_available_by_server(server))
ts_chunk_drop_stale_chunks(node_name, NULL);

/* Make changes to the data node (foreign server object) visible so that
* the changes are present when we switch "primary" data node on chunks */
CommandCounterIncrement();
Expand Down
1 change: 1 addition & 0 deletions tsl/src/init.c
Expand Up @@ -201,6 +201,7 @@ CrossModuleFunctions tsl_cm_functions = {
.chunk_drop_replica = chunk_drop_replica,
.chunk_freeze_chunk = chunk_freeze_chunk,
.chunk_unfreeze_chunk = chunk_unfreeze_chunk,
.chunks_drop_stale = chunk_drop_stale_chunks,
.hypertable_make_distributed = hypertable_make_distributed,
.get_and_validate_data_node_list = hypertable_get_and_validate_data_nodes,
.timescaledb_fdw_handler = timescaledb_fdw_handler,
Expand Down

0 comments on commit 5b29050

Please sign in to comment.