Skip to content

Commit

Permalink
Fix perf regression due to DML HA
Browse files Browse the repository at this point in the history
We added checks via timescale#4846 to handle DML HA when the replication
factor is greater than 1 and a data node is down. Since each insert can go
to a different chunk with a different set of data nodes, we added checks
on every insert to verify whether any data nodes are unavailable. This
increased CPU consumption on the access node, leading to a performance
regression for replication factor > 1 code paths.

This patch fixes this regression. We now track if any DN is marked as
unavailable at the start of the transaction and use that information to
reduce unnecessary checks for each inserted row.
  • Loading branch information
nikkhils authored and svenklemm committed Mar 6, 2023
1 parent 3f6e9a3 commit b9305ba
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 13 deletions.
13 changes: 9 additions & 4 deletions tsl/src/data_node.c
Expand Up @@ -1958,10 +1958,10 @@ data_node_get_node_name_list_with_aclcheck(AclMode mode, bool fail_on_aclcheck)
return nodes;
}

void
data_node_fail_if_nodes_are_unavailable(void)
bool
data_node_some_unavailable(void)
{
/* Get a list of data nodes and ensure all of them are available */
/* Get a list of data nodes and check if one is unavailable */
List *data_node_list = data_node_get_node_name_list_with_aclcheck(ACL_NO_CHECK, false);
ListCell *lc;

Expand All @@ -1972,8 +1972,13 @@ data_node_fail_if_nodes_are_unavailable(void)

server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, false);
if (!ts_data_node_is_available_by_server(server))
ereport(ERROR, (errmsg("some data nodes are not available")));
{
list_free(data_node_list);
return true;
}
}
list_free(data_node_list);
return false;
}

/*
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/data_node.h
Expand Up @@ -35,7 +35,7 @@ extern List *data_node_get_node_name_list_with_aclcheck(AclMode mode, bool fail_
extern List *data_node_get_filtered_node_name_list(ArrayType *nodearr, AclMode mode,
bool fail_on_aclcheck);
extern List *data_node_get_node_name_list(void);
extern void data_node_fail_if_nodes_are_unavailable(void);
extern bool data_node_some_unavailable(void);
extern List *data_node_array_to_node_name_list_with_aclcheck(ArrayType *nodearr, AclMode mode,
bool fail_on_aclcheck);
extern List *data_node_array_to_node_name_list(ArrayType *nodearr);
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/dist_backup.c
Expand Up @@ -117,7 +117,8 @@ create_distributed_restore_point(PG_FUNCTION_ARGS)
"from there.")));

/* Ensure all data nodes are available */
data_node_fail_if_nodes_are_unavailable();
if (data_node_some_unavailable())
ereport(ERROR, (errmsg("some data nodes are not available")));

/*
* In order to achieve synchronization across the multinode cluster,
Expand Down
47 changes: 40 additions & 7 deletions tsl/src/remote/dist_copy.c
Expand Up @@ -129,7 +129,8 @@ typedef struct RemoteCopyContext
List *attnums;
void *data_context; /* TextCopyContext or BinaryCopyContext */
bool binary_operation;
MemoryContext mctx; /* MemoryContext that holds the RemoteCopyContext */
MemoryContext mctx; /* MemoryContext that holds the RemoteCopyContext */
bool dns_unavailable; /* are some DNs marked as "unavailable"? */

/*
* Incoming rows are batched before creating the chunks and sending them to
Expand Down Expand Up @@ -1026,6 +1027,7 @@ remote_copy_begin(const CopyStmt *stmt, Hypertable *ht, ExprContext *per_tuple_c
context->connection_state.data_node_connections = NIL;
context->connection_state.using_binary = binary_copy;
context->connection_state.outgoing_copy_cmd = deparse_copy_cmd(stmt, ht, binary_copy);
context->dns_unavailable = data_node_some_unavailable();

context->batch_row_data = palloc0(sizeof(StringInfo) * MAX_BATCH_ROWS);
context->batch_points = palloc0(sizeof(Point *) * MAX_BATCH_ROWS);
Expand Down Expand Up @@ -1333,6 +1335,8 @@ remote_copy_process_and_send_data(RemoteCopyContext *context)
Hypertable *ht = context->ht;
const int n = context->batch_row_count;
Assert(n <= MAX_BATCH_ROWS);
int32 chunk_id = INVALID_CHUNK_ID;
List *chunk_data_nodes = NIL;

/*
* This list tracks the per-batch insert states of the data nodes
Expand Down Expand Up @@ -1366,12 +1370,35 @@ remote_copy_process_and_send_data(RemoteCopyContext *context)
else
found = true;

/* get the filtered list of "available" DNs for this chunk but only if it's replicated */
if (found && ht->fd.replication_factor > 1)
/*
 * Get the filtered list of "available" DNs for this chunk but only if it's replicated. We
 * only fetch the filtered list once. Assuming that inserts will typically go to the same
 * chunk, we should be able to reuse this filtered list a few more times.
 *
 * The worst case scenario is one in which INSERT1 goes into CHUNK1, INSERT2 goes into
 * CHUNK2, INSERT3 goes into CHUNK1, ... in which case we will end up refreshing the list
 * every time.
 *
 * We will also enter the below loop if we KNOW that any of the DNs has been marked
 * unavailable before we started this transaction. If not, then we know that every chunk's
 * datanode list is fine and no stale chunk metadata updates are needed.
 */
if (context->dns_unavailable && found && ht->fd.replication_factor > 1)
{
List *chunk_data_nodes =
ts_chunk_data_node_scan_by_chunk_id_filter(chunk->fd.id, CurrentMemoryContext);
if (chunk_id != chunk->fd.id)
chunk_id = INVALID_CHUNK_ID;

if (chunk_id == INVALID_CHUNK_ID)
{
if (chunk_data_nodes)
list_free(chunk_data_nodes);
chunk_data_nodes =
ts_chunk_data_node_scan_by_chunk_id_filter(chunk->fd.id, CurrentMemoryContext);
chunk_id = chunk->fd.id;
}

Assert(chunk_id == chunk->fd.id);
Assert(chunk_data_nodes != NIL);
/*
* If the chunk was not created as part of this insert, we need to check whether any
* of the chunk's data nodes are currently unavailable and in that case consider the
Expand All @@ -1386,8 +1413,6 @@ remote_copy_process_and_send_data(RemoteCopyContext *context)
*/
if (ht->fd.replication_factor > list_length(chunk_data_nodes))
ts_cm_functions->dist_update_stale_chunk_metadata(chunk, chunk_data_nodes);

list_free(chunk_data_nodes);
}

/*
Expand Down Expand Up @@ -1496,6 +1521,14 @@ remote_copy_process_and_send_data(RemoteCopyContext *context)
* attempted after finishing each protocol message (pqPutMsgEnd()).
*/
}

/* reset the cached per-chunk tracking state before returning */
chunk_id = INVALID_CHUNK_ID;
if (chunk_data_nodes)
{
list_free(chunk_data_nodes);
chunk_data_nodes = NIL;
}
}

/*
Expand Down

0 comments on commit b9305ba

Please sign in to comment.