|
|
@@ -2,13 +2,17 @@ |
|
|
import collections
|
|
|
import logging
|
|
|
from concurrent.futures import CancelledError
|
|
|
+from functools import partial
|
|
|
+
|
|
|
+from theblues import charmstore
|
|
|
|
|
|
from .client import client
|
|
|
from .client import watcher
|
|
|
from .client import connection
|
|
|
from .delta import get_entity_delta
|
|
|
from .delta import get_entity_class
|
|
|
from .exceptions import DeadEntityException
|
|
|
+from .errors import JujuAPIError
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@@ -277,6 +281,7 @@ def __init__(self, loop=None): |
|
|
self._watcher_task = None
|
|
|
self._watch_shutdown = asyncio.Event(loop=loop)
|
|
|
self._watch_received = asyncio.Event(loop=loop)
|
|
|
+ self._charmstore = CharmStore(self.loop)
|
|
|
|
|
|
async def connect_current(self):
|
|
|
"""Connect to the current Juju model.
|
|
|
@@ -651,30 +656,37 @@ def debug_log( |
|
|
for k, v in storage.items()
|
|
|
}
|
|
|
|
|
|
+ entity_id = await self.charmstore.entityId(entity_url)
|
|
|
+
|
|
|
app_facade = client.ApplicationFacade()
|
|
|
client_facade = client.ClientFacade()
|
|
|
app_facade.connect(self.connection)
|
|
|
client_facade.connect(self.connection)
|
|
|
|
|
|
- log.debug(
|
|
|
- 'Deploying %s', entity_url)
|
|
|
-
|
|
|
- await client_facade.AddCharm(channel, entity_url)
|
|
|
- app = client.ApplicationDeploy(
|
|
|
- application=service_name,
|
|
|
- channel=channel,
|
|
|
- charm_url=entity_url,
|
|
|
- config=config,
|
|
|
- constraints=constraints,
|
|
|
- endpoint_bindings=bind,
|
|
|
- num_units=num_units,
|
|
|
- placement=placement,
|
|
|
- resources=resources,
|
|
|
- series=series,
|
|
|
- storage=storage,
|
|
|
- )
|
|
|
-
|
|
|
- return await app_facade.Deploy([app])
|
|
|
+ if 'bundle/' in entity_id:
|
|
|
+ handler = BundleHandler(self)
|
|
|
+ await handler.fetch_plan(entity_id)
|
|
|
+ await handler.execute_plan()
|
|
|
+ else:
|
|
|
+ log.debug(
|
|
|
+ 'Deploying %s', entity_id)
|
|
|
+
|
|
|
+ await client_facade.AddCharm(channel, entity_id)
|
|
|
+ app = client.ApplicationDeploy(
|
|
|
+ application=service_name,
|
|
|
+ channel=channel,
|
|
|
+ charm_url=entity_id,
|
|
|
+ config=config,
|
|
|
+ constraints=constraints,
|
|
|
+ endpoint_bindings=bind,
|
|
|
+ num_units=num_units,
|
|
|
+ placement=placement,
|
|
|
+ resources=resources,
|
|
|
+ series=series,
|
|
|
+ storage=storage,
|
|
|
+ )
|
|
|
+
|
|
|
+ return await app_facade.Deploy([app])
|
|
|
|
|
|
def destroy(self):
|
|
|
"""Terminate all machines and resources for this model.
|
|
|
@@ -989,3 +1001,253 @@ def upload_backup(self, archive_path): |
|
|
|
|
|
"""
|
|
|
pass
|
|
|
+
|
|
|
+ @property
|
|
|
+ def charmstore(self):
|
|
|
+ return self._charmstore
|
|
|
+
|
|
|
+
|
|
|
+class BundleHandler(object):
|
|
|
+ """
|
|
|
+ Handle bundles by using the API to translate bundle YAML into a plan of
|
|
|
+ steps and then dispatching each of those using the API.
|
|
|
+ """
|
|
|
+ def __init__(self, model):
|
|
|
+ self.model = model
|
|
|
+ self.charmstore = model.charmstore
|
|
|
+ self.plan = []
|
|
|
+ self.references = {}
|
|
|
+ self._units_by_app = {}
|
|
|
+ for unit_name, unit in model.units.items():
|
|
|
+ app_units = self._units_by_app.setdefault(unit.application, [])
|
|
|
+ app_units.append(unit_name)
|
|
|
+ self.client_facade = client.ClientFacade()
|
|
|
+ self.client_facade.connect(model.connection)
|
|
|
+ self.app_facade = client.ApplicationFacade()
|
|
|
+ self.app_facade.connect(model.connection)
|
|
|
+ self.ann_facade = client.AnnotationsFacade()
|
|
|
+ self.ann_facade.connect(model.connection)
|
|
|
+
|
|
|
+ async def fetch_plan(self, entity_id):
|
|
|
+ yaml = await self.charmstore.files(entity_id,
|
|
|
+ filename='bundle.yaml',
|
|
|
+ read_file=True)
|
|
|
+ self.plan = await self.client_facade.GetBundleChanges(yaml)
|
|
|
+
|
|
|
+ async def execute_plan(self):
|
|
|
+ for step in self.plan.changes:
|
|
|
+ method = getattr(self, step.method)
|
|
|
+ result = await method(*step.args)
|
|
|
+ self.references[step.id_] = result
|
|
|
+
|
|
|
+ def resolve(self, reference):
|
|
|
+ if reference and reference.startswith('$'):
|
|
|
+ reference = self.references[reference[1:]]
|
|
|
+ return reference
|
|
|
+
|
|
|
+ async def addCharm(self, charm, series):
|
|
|
+ """
|
|
|
+ :param charm string:
|
|
|
+ Charm holds the URL of the charm to be added.
|
|
|
+
|
|
|
+ :param series string:
|
|
|
+ Series holds the series of the charm to be added
|
|
|
+ if the charm default is not sufficient.
|
|
|
+ """
|
|
|
+ entity_id = await self.charmstore.entityId(charm)
|
|
|
+ log.debug('Adding %s', entity_id)
|
|
|
+ await self.client_facade.AddCharm(None, entity_id)
|
|
|
+ return entity_id
|
|
|
+
|
|
|
+ async def addMachines(self, series, constraints, container_type,
|
|
|
+ parent_id):
|
|
|
+ """
|
|
|
+ :param series string:
|
|
|
+ Series holds the optional machine OS series.
|
|
|
+
|
|
|
+ :param constraints string:
|
|
|
+ Constraints holds the optional machine constraints.
|
|
|
+
|
|
|
+ :param Container_type string:
|
|
|
+ ContainerType optionally holds the type of the container (for
|
|
|
+ instance ""lxc" or kvm"). It is not specified for top level
|
|
|
+ machines.
|
|
|
+
|
|
|
+ :param parent_id string:
|
|
|
+ ParentId optionally holds a placeholder pointing to another machine
|
|
|
+ change or to a unit change. This value is only specified in the
|
|
|
+ case this machine is a container, in which case also ContainerType
|
|
|
+ is set.
|
|
|
+ """
|
|
|
+ params = client.AddMachineParams(
|
|
|
+ series=series,
|
|
|
+ constraints=constraints,
|
|
|
+ container_type=container_type,
|
|
|
+ parent_id=self.resolve(parent_id),
|
|
|
+ )
|
|
|
+ results = await self.client_facade.AddMachines(params)
|
|
|
+ log.debug('Added new machine %s', results[0].machine)
|
|
|
+ return results[0].machine
|
|
|
+
|
|
|
+ async def addRelation(self, endpoint1, endpoint2):
|
|
|
+ """
|
|
|
+ :param endpoint1 string:
|
|
|
+ :param endpoint2 string:
|
|
|
+ Endpoint1 and Endpoint2 hold relation endpoints in the
|
|
|
+ "application:interface" form, where the application is always a
|
|
|
+ placeholder pointing to an application change, and the interface is
|
|
|
+ optional. Examples are "$deploy-42:web" or just "$deploy-42".
|
|
|
+ """
|
|
|
+ endpoints = [endpoint1, endpoint2]
|
|
|
+ # resolve indirect references
|
|
|
+ for i in range(len(endpoints)):
|
|
|
+ parts = endpoints[i].split(':')
|
|
|
+ parts[0] = self.resolve(parts[0])
|
|
|
+ endpoints[i] = ':'.join(parts)
|
|
|
+ try:
|
|
|
+ await self.app_facade.AddRelation(endpoints)
|
|
|
+ log.debug('Added relation %s <-> %s', *endpoints)
|
|
|
+ except JujuAPIError as e:
|
|
|
+ if 'relation already exists' not in e.message:
|
|
|
+ raise
|
|
|
+ log.debug('Relation %s <-> %s already exists', *endpoints)
|
|
|
+ return None
|
|
|
+
|
|
|
+ async def deploy(self, charm, series, application, options, constraints,
|
|
|
+ storage, endpoint_bindings, resources):
|
|
|
+ """
|
|
|
+ :param charm string:
|
|
|
+ Charm holds the URL of the charm to be used to deploy this
|
|
|
+ application.
|
|
|
+
|
|
|
+ :param series string:
|
|
|
+ Series holds the series of the application to be deployed
|
|
|
+ if the charm default is not sufficient.
|
|
|
+
|
|
|
+ :param application string:
|
|
|
+ Application holds the application name.
|
|
|
+
|
|
|
+ :param options map[string]interface{}:
|
|
|
+ Options holds application options.
|
|
|
+
|
|
|
+ :param constraints string:
|
|
|
+ Constraints holds the optional application constraints.
|
|
|
+
|
|
|
+ :param storage map[string]string:
|
|
|
+ Storage holds the optional storage constraints.
|
|
|
+
|
|
|
+ :param endpoint_bindings map[string]string:
|
|
|
+ EndpointBindings holds the optional endpoint bindings
|
|
|
+
|
|
|
+ :param resources map[string]int:
|
|
|
+ Resources identifies the revision to use for each resource
|
|
|
+ of the application's charm.
|
|
|
+ """
|
|
|
+ # resolve indirect references
|
|
|
+ charm = self.resolve(charm)
|
|
|
+ # stringify all config values for API
|
|
|
+ options = {k: str(v) for k, v in options.items()}
|
|
|
+ # build param object
|
|
|
+ app = client.ApplicationDeploy(
|
|
|
+ charm_url=charm,
|
|
|
+ series=series,
|
|
|
+ application=application,
|
|
|
+ config=options,
|
|
|
+ constraints=constraints,
|
|
|
+ storage=storage,
|
|
|
+ endpoint_bindings=endpoint_bindings,
|
|
|
+ resources=resources,
|
|
|
+ )
|
|
|
+ # do the do
|
|
|
+ log.debug('Deploying %s', charm)
|
|
|
+ await self.app_facade.Deploy([app])
|
|
|
+ return application
|
|
|
+
|
|
|
+ async def addUnit(self, application, to):
|
|
|
+ """
|
|
|
+ :param application string:
|
|
|
+ Application holds the application placeholder name for which a unit
|
|
|
+ is added.
|
|
|
+
|
|
|
+ :param to string:
|
|
|
+ To holds the optional location where to add the unit, as a
|
|
|
+ placeholder pointing to another unit change or to a machine change.
|
|
|
+ """
|
|
|
+ application = self.resolve(application)
|
|
|
+ placement = self.resolve(to)
|
|
|
+ if self._units_by_app.get(application):
|
|
|
+ # enough units for this application already exist;
|
|
|
+ # claim one, and carry on
|
|
|
+ # NB: this should probably honor placement, but the juju client
|
|
|
+ # doesn't, so we're not bothering, either
|
|
|
+ unit_name = self._units_by_app[application].pop()
|
|
|
+ log.debug('Reusing unit %s for %s', unit_name, application)
|
|
|
+ return unit_name
|
|
|
+ log.debug('Adding unit of %s%s',
|
|
|
+ application,
|
|
|
+ (' to %s' % placement) if placement else '')
|
|
|
+ result = await self.app_facade.AddUnits(
|
|
|
+ application=application,
|
|
|
+ placement=placement,
|
|
|
+ num_units=1,
|
|
|
+ )
|
|
|
+ return result.units[0]
|
|
|
+
|
|
|
+ async def expose(self, application):
|
|
|
+ """
|
|
|
+ :param application string:
|
|
|
+ Application holds the placeholder name of the application that must
|
|
|
+ be exposed.
|
|
|
+ """
|
|
|
+ application = self.resolve(application)
|
|
|
+ log.debug('Exposing %s', application)
|
|
|
+ await self.app_facade.Expose(application)
|
|
|
+ return None
|
|
|
+
|
|
|
+ async def setAnnotations(self, id_, entity_type, annotations):
|
|
|
+ """
|
|
|
+ :param id_ string:
|
|
|
+ Id is the placeholder for the application or machine change
|
|
|
+ corresponding to the entity to be annotated.
|
|
|
+
|
|
|
+ :param entity_type EntityType:
|
|
|
+ EntityType holds the type of the entity, "application" or
|
|
|
+ "machine".
|
|
|
+
|
|
|
+ :param annotations map[string]string:
|
|
|
+ Annotations holds the annotations as key/value pairs.
|
|
|
+ """
|
|
|
+ entity_id = self.resolve(id_)
|
|
|
+ log.debug('Updating annotations of %s', entity_id)
|
|
|
+ ann = client.EntityAnnotations(
|
|
|
+ entity=entity_id,
|
|
|
+ annotations=annotations,
|
|
|
+ )
|
|
|
+ await self.ann_facade.Set([ann])
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+class CharmStore(object):
|
|
|
+ """
|
|
|
+ Async wrapper around theblues.charmstore.CharmStore
|
|
|
+ """
|
|
|
+ def __init__(self, loop):
|
|
|
+ self.loop = loop
|
|
|
+ self._cs = charmstore.CharmStore()
|
|
|
+
|
|
|
+ def __getattr__(self, name):
|
|
|
+ """
|
|
|
+ Wrap method calls in coroutines that use run_in_executor to make them
|
|
|
+ async.
|
|
|
+ """
|
|
|
+ attr = getattr(self._cs, name)
|
|
|
+ if not callable(attr):
|
|
|
+ wrapper = partial(getattr, self._cs, name)
|
|
|
+ setattr(self, name, wrapper)
|
|
|
+ else:
|
|
|
+ async def coro(*args, **kwargs):
|
|
|
+ method = partial(attr, *args, **kwargs)
|
|
|
+ return await self.loop.run_in_executor(None, method)
|
|
|
+ setattr(self, name, coro)
|
|
|
+ wrapper = coro
|
|
|
+ return wrapper
|
0 comments on commit
fd2a74a