Skip to content

Commit

Permalink
Make data node calls non-blocking and interruptible
Browse files Browse the repository at this point in the history
Refactor the use of libpq calls to data nodes so that they honor
PostgreSQL interrupt handling (e.g., ctrl-c or `statement_timeout`)
and don't block unnecessarily.

To implement this behavior, data node connections are made
non-blocking by default and all `libpq` functions are wrapped to
integrate with PostgreSQL's signal handling (via `WaitEventSets`) when
waiting for read or write readiness.

A change is also made to the life-cycle management of `libpq` objects,
including connections and remote query results. Instead of tying
these to transactions, they are now tied to the life-cycle of memory
contexts using a callback. In most cases, the memory context a
connection is allocated on has the same lifetime as transactions, but
not always. For example, the connection cache lives across transactions
and uses a longer-lived memory context. Previously, that was handled as
a special case in which connections were marked not to auto-close on
transaction end.

Closes #4958
Closes #2757
  • Loading branch information
erimatnor committed Jan 18, 2023
1 parent 6aa3d6e commit 052f0d4
Show file tree
Hide file tree
Showing 19 changed files with 975 additions and 807 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/windows-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
ignores: ["chunk_adaptive metadata"]
tsl_ignores: ["compression_algos remote_connection"]
tsl_skips: ["bgw_db_scheduler bgw_db_scheduler_fixed cagg_ddl_dist_ht data_fetcher dist_compression dist_move_chunk dist_remote_error remote_txn"]
pg_config: ["-cfsync=off -cstatement_timeout=60s"]
pg_config: ["-cfsync=off"]
include:
- pg: 12
pkg_version: ${{ fromJson(needs.config.outputs.pg12_latest) }}.1
Expand Down
4 changes: 1 addition & 3 deletions src/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ extern TSDLLEXPORT void ts_cache_init(Cache *cache);
extern TSDLLEXPORT void ts_cache_invalidate(Cache *cache);
extern TSDLLEXPORT void *ts_cache_fetch(Cache *cache, CacheQuery *query);
extern TSDLLEXPORT bool ts_cache_remove(Cache *cache, void *key);

extern MemoryContext ts_cache_memory_ctx(Cache *cache);

extern TSDLLEXPORT MemoryContext ts_cache_memory_ctx(Cache *cache);
extern TSDLLEXPORT Cache *ts_cache_pin(Cache *cache);
extern TSDLLEXPORT int ts_cache_release(Cache *cache);

Expand Down
28 changes: 18 additions & 10 deletions tsl/src/data_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -510,24 +510,28 @@ data_node_bootstrap_extension(TSConnection *conn)
quote_literal_cstr(EXTENSION_NAME));

if (PQresultStatus(res) != PGRES_TUPLES_OK)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
remote_result_elog(res, ERROR);

if (PQntuples(res) == 0)
{
remote_result_close(res);

if (schema_oid != PG_PUBLIC_NAMESPACE)
{
PGresult *res = remote_connection_execf(conn,
"CREATE SCHEMA %s AUTHORIZATION %s",
schema_name_quoted,
quote_identifier(username));
res = remote_connection_execf(conn,
"CREATE SCHEMA %s AUTHORIZATION %s",
schema_name_quoted,
quote_identifier(username));

if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
const char *const sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
bool schema_exists =
(sqlstate && strcmp(sqlstate, ERRCODE_DUPLICATE_SCHEMA_STR) == 0);
if (!schema_exists)
remote_result_elog(res, ERROR);

remote_result_close(res);
/* If the schema already existed on the remote node, we got a
* duplicate schema error and the schema was not created. In
* that case, we log an error with a hint on how to fix the
Expand All @@ -538,6 +542,8 @@ data_node_bootstrap_extension(TSConnection *conn)
errhint("Make sure that the data node does not contain any "
"existing objects prior to adding it.")));
}

remote_result_close(res);
}

remote_connection_cmdf_ok(conn,
Expand All @@ -556,6 +562,7 @@ data_node_bootstrap_extension(TSConnection *conn)
PQhost(remote_connection_get_pg_conn(conn)),
PQport(remote_connection_get_pg_conn(conn)),
PQgetvalue(res, 0, 1))));
remote_result_close(res);
data_node_validate_extension(conn);
return false;
}
Expand Down Expand Up @@ -592,7 +599,7 @@ connect_for_bootstrapping(const char *node_name, const char *const host, int32 p
{
List *node_options =
create_data_node_options(host, port, bootstrap_databases[i], username, password);
conn = remote_connection_open_with_options_nothrow(node_name, node_options, &err);
conn = remote_connection_open(node_name, node_options, &err);

if (conn)
return conn;
Expand Down Expand Up @@ -635,7 +642,8 @@ data_node_validate_extension_availability(TSConnection *conn)

if (PQresultStatus(res) != PGRES_TUPLES_OK)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("failed to validate remote extension: %s", PQresultErrorMessage(res))));

if (PQntuples(res) == 0)
ereport(ERROR,
Expand Down Expand Up @@ -788,7 +796,7 @@ data_node_add_internal(PG_FUNCTION_ARGS, bool set_distid)
* necessary. Instead using a more straightforward approach here since
* we do not need 2PC support. */
node_options = create_data_node_options(host, port, dbname, username, password);
conn = remote_connection_open_with_options(node_name, node_options, false);
conn = remote_connection_open_session(node_name, node_options, false);
Assert(NULL != conn);
remote_connection_cmd_ok(conn, "BEGIN");

Expand Down Expand Up @@ -1769,7 +1777,7 @@ drop_data_node_database(const ForeignServer *server)
server = data_node_get_foreign_server(nodename, ACL_USAGE, true, false);
/* Open a connection to the bootstrap database using the new server options */
conn_options = remote_connection_prepare_auth_options(server, userid);
conn = remote_connection_open_with_options_nothrow(nodename, conn_options, &err);
conn = remote_connection_open(nodename, conn_options, &err);

if (NULL != conn)
break;
Expand Down
57 changes: 24 additions & 33 deletions tsl/src/remote/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ async_request_set_state(AsyncRequest *req, AsyncRequestState new_state)
static AsyncRequest *
async_request_send_internal(AsyncRequest *req, int elevel)
{
int ret = 0;

if (req->state != DEFERRED)
elog(elevel, "can't send async request in state \"%d\"", req->state);

Expand All @@ -170,39 +172,30 @@ async_request_send_internal(AsyncRequest *req, int elevel)
* the prepared statements we use in this module are simple enough that
* the data node will make the right choices.
*/
if (0 == PQsendPrepare(remote_connection_get_pg_conn(req->conn),
req->stmt_name,
req->sql,
req->prep_stmt_params,
NULL))
{
/*
* null is fine to pass down as the res, the connection error message
* will get through
*/
remote_connection_elog(req->conn, elevel);
return NULL;
}
ret = PQsendPrepare(remote_connection_get_pg_conn(req->conn),
req->stmt_name,
req->sql,
req->prep_stmt_params,
NULL);
}
else
{
if (0 == PQsendQueryParams(remote_connection_get_pg_conn(req->conn),
req->sql,
stmt_params_total_values(req->params),
/* param types - see note above */ NULL,
stmt_params_values(req->params),
stmt_params_lengths(req->params),
stmt_params_formats(req->params),
req->res_format))
{
/*
* null is fine to pass down as the res, the connection error message
* will get through
*/
remote_connection_elog(req->conn, elevel);
return NULL;
}
ret = PQsendQueryParams(remote_connection_get_pg_conn(req->conn),
req->sql,
stmt_params_total_values(req->params),
/* param types - see note above */ NULL,
stmt_params_values(req->params),
stmt_params_lengths(req->params),
stmt_params_formats(req->params),
req->res_format);
}

if (ret == 0 || !remote_connection_flush(req->conn, NULL))
{
remote_connection_elog(req->conn, elevel);
return NULL;
}

async_request_set_state(req, EXECUTING);
remote_connection_set_status(req->conn, CONN_PROCESSING);
return req;
Expand Down Expand Up @@ -684,7 +677,6 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
ListCell *lc;
int rc;
WaitEvent event;
uint32 wait_event_info = PG_WAIT_EXTENSION;
AsyncRequest *wait_req;
AsyncResponse *result;
long timeout_ms = -1L;
Expand Down Expand Up @@ -723,16 +715,14 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
while (true)
{
wait_req = NULL;
rc = WaitEventSetWait(we_set, timeout_ms, &event, 1, wait_event_info);
rc = WaitEventSetWait(we_set, timeout_ms, &event, 1, PG_WAIT_EXTENSION);

if (rc == 0)
{
result = async_response_timeout_create();
break;
}

CHECK_FOR_INTERRUPTS();

if (event.events & ~(WL_SOCKET_READABLE | WL_LATCH_SET))
{
/*
Expand All @@ -748,6 +738,7 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
if (event.events & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}

if (event.events & WL_SOCKET_READABLE)
Expand Down

0 comments on commit 052f0d4

Please sign in to comment.