Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Implement support for foreign data wrappers, servers and user mappings.

 * docs/foreign.rst: Document new objects.
 * docs/index.rst: Add foreign.rst.
 * pyrseas/database.py (Database.Dicts.__init__): Add dictionaries for
   new objects.  (Database.from_map): Process new input objects.
   (Database.diff_map): Call diff_map's for new objects.
 * pyrseas/dbobject/foreign.py: Add classes for new objects.
 * pyrseas/testutils.py (PostgresDb.clear): Drop existing FDWs,
   servers and user mappings.
 * tests/dbobject/__init__.py: Invoke new test module.
 * tests/dbobject/test_foreign.py: New test module.
  • Loading branch information...
commit 171b62883f6621b965121f7603dc7172b6e12584 1 parent 4c2a65d
@jmafc jmafc authored
View
96 docs/foreign.rst
@@ -0,0 +1,96 @@
+Foreign Data Objects
+====================
+
+.. module:: pyrseas.dbobject.foreign
+
+The :mod:`foreign` module defines six classes: classes
+:class:`ForeignDataWrapper`, :class:`ForeignServer` and
+:class:`UserMapping` derived from :class:`DbObject`, and classes
+:class:`ForeignDataWrapperDict`, :class:`ForeignServerDict` and
+:class:`UserMappingDict` derived from :class:`DbObjectDict`.
+
+Foreign Data Wrapper
+--------------------
+
+:class:`ForeignDataWrapper` is derived from :class:`DbObject` and
+represents a `PostgreSQL foreign data wrapper
+<http://www.postgresql.org/docs/current/static/sql-createcreateforeigndatawrapper.html>`_.
+For PostgreSQL versions 9.1 and later see also `Foreign Data
+<http://www.postgresql.org/docs/current/static/ddl-foreign-data.html>`_
+and `Writing A Foreign Data Wrapper
+<http://www.postgresql.org/docs/current/static/fdwhandler.html>`_.
+
+.. autoclass:: ForeignDataWrapper
+
+.. automethod:: ForeignDataWrapper.create
+
+Foreign Data Wrapper Dictionary
+-------------------------------
+
+:class:`ForeignDataWrapperDict` is derived from
+:class:`~pyrseas.dbobject.DbObjectDict`. It is a dictionary that
+represents the collection of foreign data wrappers in a database.
+
+.. autoclass:: ForeignDataWrapperDict
+
+.. automethod:: ForeignDataWrapperDict.from_map
+
+.. automethod:: ForeignDataWrapperDict.to_map
+
+.. automethod:: ForeignDataWrapperDict.diff_map
+
+Foreign Server
+--------------
+
+:class:`ForeignServer` is derived from :class:`DbObject` and
+represents a `PostgreSQL foreign server
+<http://www.postgresql.org/docs/current/static/sql-createserver.html>`_.
+
+.. autoclass:: ForeignServer
+
+.. automethod:: ForeignServer.create
+
+Foreign Server Dictionary
+-------------------------
+
+:class:`ForeignServerDict` is derived from
+:class:`~pyrseas.dbobject.DbObjectDict`. It is a Python dictionary
+that represents the collection of foreign servers in a database.
+
+.. autoclass:: ForeignServerDict
+
+.. automethod:: ForeignServerDict.from_map
+
+.. automethod:: ForeignServerDict.to_map
+
+.. automethod:: ForeignServerDict.diff_map
+
+User Mapping
+------------
+
+:class:`UserMapping` is derived from :class:`DbObject` and represents
+a `PostgreSQL user mapping of a user to a foreign server
+<http://www.postgresql.org/docs/current/static/sql-createusermapping.html>`_.
+
+.. autoclass:: UserMapping
+
+.. automethod:: UserMapping.extern_key
+
+.. automethod:: UserMapping.identifier
+
+.. automethod:: UserMapping.create
+
+User Mapping Dictionary
+-----------------------
+
+:class:`UserMappingDict` is derived from
+:class:`~pyrseas.dbobject.DbObjectDict`. It is a dictionary that
+represents the collection of user mappings in a database.
+
+.. autoclass:: UserMappingDict
+
+.. automethod:: UserMappingDict.from_map
+
+.. automethod:: UserMappingDict.to_map
+
+.. automethod:: UserMappingDict.diff_map
View
1  docs/index.rst
@@ -76,6 +76,7 @@ classes and methods are documented mainly for developer use.
rule
trigger
textsearch
+ foreign
Indices and tables
View
23 pyrseas/database.py
@@ -27,6 +27,8 @@
from pyrseas.dbobject.conversion import ConversionDict
from pyrseas.dbobject.textsearch import TSConfigurationDict, TSDictionaryDict
from pyrseas.dbobject.textsearch import TSParserDict, TSTemplateDict
+from pyrseas.dbobject.foreign import ForeignDataWrapperDict
+from pyrseas.dbobject.foreign import ForeignServerDict, UserMappingDict
def flatten(lst):
@@ -69,6 +71,9 @@ def __init__(self, dbconn=None):
self.tsdicts = TSDictionaryDict(dbconn)
self.tsparsers = TSParserDict(dbconn)
self.tsconfigs = TSConfigurationDict(dbconn)
+ self.fdwrappers = ForeignDataWrapperDict(dbconn)
+ self.servers = ForeignServerDict(dbconn)
+ self.usermaps = UserMappingDict(dbconn)
def __init__(self, dbconn):
"""Initialize the database
@@ -137,6 +142,9 @@ def from_map(self, input_map):
input_schemas = {}
input_langs = {}
input_casts = {}
+ input_fdws = {}
+ input_fss = {}
+ input_ums = {}
for key in input_map.keys():
if key.startswith('schema '):
input_schemas.update({key: input_map[key]})
@@ -144,11 +152,20 @@ def from_map(self, input_map):
input_langs.update({key: input_map[key]})
elif key.startswith('cast '):
input_casts.update({key: input_map[key]})
+ elif key.startswith('foreign data wrapper '):
+ input_fdws.update({key: input_map[key]})
+ elif key.startswith('server '):
+ input_fss.update({key: input_map[key]})
+ elif key.startswith('user mapping for '):
+ input_ums.update({key: input_map[key]})
else:
raise KeyError("Expected typed object, found '%s'" % key)
self.ndb.languages.from_map(input_langs)
self.ndb.schemas.from_map(input_schemas, self.ndb)
self.ndb.casts.from_map(input_casts, self.ndb)
+ self.ndb.fdwrappers.from_map(input_fdws, self.ndb)
+ self.ndb.servers.from_map(input_fss, self.ndb)
+ self.ndb.usermaps.from_map(input_ums, self.ndb)
self._link_refs(self.ndb)
def to_map(self):
@@ -160,6 +177,9 @@ def to_map(self):
self.from_catalog()
dbmap = self.db.languages.to_map()
dbmap.update(self.db.casts.to_map())
+ dbmap.update(self.db.fdwrappers.to_map())
+ dbmap.update(self.db.servers.to_map())
+ dbmap.update(self.db.usermaps.to_map())
dbmap.update(self.db.schemas.to_map())
return dbmap
@@ -200,6 +220,9 @@ def diff_map(self, input_map, schemas=[]):
stmts.append(self.db.tsparsers.diff_map(self.ndb.tsparsers))
stmts.append(self.db.tsconfigs.diff_map(self.ndb.tsconfigs))
stmts.append(self.db.casts.diff_map(self.ndb.casts))
+ stmts.append(self.db.usermaps.diff_map(self.ndb.usermaps))
+ stmts.append(self.db.servers.diff_map(self.ndb.servers))
+ stmts.append(self.db.fdwrappers.diff_map(self.ndb.fdwrappers))
stmts.append(self.db.operators._drop())
stmts.append(self.db.operclasses._drop())
stmts.append(self.db.operfams._drop())
View
391 pyrseas/dbobject/foreign.py
@@ -0,0 +1,391 @@
+# -*- coding: utf-8 -*-
+"""
+ pyrseas.dbobject.foreign
+ ~~~~~~~~~~~~~~~~~~~~~~~~
+
+ This defines six classes: ForeignDataWrapper, ForeignServer and
+ UserMapping derived from DbObject, and ForeignDataWrapperDict,
+ ForeignServerDict and UserMappingDict derived from DbObjectDict.
+"""
+from pyrseas.dbobject import DbObjectDict, DbObject, quote_id
+
+
+class ForeignDataWrapper(DbObject):
+ """A foreign data wrapper definition"""
+
+ objtype = "FOREIGN DATA WRAPPER"
+
+ def create(self):
+ """Return SQL statements to CREATE the data wrapper
+
+ :return: SQL statements
+ """
+ clauses = []
+ for fnc in ['validator', 'handler']:
+ if hasattr(self, fnc):
+ clauses.append("%s %s" % (fnc.upper(), getattr(self, fnc)))
+ if hasattr(self, 'options'):
+ opts = []
+ for opt in self.options:
+ (nm, val) = opt.split('=')
+ opts.append("%s '%s'" % (nm, val))
+ clauses.append("OPTIONS (%s)" % ', '.join(opts))
+ stmts = ["CREATE FOREIGN DATA WRAPPER %s%s" % (
+ quote_id(self.name),
+ clauses and '\n ' + ',\n '.join(clauses) or '')]
+ if hasattr(self, 'description'):
+ stmts.append(self.comment())
+ return stmts
+
+
+QUERY_PRE91 = \
+ """SELECT fdwname AS name, CASE WHEN fdwvalidator = 0 THEN NULL
+ ELSE fdwvalidator::regproc END AS validator,
+ fdwoptions AS options,
+ obj_description(w.oid, 'pg_foreign_data_wrapper') AS
+ description
+ FROM pg_foreign_data_wrapper w
+ ORDER BY fdwname"""
+
+
+class ForeignDataWrapperDict(DbObjectDict):
+ "The collection of foreign data wrappers in a database"
+
+ cls = ForeignDataWrapper
+ query = \
+ """SELECT fdwname AS name, CASE WHEN fdwhandler = 0 THEN NULL
+ ELSE fdwhandler::regproc END AS handler,
+ CASE WHEN fdwvalidator = 0 THEN NULL
+ ELSE fdwvalidator::regproc END AS validator,
+ fdwoptions AS options,
+ obj_description(w.oid, 'pg_foreign_data_wrapper') AS
+ description
+ FROM pg_foreign_data_wrapper w
+ ORDER BY fdwname"""
+
+ def _from_catalog(self):
+ """Initialize the dictionary of wrappers by querying the catalogs"""
+ if self.dbconn.version < 90100:
+ self.query = QUERY_PRE91
+ super(ForeignDataWrapperDict, self)._from_catalog()
+
+ def from_map(self, inwrappers, newdb):
+ """Initialize the dictionary of wrappers by examining the input map
+
+ :param inwrappers: input YAML map defining the data wrappers
+ :param newdb: collection of dictionaries defining the database
+ """
+ for key in inwrappers.keys():
+ if not key.startswith('foreign data wrapper '):
+ raise KeyError("Unrecognized object type: %s" % key)
+ fdw = key[21:]
+ self[fdw] = wrapper = ForeignDataWrapper(name=fdw)
+ inwrapper = inwrappers[key]
+ if inwrapper:
+ for attr, val in inwrapper.items():
+ setattr(wrapper, attr, val)
+ if 'oldname' in inwrapper:
+ wrapper.oldname = inwrapper['oldname']
+ del inwrapper['oldname']
+ if 'description' in inwrapper:
+ wrapper.description = inwrapper['description']
+
+ def to_map(self):
+ """Convert the wrapper dictionary to a regular dictionary
+
+ :return: dictionary
+
+ Invokes the `to_map` method of each wrapper to construct a
+ dictionary of foreign data wrappers.
+ """
+ wrappers = {}
+ for fdw in self.keys():
+ wrappers.update(self[fdw].to_map())
+ return wrappers
+
+ def diff_map(self, inwrappers):
+ """Generate SQL to transform existing data wrappers
+
+ :param input_map: a YAML map defining the new data wrappers
+ :return: list of SQL statements
+
+ Compares the existing data wrapper definitions, as fetched from the
+ catalogs, to the input map and generates SQL statements to
+ transform the data wrappers accordingly.
+ """
+ stmts = []
+ # check input data wrappers
+ for fdw in inwrappers.keys():
+ infdw = inwrappers[fdw]
+ # does it exist in the database?
+ if fdw in self:
+ stmts.append(self[fdw].diff_map(infdw))
+ else:
+ # check for possible RENAME
+ if hasattr(infdw, 'oldname'):
+ oldname = infdw.oldname
+ try:
+ stmts.append(self[oldname].rename(infdw.name))
+ del self[oldname]
+ except KeyError as exc:
+ exc.args = ("Previous name '%s' for data wrapper "
+ "'%s' not found" % (oldname, infdw.name), )
+ raise
+ else:
+ # create new data wrapper
+ stmts.append(infdw.create())
+ # check database data wrappers
+ for fdw in self.keys():
+ # if missing, drop it
+ if fdw not in inwrappers:
+ stmts.append(self[fdw].drop())
+ return stmts
+
+
+class ForeignServer(DbObject):
+ """A foreign server definition"""
+
+ objtype = "SERVER"
+
+ def create(self):
+ """Return SQL statements to CREATE the server
+
+ :return: SQL statements
+ """
+ clauses = []
+ options = []
+ for opt in ['type', 'version']:
+ if hasattr(self, opt):
+ clauses.append("%s '%s'" % (opt.upper(), getattr(self, opt)))
+ if hasattr(self, 'options'):
+ opts = []
+ for opt in self.options:
+ (nm, val) = opt.split('=')
+ opts.append("%s '%s'" % (nm, val))
+ options.append("OPTIONS (%s)" % ', '.join(opts))
+ stmts = ["CREATE SERVER %s%s\n FOREIGN DATA WRAPPER %s%s" % (
+ quote_id(self.name),
+ clauses and ' ' + ' '.join(clauses) or '',
+ quote_id(self.wrapper),
+ options and '\n ' + ',\n '.join(options) or '')]
+ if hasattr(self, 'description'):
+ stmts.append(self.comment())
+ return stmts
+
+
+class ForeignServerDict(DbObjectDict):
+ "The collection of foreign servers in a database"
+
+ cls = ForeignServer
+ query = \
+ """SELECT srvname AS name, fdwname AS wrapper, srvtype AS type,
+ srvversion AS version, srvoptions AS options,
+ obj_description(s.oid, 'pg_foreign_server') AS description
+ FROM pg_foreign_server s
+ JOIN pg_foreign_data_wrapper w ON (srvfdw = w.oid)
+ ORDER BY srvname"""
+
+ def from_map(self, inservers, newdb):
+ """Initialize the dictionary of servers by examining the input map
+
+ :param inservers: input YAML map defining the foreign servers
+ :param newdb: collection of dictionaries defining the database
+ """
+ for key in inservers.keys():
+ if not key.startswith('server '):
+ raise KeyError("Unrecognized object type: %s" % key)
+ srv = key[7:]
+ self[srv] = serv = ForeignServer(name=srv)
+ inserv = inservers[key]
+ if inserv:
+ for attr, val in inserv.items():
+ setattr(serv, attr, val)
+ if 'oldname' in inserv:
+ serv.oldname = inserv['oldname']
+ del inserv['oldname']
+ if 'description' in inserv:
+ serv.description = inserv['description']
+
+ def to_map(self):
+ """Convert the server dictionary to a regular dictionary
+
+ :return: dictionary
+
+ Invokes the `to_map` method of each server to construct a
+ dictionary of foreign servers.
+ """
+ servers = {}
+ for srv in self.keys():
+ servers.update(self[srv].to_map())
+ return servers
+
+ def diff_map(self, inservers):
+ """Generate SQL to transform existing foreign servers
+
+ :param inservers: a YAML map defining the new foreign servers
+ :return: list of SQL statements
+
+ Compares the existing server definitions, as fetched from the
+ catalogs, to the input map and generates SQL statements to
+ transform the foreign servers accordingly.
+ """
+ stmts = []
+ # check input foreign servers
+ for srv in inservers.keys():
+ insrv = inservers[srv]
+ # does it exist in the database?
+ if srv in self:
+ stmts.append(self[srv].diff_map(insrv))
+ else:
+ # check for possible RENAME
+ if hasattr(insrv, 'oldname'):
+ oldname = insrv.oldname
+ try:
+ stmts.append(self[oldname].rename(insrv.name))
+ del self[oldname]
+ except KeyError as exc:
+ exc.args = ("Previous name '%s' for dictionary '%s' "
+ "not found" % (oldname, insrv.name), )
+ raise
+ else:
+ # create new dictionary
+ stmts.append(insrv.create())
+ # check database foreign servers
+ for srv in self.keys():
+ # if missing, drop it
+ if srv not in inservers:
+ stmts.append(self[srv].drop())
+ return stmts
+
+
+class UserMapping(DbObject):
+ """A user mapping definition"""
+
+ objtype = "USER MAPPING"
+
+ keylist = ['username', 'server']
+
+ def extern_key(self):
+ """Return the key to be used in external maps for this user mapping
+
+ :return: string
+ """
+ return '%s for %s server %s' % (self.objtype.lower(), self.username,
+ self.server)
+
+ def identifier(self):
+ """Return a full identifier for a user mapping object
+
+ :return: string
+ """
+ return "FOR %s SERVER %s" % (
+ self.username == 'PUBLIC' and 'PUBLIC' or quote_id(self.username),
+ quote_id(self.server))
+
+ def create(self):
+ """Return SQL statements to CREATE the user mapping
+
+ :return: SQL statements
+ """
+ options = []
+ if hasattr(self, 'options'):
+ opts = []
+ for opt in self.options:
+ (nm, val) = opt.split('=')
+ opts.append("%s '%s'" % (nm, val))
+ options.append("OPTIONS (%s)" % ', '.join(opts))
+ stmts = ["CREATE USER MAPPING FOR %s\n SERVER %s%s" % (
+ self.username == 'PUBLIC' and 'PUBLIC' or
+ quote_id(self.username), quote_id(self.server),
+ options and '\n ' + ',\n '.join(options) or '')]
+ return stmts
+
+
+class UserMappingDict(DbObjectDict):
+ "The collection of user mappings in a database"
+
+ cls = UserMapping
+ query = \
+ """SELECT CASE umuser WHEN 0 THEN 'PUBLIC' ELSE
+ pg_get_userbyid(umuser) END AS username, srvname AS server,
+ umoptions AS options
+ FROM pg_user_mapping u
+ JOIN pg_foreign_server s ON (umserver = s.oid)
+ ORDER BY umuser, srvname"""
+
+ def from_map(self, inusermaps, newdb):
+ """Initialize the dictionary of mappings by examining the input map
+
+ :param schema: schema owning the user mappings
+ :param inusermaps: input YAML map defining the user mappings
+ """
+ for key in inusermaps.keys():
+ if not key.startswith('user mapping for '):
+ raise KeyError("Unrecognized object type: %s" % key)
+ ump = key[17:]
+ if ' server ' not in ump:
+ raise KeyError("Unrecognized object type: %s" % key)
+ pos = ump.find(' server ')
+ usr = ump[:pos]
+ if usr.lower() == 'public':
+ usr = 'PUBLIC'
+ srv = ump[pos + 8:]
+ self[(usr, srv)] = usermap = UserMapping(username=usr, server=srv)
+ inusermap = inusermaps[key]
+ if inusermap:
+ for attr, val in inusermap.items():
+ setattr(usermap, attr, val)
+ if 'oldname' in inusermap:
+ usermap.oldname = inusermap['oldname']
+ del inusermap['oldname']
+
+ def to_map(self):
+ """Convert the user mapping dictionary to a regular dictionary
+
+ :return: dictionary
+
+ Invokes the `to_map` method of each mapping to construct a
+ dictionary of user mappings.
+ """
+ usermaps = {}
+ for um in self.keys():
+ usermaps.update(self[um].to_map())
+ return usermaps
+
+ def diff_map(self, inusermaps):
+ """Generate SQL to transform existing user mappings
+
+ :param input_map: a YAML map defining the new user mappings
+ :return: list of SQL statements
+
+ Compares the existing user mapping definitions, as fetched from the
+ catalogs, to the input map and generates SQL statements to
+ transform the user mappings accordingly.
+ """
+ stmts = []
+ # check input user mappings
+ for (usr, srv) in inusermaps.keys():
+ inump = inusermaps[(usr, srv)]
+ # does it exist in the database?
+ if (usr, srv) in self:
+ stmts.append(self[(usr, srv)].diff_map(inump))
+ else:
+ # check for possible RENAME
+ if hasattr(inump, 'oldname'):
+ oldname = inump.oldname
+ try:
+ stmts.append(self[oldname].rename(inump.name))
+ del self[oldname]
+ except KeyError as exc:
+ exc.args = ("Previous name '%s' for user mapping '%s' "
+ "not found" % (oldname, inump.name), )
+ raise
+ else:
+ # create new user mapping
+ stmts.append(inump.create())
+ # check database user mappings
+ for (usr, srv) in self.keys():
+ # if missing, drop it
+ if (usr, srv) not in inusermaps:
+ stmts.append(self[(usr, srv)].drop())
+ return stmts
View
35 pyrseas/testutils.py
@@ -249,6 +249,41 @@ def clear(self):
cnv[0], cnv[1]))
self.conn.commit()
+ # User mappings
+ curs = pgexecute(
+ self.conn,
+ """SELECT CASE umuser WHEN 0 THEN 'PUBLIC' ELSE
+ pg_get_userbyid(umuser) END AS username, srvname
+ FROM pg_user_mapping u
+ JOIN pg_foreign_server s ON (umserver = s.oid)""")
+ umaps = curs.fetchall()
+ curs.close()
+ self.conn.rollback()
+ for ump in umaps:
+ self.execute("DROP USER MAPPING IF EXISTS FOR %s SERVER %s" % (
+ ump[0], ump[1]))
+ self.conn.commit()
+
+ # Servers
+ curs = pgexecute(self.conn, "SELECT srvname FROM pg_foreign_server")
+ servs = curs.fetchall()
+ curs.close()
+ self.conn.rollback()
+ for srv in servs:
+ self.execute("DROP SERVER IF EXISTS %s CASCADE" % (srv[0]))
+ self.conn.commit()
+
+ # Foreign data wrappers
+ curs = pgexecute(self.conn,
+ "SELECT fdwname FROM pg_foreign_data_wrapper")
+ fdws = curs.fetchall()
+ curs.close()
+ self.conn.rollback()
+ for fdw in fdws:
+ self.execute("DROP FOREIGN DATA WRAPPER IF EXISTS %s CASCADE" % (
+ fdw[0]))
+ self.conn.commit()
+
def drop(self):
"Drop the database"
conn = pgconnect(ADMIN_DB, self.user, self.host, self.port)
View
2  tests/dbobject/__init__.py
@@ -22,6 +22,7 @@
import test_rule
import test_conversion
import test_textsearch
+import test_foreign
def suite():
@@ -45,6 +46,7 @@ def suite():
tests.addTest(test_rule.suite())
tests.addTest(test_conversion.suite())
tests.addTest(test_textsearch.suite())
+ tests.addTest(test_foreign.suite())
return tests
if __name__ == '__main__':
View
285 tests/dbobject/test_foreign.py
@@ -0,0 +1,285 @@
+# -*- coding: utf-8 -*-
+"""Test text search objects"""
+
+import unittest
+
+from pyrseas.testutils import PyrseasTestCase, fix_indent
+
+CREATE_FDW_STMT = "CREATE FOREIGN DATA WRAPPER fdw1"
+CREATE_FS_STMT = "CREATE SERVER fs1 FOREIGN DATA WRAPPER fdw1"
+CREATE_UM_STMT = "CREATE USER MAPPING FOR PUBLIC SERVER fs1"
+DROP_FDW_STMT = "DROP FOREIGN DATA WRAPPER IF EXISTS fdw1"
+DROP_FS_STMT = "DROP SERVER IF EXISTS fs1"
+DROP_UM_STMT = "DROP USER MAPPING IF EXISTS FOR PUBLIC SERVER fs1"
+COMMENT_FDW_STMT = "COMMENT ON FOREIGN DATA WRAPPER fdw1 IS " \
+ "'Test foreign data wrapper fdw1'"
+COMMENT_FS_STMT = "COMMENT ON SERVER fs1 IS 'Test server fs1'"
+
+
+class ForeignDataWrapperToMapTestCase(PyrseasTestCase):
+ """Test mapping of existing foreign data wrappers"""
+
+ def test_map_fd_wrapper(self):
+ "Map an existing foreign data wrapper"
+ dbmap = self.db.execute_and_map(CREATE_FDW_STMT)
+ self.assertEqual(dbmap['foreign data wrapper fdw1'], {})
+
+ def test_map_wrapper_validator(self):
+ "Map a foreign data wrapper with a validator function"
+ dbmap = self.db.execute_and_map("CREATE FOREIGN DATA WRAPPER fdw1 "
+ "VALIDATOR postgresql_fdw_validator")
+ self.assertEqual(dbmap['foreign data wrapper fdw1'], {
+ 'validator': 'postgresql_fdw_validator'})
+
+ def test_map_wrapper_options(self):
+ "Map a foreign data wrapper with options"
+ dbmap = self.db.execute_and_map("CREATE FOREIGN DATA WRAPPER fdw1 "
+ "OPTIONS (debug 'true')")
+ self.assertEqual(dbmap['foreign data wrapper fdw1'], {
+ 'options': ['debug=true']})
+
+ def test_map_fd_wrapper_comment(self):
+ "Map a foreign data wrapper with a comment"
+ if self.db.version < 90100:
+ return True
+ self.db.execute(CREATE_FDW_STMT)
+ dbmap = self.db.execute_and_map(COMMENT_FDW_STMT)
+ self.assertEqual(dbmap['foreign data wrapper fdw1']['description'],
+ 'Test foreign data wrapper fdw1')
+
+
+class ForeignDataWrapperToSqlTestCase(PyrseasTestCase):
+ """Test SQL generation for input foreign data wrappers"""
+
+ def test_create_fd_wrapper(self):
+ "Create a foreign data wrapper that didn't exist"
+ inmap = self.std_map()
+ inmap.update({'foreign data wrapper fdw1': {}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]), CREATE_FDW_STMT)
+
+ def test_create_wrapper_validator(self):
+ "Create a foreign data wrapper with a validator function"
+ inmap = self.std_map()
+ inmap.update({'foreign data wrapper fdw1': {
+ 'validator': 'postgresql_fdw_validator'}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]),
+ "CREATE FOREIGN DATA WRAPPER fdw1 "
+ "VALIDATOR postgresql_fdw_validator")
+
+ def test_create_wrapper_options(self):
+ "Create a foreign data wrapper with options"
+ inmap = self.std_map()
+ inmap.update({'foreign data wrapper fdw1': {
+ 'options': ['debug=true']}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]),
+ "CREATE FOREIGN DATA WRAPPER fdw1 "
+ "OPTIONS (debug 'true')")
+
+ def test_bad_map_fd_wrapper(self):
+ "Error creating a foreign data wrapper with a bad map"
+ inmap = self.std_map()
+ inmap.update({'fdw1': {}})
+ self.assertRaises(KeyError, self.db.process_map, inmap)
+
+ def test_drop_fd_wrapper(self):
+ "Drop an existing foreign data wrapper"
+ self.db.execute_commit(CREATE_FDW_STMT)
+ dbsql = self.db.process_map(self.std_map())
+ self.assertEqual(dbsql[0], "DROP FOREIGN DATA WRAPPER fdw1")
+
+ def test_comment_on_fd_wrapper(self):
+ "Create a comment for an existing foreign data wrapper"
+ self.db.execute_commit(CREATE_FDW_STMT)
+ inmap = self.std_map()
+ inmap.update({'foreign data wrapper fdw1': {
+ 'description': "Test foreign data wrapper fdw1"}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql[0], COMMENT_FDW_STMT)
+
+
+class ForeignServerToMapTestCase(PyrseasTestCase):
+ """Test mapping of existing foreign servers"""
+
+ def test_map_server(self):
+ "Map an existing foreign server"
+ self.db.execute(CREATE_FDW_STMT)
+ dbmap = self.db.execute_and_map(CREATE_FS_STMT)
+ self.assertEqual(dbmap['server fs1'], {'wrapper': 'fdw1'})
+
+ def test_map_server_type_version(self):
+ "Map a foreign server with type and version"
+ self.db.execute(CREATE_FDW_STMT)
+ dbmap = self.db.execute_and_map(
+ "CREATE SERVER fs1 TYPE 'test' VERSION '1.0' "
+ "FOREIGN DATA WRAPPER fdw1")
+ self.assertEqual(dbmap['server fs1'], {
+ 'wrapper': 'fdw1', 'type': 'test', 'version': '1.0'})
+
+ def test_map_server_options(self):
+ "Map a foreign server with options"
+ self.db.execute(CREATE_FDW_STMT)
+ dbmap = self.db.execute_and_map(
+ "CREATE SERVER fs1 FOREIGN DATA WRAPPER fdw1 "
+ "OPTIONS (dbname 'test')")
+ self.assertEqual(dbmap['server fs1'], {'wrapper': 'fdw1',
+ 'options': ['dbname=test']})
+
+ def test_map_server_comment(self):
+ "Map a foreign server with a comment"
+ if self.db.version < 90100:
+ return True
+ self.db.execute(CREATE_FDW_STMT)
+ self.db.execute(CREATE_FS_STMT)
+ dbmap = self.db.execute_and_map(COMMENT_FS_STMT)
+ self.assertEqual(dbmap['server fs1']['description'], 'Test server fs1')
+
+
+class ForeignServerToSqlTestCase(PyrseasTestCase):
+ """Test SQL generation for input foreign servers"""
+
+ def test_create_server(self):
+ "Create a foreign server that didn't exist"
+ self.db.execute_commit(CREATE_FDW_STMT)
+ inmap = self.std_map()
+ inmap.update({'foreign data wrapper fdw1': {},
+ 'server fs1': {'wrapper': 'fdw1'}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]), CREATE_FS_STMT)
+
+ def test_create_server_type_version(self):
+ "Create a foreign server with type and version"
+ self.db.execute_commit(CREATE_FDW_STMT)
+ inmap = self.std_map()
+ inmap.update({'foreign data wrapper fdw1': {},
+ 'server fs1': {'wrapper': 'fdw1', 'type': 'test',
+ 'version': '1.0'}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]),
+ "CREATE SERVER fs1 TYPE 'test' VERSION '1.0' "
+ "FOREIGN DATA WRAPPER fdw1")
+
+ def test_create_server_options(self):
+ "Create a foreign server with options"
+ self.db.execute_commit(CREATE_FDW_STMT)
+ dbmap = self.db.execute_and_map(
+ "CREATE SERVER fs1 FOREIGN DATA WRAPPER fdw1 "
+ "OPTIONS (dbname 'test')")
+ self.assertEqual(dbmap['server fs1'], {'wrapper': 'fdw1',
+ 'options': ['dbname=test']})
+
+ def test_bad_map_server(self):
+ "Error creating a foreign server with a bad map"
+ inmap = self.std_map()
+ inmap.update({'foreign data wrapper fdw1': {},
+ 'fs1': {'wrapper': 'fdw1'}})
+ self.assertRaises(KeyError, self.db.process_map, inmap)
+
+ def test_drop_server(self):
+ "Drop an existing foreign server"
+ self.db.execute(CREATE_FDW_STMT)
+ self.db.execute_commit(CREATE_FS_STMT)
+ inmap = self.std_map()
+ inmap.update({'foreign data wrapper fdw1': {}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql[0], "DROP SERVER fs1")
+
+ def test_drop_server_wrapper(self):
+ "Drop an existing foreign data wrapper"
+ self.db.execute(CREATE_FDW_STMT)
+ self.db.execute_commit(CREATE_FS_STMT)
+ dbsql = self.db.process_map(self.std_map())
+ self.assertEqual(dbsql[0], "DROP SERVER fs1")
+ self.assertEqual(dbsql[1], "DROP FOREIGN DATA WRAPPER fdw1")
+
+ def test_comment_on_server(self):
+ "Create a comment for an existing foreign server"
+ self.db.execute(CREATE_FDW_STMT)
+ self.db.execute_commit(CREATE_FS_STMT)
+ inmap = self.std_map()
+ inmap.update({'foreign data wrapper fdw1': {},
+ 'server fs1': {'wrapper': 'fdw1',
+ 'description': "Test server fs1"}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [COMMENT_FS_STMT])
+
+
+class UserMappingToMapTestCase(PyrseasTestCase):
+ """Test mapping of existing user mappings"""
+
+ def test_map_user_mapping(self):
+ "Map an existing user mapping"
+ self.db.execute(CREATE_FDW_STMT)
+ self.db.execute(CREATE_FS_STMT)
+ dbmap = self.db.execute_and_map(CREATE_UM_STMT)
+ self.assertEqual(dbmap['user mapping for PUBLIC server fs1'], {})
+
+ def test_map_user_mapping_options(self):
+ "Map a user mapping with options"
+ self.db.execute(CREATE_FDW_STMT)
+ self.db.execute(CREATE_FS_STMT)
+ dbmap = self.db.execute_and_map(
+ "CREATE USER MAPPING FOR PUBLIC SERVER fs1 "
+ "OPTIONS (user 'john', password 'doe')")
+ self.assertEqual(dbmap['user mapping for PUBLIC server fs1'], {
+ 'options': ['user=john', 'password=doe']})
+
+
+class UserMappingToSqlTestCase(PyrseasTestCase):
+ """Test SQL generation for input user mappings"""
+
+ def test_create_user_mapping(self):
+ "Create a user mapping that didn't exist"
+ self.db.execute(CREATE_FDW_STMT)
+ self.db.execute_commit(CREATE_FS_STMT)
+ inmap = self.std_map()
+ inmap.update({'user mapping for PUBLIC server fs1': {}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]), CREATE_UM_STMT)
+
+ def test_create_user_mapping_options(self):
+ "Create a user mapping with options"
+ self.db.execute(CREATE_FDW_STMT)
+ self.db.execute_commit(CREATE_FS_STMT)
+ inmap = self.std_map()
+ inmap.update({'user mapping for PUBLIC server fs1': {
+ 'options': ['user=john', 'password=doe']}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]),
+ "CREATE USER MAPPING FOR PUBLIC SERVER fs1 "
+ "OPTIONS (user 'john', password 'doe')")
+
+ def test_bad_map_user_mapping(self):
+ "Error creating a user mapping with a bad map"
+ inmap = self.std_map()
+ inmap.update({'PUBLIC server fs1': {}})
+ self.assertRaises(KeyError, self.db.process_map, inmap)
+
+ def test_drop_user_mapping(self):
+ "Drop an existing user mapping"
+ self.db.execute(CREATE_FDW_STMT)
+ self.db.execute(CREATE_FS_STMT)
+ self.db.execute_commit(CREATE_UM_STMT)
+ dbsql = self.db.process_map(self.std_map())
+ self.assertEqual(dbsql[0], "DROP USER MAPPING FOR PUBLIC SERVER fs1")
+
+
+def suite():
+ tests = unittest.TestLoader().loadTestsFromTestCase(
+ ForeignDataWrapperToMapTestCase)
+ tests.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ ForeignDataWrapperToSqlTestCase))
+ tests.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ ForeignServerToMapTestCase))
+ tests.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ ForeignServerToSqlTestCase))
+ tests.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ UserMappingToMapTestCase))
+ tests.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ UserMappingToSqlTestCase))
+ return tests
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='suite')
Please sign in to comment.
Something went wrong with that request. Please try again.