Fetching contributors…
Cannot retrieve contributors at this time
373 lines (302 sloc) 13.6 KB
# -*- coding: utf-8 -*-
"""Complete solution for database fixtures using only SQLAlchemy.
Important features:
- Fixtures can be autogenerated from an existing database.
Not necessarily the whole database -- you can pass in queries.
- **Fixtures are expressed as Python code.**
- Fixtures can then be applied to other databases by calling a function.
You can use this as long as your models have a primary key column that is
consistently named (for instance, it is called "id" in all your models).
When you use Mediovaigel to create the fixtures, you must do so in order.
If model B depends on (has a foreign key to) model A, then you must create
the fixtures for model A before those for model B.
The foreign key values stored in the fixtures are those of the original
database (from which the fixtures are generated). A translation is
performed in the fixture loading process. As fixtures are created
in the database, their new IDs are stored in memory, and then the
foreign keys referencing them get the new IDs, not the ones written
in the fixtures.
**Do not trust the fixtures file.** You must test it. In other words,
to know whether your fixtures can be loaded... you have to
actually load them onto another database. Sorry.
Self-referential entities are supported. Here is how:
Suppose entity A is being loaded, but it needs to reference entity B that
has not been loaded yet. The program puts entity A aside for a while;
as soon as entity B appears, entity A is retried.
At the end of the loading process, the transaction is only committed if
all fixtures have been successfully loaded. If even one entity could not
be created, the transaction is rolled back.
As you can see, Mediovaigel is strogonofically stainless.
Cabriocaric, really.
"""
from __future__ import (absolute_import, division, print_function,
from codecs import open
from copy import copy
from datetime import date, datetime, timedelta
from decimal import Decimal
from pprint import pprint
# from uuid import uuid4
from nine import nine, str, basestring, IS_PYTHON2
from bag import resolve
from .tricks import (
model_property_names, foreign_key_from_col, foreign_keys_in,
# Module-level empty list.
# NOTE(review): no use of this name is visible in this view of the file;
# confirm it is still needed before relying on it.
EMPTY = []
class _IndentWriter(object):
def __init__(self):
self.indentation = 0
self.lines = []
def indent(self):
self.indentation += 4
def dedent(self):
self.indentation -= 4
def add(self, line):
self.lines.append(' ' * self.indentation + line)
def __str__(self):
return '\n'.join(self.lines)
# TODO Save memory by yielding lines instead of adding them to a list
# TODO Provide a saving_to(file, generator, encoding='utf-8')
# TODO Ability to register callbacks to be run after loading each instance.
# Types accepted by ``Mediovaigel._serialize_property_value``: a value that
# is an instance of one of these (or is None) is written out as ``repr(val)``.
REPRESENTABLE = (int, basestring, float, Decimal,
date, datetime, timedelta)
class Mediovaigel(_IndentWriter):
"""Use this to generate SQLAlchemy fixtures from an existing database.

The fixtures are expressed as Python code, so they sort of self-load.
One uses Mediovaigel like this::

    from bag.sqlalchemy.mediovaigel import Mediovaigel
    from my.models import Course, Lecture, User, session
    m = Mediovaigel()
    # The order of the lines below matters:
    m.generate_fixtures(Course, sas=session)
    # A Lecture belongs to a Course, so it comes after the Course:
    m.generate_fixtures(Lecture, sas=session)
    # Do not store users' passwords on the fixtures file:
    m.generate_fixtures(User, sas=session,
                        ignore_attribs=['id', 'password'])
    # To limit the scope, pass a query instead of the session:
    m.generate_fixtures(User, query=session.query(User).filter_by(id=42))
    # (...)

Take a look at the generated file, it has a function that you can use to
load the fixtures on a database.
"""
# NOTE(review): this constructor lost text during extraction. The docstring
# delimiters were missing, and the ``super().__init__()`` line is fused with
# an assignment whose target is gone (presumably an attribute that stores
# ``pk_property_name``). Restore it from the upstream file before use.
def __init__(self, pk_property_name='id'):
"""Initialize the fixture generator.

``pk_property_name`` must be the name of the primary key column
consistently used in your models.
"""
super(Mediovaigel, self).__init__() = pk_property_name
# Import statements collected so far; ``_process_class`` appends one per
# model class. Presumably emitted at the top of the generated file.
self.imports = ['import datetime', 'from decimal import Decimal']
# self.refs = {}
def _serialize_property_value(self, val):
"""Return a string containing the representation, or None.
Override this in subclasses to support other types.
if val is None or isinstance(val, REPRESENTABLE):
return repr(val)
def serialize_property_value(self, entity, attrib):
    """Return the representation of a value, or raise RuntimeError.

    Reads ``attrib`` from ``entity`` and delegates to
    ``self._serialize_property_value``. A falsy result (``None`` or an
    empty string) means the value cannot be written to the fixtures, and
    ``RuntimeError`` is raised.
    """
    # Read the attribute once, so the value reported in the error message
    # is exactly the one that failed to serialize.
    value = getattr(entity, attrib)
    representation = self._serialize_property_value(value)
    if representation:
        return representation
    raise RuntimeError(
        'Cannot serialize. Entity: {}. Attrib: {}. Value: {}'.format(
            entity, attrib, value))
def generate_fixtures(self, cls=None, query=None, ignore_attribs=None,
                      sas=None):
    """Generate fixtures for one model class. Optionally from a query.

    ``cls`` can be one of 2 things:

    * a model class; or
    * a string containing a resource spec pointing to a Table instance.

    ``query``, if given, selects the entities to export; otherwise all
    entities of ``cls`` are queried through ``sas``.

    ``ignore_attribs`` is a list of the properties for this class that
    should not be passed to the constructor when instantiating an entity.

    ``sas`` is the SQLAlchemy session used to read from the database.
    At least one of ``sas`` and ``query`` is required.
    """
    assert cls and (sas or query)
    ignored = ignore_attribs or []
    # A string is treated as a resource spec of a Table; anything else
    # is treated as a mapped model class.
    if isinstance(cls, basestring):
        return self._process_table(cls, ignored, sas=sas)
    return self._process_class(
        cls, query=query, ignore_attribs=ignored, sas=sas)
# Emit fixture lines for every entity of a mapped class.
# For each entity returned by ``query`` (or by ``sas.query(cls)``), this adds
# a 'yield (<pk>, ClassName(' line followed by one 'attrib=value,' line per
# property name, and records an import line for the class.
# NOTE(review): this method lost text during extraction. The end of the
# signature, the end of the ``model_property_names`` call and the second
# argument of ``getattr`` are missing. The lines that close the emitted
# expression are not visible either. Restore from upstream before use.
def _process_class(self, cls, query=None, ignore_attribs=None,
attribs = model_property_names(cls, blacklist=ignore_attribs,
assert len(attribs) > 0
# Sorted so that the attribute order in the output is stable.
attribs = sorted(attribs)
self.imports.append('from {} import {}'.format(
cls.__module__, cls.__name__))
for entity in (query or sas.query(cls)).yield_per(50):
# if hasattr(entity, 'id'):
# ref = cls.__name__ + str(
# else: # If there is no id, we generate our own random id:
# ref = cls.__name__ + str(uuid4())[-5:]
# self.refs[ref] = entity
# self.add('{} = {}('.format(ref, cls.__name__))
self.add('yield ({}, {}('.format(
getattr(entity,, cls.__name__))
for attrib in attribs:
# Raises RuntimeError when a value cannot be represented.
val = self.serialize_property_value(entity, attrib)
self.add('{}={},'.format(attrib, val))
# self.add('session.add({})\n'.format(ref))
# NOTE(review): the body of this method is truncated in this view. Nothing
# follows the ``if colname in ignore_attribs:`` test, so the lines that emit
# each column value and close the "yield [" list are missing.
def _process_table(self, resource_spec, ignore_attribs, sas):
"""Intended for association tables."""
from sqlalchemy import select
# ``resource_spec`` is resolved to the Table object with ``bag.resolve``.
table = resolve(resource_spec)
# Pairs of (position, column name), in the order of ``table.c``.
cols = list(enumerate(table.c.keys()))
# One "yield [" fixture is started for every row selected from the table.
for row in sas.execute(select([table])).fetchall():
self.add("yield [")
for index, colname in cols:
if colname in ignore_attribs:
# NOTE(review): the ``TEMPLATE.format(...)`` call is truncated in this view.
# The remaining keyword arguments and the closing parenthesis are missing.
def output(self, encoding='utf-8'):
"""Return the final Python code with the fixture functions."""
return TEMPLATE.format(
# ``when`` is the current UTC time cut to minutes ("YYYY-MM-DD HH:MM").
encoding=encoding, when=str(datetime.utcnow())[:16],
# NOTE(review): the body of the ``with`` statement is missing in this view.
# Presumably it writes ``self.output(encoding)`` to ``writer``; confirm
# against the upstream file.
def save_to(self, path, encoding='utf-8'):
"""Save fixtures to ``path``."""
# ``open`` here is ``codecs.open`` (imported at the top of the module), which
# is why it accepts ``encoding``.
with open(path, 'w', encoding=encoding) as writer:
# -*- coding: {encoding} -*-
'''Fixtures autogenerated by Mediovaigel on {when}'''
PK = "{pk}"
def load_fixtures(session, fixtures=None, key_val_db=None, **kw):
from bag.sqlalchemy.mediovaigel import load_fixtures
load_fixtures(session, fixtures or the_fixtures(), key_val_db=key_val_db,
PK=PK, **kw)
def the_fixtures():
class load_fixtures(object):
"""Generated fixture files use this to load themselves on a database."""
# Instantiating this class performs the load: the constructor iterates over
# ``fixtures`` and prints a progress message every 500 items.
# NOTE(review): the constructor lost text during extraction. The statement
# after the signature has no assignment target (presumably an attribute that
# stores ``session``). The bodies that dispatch each fixture, roll back on
# failure and commit on success are not visible. Fixtures that are lists
# presumably go to ``_load_row`` and the others to ``_load_entity``; confirm
# against the upstream file.
def __init__(self, session, fixtures, PK='id', key_val_db=None): = session
# Name of the primary key attribute read from each entity once it is stored.
self.PK = PK
self.mapp = key_val_db or {} # maps original IDs to new IDs
# This stores the foreign keys dict for each model class:
self.cached_fks = {}
# This stores entities whose creation must be delayed due to
# the temporary inexistence of other entities:
self.delayed = {}
for index, sequence in enumerate(fixtures):
if index % 500 == 0:
print('Loading fixture n.{}'.format(index))
if isinstance(sequence, list):
# Anything still in ``self.delayed`` at this point was never satisfied.
if self.delayed:
print('Darn, the delayed fixtures above remain. I give up. '
'The transaction has NOT been committed.')
print('Total: {} fixtures loaded. Committing the transaction...'
.format(index + 1))
# Load one entity: translate its foreign keys from original ids to new ids,
# record the entity's own new id in ``self.mapp``, then retry any jobs that
# were waiting for this entity. Returns False when the entity had to be
# delayed and True otherwise.
# NOTE(review): this method lost text during extraction. The body of the
# ``if old_fk_value is None:`` test, the ``try:`` matching ``except KeyError``,
# the call that receives ``e.args[0], self._load_entity, ...`` (presumably
# ``self._delay_creation(``) and the statements that store the entity in the
# session before its primary key is read are all missing. Restore from
# upstream before use.
def _load_entity(self, original_id, entity):
cls = type(entity)
# Key under which this entity's new id is stored, e.g. 'course42'.
key = cls.__tablename__ + str(original_id)
fks = self.cached_fks.get(cls) # TODO Isolate cache for legibility
if fks is None:
fks = foreign_keys_in(cls)
self.cached_fks[cls] = fks
# New FK values are collected first and applied only after every lookup
# succeeded, so a delayed entity keeps its original FK values for the retry.
data_to_set = []
for fk_attrib, fk in fks.items():
# Replace the old FK value with the NEW id stored in mapp
old_fk_value = getattr(entity, fk_attrib)
if old_fk_value is None:
new_id = self._get_new_id(fk, old_fk_value)
except KeyError as e:
print('Delaying {} #{} for lack of {}'.format(
cls.__name__, original_id, e.args[0]))
# Store this job so it will be retried later:
e.args[0], self._load_entity, original_id, entity)
return False
# print('Was loading {} #{} and BOOM!'.format(
# cls.__name__, original_id))
# raise
data_to_set.append((fk_attrib, new_id))
for fk_attrib, new_id in data_to_set:
setattr(entity, fk_attrib, new_id)
# The same original entity must not be loaded twice.
assert self.mapp.get(key) is None
# Store the new id for this entity so we can look it up in the future:
self.mapp[key] = getattr(entity, self.PK) # 'course42': 37
# An entity has been stored. Maybe it triggers related delayed entities
postponed = self.delayed.get(key)
if postponed:
print('=== Redeeming entities that need {}'.format(key))
while postponed:
method, args = postponed.pop()
success = method(*args)
del self.delayed[key]
return True
# Load one table row given as a list whose first item is the resource spec of
# the Table and whose remaining items are the column values. Foreign key
# values are translated from original ids to new ids. Returns False when the
# row had to be delayed and True otherwise.
# NOTE(review): this method lost text during extraction. The body of the
# ``if old_fk_value is None:`` test, the ``try:`` matching ``except KeyError``,
# the first ``format`` argument, and the call that receives
# ``e.args[0], self._load_row, ...`` (presumably ``self._delay_creation(``)
# are missing. ``return False`` is also fused with the tail of what looks
# like the statement that executes the insert. Restore from upstream.
def _load_row(self, original_values):
from sqlalchemy import insert
# Work on a shallow copy so the original list can be retried unchanged.
values = copy(original_values)
table = resolve(values.pop(0)) # TODO Cache
cols = list(enumerate(table.c.keys())) # TODO Cache
for index, col in cols:
fk = foreign_key_from_col(table.c[col])
if fk:
# Replace the old FK value with the NEW id stored in mapp
old_fk_value = values[index]
if old_fk_value is None:
values[index] = self._get_new_id(fk, old_fk_value)
except KeyError as e:
print('Delaying {} row for lack of {}'.format(, e))
# Store this job so it will be retried later:
e.args[0], self._load_row, original_values)
return False, values=values))
return True
def _get_new_id(self, fk, old_id):
"""Given a ForeignKey object and its value in the old database,
looks up the cache and returns the value for the new database.
table_name = fk.target_fullname.split('.')[0]
return self.mapp[table_name + str(old_id)]
def _delay_creation(self, wanted, method, *args):
"""When an entity cannot be created yet because it references another
entity that doesn't exist yet, we store the job for retrying later.
In the dict, the key is the non-existent entity key, and the value
is a tuple with the arguments to the creation method.
val = (method, args)
if wanted in self.delayed:
self.delayed[wanted] = [val]