Skip to content

Commit

Permalink
pgbench: Prepare commands in pipelines in advance
Browse files Browse the repository at this point in the history
Failing to do so results in an error when a pgbench script tries to
start a serializable transaction inside a pipeline, because by the time
BEGIN ISOLATION LEVEL SERIALIZABLE is executed, we're already in a
transaction that has acquired a snapshot, so the server rightfully
complains.

We can work around that by preparing all commands in the pipeline before
actually starting the pipeline.  This changes the existing code in two
aspects: first, we now prepare each command individually at the point
where that command is about to be executed; previously, we would prepare
all commands in a script as soon as the first command of that script
would be executed.  It's hard to see how this would make much of a
difference (particularly since it only affects the first execution of
each script in a client), but I didn't actually try to measure it.

Secondly, we no longer use PQsendPrepare() in pipeline mode, but only
PQprepare().  There's no specific reason for this change other than that
we no longer need to behave differently in pipeline mode.  (Previously we
had no choice, because in pipeline mode PQprepare() could not be used.)

Backpatch to 14, where pgbench got support for pipeline mode.

Reported-by: Yugo NAGATA <nagata@sraoss.co.jp>
Discussion: https://postgr.es/m/20210716153013.fc53b1c780b06fccc07a7f0d@sraoss.co.jp
  • Loading branch information
alvherre committed Feb 21, 2023
1 parent 8028e29 commit 038f586
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 51 deletions.
158 changes: 107 additions & 51 deletions src/bin/pgbench/pgbench.c
Expand Up @@ -628,7 +628,8 @@ typedef struct
pg_time_usec_t txn_begin; /* used for measuring schedule lag times */
pg_time_usec_t stmt_begin; /* used for measuring statement latencies */

bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
/* whether client prepared each command of each script */
bool **prepared;

/*
* For processing failures and repeating transactions with serialization
Expand Down Expand Up @@ -733,7 +734,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
* argv Command arguments, the first of which is the command or SQL
* string itself. For SQL commands, after post-processing
* argv[0] is the same as 'lines' with variables substituted.
* varprefix SQL commands terminated with \gset or \aset have this set
* prepname The name that this command is prepared under, in prepare mode
* varprefix SQL commands terminated with \gset or \aset have this set
* to a non NULL value. If nonempty, it's used to prefix the
* variable name that receives the value.
* aset do gset on all possible queries of a combined query (\;).
Expand All @@ -751,6 +753,7 @@ typedef struct Command
MetaCommand meta;
int argc;
char *argv[MAX_ARGS];
char *prepname;
char *varprefix;
PgBenchExpr *expr;
SimpleStats stats;
Expand Down Expand Up @@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc)
return true;
}

/* Size, in bytes, of the buffer handed to preparedStatementName() */
#define MAX_PREPARE_NAME 32

/*
 * Build the name of a prepared statement as "P<file>_<state>".
 *
 * buffer	destination; must have room for MAX_PREPARE_NAME bytes
 * file		script number
 * state	command number within that script
 *
 * The longest possible result (two minimum 32-bit ints) is 24 characters
 * plus the terminating NUL, so it always fits within MAX_PREPARE_NAME.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}

/*
* Report the abortion of the client when processing SQL commands.
*/
Expand Down Expand Up @@ -3053,6 +3049,87 @@ chooseScript(TState *thread)
return i - 1;
}

/*
 * Make sure the command at index command_num of the script currently in use
 * by this client (st->use_file) has been prepared on the client's connection.
 *
 * Only SQL commands are prepared; for anything else this is a no-op.  The
 * per-client table of 'prepared' flags is created on first need, and the
 * flag for this command is set once PQprepare() has been attempted, so the
 * statement is prepared at most once per table lifetime.  A failure of
 * PQprepare() is logged but does not stop the flag from being set.
 */
static void
prepareCommand(CState *st, int command_num)
{
	Command    *cmd = sql_script[st->use_file].commands[command_num];
	PGresult   *result;

	/* Meta commands and the like have nothing to prepare */
	if (cmd->type != SQL_COMMAND)
		return;

	/*
	 * Lazily build the flag table: one zero-initialized array of booleans
	 * per script, with one entry for every command in that script.
	 */
	if (st->prepared == NULL)
	{
		int			s;

		st->prepared = pg_malloc(sizeof(bool *) * num_scripts);
		for (s = 0; s < num_scripts; s++)
		{
			Command   **cmds = sql_script[s].commands;
			int			count = 0;

			/* the command list is NULL-terminated */
			while (cmds[count] != NULL)
				count++;
			st->prepared[s] = pg_malloc0(sizeof(bool) * count);
		}
	}

	/* Nothing more to do if this statement was handled earlier */
	if (st->prepared[st->use_file][command_num])
		return;

	pg_log_debug("client %d preparing %s", st->id, cmd->prepname);
	result = PQprepare(st->con, cmd->prepname,
					   cmd->argv[0], cmd->argc - 1, NULL);
	if (PQresultStatus(result) != PGRES_COMMAND_OK)
		pg_log_error("%s", PQerrorMessage(st->con));
	PQclear(result);

	st->prepared[st->use_file][command_num] = true;
}

/*
 * Prepare all the commands in the script that lie between the \startpipeline
 * at position st->command and the first \endpipeline found after it (or the
 * end of the script, if there is none).
 *
 * Each command is handed to prepareCommand(), which prepares SQL commands
 * and ignores everything else.  When the 'prepared' flags exist, the flag
 * of the \startpipeline itself is set as well, so that later executions can
 * return immediately.  The st->command counter is not moved.
 */
static void
prepareCommandsInPipeline(CState *st)
{
	int			j;
	Command   **commands = sql_script[st->use_file].commands;

	Assert(commands[st->command]->type == META_COMMAND &&
		   commands[st->command]->meta == META_STARTPIPELINE);

	/*
	 * We set the 'prepared' flag on the \startpipeline itself to flag that we
	 * don't need to do this next time without calling prepareCommand(), even
	 * though we don't actually prepare this command.
	 */
	if (st->prepared &&
		st->prepared[st->use_file][st->command])
		return;

	for (j = st->command + 1; commands[j] != NULL; j++)
	{
		if (commands[j]->type == META_COMMAND &&
			commands[j]->meta == META_ENDPIPELINE)
			break;

		prepareCommand(st, j);
	}

	/*
	 * The flags are allocated by prepareCommand(), and only once it meets a
	 * SQL command.  If this client has not prepared anything yet and the
	 * pipeline contains no SQL command (for instance \startpipeline directly
	 * followed by \endpipeline), st->prepared is still NULL here and must not
	 * be dereferenced.  Skipping the flag in that case is harmless: the next
	 * call merely rescans a pipeline in which there is nothing to prepare.
	 */
	if (st->prepared)
		st->prepared[st->use_file][st->command] = true;
}

/* Send a SQL command, using the chosen querymode */
static bool
sendCommand(CState *st, Command *command)
Expand Down Expand Up @@ -3083,49 +3160,13 @@ sendCommand(CState *st, Command *command)
}
else if (querymode == QUERY_PREPARED)
{
char name[MAX_PREPARE_NAME];
const char *params[MAX_ARGS];

if (!st->prepared[st->use_file])
{
int j;
Command **commands = sql_script[st->use_file].commands;

for (j = 0; commands[j] != NULL; j++)
{
PGresult *res;

if (commands[j]->type != SQL_COMMAND)
continue;
preparedStatementName(name, st->use_file, j);
if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
{
res = PQprepare(st->con, name,
commands[j]->argv[0], commands[j]->argc - 1, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_log_error("%s", PQerrorMessage(st->con));
PQclear(res);
}
else
{
/*
* In pipeline mode, we use asynchronous functions. If a
* server-side error occurs, it will be processed later
* among the other results.
*/
if (!PQsendPrepare(st->con, name,
commands[j]->argv[0], commands[j]->argc - 1, NULL))
pg_log_error("%s", PQerrorMessage(st->con));
}
}
st->prepared[st->use_file] = true;
}

prepareCommand(st, st->command);
getQueryParams(&st->variables, command, params);
preparedStatementName(name, st->use_file, st->command);

pg_log_debug("client %d sending %s", st->id, name);
r = PQsendQueryPrepared(st->con, name, command->argc - 1,
pg_log_debug("client %d sending %s", st->id, command->prepname);
r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1,
params, NULL, NULL, 0);
}
else /* unknown sql mode */
Expand Down Expand Up @@ -3597,7 +3638,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
thread->conn_duration += now - start;

/* Reset session-local state */
memset(st->prepared, 0, sizeof(st->prepared));
pg_free(st->prepared);
st->prepared = NULL;
}

/*
Expand Down Expand Up @@ -4360,6 +4402,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
return CSTATE_ABORTED;
}

/*
* If we're in prepared-query mode, we need to prepare all the
* commands that are inside the pipeline before we actually start the
* pipeline itself. This solves the problem that running BEGIN
* ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
* snapshot having been acquired by the prepare within the pipeline.
*/
if (querymode == QUERY_PREPARED)
prepareCommandsInPipeline(st);

if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
{
commandFailed(st, "startpipeline", "already in pipeline mode");
Expand Down Expand Up @@ -5439,6 +5491,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
my_command->varprefix = NULL; /* allocated later, if needed */
my_command->expr = NULL;
initSimpleStats(&my_command->stats);
my_command->prepname = NULL; /* set later, if needed */

return my_command;
}
Expand Down Expand Up @@ -5468,6 +5521,7 @@ static void
postprocess_sql_command(Command *my_command)
{
char buffer[128];
static int prepnum = 0;

Assert(my_command->type == SQL_COMMAND);

Expand All @@ -5476,15 +5530,17 @@ postprocess_sql_command(Command *my_command)
buffer[strcspn(buffer, "\n\r")] = '\0';
my_command->first_line = pg_strdup(buffer);

/* parse query if necessary */
/* Parse query and generate prepared statement name, if necessary */
switch (querymode)
{
case QUERY_SIMPLE:
my_command->argv[0] = my_command->lines.data;
my_command->argc++;
break;
case QUERY_EXTENDED:
case QUERY_PREPARED:
my_command->prepname = psprintf("P_%d", prepnum++);
/* fall through */
case QUERY_EXTENDED:
if (!parseQuery(my_command))
exit(1);
break;
Expand Down
20 changes: 20 additions & 0 deletions src/bin/pgbench/t/001_pgbench_with_server.pl
Expand Up @@ -839,6 +839,26 @@
}
});

# Working \startpipeline in prepared query mode with serializable.
#
# The custom script opens a pipeline, issues BEGIN ISOLATION LEVEL
# SERIALIZABLE, ten "select 1;" statements and END inside it, and then
# closes the pipeline.  pgbench is run with -M prepared, so every SQL
# command in the script goes through the prepared-statement path.
#
# The expected output must name the script and report an "actually
# processed" line whose two numbers are equal (enforced by the \1
# backreference).
$node->pgbench(
'-c4 -j2 -t 10 -n -M prepared',
0,
[
qr{type: .*/001_pgbench_pipeline_serializable},
qr{actually processed: (\d+)/\1}
],
[],
'working \startpipeline with serializable',
{
'001_pgbench_pipeline_serializable' => q{
-- test startpipeline with serializable
\startpipeline
BEGIN ISOLATION LEVEL SERIALIZABLE;
} . "select 1;\n" x 10 . q{
END;
\endpipeline
}
});

# trigger many expression errors
my @errors = (
Expand Down

0 comments on commit 038f586

Please sign in to comment.