Fix DML HA in multi-node
If a data node goes down for whatever reason, then DML activity to
chunks residing on (or targeted to) that DN will start erroring out.
We now handle this by marking the target chunk as "stale" for this
DN by changing the metadata on the access node. This allows us to
continue to do DML to replicas of the same chunk data on other DNs
in the setup. This obviously only works for chunks which have
"replication_factor" > 1. Note that chunks which do not undergo any
change will continue to carry the appropriate DN-related metadata
on the AN.

This means that such "stale" chunks become under-replicated and need
to be re-balanced using the copy_chunk functionality, by a
microservice or some such process.

Fixes timescale#4846
nikkhils committed Nov 25, 2022
1 parent 26e3be1 commit 97543ec
Showing 24 changed files with 1,167 additions and 58 deletions.
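For orientation, the core decision this commit adds on the access node can be
modeled in a few lines of standalone C (a simplified sketch only; the types and
names below are illustrative, not the TimescaleDB definitions in the diff):

/* Simplified, self-contained model of the stale-chunk decision on the AN.
 * Illustrative only: these types and names are not TimescaleDB code. */
#include <stdbool.h>
#include <stdio.h>

typedef struct DataNode
{
    const char *name;
    bool available;
} DataNode;

/* Count the data nodes that are still reachable for a chunk. */
static int
count_available(const DataNode *nodes, int nnodes)
{
    int n = 0;

    for (int i = 0; i < nnodes; i++)
        if (nodes[i].available)
            n++;
    return n;
}

int
main(void)
{
    /* A chunk replicated on three data nodes, one of which is down. */
    DataNode chunk_nodes[] = { { "dn1", true }, { "dn2", false }, { "dn3", true } };
    int replication_factor = 3;
    int available = count_available(chunk_nodes, 3);

    if (available == 0)
        printf("ERROR: insufficient number of available data nodes\n");
    else if (available < replication_factor)
        /* The real code removes the chunk-to-data-node mappings for the
         * unavailable nodes on the AN, marking the chunk "stale" there,
         * and DML proceeds against the remaining replicas. */
        printf("chunk stale on %d data node(s); DML continues on %d replica(s)\n",
               replication_factor - available, available);
    else
        printf("all replicas available; nothing to do\n");
    return 0;
}

A chunk that ends up stale this way is under-replicated and is expected to be
re-balanced later, e.g. via copy_chunk, as the commit message notes.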
8 changes: 7 additions & 1 deletion src/chunk.c
@@ -1475,7 +1475,7 @@ ts_chunk_find_for_point(const Hypertable *ht, const Point *p)
* Create a chunk through insertion of a tuple at a given point.
*/
Chunk *
ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *schema,
ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, const char *schema,
const char *prefix)
{
/*
@@ -1506,6 +1506,8 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *sche
* release the lock early.
*/
UnlockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
if (found)
*found = true;
return chunk;
}

@@ -1517,11 +1519,15 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *sche
chunk = chunk_resurrect(ht, chunk_id);
if (chunk != NULL)
{
if (found)
*found = true;
return chunk;
}
}

/* Create the chunk normally. */
if (found)
*found = false;
if (hypertable_is_distributed_member(ht))
ereport(ERROR,
(errcode(ERRCODE_TS_INTERNAL_ERROR),
4 changes: 2 additions & 2 deletions src/chunk.h
@@ -145,8 +145,8 @@ typedef struct DisplayKeyData

extern void ts_chunk_formdata_fill(FormData_chunk *fd, const TupleInfo *ti);
extern Chunk *ts_chunk_find_for_point(const Hypertable *ht, const Point *p);
extern Chunk *ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *schema,
const char *prefix);
extern Chunk *ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found,
const char *schema, const char *prefix);
List *ts_chunk_id_find_in_subspace(Hypertable *ht, List *dimension_vecs);

extern TSDLLEXPORT Chunk *ts_chunk_create_base(int32 id, int16 num_constraints, const char relkind);
8 changes: 8 additions & 0 deletions src/cross_module_fn.c
@@ -387,6 +387,13 @@ update_compressed_chunk_relstats_default(Oid uncompressed_relid, Oid compressed_
error_no_default_fn_community();
}

static void
dist_update_stale_chunk_metadata_default(Chunk *new_chunk, List *chunk_data_nodes)
{
error_no_default_fn_community();
pg_unreachable();
}

TS_FUNCTION_INFO_V1(ts_tsl_loaded);

PGDLLEXPORT Datum
@@ -541,6 +548,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.dist_remote_chunk_info = error_no_default_fn_pg_community,
.dist_remote_compressed_chunk_info = error_no_default_fn_pg_community,
.dist_remote_hypertable_index_info = error_no_default_fn_pg_community,
.dist_update_stale_chunk_metadata = dist_update_stale_chunk_metadata_default,
.validate_as_data_node = error_no_default_fn_community,
.func_call_on_data_nodes = func_call_on_data_nodes_default,
.chunk_get_relstats = error_no_default_fn_pg_community,
1 change: 1 addition & 0 deletions src/cross_module_fn.h
@@ -188,6 +188,7 @@ typedef struct CrossModuleFunctions
PGFunction dist_remote_chunk_info;
PGFunction dist_remote_compressed_chunk_info;
PGFunction dist_remote_hypertable_index_info;
void (*dist_update_stale_chunk_metadata)(Chunk *new_chunk, List *chunk_data_nodes);
void (*validate_as_data_node)(void);
void (*func_call_on_data_nodes)(FunctionCallInfo fcinfo, List *data_node_oids);
PGFunction distributed_exec;
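The two hunks above extend TimescaleDB's cross-module hook table: the community
build installs a default that simply errors out, and the TSL module replaces it
with the real implementation when it loads. A standalone, simplified model of
that pattern (names and messages are illustrative only, not the actual
definitions):

/* Simplified model of the cross-module function-pointer hook pattern.
 * Illustrative only; the real table is CrossModuleFunctions in
 * src/cross_module_fn.h. */
#include <stdio.h>
#include <stdlib.h>

typedef struct ModuleHooks
{
    void (*dist_update_stale_chunk_metadata)(void *new_chunk, void *chunk_data_nodes);
} ModuleHooks;

static void
default_impl(void *new_chunk, void *chunk_data_nodes)
{
    /* Community build: the feature is unavailable without the TSL module. */
    fprintf(stderr, "feature requires the TSL module\n");
    exit(1);
}

static void
tsl_impl(void *new_chunk, void *chunk_data_nodes)
{
    /* TSL build: would prune stale chunk-to-data-node mappings here. */
    printf("updating stale chunk metadata\n");
}

static ModuleHooks hooks = { .dist_update_stale_chunk_metadata = default_impl };

int
main(void)
{
    /* Loading the TSL module swaps in the real implementation. */
    hooks.dist_update_stale_chunk_metadata = tsl_impl;
    hooks.dist_update_stale_chunk_metadata(NULL, NULL);
    return 0;
}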
3 changes: 2 additions & 1 deletion src/hypertable.c
@@ -1067,12 +1067,13 @@ hypertable_chunk_store_add(const Hypertable *h, const Chunk *input_chunk)
* Create a chunk for the point, given that it does not exist yet.
*/
Chunk *
ts_hypertable_create_chunk_for_point(const Hypertable *h, const Point *point)
ts_hypertable_create_chunk_for_point(const Hypertable *h, const Point *point, bool *found)
{
Assert(ts_subspace_store_get(h->chunk_cache, point) == NULL);

Chunk *chunk = ts_chunk_create_for_point(h,
point,
found,
NameStr(h->fd.associated_schema_name),
NameStr(h->fd.associated_table_prefix));

2 changes: 1 addition & 1 deletion src/hypertable.h
@@ -135,7 +135,7 @@ extern TSDLLEXPORT int32 ts_hypertable_relid_to_id(Oid relid);
extern TSDLLEXPORT Chunk *ts_hypertable_find_chunk_for_point(const Hypertable *h,
const Point *point);
extern TSDLLEXPORT Chunk *ts_hypertable_create_chunk_for_point(const Hypertable *h,
const Point *point);
const Point *point, bool *found);
extern Oid ts_hypertable_relid(RangeVar *rv);
extern TSDLLEXPORT bool ts_is_hypertable(Oid relid);
extern bool ts_hypertable_has_tablespace(const Hypertable *ht, Oid tspc_oid);
25 changes: 24 additions & 1 deletion src/nodes/chunk_dispatch.c
@@ -14,9 +14,11 @@
#include "compat/compat.h"
#include "chunk_dispatch.h"
#include "chunk_insert_state.h"
#include "errors.h"
#include "subspace_store.h"
#include "dimension.h"
#include "guc.h"
#include "ts_catalog/chunk_data_node.h"

ChunkDispatch *
ts_chunk_dispatch_create(Hypertable *ht, EState *estate, int eflags)
@@ -144,10 +146,31 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
* locking the hypertable. This serves as a fast path for the usual case
* where the chunk already exists.
*/
bool found;
Chunk *new_chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);
if (new_chunk == NULL)
{
new_chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point);
new_chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found);
}
else
found = true;

/* get the filtered list of "available" DNs for this chunk but only if it's replicated */
if (found && dispatch->hypertable->fd.replication_factor > 1)
{
List *chunk_data_nodes =
ts_chunk_data_node_scan_by_chunk_id_filter(new_chunk->fd.id, CurrentMemoryContext);

/*
* If the chunk was not created as part of this insert, we need to check whether any
* of the chunk's data nodes are currently unavailable and in that case consider the
* chunk stale on those data nodes. Do that by removing the AN's chunk-datanode
* mapping for the unavailable data nodes.
*/
if (dispatch->hypertable->fd.replication_factor > list_length(chunk_data_nodes))
ts_cm_functions->dist_update_stale_chunk_metadata(new_chunk, chunk_data_nodes);

list_free(chunk_data_nodes);
}

if (NULL == new_chunk)
50 changes: 50 additions & 0 deletions src/ts_catalog/chunk_data_node.c
@@ -14,6 +14,9 @@
#include <access/xact.h>

#include "ts_catalog/chunk_data_node.h"
#include "cache.h"
#include "hypercube.h"
#include "hypertable_cache.h"
#include "scanner.h"
#include "chunk.h"

@@ -124,6 +127,37 @@ chunk_data_node_tuple_found(TupleInfo *ti, void *data)
return SCAN_CONTINUE;
}

/* return a filtered list of "available" ChunkDataNode entries */
static ScanTupleResult
chunk_data_node_tuple_found_filter(TupleInfo *ti, void *data)
{
List **nodes = data;
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
Form_chunk_data_node form = (Form_chunk_data_node) GETSTRUCT(tuple);
ForeignServer *server;

server = GetForeignServerByName(NameStr(form->node_name), false);

if (ts_data_node_is_available_by_server(server))
{
ChunkDataNode *chunk_data_node;
MemoryContext old;

old = MemoryContextSwitchTo(ti->mctx);
chunk_data_node = palloc(sizeof(ChunkDataNode));
memcpy(&chunk_data_node->fd, form, sizeof(FormData_chunk_data_node));
chunk_data_node->foreign_server_oid = server->serverid;
*nodes = lappend(*nodes, chunk_data_node);
MemoryContextSwitchTo(old);
}

if (should_free)
heap_freetuple(tuple);

return SCAN_CONTINUE;
}

static int
ts_chunk_data_node_scan_by_chunk_id_and_node_internal(int32 chunk_id, const char *node_name,
bool scan_by_remote_chunk_id,
@@ -210,6 +244,22 @@ ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx)
return chunk_data_nodes;
}

/* Returns a filtered List of available ChunkDataNode structs. */
List *
ts_chunk_data_node_scan_by_chunk_id_filter(int32 chunk_id, MemoryContext mctx)
{
List *chunk_data_nodes = NIL;

ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
NULL,
false,
chunk_data_node_tuple_found_filter,
&chunk_data_nodes,
AccessShareLock,
mctx);
return chunk_data_nodes;
}

static ChunkDataNode *
chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name,
bool scan_by_remote_chunk_id, MemoryContext mctx)
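ts_chunk_data_node_scan_by_chunk_id_filter above follows the usual
scanner-callback style: the tuple-found function appends only rows whose data
node is available to a caller-provided list. A standalone, simplified model of
that filtering (not the actual scanner API):

/* Simplified model of scan-with-callback filtering. Illustrative only;
 * the real code goes through TimescaleDB's scanner and catalog APIs. */
#include <stdbool.h>
#include <stdio.h>

typedef struct NodeRow
{
    const char *node_name;
    bool available;
} NodeRow;

typedef struct FilterState
{
    const char *matched[8];
    int nmatched;
} FilterState;

/* "Tuple found" callback: keep only rows whose data node is available. */
static void
node_row_found_filter(const NodeRow *row, void *data)
{
    FilterState *state = data;

    if (row->available)
        state->matched[state->nmatched++] = row->node_name;
}

/* Minimal stand-in for a catalog scan: invoke the callback once per row. */
static void
scan_chunk_data_nodes(const NodeRow *rows, int nrows,
                      void (*on_row)(const NodeRow *, void *), void *data)
{
    for (int i = 0; i < nrows; i++)
        on_row(&rows[i], data);
}

int
main(void)
{
    NodeRow rows[] = { { "dn1", true }, { "dn2", false }, { "dn3", true } };
    FilterState state = { .nmatched = 0 };

    scan_chunk_data_nodes(rows, 3, node_row_found_filter, &state);

    for (int i = 0; i < state.nmatched; i++)
        printf("available: %s\n", state.matched[i]);
    return 0;
}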
4 changes: 3 additions & 1 deletion src/ts_catalog/chunk_data_node.h
@@ -7,6 +7,7 @@
#define TIMESCALEDB_CHUNK_DATA_NODE_H

#include "ts_catalog/catalog.h"
#include "chunk.h"
#include "export.h"
#include "scan_iterator.h"

@@ -17,6 +18,8 @@ typedef struct ChunkDataNode
} ChunkDataNode;

extern TSDLLEXPORT List *ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx);
extern TSDLLEXPORT List *ts_chunk_data_node_scan_by_chunk_id_filter(int32 chunk_id,
MemoryContext mctx);
extern TSDLLEXPORT ChunkDataNode *
ts_chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name,
MemoryContext mctx);
@@ -37,5 +40,4 @@ extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanItera
int32 chunk_id);
extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it,
const char *node_name);

#endif /* TIMESCALEDB_CHUNK_DATA_NODE_H */
69 changes: 69 additions & 0 deletions tsl/src/chunk.c
@@ -765,3 +765,72 @@ chunk_drop_stale_chunks(PG_FUNCTION_ARGS)
ts_chunk_drop_stale_chunks(node_name, chunks_array);
PG_RETURN_VOID();
}
/*
* Update and refresh the DN list for a given chunk. We remove metadata for this chunk
* for unavailable DNs
*/
void
chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes)
{
List *serveroids = NIL, *removeoids = NIL;
ChunkDataNode *cdn;
ListCell *lc;

/* check that at least one data node is available for this chunk on the AN */
if (chunk_data_nodes == NIL)
ereport(ERROR,
(errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
(errmsg("insufficient number of available data nodes"),
errhint("Increase the number of available data nodes on hypertable "
"\"%s\".",
get_rel_name(new_chunk->hypertable_relid)))));

foreach (lc, chunk_data_nodes)
{
cdn = lfirst(lc);
serveroids = lappend_oid(serveroids, cdn->foreign_server_oid);
}

foreach (lc, new_chunk->data_nodes)
{
cdn = lfirst(lc);

/*
* check if this DN is a part of chunk_data_nodes. If not
* found in chunk_data_nodes, then we need to remove this
* chunk id to node name mapping and also update the primary
* foreign server if necessary. It's possible that this metadata
* might have been already cleared earlier in which case the
* data_nodes list for the chunk will be the same as the
* "serveroids" list and no unnecesary metadata update function
* calls will occur.
*/
if (!list_member_oid(serveroids, cdn->foreign_server_oid))
{
chunk_update_foreign_server_if_needed(new_chunk, cdn->foreign_server_oid, false);
ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id,
NameStr(cdn->fd.node_name));

removeoids = lappend_oid(removeoids, cdn->foreign_server_oid);
}
}

/* remove entries from new_chunk->data_nodes matching removeoids */
foreach (lc, removeoids)
{
ListCell *l;
Oid serveroid = lfirst_oid(lc);

/* this contrived code to ensure PG12+ compatible in-place list delete */
foreach (l, new_chunk->data_nodes)
{
cdn = lfirst(l);

if (cdn->foreign_server_oid == serveroid)
{
new_chunk->data_nodes = list_delete_ptr(new_chunk->data_nodes, cdn);
break;
}
}
}
}
1 change: 1 addition & 0 deletions tsl/src/chunk.h
@@ -20,5 +20,6 @@ extern Datum chunk_drop_stale_chunks(PG_FUNCTION_ARGS);
extern void ts_chunk_drop_stale_chunks(const char *node_name, ArrayType *chunks_array);
extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type);
extern Datum chunk_create_replica_table(PG_FUNCTION_ARGS);
extern void chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes);

#endif /* TIMESCALEDB_TSL_CHUNK_H */
8 changes: 8 additions & 0 deletions tsl/src/chunk_api.c
@@ -474,6 +474,14 @@ chunk_api_create_on_data_nodes(const Chunk *chunk, const Hypertable *ht,
ListCell *lc;
TupleDesc tupdesc;
AttInMetadata *attinmeta;

/*
* In case of "unavailable" datanodes, the chunk->data_nodes list is already pruned
* and doesn't contain "unavailable" datanodes. So this chunk creation will never
* happen on such "unavailable" datanodes. By the same logic, metadata update on the
* AN for the chunk->datanode mappings will only happen for the listed "live" DNs
* and not for the "unavailable" ones
*/
List *target_data_nodes = data_nodes ? data_nodes : chunk->data_nodes;

get_create_chunk_result_type(&tupdesc);
