From 289ac3ef1c8569c9be5b7c06d43ef18e4e058e2e Mon Sep 17 00:00:00 2001
From: Nikhil Sontakke
Date: Fri, 11 Nov 2022 19:14:21 +0530
Subject: [PATCH] Fix DML HA in multi-node

If a data node goes down for whatever reason, then DML activity to
chunks residing on (or targeted to) that DN will start erroring out.
We now handle this by marking the target chunk as "stale" for this DN
by changing the metadata on the access node. This allows us to
continue to do DML to replicas of the same chunk data on other DNs in
the setup. This will only work for chunks which have
"replication_factor" > 1.

Note that chunks which do not undergo any change will continue to
carry the appropriate DN-related metadata on the AN. The "stale"
chunks, however, become under-replicated and need to be re-balanced
using the copy_chunk functionality, for example by a microservice or
some such process.

In passing, also fix SELECT behaviour on distributed caggs: we now
only connect to available DNs for invalidation handling.

Fixes #4846
---
 src/chunk.c                      |   8 +-
 src/chunk.h                      |   4 +-
 src/cross_module_fn.c            |   8 ++
 src/cross_module_fn.h            |   1 +
 src/hypertable.c                 |   3 +-
 src/hypertable.h                 |   2 +-
 src/nodes/chunk_dispatch.c       |  20 +++-
 src/ts_catalog/chunk_data_node.c |  50 ++++++++++
 src/ts_catalog/chunk_data_node.h |   4 +-
 tsl/src/chunk.c                  |  49 ++++++++++
 tsl/src/chunk.h                  |   1 +
 tsl/src/chunk_api.c              |   8 ++
 tsl/src/data_node.c              |   2 +
 tsl/src/fdw/modify_exec.c        |  50 ++++++++++
 tsl/src/fdw/modify_plan.c        |  19 +++-
 tsl/src/fdw/modify_plan.h        |   1 +
 tsl/src/fdw/scan_exec.c          |  13 ++-
 tsl/src/init.c                   |   1 +
 tsl/src/remote/dist_copy.c       |   2 +-
 tsl/src/remote/dist_ddl.c        |  12 +++
 tsl/test/expected/data_node.out  | 156 ++++++++++++++++++++++++++-----
 tsl/test/sql/data_node.sql       |  78 ++++++++++++++--
 22 files changed, 445 insertions(+), 47 deletions(-)

diff --git a/src/chunk.c b/src/chunk.c
index 22157555a22..5345d8d30f1 100644
--- a/src/chunk.c
+++ b/src/chunk.c
@@ -1475,7 +1475,7 @@ ts_chunk_find_for_point(const Hypertable *ht, const Point *p)
  * Create a chunk through insertion of a tuple at a given point.
  */
 Chunk *
-ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *schema,
+ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, const char *schema,
                           const char *prefix)
 {
     /*
@@ -1506,6 +1506,8 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *sche
         * release the lock early.
         */
        UnlockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
+       if (found)
+           *found = true;
        return chunk;
    }
@@ -1517,11 +1519,15 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *sche
        chunk = chunk_resurrect(ht, chunk_id);
        if (chunk != NULL)
        {
+           if (found)
+               *found = true;
            return chunk;
        }
    }

    /* Create the chunk normally.
*/ + if (found) + *found = false; if (hypertable_is_distributed_member(ht)) ereport(ERROR, (errcode(ERRCODE_TS_INTERNAL_ERROR), diff --git a/src/chunk.h b/src/chunk.h index b6b3f2b8092..4281cea3bca 100644 --- a/src/chunk.h +++ b/src/chunk.h @@ -145,8 +145,8 @@ typedef struct DisplayKeyData extern void ts_chunk_formdata_fill(FormData_chunk *fd, const TupleInfo *ti); extern Chunk *ts_chunk_find_for_point(const Hypertable *ht, const Point *p); -extern Chunk *ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *schema, - const char *prefix); +extern Chunk *ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, + const char *schema, const char *prefix); List *ts_chunk_id_find_in_subspace(Hypertable *ht, List *dimension_vecs); extern TSDLLEXPORT Chunk *ts_chunk_create_base(int32 id, int16 num_constraints, const char relkind); diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 06af2fac474..1c3fcbde18b 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -386,6 +386,13 @@ update_compressed_chunk_relstats_default(Oid uncompressed_relid, Oid compressed_ error_no_default_fn_community(); } +static void +dist_update_stale_chunk_metadata_default(Chunk *new_chunk, List *chunk_data_nodes) +{ + error_no_default_fn_community(); + pg_unreachable(); +} + TS_FUNCTION_INFO_V1(ts_tsl_loaded); PGDLLEXPORT Datum @@ -539,6 +546,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .dist_remote_chunk_info = error_no_default_fn_pg_community, .dist_remote_compressed_chunk_info = error_no_default_fn_pg_community, .dist_remote_hypertable_index_info = error_no_default_fn_pg_community, + .dist_update_stale_chunk_metadata = dist_update_stale_chunk_metadata_default, .validate_as_data_node = error_no_default_fn_community, .func_call_on_data_nodes = func_call_on_data_nodes_default, .chunk_get_relstats = error_no_default_fn_pg_community, diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 8c479f5e546..da84a84af2e 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -188,6 +188,7 @@ typedef struct CrossModuleFunctions PGFunction dist_remote_chunk_info; PGFunction dist_remote_compressed_chunk_info; PGFunction dist_remote_hypertable_index_info; + void (*dist_update_stale_chunk_metadata)(Chunk *new_chunk, List *chunk_data_nodes); void (*validate_as_data_node)(void); void (*func_call_on_data_nodes)(FunctionCallInfo fcinfo, List *data_node_oids); PGFunction distributed_exec; diff --git a/src/hypertable.c b/src/hypertable.c index 4bf9c0e59b9..3dd3f58f3f2 100644 --- a/src/hypertable.c +++ b/src/hypertable.c @@ -1067,12 +1067,13 @@ hypertable_chunk_store_add(const Hypertable *h, const Chunk *input_chunk) * Create a chunk for the point, given that it does not exist yet. 
*/ Chunk * -ts_hypertable_create_chunk_for_point(const Hypertable *h, const Point *point) +ts_hypertable_create_chunk_for_point(const Hypertable *h, const Point *point, bool *found) { Assert(ts_subspace_store_get(h->chunk_cache, point) == NULL); Chunk *chunk = ts_chunk_create_for_point(h, point, + found, NameStr(h->fd.associated_schema_name), NameStr(h->fd.associated_table_prefix)); diff --git a/src/hypertable.h b/src/hypertable.h index a5449787386..08c363c348c 100644 --- a/src/hypertable.h +++ b/src/hypertable.h @@ -135,7 +135,7 @@ extern TSDLLEXPORT int32 ts_hypertable_relid_to_id(Oid relid); extern TSDLLEXPORT Chunk *ts_hypertable_find_chunk_for_point(const Hypertable *h, const Point *point); extern TSDLLEXPORT Chunk *ts_hypertable_create_chunk_for_point(const Hypertable *h, - const Point *point); + const Point *point, bool *found); extern Oid ts_hypertable_relid(RangeVar *rv); extern TSDLLEXPORT bool ts_is_hypertable(Oid relid); extern bool ts_hypertable_has_tablespace(const Hypertable *ht, Oid tspc_oid); diff --git a/src/nodes/chunk_dispatch.c b/src/nodes/chunk_dispatch.c index 05eaef65eec..8775a9ce52c 100644 --- a/src/nodes/chunk_dispatch.c +++ b/src/nodes/chunk_dispatch.c @@ -14,9 +14,11 @@ #include "compat/compat.h" #include "chunk_dispatch.h" #include "chunk_insert_state.h" +#include "errors.h" #include "subspace_store.h" #include "dimension.h" #include "guc.h" +#include "ts_catalog/chunk_data_node.h" ChunkDispatch * ts_chunk_dispatch_create(Hypertable *ht, EState *estate, int eflags) @@ -144,11 +146,27 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point, * locking the hypertable. This serves as a fast path for the usual case * where the chunk already exists. */ + bool found; Chunk *new_chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point); if (new_chunk == NULL) { - new_chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point); + new_chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found); } + else + found = true; + + /* get the filtered list of "available" DNs for this chunk */ + List *chunk_data_nodes = + ts_chunk_data_node_scan_by_chunk_id_filter(new_chunk->fd.id, CurrentMemoryContext); + + /* + * If the chunk was not created as part of this insert, we need to check whether any + * of the chunk's data nodes are currently unavailable and in that case consider the + * chunk stale on those data nodes. Do that by removing the AN's chunk-datanode + * mapping for the unavailable data nodes. 
+ */ + if (found && dispatch->hypertable->fd.replication_factor > list_length(chunk_data_nodes)) + ts_cm_functions->dist_update_stale_chunk_metadata(new_chunk, chunk_data_nodes); if (NULL == new_chunk) elog(ERROR, "no chunk found or created"); diff --git a/src/ts_catalog/chunk_data_node.c b/src/ts_catalog/chunk_data_node.c index 78e3afd19be..ec1d1eb7cb3 100644 --- a/src/ts_catalog/chunk_data_node.c +++ b/src/ts_catalog/chunk_data_node.c @@ -14,6 +14,9 @@ #include #include "ts_catalog/chunk_data_node.h" +#include "cache.h" +#include "hypercube.h" +#include "hypertable_cache.h" #include "scanner.h" #include "chunk.h" @@ -124,6 +127,37 @@ chunk_data_node_tuple_found(TupleInfo *ti, void *data) return SCAN_CONTINUE; } +/* return a filtered list of "available" ChunkDataNode entries */ +static ScanTupleResult +chunk_data_node_tuple_found_filter(TupleInfo *ti, void *data) +{ + List **nodes = data; + bool should_free; + HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); + Form_chunk_data_node form = (Form_chunk_data_node) GETSTRUCT(tuple); + ForeignServer *server; + + server = GetForeignServerByName(NameStr(form->node_name), false); + + if (ts_data_node_is_available_by_server(server)) + { + ChunkDataNode *chunk_data_node; + MemoryContext old; + + old = MemoryContextSwitchTo(ti->mctx); + chunk_data_node = palloc(sizeof(ChunkDataNode)); + memcpy(&chunk_data_node->fd, form, sizeof(FormData_chunk_data_node)); + chunk_data_node->foreign_server_oid = server->serverid; + *nodes = lappend(*nodes, chunk_data_node); + MemoryContextSwitchTo(old); + } + + if (should_free) + heap_freetuple(tuple); + + return SCAN_CONTINUE; +} + static int ts_chunk_data_node_scan_by_chunk_id_and_node_internal(int32 chunk_id, const char *node_name, bool scan_by_remote_chunk_id, @@ -210,6 +244,22 @@ ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx) return chunk_data_nodes; } +/* Returns a filtered List of available ChunkDataNode structs. 
*/ +List * +ts_chunk_data_node_scan_by_chunk_id_filter(int32 chunk_id, MemoryContext mctx) +{ + List *chunk_data_nodes = NIL; + + ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id, + NULL, + false, + chunk_data_node_tuple_found_filter, + &chunk_data_nodes, + AccessShareLock, + mctx); + return chunk_data_nodes; +} + static ChunkDataNode * chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name, bool scan_by_remote_chunk_id, MemoryContext mctx) diff --git a/src/ts_catalog/chunk_data_node.h b/src/ts_catalog/chunk_data_node.h index 83a35c78583..ecfbef9005a 100644 --- a/src/ts_catalog/chunk_data_node.h +++ b/src/ts_catalog/chunk_data_node.h @@ -7,6 +7,7 @@ #define TIMESCALEDB_CHUNK_DATA_NODE_H #include "ts_catalog/catalog.h" +#include "chunk.h" #include "export.h" #include "scan_iterator.h" @@ -17,6 +18,8 @@ typedef struct ChunkDataNode } ChunkDataNode; extern TSDLLEXPORT List *ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx); +extern TSDLLEXPORT List *ts_chunk_data_node_scan_by_chunk_id_filter(int32 chunk_id, + MemoryContext mctx); extern TSDLLEXPORT ChunkDataNode * ts_chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name, MemoryContext mctx); @@ -37,5 +40,4 @@ extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanItera int32 chunk_id); extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it, const char *node_name); - #endif /* TIMESCALEDB_CHUNK_DATA_NODE_H */ diff --git a/tsl/src/chunk.c b/tsl/src/chunk.c index 9d4f2b7e8a0..34151bd154f 100644 --- a/tsl/src/chunk.c +++ b/tsl/src/chunk.c @@ -561,3 +561,52 @@ chunk_unfreeze_chunk(PG_FUNCTION_ARGS) bool ret = ts_chunk_unset_frozen(chunk); PG_RETURN_BOOL(ret); } + +/* + * Update and refresh the DN list for a given chunk. We remove metadata for this chunk + * for unavailable DNs and assign the chunk's data_node list to the list of live DNs + */ +void +chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes) +{ + List *serveroids = NIL; + ListCell *lc; + ChunkDataNode *cdn; + + /* check that at least one data node is available for this chunk on the AN */ + if (chunk_data_nodes == NIL) + ereport(ERROR, + (errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES), + (errmsg("insufficient number of data nodes"), + errhint("Increase the number of available data nodes on hypertable " + "\"%s\".", + get_rel_name(new_chunk->hypertable_relid))))); + + foreach (lc, chunk_data_nodes) + { + cdn = lfirst(lc); + serveroids = lappend_oid(serveroids, cdn->foreign_server_oid); + } + + foreach (lc, new_chunk->data_nodes) + { + cdn = lfirst(lc); + /* + * check if this DN is a part of chunk_data_nodes. If not + * found in chunk_data_nodes, then we need to remove this + * chunk id to node name mapping and also update the primary + * foreign server if necessary. It's possible that this metadata + * might have been already cleared earlier but we have no way of + * knowing that here. 
+ */ + if (!list_member_oid(serveroids, cdn->foreign_server_oid)) + { + chunk_update_foreign_server_if_needed(new_chunk, cdn->foreign_server_oid, false); + ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id, + NameStr(cdn->fd.node_name)); + } + } + + /* update new_chunk->data_nodes to point to the list of "live" DNs */ + new_chunk->data_nodes = chunk_data_nodes; +} diff --git a/tsl/src/chunk.h b/tsl/src/chunk.h index 9d0dcfaaff5..76d902d5742 100644 --- a/tsl/src/chunk.h +++ b/tsl/src/chunk.h @@ -18,5 +18,6 @@ extern Datum chunk_freeze_chunk(PG_FUNCTION_ARGS); extern Datum chunk_unfreeze_chunk(PG_FUNCTION_ARGS); extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type); extern Datum chunk_create_replica_table(PG_FUNCTION_ARGS); +extern void chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes); #endif /* TIMESCALEDB_TSL_CHUNK_H */ diff --git a/tsl/src/chunk_api.c b/tsl/src/chunk_api.c index c8d8c060bfb..34ca99a0a03 100644 --- a/tsl/src/chunk_api.c +++ b/tsl/src/chunk_api.c @@ -474,6 +474,14 @@ chunk_api_create_on_data_nodes(const Chunk *chunk, const Hypertable *ht, ListCell *lc; TupleDesc tupdesc; AttInMetadata *attinmeta; + + /* + * In case of "unavailable" datanodes, the chunk->data_nodes list is already pruned + * and doesn't contain "unavailable" datanodes. So this chunk creation will never + * happen on such "unavailable" datanodes. By the same logic, metadata update on the + * AN for the chunk->datanode mappings will only happen for the listed "live" DNs + * and not for the "unavailable" ones + */ List *target_data_nodes = data_nodes ? data_nodes : chunk->data_nodes; get_create_chunk_result_type(&tupdesc); diff --git a/tsl/src/data_node.c b/tsl/src/data_node.c index 08f622e4dd7..dbabf200899 100644 --- a/tsl/src/data_node.c +++ b/tsl/src/data_node.c @@ -1902,6 +1902,8 @@ data_node_delete(PG_FUNCTION_ARGS) /* * Get server list, performing an ACL check on each of them in the process. + * + * If "available" is true then only return a set of available data nodes */ List * data_node_get_node_name_list_with_aclcheck(AclMode mode, bool fail_on_aclcheck) diff --git a/tsl/src/fdw/modify_exec.c b/tsl/src/fdw/modify_exec.c index 1bed493c601..51be3702c41 100644 --- a/tsl/src/fdw/modify_exec.c +++ b/tsl/src/fdw/modify_exec.c @@ -26,6 +26,8 @@ #include "scan_plan.h" #include "modify_exec.h" +#include "modify_plan.h" +#include "tsl/src/chunk.h" /* * This enum describes what's kept in the fdw_private list for a ModifyTable @@ -393,6 +395,46 @@ response_type(AttConvInMetadata *att_conv_metadata) return att_conv_metadata == NULL || att_conv_metadata->binary ? FORMAT_BINARY : FORMAT_TEXT; } +static void +fdw_chunk_update_stale_metadata(TsFdwModifyState *fmstate) +{ + List *all_data_nodes; + Relation rel = fmstate->rel; + int32 chunk_id = ts_chunk_get_id_by_relid(rel->rd_id); + Assert(chunk_id != 0); + + all_data_nodes = ts_chunk_data_node_scan_by_chunk_id(chunk_id, CurrentMemoryContext); + Assert(list_length(all_data_nodes) >= fmstate->num_data_nodes); + + if (list_length(all_data_nodes) > fmstate->num_data_nodes) + { + Chunk *chunk = ts_chunk_get_by_relid(rel->rd_id, true); + /* get filtered list */ + List *serveroids = get_chunk_data_nodes(rel->rd_id); + ListCell *lc; + Assert(list_length(serveroids) == fmstate->num_data_nodes); + + foreach (lc, all_data_nodes) + { + ChunkDataNode *cdn = lfirst(lc); + /* + * check if this DN is a part of serveroids. 
If not
+            * found in serveroids, then we need to remove this
+            * chunk id to node name mapping and also update the primary
+            * foreign server if necessary. It's possible that this metadata
+            * might have been already cleared earlier, but we have no way of
+            * knowing that here.
+            */
+           if (!list_member_oid(serveroids, cdn->foreign_server_oid))
+           {
+               chunk_update_foreign_server_if_needed(chunk, cdn->foreign_server_oid, false);
+               ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id,
+                                                                   NameStr(cdn->fd.node_name));
+           }
+       }
+   }
+}
+
 TupleTableSlot *
 fdw_exec_foreign_insert(TsFdwModifyState *fmstate, EState *estate, TupleTableSlot *slot,
                         TupleTableSlot *planslot)
@@ -544,6 +586,14 @@ fdw_exec_foreign_update_or_delete(TsFdwModifyState *fmstate, EState *estate, Tup
    pfree(reqset);
    stmt_params_reset(params);

+   /*
+    * If rows are affected on DNs and a DN was excluded because of being
+    * "unavailable", then we need to update metadata on the AN to mark
+    * this chunk as "stale" for that "unavailable" DN.
+    */
+   if (n_rows > 0)
+       fdw_chunk_update_stale_metadata(fmstate);
+
    /* Return NULL if nothing was updated on the remote end */
    return (n_rows > 0) ? slot : NULL;
 }
diff --git a/tsl/src/fdw/modify_plan.c b/tsl/src/fdw/modify_plan.c
index 08a474f8367..ead1a0af48d 100644
--- a/tsl/src/fdw/modify_plan.c
+++ b/tsl/src/fdw/modify_plan.c
@@ -10,6 +10,7 @@
 #include

 #include "deparse.h"
+#include "errors.h"
 #include "modify_plan.h"
 #include "ts_catalog/chunk_data_node.h"
@@ -51,16 +52,30 @@ get_update_attrs(RangeTblEntry *rte)
    return attrs;
 }

-static List *
+/* get a list of "live" DNs associated with this chunk */
+List *
 get_chunk_data_nodes(Oid relid)
 {
    int32 chunk_id = ts_chunk_get_id_by_relid(relid);
    Assert(chunk_id != 0);

-   List *chunk_data_nodes = ts_chunk_data_node_scan_by_chunk_id(chunk_id, CurrentMemoryContext);
+   List *chunk_data_nodes =
+       ts_chunk_data_node_scan_by_chunk_id_filter(chunk_id, CurrentMemoryContext);
    List *serveroids = NIL;
    ListCell *lc;

+   /* check that at least one data node is available for this chunk */
+   if (chunk_data_nodes == NIL)
+   {
+       Hypertable *ht = ts_hypertable_get_by_id(ts_chunk_get_hypertable_id_by_relid(relid));
+
+       ereport(ERROR,
+               (errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
+                (errmsg("insufficient number of data nodes"),
+                 errhint("Increase the number of available data nodes on hypertable \"%s\".",
+                         get_rel_name(ht->main_table_relid)))));
+   }
+
    foreach (lc, chunk_data_nodes)
    {
        ChunkDataNode *data_node = lfirst(lc);
diff --git a/tsl/src/fdw/modify_plan.h b/tsl/src/fdw/modify_plan.h
index 87b17ddb2ea..c53506420ea 100644
--- a/tsl/src/fdw/modify_plan.h
+++ b/tsl/src/fdw/modify_plan.h
@@ -10,5 +10,6 @@
 extern List *fdw_plan_foreign_modify(PlannerInfo *root, ModifyTable *plan, Index result_relation,
                                      int subplan_index);
+extern List *get_chunk_data_nodes(Oid relid);

 #endif /* TIMESCALEDB_TSL_FDW_MODIFY_PLAN_H */
diff --git a/tsl/src/fdw/scan_exec.c b/tsl/src/fdw/scan_exec.c
index aeeb7524063..c5bf257f241 100644
--- a/tsl/src/fdw/scan_exec.c
+++ b/tsl/src/fdw/scan_exec.c
@@ -281,18 +281,23 @@ fdw_scan_init(ScanState *ss, TsFdwScanState *fsstate, Bitmapset *scanrelids, Lis
               List *fdw_exprs, int eflags)
 {
    int num_params;
+   Oid server_oid;
+   ForeignServer *server;

    if ((eflags & EXEC_FLAG_EXPLAIN_ONLY) && !ts_guc_enable_remote_explain)
        return;

+   /* Check if the server is "available" for use before setting up a connection to it */
+   server_oid = intVal(list_nth(fdw_private, FdwScanPrivateServerId));
+   server = GetForeignServer(server_oid);
+   if
(!ts_data_node_is_available_by_server(server)) + ereport(ERROR, (errmsg("Node \"%s\" is not available for queries", server->servername))); + /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = get_connection(ss, - intVal(list_nth(fdw_private, FdwScanPrivateServerId)), - scanrelids, - fdw_exprs); + fsstate->conn = get_connection(ss, server_oid, scanrelids, fdw_exprs); /* Get private info created by planner functions. */ fsstate->query = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); diff --git a/tsl/src/init.c b/tsl/src/init.c index f30a93ca409..f42a92b4605 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -223,6 +223,7 @@ CrossModuleFunctions tsl_cm_functions = { .dist_remote_chunk_info = dist_util_remote_chunk_info, .dist_remote_compressed_chunk_info = dist_util_remote_compressed_chunk_info, .dist_remote_hypertable_index_info = dist_util_remote_hypertable_index_info, + .dist_update_stale_chunk_metadata = chunk_update_stale_metadata, .validate_as_data_node = validate_data_node_settings, .distributed_exec = ts_dist_cmd_exec, .create_distributed_restore_point = create_distributed_restore_point, diff --git a/tsl/src/remote/dist_copy.c b/tsl/src/remote/dist_copy.c index 52077bbef22..c8ace1927c5 100644 --- a/tsl/src/remote/dist_copy.c +++ b/tsl/src/remote/dist_copy.c @@ -1360,7 +1360,7 @@ remote_copy_process_and_send_data(RemoteCopyContext *context) end_copy_on_success(&context->connection_state); did_end_copy = true; } - chunk = ts_hypertable_create_chunk_for_point(ht, point); + chunk = ts_hypertable_create_chunk_for_point(ht, point, NULL); } /* diff --git a/tsl/src/remote/dist_ddl.c b/tsl/src/remote/dist_ddl.c index 8ee7993872a..1a09a00989d 100644 --- a/tsl/src/remote/dist_ddl.c +++ b/tsl/src/remote/dist_ddl.c @@ -238,7 +238,19 @@ dist_ddl_state_add_data_node_list_from_table(const char *schema, const char *nam static void dist_ddl_state_add_data_node_list_from_ht(Hypertable *ht) { + ListCell *lc; + dist_ddl_state.data_node_list = ts_hypertable_get_data_node_name_list(ht); + + /* Check that all DNs are "available" for this DDL operation, fail otherwise */ + foreach (lc, dist_ddl_state.data_node_list) + { + const char *data_node_name = lfirst(lc); + ForeignServer *server = GetForeignServerByName(data_node_name, false); + + if (!ts_data_node_is_available_by_server(server)) + ereport(ERROR, (errmsg("some data nodes are not available for DDL queries"))); + } } static void diff --git a/tsl/test/expected/data_node.out b/tsl/test/expected/data_node.out index 8b1708b1848..14c575d325a 100644 --- a/tsl/test/expected/data_node.out +++ b/tsl/test/expected/data_node.out @@ -1769,7 +1769,7 @@ SELECT * FROM chunk_query_data_node; -- replication \set ON_ERROR_STOP 0 SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; -ERROR: could not connect to "data_node_1" +ERROR: Node "data_node_1" is not available for queries \set ON_ERROR_STOP 1 SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; time | location @@ -1789,19 +1789,36 @@ SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; Sat Jan 01 00:00:00 2022 PST | 1 (1 row) --- inserts should fail if going to chunks that exist on the --- unavailable data node -\set ON_ERROR_STOP 0 +-- inserts should continue to work and should go to the "live" +-- datanodes INSERT INTO hyper3 VALUES ('2022-01-03 00:00:00', 1, 1); -ERROR: could not connect to "data_node_1" +INSERT INTO hyper3 VALUES ('2022-01-03 00:00:05', 1, 1); INSERT INTO hyper_1dim VALUES ('2022-01-03 
00:00:00', 1, 1); -ERROR: could not connect to "data_node_1" -\set ON_ERROR_STOP 1 --- inserts should work if going to a new chunk +INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:05', 1, 1); +-- Check that the metadata on the AN removes the association with +-- the "unavailable" DN for existing chunks that are being written into +-- above +SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks +WHERE hypertable_name IN ('hyper3', 'hyper_1dim') +AND range_start::timestamptz <= '2022-01-03 00:00:00' +AND range_end::timestamptz > '2022-01-03 00:00:00' +ORDER BY 1, 2; + hypertable_name | chunk_name | data_nodes +-----------------+-------------------------+--------------------------------------- + hyper3 | _dist_hyper_12_18_chunk | {data_node_2,data_node_3} + hyper3 | _dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} + hyper3 | _dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} + hyper_1dim | _dist_hyper_13_22_chunk | {data_node_2,data_node_3} +(4 rows) + +-- Also, inserts should work if going to a new chunk INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); WARNING: insufficient number of data nodes +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:05', 1, 1); INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); WARNING: insufficient number of data nodes +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:05', 1, 1); +-- Also check that new chunks only use the "available" DNs SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks WHERE hypertable_name IN ('hyper3', 'hyper_1dim') AND range_start::timestamptz <= '2022-01-10 00:00:00' @@ -1813,17 +1830,103 @@ ORDER BY 1, 2; hyper_1dim | _dist_hyper_13_25_chunk | {data_node_2,data_node_3} (2 rows) +-- Updates/Deletes should also work +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:05'; +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:05'; +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:05'; +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:05'; +-- Inserts directly into chunks using FDW should also work +INSERT INTO _timescaledb_internal._dist_hyper_12_24_chunk VALUES ('2022-01-11 00:00:00', 1, 1); +INSERT INTO _timescaledb_internal._dist_hyper_13_25_chunk VALUES ('2022-01-11 00:00:00', 1, 1); +SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks +WHERE hypertable_name IN ('hyper3', 'hyper_1dim') +AND range_start::timestamptz <= '2022-01-10 00:00:00' +AND range_end::timestamptz > '2022-01-10 00:00:00' +ORDER BY 1, 2; + hypertable_name | chunk_name | data_nodes +-----------------+-------------------------+--------------------------- + hyper3 | _dist_hyper_12_24_chunk | {data_node_2,data_node_3} + hyper_1dim | _dist_hyper_13_25_chunk | {data_node_2,data_node_3} +(2 rows) + +-- DDL should error out even if one DN is unavailable +\set ON_ERROR_STOP 0 +ALTER TABLE hyper3 ADD COLUMN temp2 int; +ERROR: some data nodes are not available for DDL queries +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; +ERROR: some data nodes are not available for DDL queries +\set ON_ERROR_STOP 1 +-- Mark all DNs unavailable. 
Metadata should still retain last DN but all +-- activity should fail +SELECT * FROM alter_data_node('data_node_2', available=>false); +WARNING: could not switch data node on 2 chunks + node_name | host | port | database | available +-------------+-----------+-------+----------------+----------- + data_node_2 | localhost | 55432 | db_data_node_2 | f +(1 row) + +SELECT * FROM alter_data_node('data_node_3', available=>false); +WARNING: could not switch data node on 11 chunks + node_name | host | port | database | available +-------------+-----------+-------+----------------+----------- + data_node_3 | localhost | 55432 | db_data_node_3 | f +(1 row) + +\set ON_ERROR_STOP 0 +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); +ERROR: insufficient number of data nodes +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); +ERROR: insufficient number of data nodes +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +ERROR: insufficient number of data nodes +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +ERROR: insufficient number of data nodes +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:00'; +ERROR: insufficient number of data nodes +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; +ERROR: insufficient number of data nodes +SELECT count(*) FROM hyper3; +ERROR: Node "data_node_3" is not available for queries +SELECT count(*) FROM hyper_1dim; +ERROR: Node "data_node_3" is not available for queries +ALTER TABLE hyper3 ADD COLUMN temp2 int; +ERROR: some data nodes are not available for DDL queries +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; +ERROR: some data nodes are not available for DDL queries +\set ON_ERROR_STOP 1 -- re-enable the data node and the chunks should "switch back" to -- using the data node. 
However, only the chunks for which the node is -- "primary" should switch to using the data node for queries ALTER DATABASE data_node_1_unavailable RENAME TO :DN_DBNAME_1; WARNING: you need to manually restart any running background workers after this command SELECT * FROM alter_data_node('data_node_1', available=>true); +WARNING: insufficient number of data nodes +WARNING: insufficient number of data nodes node_name | host | port | database | available -------------+-----------+-------+----------------+----------- data_node_1 | localhost | 55432 | db_data_node_1 | t (1 row) +SELECT * FROM alter_data_node('data_node_2', available=>true); +WARNING: insufficient number of data nodes +WARNING: insufficient number of data nodes +WARNING: insufficient number of data nodes +WARNING: insufficient number of data nodes + node_name | host | port | database | available +-------------+-----------+-------+----------------+----------- + data_node_2 | localhost | 55432 | db_data_node_2 | t +(1 row) + +SELECT * FROM alter_data_node('data_node_3', available=>true); + node_name | host | port | database | available +-------------+-----------+-------+----------------+----------- + data_node_3 | localhost | 55432 | db_data_node_3 | t +(1 row) + SELECT * FROM chunk_query_data_node; hypertable_name | chunk | data_nodes | default_data_node -----------------+-----------------------------------------------+---------------------------------------+------------------- @@ -1833,14 +1936,14 @@ SELECT * FROM chunk_query_data_node; hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_1 hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2 hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3 - hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 - hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_2,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_2,data_node_3} | data_node_2 hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 - hyper3 | _timescaledb_internal._dist_hyper_12_24_chunk | {data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_24_chunk | {data_node_2,data_node_3} | data_node_3 hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 - hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 - hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 - hyper_1dim | _timescaledb_internal._dist_hyper_13_25_chunk | {data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_25_chunk | {data_node_2,data_node_3} | data_node_3 (14 rows) --queries should work again on all tables @@ -1868,6 +1971,9 @@ SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; Sat Jan 01 00:00:00 2022 PST | 1 (1 row) +-- DDL should also work again +ALTER TABLE hyper3 ADD COLUMN temp2 
int; +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; -- save old port so that we can restore connectivity after we test -- changing the connection information for the data node WITH options AS ( @@ -1878,12 +1984,12 @@ WITH options AS ( SELECT split_part(opt, '=', 2) AS old_port FROM options WHERE opt LIKE 'port%' \gset -- also test altering host, port and database -SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; node_name | options -------------+------------------------------------------------------------------ - data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} - data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} data_node_1 | {host=localhost,port=55432,dbname=db_data_node_1,available=true} + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2,available=true} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3,available=true} (3 rows) SELECT * FROM alter_data_node('data_node_1', available=>true, host=>'foo.bar', port=>8989, database=>'new_db'); @@ -1892,12 +1998,12 @@ SELECT * FROM alter_data_node('data_node_1', available=>true, host=>'foo.bar', p data_node_1 | foo.bar | 8989 | new_db | t (1 row) -SELECT node_name, options FROM timescaledb_information.data_nodes; - node_name | options --------------+------------------------------------------------------- +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; + node_name | options +-------------+------------------------------------------------------------------ data_node_1 | {host=foo.bar,port=8989,dbname=new_db,available=true} - data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} - data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2,available=true} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3,available=true} (3 rows) -- just show current options: @@ -1926,12 +2032,12 @@ SELECT * FROM alter_data_node('data_node_1', host=>'localhost', port=>:old_port, data_node_1 | localhost | 55432 | db_data_node_1 | t (1 row) -SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; node_name | options -------------+------------------------------------------------------------------ data_node_1 | {host=localhost,port=55432,dbname=db_data_node_1,available=true} - data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} - data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2,available=true} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3,available=true} (3 rows) DROP TABLE hyper1; diff --git a/tsl/test/sql/data_node.sql b/tsl/test/sql/data_node.sql index f66260b801c..931fcd56a40 100644 --- a/tsl/test/sql/data_node.sql +++ b/tsl/test/sql/data_node.sql @@ -866,16 +866,49 @@ SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; --- inserts should fail if going to chunks that exist on the --- unavailable data node -\set ON_ERROR_STOP 0 +-- inserts should continue to work and should go to the "live" +-- datanodes INSERT INTO hyper3 VALUES ('2022-01-03 00:00:00', 1, 1); +INSERT INTO hyper3 VALUES ('2022-01-03 00:00:05', 1, 1); INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:00', 1, 1); -\set ON_ERROR_STOP 1 +INSERT 
INTO hyper_1dim VALUES ('2022-01-03 00:00:05', 1, 1); + +-- Check that the metadata on the AN removes the association with +-- the "unavailable" DN for existing chunks that are being written into +-- above +SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks +WHERE hypertable_name IN ('hyper3', 'hyper_1dim') +AND range_start::timestamptz <= '2022-01-03 00:00:00' +AND range_end::timestamptz > '2022-01-03 00:00:00' +ORDER BY 1, 2; --- inserts should work if going to a new chunk +-- Also, inserts should work if going to a new chunk INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:05', 1, 1); INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:05', 1, 1); + +-- Also check that new chunks only use the "available" DNs +SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks +WHERE hypertable_name IN ('hyper3', 'hyper_1dim') +AND range_start::timestamptz <= '2022-01-10 00:00:00' +AND range_end::timestamptz > '2022-01-10 00:00:00' +ORDER BY 1, 2; + + +-- Updates/Deletes should also work +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:05'; +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:05'; +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:05'; +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:05'; + +-- Inserts directly into chunks using FDW should also work +INSERT INTO _timescaledb_internal._dist_hyper_12_24_chunk VALUES ('2022-01-11 00:00:00', 1, 1); +INSERT INTO _timescaledb_internal._dist_hyper_13_25_chunk VALUES ('2022-01-11 00:00:00', 1, 1); SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks WHERE hypertable_name IN ('hyper3', 'hyper_1dim') @@ -883,11 +916,36 @@ AND range_start::timestamptz <= '2022-01-10 00:00:00' AND range_end::timestamptz > '2022-01-10 00:00:00' ORDER BY 1, 2; +-- DDL should error out even if one DN is unavailable +\set ON_ERROR_STOP 0 +ALTER TABLE hyper3 ADD COLUMN temp2 int; +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; +\set ON_ERROR_STOP 1 + +-- Mark all DNs unavailable. Metadata should still retain last DN but all +-- activity should fail +SELECT * FROM alter_data_node('data_node_2', available=>false); +SELECT * FROM alter_data_node('data_node_3', available=>false); +\set ON_ERROR_STOP 0 +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; +SELECT count(*) FROM hyper3; +SELECT count(*) FROM hyper_1dim; +ALTER TABLE hyper3 ADD COLUMN temp2 int; +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; +\set ON_ERROR_STOP 1 + -- re-enable the data node and the chunks should "switch back" to -- using the data node. 
However, only the chunks for which the node is -- "primary" should switch to using the data node for queries ALTER DATABASE data_node_1_unavailable RENAME TO :DN_DBNAME_1; SELECT * FROM alter_data_node('data_node_1', available=>true); +SELECT * FROM alter_data_node('data_node_2', available=>true); +SELECT * FROM alter_data_node('data_node_3', available=>true); SELECT * FROM chunk_query_data_node; --queries should work again on all tables @@ -896,6 +954,10 @@ SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; +-- DDL should also work again +ALTER TABLE hyper3 ADD COLUMN temp2 int; +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; + -- save old port so that we can restore connectivity after we test -- changing the connection information for the data node WITH options AS ( @@ -907,10 +969,10 @@ SELECT split_part(opt, '=', 2) AS old_port FROM options WHERE opt LIKE 'port%' \gset -- also test altering host, port and database -SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; SELECT * FROM alter_data_node('data_node_1', available=>true, host=>'foo.bar', port=>8989, database=>'new_db'); -SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; -- just show current options: SELECT * FROM alter_data_node('data_node_1'); @@ -925,7 +987,7 @@ SELECT delete_data_node('data_node_1', drop_database=>true); -- restore configuration for data_node_1 SELECT * FROM alter_data_node('data_node_1', host=>'localhost', port=>:old_port, database=>:'DN_DBNAME_1'); -SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; DROP TABLE hyper1; DROP TABLE hyper2;
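
The sketch below illustrates the re-balancing flow the commit message
alludes to: once a chunk has been marked "stale" for an unavailable DN,
it is under-replicated and has to be re-replicated out of band. This is
a minimal sketch only, assuming the experimental
timescaledb_experimental.copy_chunk procedure alongside the
alter_data_node() API exercised in the tests above; the chunk and node
names are illustrative, not taken from the patch.

-- Mark the failed node unavailable so that DML keeps flowing to the
-- remaining replicas (requires "replication_factor" > 1).
SELECT * FROM alter_data_node('data_node_1', available=>false);

-- DML against an affected chunk now drops the chunk's mapping to
-- data_node_1 on the AN, leaving the chunk under-replicated.
INSERT INTO hyper3 VALUES ('2022-01-03 00:00:10', 1, 1);

-- Once the node is reachable again, mark it available ...
SELECT * FROM alter_data_node('data_node_1', available=>true);

-- ... and copy each stale chunk back from a live replica.
CALL timescaledb_experimental.copy_chunk(
    chunk => '_timescaledb_internal._dist_hyper_12_18_chunk',
    source_node => 'data_node_2',
    destination_node => 'data_node_1');

A re-balancing microservice, as suggested in the commit message, would
find under-replicated chunks (for example by comparing the hypertable's
replication_factor against the data_nodes arrays in
timescaledb_information.chunks) and issue such copy_chunk calls for
each of them.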