Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

add several ALTER FOREIGN TABLE post-fixup routines.

  • Loading branch information...
commit 2620c365cd8b4d5f429b40abb74cd1150235cfe7 1 parent ad75d9e
@kaigai authored
Showing with 478 additions and 33 deletions.
  1. +478 −33 utilcmds.c
View
511 utilcmds.c
@@ -17,12 +17,14 @@
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_am.h"
+#include "catalog/pg_attribute.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_class.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/sequence.h"
+#include "commands/tablecmds.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "nodes/makefuncs.h"
@@ -30,8 +32,11 @@
#include "miscadmin.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
+#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/tqual.h"
#include "pg_strom.h"
/*
@@ -355,7 +360,7 @@ pgstrom_create_shadow_sequence(Oid namespaceId, Relation base_rel)
}
static void
-pgstrom_process_post_create(RangeVar *base_range)
+pgstrom_post_create_foreign_table(CreateForeignTableStmt *stmt)
{
Relation base_rel;
Oid namespaceId;
@@ -382,7 +387,7 @@ pgstrom_process_post_create(RangeVar *base_range)
* Open the base relation; exclusive lock should be already
* acquired, so we use NoLock instead.
*/
- base_rel = heap_openrv(base_range, NoLock);
+ base_rel = heap_openrv(stmt->base.relation, NoLock);
/* switch current credential of database users */
GetUserIdAndSecContext(&save_userid, &save_sec_context);
@@ -405,17 +410,370 @@ pgstrom_process_post_create(RangeVar *base_range)
heap_close(base_rel, NoLock);
}
+/*
+ * pgstrom_shadow_relation_rename
+ *
+ * It is internal utility function to rename shadow table, index and
+ * sequence according to changes on base relation.
+ */
+static void
+pgstrom_shadow_relation_rename(const char *old_nspname,
+ const char *old_relname,
+ const char *old_attname,
+ const char *new_nspname,
+ const char *new_relname,
+ const char *new_attname,
+ char relkind)
+{
+ Oid namespace_id;
+ Oid shadow_relid;
+ char old_name[NAMEDATALEN * 3 + 20];
+ char new_name[NAMEDATALEN * 3 + 20];
+
+ Assert((old_attname != NULL && new_attname != NULL) ||
+ (old_attname == NULL && new_attname == NULL));
+ Assert(old_attname == NULL || relkind != RELKIND_SEQUENCE);
+ Assert(new_attname == NULL || relkind != RELKIND_SEQUENCE);
+
+ namespace_id = get_namespace_oid(PGSTROM_SCHEMA_NAME, false);
+
+ if (old_attname)
+ snprintf(old_name, sizeof(old_name), "%s.%s.%s.%s",
+ old_nspname, old_relname, old_attname,
+ (relkind == RELKIND_RELATION ? "cs" : "idx"));
+ else
+ snprintf(old_name, sizeof(old_name), "%s.%s.%s",
+ old_nspname, old_relname,
+ (relkind == RELKIND_RELATION ? "rowid" :
+ (relkind == RELKIND_INDEX ? "idx" : "seq")));
+
+ shadow_relid = get_relname_relid(old_name, namespace_id);
+ if (!OidIsValid(shadow_relid))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("relation \"%s.%s\" does not exist",
+ PGSTROM_SCHEMA_NAME, old_name)));
+
+ if (new_attname)
+ snprintf(new_name, sizeof(new_name), "%s.%s.%s.%s",
+ new_nspname, new_relname, new_attname,
+ (relkind == RELKIND_RELATION ? "cs" : "idx"));
+ else
+ snprintf(new_name, sizeof(new_name), "%s.%s.%s",
+ new_nspname, new_relname,
+ (relkind == RELKIND_RELATION ? "rowid" :
+ (relkind == RELKIND_INDEX ? "idx" : "seq")));
+ if (strlen(new_name) >= NAMEDATALEN)
+ ereport(ERROR,
+ (errcode(ERRCODE_NAME_TOO_LONG),
+ errmsg("Name of shadow relation \"%s\" too long",
+ new_name)));
+
+ RenameRelationInternal(shadow_relid, new_name);
+}
+
+/*
+ * pgstrom_post_alter_schema
+ *
+ * It is a post-fixup routine to handle ALTER TABLE ... SET SCHEMA
+ * on the foreign table managed by PG-Strom. It replaces the portion
+ * of all the underlying shadow relations.
+ */
+static void
+pgstrom_post_alter_schema(AlterObjectSchemaStmt *stmt,
+ Oid base_nspid, Oid base_relid)
+{
+ char *old_nspname;
+ char *cur_relname;
+ Relation pg_attr;
+ ScanKeyData skey[2];
+ SysScanDesc scan;
+ HeapTuple tuple;
+
+ /* Ensure the changes to base relation being visible */
+ CommandCounterIncrement();
+
+ old_nspname = get_namespace_name(base_nspid);
+ if (!old_nspname)
+ elog(ERROR, "cache lookup failed for schema %u", base_nspid);
+ cur_relname = get_rel_name(base_relid);
+ if (!cur_relname)
+ elog(ERROR, "cache lookup failed for relation %u", base_relid);
+
+ /* Rename rowid table and its index */
+ pgstrom_shadow_relation_rename(old_nspname, cur_relname, NULL,
+ stmt->newschema, cur_relname, NULL,
+ RELKIND_RELATION);
+ pgstrom_shadow_relation_rename(old_nspname, cur_relname, NULL,
+ stmt->newschema, cur_relname, NULL,
+ RELKIND_INDEX);
+
+ /* Rename column-store and its index */
+ pg_attr = heap_open(AttributeRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_attribute_attrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(base_relid));
+ ScanKeyInit(&skey[1],
+ Anum_pg_attribute_attnum,
+ BTGreaterEqualStrategyNumber, F_INT2GE,
+ Int16GetDatum(InvalidAttrNumber));
+
+ scan = systable_beginscan(pg_attr, AttributeRelidNumIndexId, true,
+ SnapshotNow, 2, skey);
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+ {
+ Form_pg_attribute attr = (Form_pg_attribute) GETSTRUCT(tuple);
+
+ if (attr->attisdropped)
+ continue;
+
+ pgstrom_shadow_relation_rename(old_nspname, cur_relname,
+ NameStr(attr->attname),
+ stmt->newschema, cur_relname,
+ NameStr(attr->attname),
+ RELKIND_RELATION);
+ pgstrom_shadow_relation_rename(old_nspname, cur_relname,
+ NameStr(attr->attname),
+ stmt->newschema, cur_relname,
+ NameStr(attr->attname),
+ RELKIND_INDEX);
+ }
+ systable_endscan(scan);
+ heap_close(pg_attr, AccessShareLock);
+
+ /* Rename rowid generator sequence */
+ pgstrom_shadow_relation_rename(old_nspname, cur_relname, NULL,
+ stmt->newschema, cur_relname, NULL,
+ RELKIND_SEQUENCE);
+ /* Make changes visible */
+ CommandCounterIncrement();
+
+ pfree(old_nspname);
+}
+
+/*
+ * pgstrom_post_rename_schema
+ *
+ * It is a post-fixup routine to handle ALTER SCHEMA ... RENAME TO
+ * on the namespace that owning foreign tables managed by PG-Strom.
+ * It replaces the portion of all the underlying shadow relations.
+ */
static void
-pgstrom_process_post_alter_schema(void)
-{}
+pgstrom_post_rename_schema(RenameStmt *stmt, Oid namespaceId)
+{
+ Relation pg_class;
+ ScanKeyData skey[2];
+ HeapScanDesc scan;
+ HeapTuple tuple;
+
+ Assert(stmt->renameType == OBJECT_SCHEMA);
+ Assert(OidIsValid(namespaceId));
+
+ pg_class = heap_open(RelationRelationId, RowExclusiveLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_class_relnamespace,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(namespaceId));
+ ScanKeyInit(&skey[1],
+ Anum_pg_class_relkind,
+ BTEqualStrategyNumber, F_CHAREQ,
+ CharGetDatum(RELKIND_FOREIGN_TABLE));
+
+ scan = heap_beginscan(pg_class, SnapshotNow, 2, skey);
+
+ while (HeapTupleIsValid(tuple = heap_getnext(scan, ForwardScanDirection)))
+ {
+ ForeignTable *ft = GetForeignTable(HeapTupleGetOid(tuple));
+ ForeignServer *fs = GetForeignServer(ft->serverid);
+ ForeignDataWrapper *fdw = GetForeignDataWrapper(fs->fdwid);
+ const char *cur_relname;
+ Relation pg_attr;
+ ScanKeyData akey[2];
+ SysScanDesc ascan;
+ HeapTuple atup;
+
+ if (GetFdwRoutine(fdw->fdwhandler) != &pgstromFdwHandlerData)
+ continue;
+
+ cur_relname = NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname);
+
+ /* Rename rowid table and its index */
+ pgstrom_shadow_relation_rename(stmt->subname, cur_relname, NULL,
+ stmt->newname, cur_relname, NULL,
+ RELKIND_RELATION);
+ pgstrom_shadow_relation_rename(stmt->subname, cur_relname, NULL,
+ stmt->newname, cur_relname, NULL,
+ RELKIND_INDEX);
+
+ /* Rename column-store and its index */
+ pg_attr = heap_open(AttributeRelationId, AccessShareLock);
+
+ ScanKeyInit(&akey[0],
+ Anum_pg_attribute_attrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(HeapTupleGetOid(tuple)));
+ ScanKeyInit(&akey[1],
+ Anum_pg_attribute_attnum,
+ BTGreaterEqualStrategyNumber, F_INT2GE,
+ Int16GetDatum(InvalidAttrNumber));
+
+ ascan = systable_beginscan(pg_attr, AttributeRelidNumIndexId, true,
+ SnapshotSelf, 2, akey);
+ while (HeapTupleIsValid(atup = systable_getnext(ascan)))
+ {
+ Form_pg_attribute attr = (Form_pg_attribute) GETSTRUCT(atup);
+
+ if (attr->attisdropped)
+ continue;
+
+ pgstrom_shadow_relation_rename(stmt->subname, cur_relname,
+ NameStr(attr->attname),
+ stmt->newname, cur_relname,
+ NameStr(attr->attname),
+ RELKIND_RELATION);
+ pgstrom_shadow_relation_rename(stmt->subname, cur_relname,
+ NameStr(attr->attname),
+ stmt->newname, cur_relname,
+ NameStr(attr->attname),
+ RELKIND_INDEX);
+ }
+ systable_endscan(ascan);
+ heap_close(pg_attr, AccessShareLock);
+
+ /* Rename rowid generator sequence */
+ pgstrom_shadow_relation_rename(stmt->subname, cur_relname, NULL,
+ stmt->newname, cur_relname, NULL,
+ RELKIND_SEQUENCE);
+ }
+ heap_endscan(scan);
+ heap_close(pg_class, RowExclusiveLock);
+ /* Make changes visible */
+ CommandCounterIncrement();
+}
+
+/*
+ * pgstrom_post_rename_table
+ *
+ * It is a post-fixup routine to handle ALTER TABLE ... RENAME TO on
+ * the foreign table managed by PG-Strom. It replaces portion of the
+ * relation name on all the underlying shadow relations.
+ */
static void
-pgstrom_process_post_alter_rename(void)
-{}
+pgstrom_post_rename_table(RenameStmt *stmt, Oid base_relid)
+{
+ char *cur_nspname;
+ char *old_relname;
+ Relation pg_attr;
+ ScanKeyData skey[2];
+ SysScanDesc scan;
+ HeapTuple tuple;
+
+ Assert(stmt->renameType == OBJECT_FOREIGN_TABLE);
+ Assert(stmt->relation->relname != NULL);
+ /* Ensure the changes to base relation being visible */
+ CommandCounterIncrement();
+
+ cur_nspname = get_namespace_name(get_rel_namespace(base_relid));
+ if (!cur_nspname)
+ elog(ERROR, "cache lookup failed for schema of table %u", base_relid);
+ old_relname = stmt->relation->relname;
+
+ /* Rename rowid table and its index */
+ pgstrom_shadow_relation_rename(cur_nspname, old_relname, NULL,
+ cur_nspname, stmt->newname, NULL,
+ RELKIND_RELATION);
+ pgstrom_shadow_relation_rename(cur_nspname, old_relname, NULL,
+ cur_nspname, stmt->newname, NULL,
+ RELKIND_INDEX);
+
+ /* Rename column-store and its index */
+ pg_attr = heap_open(AttributeRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_attribute_attrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(base_relid));
+ ScanKeyInit(&skey[1],
+ Anum_pg_attribute_attnum,
+ BTGreaterEqualStrategyNumber, F_INT2GE,
+ Int16GetDatum(InvalidAttrNumber));
+
+ scan = systable_beginscan(pg_attr, AttributeRelidNumIndexId, true,
+ SnapshotNow, 2, skey);
+
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+ {
+ Form_pg_attribute attr = (Form_pg_attribute) GETSTRUCT(tuple);
+
+ if (attr->attisdropped)
+ continue;
+
+ pgstrom_shadow_relation_rename(cur_nspname, old_relname,
+ NameStr(attr->attname),
+ cur_nspname, stmt->newname,
+ NameStr(attr->attname),
+ RELKIND_RELATION);
+ pgstrom_shadow_relation_rename(cur_nspname, old_relname,
+ NameStr(attr->attname),
+ cur_nspname, stmt->newname,
+ NameStr(attr->attname),
+ RELKIND_INDEX);
+ }
+ systable_endscan(scan);
+ heap_close(pg_attr, AccessShareLock);
+
+ /* Rename rowid generator sequence */
+ pgstrom_shadow_relation_rename(cur_nspname, old_relname, NULL,
+ cur_nspname, stmt->newname, NULL,
+ RELKIND_SEQUENCE);
+ /* Make changes visible */
+ CommandCounterIncrement();
+
+ pfree(cur_nspname);
+}
+
+/*
+ * pgstrom_post_rename_table
+ *
+ * It is a post-fixup routine to handle ALTER TABLE ... RENAME TO on
+ * column of the foreign table managed by PG-Strom. It replaces portion
+ * of the relation name on all the underlying shadow relations.
+ */
static void
-pgstrom_process_post_alter_owner(void)
-{}
+pgstrom_post_rename_column(RenameStmt *stmt, Oid base_relid)
+{
+ char *cur_nspname;
+ char *cur_relname;
+
+ Assert(stmt->renameType == OBJECT_COLUMN);
+ Assert(stmt->relation->relname != NULL);
+ Assert(stmt->subname != NULL);
+
+ /* Ensure the changes to base relation being visible */
+ CommandCounterIncrement();
+
+ cur_nspname = get_namespace_name(get_rel_namespace(base_relid));
+ if (!cur_nspname)
+ elog(ERROR, "cache lookup failed for schema of table %u", base_relid);
+ cur_relname = stmt->relation->relname;
+
+ /* Rename column-store and its index */
+ pgstrom_shadow_relation_rename(cur_nspname, cur_relname, stmt->subname,
+ cur_nspname, cur_relname, stmt->newname,
+ RELKIND_RELATION);
+ pgstrom_shadow_relation_rename(cur_nspname, cur_relname, stmt->subname,
+ cur_nspname, cur_relname, stmt->newname,
+ RELKIND_INDEX);
+ /* Make changes visible */
+ CommandCounterIncrement();
+
+ pfree(cur_nspname);
+}
/*
* pgstrom_process_utility_command
@@ -423,44 +781,131 @@ pgstrom_process_post_alter_owner(void)
* Entrypoint of the ProcessUtility hook; that handles post DDL operations.
*/
static void
-pgstrom_process_utility_command(Node *stmt,
+pgstrom_process_utility_command(Node *parsetree,
const char *queryString,
ParamListInfo params,
bool isTopLevel,
DestReceiver *dest,
char *completionTag)
{
+ ForeignTable *ft;
+ ForeignServer *fs;
+ ForeignDataWrapper *fdw;
+ Oid base_nspid = InvalidOid;
+ Oid base_relid = InvalidOid;
+
+ /*
+ * XXX - Preparation of ProcessUtility; Some of operation will changes
+ * name of the foreign table being altered and managed by PG-Strom, so
+ * we open the relation prior to update of system catalog.
+ */
+ switch (nodeTag(parsetree))
+ {
+ case T_AlterObjectSchemaStmt:
+ {
+ AlterObjectSchemaStmt *stmt
+ = (AlterObjectSchemaStmt *)parsetree;
+ if (stmt->objectType == OBJECT_FOREIGN_TABLE)
+ {
+ base_relid = RangeVarGetRelid(stmt->relation,
+ AccessExclusiveLock,
+ true);
+ base_nspid = get_rel_namespace(base_relid);
+ }
+ }
+ break;
+
+ case T_RenameStmt:
+ {
+ RenameStmt *stmt = (RenameStmt *)parsetree;
+ if (stmt->renameType == OBJECT_SCHEMA)
+ base_nspid = get_namespace_oid(stmt->subname, true);
+ else if (stmt->renameType == OBJECT_FOREIGN_TABLE ||
+ stmt->renameType == OBJECT_COLUMN)
+ base_relid = RangeVarGetRelid(stmt->relation,
+ AccessExclusiveLock,
+ true);
+ }
+ break;
+
+ case T_AlterTableStmt:
+ break;
+
+ default:
+ /* no preparations for other commands */
+ break;
+ }
+
+ /*
+ * Call the original ProcessUtility
+ */
if (next_process_utility_hook)
- (*next_process_utility_hook)(stmt, queryString, params,
+ (*next_process_utility_hook)(parsetree, queryString, params,
isTopLevel, dest, completionTag);
else
- standard_ProcessUtility(stmt, queryString, params,
+ standard_ProcessUtility(parsetree, queryString, params,
isTopLevel, dest, completionTag);
+
/*
- * Do we need post ddl works?
+ * Post ProcessUtility Stuffs
*/
- if (IsA(stmt, CreateForeignTableStmt))
- {
- CreateForeignTableStmt *cfts = (CreateForeignTableStmt *)stmt;
- ForeignServer *fs;
- ForeignDataWrapper *fdw;
-
- fs = GetForeignServerByName(cfts->servername, false);
- fdw = GetForeignDataWrapper(fs->fdwid);
- if (GetFdwRoutine(fdw->fdwhandler) == &pgstromFdwHandlerData)
- pgstrom_process_post_create(cfts->base.relation);
- }
- else if (IsA(stmt, AlterObjectSchemaStmt))
- {
- pgstrom_process_post_alter_schema();
- }
- else if (IsA(stmt, RenameStmt))
- {
- pgstrom_process_post_alter_rename();
- }
- else if (IsA(stmt, AlterOwnerStmt))
+ switch (nodeTag(parsetree))
{
- pgstrom_process_post_alter_owner();
+ case T_CreateForeignTableStmt:
+ {
+ CreateForeignTableStmt *stmt
+ = (CreateForeignTableStmt *)parsetree;
+
+ /* Is a foreign table managed by PG-Strom? */
+ fs = GetForeignServerByName(stmt->servername, false);
+ fdw = GetForeignDataWrapper(fs->fdwid);
+ if (GetFdwRoutine(fdw->fdwhandler) == &pgstromFdwHandlerData)
+ pgstrom_post_create_foreign_table(stmt);
+ }
+ break;
+
+ case T_AlterObjectSchemaStmt:
+ {
+ AlterObjectSchemaStmt *stmt
+ = (AlterObjectSchemaStmt *)parsetree;
+
+ if (!OidIsValid(base_relid))
+ break;
+
+ ft = GetForeignTable(base_relid);
+ fs = GetForeignServer(ft->serverid);
+ fdw = GetForeignDataWrapper(fs->fdwid);
+ if (GetFdwRoutine(fdw->fdwhandler) == &pgstromFdwHandlerData)
+ pgstrom_post_alter_schema(stmt, base_nspid, base_relid);
+ }
+ break;
+
+ case T_RenameStmt:
+ {
+ RenameStmt *stmt = (RenameStmt *)parsetree;
+
+ if (OidIsValid(base_nspid))
+ pgstrom_post_rename_schema(stmt, base_nspid);
+ else if (OidIsValid(base_relid))
+ {
+ ft = GetForeignTable(base_relid);
+ fs = GetForeignServer(ft->serverid);
+ fdw = GetForeignDataWrapper(fs->fdwid);
+ if (GetFdwRoutine(fdw->fdwhandler)==&pgstromFdwHandlerData)
+ {
+ if (stmt->renameType == OBJECT_FOREIGN_TABLE)
+ pgstrom_post_rename_table(stmt, base_relid);
+ else
+ pgstrom_post_rename_column(stmt, base_relid);
+ }
+ }
+ }
+ break;
+
+ case T_AlterTableStmt:
+ default:
+ /* do nothing */
+ break;
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.