diff --git a/TODO b/TODO index 394b228a..94fc342a 100644 --- a/TODO +++ b/TODO @@ -1,28 +1,5 @@ TODO ==== -Model as state history of immutable objects -------------------------------------------- - -1. Model gets delta from AllWatcher -2. Entity type+id in the delta uniquely identifies an entity in the model -3. The model keeps a history deque for each of these entities -4. The new delta is appended to the deque for this entity -5. When a new python object is created for this entitiy, it: - a. Registers an observer with the model so it'll get change callbacks - b. Gets its state from a pointer back to the last item in the model's - history deque for this entity - c. Has a previous() method that returns a copy of the object at its previous - frame of history (or None if no previous history exists). This object - would be disconnected (would not receive live updates from the model). - - -Make model-changing methods (like deploy()) return the appropriate object -------------------------------------------------------------------------- -For objects being added (newly created), this will require that the method -doesn't return until the AllWatcher returns a delta containing an id for -the newly created thing. - - -Add a LogWatcher coroutine that yields from debug-log api ---------------------------------------------------------- +- Add a LogWatcher coroutine that yields from debug-log api +- Add a way to exit the event loop when a Model matches a bundle yaml diff --git a/examples/relate.py b/examples/relate.py index bdfb2d78..fa329001 100644 --- a/examples/relate.py +++ b/examples/relate.py @@ -27,9 +27,13 @@ async def on_change(self, delta, old, new, model): class MyModelObserver(ModelObserver): + _shutting_down = False + async def on_change(self, delta, old, new, model): - if model.all_units_idle(): + if model.all_units_idle() and not self._shutting_down: + self._shutting_down = True logging.debug('All units idle, disconnecting') + await model.reset(force=True) await model.disconnect() model.loop.stop() @@ -42,12 +46,28 @@ async def run(): await model.reset(force=True) model.add_observer(MyModelObserver()) - await model.deploy( + ubuntu_app = await model.deploy( 'ubuntu-0', service_name='ubuntu', series='trusty', channel='stable', ) + ubuntu_app.on_change(asyncio.coroutine( + lambda delta, old_app, new_app, model: + print('App changed: {}'.format(new_app.entity_id)) + )) + ubuntu_app.on_remove(asyncio.coroutine( + lambda delta, old_app, new_app, model: + print('App removed: {}'.format(old_app.entity_id)) + )) + ubuntu_app.on_unit_add(asyncio.coroutine( + lambda delta, old_unit, new_unit, model: + print('Unit added: {}'.format(new_unit.entity_id)) + )) + ubuntu_app.on_unit_remove(asyncio.coroutine( + lambda delta, old_unit, new_unit, model: + print('Unit removed: {}'.format(old_unit.entity_id)) + )) await model.deploy( 'nrpe-11', service_name='nrpe', diff --git a/juju/application.py b/juju/application.py index 978500fa..1e87ced2 100644 --- a/juju/application.py +++ b/juju/application.py @@ -7,6 +7,26 @@ class Application(model.ModelEntity): + @property + def _unit_match_pattern(self): + return r'^{}.*$'.format(self.entity_id) + + def on_unit_add(self, callable_): + """Add a "unit added" observer to this entity, which will be called + whenever a unit is added to this application. + + """ + self.model.add_observer( + callable_, 'unit', 'add', self._unit_match_pattern) + + def on_unit_remove(self, callable_): + """Add a "unit removed" observer to this entity, which will be called + whenever a unit is removed from this application. + + """ + self.model.add_observer( + callable_, 'unit', 'remove', self._unit_match_pattern) + @property def units(self): return [ diff --git a/juju/model.py b/juju/model.py index fe07f809..5d436fd8 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,6 +1,8 @@ import asyncio import collections import logging +import re +import weakref from concurrent.futures import CancelledError from functools import partial @@ -17,13 +19,49 @@ log = logging.getLogger(__name__) +class _Observer(object): + """Wrapper around an observer callable. + + This wrapper allows filter criteria to be associated with the + callable so that it's only called for changes that meet the criteria. + + """ + def __init__(self, callable_, entity_type, action, entity_id): + self.callable_ = callable_ + self.entity_type = entity_type + self.action = action + self.entity_id = entity_id + if self.entity_id: + if not self.entity_id.startswith('^'): + self.entity_id = '^' + self.entity_id + if not self.entity_id.endswith('$'): + self.entity_id += '$' + + async def __call__(self, delta, old, new, model): + await self.callable_(delta, old, new, model) + + def cares_about(self, entity_type, action, entity_id): + """Return True if this observer "cares about" (i.e. wants to be + called) for a change matching the entity_type, action, and + entity_id parameters. + + """ + if (self.entity_id and entity_id and + not re.match(self.entity_id, str(entity_id))): + return False + + if self.entity_type and self.entity_type != entity_type: + return False + + if self.action and self.action != action: + return False + + return True + + class ModelObserver(object): async def __call__(self, delta, old, new, model): - if old is None and new is not None: - type_ = 'add' - else: - type_ = delta.type - handler_name = 'on_{}_{}'.format(delta.entity, type_) + handler_name = 'on_{}_{}'.format(delta.entity, delta.type) method = getattr(self, handler_name, self.on_change) await method(delta, old, new, model) @@ -40,6 +78,9 @@ def __init__(self, model): self.model = model self.state = dict() + def clear(self): + self.state.clear() + def _live_entity_map(self, entity_type): """Return an id:Entity map of all the living entities of type ``entity_type``. @@ -120,8 +161,16 @@ def get_entity( updated by ``delta`` """ + """ + log.debug( + 'Getting %s:%s at index %s', + entity_type, entity_id, history_index) + """ + if history_index < 0 and history_index != -1: history_index += len(self.entity_history(entity_type, entity_id)) + if history_index < 0: + return None try: self.entity_data(entity_type, entity_id, history_index) @@ -168,6 +217,23 @@ def __getattr__(self, name): self.entity_type, self.entity_id)) return self.data[name] + def __bool__(self): + return bool(self.data) + + def on_change(self, callable_): + """Add a change observer to this entity. + + """ + self.model.add_observer( + callable_, self.entity_type, 'change', self.entity_id) + + def on_remove(self, callable_): + """Add a remove observer to this entity. + + """ + self.model.add_observer( + callable_, self.entity_type, 'remove', self.entity_id) + @property def entity_type(self): """A string identifying the entity type of this object, e.g. @@ -182,7 +248,7 @@ def current(self): entity in the underlying model. This will be True except when the object represents an entity at a - prior state in history, e.g. if the object was obtained by calling + non-latest state in history, e.g. if the object was obtained by calling .previous() on another object. """ @@ -276,7 +342,7 @@ def __init__(self, loop=None): """ self.loop = loop or asyncio.get_event_loop() self.connection = None - self.observers = set() + self.observers = weakref.WeakValueDictionary() self.state = ModelState(self) self._watcher_task = None self._watch_shutdown = asyncio.Event(loop=loop) @@ -329,6 +395,7 @@ async def reset(self, force=False): await self.block_until( lambda: len(self.machines) == 0 ) + self.state.clear() async def block_until(self, *conditions, timeout=None): """Return only after all conditions are true. @@ -336,7 +403,7 @@ async def block_until(self, *conditions, timeout=None): """ async def _block(): while not all(c() for c in conditions): - await asyncio.sleep(.1) + await asyncio.sleep(0) await asyncio.wait_for(_block(), timeout) @property @@ -363,10 +430,11 @@ def units(self): """ return self.state.units - def add_observer(self, callable_): + def add_observer( + self, callable_, entity_type=None, action=None, entity_id=None): """Register an "on-model-change" callback - Once a watch is started (Model.watch() is called), ``callable_`` + Once the model is connected, ``callable_`` will be called each time the model changes. callable_ should be Awaitable and accept the following positional arguments: @@ -385,8 +453,15 @@ def add_observer(self, callable_): model - The :class:`Model` itself. + Events for which ``callable_`` is called can be specified by passing + entity_type, action, and/or id_ filter criteria, e.g.: + + add_observer( + myfunc, entity_type='application', action='add', id_='ubuntu') + """ - self.observers.add(callable_) + observer = _Observer(callable_, entity_type, action, entity_id) + self.observers[observer] = callable_ def _watch(self): """Start an asynchronous watch against this model. @@ -442,11 +517,32 @@ async def _notify_observers(self, delta, old_obj, new_obj): by applying this delta. """ + if not old_obj: + delta.type = 'add' + log.debug( 'Model changed: %s %s %s', delta.entity, delta.type, delta.get_id()) + for o in self.observers: - asyncio.ensure_future(o(delta, old_obj, new_obj, self)) + if o.cares_about(delta.entity, delta.type, delta.get_id()): + asyncio.ensure_future(o(delta, old_obj, new_obj, self)) + + async def _wait_for_new(self, entity_type, entity_id): + """Wait for a new object to appear in the Model and return it. + + Waits for an object of type ``entity_type`` with id ``entity_id``. + + This coroutine blocks until the new object appears in the model. + + """ + entity_added = asyncio.Event(loop=self.loop) + + async def callback(delta, old, new, model): + entity_added.set() + self.add_observer(callback, entity_type, 'add', entity_id) + await entity_added.wait() + return self.state._live_entity_map(entity_type)[entity_id] def add_machine( self, spec=None, constraints=None, disks=None, series=None, @@ -686,7 +782,8 @@ async def deploy( storage=storage, ) - return await app_facade.Deploy([app]) + await app_facade.Deploy([app]) + return await self._wait_for_new('application', service_name) def destroy(self): """Terminate all machines and resources for this model.