Skip to content

Commit

Permalink
Rework auditing to track individual tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Maxime Petazzoni <maxime.petazzoni@bulix.org>
  • Loading branch information
mpetazzoni committed Jul 7, 2015
1 parent a066dc5 commit d69e47f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 78 deletions.
57 changes: 12 additions & 45 deletions maestro/maestro.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
from __future__ import print_function

import functools
import inspect
import requests.exceptions

from . import audit
from . import entities
Expand Down Expand Up @@ -231,36 +229,6 @@ def complete(self, tokens, **kwargs):

print(' '.join(filter(lambda x: x.startswith(prefix), set(choices))))

def _audit_play(self, play):
"""Run an orchestration play, wrapping its execution in audit trail
calls.
If auditors have been configured, they will be triggered before the
orchestration play starts, and when it ends in either success or error
situations.
Args:
play (plays.BaseOrchestrationPlay): An orchestration play instance
ready to run.
"""
action = inspect.stack()[1][3]
things = [c.name for c in play.containers]
self.auditor.action(things, action)
try:
play.run()
self.auditor.success(things, action)
except requests.exceptions.Timeout as e:
try:
msg = e.args[0][1]
except:
# varies with the timeout exception
msg = e.args[0][0]
self.auditor.error(things, action, message=msg)
exceptions.raise_with_tb()
except Exception as e:
self.auditor.error(things, action, message=e)
exceptions.raise_with_tb()

def status(self, things, full=False, with_dependencies=False,
concurrency=None, **kwargs):
"""Display the status of the given services and containers, but only
Expand Down Expand Up @@ -301,9 +269,8 @@ def pull(self, things, with_dependencies=False,
containers = self._ordered_containers(things) \
if with_dependencies else self._to_containers(things)

self._audit_play(
plays.Pull(containers, self.registries,
ignore_dependencies, concurrency))
plays.Pull(containers, self.registries, ignore_dependencies,
concurrency, auditor=self.auditor).run()

def start(self, things, refresh_images=False, with_dependencies=False,
ignore_dependencies=False, concurrency=None, reuse=False,
Expand All @@ -327,9 +294,9 @@ def start(self, things, refresh_images=False, with_dependencies=False,
containers = self._ordered_containers(things) \
if with_dependencies else self._to_containers(things)

self._audit_play(
plays.Start(containers, self.registries, refresh_images,
ignore_dependencies, concurrency, reuse))
plays.Start(containers, self.registries, refresh_images,
ignore_dependencies, concurrency, reuse,
auditor=self.auditor).run()

def restart(self, things, refresh_images=False, with_dependencies=False,
ignore_dependencies=False, concurrency=None, step_delay=0,
Expand Down Expand Up @@ -359,10 +326,10 @@ def restart(self, things, refresh_images=False, with_dependencies=False,
"""
containers = self._ordered_containers(things, False) \
if with_dependencies else self._to_containers(things)
self._audit_play(
plays.Restart(containers, self.registries, refresh_images,
ignore_dependencies, concurrency, step_delay,
stop_start_delay, reuse, only_if_changed))
plays.Restart(containers, self.registries, refresh_images,
ignore_dependencies, concurrency, step_delay,
stop_start_delay, reuse, only_if_changed,
auditor=self.auditor).run()

def stop(self, things, with_dependencies=False, ignore_dependencies=False,
concurrency=None, **kwargs):
Expand All @@ -384,8 +351,8 @@ def stop(self, things, with_dependencies=False, ignore_dependencies=False,
"""
containers = self._ordered_containers(things, False) \
if with_dependencies else self._to_containers(things)
self._audit_play(
plays.Stop(containers, ignore_dependencies, concurrency))
plays.Stop(containers, ignore_dependencies,
concurrency, auditor=self.auditor).run()

def clean(self, things, with_dependencies=False, concurrency=None,
**kwargs):
Expand All @@ -400,7 +367,7 @@ def clean(self, things, with_dependencies=False, concurrency=None,
"""
containers = self._ordered_containers(things) \
if with_dependencies else self._to_containers(things)
self._audit_play(plays.Clean(containers, concurrency))
plays.Clean(containers, concurrency, auditor=self.auditor).run()

def logs(self, things, follow, n, **kwargs):
"""Display the logs of the given container.
Expand Down
29 changes: 16 additions & 13 deletions maestro/plays/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ class BaseOrchestrationPlay:
_SHIP_CSIZE, _SHIP_CSIZE))

def __init__(self, containers=[], forward=True, ignore_dependencies=False,
concurrency=None):
concurrency=None, auditor=None):
self._containers = containers
self._forward = forward
self._ignore_dependencies = ignore_dependencies
self._concurrency = threading.Semaphore(concurrency or len(containers))
self._auditor = auditor

self._dependencies = dict(
(c.name, self._gather_dependencies(c)) for c in containers)
Expand Down Expand Up @@ -93,7 +94,7 @@ def act(task):

try:
self._concurrency.acquire(blocking=True)
task.run()
task.run(auditor=self._auditor)
self._concurrency.release()
self._done.add(task.container)
except Exception:
Expand Down Expand Up @@ -251,10 +252,11 @@ class Start(BaseOrchestrationPlay):
application to become available before moving to the next one."""

def __init__(self, containers=[], registries={}, refresh_images=False,
ignore_dependencies=True, concurrency=None, reuse=False):
ignore_dependencies=True, concurrency=None, reuse=False,
auditor=None):
BaseOrchestrationPlay.__init__(
self, containers, ignore_dependencies=ignore_dependencies,
concurrency=concurrency)
concurrency=concurrency, auditor=auditor)

self._registries = registries
self._refresh_images = refresh_images
Expand All @@ -275,10 +277,10 @@ class Pull(BaseOrchestrationPlay):
images for the given services and containers."""

def __init__(self, containers=[], registries={},
ignore_dependencies=True, concurrency=None):
ignore_dependencies=True, concurrency=None, auditor=None):
BaseOrchestrationPlay.__init__(
self, containers, ignore_dependencies=ignore_dependencies,
concurrency=concurrency)
concurrency=concurrency, auditor=auditor)

self._registries = registries

Expand All @@ -297,11 +299,11 @@ class Stop(BaseOrchestrationPlay):
that dependent services are stopped first."""

def __init__(self, containers=[], ignore_dependencies=True,
concurrency=None):
concurrency=None, auditor=None):
BaseOrchestrationPlay.__init__(
self, containers, forward=False,
ignore_dependencies=ignore_dependencies,
concurrency=concurrency)
concurrency=concurrency, auditor=auditor)

def _run(self):
for order, container in enumerate(self._containers):
Expand All @@ -316,18 +318,18 @@ class Clean(BaseOrchestrationPlay):
"""A Maestro orchestration play that will remove stopped containers from
Docker."""

def __init__(self, containers=[], concurrency=None):
def __init__(self, containers=[], concurrency=None, auditor=None):
BaseOrchestrationPlay.__init__(
self, containers, ignore_dependencies=False,
concurrency=concurrency)
concurrency=concurrency, auditor=auditor)

def _run(self):
for order, container in enumerate(self._containers):
o = self._om.get_formatter(order, prefix=(
BaseOrchestrationPlay.LINE_FMT.format(
order + 1, container.name, container.service.name,
container.ship.address)))
self.register(tasks.RemoveTask(o, container))
self.register(tasks.CleanTask(o, container))


class Restart(BaseOrchestrationPlay):
Expand All @@ -338,11 +340,12 @@ class Restart(BaseOrchestrationPlay):

def __init__(self, containers=[], registries={}, refresh_images=False,
ignore_dependencies=True, concurrency=None, step_delay=0,
stop_start_delay=0, reuse=False, only_if_changed=False):
stop_start_delay=0, reuse=False, only_if_changed=False,
auditor=None):
BaseOrchestrationPlay.__init__(
self, containers, forward=False,
ignore_dependencies=ignore_dependencies,
concurrency=concurrency)
concurrency=concurrency, auditor=auditor)

self._registries = registries
self._refresh_images = refresh_images
Expand Down
55 changes: 35 additions & 20 deletions maestro/plays/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
class Task:
"""Base class for tasks acting on containers."""

def __init__(self, o, container):
def __init__(self, action, o, container):
"""Initialize the base task parameters.
Args:
o (termoutput.OutputFormatter): the output formatter used for task
output.
container (entities.Container): the container the task operates on.
"""
self.action = action
self.o = o
self.container = container

Expand Down Expand Up @@ -76,17 +77,30 @@ def _check_for_state(self, state, cond):

return True

def run(self):
def run(self, auditor=None):
if auditor:
auditor.action(self.container.name, self.action)

try:
self._run()
if auditor:
auditor.success(self.container.name, self.action)
except Exception as e:
if auditor:
auditor.error(self.container.name, self.action, message=e)
exceptions.raise_with_tb()

def _run(self):
raise NotImplementedError


class StatusTask(Task):
"""Check for and display a container's status."""

def __init__(self, o, container):
Task.__init__(self, o, container)
Task.__init__(self, 'status', o, container)

def run(self):
def _run(self):
self.o.reset()
self.o.pending('checking...')

Expand Down Expand Up @@ -115,12 +129,12 @@ class StartTask(Task):

def __init__(self, o, container, registries={}, refresh=False,
reuse=False):
Task.__init__(self, o, container)
Task.__init__(self, 'start', o, container)
self._registries = registries
self._refresh = refresh
self._reuse = reuse

def run(self):
def _run(self):
self.o.reset()
error = None
try:
Expand All @@ -142,7 +156,8 @@ def run(self):
('Halting start sequence because {} failed to start!'
.format(self.container)),
self.container.ship.backend.logs(self.container.id)]
raise exceptions.OrchestrationException('\n'.join(error))
raise exceptions.OrchestrationException(
'\n'.join(error).strip())
except Exception:
self.o.commit(red('failed to start container!'))
raise
Expand All @@ -166,9 +181,9 @@ def _create_and_start_container(self):
# application were already running.
return None

# Otherwise we need to start it.
if (not self._reuse) or (not self.container.status()):
# Otherwise we need to start it.
RemoveTask(self.o, self.container, standalone=False).run()
CleanTask(self.o, self.container, standalone=False).run()

# Check if the image is available, or if we need to pull it down.
image = self.container.get_image_details()
Expand Down Expand Up @@ -250,9 +265,9 @@ class StopTask(Task):
"""Stop a container."""

def __init__(self, o, container):
Task.__init__(self, o, container)
Task.__init__(self, 'stop', o, container)

def run(self):
def _run(self):
self.o.reset()
self.o.pending('checking container...')
try:
Expand Down Expand Up @@ -292,15 +307,15 @@ class RestartTask(Task):
def __init__(self, o, container, registries={}, refresh=False,
step_delay=0, stop_start_delay=0, reuse=False,
only_if_changed=False):
Task.__init__(self, o, container)
Task.__init__(self, 'restart', o, container)
self._registries = registries
self._refresh = refresh
self._step_delay = step_delay
self._stop_start_delay = stop_start_delay
self._reuse = reuse
self._only_if_changed = only_if_changed

def run(self):
def _run(self):
self.o.reset()

if self._refresh:
Expand Down Expand Up @@ -344,10 +359,10 @@ class LoginTask(Task):
"""

def __init__(self, o, container, registries={}):
Task.__init__(self, o, container)
Task.__init__(self, 'login', o, container)
self._registries = registries

def run(self):
def _run(self):
registry = LoginTask.registry_for_container(self.container,
self._registries)
if not registry:
Expand Down Expand Up @@ -385,12 +400,12 @@ class PullTask(Task):
"""Pull (download) the image a container is based on."""

def __init__(self, o, container, registries={}, standalone=True):
Task.__init__(self, o, container)
Task.__init__(self, 'pull', o, container)
self._registries = registries
self._standalone = standalone
self._progress = {}

def run(self):
def _run(self):
self.o.reset()

# First, attempt to login if we can/need to.
Expand Down Expand Up @@ -443,14 +458,14 @@ def _update_pull_progress(self, last):
return total


class RemoveTask(Task):
class CleanTask(Task):
"""Remove a container from Docker if it exists."""

def __init__(self, o, container, standalone=True):
Task.__init__(self, o, container)
Task.__init__(self, 'clean', o, container)
self._standalone = standalone

def run(self):
def _run(self):
self.o.reset()
status = self.container.status()
if not status:
Expand Down

0 comments on commit d69e47f

Please sign in to comment.