Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 154 additions & 19 deletions src/bin/pg_amcheck/pg_amcheck.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ typedef struct DatabaseInfo
{
char *datname;
char *amcheck_schema; /* escaped, quoted literal */
char *orioledb_schema; /* escaped, quoted; NULL if not installed */
} DatabaseInfo;

typedef struct RelationInfo
{
const DatabaseInfo *datinfo; /* shared by other relinfos */
Oid reloid;
bool is_heap; /* true if heap, false if btree */
bool is_heap; /* heap table */
bool is_orioledb; /* orioledb table */
char *nspname;
char *relname;
int relpages;
Expand All @@ -171,13 +173,25 @@ static const char *amcheck_sql =
"\nJOIN pg_catalog.pg_namespace n ON x.extnamespace = n.oid"
"\nWHERE x.extname = 'amcheck'";

/*
* Lookup the schema where the orioledb extension lives, if installed.
*/
static const char *const orioledb_sql =
"SELECT n.nspname FROM pg_catalog.pg_extension x"
"\nJOIN pg_catalog.pg_namespace n ON x.extnamespace = n.oid"
"\nWHERE x.extname = 'orioledb'";
Comment on lines +180 to +182
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
"SELECT n.nspname FROM pg_catalog.pg_extension x"
"\nJOIN pg_catalog.pg_namespace n ON x.extnamespace = n.oid"
"\nWHERE x.extname = 'orioledb'";
"SELECT n.nspname FROM pg_catalog.pg_extension as x"
"\nJOIN pg_catalog.pg_namespace n ON x.extnamespace OPERATOR(pg_catalog.=) n.oid"
"\nWHERE x.extname OPERATOR(pg_catalog.=) 'orioledb'";

The rest of the code isn't paranoid about qualifying the operators, so it's ok if we aren't either. Just wanted to bring this to attention.


static void prepare_heap_command(PQExpBuffer sql, RelationInfo *rel,
PGconn *conn);
static void prepare_orioledb_command(PQExpBuffer sql, RelationInfo *rel,
PGconn *conn);
static void prepare_btree_command(PQExpBuffer sql, RelationInfo *rel,
PGconn *conn);
static void run_command(ParallelSlot *slot, const char *sql);
static bool verify_heap_slot_handler(PGresult *res, PGconn *conn,
void *context);
static bool verify_orioledb_slot_handler(PGresult *res, PGconn *conn,
void *context);
static bool verify_btree_slot_handler(PGresult *res, PGconn *conn, void *context);
static void help(const char *progname);
static void progress_report(uint64 relations_total, uint64 relations_checked,
Expand Down Expand Up @@ -591,6 +605,21 @@ main(int argc, char *argv[])
strlen(amcheck_schema));
PQclear(result);

/*
* Look up the schema where the orioledb extension is installed (if
* any). Required to schema-qualify orioledb_verify_tableam() for any
* orioledb tables we might encounter.
*/
result = executeQuery(conn, orioledb_sql, opts.echo);
if (PQresultStatus(result) == PGRES_TUPLES_OK && PQntuples(result) > 0)
{
const char *schema = PQgetvalue(result, 0, 0);

dat->orioledb_schema = PQescapeIdentifier(conn, schema,
strlen(schema));
}
PQclear(result);

compile_relation_list_one_db(conn, &relations, dat, &pagestotal);
}

Expand Down Expand Up @@ -736,6 +765,27 @@ main(int argc, char *argv[])
ParallelSlotSetHandler(free_slot, verify_heap_slot_handler, rel);
run_command(free_slot, rel->sql);
}
else if (rel->is_orioledb)
{
if (!rel->datinfo->orioledb_schema)
{
pg_log_warning("skipping orioledb relation \"%s.%s.%s\": orioledb extension not found",
rel->datinfo->datname, rel->nspname, rel->relname);
continue;
}
if (opts.verbose)
{
if (opts.show_progress && progress_since_last_stderr)
fprintf(stderr, "\n");
pg_log_info("checking orioledb table \"%s.%s.%s\"",
rel->datinfo->datname, rel->nspname, rel->relname);
progress_since_last_stderr = false;
}
prepare_orioledb_command(&sql, rel, free_slot->connection);
rel->sql = pstrdup(sql.data); /* pg_free'd after command */
ParallelSlotSetHandler(free_slot, verify_orioledb_slot_handler, rel);
run_command(free_slot, rel->sql);
}
else
{
if (opts.verbose)
Expand Down Expand Up @@ -821,6 +871,27 @@ prepare_heap_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn)
rel->reloid);
}

/*
* prepare_orioledb_command
*
* Build a SQL command that invokes verify_orioledb() on the given orioledb
* relation. Returns (index_name, msg) per failed index or an empty result
* if the relation passed all checks.
*
* The --check-toast pg_amcheck flag is reused to request the deeper
* (force_map_check) variant.
*/
static void
prepare_orioledb_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn)
{
resetPQExpBuffer(sql);
appendPQExpBuffer(sql,
"SELECT index_name, msg FROM %s.verify_orioledb(%u, %s)",
rel->datinfo->orioledb_schema,
rel->reloid,
opts.reconcile_toast ? "true" : "false");
}

/*
* prepare_btree_command
*
Expand Down Expand Up @@ -1056,6 +1127,56 @@ verify_heap_slot_handler(PGresult *res, PGconn *conn, void *context)
return should_processing_continue(res);
}

/*
* verify_orioledb_slot_handler
*
* Handles results from verify_orioledb(). Each row is (index_name, msg)
* describing a failed index; non-empty result means failure.
*/
static bool
verify_orioledb_slot_handler(PGresult *res, PGconn *conn, void *context)
{
RelationInfo *rel = (RelationInfo *) context;

if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
int ntups = PQntuples(res);
int i;

if (ntups > 0)
all_checks_pass = false;

for (i = 0; i < ntups; i++)
{
const char *index_name = PQgetvalue(res, i, 0);
const char *msg = PQgetvalue(res, i, 1);

printf(_("orioledb table \"%s.%s.%s\", index \"%s\":\n"),
rel->datinfo->datname, rel->nspname, rel->relname,
index_name);
printf(" %s\n", msg);
}
}
else
{
char *msg = indent_lines(PQerrorMessage(conn));

all_checks_pass = false;
printf(_("orioledb table \"%s.%s.%s\":\n"),
rel->datinfo->datname, rel->nspname, rel->relname);
printf("%s", msg);
if (opts.verbose)
printf(_("query was: %s\n"), rel->sql);
FREE_AND_SET_NULL(msg);
}

FREE_AND_SET_NULL(rel->sql);
FREE_AND_SET_NULL(rel->nspname);
FREE_AND_SET_NULL(rel->relname);

return should_processing_continue(res);
}

/*
* verify_btree_slot_handler
*
Expand Down Expand Up @@ -1868,7 +1989,7 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations,

/* Append the relation CTE. */
appendPQExpBufferStr(&sql,
" relation (pattern_id, oid, nspname, relname, reltoastrelid, relpages, is_heap, is_btree) AS ("
" relation (pattern_id, oid, nspname, relname, reltoastrelid, relpages, is_heap, is_btree, is_orioledb) AS ("
"\nSELECT DISTINCT ON (c.oid");
if (!opts.allrel)
appendPQExpBufferStr(&sql, ", ip.pattern_id) ip.pattern_id,");
Expand All @@ -1877,25 +1998,28 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations,
appendPQExpBuffer(&sql,
"\nc.oid, n.nspname, c.relname, c.reltoastrelid, c.relpages, "
"c.relam = %u AS is_heap, "
"c.relam = %u AS is_btree"
"c.relam = %u AS is_btree, "
"am.amname = 'orioledb' AS is_orioledb"
"\nFROM pg_catalog.pg_class c "
"INNER JOIN pg_catalog.pg_namespace n "
"ON c.relnamespace = n.oid",
"ON c.relnamespace = n.oid "
"LEFT JOIN pg_catalog.pg_am am "
"ON am.oid = c.relam",
HEAP_TABLE_AM_OID, BTREE_AM_OID);
if (!opts.allrel)
appendPQExpBuffer(&sql,
"\nINNER JOIN include_pat ip"
"\nON (n.nspname ~ ip.nsp_regex OR ip.nsp_regex IS NULL)"
"\nAND (c.relname ~ ip.rel_regex OR ip.rel_regex IS NULL)"
"\nAND (c.relam = %u OR NOT ip.heap_only)"
"\nAND (c.relam = %u OR am.amname = 'orioledb' OR NOT ip.heap_only)"
"\nAND (c.relam = %u OR NOT ip.btree_only)",
HEAP_TABLE_AM_OID, BTREE_AM_OID);
if (opts.excludetbl || opts.excludeidx || opts.excludensp)
appendPQExpBuffer(&sql,
"\nLEFT OUTER JOIN exclude_pat ep"
"\nON (n.nspname ~ ep.nsp_regex OR ep.nsp_regex IS NULL)"
"\nAND (c.relname ~ ep.rel_regex OR ep.rel_regex IS NULL)"
"\nAND (c.relam = %u OR NOT ep.heap_only OR ep.rel_regex IS NULL)"
"\nAND (c.relam = %u OR am.amname = 'orioledb' OR NOT ep.heap_only OR ep.rel_regex IS NULL)"
"\nAND (c.relam = %u OR NOT ep.btree_only OR ep.rel_regex IS NULL)",
HEAP_TABLE_AM_OID, BTREE_AM_OID);

Expand Down Expand Up @@ -1925,15 +2049,15 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations,
*/
if (opts.allrel)
appendPQExpBuffer(&sql,
" AND c.relam = %u "
" AND (c.relam = %u OR am.amname = 'orioledb') "
"AND c.relkind IN ('r', 'S', 'm', 't') "
"AND c.relnamespace != %u",
HEAP_TABLE_AM_OID, PG_TOAST_NAMESPACE);
else
appendPQExpBuffer(&sql,
" AND c.relam IN (%u, %u)"
" AND (c.relam IN (%u, %u) OR am.amname = 'orioledb') "
"AND c.relkind IN ('r', 'S', 'm', 't', 'i') "
"AND ((c.relam = %u AND c.relkind IN ('r', 'S', 'm', 't')) OR "
"AND (((c.relam = %u OR am.amname = 'orioledb') AND c.relkind IN ('r', 'S', 'm', 't')) OR "
"(c.relam = %u AND c.relkind = 'i'))",
HEAP_TABLE_AM_OID, BTREE_AM_OID,
HEAP_TABLE_AM_OID, BTREE_AM_OID);
Expand Down Expand Up @@ -1972,15 +2096,18 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations,
* selected above, filtering by exclusion patterns (if any) that match
* btree index names.
*/
appendPQExpBufferStr(&sql,
appendPQExpBuffer(&sql,
", index (oid, nspname, relname, relpages) AS ("
"\nSELECT c.oid, r.nspname, c.relname, c.relpages "
"FROM relation r"
"\nINNER JOIN pg_catalog.pg_class pc "
"ON pc.oid = r.oid AND pc.relam = %u"
"\nINNER JOIN pg_catalog.pg_index i "
"ON r.oid = i.indrelid "
"INNER JOIN pg_catalog.pg_class c "
"ON i.indexrelid = c.oid "
"AND c.relpersistence != 't'");
"AND c.relpersistence != 't'",
HEAP_TABLE_AM_OID);
if (opts.excludeidx || opts.excludensp)
appendPQExpBufferStr(&sql,
"\nINNER JOIN pg_catalog.pg_namespace n "
Expand Down Expand Up @@ -2045,42 +2172,46 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations,
* list.
*/
appendPQExpBufferStr(&sql,
"\nSELECT pattern_id, is_heap, is_btree, oid, nspname, relname, relpages "
"\nSELECT pattern_id, is_heap, is_btree, oid, nspname, relname, relpages, is_orioledb "
"FROM (");
appendPQExpBufferStr(&sql,
/* Inclusion patterns that failed to match */
"\nSELECT pattern_id, is_heap, is_btree, "
"NULL::OID AS oid, "
"NULL::TEXT AS nspname, "
"NULL::TEXT AS relname, "
"NULL::INTEGER AS relpages"
"NULL::INTEGER AS relpages, "
"is_orioledb"
"\nFROM relation "
"WHERE pattern_id IS NOT NULL "
"UNION"
/* Primary relations */
"\nSELECT NULL::INTEGER AS pattern_id, "
"is_heap, is_btree, oid, nspname, relname, relpages "
"is_heap, is_btree, oid, nspname, relname, relpages, is_orioledb "
"FROM relation");
if (!opts.no_toast_expansion)
appendPQExpBufferStr(&sql,
" UNION"
/* Toast tables for primary relations */
"\nSELECT NULL::INTEGER AS pattern_id, TRUE AS is_heap, "
"FALSE AS is_btree, oid, nspname, relname, relpages "
"FALSE AS is_btree, oid, nspname, relname, relpages, "
"FALSE AS is_orioledb "
"FROM toast");
if (!opts.no_btree_expansion)
appendPQExpBufferStr(&sql,
" UNION"
/* Indexes for primary relations */
"\nSELECT NULL::INTEGER AS pattern_id, FALSE AS is_heap, "
"TRUE AS is_btree, oid, nspname, relname, relpages "
"TRUE AS is_btree, oid, nspname, relname, relpages, "
"FALSE AS is_orioledb "
"FROM index");
if (!opts.no_toast_expansion && !opts.no_btree_expansion)
appendPQExpBufferStr(&sql,
" UNION"
/* Indexes for toast relations */
"\nSELECT NULL::INTEGER AS pattern_id, FALSE AS is_heap, "
"TRUE AS is_btree, oid, nspname, relname, relpages "
"TRUE AS is_btree, oid, nspname, relname, relpages, "
"FALSE AS is_orioledb "
"FROM toast_index");
appendPQExpBufferStr(&sql,
"\n) AS combined_records "
Expand All @@ -2102,6 +2233,7 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations,
int pattern_id = -1;
bool is_heap = false;
bool is_btree PG_USED_FOR_ASSERTS_ONLY = false;
bool is_orioledb = false;
Oid oid = InvalidOid;
const char *nspname = NULL;
const char *relname = NULL;
Expand All @@ -2121,6 +2253,8 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations,
relname = PQgetvalue(res, i, 5);
if (!PQgetisnull(res, i, 6))
relpages = atoi(PQgetvalue(res, i, 6));
if (!PQgetisnull(res, i, 7))
is_orioledb = (PQgetvalue(res, i, 7)[0] == 't');

if (pattern_id >= 0)
{
Expand All @@ -2138,15 +2272,16 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations,
else
{
/* Current record pertains to a relation */

bool is_table PG_USED_FOR_ASSERTS_ONLY = is_heap || is_orioledb;
RelationInfo *rel = (RelationInfo *) pg_malloc0(sizeof(RelationInfo));

Assert(OidIsValid(oid));
Assert((is_heap && !is_btree) || (is_btree && !is_heap));
Assert((is_table && !is_btree) || (is_btree && !is_table));

rel->datinfo = dat;
rel->reloid = oid;
rel->is_heap = is_heap;
rel->is_orioledb = is_orioledb;
rel->nspname = pstrdup(nspname);
rel->relname = pstrdup(relname);
rel->relpages = relpages;
Expand Down