Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add support for comments on views, sequences, indexes and constraints.

 * docs/constraint.rst: Add automethod for Constraint.comment.
 * pyrseas/dbobject/constraint.py: Add Constraint.comment and support
   for mapping and creating comments.
 * pyrseas/dbobject/index.py: Add support for mapping and creating
   comments.
 * pyrseas/dbobject/language.py (Language.diff_map): Replace body by
   call to generic implementation.
 * pyrseas/dbobject/table.py: Add support for mapping and creating
   comments on sequences and views.
 * tests/dbobject/{test_constraint.py, test_index.py,
   test_language.py, test_sequence.py, test_view.py}: Add new tests
   for comment mapping and generation.
  • Loading branch information...
commit 72c3590e5b01aa7b8c7ba9bedb81e96e8fc229bd 1 parent 220e935
@jmafc jmafc authored
View
2  docs/constraint.rst
@@ -25,6 +25,8 @@ name, the table name and the constraint name.
.. automethod:: Constraint.drop
+.. automethod:: Constraint.comment
+
Check Constraint
----------------
View
47 pyrseas/dbobject/constraint.py
@@ -41,10 +41,13 @@ def add(self):
Works as is for primary keys and unique constraints but has
to be overridden for check constraints and foreign keys.
"""
- return "ALTER TABLE %s ADD CONSTRAINT %s %s (%s)" % (
- DbSchemaObject(schema=self.schema, name=self.table).qualname(),
- quote_id(self.name),
- self.objtype, self.key_columns())
+ stmts = ["ALTER TABLE %s ADD CONSTRAINT %s %s (%s)" % (
+ DbSchemaObject(schema=self.schema, name=self.table).qualname(),
+ quote_id(self.name),
+ self.objtype, self.key_columns())]
+ if hasattr(self, 'description'):
+ stmts.append(self.comment())
+ return stmts
def drop(self):
"""Return string to drop the constraint via ALTER TABLE
@@ -57,6 +60,18 @@ def drop(self):
self._qualtable(), self.name)
return []
+ def comment(self):
+ """Return SQL statement to create COMMENT on constraint
+
+ :return: SQL statement
+ """
+ if hasattr(self, 'description'):
+ descr = "'%s'" % self.description
+ else:
+ descr = 'NULL'
+ return "COMMENT ON CONSTRAINT %s ON %s IS %s" % (
+ self.identifier(), self._qualtable(), descr)
+
class CheckConstraint(Constraint):
"A check constraint definition"
@@ -84,8 +99,11 @@ def add(self):
:return: SQL statement
"""
- return "ALTER TABLE %s ADD CONSTRAINT %s %s (%s)" % (
- self._qualtable(), self.name, self.objtype, self.expression)
+ stmts = ["ALTER TABLE %s ADD CONSTRAINT %s %s (%s)" % (
+ self._qualtable(), self.name, self.objtype, self.expression)]
+ if hasattr(self, 'description'):
+ stmts.append(self.comment())
+ return stmts
def diff_map(self, inchk):
"""Generate SQL to transform an existing CHECK constraint
@@ -99,6 +117,7 @@ def diff_map(self, inchk):
"""
stmts = []
# TODO: to be implemented
+ stmts.append(self.diff_description(inchk))
return stmts
@@ -132,6 +151,7 @@ def diff_map(self, inpk):
"""
stmts = []
# TODO: to be implemented (via ALTER DROP and ALTER ADD)
+ stmts.append(self.diff_description(inpk))
return stmts
@@ -194,6 +214,7 @@ def diff_map(self, infk):
"""
stmts = []
# TODO: to be implemented (via ALTER DROP and ALTER ADD)
+ stmts.append(self.diff_description(infk))
return stmts
@@ -229,6 +250,7 @@ def diff_map(self, inuc):
"""
stmts = []
# TODO: to be implemented (via ALTER DROP and ALTER ADD)
+ stmts.append(self.diff_description(inuc))
return stmts
@@ -245,8 +267,9 @@ class ConstraintDict(DbObjectDict):
contype AS type, conkey AS keycols,
confrelid::regclass AS ref_table, confkey AS ref_cols,
consrc AS expression, confupdtype AS on_update,
- confdeltype AS on_delete, amname AS access_method
- FROM pg_constraint
+ confdeltype AS on_delete, amname AS access_method,
+ obj_description(c.oid, 'pg_constraint') AS description
+ FROM pg_constraint c
JOIN pg_namespace ON (connamespace = pg_namespace.oid)
JOIN pg_roles ON (nspowner = pg_roles.oid)
LEFT JOIN pg_class on (conname = relname)
@@ -313,6 +336,8 @@ def from_map(self, table, inconstrs, target=''):
check.keycols = val['columns']
if target:
check.target = target
+ if 'description' in val:
+ check.description = val['description']
self[(table.schema, table.name, cns)] = check
if 'primary_key' in inconstrs:
cns = inconstrs['primary_key'].keys()[0]
@@ -326,6 +351,8 @@ def from_map(self, table, inconstrs, target=''):
raise
if 'access_method' in val:
pkey.access_method = val['access_method']
+ if 'description' in val:
+ pkey.description = val['description']
self[(table.schema, table.name, cns)] = pkey
if 'foreign_keys' in inconstrs:
fkeys = inconstrs['foreign_keys']
@@ -371,6 +398,8 @@ def from_map(self, table, inconstrs, target=''):
if 'schema' in refs:
sch = refs['schema']
fkey.ref_schema = sch
+ if 'description' in val:
+ fkey.description = val['description']
self[(table.schema, table.name, cns)] = fkey
if 'unique_constraints' in inconstrs:
uconstrs = inconstrs['unique_constraints']
@@ -385,6 +414,8 @@ def from_map(self, table, inconstrs, target=''):
raise
if 'access_method' in val:
unq.access_method = val['access_method']
+ if 'description' in val:
+ unq.description = val['description']
self[(table.schema, table.name, cns)] = unq
def diff_map(self, inconstrs):
View
8 pyrseas/dbobject/index.py
@@ -54,6 +54,8 @@ def create(self):
unq and 'UNIQUE ' or '', quote_id(self.name), quote_id(self.table),
acc, hasattr(self, 'keycols') and self.key_columns() or
self.expression))
+ if hasattr(self, 'description'):
+ stmts.append(self.comment())
return stmts
def diff_map(self, inindex):
@@ -76,6 +78,7 @@ def diff_map(self, inindex):
self.unique = inindex.unique
stmts.append(self.create())
# TODO: need to deal with changes in keycols
+ stmts.append(self.diff_description(inindex))
return stmts
@@ -87,7 +90,8 @@ class IndexDict(DbObjectDict):
"""SELECT nspname AS schema, indrelid::regclass AS table,
c.relname AS name, amname AS access_method,
indisunique AS unique, indkey AS keycols,
- pg_get_expr(indexprs, indrelid) AS expression
+ pg_get_expr(indexprs, indrelid) AS expression,
+ obj_description (c.oid, 'pg_class') AS description
FROM pg_index JOIN pg_class c ON (indexrelid = c.oid)
JOIN pg_namespace ON (relnamespace = pg_namespace.oid)
JOIN pg_roles ON (nspowner = pg_roles.oid)
@@ -129,6 +133,8 @@ def from_map(self, table, inindexes):
setattr(idx, attr, val[attr])
if not hasattr(idx, 'unique'):
idx.unique = False
+ if 'description' in val:
+ idx.description = val['description']
self[(table.schema, table.name, i)] = idx
def diff_map(self, inindexes):
View
13 pyrseas/dbobject/language.py
@@ -53,18 +53,7 @@ def diff_map(self, inlanguage):
input.
"""
stmts = []
- if hasattr(self, 'description'):
- if hasattr(inlanguage, 'description'):
- if self.description != inlanguage.description:
- self.description = inlanguage.description
- stmts.append(self.comment())
- else:
- del self.description
- stmts.append(self.comment())
- else:
- if hasattr(inlanguage, 'description'):
- self.description = inlanguage.description
- stmts.append(self.comment())
+ stmts.append(self.diff_description(inlanguage))
return stmts
View
21 pyrseas/dbobject/table.py
@@ -85,19 +85,23 @@ def to_map(self):
def create(self):
"""Return a SQL statement to CREATE the sequence
- :return: SQL statement
+ :return: SQL statements
"""
+ stmts = []
maxval = self.max_value and ("MAXVALUE %d" % self.max_value) \
or "NO MAXVALUE"
minval = self.min_value and ("MINVALUE %d" % self.min_value) \
or "NO MINVALUE"
- return """CREATE SEQUENCE %s
+ stmts.append("""CREATE SEQUENCE %s
START WITH %d
INCREMENT BY %d
%s
%s
CACHE %d""" % (self.qualname(), self.start_value, self.increment_by,
- maxval, minval, self.cache_value)
+ maxval, minval, self.cache_value))
+ if hasattr(self, 'description'):
+ stmts.append(self.comment())
+ return stmts
def add_owner(self):
"""Return statement to ALTER the sequence to indicate its owner table
@@ -123,6 +127,7 @@ def diff_map(self, inseq):
statements to transform it into the one represented by the
input.
"""
+ stmts = []
stmt = ""
if self.start_value != inseq.start_value:
stmt += " START WITH %d" % inseq.start_value
@@ -143,8 +148,9 @@ def diff_map(self, inseq):
if self.cache_value != inseq.cache_value:
stmt += " CACHE %d" % inseq.cache_value
if stmt:
- return "ALTER SEQUENCE %s" % self.qualname() + stmt
- return []
+ stmts.append("ALTER SEQUENCE %s" % self.qualname() + stmt)
+ stmts.append(self.diff_description(inseq))
+ return stmts
class Table(DbClass):
@@ -316,7 +322,10 @@ def to_map(self):
:return: dictionary
"""
- return {self.extern_key(): {'definition': self.definition}}
+ dct = self.__dict__.copy()
+ for k in self.keylist:
+ del dct[k]
+ return {self.extern_key(): dct}
def create(self, newdefn=None):
"""Return SQL statements to CREATE the table
View
106 tests/dbobject/test_constraint.py
@@ -5,7 +5,7 @@
from utils import PyrseasTestCase, fix_indent, new_std_map
-DROP_STMT = "DROP TABLE IF EXISTS t1"
+COMMENT_STMT = "COMMENT ON CONSTRAINT cns1 ON t1 IS 'Test constraint cns1'"
class CheckConstraintToMapTestCase(PyrseasTestCase):
@@ -40,7 +40,6 @@ class CheckConstraintToSqlTestCase(PyrseasTestCase):
def test_create_w_check_constraint(self):
"Create new table with a single column CHECK constraint"
- self.db.execute_commit(DROP_STMT)
inmap = new_std_map()
inmap['schema public'].update({'table t1': {
'columns': [{'c1': {'type': 'integer'}},
@@ -57,7 +56,6 @@ def test_create_w_check_constraint(self):
def test_add_check_constraint(self):
"Add a two-column CHECK constraint to an existing table"
- self.db.execute(DROP_STMT)
self.db.execute_commit("CREATE TABLE t1 (c1 INTEGER NOT NULL, "
"c2 INTEGER NOT NULL, c3 TEXT)")
inmap = new_std_map()
@@ -134,7 +132,6 @@ class PrimaryKeyToSqlTestCase(PyrseasTestCase):
def test_create_with_primary_key(self):
"Create new table with single column primary key"
- self.db.execute_commit(DROP_STMT)
inmap = new_std_map()
inmap['schema public'].update({'table t1': {
'columns': [{'c1': {'type': 'text'}},
@@ -151,7 +148,6 @@ def test_create_with_primary_key(self):
def test_add_primary_key(self):
"Add a two-column primary key to an existing table"
- self.db.execute(DROP_STMT)
self.db.execute_commit("CREATE TABLE t1 (c1 INTEGER NOT NULL, "
"c2 INTEGER NOT NULL, c3 TEXT)")
inmap = new_std_map()
@@ -170,7 +166,6 @@ def test_add_primary_key(self):
def test_drop_primary_key(self):
"Drop a primary key on an existing table"
- self.db.execute(DROP_STMT)
self.db.execute_commit("CREATE TABLE t1 (c1 INTEGER NOT NULL "
"PRIMARY KEY, c2 TEXT)")
inmap = new_std_map()
@@ -375,7 +370,6 @@ class ForeignKeyToSqlTestCase(PyrseasTestCase):
def test_create_with_foreign_key(self):
"Create a table with a foreign key constraint"
- self.db.execute_commit(DROP_STMT + ", t2")
inmap = new_std_map()
inmap['schema public'].update({'table t1': {
'columns': [{'c11': {'type': 'integer'}},
@@ -406,7 +400,6 @@ def test_create_with_foreign_key(self):
def test_add_foreign_key(self):
"Add a two-column foreign key to an existing table"
- self.db.execute(DROP_STMT + ", t2")
self.db.execute("CREATE TABLE t1 (c11 INTEGER NOT NULL, "
"c12 INTEGER NOT NULL, c13 TEXT, "
"PRIMARY KEY (c11, c12))")
@@ -441,7 +434,6 @@ def test_add_foreign_key(self):
def test_drop_foreign_key(self):
"Drop a foreign key on an existing table"
- self.db.execute(DROP_STMT + ", t2")
self.db.execute("CREATE TABLE t1 (c11 INTEGER NOT NULL, c12 TEXT, "
"PRIMARY KEY (c11))")
self.db.execute_commit("CREATE TABLE t2 (c21 INTEGER NOT NULL "
@@ -462,7 +454,6 @@ def test_drop_foreign_key(self):
def test_create_foreign_key_actions(self):
"Create a table with foreign key ON UPDATE/ON DELETE actions"
- self.db.execute_commit(DROP_STMT + ", t2")
inmap = new_std_map()
inmap['schema public'].update({'table t1': {
'columns': [{'c11': {'type': 'integer'}},
@@ -547,7 +538,6 @@ class UniqueConstraintToSqlTestCase(PyrseasTestCase):
def test_create_w_unique_constraint(self):
"Create new table with a single column unique constraint"
- self.db.execute_commit(DROP_STMT)
inmap = new_std_map()
inmap['schema public'].update({'table t1': {
'columns': [{'c1': {'type': 'integer'}},
@@ -564,7 +554,6 @@ def test_create_w_unique_constraint(self):
def test_add_unique_constraint(self):
"Add a two-column unique constraint to an existing table"
- self.db.execute(DROP_STMT)
self.db.execute_commit("CREATE TABLE t1 (c1 INTEGER NOT NULL, "
"c2 INTEGER NOT NULL, c3 TEXT)")
inmap = new_std_map()
@@ -583,7 +572,6 @@ def test_add_unique_constraint(self):
def test_drop_unique_constraint(self):
"Drop a unique constraint on an existing table"
- self.db.execute(DROP_STMT)
self.db.execute_commit("CREATE TABLE t1 (c1 INTEGER NOT NULL UNIQUE, "
"c2 TEXT)")
inmap = new_std_map()
@@ -594,6 +582,96 @@ def test_drop_unique_constraint(self):
self.assertEqual(dbsql, ["ALTER TABLE t1 DROP CONSTRAINT t1_c1_key"])
+class ConstraintCommentTestCase(PyrseasTestCase):
+ """Test mapping and creation of comments on constraints"""
+
+ def test_map_pk_comment(self):
+ "Map a primary key with a comment"
+ self.db.execute("CREATE TABLE t1 (c1 integer CONSTRAINT cns1 "
+ "PRIMARY KEY, c2 text)")
+ dbmap = self.db.execute_and_map(COMMENT_STMT)
+ self.assertEqual(dbmap['schema public']['table t1']['primary_key']
+ ['cns1']['description'], 'Test constraint cns1')
+
+ def test_map_fk_comment(self):
+ "Map a foreign key with a comment"
+ self.db.execute("CREATE TABLE t2 (pc1 INTEGER PRIMARY KEY, pc2 TEXT)")
+ self.db.execute("CREATE TABLE t1 (c1 INTEGER, c2 INTEGER "
+ "CONSTRAINT cns1 REFERENCES t2 (pc1), c3 TEXT)")
+ dbmap = self.db.execute_and_map(COMMENT_STMT)
+ self.assertEqual(dbmap['schema public']['table t1']['foreign_keys']
+ ['cns1']['description'], 'Test constraint cns1')
+
+ def test_check_constraint_with_comment(self):
+ "Create a CHECK constraint with a comment"
+ inmap = new_std_map()
+ inmap['schema public'].update({'table t1': {
+ 'columns': [{'c1': {'type': 'integer'}},
+ {'c2': {'type': 'text'}}],
+ 'check_constraints': {'cns1': {
+ 'columns': ['c1'], 'expression': 'c1 > 50',
+ 'description': 'Test constraint cns1'}}}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]),
+ "CREATE TABLE t1 (c1 integer, c2 text)")
+ self.assertEqual(fix_indent(dbsql[1]),
+ "ALTER TABLE t1 ADD CONSTRAINT cns1 CHECK (c1 > 50)")
+ self.assertEqual(dbsql[2], COMMENT_STMT)
+
+ def test_comment_on_primary_key(self):
+ "Create a comment for an existing primary key"
+ self.db.execute_commit("CREATE TABLE t1 (c1 text CONSTRAINT cns1 "
+ "PRIMARY KEY, c2 integer)")
+ inmap = new_std_map()
+ inmap['schema public'].update({'table t1': {
+ 'columns': [{'c1': {'type': 'text', 'not_null': True}},
+ {'c2': {'type': 'integer'}}],
+ 'primary_key': {'cns1': {
+ 'columns': ['c2'], 'access_method': 'btree',
+ 'description': 'Test constraint cns1'}}}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [COMMENT_STMT])
+
+ def test_drop_foreign_key_comment(self):
+ "Drop the comment on an existing foreign key"
+ self.db.execute("CREATE TABLE t2 (c21 integer PRIMARY KEY, c22 text)")
+ self.db.execute("CREATE TABLE t1 (c11 integer, c12 text, "
+ "c13 integer CONSTRAINT cns1 REFERENCES t2 (c21))")
+ self.db.execute_commit(COMMENT_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'table t2': {
+ 'columns': [{'c21': {'type': 'integer', 'not_null': True}},
+ {'c22': {'type': 'text'}}],
+ 'primary_key': {'t2_pkey': {
+ 'columns': ['c21'], 'access_method': 'btree'}}},
+ 'table t1': {
+ 'columns': [{'c11': {'type': 'integer'}},
+ {'c12': {'type': 'text'}},
+ {'c13': {'type': 'integer'}}],
+ 'foreign_keys': {'cns1': {
+ 'columns': ['c13'],
+ 'references': {'columns': ['c21'],
+ 'table': 't2'}}}}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, ["COMMENT ON CONSTRAINT cns1 ON t1 IS NULL"])
+
+ def test_change_unique_constraint_comment(self):
+ "Change existing comment on a unique constraint"
+ self.db.execute("CREATE TABLE t1 (c1 integer CONSTRAINT cns1 UNIQUE, "
+ "c2 text)")
+ self.db.execute_commit(COMMENT_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'table t1': {
+ 'columns': [{'c1': {'type': 'integer'}},
+ {'c2': {'type': 'text'}}],
+ 'unique_constraints': {'cns1': {
+ 'columns': ['c1'], 'access_method': 'btree',
+ 'description': "Changed constraint cns1"}}}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, ["COMMENT ON CONSTRAINT cns1 ON t1 IS "
+ "'Changed constraint cns1'"])
+
+
def suite():
tests = unittest.TestLoader().loadTestsFromTestCase(
CheckConstraintToMapTestCase)
@@ -611,6 +689,8 @@ def suite():
UniqueConstraintToMapTestCase))
tests.addTest(unittest.TestLoader().loadTestsFromTestCase(
UniqueConstraintToSqlTestCase))
+ tests.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ ConstraintCommentTestCase))
return tests
if __name__ == '__main__':
View
88 tests/dbobject/test_index.py
@@ -6,6 +6,8 @@
from utils import PyrseasTestCase, fix_indent, new_std_map
CREATE_TABLE_STMT = "CREATE TABLE t1 (c1 integer, c2 text)"
+CREATE_STMT = "CREATE INDEX t1_idx ON t1 (c1)"
+COMMENT_STMT = "COMMENT ON INDEX t1_idx IS 'Test index t1_idx'"
class IndexToMapTestCase(PyrseasTestCase):
@@ -14,12 +16,11 @@ class IndexToMapTestCase(PyrseasTestCase):
def test_index_1(self):
"Map a single-column index"
self.db.execute(CREATE_TABLE_STMT)
- ddlstmt = "CREATE INDEX t1_idx ON t1 (c1)"
expmap = {'columns': [{'c1': {'type': 'integer'}},
{'c2': {'type': 'text'}}],
'indexes': {'t1_idx': {'columns': ['c1'],
'access_method': 'btree'}}}
- dbmap = self.db.execute_and_map(ddlstmt)
+ dbmap = self.db.execute_and_map(CREATE_STMT)
self.assertEqual(dbmap['schema public']['table t1'], expmap)
def test_index_2(self):
@@ -63,13 +64,20 @@ def test_index_function(self):
dbmap = self.db.execute_and_map(ddlstmt)
self.assertEqual(dbmap['schema public']['table t1'], expmap)
+ def test_map_index_comment(self):
+ "Map an index comment"
+ self.db.execute(CREATE_TABLE_STMT)
+ self.db.execute(CREATE_STMT)
+ dbmap = self.db.execute_and_map(COMMENT_STMT)
+ self.assertEqual(dbmap['schema public']['table t1']['indexes']
+ ['t1_idx']['description'], 'Test index t1_idx')
+
class IndexToSqlTestCase(PyrseasTestCase):
"""Test SQL generation from input indexes"""
def test_create_table_with_index(self):
"Create new table with a single column index"
- self.db.execute_commit("DROP TABLE IF EXISTS t1")
inmap = new_std_map()
inmap['schema public'].update({'table t1': {
'columns': [{'c1': {'type': 'integer'}},
@@ -85,7 +93,6 @@ def test_create_table_with_index(self):
def test_add_index(self):
"Add a two-column unique index to an existing table"
- self.db.execute("DROP TABLE IF EXISTS t1")
self.db.execute_commit("CREATE TABLE t1 (c1 INTEGER NOT NULL, "
"c2 INTEGER NOT NULL, c3 TEXT)")
inmap = new_std_map()
@@ -94,12 +101,10 @@ def test_add_index(self):
{'c1': {'type': 'integer', 'not_null': True}},
{'c2': {'type': 'integer', 'not_null': True}},
{'c3': {'type': 'text'}}],
- 'indexes': {'t1_idx': {
- 'columns': ['c2', 'c1'],
- 'unique': True}}}})
+ 'indexes': {'t1_idx': {'columns': ['c2', 'c1'],
+ 'unique': True}}}})
dbsql = self.db.process_map(inmap)
- self.assertEqual(dbsql,
- ["CREATE UNIQUE INDEX t1_idx ON t1 (c2, c1)"])
+ self.assertEqual(dbsql, ["CREATE UNIQUE INDEX t1_idx ON t1 (c2, c1)"])
def test_bad_index(self):
"Fail on creating an index without columns or expression"
@@ -113,18 +118,75 @@ def test_bad_index(self):
def test_create_index_function(self):
"Create an index which uses a function"
- self.db.execute_commit("DROP TABLE IF EXISTS t1")
inmap = new_std_map()
inmap['schema public'].update({'table t1': {
'columns': [{'c1': {'type': 'integer'}},
{'c2': {'type': 'text'}}],
- 'indexes': {'t1_idx': {
- 'expression': 'lower(c2)',
- 'access_method': 'btree'}}}})
+ 'indexes': {'t1_idx': {'expression': 'lower(c2)',
+ 'access_method': 'btree'}}}})
dbsql = self.db.process_map(inmap)
self.assertEqual(dbsql[1],
"CREATE INDEX t1_idx ON t1 USING btree (lower(c2))")
+ def test_index_with_comment(self):
+ "Create an index with a comment"
+ self.db.execute_commit(CREATE_TABLE_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'table t1': {
+ 'columns': [{'c1': {'type': 'integer'}},
+ {'c2': {'type': 'text'}}],
+ 'indexes': {'t1_idx': {
+ 'columns': ['c1'], 'access_method': 'btree',
+ 'description': 'Test index t1_idx'}}}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]),
+ "CREATE INDEX t1_idx ON t1 USING btree (c1)")
+ self.assertEqual(dbsql[1], COMMENT_STMT)
+
+ def test_comment_on_index(self):
+ "Create a comment for an existing index"
+ self.db.execute(CREATE_TABLE_STMT)
+ self.db.execute_commit(CREATE_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'table t1': {
+ 'columns': [{'c1': {'type': 'integer'}},
+ {'c2': {'type': 'text'}}],
+ 'indexes': {'t1_idx': {
+ 'columns': ['c1'], 'access_method': 'btree',
+ 'description': 'Test index t1_idx'}}}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [COMMENT_STMT])
+
+ def test_drop_index_comment(self):
+ "Drop the comment on an existing index"
+ self.db.execute(CREATE_TABLE_STMT)
+ self.db.execute(CREATE_STMT)
+ self.db.execute_commit(COMMENT_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'table t1': {
+ 'columns': [{'c1': {'type': 'integer'}},
+ {'c2': {'type': 'text'}}],
+ 'indexes': {'t1_idx': {
+ 'columns': ['c1'], 'access_method': 'btree'}}}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, ["COMMENT ON INDEX t1_idx IS NULL"])
+
+ def test_change_index_comment(self):
+ "Change existing comment on an index"
+ self.db.execute(CREATE_TABLE_STMT)
+ self.db.execute(CREATE_STMT)
+ self.db.execute_commit(COMMENT_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'table t1': {
+ 'columns': [{'c1': {'type': 'integer'}},
+ {'c2': {'type': 'text'}}],
+ 'indexes': {'t1_idx': {
+ 'columns': ['c1'], 'access_method': 'btree',
+ 'description': 'Changed index t1_idx'}}}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [
+ "COMMENT ON INDEX t1_idx IS 'Changed index t1_idx'"])
+
def suite():
tests = unittest.TestLoader().loadTestsFromTestCase(IndexToMapTestCase)
View
22 tests/dbobject/test_language.py
@@ -3,10 +3,11 @@
import unittest
-from utils import PyrseasTestCase
+from utils import PyrseasTestCase, new_std_map
CREATE_STMT = "CREATE LANGUAGE plperl"
DROP_STMT = "DROP LANGUAGE IF EXISTS plperl CASCADE"
+COMMENT_STMT = "COMMENT ON LANGUAGE plperl IS 'Test language PL/Perl'"
class LanguageToMapTestCase(PyrseasTestCase):
@@ -18,6 +19,13 @@ def test_map_language(self):
dbmap = self.db.execute_and_map(CREATE_STMT)
self.assertEqual(dbmap['language plperl'], {'trusted': True})
+ def test_map_language_comment(self):
+ "Map a language with a comment"
+ self.db.execute(DROP_STMT)
+ self.db.execute(CREATE_STMT)
+ dbmap = self.db.execute_and_map(COMMENT_STMT)
+ self.assertEqual(dbmap['language plperl']['description'],
+ 'Test language PL/Perl')
class LanguageToSqlTestCase(PyrseasTestCase):
"""Test SQL generation for input languages"""
@@ -28,7 +36,6 @@ def tearDown(self):
def test_create_language(self):
"Create a language that didn't exist"
- self.db.execute_commit(DROP_STMT)
dbsql = self.db.process_map({'language plperl': {}})
self.assertEqual(dbsql, [CREATE_STMT])
@@ -38,20 +45,27 @@ def test_bad_language_map(self):
def test_drop_language(self):
"Drop an existing language"
- self.db.execute(DROP_STMT)
self.db.execute_commit(CREATE_STMT)
dbsql = self.db.process_map({})
self.assertEqual(dbsql, ["DROP LANGUAGE plperl"])
def test_drop_language_function(self):
"Drop an existing function and the language it uses"
- self.db.execute(DROP_STMT)
self.db.execute(CREATE_STMT)
self.db.execute_commit("CREATE FUNCTION f1() RETURNS text "
"LANGUAGE plperl AS $_$return \"dummy\";$_$")
dbsql = self.db.process_map({})
self.assertEqual(dbsql, ["DROP FUNCTION f1()", "DROP LANGUAGE plperl"])
+ def test_comment_on_language(self):
+ "Create a comment for an existing language"
+ self.db.execute_commit(CREATE_STMT)
+ inmap = new_std_map()
+ inmap.update({'language plperl': {
+ 'description': "Test language PL/Perl"}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [COMMENT_STMT])
+
def suite():
tests = unittest.TestLoader().loadTestsFromTestCase(LanguageToMapTestCase)
View
81 tests/dbobject/test_sequence.py
@@ -5,7 +5,10 @@
from utils import PyrseasTestCase, fix_indent, new_std_map
-DROP_STMT = "DROP SEQUENCE IF EXISTS seq1"
+CREATE_STMT = "CREATE SEQUENCE seq1"
+CREATE_STMT_FULL = "CREATE SEQUENCE seq1 START WITH 1 INCREMENT BY 1 " \
+ "NO MAXVALUE NO MINVALUE CACHE 1"
+COMMENT_STMT = "COMMENT ON SEQUENCE seq1 IS 'Test sequence seq1'"
class SequenceToMapTestCase(PyrseasTestCase):
@@ -13,33 +16,35 @@ class SequenceToMapTestCase(PyrseasTestCase):
def test_map_sequence(self):
"Map a created sequence"
- ddlstmt = "CREATE SEQUENCE seq1"
expmap = {'start_value': 1, 'increment_by': 1, 'max_value': None,
'min_value': None, 'cache_value': 1}
- dbmap = self.db.execute_and_map(ddlstmt)
+ dbmap = self.db.execute_and_map(CREATE_STMT)
self.assertEqual(dbmap['schema public']['sequence seq1'],
expmap)
+ def test_map_sequence_comment(self):
+ "Map a sequence with a comment"
+ self.db.execute(CREATE_STMT)
+ dbmap = self.db.execute_and_map(COMMENT_STMT)
+ self.assertEqual(dbmap['schema public']['sequence seq1'][
+ 'description'], 'Test sequence seq1')
+
class SequenceToSqlTestCase(PyrseasTestCase):
"""Test SQL generation from input sequences"""
def test_create_sequence(self):
"Create a sequence"
- self.db.execute_commit(DROP_STMT)
inmap = new_std_map()
inmap['schema public'].update({'sequence seq1': {
'start_value': 1, 'increment_by': 1, 'max_value': None,
'min_value': None, 'cache_value': 1}})
dbsql = self.db.process_map(inmap)
- self.assertEqual(fix_indent(dbsql[0]),
- "CREATE SEQUENCE seq1 START WITH 1 "
- "INCREMENT BY 1 NO MAXVALUE NO MINVALUE CACHE 1")
+ self.assertEqual(fix_indent(dbsql[0]), CREATE_STMT_FULL)
def test_create_sequence_in_schema(self):
"Create a sequence within a non-public schema"
- self.db.execute("CREATE SCHEMA s1")
- self.db.execute_commit(DROP_STMT)
+ self.db.execute_commit("CREATE SCHEMA s1")
inmap = new_std_map()
inmap.update({'schema s1': {'sequence seq1': {
'start_value': 1, 'increment_by': 1, 'max_value': None,
@@ -60,15 +65,13 @@ def test_bad_sequence_map(self):
def test_drop_sequence(self):
"Drop an existing sequence"
- self.db.execute(DROP_STMT)
- self.db.execute_commit("CREATE SEQUENCE seq1")
+ self.db.execute_commit(CREATE_STMT)
dbsql = self.db.process_map(new_std_map())
self.assertEqual(dbsql, ["DROP SEQUENCE seq1"])
def test_rename_sequence(self):
"Rename an existing sequence"
- self.db.execute(DROP_STMT)
- self.db.execute_commit("CREATE SEQUENCE seq1")
+ self.db.execute_commit(CREATE_STMT)
inmap = new_std_map()
inmap['schema public'].update({'sequence seq2': {
'oldname': 'seq1',
@@ -79,8 +82,7 @@ def test_rename_sequence(self):
def test_bad_rename_sequence(self):
"Error renaming a non-existing sequence"
- self.db.execute(DROP_STMT)
- self.db.execute_commit("CREATE SEQUENCE seq1")
+ self.db.execute_commit(CREATE_STMT)
inmap = new_std_map()
inmap['schema public'].update({'sequence seq2': {
'oldname': 'seq3',
@@ -90,8 +92,7 @@ def test_bad_rename_sequence(self):
def test_change_sequence(self):
"Change sequence attributes"
- self.db.execute(DROP_STMT)
- self.db.execute_commit("CREATE SEQUENCE seq1")
+ self.db.execute_commit(CREATE_STMT)
inmap = new_std_map()
inmap['schema public'].update({'sequence seq1': {
'start_value': 5, 'increment_by': 10, 'max_value': None,
@@ -101,6 +102,52 @@ def test_change_sequence(self):
"ALTER SEQUENCE seq1 START WITH 5 INCREMENT BY 10 "
"CACHE 30")
+ def test_sequence_with_comment(self):
+ "Create a sequence with a comment"
+ inmap = new_std_map()
+ inmap['schema public'].update({'sequence seq1': {
+ 'start_value': 1, 'increment_by': 1, 'max_value': None,
+ 'min_value': None, 'cache_value': 1,
+ 'description': "Test sequence seq1"}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]), CREATE_STMT_FULL)
+ self.assertEqual(dbsql[1], COMMENT_STMT)
+
+ def test_comment_on_sequence(self):
+ "Create a comment for an existing sequence"
+ self.db.execute_commit(CREATE_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'sequence seq1': {
+ 'start_value': 1, 'increment_by': 1, 'max_value': None,
+ 'min_value': None, 'cache_value': 1,
+ 'description': "Test sequence seq1"}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [COMMENT_STMT])
+
+ def test_drop_sequence_comment(self):
+ "Drop the comment on an existing sequence"
+ self.db.execute(CREATE_STMT)
+ self.db.execute_commit(COMMENT_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'sequence seq1': {
+ 'start_value': 1, 'increment_by': 1, 'max_value': None,
+ 'min_value': None, 'cache_value': 1}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, ["COMMENT ON SEQUENCE seq1 IS NULL"])
+
+ def test_change_sequence_comment(self):
+ "Change existing comment on a sequence"
+ self.db.execute(CREATE_STMT)
+ self.db.execute_commit(COMMENT_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'sequence seq1': {
+ 'start_value': 1, 'increment_by': 1, 'max_value': None,
+ 'min_value': None, 'cache_value': 1,
+ 'description': "Changed sequence seq1"}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [
+ "COMMENT ON SEQUENCE seq1 IS 'Changed sequence seq1'"])
+
def suite():
tests = unittest.TestLoader().loadTestsFromTestCase(SequenceToMapTestCase)
View
91 tests/dbobject/test_view.py
@@ -5,7 +5,9 @@
from utils import PyrseasTestCase, fix_indent, new_std_map
-DROP_STMT = "DROP VIEW IF EXISTS v1"
+CREATE_STMT = "CREATE VIEW v1 AS SELECT now()::date AS today"
+COMMENT_STMT = "COMMENT ON VIEW v1 IS 'Test view v1'"
+VIEW_DEFN = " SELECT now()::date AS today;"
class ViewToMapTestCase(PyrseasTestCase):
@@ -13,9 +15,8 @@ class ViewToMapTestCase(PyrseasTestCase):
def test_map_view_no_table(self):
"Map a created view without a table dependency"
- ddlstmt = "CREATE VIEW v1 AS SELECT now()::date AS today"
- expmap = {'definition': " SELECT now()::date AS today;"}
- dbmap = self.db.execute_and_map(ddlstmt)
+ expmap = {'definition': VIEW_DEFN}
+ dbmap = self.db.execute_and_map(CREATE_STMT)
self.assertEqual(dbmap['schema public']['view v1'], expmap)
def test_map_view(self):
@@ -26,23 +27,26 @@ def test_map_view(self):
dbmap = self.db.execute_and_map(ddlstmt)
self.assertEqual(dbmap['schema public']['view v1'], expmap)
+ def test_map_view_comment(self):
+ "Map a view with a comment"
+ self.db.execute(CREATE_STMT)
+ dbmap = self.db.execute_and_map(COMMENT_STMT)
+ self.assertEqual(dbmap['schema public']['view v1']['description'],
+ 'Test view v1')
+
class ViewToSqlTestCase(PyrseasTestCase):
"""Test SQL generation from input views"""
def test_create_view_no_table(self):
"Create a view with no table dependency"
- self.db.execute_commit(DROP_STMT)
inmap = new_std_map()
- inmap['schema public'].update({'view v1': {
- 'definition': " SELECT now()::date AS today;"}})
+ inmap['schema public'].update({'view v1': {'definition': VIEW_DEFN}})
dbsql = self.db.process_map(inmap)
- self.assertEqual(fix_indent(dbsql[0]), "CREATE VIEW v1 AS "
- "SELECT now()::date AS today")
+ self.assertEqual(fix_indent(dbsql[0]), CREATE_STMT)
def test_create_view(self):
"Create a view"
- self.db.execute_commit(DROP_STMT)
inmap = new_std_map()
inmap['schema public'].update({'table t1': {
'columns': [{'c1': {'type': 'integer'}},
@@ -58,11 +62,10 @@ def test_create_view(self):
def test_create_view_in_schema(self):
"Create a view within a non-public schema"
- self.db.execute("CREATE SCHEMA s1")
- self.db.execute_commit(DROP_STMT)
+ self.db.execute("DROP SCHEMA IF EXISTS s1 CASCADE")
+ self.db.execute_commit("CREATE SCHEMA s1")
inmap = new_std_map()
- inmap.update({'schema s1': {'view v1': {
- 'definition': " SELECT now()::date AS today;"}}})
+ inmap.update({'schema s1': {'view v1': {'definition': VIEW_DEFN}}})
dbsql = self.db.process_map(inmap)
self.assertEqual(fix_indent(dbsql[0]), "CREATE VIEW s1.v1 AS "
"SELECT now()::date AS today")
@@ -71,20 +74,17 @@ def test_create_view_in_schema(self):
def test_bad_view_map(self):
"Error creating a view with a bad map"
inmap = new_std_map()
- inmap['schema public'].update({'v1': {
- 'definition': " SELECT now()::date AS today;"}})
+ inmap['schema public'].update({'v1': {'definition': VIEW_DEFN}})
self.assertRaises(KeyError, self.db.process_map, inmap)
def test_drop_view_no_table(self):
"Drop an existing view without a table dependency"
- self.db.execute(DROP_STMT)
- self.db.execute_commit("CREATE VIEW v1 AS SELECT now()::date AS today")
+ self.db.execute_commit(CREATE_STMT)
dbsql = self.db.process_map(new_std_map())
self.assertEqual(dbsql, ["DROP VIEW v1"])
def test_drop_view(self):
"Drop an existing view with table dependencies"
- self.db.execute(DROP_STMT)
self.db.execute("CREATE TABLE t1 (c1 INTEGER, c2 TEXT)")
self.db.execute("CREATE TABLE t2 (c1 INTEGER, c3 TEXT)")
self.db.execute_commit("CREATE VIEW v1 AS SELECT t1.c1, c2, c3 "
@@ -102,29 +102,24 @@ def test_drop_view(self):
def test_rename_view(self):
"Rename an existing view"
- self.db.execute(DROP_STMT)
- self.db.execute_commit("CREATE VIEW v1 AS SELECT now()::date AS today")
+ self.db.execute_commit(CREATE_STMT)
inmap = new_std_map()
inmap['schema public'].update({'view v2': {
- 'oldname': 'v1',
- 'definition': " SELECT now()::date AS today;"}})
+ 'oldname': 'v1', 'definition': VIEW_DEFN}})
dbsql = self.db.process_map(inmap)
self.assertEqual(dbsql, ["ALTER VIEW v1 RENAME TO v2"])
def test_bad_rename_view(self):
"Error renaming a non-existing view"
- self.db.execute(DROP_STMT)
- self.db.execute_commit("CREATE VIEW v1 AS SELECT now()::date AS today")
+ self.db.execute_commit(CREATE_STMT)
inmap = new_std_map()
inmap['schema public'].update({'view v2': {
- 'oldname': 'v3',
- 'definition': " SELECT now()::date AS today;"}})
+ 'oldname': 'v3', 'definition': VIEW_DEFN}})
self.assertRaises(KeyError, self.db.process_map, inmap)
def test_change_view_defn(self):
"Change view definition"
- self.db.execute(DROP_STMT)
- self.db.execute_commit("CREATE VIEW v1 AS SELECT now()::date AS today")
+ self.db.execute_commit(CREATE_STMT)
inmap = new_std_map()
inmap['schema public'].update({'view v1': {
'definition': " SELECT now()::date AS todays_date;"}})
@@ -132,6 +127,44 @@ def test_change_view_defn(self):
self.assertEqual(fix_indent(dbsql[0]), "CREATE OR REPLACE VIEW v1 AS "
"SELECT now()::date AS todays_date")
+ def test_view_with_comment(self):
+ "Create a view with a comment"
+ inmap = new_std_map()
+ inmap['schema public'].update({'view v1': {
+ 'definition': VIEW_DEFN, 'description': "Test view v1"}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(fix_indent(dbsql[0]), CREATE_STMT)
+ self.assertEqual(dbsql[1], COMMENT_STMT)
+
+ def test_comment_on_view(self):
+ "Create a comment for an existing view"
+ self.db.execute_commit(CREATE_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'view v1': {
+ 'definition': VIEW_DEFN, 'description': "Test view v1"}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, [COMMENT_STMT])
+
+ def test_drop_view_comment(self):
+ "Drop the comment on an existing view"
+ self.db.execute(CREATE_STMT)
+ self.db.execute_commit(COMMENT_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'view v1': {'definition': VIEW_DEFN}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, ["COMMENT ON VIEW v1 IS NULL"])
+
+ def test_change_view_comment(self):
+ "Change existing comment on a view"
+ self.db.execute(CREATE_STMT)
+ self.db.execute_commit(COMMENT_STMT)
+ inmap = new_std_map()
+ inmap['schema public'].update({'view v1': {
+ 'definition': VIEW_DEFN,
+ 'description': "Changed view v1"}})
+ dbsql = self.db.process_map(inmap)
+ self.assertEqual(dbsql, ["COMMENT ON VIEW v1 IS 'Changed view v1'"])
+
def suite():
tests = unittest.TestLoader().loadTestsFromTestCase(ViewToMapTestCase)
Please sign in to comment.
Something went wrong with that request. Please try again.