Skip to content

Commit

Permalink
Fix DML HA in multi-node
Browse files Browse the repository at this point in the history
If a datanode goes down for whatever reason then DML activity to
chunks residing on (or targeted to) that DN will start erroring out.
We now handle this by marking the target chunk as "stale" for this
DN by changing the metadata on the access node. This allows us to
continue to do DML to replicas of the same chunk data on other DNs
in the setup. This obviously will only work for chunks which have
"replication_factor" > 1. Note that chunks which do not
undergo any change will continue to carry the appropriate DN related
metadata on the AN.

This means that such "stale" chunks will become underreplicated and
need to be re-balanced by using the copy_chunk functionality by a micro
service or some such process.

In passing also fix SELECT behaviour on distributed caggs. We only
connect to available DNs for invalidations handling.

Fixes timescale#4846
  • Loading branch information
nikkhils committed Nov 23, 2022
1 parent 639a501 commit e38514c
Show file tree
Hide file tree
Showing 22 changed files with 445 additions and 47 deletions.
8 changes: 7 additions & 1 deletion src/chunk.c
Expand Up @@ -1475,7 +1475,7 @@ ts_chunk_find_for_point(const Hypertable *ht, const Point *p)
* Create a chunk through insertion of a tuple at a given point.
*/
Chunk *
ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *schema,
ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, const char *schema,
const char *prefix)
{
/*
Expand Down Expand Up @@ -1506,6 +1506,8 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *sche
* release the lock early.
*/
UnlockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
if (found)
*found = true;
return chunk;
}

Expand All @@ -1517,11 +1519,15 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *sche
chunk = chunk_resurrect(ht, chunk_id);
if (chunk != NULL)
{
if (found)
*found = true;
return chunk;
}
}

/* Create the chunk normally. */
if (found)
*found = false;
if (hypertable_is_distributed_member(ht))
ereport(ERROR,
(errcode(ERRCODE_TS_INTERNAL_ERROR),
Expand Down
4 changes: 2 additions & 2 deletions src/chunk.h
Expand Up @@ -145,8 +145,8 @@ typedef struct DisplayKeyData

extern void ts_chunk_formdata_fill(FormData_chunk *fd, const TupleInfo *ti);
extern Chunk *ts_chunk_find_for_point(const Hypertable *ht, const Point *p);
extern Chunk *ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *schema,
const char *prefix);
extern Chunk *ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found,
const char *schema, const char *prefix);
List *ts_chunk_id_find_in_subspace(Hypertable *ht, List *dimension_vecs);

extern TSDLLEXPORT Chunk *ts_chunk_create_base(int32 id, int16 num_constraints, const char relkind);
Expand Down
8 changes: 8 additions & 0 deletions src/cross_module_fn.c
Expand Up @@ -386,6 +386,13 @@ update_compressed_chunk_relstats_default(Oid uncompressed_relid, Oid compressed_
error_no_default_fn_community();
}

/*
 * Default entry for the dist_update_stale_chunk_metadata hook in
 * ts_cm_functions_default.
 *
 * Neither argument is inspected: the function immediately delegates to
 * error_no_default_fn_community(). Control is not expected to come back
 * from that call, which is what the pg_unreachable() marker states.
 */
static void
dist_update_stale_chunk_metadata_default(Chunk *new_chunk, List *chunk_data_nodes)
{
	error_no_default_fn_community();

	/* the call above is expected to raise an error and never return */
	pg_unreachable();
}

TS_FUNCTION_INFO_V1(ts_tsl_loaded);

PGDLLEXPORT Datum
Expand Down Expand Up @@ -539,6 +546,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.dist_remote_chunk_info = error_no_default_fn_pg_community,
.dist_remote_compressed_chunk_info = error_no_default_fn_pg_community,
.dist_remote_hypertable_index_info = error_no_default_fn_pg_community,
.dist_update_stale_chunk_metadata = dist_update_stale_chunk_metadata_default,
.validate_as_data_node = error_no_default_fn_community,
.func_call_on_data_nodes = func_call_on_data_nodes_default,
.chunk_get_relstats = error_no_default_fn_pg_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Expand Up @@ -188,6 +188,7 @@ typedef struct CrossModuleFunctions
PGFunction dist_remote_chunk_info;
PGFunction dist_remote_compressed_chunk_info;
PGFunction dist_remote_hypertable_index_info;
void (*dist_update_stale_chunk_metadata)(Chunk *new_chunk, List *chunk_data_nodes);
void (*validate_as_data_node)(void);
void (*func_call_on_data_nodes)(FunctionCallInfo fcinfo, List *data_node_oids);
PGFunction distributed_exec;
Expand Down
3 changes: 2 additions & 1 deletion src/hypertable.c
Expand Up @@ -1067,12 +1067,13 @@ hypertable_chunk_store_add(const Hypertable *h, const Chunk *input_chunk)
* Create a chunk for the point, given that it does not exist yet.
*/
Chunk *
ts_hypertable_create_chunk_for_point(const Hypertable *h, const Point *point)
ts_hypertable_create_chunk_for_point(const Hypertable *h, const Point *point, bool *found)
{
Assert(ts_subspace_store_get(h->chunk_cache, point) == NULL);

Chunk *chunk = ts_chunk_create_for_point(h,
point,
found,
NameStr(h->fd.associated_schema_name),
NameStr(h->fd.associated_table_prefix));

Expand Down
2 changes: 1 addition & 1 deletion src/hypertable.h
Expand Up @@ -135,7 +135,7 @@ extern TSDLLEXPORT int32 ts_hypertable_relid_to_id(Oid relid);
extern TSDLLEXPORT Chunk *ts_hypertable_find_chunk_for_point(const Hypertable *h,
const Point *point);
extern TSDLLEXPORT Chunk *ts_hypertable_create_chunk_for_point(const Hypertable *h,
const Point *point);
const Point *point, bool *found);
extern Oid ts_hypertable_relid(RangeVar *rv);
extern TSDLLEXPORT bool ts_is_hypertable(Oid relid);
extern bool ts_hypertable_has_tablespace(const Hypertable *ht, Oid tspc_oid);
Expand Down
20 changes: 19 additions & 1 deletion src/nodes/chunk_dispatch.c
Expand Up @@ -14,9 +14,11 @@
#include "compat/compat.h"
#include "chunk_dispatch.h"
#include "chunk_insert_state.h"
#include "errors.h"
#include "subspace_store.h"
#include "dimension.h"
#include "guc.h"
#include "ts_catalog/chunk_data_node.h"

ChunkDispatch *
ts_chunk_dispatch_create(Hypertable *ht, EState *estate, int eflags)
Expand Down Expand Up @@ -144,11 +146,27 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
* locking the hypertable. This serves as a fast path for the usual case
* where the chunk already exists.
*/
bool found;
Chunk *new_chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);
if (new_chunk == NULL)
{
new_chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point);
new_chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found);
}
else
found = true;

/* get the filtered list of "available" DNs for this chunk */
List *chunk_data_nodes =
ts_chunk_data_node_scan_by_chunk_id_filter(new_chunk->fd.id, CurrentMemoryContext);

/*
* If the chunk was not created as part of this insert, we need to check whether any
* of the chunk's data nodes are currently unavailable and in that case consider the
* chunk stale on those data nodes. Do that by removing the AN's chunk-datanode
* mapping for the unavailable data nodes.
*/
if (found && dispatch->hypertable->fd.replication_factor > list_length(chunk_data_nodes))
ts_cm_functions->dist_update_stale_chunk_metadata(new_chunk, chunk_data_nodes);

if (NULL == new_chunk)
elog(ERROR, "no chunk found or created");
Expand Down
50 changes: 50 additions & 0 deletions src/ts_catalog/chunk_data_node.c
Expand Up @@ -14,6 +14,9 @@
#include <access/xact.h>

#include "ts_catalog/chunk_data_node.h"
#include "cache.h"
#include "hypercube.h"
#include "hypertable_cache.h"
#include "scanner.h"
#include "chunk.h"

Expand Down Expand Up @@ -124,6 +127,37 @@ chunk_data_node_tuple_found(TupleInfo *ti, void *data)
return SCAN_CONTINUE;
}

/*
 * Scanner callback that collects ChunkDataNode entries for "available"
 * data nodes only.
 *
 * "data" points at the List * being built. For every scanned tuple the
 * foreign server is looked up by the tuple's node name; when
 * ts_data_node_is_available_by_server() accepts that server, a copy of the
 * catalog form together with the server OID is appended to the list. The
 * new entry and the list cell are allocated in the scan's memory context
 * (ti->mctx).
 *
 * Always returns SCAN_CONTINUE so that every matching tuple is visited.
 */
static ScanTupleResult
chunk_data_node_tuple_found_filter(TupleInfo *ti, void *data)
{
	List **result = data;
	bool free_tuple;
	HeapTuple htup = ts_scanner_fetch_heap_tuple(ti, false, &free_tuple);
	Form_chunk_data_node cdn_form = (Form_chunk_data_node) GETSTRUCT(htup);
	/* second argument is false: presumably an unknown server name errors out here */
	ForeignServer *fserver = GetForeignServerByName(NameStr(cdn_form->node_name), false);

	if (ts_data_node_is_available_by_server(fserver))
	{
		/* build the entry inside the caller-supplied scan context */
		MemoryContext saved = MemoryContextSwitchTo(ti->mctx);
		ChunkDataNode *entry = palloc(sizeof(ChunkDataNode));

		memcpy(&entry->fd, cdn_form, sizeof(FormData_chunk_data_node));
		entry->foreign_server_oid = fserver->serverid;
		*result = lappend(*result, entry);

		MemoryContextSwitchTo(saved);
	}

	/* release the tuple copy only if the scanner handed ownership to us */
	if (free_tuple)
		heap_freetuple(htup);

	return SCAN_CONTINUE;
}

static int
ts_chunk_data_node_scan_by_chunk_id_and_node_internal(int32 chunk_id, const char *node_name,
bool scan_by_remote_chunk_id,
Expand Down Expand Up @@ -210,6 +244,22 @@ ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx)
return chunk_data_nodes;
}

/*
 * Scan the chunk-to-data-node mappings of "chunk_id" and return only the
 * entries accepted by chunk_data_node_tuple_found_filter, i.e. those whose
 * data node is "available".
 *
 * "mctx" is handed to the internal scan; the callback allocates the returned
 * ChunkDataNode structs in the scan's memory context. The result is NIL when
 * no entry qualifies.
 */
List *
ts_chunk_data_node_scan_by_chunk_id_filter(int32 chunk_id, MemoryContext mctx)
{
	List *available_nodes = NIL;

	ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
														  NULL,  /* node_name */
														  false, /* scan_by_remote_chunk_id */
														  chunk_data_node_tuple_found_filter,
														  &available_nodes,
														  AccessShareLock,
														  mctx);

	return available_nodes;
}

static ChunkDataNode *
chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name,
bool scan_by_remote_chunk_id, MemoryContext mctx)
Expand Down
4 changes: 3 additions & 1 deletion src/ts_catalog/chunk_data_node.h
Expand Up @@ -7,6 +7,7 @@
#define TIMESCALEDB_CHUNK_DATA_NODE_H

#include "ts_catalog/catalog.h"
#include "chunk.h"
#include "export.h"
#include "scan_iterator.h"

Expand All @@ -17,6 +18,8 @@ typedef struct ChunkDataNode
} ChunkDataNode;

extern TSDLLEXPORT List *ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx);
extern TSDLLEXPORT List *ts_chunk_data_node_scan_by_chunk_id_filter(int32 chunk_id,
MemoryContext mctx);
extern TSDLLEXPORT ChunkDataNode *
ts_chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name,
MemoryContext mctx);
Expand All @@ -37,5 +40,4 @@ extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanItera
int32 chunk_id);
extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it,
const char *node_name);

#endif /* TIMESCALEDB_CHUNK_DATA_NODE_H */
49 changes: 49 additions & 0 deletions tsl/src/chunk.c
Expand Up @@ -561,3 +561,52 @@ chunk_unfreeze_chunk(PG_FUNCTION_ARGS)
bool ret = ts_chunk_unset_frozen(chunk);
PG_RETURN_BOOL(ret);
}

/*
 * Prune a chunk's data node metadata down to the data nodes that are live.
 *
 * new_chunk         chunk whose data node list is refreshed
 * chunk_data_nodes  ChunkDataNode entries considered "available"; an ERROR
 *                   (ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES) is raised when
 *                   this list is empty
 *
 * Every entry of new_chunk->data_nodes whose foreign server does not appear
 * in chunk_data_nodes has its chunk-id-to-node-name mapping deleted, after
 * chunk_update_foreign_server_if_needed() has been called for that server.
 * Finally new_chunk->data_nodes is replaced by chunk_data_nodes.
 */
void
chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes)
{
	List *live_server_oids = NIL;
	ListCell *cell;

	/* at least one data node must be available for this chunk on the AN */
	if (chunk_data_nodes == NIL)
		ereport(ERROR,
				(errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
				 errmsg("insufficient number of data nodes"),
				 errhint("Increase the number of available data nodes on hypertable "
						 "\"%s\".",
						 get_rel_name(new_chunk->hypertable_relid))));

	/* collect the server OIDs of the live data nodes for membership tests */
	foreach (cell, chunk_data_nodes)
	{
		ChunkDataNode *live = lfirst(cell);

		live_server_oids = lappend_oid(live_server_oids, live->foreign_server_oid);
	}

	foreach (cell, new_chunk->data_nodes)
	{
		ChunkDataNode *candidate = lfirst(cell);

		/* still live: keep its mapping untouched */
		if (list_member_oid(live_server_oids, candidate->foreign_server_oid))
			continue;

		/*
		 * This data node is not among the live ones: update the primary
		 * foreign server if necessary and drop the chunk id to node name
		 * mapping. The mapping may already have been cleared earlier, but
		 * there is no way of knowing that here.
		 */
		chunk_update_foreign_server_if_needed(new_chunk, candidate->foreign_server_oid, false);
		ts_chunk_data_node_delete_by_chunk_id_and_node_name(candidate->fd.chunk_id,
															NameStr(candidate->fd.node_name));
	}

	/* from now on the chunk only refers to the "live" data nodes */
	new_chunk->data_nodes = chunk_data_nodes;
}
1 change: 1 addition & 0 deletions tsl/src/chunk.h
Expand Up @@ -18,5 +18,6 @@ extern Datum chunk_freeze_chunk(PG_FUNCTION_ARGS);
extern Datum chunk_unfreeze_chunk(PG_FUNCTION_ARGS);
extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type);
extern Datum chunk_create_replica_table(PG_FUNCTION_ARGS);
extern void chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes);

#endif /* TIMESCALEDB_TSL_CHUNK_H */
8 changes: 8 additions & 0 deletions tsl/src/chunk_api.c
Expand Up @@ -474,6 +474,14 @@ chunk_api_create_on_data_nodes(const Chunk *chunk, const Hypertable *ht,
ListCell *lc;
TupleDesc tupdesc;
AttInMetadata *attinmeta;

/*
* In case of "unavailable" datanodes, the chunk->data_nodes list is already pruned
* and doesn't contain "unavailable" datanodes. So this chunk creation will never
* happen on such "unavailable" datanodes. By the same logic, metadata update on the
* AN for the chunk->datanode mappings will only happen for the listed "live" DNs
* and not for the "unavailable" ones
*/
List *target_data_nodes = data_nodes ? data_nodes : chunk->data_nodes;

get_create_chunk_result_type(&tupdesc);
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/data_node.c
Expand Up @@ -1902,6 +1902,8 @@ data_node_delete(PG_FUNCTION_ARGS)

/*
* Get server list, performing an ACL check on each of them in the process.
*
* If "available" is true then only return a set of available data nodes
*/
List *
data_node_get_node_name_list_with_aclcheck(AclMode mode, bool fail_on_aclcheck)
Expand Down
50 changes: 50 additions & 0 deletions tsl/src/fdw/modify_exec.c
Expand Up @@ -26,6 +26,8 @@

#include "scan_plan.h"
#include "modify_exec.h"
#include "modify_plan.h"
#include "tsl/src/chunk.h"

/*
* This enum describes what's kept in the fdw_private list for a ModifyTable
Expand Down Expand Up @@ -393,6 +395,46 @@ response_type(AttConvInMetadata *att_conv_metadata)
return att_conv_metadata == NULL || att_conv_metadata->binary ? FORMAT_BINARY : FORMAT_TEXT;
}

/*
 * Remove chunk-to-data-node mappings for data nodes that were left out of a
 * remote modification.
 *
 * The full set of mappings for the chunk behind fmstate->rel is compared with
 * the number of data nodes the modify state targets (fmstate->num_data_nodes).
 * When the catalog lists more nodes than were targeted, every mapping whose
 * foreign server is missing from get_chunk_data_nodes() is deleted, after
 * chunk_update_foreign_server_if_needed() has been called for that server.
 */
static void
fdw_chunk_update_stale_metadata(TsFdwModifyState *fmstate)
{
	Oid chunk_relid = fmstate->rel->rd_id;
	int32 chunk_id = ts_chunk_get_id_by_relid(chunk_relid);
	List *mapped_nodes;
	List *target_server_oids;
	Chunk *chunk;
	ListCell *cell;

	Assert(chunk_id != 0);

	mapped_nodes = ts_chunk_data_node_scan_by_chunk_id(chunk_id, CurrentMemoryContext);
	Assert(list_length(mapped_nodes) >= fmstate->num_data_nodes);

	/* nothing to prune unless the catalog lists more nodes than were targeted */
	if (list_length(mapped_nodes) <= fmstate->num_data_nodes)
		return;

	chunk = ts_chunk_get_by_relid(chunk_relid, true);

	/* filtered list of server OIDs */
	target_server_oids = get_chunk_data_nodes(chunk_relid);
	Assert(list_length(target_server_oids) == fmstate->num_data_nodes);

	foreach (cell, mapped_nodes)
	{
		ChunkDataNode *mapping = lfirst(cell);

		if (list_member_oid(target_server_oids, mapping->foreign_server_oid))
			continue;

		/*
		 * This data node is not part of the filtered list: update the
		 * primary foreign server if necessary and remove the chunk id to
		 * node name mapping. The metadata may already have been cleared
		 * earlier, but there is no way of knowing that here.
		 */
		chunk_update_foreign_server_if_needed(chunk, mapping->foreign_server_oid, false);
		ts_chunk_data_node_delete_by_chunk_id_and_node_name(mapping->fd.chunk_id,
															NameStr(mapping->fd.node_name));
	}
}

TupleTableSlot *
fdw_exec_foreign_insert(TsFdwModifyState *fmstate, EState *estate, TupleTableSlot *slot,
TupleTableSlot *planslot)
Expand Down Expand Up @@ -544,6 +586,14 @@ fdw_exec_foreign_update_or_delete(TsFdwModifyState *fmstate, EState *estate, Tup
pfree(reqset);
stmt_params_reset(params);

/*
* If rows are affected on DNs and a DN was excluded because of being
* "unavailable" then we need to update metadata on the AN to mark
* this chunk as "stale" for that "unavailable" DN
*/
if (n_rows > 0)
fdw_chunk_update_stale_metadata(fmstate);

/* Return NULL if nothing was updated on the remote end */
return (n_rows > 0) ? slot : NULL;
}
Expand Down

0 comments on commit e38514c

Please sign in to comment.