Permalink
Browse files

Add basic support for collations.

 * docs/collation.rst: New page to document collations.
 * docs/index.rst: Add collation.rst.
 * pyrseas/database.py (Database.Dicts.__init__): Add dictionary for
   collations.  (Database.link_refs): Add collations argument to
   schemas.link_refs call.  (Database._trim_objects): Add collations.
   (Database.diff_map): Call collations.diff_map.
 * pyrseas/dbobject/collation.py: New module to implement collations.
 * pyrseas/dbobject/schema.py (Schema.to_map): Process collations.
   (SchemaDict.from_map): Process input collations.
   (SchemaDict.link_refs): Add argument for collations.
 * pyrseas/testutils.py (PostgresDb.clear): Drop existing collations.
 * tests/dbobject/__init__.py: Invoke collation tests.
 * tests/dbobject/test_collation.py: New test module.
  • Loading branch information...
1 parent d71b663 commit 8120d1788d34aca8266989b14d7334f0cb499ec2 @jmafc jmafc committed Jun 21, 2012
View
@@ -0,0 +1,34 @@
+Collations
+==========
+
+.. module:: pyrseas.dbobject.collation
+
+The :mod:`collation` module defines two classes, :class:`Collation`
+and :class:`CollationDict`, derived from :class:`DbSchemaObject` and
+:class:`DbObjectDict`, respectively.
+
+Collation
+---------
+
+:class:`Collation` is derived from
+:class:`~pyrseas.dbobject.DbSchemaObject` and represents a `PostgreSQL
+collation
+<http://www.postgresql.org/docs/current/static/collation.html>`_
+(available on PostgreSQL 9.1 or later).
+
+.. autoclass:: Collation
+
+.. automethod:: Collation.create
+
+Collation Dictionary
+--------------------
+
+:class:`CollationDict` is derived from
+:class:`~pyrseas.dbobject.DbObjectDict`. It is a dictionary that
+represents the collection of collations in a database.
+
+.. autoclass:: CollationDict
+
+.. automethod:: CollationDict.from_map
+
+.. automethod:: CollationDict.diff_map
View
@@ -72,6 +72,7 @@ classes and methods are documented mainly for developer use.
cast
language
schema
+ collation
conversion
extension
function
View
@@ -32,6 +32,7 @@
from pyrseas.dbobject.foreign import ForeignServerDict, UserMappingDict
from pyrseas.dbobject.foreign import ForeignTableDict
from pyrseas.dbobject.extension import ExtensionDict
+from pyrseas.dbobject.collation import CollationDict
def flatten(lst):
@@ -99,6 +100,7 @@ def __init__(self, dbconn=None):
self.usermaps = UserMappingDict(dbconn)
self.ftables = ForeignTableDict(dbconn)
self.extensions = ExtensionDict(dbconn)
+ self.collations = CollationDict(dbconn)
def __init__(self, dbname, user=None, pswd=None, host=None, port=None):
"""Initialize the database
@@ -118,7 +120,8 @@ def _link_refs(self, db):
db.schemas.link_refs(db.types, db.tables, db.functions, db.operators,
db.operfams, db.operclasses, db.conversions,
db.tsconfigs, db.tsdicts, db.tsparsers,
- db.tstempls, db.ftables, db.extensions)
+ db.tstempls, db.ftables, db.extensions,
+ db.collations)
db.tables.link_refs(db.columns, db.constraints, db.indexes,
db.rules, db.triggers)
db.fdwrappers.link_refs(db.servers)
@@ -134,7 +137,8 @@ def _trim_objects(self, schemas):
for objtype in ['types', 'tables', 'constraints', 'indexes',
'functions', 'operators', 'operclasses', 'operfams',
'rules', 'triggers', 'conversions', 'tstempls',
- 'tsdicts', 'tsparsers', 'tsconfigs', 'extensions']:
+ 'tsdicts', 'tsparsers', 'tsconfigs', 'extensions',
+ 'collations']:
objdict = getattr(self.db, objtype)
for obj in list(objdict.keys()):
# obj[0] is the schema name in all these dicts
@@ -287,6 +291,7 @@ def diff_map(self, input_map, schemas=[]):
stmts.append(self.db.tsparsers.diff_map(self.ndb.tsparsers))
stmts.append(self.db.tsconfigs.diff_map(self.ndb.tsconfigs))
stmts.append(self.db.casts.diff_map(self.ndb.casts))
+ stmts.append(self.db.collations.diff_map(self.ndb.collations))
stmts.append(self.db.fdwrappers.diff_map(self.ndb.fdwrappers))
stmts.append(self.db.servers.diff_map(self.ndb.servers))
stmts.append(self.db.usermaps.diff_map(self.ndb.usermaps))
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+"""
+ pyrseas.dbobject.collation
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ This defines two classes, Collation and CollationDict, derived from
+ DbSchemaObject and DbObjectDict, respectively.
+"""
+from pyrseas.dbobject import DbObjectDict, DbSchemaObject
+
+
+class Collation(DbSchemaObject):
+ """A collation definition"""
+
+ keylist = ['schema', 'name']
+ objtype = "COLLATION"
+
+ def create(self):
+ """Return SQL statements to CREATE the collation
+
+ :return: SQL statements
+ """
+ stmts = []
+ stmts.append("CREATE COLLATION %s (\n LC_COLLATE = '%s',"
+ "\n LC_CTYPE = '%s')" % (
+ self.qualname(), self.lc_collate, self.lc_ctype))
+ if hasattr(self, 'description'):
+ stmts.append(self.comment())
+ return stmts
+
+
+class CollationDict(DbObjectDict):
+ "The collection of collations in a database."
+
+ cls = Collation
+ query = \
+ """SELECT nspname AS schema, collname AS name,
+ collcollate AS lc_collate, collctype AS lc_ctype,
+ obj_description(c.oid, 'pg_collation') AS description
+ FROM pg_collation c
+ JOIN pg_namespace n ON (collnamespace = n.oid)
+ WHERE (nspname != 'pg_catalog' AND nspname != 'information_schema')
+ ORDER BY nspname, collname"""
+
+ def from_map(self, schema, inmap):
+ """Initialize the dictionary of collations by examining the input map
+
+ :param schema: the schema owing the collations
+ :param inmap: the input YAML map defining the collations
+ """
+ for key in list(inmap.keys()):
+ if not key.startswith('collation '):
+ raise KeyError("Unrecognized object type: %s" % key)
+ cll = key[10:]
+ incoll = inmap[key]
+ coll = Collation(schema=schema.name, name=cll, **incoll)
+ if incoll:
+ if 'oldname' in incoll:
+ coll.oldname = incoll['oldname']
+ del incoll['oldname']
+ if 'description' in incoll:
+ coll.description = incoll['description']
+ self[(schema.name, cll)] = coll
+
+ def diff_map(self, incolls):
+ """Generate SQL to transform existing collations
+
+ :param incolls: a YAML map defining the new collations
+ :return: list of SQL statements
+
+ Compares the existing collation definitions, as fetched from
+ the catalogs, to the input map and generates SQL statements to
+ create, drop or change the collations accordingly.
+ """
+ stmts = []
+ # check input collations
+ for cll in list(incolls.keys()):
+ incoll = incolls[cll]
+ # does it exist in the database?
+ if cll in self:
+ stmts.append(self[cll].diff_map(incoll))
+ else:
+ # check for possible RENAME
+ if hasattr(incoll, 'oldname'):
+ oldname = incoll.oldname
+ try:
+ stmts.append(self[oldname].rename(incoll.name))
+ del self[oldname]
+ except KeyError as exc:
+ exc.args = ("Previous name '%s' for collation '%s' "
+ "not found" % (oldname, incoll.name), )
+ raise
+ else:
+ # create new collation
+ stmts.append(incoll.create())
+ # check database collations
+ for (sch, cll) in list(self.keys()):
+ # if missing, drop it
+ if (sch, cll) not in incolls:
+ stmts.append(self[(sch, cll)].drop())
+
+ return stmts
@@ -45,7 +45,7 @@ def mapper(schema, objtypes):
for objtypes in ['conversions', 'domains', 'ftables', 'functions',
'operators', 'operclasses', 'operfams', 'sequences',
'tsconfigs', 'tsdicts', 'tsparsers', 'tstempls',
- 'types', 'views', 'extensions']:
+ 'types', 'views', 'extensions', 'collations']:
schema[key].update(mapper(self, objtypes))
if hasattr(self, 'description'):
@@ -104,6 +104,7 @@ def from_map(self, inmap, newdb):
intsts = {}
inftbs = {}
inexts = {}
+ incolls = {}
for key in list(inschema.keys()):
if key.startswith('domain '):
intypes.update({key: inschema[key]})
@@ -135,6 +136,8 @@ def from_map(self, inmap, newdb):
inftbs.update({key: inschema[key]})
elif key.startswith('extension '):
inexts.update({key: inschema[key]})
+ elif key.startswith('collation '):
+ incolls.update({key: inschema[key]})
elif key == 'oldname':
schema.oldname = inschema[key]
elif key == 'description':
@@ -154,10 +157,11 @@ def from_map(self, inmap, newdb):
newdb.tsparsers.from_map(schema, intsps)
newdb.tsconfigs.from_map(schema, intscs)
newdb.ftables.from_map(schema, inftbs, newdb)
+ newdb.collations.from_map(schema, incolls)
def link_refs(self, dbtypes, dbtables, dbfunctions, dbopers, dbopfams,
dbopcls, dbconvs, dbtsconfigs, dbtsdicts, dbtspars,
- dbtstmpls, dbftables, dbexts):
+ dbtstmpls, dbftables, dbexts, dbcolls):
"""Connect types, tables and functions to their respective schemas
:param dbtypes: dictionary of types and domains
@@ -173,6 +177,7 @@ def link_refs(self, dbtypes, dbtables, dbfunctions, dbopers, dbopfams,
:param dbtstmpls: dictionary of text search templates
:param dbftables: dictionary of foreign tables
:param dbexts: dictionary of extensions
+ :param dbcolls: dictionary of collations
Fills in the `domains` dictionary for each schema by
traversing the `dbtypes` dictionary. Fills in the `tables`,
@@ -298,6 +303,13 @@ def link_refs(self, dbtypes, dbtables, dbfunctions, dbopers, dbopfams,
if not hasattr(schema, 'extensions'):
schema.extensions = {}
schema.extensions.update({ext: exten})
+ for (sch, cll) in list(dbcolls.keys()):
+ coll = dbcolls[(sch, cll)]
+ assert self[sch]
+ schema = self[sch]
+ if not hasattr(schema, 'collations'):
+ schema.collations = {}
+ schema.collations.update({cll: coll})
def to_map(self):
"""Convert the schema dictionary to a regular dictionary
View
@@ -267,6 +267,21 @@ def clear(self):
self.execute("DROP CONVERSION IF EXISTS %s CASCADE" % (cnv[0]))
self.conn.commit()
+ # Collations
+ if self.version >= 90100:
+ curs = pgexecute(
+ self.conn,
+ """SELECT collname FROM pg_collation c
+ JOIN pg_namespace n ON (collnamespace = n.oid)
+ WHERE nspname NOT IN (
+ 'pg_catalog', 'information_schema')""")
+ colls = curs.fetchall()
+ curs.close()
+ self.conn.rollback()
+ for coll in colls:
+ self.execute("DROP COLLATION IF EXISTS %s CASCADE" % coll[0])
+ self.conn.commit()
+
# User mappings
curs = pgexecute(
self.conn,
@@ -25,6 +25,7 @@
from tests.dbobject import test_foreign
from tests.dbobject import test_extension
from tests.dbobject import test_tablespace
+from tests.dbobject import test_collation
def suite():
@@ -51,6 +52,7 @@ def suite():
tests.addTest(test_foreign.suite())
tests.addTest(test_extension.suite())
tests.addTest(test_tablespace.suite())
+ tests.addTest(test_collation.suite())
return tests
if __name__ == '__main__':
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+"""Test collations
+
+These tests require that the locale fr_FR.utf8 be installed.
+"""
+
+import unittest
+
+from pyrseas.testutils import DatabaseToMapTestCase
+from pyrseas.testutils import InputMapToSqlTestCase, fix_indent
+
+CREATE_STMT = "CREATE COLLATION c1 (LC_COLLATE = 'fr_FR.utf8', " \
+ "LC_CTYPE = 'fr_FR.utf8')"
+COMMENT_STMT = "COMMENT ON COLLATION c1 IS 'Test collation c1'"
+
+
+class CollationToMapTestCase(DatabaseToMapTestCase):
+ """Test mapping of existing collations"""
+
+ def test_map_collation(self):
+ "Map a collation"
+ if self.db.version < 90100:
+ self.skipTest('Only available on PG 9.1')
+ dbmap = self.to_map([CREATE_STMT])
+ expmap = {'lc_collate': 'fr_FR.utf8', 'lc_ctype': 'fr_FR.utf8'}
+ self.assertEqual(dbmap['schema public']['collation c1'], expmap)
+
+ def test_map_collation_comment(self):
+ "Map a collation comment"
+ if self.db.version < 90100:
+ self.skipTest('Only available on PG 9.1')
+ dbmap = self.to_map([CREATE_STMT, COMMENT_STMT])
+ self.assertEqual(dbmap['schema public']['collation c1']
+ ['description'], 'Test collation c1')
+
+
+class CollationToSqlTestCase(InputMapToSqlTestCase):
+ """Test SQL generation from input collations"""
+
+ def test_create_collation(self):
+ "Create a collation"
+ if self.db.version < 90100:
+ self.skipTest('Only available on PG 9.1')
+ inmap = self.std_map()
+ inmap['schema public'].update({'collation c1': {
+ 'lc_collate': 'fr_FR.utf8', 'lc_ctype': 'fr_FR.utf8'}})
+ sql = self.to_sql(inmap)
+ self.assertEqual(fix_indent(sql[0]), CREATE_STMT)
+
+ def test_create_collation_schema(self):
+ "Create a collation in a non-public schema"
+ if self.db.version < 90100:
+ self.skipTest('Only available on PG 9.1')
+ inmap = self.std_map()
+ inmap.update({'schema s1': {'collation c1': {
+ 'lc_collate': 'fr_FR.utf8', 'lc_ctype': 'fr_FR.utf8'}}})
+ sql = self.to_sql(inmap, ["CREATE SCHEMA s1"])
+ self.assertEqual(fix_indent(sql[0]),
+ "CREATE COLLATION s1.c1 (LC_COLLATE = 'fr_FR.utf8', "
+ "LC_CTYPE = 'fr_FR.utf8')")
+
+ def test_bad_collation_map(self):
+ "Error creating a collation with a bad map"
+ if self.db.version < 90100:
+ self.skipTest('Only available on PG 9.1')
+ inmap = self.std_map()
+ inmap['schema public'].update({'c1': {
+ 'lc_collate': 'fr_FR.utf8', 'lc_ctype': 'fr_FR.utf8'}})
+ self.assertRaises(KeyError, self.to_sql, inmap)
+
+ def test_drop_collation(self):
+ "Drop an existing collation"
+ if self.db.version < 90100:
+ self.skipTest('Only available on PG 9.1')
+ sql = self.to_sql(self.std_map(), [CREATE_STMT])
+ self.assertEqual(sql[0], "DROP COLLATION c1")
+
+ def test_collation_with_comment(self):
+ "Create a collation with a comment"
+ if self.db.version < 90100:
+ self.skipTest('Only available on PG 9.1')
+ inmap = self.std_map()
+ inmap['schema public'].update({'collation c1': {
+ 'description': 'Test collation c1',
+ 'lc_collate': 'fr_FR.utf8', 'lc_ctype': 'fr_FR.utf8'}})
+ sql = self.to_sql(inmap)
+ self.assertEqual(fix_indent(sql[0]), CREATE_STMT)
+ self.assertEqual(sql[1], COMMENT_STMT)
+
+
+def suite():
+ tests = unittest.TestLoader().loadTestsFromTestCase(
+ CollationToMapTestCase)
+ tests.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ CollationToSqlTestCase))
+ return tests
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='suite')

0 comments on commit 8120d17

Please sign in to comment.