Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add support for casts.

 * docs/cast.rst: New page to document casts.
 * docs/index.rst: Add cast.rst.
 * pyrseas/database.py (Database.Dicts.__init__): Add dictionary for
   casts.  (Database.from_map): Process input casts.
   (Database.to_map): Call casts.to_map.  (Database.diff_map): Call
   casts.diff_map.
 * pyrseas/dbobject/__init__.py (DbObject.keylist): Add 'name' as
   default element.  (DbObject.identifier): Return the value of the
   first keylist element.
 * pyrseas/dbobject/cast.py: New module to implement casts.
 * tests/dbobject/__init__.py: Invoke cast tests.
 * tests/dbobject/test_cast.py: New test module.
  • Loading branch information...
commit f1816c51e52cc94ebfab547ad3ef4a7e1a6ac4ba 1 parent 9072c18
@jmafc jmafc authored
View
42 docs/cast.rst
@@ -0,0 +1,42 @@
+Casts
+=====
+
+.. module:: pyrseas.dbobject.cast
+
+The :mod:`cast` module defines two classes, :class:`Cast` and
+:class:`CastDict`, derived from :class:`DbObject` and
+:class:`DbObjectDict`, respectively.
+
+Cast
+----
+
+:class:`Cast` is derived from :class:`~pyrseas.dbobject.DbObject` and
+represents a `PostgreSQL cast
+<http://www.postgresql.org/docs/current/static/sql-createcast.html>`_.
+
+.. autoclass:: Cast
+
+.. automethod:: Cast.extern_key
+
+.. automethod:: Cast.identifier
+
+.. automethod:: Cast.to_map
+
+.. automethod:: Cast.create
+
+.. automethod:: Cast.diff_map
+
+Cast Dictionary
+---------------
+
+:class:`CastDict` is derived from
+:class:`~pyrseas.dbobject.DbObjectDict`. It is a dictionary that
+represents the collection of casts in a database.
+
+.. autoclass:: CastDict
+
+.. automethod:: CastDict.to_map
+
+.. automethod:: CastDict.from_map
+
+.. automethod:: CastDict.diff_map
View
1  docs/index.rst
@@ -45,6 +45,7 @@ classes and methods are documented mainly for developer use.
dbconn
database
language
+ cast
schema
type
table
View
8 pyrseas/database.py
@@ -11,6 +11,7 @@
on the `input_map` supplied to the `from_map` method.
"""
from pyrseas.dbobject.language import LanguageDict
+from pyrseas.dbobject.cast import CastDict
from pyrseas.dbobject.schema import SchemaDict
from pyrseas.dbobject.dbtype import TypeDict
from pyrseas.dbobject.table import ClassDict
@@ -45,6 +46,7 @@ def __init__(self, dbconn=None):
:param dbconn: a DbConnection object
"""
self.languages = LanguageDict(dbconn)
+ self.casts = CastDict(dbconn)
self.schemas = SchemaDict(dbconn)
self.types = TypeDict(dbconn)
self.tables = ClassDict(dbconn)
@@ -98,15 +100,19 @@ def from_map(self, input_map):
self.ndb = self.Dicts()
input_schemas = {}
input_langs = {}
+ input_casts = {}
for key in input_map.keys():
if key.startswith('schema '):
input_schemas.update({key: input_map[key]})
elif key.startswith('language '):
input_langs.update({key: input_map[key]})
+ elif key.startswith('cast '):
+ input_casts.update({key: input_map[key]})
else:
raise KeyError("Expected typed object, found '%s'" % key)
self.ndb.languages.from_map(input_langs)
self.ndb.schemas.from_map(input_schemas, self.ndb)
+ self.ndb.casts.from_map(input_casts, self.ndb)
self._link_refs(self.ndb)
def to_map(self):
@@ -117,6 +123,7 @@ def to_map(self):
if not self.db:
self.from_catalog()
dbmap = self.db.languages.to_map()
+ dbmap.update(self.db.casts.to_map())
dbmap.update(self.db.schemas.to_map())
return dbmap
@@ -146,6 +153,7 @@ def diff_map(self, input_map):
stmts.append(self.db.columns.diff_map(self.ndb.columns))
stmts.append(self.db.triggers.diff_map(self.ndb.triggers))
stmts.append(self.db.rules.diff_map(self.ndb.rules))
+ stmts.append(self.db.casts.diff_map(self.ndb.casts))
stmts.append(self.db.operators._drop())
stmts.append(self.db.functions._drop())
stmts.append(self.db.types._drop())
View
4 pyrseas/dbobject/__init__.py
@@ -28,7 +28,7 @@ def split_schema_table(tbl, sch=None):
class DbObject(object):
"A single object in a database catalog, e.g., a schema, a table, a column"
- keylist = []
+ keylist = ['name']
objtype = ''
def __init__(self, **attrs):
@@ -62,7 +62,7 @@ def identifier(self):
:return: string
"""
- return self.name
+ return self.__dict__[self.keylist[0]]
def comment(self):
"""Return SQL statement to create COMMENT on object
View
183 pyrseas/dbobject/cast.py
@@ -0,0 +1,183 @@
+# -*- coding: utf-8 -*-
+"""
+ pyrseas.cast
+ ~~~~~~~~~~~~
+
+ This module defines two classes: Cast derived from DbObject and
+ CastDict derived from DbObjectDict.
+"""
+from pyrseas.dbobject import DbObject, DbObjectDict
+
+
+CONTEXTS = {'a': 'assignment', 'e': 'explicit', 'i': 'implicit'}
+METHODS = {'f': 'function', 'i': 'inout', 'b': 'binary coercible'}
+
+
+class Cast(DbObject):
+ """A cast"""
+
+ keylist = ['source', 'target']
+ objtype = "CAST"
+
+ def extern_key(self):
+ """Return the key to be used in external maps for this cast
+
+ :return: string
+ """
+ return '%s (%s AS %s)' % (self.objtype.lower(), self.source,
+ self.target)
+
+ def identifier(self):
+ """Return a full identifier for a cast object
+
+ :return: string
+ """
+ return "(%s AS %s)" % (self.source, self.target)
+
+ def to_map(self):
+ """Convert a cast to a YAML-suitable format
+
+ :return: dictionary
+ """
+ dct = self.__dict__.copy()
+ for k in self.keylist:
+ del dct[k]
+ dct['context'] = CONTEXTS[self.context]
+ dct['method'] = METHODS[self.method]
+ return {self.extern_key(): dct}
+
+ def create(self):
+ """Return SQL statements to CREATE the cast
+
+ :return: SQL statements
+ """
+ stmts = []
+ with_clause = "\n WITH"
+ if hasattr(self, 'function'):
+ with_clause += " FUNCTION %s" % self.function
+ elif self.method == 'i':
+ with_clause += " INOUT"
+ else:
+ with_clause += "OUT FUNCTION"
+ as_clause = ''
+ if self.context == 'a':
+ as_clause = "\n AS ASSIGNMENT"
+ elif self.context == 'i':
+ as_clause = "\n AS IMPLICIT"
+ stmts.append("CREATE CAST (%s AS %s)%s%s" % (
+ self.source, self.target, with_clause, as_clause))
+ if hasattr(self, 'description'):
+ stmts.append(self.comment())
+ return stmts
+
+ def diff_map(self, incast):
+ """Generate SQL to transform an existing cast
+
+ :param incast: a YAML map defining the new cast
+ :return: list of SQL statements
+
+ Compares the cast to an input cast and generates SQL
+ statements to transform it into the one represented by the
+ input.
+ """
+ stmts = []
+ stmts.append(self.diff_description(incast))
+ return stmts
+
+
+class CastDict(DbObjectDict):
+ "The collection of casts in a database"
+
+ cls = Cast
+ query = \
+ """SELECT castsource::regtype AS source,
+ casttarget::regtype AS target,
+ CASE WHEN castmethod = 'f' THEN castfunc::regprocedure
+ ELSE NULL::regprocedure END AS function,
+ castcontext AS context, castmethod AS method,
+ description
+ FROM pg_cast c
+ JOIN pg_type s ON (castsource = s.oid)
+ JOIN pg_namespace sn ON (s.typnamespace = sn.oid)
+ JOIN pg_type t ON (casttarget = t.oid)
+ JOIN pg_namespace tn ON (t.typnamespace = tn.oid)
+ LEFT JOIN pg_proc p ON (castfunc = p.oid)
+ LEFT JOIN pg_namespace pn ON (p.pronamespace = pn.oid)
+ LEFT JOIN pg_description d
+ ON (c.oid = d.objoid AND d.objsubid = 0)
+ WHERE substring(sn.nspname for 3) != 'pg_'
+ OR substring(tn.nspname for 3) != 'pg_'
+ OR (castfunc != 0 AND substring(pn.nspname for 3) != 'pg_')
+ ORDER BY castsource, casttarget"""
+
+ def to_map(self):
+ """Convert the cast dictionary to a regular dictionary
+
+ :return: dictionary
+
+ Invokes the `to_map` method of each cast to construct a
+ dictionary of casts.
+ """
+ casts = {}
+ for cst in self.keys():
+ casts.update(self[cst].to_map())
+ return casts
+
+ def from_map(self, incasts, newdb):
+ """Initalize the dictionary of casts by converting the input map
+
+ :param incasts: YAML map defining the casts
+ :param newdb: collection of dictionaries defining the database
+ """
+ for key in incasts.keys():
+ if not key.startswith('cast (') or ' AS ' not in key.upper() \
+ or key[-1:] != ')':
+ raise KeyError("Unrecognized object type: %s" % key)
+ asloc = key.upper().find(' AS ')
+ src = key[6:asloc]
+ trg = key[asloc + 4:-1]
+ incast = incasts[key]
+ self[(src, trg)] = cast = Cast(source=src, target=trg)
+ if not incast:
+ raise ValueError("Cast '%s' has no specification" % key[5:])
+ for attr, val in incast.items():
+ setattr(cast, attr, val)
+ if not hasattr(cast, 'context'):
+ raise ValueError("Cast '%s' missing context" % key[5:])
+ if not hasattr(cast, 'context'):
+ raise ValueError("Cast '%s' missing method" % key[5:])
+ cast.context = cast.context[:1].lower()
+ cast.method = cast.method[:1].lower()
+ if 'description' in incast:
+ cast.description = incast['description']
+
+ def diff_map(self, incasts):
+ """Generate SQL to transform existing casts
+
+ :param incasts: a YAML map defining the new casts
+ :return: list of SQL statements
+
+ Compares the existing cast definitions, as fetched from the
+ catalogs, to the input map and generates SQL statements to
+ transform the casts accordingly.
+ """
+ stmts = []
+ # check input casts
+ for (src, trg) in incasts.keys():
+ incast = incasts[(src, trg)]
+ # does it exist in the database?
+ if (src, trg) not in self:
+ # create new cast
+ stmts.append(incast.create())
+ else:
+ # check cast objects
+ stmts.append(self[(src, trg)].diff_map(incast))
+
+ # check existing casts
+ for (src, trg) in self.keys():
+ cast = self[(src, trg)]
+ # if missing, mark it for dropping
+ if (src, trg) not in incasts:
+ stmts.append(cast.drop())
+
+ return stmts
View
2  tests/dbobject/__init__.py
@@ -4,6 +4,7 @@
import unittest
import test_language
+import test_cast
import test_schema
import test_type
import test_domain
@@ -21,6 +22,7 @@
def suite():
tests = unittest.TestSuite()
tests.addTest(test_language.suite())
+ tests.addTest(test_cast.suite())
tests.addTest(test_schema.suite())
tests.addTest(test_type.suite())
tests.addTest(test_domain.suite())
View
197 tests/dbobject/test_cast.py
@@ -0,0 +1,197 @@
+# -*- coding: utf-8 -*-
+"""Test casts"""
+
+import unittest
+
+from utils import PyrseasTestCase, fix_indent, new_std_map
+
+SOURCE = "SELECT CAST($1::int AS boolean)"
+CREATE_FUNC = "CREATE FUNCTION int2_bool(smallint) RETURNS boolean " \
+ "LANGUAGE sql IMMUTABLE AS $_$%s$_$" % SOURCE
+CREATE_DOMAIN = "CREATE DOMAIN d1 AS integer"
+CREATE_STMT1 = "CREATE CAST (smallint AS boolean) WITH FUNCTION " \
+ "int2_bool(smallint)"
+CREATE_STMT2 = "CREATE CAST (integer AS d1) WITHOUT FUNCTION AS ASSIGNMENT"
+CREATE_STMT3 = "CREATE CAST (d1 AS integer) WITH INOUT AS IMPLICIT"
+DROP_STMT = "DROP CAST IF EXISTS (smallint AS boolean)"
+COMMENT_STMT = "COMMENT ON CAST (smallint AS boolean) IS 'Test cast 1'"
+
+
+class CastToMapTestCase(PyrseasTestCase):
+ """Test mapping of existing casts"""
+
+ def test_map_cast_function(self):
+ "Map a cast with a function"
+ self.db.execute(CREATE_FUNC)
+ expmap = {'function': 'int2_bool(smallint)', 'context': 'explicit',
+ 'method': 'function'}
+ dbmap = self.db.execute_and_map(CREATE_STMT1)
+ self.assertEqual(dbmap['cast (smallint AS boolean)'], expmap)
+
+ def test_map_cast_no_function(self):
+ "Map a cast without a function"
+ self.db.execute(CREATE_DOMAIN)
+ expmap = {'context': 'assignment', 'method': 'binary coercible'}
+ dbmap = self.db.execute_and_map(CREATE_STMT2)
+ self.assertEqual(dbmap['cast (integer AS d1)'], expmap)
+
+ def test_map_cast_inout(self):
+ "Map a cast with INOUT"
+ self.db.execute(CREATE_DOMAIN)
+ expmap = {'context': 'implicit', 'method': 'inout'}
+ dbmap = self.db.execute_and_map(CREATE_STMT3)
+ self.assertEqual(dbmap['cast (d1 AS integer)'], expmap)
+
+ def test_map_cast_comment(self):
+ "Map a cast comment"
+ self.db.execute(CREATE_FUNC)
+ self.db.execute(CREATE_STMT1)
+ dbmap = self.db.execute_and_map(COMMENT_STMT)
+ self.assertEqual(dbmap['cast (smallint AS boolean)']['description'],
+ 'Test cast 1')
+
+
+class CastToSqlTestCase(PyrseasTestCase):
+ """Test SQL generation from input casts"""
+
+ def test_create_cast_function(self):
+ "Create a cast with a function"
+ self.db.execute_commit(DROP_STMT)
+ self.db.execute(CREATE_FUNC)
+ inmap = new_std_map()
+ inmap.update({'cast (smallint AS boolean)': {
+ 'function': 'int2_bool(smallint)', 'context': 'explicit',
+ 'method': 'function'}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]), CREATE_STMT1)
+
+ def test_create_cast_no_function(self):
+ "Create a cast without a function"
+ self.db.execute(CREATE_DOMAIN)
+ self.db.execute_commit("DROP CAST IF EXISTS (integer AS d1)")
+ inmap = new_std_map()
+ inmap.update({'cast (integer AS d1)': {
+ 'context': 'assignment', 'method': 'binary coercible'}})
+ inmap['schema public'].update({'domain d1': {'type': 'integer'}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]), CREATE_STMT2)
+
+ def test_create_cast_inout(self):
+ "Create a cast with INOUT"
+ self.db.execute(CREATE_DOMAIN)
+ self.db.execute_commit("DROP CAST IF EXISTS (d1 AS integer)")
+ inmap = new_std_map()
+ inmap.update({'cast (d1 AS integer)': {
+ 'context': 'implicit', 'method': 'inout'}})
+ inmap['schema public'].update({'domain d1': {'type': 'integer'}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]), CREATE_STMT3)
+
+ def test_create_cast_schema(self):
+ "Create a cast using a type/domain in a non-public schema"
+ self.db.execute_commit("DROP SCHEMA IF EXISTS s1 CASCADE")
+ self.db.execute("CREATE SCHEMA s1")
+ self.db.execute("CREATE DOMAIN s1.d1 AS integer")
+ self.db.execute_commit("DROP CAST IF EXISTS (integer AS s1.d1)")
+ inmap = new_std_map()
+ inmap.update({'cast (integer AS s1.d1)': {
+ 'context': 'assignment', 'method': 'binary coercible'}})
+ inmap.update({'schema s1': {'domain d1': {'type': 'integer'}}})
+ dbsql = self.db.process_map(inmap)
+ self.db.execute_commit("DROP SCHEMA s1 CASCADE")
+ self.assertEqual(fix_indent(dbsql[0]),
+ "CREATE CAST (integer AS s1.d1) WITHOUT FUNCTION "
+ "AS ASSIGNMENT")
+
+ def test_bad_cast_map(self):
+ "Error creating a cast with a bad map"
+ inmap = new_std_map()
+ inmap.update({'(smallint AS boolean)': {
+ 'function': 'int2_bool(smallint)', 'context': 'explicit',
+ 'method': 'function'}})
+ self.assertRaises(KeyError, self.db.process_map, inmap)
+
+ def test_drop_cast(self):
+ "Drop an existing cast"
+ self.db.execute(DROP_STMT)
+ self.db.execute(CREATE_FUNC)
+ self.db.execute_commit(CREATE_STMT1)
+ dbsql = self.db.process_map(new_std_map())
+ self.assertEqual(dbsql[0], "DROP CAST (smallint AS boolean)")
+
+ def test_cast_with_comment(self):
+ "Create a cast with a comment"
+ self.db.execute_commit(DROP_STMT)
+ inmap = new_std_map()
+ inmap.update({'cast (smallint AS boolean)': {
+ 'description': 'Test cast 1',
+ 'function': 'int2_bool(smallint)', 'context': 'explicit',
+ 'method': 'function'}})
+ inmap['schema public'].update({'function int2_bool(smallint)': {
+ 'returns': 'boolean', 'language': 'sql',
+ 'immutable': True, 'source': SOURCE}})
+ dbsql = self.db.process_map(inmap)
+ # dbsql[0] -> SET, dbsql[1] -> CREATE FUNCTION
+ self.assertEqual(fix_indent(dbsql[2]), CREATE_STMT1)
+ self.assertEqual(dbsql[3], COMMENT_STMT)
+
+ def test_comment_on_cast(self):
+ "Create a comment for an existing cast"
+ self.db.execute(DROP_STMT)
+ self.db.execute(CREATE_FUNC)
+ self.db.execute_commit(CREATE_STMT1)
+ inmap = new_std_map()
+ inmap.update({'cast (smallint AS boolean)': {
+ 'description': 'Test cast 1',
+ 'function': 'int2_bool(smallint)', 'context': 'explicit',
+ 'method': 'function'}})
+ inmap['schema public'].update({'function int2_bool(smallint)': {
+ 'returns': 'boolean', 'language': 'sql',
+ 'immutable': True, 'source': SOURCE}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [COMMENT_STMT])
+
+ def test_drop_cast_comment(self):
+ "Drop a comment on an existing cast"
+ self.db.execute(DROP_STMT)
+ self.db.execute(CREATE_FUNC)
+ self.db.execute(CREATE_STMT1)
+ self.db.execute_commit(COMMENT_STMT)
+ inmap = new_std_map()
+ inmap.update({'cast (smallint AS boolean)': {
+ 'function': 'int2_bool(smallint)', 'context': 'explicit',
+ 'method': 'function'}})
+ inmap['schema public'].update({'function int2_bool(smallint)': {
+ 'returns': 'boolean', 'language': 'sql',
+ 'immutable': True, 'source': SOURCE}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [
+ "COMMENT ON CAST (smallint AS boolean) IS NULL"])
+
+ def test_change_cast_comment(self):
+ "Change existing comment on a cast"
+ self.db.execute(DROP_STMT)
+ self.db.execute(CREATE_FUNC)
+ self.db.execute(CREATE_STMT1)
+ self.db.execute_commit(COMMENT_STMT)
+ inmap = new_std_map()
+ inmap.update({'cast (smallint AS boolean)': {
+ 'description': 'Changed cast 1',
+ 'function': 'int2_bool(smallint)', 'context': 'explicit',
+ 'method': 'function'}})
+ inmap['schema public'].update({'function int2_bool(smallint)': {
+ 'returns': 'boolean', 'language': 'sql',
+ 'immutable': True, 'source': SOURCE}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [
+ "COMMENT ON CAST (smallint AS boolean) IS 'Changed cast 1'"])
+
+
+def suite():
+ tests = unittest.TestLoader().loadTestsFromTestCase(CastToMapTestCase)
+ tests.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ CastToSqlTestCase))
+ return tests
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='suite')
Please sign in to comment.
Something went wrong with that request. Please try again.