diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 508ed218e8392..47b2c87f7f219 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -628,7 +628,8 @@ typedef struct pg_time_usec_t txn_begin; /* used for measuring schedule lag times */ pg_time_usec_t stmt_begin; /* used for measuring statement latencies */ - bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */ + /* whether client prepared each command of each script */ + bool **prepared; /* * For processing failures and repeating transactions with serialization @@ -733,7 +734,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"}; * argv Command arguments, the first of which is the command or SQL * string itself. For SQL commands, after post-processing * argv[0] is the same as 'lines' with variables substituted. - * varprefix SQL commands terminated with \gset or \aset have this set + * prepname The name that this command is prepared under, in prepare mode + * varprefix SQL commands terminated with \gset or \aset have this set * to a non NULL value. If nonempty, it's used to prefix the * variable name that receives the value. * aset do gset on all possible queries of a combined query (\;). @@ -751,6 +753,7 @@ typedef struct Command MetaCommand meta; int argc; char *argv[MAX_ARGS]; + char *prepname; char *varprefix; PgBenchExpr *expr; SimpleStats stats; @@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc) return true; } -#define MAX_PREPARE_NAME 32 -static void -preparedStatementName(char *buffer, int file, int state) -{ - sprintf(buffer, "P%d_%d", file, state); -} - /* * Report the abortion of the client when processing SQL commands. */ @@ -3053,6 +3049,87 @@ chooseScript(TState *thread) return i - 1; } +/* + * Prepare the SQL command from st->use_file at command_num. + */ +static void +prepareCommand(CState *st, int command_num) +{ + Command *command = sql_script[st->use_file].commands[command_num]; + + /* No prepare for non-SQL commands */ + if (command->type != SQL_COMMAND) + return; + + /* + * If not already done, allocate space for 'prepared' flags: one boolean + * for each command of each script. + */ + if (!st->prepared) + { + st->prepared = pg_malloc(sizeof(bool *) * num_scripts); + for (int i = 0; i < num_scripts; i++) + { + ParsedScript *script = &sql_script[i]; + int numcmds; + + for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++) + ; + st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds); + } + } + + if (!st->prepared[st->use_file][command_num]) + { + PGresult *res; + + pg_log_debug("client %d preparing %s", st->id, command->prepname); + res = PQprepare(st->con, command->prepname, + command->argv[0], command->argc - 1, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_log_error("%s", PQerrorMessage(st->con)); + PQclear(res); + st->prepared[st->use_file][command_num] = true; + } +} + +/* + * Prepare all the commands in the script that come after the \startpipeline + * that's at position st->command, and the first \endpipeline we find. + * + * This sets the ->prepared flag for each relevant command as well as the + * \startpipeline itself, but doesn't move the st->command counter. + */ +static void +prepareCommandsInPipeline(CState *st) +{ + int j; + Command **commands = sql_script[st->use_file].commands; + + Assert(commands[st->command]->type == META_COMMAND && + commands[st->command]->meta == META_STARTPIPELINE); + + /* + * We set the 'prepared' flag on the \startpipeline itself to flag that we + * don't need to do this next time without calling prepareCommand(), even + * though we don't actually prepare this command. + */ + if (st->prepared && + st->prepared[st->use_file][st->command]) + return; + + for (j = st->command + 1; commands[j] != NULL; j++) + { + if (commands[j]->type == META_COMMAND && + commands[j]->meta == META_ENDPIPELINE) + break; + + prepareCommand(st, j); + } + + st->prepared[st->use_file][st->command] = true; +} + /* Send a SQL command, using the chosen querymode */ static bool sendCommand(CState *st, Command *command) @@ -3083,49 +3160,13 @@ sendCommand(CState *st, Command *command) } else if (querymode == QUERY_PREPARED) { - char name[MAX_PREPARE_NAME]; const char *params[MAX_ARGS]; - if (!st->prepared[st->use_file]) - { - int j; - Command **commands = sql_script[st->use_file].commands; - - for (j = 0; commands[j] != NULL; j++) - { - PGresult *res; - - if (commands[j]->type != SQL_COMMAND) - continue; - preparedStatementName(name, st->use_file, j); - if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF) - { - res = PQprepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_log_error("%s", PQerrorMessage(st->con)); - PQclear(res); - } - else - { - /* - * In pipeline mode, we use asynchronous functions. If a - * server-side error occurs, it will be processed later - * among the other results. - */ - if (!PQsendPrepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL)) - pg_log_error("%s", PQerrorMessage(st->con)); - } - } - st->prepared[st->use_file] = true; - } - + prepareCommand(st, st->command); getQueryParams(&st->variables, command, params); - preparedStatementName(name, st->use_file, st->command); - pg_log_debug("client %d sending %s", st->id, name); - r = PQsendQueryPrepared(st->con, name, command->argc - 1, + pg_log_debug("client %d sending %s", st->id, command->prepname); + r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1, params, NULL, NULL, 0); } else /* unknown sql mode */ @@ -3597,7 +3638,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) thread->conn_duration += now - start; /* Reset session-local state */ - memset(st->prepared, 0, sizeof(st->prepared)); + pg_free(st->prepared); + st->prepared = NULL; } /* @@ -4360,6 +4402,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) return CSTATE_ABORTED; } + /* + * If we're in prepared-query mode, we need to prepare all the + * commands that are inside the pipeline before we actually start the + * pipeline itself. This solves the problem that running BEGIN + * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a + * snapshot having been acquired by the prepare within the pipeline. + */ + if (querymode == QUERY_PREPARED) + prepareCommandsInPipeline(st); + if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF) { commandFailed(st, "startpipeline", "already in pipeline mode"); @@ -5439,6 +5491,7 @@ create_sql_command(PQExpBuffer buf, const char *source) my_command->varprefix = NULL; /* allocated later, if needed */ my_command->expr = NULL; initSimpleStats(&my_command->stats); + my_command->prepname = NULL; /* set later, if needed */ return my_command; } @@ -5468,6 +5521,7 @@ static void postprocess_sql_command(Command *my_command) { char buffer[128]; + static int prepnum = 0; Assert(my_command->type == SQL_COMMAND); @@ -5476,15 +5530,17 @@ postprocess_sql_command(Command *my_command) buffer[strcspn(buffer, "\n\r")] = '\0'; my_command->first_line = pg_strdup(buffer); - /* parse query if necessary */ + /* Parse query and generate prepared statement name, if necessary */ switch (querymode) { case QUERY_SIMPLE: my_command->argv[0] = my_command->lines.data; my_command->argc++; break; - case QUERY_EXTENDED: case QUERY_PREPARED: + my_command->prepname = psprintf("P_%d", prepnum++); + /* fall through */ + case QUERY_EXTENDED: if (!parseQuery(my_command)) exit(1); break; diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index 4bf508ea96650..99273203f0374 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -839,6 +839,26 @@ } }); +# Working \startpipeline in prepared query mode with serializable +$node->pgbench( + '-c4 -j2 -t 10 -n -M prepared', + 0, + [ + qr{type: .*/001_pgbench_pipeline_serializable}, + qr{actually processed: (\d+)/\1} + ], + [], + 'working \startpipeline with serializable', + { + '001_pgbench_pipeline_serializable' => q{ +-- test startpipeline with serializable +\startpipeline +BEGIN ISOLATION LEVEL SERIALIZABLE; +} . "select 1;\n" x 10 . q{ +END; +\endpipeline +} + }); # trigger many expression errors my @errors = (