Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions doc/src/sgml/monitoring.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,12 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
</entry>
</row>

<row>
<entry><structname>pg_stat_progress_copy</structname><indexterm><primary>pg_stat_progress_copy</primary></indexterm></entry>
<entry>One row for each backend running <command>COPY</command>, showing current progress.
See <xref linkend='copy-progress-reporting'/>.
</entry>
</row>
</tbody>
</tgroup>
</table>
Expand Down Expand Up @@ -5247,6 +5253,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
which support progress reporting are <command>ANALYZE</command>,
<command>CLUSTER</command>,
<command>CREATE INDEX</command>, <command>VACUUM</command>,
<command>COPY</command>,
and <xref linkend="protocol-replication-base-backup"/> (i.e., replication
command that <xref linkend="app-pgbasebackup"/> issues to take
a base backup).
Expand Down Expand Up @@ -6396,6 +6403,100 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
</table>

</sect2>

<sect2 id="copy-progress-reporting">
<title>COPY Progress Reporting</title>

<para>
Whenever <command>COPY</command> is running, the
<structname>pg_stat_copy_progress</structname> view will contain one row
for each backend that is currently running <command>COPY</command> command.
The table bellow describes the information that will be reported and provide
information how to interpret it.
</para>

<table id="pg-stat-progress-copy-view" xreflabel="pg_stat_progress_copy">
<title><structname>pg_stat_progress_copy</structname> View</title>
<tgroup cols="1">
<thead>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
Column Type
</para>
<para>
Description
</para></entry>
</row>
</thead>

<tbody>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>pid</structfield> <type>integer</type>
</para>
<para>
Process ID of backend.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>datid</structfield> <type>text</type>
</para>
<para>
OID of the database to which this backend is connected.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>datname</structfield> <type>name</type>
</para>
<para>
Name of the database to which this backend is connected.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>relid</structfield> <type>oid</type>
</para>
<para>
OID of the table on which the <command>COPY</command> command is executed. It is set to 0 if <structfield>SELECT query</structfield> is provided.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>bytes_processed</structfield> <type>bigint</type>
</para>
<para>
Number of bytes already processed by <command>COPY</command> command.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>bytes_total</structfield> <type>bigint</type>
</para>
<para>
Size of source file for <command>COPY FROM</command> command in bytes. It is set to 0 if not available.
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>lines_processed</structfield> <type>bigint</type>
</para>
<para>
Number of lines already processed by <command>COPY</command> command.
</para></entry>
</row>
</tbody>
</tgroup>
</table>
</sect2>

</sect1>

<sect1 id="dynamic-trace">
Expand Down
11 changes: 11 additions & 0 deletions src/backend/catalog/system_views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,17 @@ CREATE VIEW pg_stat_progress_basebackup AS
S.param5 AS tablespaces_streamed
FROM pg_stat_get_progress_info('BASEBACKUP') AS S;


CREATE VIEW pg_stat_progress_copy AS
SELECT
S.pid AS pid, S.datid AS datid, D.datname AS datname,
S.relid AS relid,
S.param1 AS bytes_processed,
S.param2 AS bytes_total,
S.param3 AS lines_processed
FROM pg_stat_get_progress_info('COPY') AS S
LEFT JOIN pg_database D ON S.datid = D.oid;

CREATE VIEW pg_user_mappings AS
SELECT
U.oid AS umid,
Expand Down
16 changes: 14 additions & 2 deletions src/backend/commands/copyfrom.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "access/xlog.h"
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
#include "commands/progress.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
Expand All @@ -35,6 +36,7 @@
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
Expand Down Expand Up @@ -1100,9 +1102,10 @@ CopyFrom(CopyFromState cstate)
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command.
* for counting tuples inserted by an INSERT command. Update
* progress as well
*/
processed++;
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
}
}

Expand Down Expand Up @@ -1415,6 +1418,11 @@ BeginCopyFrom(ParseState *pstate,
}
}


/* initialize progress */
pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
cstate->bytes_processed = 0;

/* We keep those variables in cstate. */
cstate->in_functions = in_functions;
cstate->typioparams = typioparams;
Expand Down Expand Up @@ -1479,6 +1487,8 @@ BeginCopyFrom(ParseState *pstate,
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));

pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
}
}

Expand Down Expand Up @@ -1522,6 +1532,8 @@ EndCopyFrom(CopyFromState cstate)
cstate->filename)));
}

pgstat_progress_end_command();

MemoryContextDelete(cstate->copycontext);
pfree(cstate);
}
Expand Down
4 changes: 4 additions & 0 deletions src/backend/commands/copyfromparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
#include "commands/progress.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bswap.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Expand Down Expand Up @@ -384,6 +386,8 @@ CopyLoadRawBuf(CopyFromState cstate)
cstate->raw_buf[nbytes] = '\0';
cstate->raw_buf_index = 0;
cstate->raw_buf_len = nbytes;
cstate->bytes_processed += nbytes;
pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
return (inbytes > 0);
}

Expand Down
21 changes: 19 additions & 2 deletions src/backend/commands/copyto.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "access/xact.h"
#include "access/xlog.h"
#include "commands/copy.h"
#include "commands/progress.h"
#include "executor/execdesc.h"
#include "executor/executor.h"
#include "executor/tuptable.h"
Expand All @@ -32,6 +33,7 @@
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
Expand Down Expand Up @@ -95,6 +97,7 @@ typedef struct CopyToStateData

FmgrInfo *out_functions; /* lookup info for output functions */
MemoryContext rowcontext; /* per-row evaluation context */
uint64 bytes_processed;/* total # of bytes processed, used for progress reporting */

} CopyToStateData;

Expand Down Expand Up @@ -288,6 +291,10 @@ CopySendEndOfRow(CopyToState cstate)
break;
}

/* Update the progress */
cstate->bytes_processed += fe_msgbuf->len;
pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);

resetStringInfo(fe_msgbuf);
}

Expand Down Expand Up @@ -363,6 +370,8 @@ EndCopy(CopyToState cstate)
cstate->filename)));
}

pgstat_progress_end_command();

MemoryContextDelete(cstate->copycontext);
pfree(cstate);
}
Expand Down Expand Up @@ -760,6 +769,10 @@ BeginCopyTo(ParseState *pstate,
}
}

/* initialize progress */
pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
cstate->bytes_processed = 0;

MemoryContextSwitchTo(oldcontext);

return cstate;
Expand Down Expand Up @@ -938,7 +951,9 @@ CopyTo(CopyToState cstate)

/* Format and send the data */
CopyOneRowTo(cstate, slot);
processed++;

/* Increment amount of processed tuples and update the progress */
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
}

ExecDropSingleTupleTableSlot(slot);
Expand Down Expand Up @@ -1303,7 +1318,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)

/* Send the data */
CopyOneRowTo(cstate, slot);
myState->processed++;

/* Increment amount of processed tuples and update the progress */
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);

return true;
}
Expand Down
2 changes: 2 additions & 0 deletions src/backend/utils/adt/pgstatfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
cmdtype = PROGRESS_COMMAND_CREATE_INDEX;
else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0)
cmdtype = PROGRESS_COMMAND_BASEBACKUP;
else if (pg_strcasecmp(cmd, "COPY") == 0)
cmdtype = PROGRESS_COMMAND_COPY;
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
Expand Down
1 change: 1 addition & 0 deletions src/include/commands/copyfrom_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ typedef struct CopyFromStateData
char *raw_buf;
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
uint64 bytes_processed;/* number of bytes processed so far */
/* Shorthand for number of unconsumed bytes available in raw_buf */
#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
} CopyFromStateData;
Expand Down
5 changes: 5 additions & 0 deletions src/include/commands/progress.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,9 @@
#define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4
#define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5

/* Commands of PROGRESS_COPY */
#define PROGRESS_COPY_BYTES_PROCESSED 0
#define PROGRESS_COPY_BYTES_TOTAL 1
#define PROGRESS_COPY_LINES_PROCESSED 2

#endif
3 changes: 2 additions & 1 deletion src/include/pgstat.h
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,8 @@ typedef enum ProgressCommandType
PROGRESS_COMMAND_ANALYZE,
PROGRESS_COMMAND_CLUSTER,
PROGRESS_COMMAND_CREATE_INDEX,
PROGRESS_COMMAND_BASEBACKUP
PROGRESS_COMMAND_BASEBACKUP,
PROGRESS_COMMAND_COPY
} ProgressCommandType;

#define PGSTAT_NUM_PROGRESS_PARAM 20
Expand Down
9 changes: 9 additions & 0 deletions src/test/regress/expected/rules.out
Original file line number Diff line number Diff line change
Expand Up @@ -1937,6 +1937,15 @@ pg_stat_progress_cluster| SELECT s.pid,
s.param8 AS index_rebuild_count
FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_copy| SELECT s.pid,
s.datid,
d.datname,
s.relid,
s.param1 AS bytes_processed,
s.param2 AS bytes_total,
s.param3 AS lines_processed
FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_create_index| SELECT s.pid,
s.datid,
d.datname,
Expand Down