Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Support for displaying access privileges on table columns.

 * pyrseas/dbobject/__init__.py (DbObject.map_privs): Check that
   object has an owner before formatting grantor information.
 * pyrseas/dbobject/column.py (Column.to_map): Add no_privs argument
   and pass it downstream.  (ColumnDict.query): Fetch privileges.
   (ColumnDict._from_catalog): Convert the privileges to a list.
 * pyrseas/dbobject/dbtype.py (Composite.to_map): Pass False to
   Column.to_map.
 * pyrseas/dbobject/foreign.py (ForeignTable.to_map): Pass no_privs to
   Column.to_map.
 * pyrseas/dbobject/table.py (Table.to_map): Pass no_privs to
   Column.to_map.
 * tests/dbobject/test_privs.py: New test to verify functionality.
  • Loading branch information...
commit 730e8efbabc8299a2fd5c66e7c30d888120c7104 1 parent 91b8d07
@jmafc jmafc authored
View
5 pyrseas/dbobject/__init__.py
@@ -213,8 +213,9 @@ def map_privs(self):
if code + '*' in privcodes:
priv = {priv: {'grantable': True}}
privs.append(priv)
- if grantor != self.owner:
- privs = {'privs': privs, 'grantor': grantor}
+ if hasattr(self, 'owner'):
+ if grantor != self.owner:
+ privs = {'privs': privs, 'grantor': grantor}
privlist.append({usr: privs})
return privlist
View
10 pyrseas/dbobject/column.py
@@ -13,15 +13,17 @@ class Column(DbSchemaObject):
"A table column definition"
keylist = ['schema', 'table']
+ allprivs = 'arwx'
- def to_map(self):
+ def to_map(self, no_privs):
"""Convert a column to a YAML-suitable format
+ :param no_privs: exclude privilege information
:return: dictionary
"""
if hasattr(self, 'dropped'):
return None
- dct = self._base_map()
+ dct = self._base_map(False, no_privs)
del dct['number'], dct['name']
if '_table' in dct:
del dct['_table']
@@ -139,6 +141,7 @@ def diff_map(self, incol):
attnotnull AS not_null, attinhcount AS inherited,
pg_get_expr(adbin, adrelid) AS default,
attisdropped AS dropped,
+ array_to_string(attacl, ',') AS privileges,
col_description(c.oid, attnum) AS description
FROM pg_attribute JOIN pg_class c ON (attrelid = c.oid)
JOIN pg_namespace ON (relnamespace = pg_namespace.oid)
@@ -161,6 +164,7 @@ class ColumnDict(DbObjectDict):
attnotnull AS not_null, attinhcount AS inherited,
pg_get_expr(adbin, adrelid) AS default,
collname AS collation, attisdropped AS dropped,
+ array_to_string(attacl, ',') AS privileges,
col_description(c.oid, attnum) AS description
FROM pg_attribute JOIN pg_class c ON (attrelid = c.oid)
JOIN pg_namespace ON (relnamespace = pg_namespace.oid)
@@ -178,6 +182,8 @@ def _from_catalog(self):
if self.dbconn.version < 90100:
self.query = QUERY_PRE91
for col in self.fetch():
+ if hasattr(col, 'privileges'):
+ col.privileges = col.privileges.split(',')
sch, tbl = col.key()
if (sch, tbl) not in self:
self[(sch, tbl)] = []
View
2  pyrseas/dbobject/dbtype.py
@@ -102,7 +102,7 @@ def to_map(self, no_owner):
return
attrs = []
for attr in self.attributes:
- att = attr.to_map()
+ att = attr.to_map(False)
if att:
attrs.append(att)
dct = {'attributes': attrs}
View
2  pyrseas/dbobject/foreign.py
@@ -584,7 +584,7 @@ def to_map(self, no_owner, no_privs):
return
cols = []
for i in range(len(self.columns)):
- col = self.columns[i].to_map()
+ col = self.columns[i].to_map(no_privs)
if col:
cols.append(col)
tbl = {'columns': cols, 'server': self.server}
View
2  pyrseas/dbobject/table.py
@@ -201,7 +201,7 @@ def to_map(self, dbschemas, no_owner, no_privs):
return
cols = []
for column in self.columns:
- col = column.to_map()
+ col = column.to_map(no_privs)
if col:
cols.append(col)
tbl = {'columns': cols}
View
17 tests/dbobject/test_privs.py
@@ -44,6 +44,23 @@ def test_map_table(self):
{'references': {'grantable': True}}]}]}
self.assertEqual(dbmap['schema public']['table t1'], expmap)
+ def test_map_column(self):
+ "Map a table with GRANTs on column"
+ self.maxDiff = None
+ stmts = [CREATE_TABLE, "GRANT SELECT ON t1 TO PUBLIC",
+ "GRANT INSERT (c1, c2) ON t1 TO user1",
+ "GRANT INSERT (c2), UPDATE (c2) ON t1 TO user2"]
+ dbmap = self.to_map(stmts, no_privs=False)
+ expmap = {'columns': [{'c1': {'type': 'integer',
+ 'privileges': [{'user1': ['insert']}]}},
+ {'c2': {'type': 'text',
+ 'privileges': [{'user1': ['insert']},
+ {'user2': [
+ 'insert', 'update']}]}}],
+ 'privileges': [{self.db.user: ['all']},
+ {'PUBLIC': ['select']}]}
+ self.assertEqual(dbmap['schema public']['table t1'], expmap)
+
def test_map_sequence(self):
"Map a sequence with various GRANTs"
stmts = ["CREATE SEQUENCE seq1",
Please sign in to comment.
Something went wrong with that request. Please try again.