Skip to content

Commit

Permalink
Support multiple async requests per connection
Browse files Browse the repository at this point in the history
The idea here is to allow multiple async requests to be
created for the same connection. Since connection can process only
one request at the time only that means that one request can be
running and the rest needs to be deferred. The deferred async request
will run on get response if the connection is not in use by running
async request.
This support should pave the way for async creation of cursors.
  • Loading branch information
niksa authored and erimatnor committed May 27, 2020
1 parent 96727fa commit 86858e3
Show file tree
Hide file tree
Showing 28 changed files with 625 additions and 446 deletions.
8 changes: 6 additions & 2 deletions tsl/src/chunk_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "remote/async.h"
#include "remote/dist_txn.h"
#include "remote/stmt_params.h"
#include "chunk_api.h"

/*
Expand Down Expand Up @@ -394,10 +395,13 @@ chunk_api_create_on_servers(Chunk *chunk, Hypertable *ht)
{
ChunkServer *cs = lfirst(lc);
UserMapping *um = GetUserMapping(GetUserId(), cs->foreign_server_oid);
PGconn *conn = remote_dist_txn_get_connection(um, REMOTE_TXN_NO_PREP_STMT);
TSConnection *conn = remote_dist_txn_get_connection(um, REMOTE_TXN_NO_PREP_STMT);
AsyncRequest *req;

req = async_request_send_with_params(conn, CHUNK_CREATE_STMT, 4, params);
req = async_request_send_with_params(conn,
CHUNK_CREATE_STMT,
stmt_params_create_from_values(params, 4),
FORMAT_TEXT);

async_request_attach_user_data(req, cs);
async_request_set_add(reqset, req);
Expand Down
221 changes: 109 additions & 112 deletions tsl/src/fdw/timescaledb_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ typedef struct TsFdwScanState
List *retrieved_attrs; /* list of retrieved attribute numbers */

/* for remote query execution */
PGconn *conn; /* connection for the scan */
TSConnection *conn; /* connection for the scan */
unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool cursor_exists; /* have we created the cursor? */
int num_params; /* number of parameters passed to query */
Expand All @@ -166,7 +166,6 @@ typedef struct TsFdwScanState
MemoryContext temp_cxt; /* context for per-tuple temporary data */

int fetch_size; /* number of tuples per fetch */

int row_counter;
} TsFdwScanState;

Expand All @@ -175,7 +174,7 @@ typedef struct TsFdwServerState
Oid serverid;
UserMapping *user;
/* for remote query execution */
PGconn *conn; /* connection for the scan */
TSConnection *conn; /* connection for the scan */
PreparedStmt *p_stmt; /* prepared statement handle, if created */
} TsFdwServerState;

Expand Down Expand Up @@ -215,8 +214,8 @@ static FdwRoutine timescaledb_fdw_routine;
* The given "sql" must be an EXPLAIN command.
*/
static void
get_remote_estimate(const char *sql, PGconn *conn, double *rows, int *width, Cost *startup_cost,
Cost *total_cost)
get_remote_estimate(const char *sql, TSConnection *conn, double *rows, int *width,
Cost *startup_cost, Cost *total_cost)
{
AsyncResponseResult *volatile rsp = NULL;

Expand Down Expand Up @@ -301,7 +300,7 @@ estimate_path_cost_size(PlannerInfo *root, RelOptInfo *foreignrel, List *param_j
List *remote_param_join_conds;
List *local_param_join_conds;
StringInfoData sql;
PGconn *conn;
TSConnection *conn;
Selectivity local_sel;
QualCost local_cost;
List *fdw_scan_tlist = NIL;
Expand Down Expand Up @@ -1558,6 +1557,107 @@ prepare_query_params(PlanState *node, List *fdw_exprs, int num_params, FmgrInfo
*param_values = (const char **) palloc0(num_params * sizeof(char *));
}

/*
* Fill an array with query parameter values in text format.
*/
static void
fill_query_params_array(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs,
const char **param_values)
{
int nestlevel;
int i;
ListCell *lc;

nestlevel = set_transmission_modes();

i = 0;
foreach (lc, param_exprs)
{
ExprState *expr_state = (ExprState *) lfirst(lc);
Datum expr_value;
bool is_null;

/* Evaluate the parameter expression */
expr_value = ExecEvalExpr(expr_state, econtext, &is_null);

/*
* Get string representation of each parameter value by invoking
* type-specific output function, unless the value is null.
*/
if (is_null)
param_values[i] = NULL;
else
param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);

i++;
}

reset_transmission_modes(nestlevel);
}

/*
* Create cursor for node's query with current parameter values.
*/
static void
create_cursor(ForeignScanState *node)
{
TsFdwScanState *fsstate = (TsFdwScanState *) node->fdw_state;
ExprContext *econtext = node->ss.ps.ps_ExprContext;
int num_params = fsstate->num_params;
const char **values = fsstate->param_values;
TSConnection *conn = fsstate->conn;
StringInfoData buf;
AsyncRequest *req;

/*
* Construct array of query parameter values in text format. We do the
* conversions in the short-lived per-tuple context, so as not to cause a
* memory leak over repeated scans.
*/
if (num_params > 0)
{
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
fill_query_params_array(econtext, fsstate->param_flinfo, fsstate->param_exprs, values);
MemoryContextSwitchTo(oldcontext);
}

/* Assign a unique ID for my cursor */
fsstate->cursor_number = remote_connection_get_cursor_number();

/* Construct the DECLARE CURSOR command */
initStringInfo(&buf);
appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", fsstate->cursor_number, fsstate->query);

/*
* Notice that we pass NULL for paramTypes, thus forcing the remote server
* to infer types for all parameters. Since we explicitly cast every
* parameter (see deparse.c), the "inference" is trivial and will produce
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
*/
req = async_request_send_with_params(conn,
buf.data,
stmt_params_create_from_values(values, num_params),
FORMAT_TEXT);

Assert(NULL != req);

async_request_wait_ok_command(req);
pfree(req);

/* Mark the cursor as created, and show no tuples have been retrieved */
fsstate->cursor_exists = true;
fsstate->tuples = NULL;
fsstate->num_tuples = 0;
fsstate->next_tuple = 0;
fsstate->fetch_ct_2 = 0;
fsstate->eof_reached = false;

/* Clean up */
pfree(buf.data);
}

static void
begin_foreign_scan(ForeignScanState *node, int eflags)
{
Expand Down Expand Up @@ -1609,10 +1709,6 @@ begin_foreign_scan(ForeignScanState *node, int eflags)
REMOTE_TXN_USE_PREP_STMT :
REMOTE_TXN_NO_PREP_STMT);

/* Assign a unique ID for my cursor */
fsstate->cursor_number = remote_connection_get_cursor_number();
fsstate->cursor_exists = false;

/* Get private info created by planner functions. */
fsstate->query = strVal(list_nth(fsplan->fdw_private, FdwScanPrivateSelectSql));
fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, FdwScanPrivateRetrievedAttrs);
Expand Down Expand Up @@ -1655,103 +1751,8 @@ begin_foreign_scan(ForeignScanState *node, int eflags)
&fsstate->param_flinfo,
&fsstate->param_exprs,
&fsstate->param_values);
}

/*
* Construct array of query parameter values in text format.
*/
static void
process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs,
const char **param_values)
{
int nestlevel;
int i;
ListCell *lc;

nestlevel = set_transmission_modes();

i = 0;
foreach (lc, param_exprs)
{
ExprState *expr_state = (ExprState *) lfirst(lc);
Datum expr_value;
bool is_null;

/* Evaluate the parameter expression */
expr_value = ExecEvalExpr(expr_state, econtext, &is_null);

/*
* Get string representation of each parameter value by invoking
* type-specific output function, unless the value is null.
*/
if (is_null)
param_values[i] = NULL;
else
param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);

i++;
}

reset_transmission_modes(nestlevel);
}

/*
* Create cursor for node's query with current parameter values.
*/
static void
create_cursor(ForeignScanState *node)
{
TsFdwScanState *fsstate = (TsFdwScanState *) node->fdw_state;
ExprContext *econtext = node->ss.ps.ps_ExprContext;
int num_params = fsstate->num_params;
const char **values = fsstate->param_values;
PGconn *conn = fsstate->conn;
StringInfoData buf;
AsyncRequest *req;

/*
* Construct array of query parameter values in text format. We do the
* conversions in the short-lived per-tuple context, so as not to cause a
* memory leak over repeated scans.
*/
if (num_params > 0)
{
MemoryContext oldcontext;

oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

process_query_params(econtext, fsstate->param_flinfo, fsstate->param_exprs, values);

MemoryContextSwitchTo(oldcontext);
}

/* Construct the DECLARE CURSOR command */
initStringInfo(&buf);
appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", fsstate->cursor_number, fsstate->query);

/*
* Notice that we pass NULL for paramTypes, thus forcing the remote server
* to infer types for all parameters. Since we explicitly cast every
* parameter (see deparse.c), the "inference" is trivial and will produce
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
*/
req = async_request_send_with_params(conn, buf.data, num_params, values);

Assert(NULL != req);

async_request_wait_ok_command(req);

/* Mark the cursor as created, and show no tuples have been retrieved */
fsstate->cursor_exists = true;
fsstate->tuples = NULL;
fsstate->num_tuples = 0;
fsstate->next_tuple = 0;
fsstate->fetch_ct_2 = 0;
fsstate->eof_reached = false;

/* Clean up */
pfree(buf.data);
fsstate->cursor_exists = false;
}

/*
Expand All @@ -1776,7 +1777,7 @@ fetch_more_data(ForeignScanState *node)
/* PGresult must be released before leaving this function. */
PG_TRY();
{
PGconn *conn = fsstate->conn;
TSConnection *conn = fsstate->conn;
PGresult *res;
char sql[64];
int numrows;
Expand Down Expand Up @@ -1857,10 +1858,6 @@ iterate_foreign_scan(ForeignScanState *node)
TsFdwScanState *fsstate = (TsFdwScanState *) node->fdw_state;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;

/*
* If this is the first call after Begin or ReScan, we need to create the
* cursor on the remote side.
*/
if (!fsstate->cursor_exists)
create_cursor(node);

Expand Down Expand Up @@ -1942,7 +1939,7 @@ rescan_foreign_scan(ForeignScanState *node)
* Utility routine to close a cursor.
*/
static void
close_cursor(PGconn *conn, unsigned int cursor_number)
close_cursor(TSConnection *conn, unsigned int cursor_number)
{
char sql[64];
AsyncRequest *req;
Expand Down

0 comments on commit 86858e3

Please sign in to comment.