Permalink
Browse files

Support writable foreign tables.

This patch adds the core-system infrastructure needed to support updates
on foreign tables, and extends contrib/postgres_fdw to allow updates
against remote Postgres servers.  There's still a great deal of room for
improvement in optimization of remote updates, but at least there's basic
functionality there now.

KaiGai Kohei, reviewed by Alexander Korotkov and Laurenz Albe, and rather
heavily revised by Tom Lane.
  • Loading branch information...
1 parent 7f49a67 commit 21734d2fb896e0ecdddd3251caa72a3576e2d415 @tglsfdc tglsfdc committed Mar 10, 2013
@@ -118,7 +118,6 @@ SELECT tableoid::regclass, b FROM agg_csv;
INSERT INTO agg_csv VALUES(1,2.0);
UPDATE agg_csv SET a = 1;
DELETE FROM agg_csv WHERE a = 100;
-SELECT * FROM agg_csv FOR UPDATE OF agg_csv;
-- but this should be ignored
SELECT * FROM agg_csv FOR UPDATE;
@@ -185,15 +185,11 @@ SELECT tableoid::regclass, b FROM agg_csv;
-- updates aren't supported
INSERT INTO agg_csv VALUES(1,2.0);
-ERROR: cannot change foreign table "agg_csv"
+ERROR: cannot insert into foreign table "agg_csv"
UPDATE agg_csv SET a = 1;
-ERROR: cannot change foreign table "agg_csv"
+ERROR: cannot update foreign table "agg_csv"
DELETE FROM agg_csv WHERE a = 100;
-ERROR: cannot change foreign table "agg_csv"
-SELECT * FROM agg_csv FOR UPDATE OF agg_csv;
-ERROR: row-level locks cannot be used with foreign table "agg_csv"
-LINE 1: SELECT * FROM agg_csv FOR UPDATE OF agg_csv;
- ^
+ERROR: cannot delete from foreign table "agg_csv"
-- but this should be ignored
SELECT * FROM agg_csv FOR UPDATE;
a | b
@@ -47,15 +47,18 @@ typedef struct ConnCacheEntry
PGconn *conn; /* connection to foreign server, or NULL */
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open, etc */
+ bool have_prep_stmt; /* have we prepared any stmts in this xact? */
+ bool have_error; /* have any subxacts aborted in this xact? */
} ConnCacheEntry;
/*
* Connection cache (initialized on first use)
*/
static HTAB *ConnectionHash = NULL;
-/* for assigning cursor numbers */
+/* for assigning cursor numbers and prepared statement numbers */
static unsigned int cursor_number = 0;
+static unsigned int prep_stmt_number = 0;
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;
@@ -78,6 +81,10 @@ static void pgfdw_subxact_callback(SubXactEvent event,
* if we don't already have a suitable one, and a transaction is opened at
* the right subtransaction nesting depth if we didn't do that already.
*
+ * will_prep_stmt must be true if caller intends to create any prepared
+ * statements. Since those don't go away automatically at transaction end
+ * (not even on error), we need this flag to cue manual cleanup.
+ *
* XXX Note that caching connections theoretically requires a mechanism to
* detect change of FDW objects to invalidate already established connections.
* We could manage that by watching for invalidation events on the relevant
@@ -86,7 +93,8 @@ static void pgfdw_subxact_callback(SubXactEvent event,
* mid-transaction anyway.
*/
PGconn *
-GetConnection(ForeignServer *server, UserMapping *user)
+GetConnection(ForeignServer *server, UserMapping *user,
+ bool will_prep_stmt)
{
bool found;
ConnCacheEntry *entry;
@@ -131,6 +139,8 @@ GetConnection(ForeignServer *server, UserMapping *user)
/* initialize new hashtable entry (key is already filled in) */
entry->conn = NULL;
entry->xact_depth = 0;
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
}
/*
@@ -147,6 +157,8 @@ GetConnection(ForeignServer *server, UserMapping *user)
if (entry->conn == NULL)
{
entry->xact_depth = 0; /* just to be sure */
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
entry->conn = connect_pg_server(server, user);
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
entry->conn, server->servername);
@@ -157,6 +169,9 @@ GetConnection(ForeignServer *server, UserMapping *user)
*/
begin_remote_xact(entry);
+ /* Remember if caller will prepare statements */
+ entry->have_prep_stmt |= will_prep_stmt;
+
return entry->conn;
}
@@ -394,12 +409,30 @@ GetCursorNumber(PGconn *conn)
}
/*
+ * Assign a "unique" number for a prepared statement.
+ *
+ * This works much like GetCursorNumber, except that we never reset the counter
+ * within a session. That's because we can't be 100% sure we've gotten rid
+ * of all prepared statements on all connections, and it's not really worth
+ * increasing the risk of prepared-statement name collisions by resetting.
+ */
+unsigned int
+GetPrepStmtNumber(PGconn *conn)
+{
+ return ++prep_stmt_number;
+}
+
+/*
* Report an error we got from the remote server.
*
* elevel: error level to use (typically ERROR, but might be less)
* res: PGresult containing the error
* clear: if true, PQclear the result (otherwise caller will handle it)
* sql: NULL, or text of remote command we tried to execute
+ *
+ * Note: callers that choose not to throw ERROR for a remote error are
+ * responsible for making sure that the associated ConnCacheEntry gets
+ * marked with have_error = true.
*/
void
pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
@@ -480,6 +513,22 @@ pgfdw_xact_callback(XactEvent event, void *arg)
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, true, "COMMIT TRANSACTION");
PQclear(res);
+
+ /*
+ * If there were any errors in subtransactions, and we made
+ * prepared statements, do a DEALLOCATE ALL to make sure we
+ * get rid of all prepared statements. This is annoying and
+ * not terribly bulletproof, but it's probably not worth
+ * trying harder. We intentionally ignore any errors in the
+ * DEALLOCATE.
+ */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ res = PQexec(entry->conn, "DEALLOCATE ALL");
+ PQclear(res);
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
break;
case XACT_EVENT_PRE_PREPARE:
@@ -502,14 +551,26 @@ pgfdw_xact_callback(XactEvent event, void *arg)
elog(ERROR, "missed cleaning up connection during pre-commit");
break;
case XACT_EVENT_ABORT:
+ /* Assume we might have lost track of prepared statements */
+ entry->have_error = true;
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(WARNING, res, true,
"ABORT TRANSACTION");
else
+ {
PQclear(res);
+ /* As above, make sure we've cleared any prepared stmts */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ res = PQexec(entry->conn, "DEALLOCATE ALL");
+ PQclear(res);
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+ }
break;
}
@@ -593,6 +654,8 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
}
else
{
+ /* Assume we might have lost track of prepared statements */
+ entry->have_error = true;
/* Rollback all remote subtransactions during abort */
snprintf(sql, sizeof(sql),
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
Oops, something went wrong.

0 comments on commit 21734d2

Please sign in to comment.