Skip to content

Commit

Permalink
Add support for more progress reporting in COPY
Browse files Browse the repository at this point in the history
The command (TO or FROM), its type (file, pipe, program or callback),
and the number of tuples excluded by a WHERE clause in COPY FROM are
added to the progress reporting already available.

The column "lines_processed" is renamed to "tuples_processed" to
disambiguate the meaning of this column in the cases of CSV and BINARY
COPY and to be more consistent with the other catalog progress views.

Bump catalog version, again.

Author: Matthias van de Meent
Reviewed-by: Michael Paquier, Justin Pryzby, Bharath Rupireddy, Josef
Šimánek, Tomas Vondra
Discussion: https://postgr.es/m/CAEze2WiOcgdH4aQA8NtZq-4dgvnJzp8PohdeKchPkhMY-jWZXA@mail.gmail.com
  • Loading branch information
michaelpq committed Mar 9, 2021
1 parent f9264d1 commit 9d2d457
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 18 deletions.
45 changes: 40 additions & 5 deletions doc/src/sgml/monitoring.sgml
Expand Up @@ -6531,8 +6531,33 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
<structfield>relid</structfield> <type>oid</type>
</para>
<para>
OID of the table on which the <command>COPY</command> command is executed.
It is set to 0 if copying from a <command>SELECT</command> query.
OID of the table on which the <command>COPY</command> command is
executed. It is set to <literal>0</literal> if copying from a
<command>SELECT</command> query.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>command</structfield> <type>text</type>
</para>
<para>
The command that is running: <literal>COPY FROM</literal>, or
<literal>COPY TO</literal>.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>type</structfield> <type>text</type>
</para>
<para>
The io type that the data is read from or written to:
<literal>FILE</literal>, <literal>PROGRAM</literal>,
<literal>PIPE</literal> (for <command>COPY FROM STDIN</command> and
<command>COPY TO STDOUT</command>), or <literal>CALLBACK</literal>
(used for example during the initial table synchronization in
logical replication).
</para></entry>
</row>

Expand All @@ -6551,16 +6576,26 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
</para>
<para>
Size of source file for <command>COPY FROM</command> command in bytes.
It is set to 0 if not available.
It is set to <literal>0</literal> if not available.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>tuples_processed</structfield> <type>bigint</type>
</para>
<para>
Number of tuples already processed by <command>COPY</command> command.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>lines_processed</structfield> <type>bigint</type>
<structfield>tuples_excluded</structfield> <type>bigint</type>
</para>
<para>
Number of lines already processed by <command>COPY</command> command.
Number of tuples not processed because they were excluded by the
<command>WHERE</command> clause of the <command>COPY</command> command.
</para></entry>
</row>
</tbody>
Expand Down
11 changes: 10 additions & 1 deletion src/backend/catalog/system_views.sql
Expand Up @@ -1129,9 +1129,18 @@ CREATE VIEW pg_stat_progress_copy AS
SELECT
S.pid AS pid, S.datid AS datid, D.datname AS datname,
S.relid AS relid,
CASE S.param5 WHEN 1 THEN 'COPY FROM'
WHEN 2 THEN 'COPY TO'
END AS command,
CASE S.param6 WHEN 1 THEN 'FILE'
WHEN 2 THEN 'PROGRAM'
WHEN 3 THEN 'PIPE'
WHEN 4 THEN 'CALLBACK'
END AS "type",
S.param1 AS bytes_processed,
S.param2 AS bytes_total,
S.param3 AS lines_processed
S.param3 AS tuples_processed,
S.param4 AS tuples_excluded
FROM pg_stat_get_progress_info('COPY') AS S
LEFT JOIN pg_database D ON S.datid = D.oid;

Expand Down
34 changes: 30 additions & 4 deletions src/backend/commands/copyfrom.c
Expand Up @@ -539,7 +539,8 @@ CopyFrom(CopyFromState cstate)
BulkInsertState bistate = NULL;
CopyInsertMethod insertMethod;
CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
uint64 processed = 0;
int64 processed = 0;
int64 excluded = 0;
bool has_before_insert_row_trig;
bool has_instead_insert_row_trig;
bool leafpart_use_multi_insert = false;
Expand Down Expand Up @@ -869,7 +870,15 @@ CopyFrom(CopyFromState cstate)
econtext->ecxt_scantuple = myslot;
/* Skip items that don't match COPY's WHERE clause */
if (!ExecQual(cstate->qualexpr, econtext))
{
/*
* Report that this tuple was filtered out by the WHERE
* clause.
*/
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
++excluded);
continue;
}
}

/* Determine the partition to insert the tuple into */
Expand Down Expand Up @@ -1104,10 +1113,11 @@ CopyFrom(CopyFromState cstate)
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command. Update
* for counting tuples inserted by an INSERT command. Update
* progress of the COPY command as well.
*/
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
++processed);
}
}

Expand Down Expand Up @@ -1193,6 +1203,16 @@ BeginCopyFrom(ParseState *pstate,
ExprState **defexprs;
MemoryContext oldcontext;
bool volatile_defexprs;
const int progress_cols[] = {
PROGRESS_COPY_COMMAND,
PROGRESS_COPY_TYPE,
PROGRESS_COPY_BYTES_TOTAL
};
int64 progress_vals[] = {
PROGRESS_COPY_COMMAND_FROM,
0,
0
};

/* Allocate workspace and zero all fields */
cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
Expand Down Expand Up @@ -1430,11 +1450,13 @@ BeginCopyFrom(ParseState *pstate,

if (data_source_cb)
{
progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
cstate->copy_src = COPY_CALLBACK;
cstate->data_source_cb = data_source_cb;
}
else if (pipe)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
Expand All @@ -1447,6 +1469,7 @@ BeginCopyFrom(ParseState *pstate,

if (cstate->is_program)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
ereport(ERROR,
Expand All @@ -1458,6 +1481,7 @@ BeginCopyFrom(ParseState *pstate,
{
struct stat st;

progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
{
Expand All @@ -1484,10 +1508,12 @@ BeginCopyFrom(ParseState *pstate,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));

pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
progress_vals[2] = st.st_size;
}
}

pgstat_progress_update_multi_param(3, progress_cols, progress_vals);

if (cstate->opts.binary)
{
/* Read and verify binary header */
Expand Down
28 changes: 24 additions & 4 deletions src/backend/commands/copyto.c
Expand Up @@ -353,6 +353,14 @@ BeginCopyTo(ParseState *pstate,
TupleDesc tupDesc;
int num_phys_attrs;
MemoryContext oldcontext;
const int progress_cols[] = {
PROGRESS_COPY_COMMAND,
PROGRESS_COPY_TYPE
};
int64 progress_vals[] = {
PROGRESS_COPY_COMMAND_TO,
0
};

if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
{
Expand Down Expand Up @@ -659,6 +667,8 @@ BeginCopyTo(ParseState *pstate,

if (pipe)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;

Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput != DestRemote)
cstate->copy_file = stdout;
Expand All @@ -670,6 +680,7 @@ BeginCopyTo(ParseState *pstate,

if (is_program)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
if (cstate->copy_file == NULL)
ereport(ERROR,
Expand All @@ -682,6 +693,8 @@ BeginCopyTo(ParseState *pstate,
mode_t oumask; /* Pre-existing umask value */
struct stat st;

progress_vals[1] = PROGRESS_COPY_TYPE_FILE;

/*
* Prevent write to relative path ... too easy to shoot oneself in
* the foot by overwriting a database file ...
Expand Down Expand Up @@ -731,6 +744,8 @@ BeginCopyTo(ParseState *pstate,
/* initialize progress */
pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
pgstat_progress_update_multi_param(2, progress_cols, progress_vals);

cstate->bytes_processed = 0;

MemoryContextSwitchTo(oldcontext);
Expand Down Expand Up @@ -881,8 +896,12 @@ DoCopyTo(CopyToState cstate)
/* Format and send the data */
CopyOneRowTo(cstate, slot);

/* Increment amount of processed tuples and update the progress */
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
/*
* Increment the number of processed tuples, and report the
* progress.
*/
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
++processed);
}

ExecDropSingleTupleTableSlot(slot);
Expand Down Expand Up @@ -1251,8 +1270,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
/* Send the data */
CopyOneRowTo(cstate, slot);

/* Increment amount of processed tuples and update the progress */
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
/* Increment the number of processed tuples, and report the progress */
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
++myState->processed);

return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/catalog/catversion.h
Expand Up @@ -53,6 +53,6 @@
*/

/* yyyymmddN */
#define CATALOG_VERSION_NO 202103091
#define CATALOG_VERSION_NO 202103092

#endif
17 changes: 15 additions & 2 deletions src/include/commands/progress.h
Expand Up @@ -133,9 +133,22 @@
#define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4
#define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5

/* Commands of PROGRESS_COPY */
/* Progress parameters for PROGRESS_COPY */
#define PROGRESS_COPY_BYTES_PROCESSED 0
#define PROGRESS_COPY_BYTES_TOTAL 1
#define PROGRESS_COPY_LINES_PROCESSED 2
#define PROGRESS_COPY_TUPLES_PROCESSED 2
#define PROGRESS_COPY_TUPLES_EXCLUDED 3
#define PROGRESS_COPY_COMMAND 4
#define PROGRESS_COPY_TYPE 5

/* Commands of COPY (as advertised via PROGRESS_COPY_COMMAND) */
#define PROGRESS_COPY_COMMAND_FROM 1
#define PROGRESS_COPY_COMMAND_TO 2

/* Types of COPY commands (as advertised via PROGRESS_COPY_TYPE) */
#define PROGRESS_COPY_TYPE_FILE 1
#define PROGRESS_COPY_TYPE_PROGRAM 2
#define PROGRESS_COPY_TYPE_PIPE 3
#define PROGRESS_COPY_TYPE_CALLBACK 4

#endif
15 changes: 14 additions & 1 deletion src/test/regress/expected/rules.out
Expand Up @@ -1950,9 +1950,22 @@ pg_stat_progress_copy| SELECT s.pid,
s.datid,
d.datname,
s.relid,
CASE s.param5
WHEN 1 THEN 'COPY FROM'::text
WHEN 2 THEN 'COPY TO'::text
ELSE NULL::text
END AS command,
CASE s.param6
WHEN 1 THEN 'FILE'::text
WHEN 2 THEN 'PROGRAM'::text
WHEN 3 THEN 'PIPE'::text
WHEN 4 THEN 'CALLBACK'::text
ELSE NULL::text
END AS type,
s.param1 AS bytes_processed,
s.param2 AS bytes_total,
s.param3 AS lines_processed
s.param3 AS tuples_processed,
s.param4 AS tuples_excluded
FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_create_index| SELECT s.pid,
Expand Down

0 comments on commit 9d2d457

Please sign in to comment.