Skip to content

Commit

Permalink
Reorganize copy denormalizations to properly support multiple-column …
Browse files Browse the repository at this point in the history
…copy.

 * pyrseas/extend/__init__.py (DbExtension.__init__): Allow all
   attributes.
 * pyrseas/extend/audit.py (CfgAuditColumn.apply): Call add_func at
   the database rather than schema level.
 * pyrseas/extend/denorm.py (ExtDenorms): New helper class.
   (ExtCopyDenormColumn.add_trigger_func): Deal with both single or
   multi-item translation tables.  (ExtCopyDenormColumn.apply):
   Refactor out column-level code to ExtDenorms.  Create single and
   multi-item translation tables.  (ExtDenormDict.from_map): Accept
   new input map structure and use new internal structures.
   (ExtDenormDict.link_current): Adjust loop to new internal
   structures.
 * pyrseas/extend/function.py (CfgFunctionSource, CfgFunctionSegment,
   CfgFunctionTemplate, CfgFunctionSourceDict): New classes to support
   more flexible transformations of function sources.
   (CfgFunction.apply): Replace source patterns using both single and
   multi-item tables.
 * pyrseas/extend/table.py (ExtTable.apply): Call denorms.apply
   instead of processing columns individually.
   (ExtClassDict.link_refs): Columns are now an attribute of an
   ExtDenorm.
 * pyrseas/extenddb.py (ExtendDatabase.__init__): Initialize funcsrcs.
   (ExtendDatabase.add_func): New method to add functions at database
   level.  (ExtendDatabase._from_cfgmap): Process function templates
   and segments.
 * tests/extend/test_denorm.py
   (CopyDenormalizationTestCase.test_copy_column): Reorganize
   according to new input map.
   (CopyDenormalizationTestCase.test_copy_columns_cross_schema): New
   test to verify changed functionality.
  • Loading branch information
jmafc committed May 15, 2012
1 parent 37e537a commit 75e7274
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 101 deletions.
5 changes: 1 addition & 4 deletions pyrseas/extend/__init__.py
Expand Up @@ -18,12 +18,9 @@ def __init__(self, **attrs):
"""Initialize the extension object from a dictionary of attributes
:param attrs: the dictionary of attributes
Non-key attributes without a value are discarded.
"""
for key, val in list(attrs.items()):
if val or key in self.keylist:
setattr(self, key, val)
setattr(self, key, val)


class DbExtensionDict(dict):
Expand Down
4 changes: 2 additions & 2 deletions pyrseas/extend/audit.py
Expand Up @@ -46,9 +46,9 @@ def apply(self, table, extdb):
(sch, fnc) = split_schema_obj(fnc)
if (sch, fncsig) not in currdb.functions:
newfunc = extdb.functions[fnc].apply(
sch, extdb.columns.col_trans_tbl)
sch, extdb.columns.col_trans_tbl, extdb)
# add new function to the current db
extdb.schemas[sch].add_func(newfunc)
extdb.add_func(sch, newfunc)
extdb.add_lang(newfunc.language)


Expand Down
148 changes: 102 additions & 46 deletions pyrseas/extend/denorm.py
Expand Up @@ -7,10 +7,47 @@
DbExtension and CfgAuditColumnDict derived from DbExtensionDict.
"""
from pyrseas.extend import DbExtension, DbExtensionDict
from pyrseas.extend.column import CfgColumn
from pyrseas.dbobject.column import Column
from pyrseas.dbobject import split_schema_obj


class ExtDenorms(object):
"""Helper class for defining denormalized columns"""

def __init__(self):
self.columns = []
self.denorms = []

def apply(self, table, extdb):
currdb = extdb.current
for col in self.columns:
fk = self.denorms[col.denorm]._foreign_key
reftbl = currdb.tables[(fk.ref_schema, fk.ref_table)]
newcol = None
for refcol in reftbl.columns:
if refcol.name == col.basename:
newtype = refcol.type
if hasattr(col, 'type'):
newtype = col.type
newnull = None
if hasattr(col, 'not_null'):
newnull = col.not_null
newcol = Column(schema=table.schema, table=table.name,
name=col.name, type=newtype,
not_null=newnull, number=refcol.number + 1,
_table=table)
break
if newcol is None:
raise KeyError("Denorm column '%s': base column '%s' not found"
% (col.name, self.basename))
if col.name not in table.column_names():
table.columns.append(newcol)

for den in self.denorms:
den.apply(table, extdb, self.columns)


class ExtDenormColumn(DbExtension):
"""An extension that adds automatically maintained denormalized columns"""

Expand All @@ -29,57 +66,46 @@ def add_trigger_func(self, table, extdb, func, trans_tbl):
:param trans_tbl: translation table for function source
"""
extdb.triggers[func].apply(table)
fncname = extdb.functions[func].adjust_name(trans_tbl)
if isinstance(trans_tbl, dict) and 'single' in trans_tbl:
strans_tbl = trans_tbl['single']
else:
strans_tbl = trans_tbl
fncname = extdb.functions[func].adjust_name(strans_tbl)
(sch, fncname) = split_schema_obj(fncname, table.schema)
fncsig = fncname + '()'
if (sch, fncsig) not in extdb.current.functions:
newfunc = extdb.functions[func].apply(sch, trans_tbl)
extdb.schemas[sch].add_func(newfunc)
newfunc = extdb.functions[func].apply(sch, trans_tbl, extdb)
extdb.add_func(sch, newfunc)
extdb.add_lang(newfunc.language)

def apply(self, table, extdb):
def apply(self, table, extdb, columns):
"""Apply copy denormalizations to the argument table.
:param table: table to which columns/triggers will be added
:param extdb: extension dictionaries
:param columns: list of columns added by this denormalization
"""
currdb = extdb.current
fk = self._foreign_key
reftbl = currdb.tables[(fk.ref_schema, fk.ref_table)]
newcol = None
for col in reftbl.columns:
if col.name == self.copy:
newtype = col.type
if hasattr(self, 'type'):
newtype = self.type
newnull = None
if hasattr(col, 'not_null'):
newnull = col.not_null
newcol = Column(schema=self.schema, table=self.table,
name=self.name, type=newtype, not_null=newnull,
number=col.number + 1, _table=table)
break
if newcol is None:
raise KeyError("Denorm column '%s': copy column '%s' not found" %
(self.name, self.copy))
if self.name not in table.column_names():
table.columns.append(newcol)

childcols = [c.name for c in columns]
parcols = [c.basename for c in columns]
keys = [(table.column_names()[k - 1]) for k in fk.keycols]
parkeys = [(fk.references.column_names()[k - 1]) for k in fk.ref_cols]
# translation table
trans_tbl = [
('{{parent_schema}}', fk.ref_schema),
('{{parent_table}}', fk.ref_table),
('{{parent_column}}', self.copy),
('{{parent_key}}', parkeys[0]),
('{{child_schema}}', table.schema),
('{{child_table}}', table.name),
('{{child_column}}', self.name),
('{{child_fkey}}', keys[0])]

self.add_trigger_func(table, extdb, 'copy_denorm', trans_tbl)
self.add_trigger_func(reftbl, extdb, 'copy_cascade', trans_tbl)
trans_tbls = {'single': [
('{{parent_schema}}', fk.ref_schema),
('{{parent_table}}', fk.ref_table),
('{{child_schema}}', table.schema),
('{{child_table}}', table.name)],
'multi': {
'parent_key': parkeys,
'parent_column': parcols,
'child_fkey': keys,
'child_column': childcols}}
self.add_trigger_func(table, extdb, 'copy_denorm', trans_tbls)
self.add_trigger_func(reftbl, extdb, 'copy_cascade', trans_tbls)


class ExtDenormDict(DbExtensionDict):
Expand All @@ -96,25 +122,55 @@ def from_map(self, table, indenorms):
if not isinstance(indenorms, list):
raise ValueError("Table %s: denorm columns must be a list" %
table.name)
dencols = self[(table.schema, table.name)] = []

for col in indenorms:
for den in list(col.keys()):
arg = col[den]
if 'copy' in arg:
cls = ExtCopyDenormColumn
dencols.append(cls(schema=table.schema, table=table.name,
name=den, **arg))
denorms = self[(table.schema, table.name)] = ExtDenorms()

for (i, dencol) in enumerate(indenorms):
dentype = dencol.keys()[0]
denspec = dencol[dentype]
if dentype == 'copy':
cls = ExtCopyDenormColumn
try:
fkey = denspec['foreign_key']
except KeyError as exc:
exc.args = ("Copy denormalization for table '%s' is "
"missing foreign key" % table.name, )
raise
denorms.denorms.append(cls(foreign_key=fkey))
try:
dencols = denspec['columns']
except KeyError as exc:
exc.args = ("Denormalization for table '%s' has no columns"
% table.name, )
raise

args = {'denorm': i}
for col in dencols:
if isinstance(col, dict):
for fromcol in list(col.keys()):
args.update(basename=fromcol)
spec = col[fromcol]
if isinstance(spec, dict):
if 'prefix' in spec:
colname = spec['prefix'] + fromcol
elif 'suffix' in spec:
colname = fromcol + spec['suffix']
else:
colname = spec
else:
colname = col
args.update(basename=colname)
denorms.columns.append(CfgColumn(schema=table.schema,
table=table.name, name=colname,
**args))

def link_current(self, constrs):
"""Connect denormalizations to actual database constraints
:param constrs: constraints in database
"""
for (sch, tbl) in list(self.keys()):
for i, col in enumerate(self[(sch, tbl)]):
for col in self[(sch, tbl)].denorms:
if hasattr(col, 'foreign_key'):
fkname = col.foreign_key
assert(constrs[(sch, tbl, fkname)])
self[(sch, tbl)][i]._foreign_key = constrs[(
sch, tbl, fkname)]
col._foreign_key = constrs[(sch, tbl, fkname)]

0 comments on commit 75e7274

Please sign in to comment.