Skip to content

Commit

Permalink
Add support for deploying bundles
Browse files Browse the repository at this point in the history
  • Loading branch information
johnsca committed Oct 14, 2016
1 parent 46887b8 commit fd2a74a
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 20 deletions.
4 changes: 3 additions & 1 deletion juju/client/connection.py
Expand Up @@ -11,6 +11,8 @@

import yaml

from juju.errors import JujuAPIError

log = logging.getLogger("websocket")


Expand Down Expand Up @@ -81,7 +83,7 @@ async def rpc(self, msg, encoder=None):
#log.debug("Send: %s", outgoing)
#log.debug("Recv: %s", result)
if result and 'error' in result:
raise RuntimeError(result)
raise JujuAPIError(result)
return result

async def clone(self):
Expand Down
7 changes: 7 additions & 0 deletions juju/errors.py
@@ -0,0 +1,7 @@

class JujuAPIError(Exception):
def __init__(self, result):
self.message = result['error']
self.response = result['response']
self.request_id = result['request-id']
super().__init__(self.message)
300 changes: 281 additions & 19 deletions juju/model.py
Expand Up @@ -2,13 +2,17 @@
import collections
import logging
from concurrent.futures import CancelledError
from functools import partial

from theblues import charmstore

from .client import client
from .client import watcher
from .client import connection
from .delta import get_entity_delta
from .delta import get_entity_class
from .exceptions import DeadEntityException
from .errors import JujuAPIError

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -277,6 +281,7 @@ def __init__(self, loop=None):
self._watcher_task = None
self._watch_shutdown = asyncio.Event(loop=loop)
self._watch_received = asyncio.Event(loop=loop)
self._charmstore = CharmStore(self.loop)

async def connect_current(self):
"""Connect to the current Juju model.
Expand Down Expand Up @@ -651,30 +656,37 @@ async def deploy(
for k, v in storage.items()
}

entity_id = await self.charmstore.entityId(entity_url)

app_facade = client.ApplicationFacade()
client_facade = client.ClientFacade()
app_facade.connect(self.connection)
client_facade.connect(self.connection)

log.debug(
'Deploying %s', entity_url)

await client_facade.AddCharm(channel, entity_url)
app = client.ApplicationDeploy(
application=service_name,
channel=channel,
charm_url=entity_url,
config=config,
constraints=constraints,
endpoint_bindings=bind,
num_units=num_units,
placement=placement,
resources=resources,
series=series,
storage=storage,
)

return await app_facade.Deploy([app])
if 'bundle/' in entity_id:
handler = BundleHandler(self)
await handler.fetch_plan(entity_id)
await handler.execute_plan()
else:
log.debug(
'Deploying %s', entity_id)

await client_facade.AddCharm(channel, entity_id)
app = client.ApplicationDeploy(
application=service_name,
channel=channel,
charm_url=entity_id,
config=config,
constraints=constraints,
endpoint_bindings=bind,
num_units=num_units,
placement=placement,
resources=resources,
series=series,
storage=storage,
)

return await app_facade.Deploy([app])

def destroy(self):
"""Terminate all machines and resources for this model.
Expand Down Expand Up @@ -989,3 +1001,253 @@ def upload_backup(self, archive_path):
"""
pass

@property
def charmstore(self):
return self._charmstore


class BundleHandler(object):
"""
Handle bundles by using the API to translate bundle YAML into a plan of
steps and then dispatching each of those using the API.
"""
def __init__(self, model):
self.model = model
self.charmstore = model.charmstore
self.plan = []
self.references = {}
self._units_by_app = {}
for unit_name, unit in model.units.items():
app_units = self._units_by_app.setdefault(unit.application, [])
app_units.append(unit_name)
self.client_facade = client.ClientFacade()
self.client_facade.connect(model.connection)
self.app_facade = client.ApplicationFacade()
self.app_facade.connect(model.connection)
self.ann_facade = client.AnnotationsFacade()
self.ann_facade.connect(model.connection)

async def fetch_plan(self, entity_id):
yaml = await self.charmstore.files(entity_id,
filename='bundle.yaml',
read_file=True)
self.plan = await self.client_facade.GetBundleChanges(yaml)

async def execute_plan(self):
for step in self.plan.changes:
method = getattr(self, step.method)
result = await method(*step.args)
self.references[step.id_] = result

def resolve(self, reference):
if reference and reference.startswith('$'):
reference = self.references[reference[1:]]
return reference

async def addCharm(self, charm, series):
"""
:param charm string:
Charm holds the URL of the charm to be added.
:param series string:
Series holds the series of the charm to be added
if the charm default is not sufficient.
"""
entity_id = await self.charmstore.entityId(charm)
log.debug('Adding %s', entity_id)
await self.client_facade.AddCharm(None, entity_id)
return entity_id

async def addMachines(self, series, constraints, container_type,
parent_id):
"""
:param series string:
Series holds the optional machine OS series.
:param constraints string:
Constraints holds the optional machine constraints.
:param Container_type string:
ContainerType optionally holds the type of the container (for
instance ""lxc" or kvm"). It is not specified for top level
machines.
:param parent_id string:
ParentId optionally holds a placeholder pointing to another machine
change or to a unit change. This value is only specified in the
case this machine is a container, in which case also ContainerType
is set.
"""
params = client.AddMachineParams(
series=series,
constraints=constraints,
container_type=container_type,
parent_id=self.resolve(parent_id),
)
results = await self.client_facade.AddMachines(params)
log.debug('Added new machine %s', results[0].machine)
return results[0].machine

async def addRelation(self, endpoint1, endpoint2):
"""
:param endpoint1 string:
:param endpoint2 string:
Endpoint1 and Endpoint2 hold relation endpoints in the
"application:interface" form, where the application is always a
placeholder pointing to an application change, and the interface is
optional. Examples are "$deploy-42:web" or just "$deploy-42".
"""
endpoints = [endpoint1, endpoint2]
# resolve indirect references
for i in range(len(endpoints)):
parts = endpoints[i].split(':')
parts[0] = self.resolve(parts[0])
endpoints[i] = ':'.join(parts)
try:
await self.app_facade.AddRelation(endpoints)
log.debug('Added relation %s <-> %s', *endpoints)
except JujuAPIError as e:
if 'relation already exists' not in e.message:
raise
log.debug('Relation %s <-> %s already exists', *endpoints)
return None

async def deploy(self, charm, series, application, options, constraints,
storage, endpoint_bindings, resources):
"""
:param charm string:
Charm holds the URL of the charm to be used to deploy this
application.
:param series string:
Series holds the series of the application to be deployed
if the charm default is not sufficient.
:param application string:
Application holds the application name.
:param options map[string]interface{}:
Options holds application options.
:param constraints string:
Constraints holds the optional application constraints.
:param storage map[string]string:
Storage holds the optional storage constraints.
:param endpoint_bindings map[string]string:
EndpointBindings holds the optional endpoint bindings
:param resources map[string]int:
Resources identifies the revision to use for each resource
of the application's charm.
"""
# resolve indirect references
charm = self.resolve(charm)
# stringify all config values for API
options = {k: str(v) for k, v in options.items()}
# build param object
app = client.ApplicationDeploy(
charm_url=charm,
series=series,
application=application,
config=options,
constraints=constraints,
storage=storage,
endpoint_bindings=endpoint_bindings,
resources=resources,
)
# do the do
log.debug('Deploying %s', charm)
await self.app_facade.Deploy([app])
return application

async def addUnit(self, application, to):
"""
:param application string:
Application holds the application placeholder name for which a unit
is added.
:param to string:
To holds the optional location where to add the unit, as a
placeholder pointing to another unit change or to a machine change.
"""
application = self.resolve(application)
placement = self.resolve(to)
if self._units_by_app.get(application):
# enough units for this application already exist;
# claim one, and carry on
# NB: this should probably honor placement, but the juju client
# doesn't, so we're not bothering, either
unit_name = self._units_by_app[application].pop()
log.debug('Reusing unit %s for %s', unit_name, application)
return unit_name
log.debug('Adding unit of %s%s',
application,
(' to %s' % placement) if placement else '')
result = await self.app_facade.AddUnits(
application=application,
placement=placement,
num_units=1,
)
return result.units[0]

async def expose(self, application):
"""
:param application string:
Application holds the placeholder name of the application that must
be exposed.
"""
application = self.resolve(application)
log.debug('Exposing %s', application)
await self.app_facade.Expose(application)
return None

async def setAnnotations(self, id_, entity_type, annotations):
"""
:param id_ string:
Id is the placeholder for the application or machine change
corresponding to the entity to be annotated.
:param entity_type EntityType:
EntityType holds the type of the entity, "application" or
"machine".
:param annotations map[string]string:
Annotations holds the annotations as key/value pairs.
"""
entity_id = self.resolve(id_)
log.debug('Updating annotations of %s', entity_id)
ann = client.EntityAnnotations(
entity=entity_id,
annotations=annotations,
)
await self.ann_facade.Set([ann])
return None


class CharmStore(object):
"""
Async wrapper around theblues.charmstore.CharmStore
"""
def __init__(self, loop):
self.loop = loop
self._cs = charmstore.CharmStore()

def __getattr__(self, name):
"""
Wrap method calls in coroutines that use run_in_executor to make them
async.
"""
attr = getattr(self._cs, name)
if not callable(attr):
wrapper = partial(getattr, self._cs, name)
setattr(self, name, wrapper)
else:
async def coro(*args, **kwargs):
method = partial(attr, *args, **kwargs)
return await self.loop.run_in_executor(None, method)
setattr(self, name, coro)
wrapper = coro
return wrapper
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -24,6 +24,7 @@
install_requires=[
'websockets',
'pyyaml',
'theblues',
],
include_package_data=True,
maintainer='Juju Ecosystem Engineering',
Expand Down

0 comments on commit fd2a74a

Please sign in to comment.