Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch rows on access node for distributed COPY #4476

Merged
merged 1 commit into from Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/guc.c
Expand Up @@ -62,6 +62,13 @@ static const struct config_enum_entry hypertable_distributed_types[] = {
{ NULL, 0, false }
};

static const struct config_enum_entry dist_copy_transfer_formats[] = {
{ "auto", DCTF_Auto, false },
{ "binary", DCTF_Binary, false },
{ "text", DCTF_Text, false },
{ NULL, 0, false }
};

bool ts_guc_enable_optimizations = true;
bool ts_guc_restoring = false;
bool ts_guc_enable_constraint_aware_append = true;
Expand Down Expand Up @@ -90,6 +97,7 @@ char *ts_last_tune_version = NULL;
TSDLLEXPORT bool ts_guc_enable_2pc;
TSDLLEXPORT int ts_guc_max_insert_batch_size = 1000;
TSDLLEXPORT bool ts_guc_enable_connection_binary_data;
TSDLLEXPORT DistCopyTransferFormat ts_guc_dist_copy_transfer_format;
TSDLLEXPORT bool ts_guc_enable_client_ddl_on_data_nodes = false;
TSDLLEXPORT char *ts_guc_ssl_dir = NULL;
TSDLLEXPORT char *ts_guc_passfile = NULL;
Expand Down Expand Up @@ -309,6 +317,25 @@ _guc_init(void)
NULL,
NULL);

/*
* The default is 'auto', so that the dist COPY could use text transfer
* format for text input. It has a passthrough optimization for this case,
* which greatly reduces the CPU usage. Ideally we would implement the same
* optimization for binary, but the Postgres COPY code doesn't provide
* enough APIs for that.
*/
DefineCustomEnumVariable("timescaledb.dist_copy_transfer_format",
"Data format used by distributed COPY to send data to data nodes",
"auto, binary or text",
(int *) &ts_guc_dist_copy_transfer_format,
DCTF_Auto,
dist_copy_transfer_formats,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomBoolVariable("timescaledb.enable_client_ddl_on_data_nodes",
"Enable DDL operations on data nodes by a client",
"Do not restrict execution of DDL operations only by access node",
Expand Down
9 changes: 9 additions & 0 deletions src/guc.h
Expand Up @@ -66,6 +66,15 @@ typedef enum HypertableDistType
extern TSDLLEXPORT HypertableDistType ts_guc_hypertable_distributed_default;
extern TSDLLEXPORT int ts_guc_hypertable_replication_factor_default;

typedef enum DistCopyTransferFormat
{
DCTF_Auto,
DCTF_Binary,
DCTF_Text
} DistCopyTransferFormat;

extern TSDLLEXPORT DistCopyTransferFormat ts_guc_dist_copy_transfer_format;

#ifdef TS_DEBUG
extern bool ts_shutdown_bgw;
extern char *ts_current_timestamp_mock;
Expand Down
30 changes: 23 additions & 7 deletions test/src/test_utils.c
Expand Up @@ -16,6 +16,7 @@
#include <storage/procarray.h>
#include <utils/builtins.h>
#include <utils/elog.h>
#include <utils/guc.h>
#include <utils/memutils.h>

#include "debug_point.h"
Expand Down Expand Up @@ -125,19 +126,34 @@ ts_debug_shippable_fatal_after_n_rows(PG_FUNCTION_ARGS)

/*
* Broken send/receive functions for int4 that throw after an (arbitrarily
* chosen prime) number of rows.
* Use ERROR, not FATAL, because PG versions < 14 are unable to report a FATAL
* error to the access node before closing the connection, so the test results
* would be different.
* chosen prime or configured) number of rows.
*/
#define ARBITRARY_PRIME_NUMBER 7103
static void
broken_sendrecv_throw()
{
int throw_after = 7103; /* an arbitrary prime */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: just a thought, that could also use random number generator with seed to have a broader coverage and deterministic output

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, could be nice, although we don't have a way to set random seed for the entire test run now. And we'll have to mask the particular numbers from the output somehow.

const char *throw_after_option =
GetConfigOption("timescaledb.debug_broken_sendrecv_throw_after", true, false);

if (throw_after_option)
{
throw_after = pg_atoi(throw_after_option, 4, 0);
}

/*
* Use ERROR, not FATAL, because PG versions < 14 are unable to report a
* FATAL error to the access node before closing the connection, so the test
* results would be different.
*/
(void) throw_after_n_rows(throw_after, ERROR);
}

TS_FUNCTION_INFO_V1(ts_debug_broken_int4recv);

Datum
ts_debug_broken_int4recv(PG_FUNCTION_ARGS)
{
(void) throw_after_n_rows(ARBITRARY_PRIME_NUMBER, ERROR);
broken_sendrecv_throw();
return int4recv(fcinfo);
}

Expand All @@ -146,7 +162,7 @@ TS_FUNCTION_INFO_V1(ts_debug_broken_int4send);
Datum
ts_debug_broken_int4send(PG_FUNCTION_ARGS)
{
(void) throw_after_n_rows(ARBITRARY_PRIME_NUMBER, ERROR);
broken_sendrecv_throw();
return int4send(fcinfo);
}

Expand Down
2 changes: 1 addition & 1 deletion tsl/src/nodes/data_node_copy.c
Expand Up @@ -288,7 +288,7 @@ data_node_copy_end(CustomScanState *node)
DataNodeCopyState *dncs = (DataNodeCopyState *) node;

ExecEndNode(linitial(node->custom_ps));
remote_copy_end(dncs->copy_ctx);
remote_copy_end_on_success(dncs->copy_ctx);
ts_cache_release(dncs->hcache);
}

Expand Down
51 changes: 39 additions & 12 deletions tsl/src/remote/connection.c
Expand Up @@ -847,6 +847,9 @@ remote_connection_set_status(TSConnection *conn, TSConnectionStatus status)
{
Assert(conn != NULL);
conn->status = status;

/* Should be blocking except when doing COPY. */
Assert(PQisnonblocking(conn->pg_conn) == (conn->status == CONN_COPY_IN));
}

TSConnectionStatus
Expand Down Expand Up @@ -2162,6 +2165,26 @@ remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binar
"connection not IDLE when beginning COPY",
conn);

#ifndef NDEBUG
/* Set some variables for testing. */
const char *throw_after_option =
GetConfigOption("timescaledb.debug_broken_sendrecv_throw_after", true, false);
if (throw_after_option)
{
res = PQexec(pg_conn,
psprintf("set timescaledb.debug_broken_sendrecv_throw_after = '%s';",
throw_after_option));
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
remote_connection_get_result_error(res, err);
PQclear(res);
return false;
}
PQclear(res);
}
#endif

/* Run the COPY query. */
res = PQexec(pg_conn, copycmd);

if (PQresultStatus(res) != PGRES_COPY_IN)
Expand Down Expand Up @@ -2237,8 +2260,7 @@ send_end_binary_copy_data(const TSConnection *conn, TSConnectionError *err)
bool
remote_connection_end_copy(TSConnection *conn, TSConnectionError *err)
{
PGresult *res;
bool success;
PGresult *res = NULL;

/*
* In any case, try to switch the connection into the blocking mode, because
Expand Down Expand Up @@ -2331,21 +2353,26 @@ remote_connection_end_copy(TSConnection *conn, TSConnectionError *err)
if (res == NULL || PQresultStatus(res) != PGRES_COPY_IN)
{
remote_connection_set_status(conn, res == NULL ? CONN_IDLE : CONN_PROCESSING);
elog(ERROR, "connection marked as CONN_COPY_IN, but no COPY is in progress");
}

if (conn->binary_copy && !send_end_binary_copy_data(conn, err))
return false;
/*
* Finish the COPY if needed.
*/
if (remote_connection_get_status(conn) == CONN_COPY_IN)
{
if (conn->binary_copy && !send_end_binary_copy_data(conn, err))
return false;

if (PQputCopyEnd(conn->pg_conn, NULL) != 1)
return fill_connection_error(err,
ERRCODE_CONNECTION_EXCEPTION,
"could not end remote COPY",
conn);
if (PQputCopyEnd(conn->pg_conn, NULL) != 1)
return fill_connection_error(err,
ERRCODE_CONNECTION_EXCEPTION,
"could not end remote COPY",
conn);

success = true;
remote_connection_set_status(conn, CONN_PROCESSING);
remote_connection_set_status(conn, CONN_PROCESSING);
}

bool success = true;
while ((res = PQgetResult(conn->pg_conn)))
{
ExecStatusType status = PQresultStatus(res);
Expand Down