Skip to content

Commit

Permalink
Retire old node records in terminal state from EPUM
Browse files Browse the repository at this point in the history
Node recors in states TERMINATED, FAILED, or REJECTED are deleted if
they are older than record_reaping_max_age.  The default configuration
is 2 hours and can be changed through the EPUM YAML configuration
file (it needs to be expressed in seconds).
  • Loading branch information
priteau committed Oct 4, 2012
1 parent 38c35a3 commit a13cd1a
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 7 deletions.
24 changes: 23 additions & 1 deletion epu/epumanagement/__init__.py
Expand Up @@ -3,6 +3,7 @@
from epu.epumanagement.reactor import EPUMReactor
from epu.epumanagement.doctor import EPUMDoctor
from epu.epumanagement.decider import EPUMDecider
from epu.epumanagement.reaper import EPUMReaper
from epu.epumanagement.store import LocalEPUMStore, ZooKeeperEPUMStore
from epu.epumanagement.core import DomainSubscribers
from epu.epumanagement.conf import EPUM_INITIALCONF_EXTERNAL_DECIDE,\
Expand All @@ -11,7 +12,8 @@
PROVISIONER_VARS_KEY, EPUM_INITIALCONF_SERVICE_NAME, \
EPUM_DEFAULT_SERVICE_NAME, EPUM_INITIALCONF_ZOOKEEPER_HOSTS, \
EPUM_INITIALCONF_ZOOKEEPER_PATH, EPUM_INITIALCONF_PERSISTENCE,\
EPUM_INITIALCONF_ZOOKEEPER_PASSWORD, EPUM_INITIALCONF_ZOOKEEPER_USERNAME, EPUM_INITIALCONF_PROC_NAME
EPUM_INITIALCONF_ZOOKEEPER_PASSWORD, EPUM_INITIALCONF_ZOOKEEPER_USERNAME, \
EPUM_INITIALCONF_PROC_NAME, EPUM_RECORD_REAPING_DEFAULT_MAX_AGE

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -121,6 +123,16 @@ def __init__(self, initial_conf, notifier, provisioner_client,
self.doctor = EPUMDoctor(self.epum_store, notifier, provisioner_client, epum_client,
ouagent_client, disable_loop=self._external_decide_mode)

# The instance of the EPUManagementService process that hosts a particular EPUMReaper instance
# might not be the elected leader. When it is the elected leader, this EPUMReaper handles that
# functionality. When it is not the elected leader, this EPUMReaper handles the constant
# participation in the election.

record_reaping_max_age = initial_conf.get('record_reaping_max_age',
EPUM_RECORD_REAPING_DEFAULT_MAX_AGE)
self.reaper = EPUMReaper(self.epum_store, record_reaping_max_age,
disable_loop=self._external_decide_mode)


def initialize(self):
"""
Expand All @@ -132,6 +144,7 @@ def initialize(self):
# from persistence and refreshes local caches.
self.doctor.recover()
self.decider.recover()
self.reaper.recover()

self.initialized = True

Expand All @@ -153,6 +166,15 @@ def _doctor_appt(self, timestamp=None):
raise Exception("Not configured to accept external doctor check invocations")
self.doctor._loop_top(timestamp=timestamp)

def _run_reaper_loop(self):
"""For unit and integration tests only
"""
if not self.initialized:
raise Exception("Not initialized")
if not self._external_decide_mode:
raise Exception("Not configured to accept external decision invocations")
self.reaper._loop_top()

# -------------------------------------------
# External Messages: Sent by other components
# -------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions epu/epumanagement/conf.py
Expand Up @@ -34,6 +34,8 @@
EPUM_CONF_HEALTH_REALLY_MISSING = "really_missing_timeout"
EPUM_CONF_HEALTH_ZOMBIE = "zombie_seconds"

# Default time before removing node records in terminal state
EPUM_RECORD_REAPING_DEFAULT_MAX_AGE = 7200

# Other
EPUM_DEFAULT_SERVICE_NAME = "epu_management_service"
Expand Down
99 changes: 99 additions & 0 deletions epu/epumanagement/reaper.py
@@ -0,0 +1,99 @@
import logging
import time

from dashi.util import LoopingCall
from epu.epumanagement.conf import *
from epu.domain_log import EpuLoggerThreadSpecific
from epu.states import InstanceState as states

log = logging.getLogger(__name__)

class EPUMReaper(object):
"""This process infrequently queries each domain in the datastore. It finds
VM records in a terminal state past the threshold and removes them.
The instance of the EPUManagementService process that hosts a particular EPUMReaper instance
might not be the elected reaper. When it is the elected reaper, this EPUMReaper instance
handles that functionality. When it is not the elected reaper, this EPUMReaper instance
handles being available in the election.
"""

def __init__(self, epum_store,
record_reaping_max_age, disable_loop=False):
"""
@param epum_store State abstraction for all EPUs
@param record_reaping_max_age Instance records older than record_reaping_max_age will be deleted
@param disable_loop For unit/integration tests, don't run a timed decision loop
"""
self.epum_store = epum_store
self.record_reaping_max_age = record_reaping_max_age

self.control_loop = None
self.enable_loop = not disable_loop
self.is_leader = False

def recover(self):
"""Called whenever the whole EPUManagement instance is instantiated.
"""
# For callbacks: "now_leader()" and "not_leader()"
self.epum_store.register_reaper(self)

def now_leader(self, block=False):
"""Called when this instance becomes the reaper leader.
"""
log.info("Elected as Reaper leader")
self._leader_initialize()
self.is_leader = True
if block:
if self.control_loop:
self.control_loop.thread.join()
else:
raise ValueError("cannot block without a control loop")

def not_leader(self):
"""Called when this instance is known not to be the reaper leader.
"""
if self.control_loop:
self.control_loop.stop()
self.control_loop = None
self.is_leader = False

def _leader_initialize(self):
"""Performs initialization routines that may require async processing
"""
if self.enable_loop:
if not self.control_loop:
self.control_loop = LoopingCall(self._loop_top)
self.control_loop.start(300)

def _loop_top(self):
"""Run the reaper loop.
Every time this runs, each domain is checked for instances in terminal
states TERMINATED, FAILED, or REJECTED. They are deleted if they are
older than self.record_reaping_max_age.
"""
# Perhaps in the meantime, the leader connection failed, bail early
if not self.is_leader:
return

now = time.time()
domains = self.epum_store.get_all_domains()

for domain in domains:
with EpuLoggerThreadSpecific(domain=domain.domain_id, user=domain.owner):
if not domain.is_removed():
instances = domain.get_instances()
for instance in instances:
log.info("Instance is " + instance['state'])
if instance['state'] in [states.TERMINATED, states.FAILED, states.REJECTED]:
state_time = instance['state_time']
if now > state_time + self.record_reaping_max_age:
log.info("Removing instance %s with no state change for %f seconds",
instance['instance_id'], now - state_time)
domain.remove_instance(instance['instance_id'])

# Perhaps in the meantime, the leader connection failed, bail early
if not self.is_leader:
return
83 changes: 79 additions & 4 deletions epu/epumanagement/store.py
Expand Up @@ -53,6 +53,14 @@ def register_doctor(self, doctor):
"""For callbacks: now_leader() and not_leader()
"""

def currently_reaper(self):
"""See currently_decider()
"""

def register_reaper(self, reaper):
"""For callbacks: now_leader() and not_leader()
"""

def epum_service_name(self):
"""Return the service name (to use for heartbeat/IaaS subscriptions, launches, etc.)
Expand Down Expand Up @@ -253,6 +261,12 @@ def get_instance(self, instance_id):
Returns the instance record, or None if not found
"""

def remove_instance(self, instance_id):
"""Remove an instance record
Raise a NotFoundError if the instance is unknown
"""

def set_instance_heartbeat_time(self, instance_id, time):
"""Store a new instance heartbeat
"""
Expand Down Expand Up @@ -386,6 +400,10 @@ def __init__(self, service_name):

self.local_decider_ref = None
self.local_doctor_ref = None
self.local_reaper_ref = None

def initialize(self):
pass

def _change_decider(self, make_leader):
"""For internal use by EPUMStore
Expand Down Expand Up @@ -413,16 +431,27 @@ def _change_doctor(self, make_leader):
else:
self.local_doctor_ref.not_leader()


def initialize(self):
pass

def register_doctor(self, doctor):
"""For callbacks: now_leader() and not_leader()
"""
self.local_doctor_ref = doctor
self._change_doctor(True)

def _change_reaper(self, make_leader):
"""For internal use by EPUMStore
@param make_leader True/False
"""
if self.local_reaper_ref:
if make_leader:
self.local_reaper_ref.now_leader()
else:
self.local_reaper_ref.not_leader()

def register_reaper(self, reaper):
"""For callbacks: now_leader() and not_leader()
"""
self.local_reaper_ref = reaper
self._change_reaper(True)

def epum_service_name(self):
"""Return the service name (to use for heartbeat/IaaS subscriptions, launches, etc.)
Expand Down Expand Up @@ -740,6 +769,17 @@ def get_instance(self, instance_id):
"""
return self.instances.get(instance_id)

def remove_instance(self, instance_id):
"""Remove an instance record
Raise a NotFoundError if the instance is unknown
"""
instance = self.instances.get(instance_id)
if instance:
del self.instances[instance_id]
else:
raise NotFoundError()

def set_instance_heartbeat_time(self, instance_id, time):
"""Store a new instance heartbeat
"""
Expand Down Expand Up @@ -795,6 +835,8 @@ class ZooKeeperEPUMStore(EPUMStore):

DECIDER_ELECTION_PATH = "/elections/decider"
DOCTOR_ELECTION_PATH = "/elections/doctor"
REAPER_ELECTION_PATH = "/elections/reaper"

DOMAINS_PATH = "/domains"
DEFINITIONS_PATH = "/definitions"

Expand All @@ -819,6 +861,7 @@ def __init__(self, service_name, hosts, base_path, username=None, password=None,

self.decider_election = self.kazoo.Election(self.DECIDER_ELECTION_PATH, identifier=zk_id)
self.doctor_election = self.kazoo.Election(self.DOCTOR_ELECTION_PATH, identifier=zk_id)
self.reaper_election = self.kazoo.Election(self.REAPER_ELECTION_PATH, identifier=zk_id)

if username and password:
self.kazoo_auth_scheme = "digest"
Expand All @@ -837,9 +880,11 @@ def __init__(self, service_name, hosts, base_path, username=None, password=None,
self._election_condition = threading.Condition()
self._decider_election_thread = None
self._doctor_election_thread = None
self._reaper_election_thread = None

self._decider_leader = None
self._doctor_leader = None
self._reaper_leader = None

# cache domain stores locally. Note that this is not necessarily the
# complete set of domains, just the ones that this worker has seen.
Expand Down Expand Up @@ -879,8 +924,14 @@ def _handle_connection_state(self, state):
except Exception, e:
log.exception("Error deposing doctor leader: %s", e)

try:
self._reaper_leader.depose()
except Exception, e:
log.exception("Error deposing reaper leader: %s", e)

self.decider_election.cancel()
self.doctor_election.cancel()
self.reaper_election.cancel()

elif state == KazooState.CONNECTED:
with self._election_condition:
Expand Down Expand Up @@ -918,6 +969,15 @@ def register_doctor(self, doctor):
self._doctor_election_thread = tevent.spawn(self._run_election,
self.doctor_election, doctor, "doctor")

def register_reaper(self, reaper):
"""For callbacks: now_leader() and not_leader()
"""
if self._reaper_leader:
raise Exception("reaper already registered")
self._reaper_leader = reaper
self._reaper_election_thread = tevent.spawn(self._run_election,
self.reaper_election, reaper, "reaper")

def epum_service_name(self):
"""Return the service name (to use for heartbeat/IaaS subscriptions, launches, etc.)
Expand Down Expand Up @@ -1430,6 +1490,21 @@ def get_instance(self, instance_id):

return instance

def remove_instance(self, instance_id):
"""Remove an instance record
Raise a NotFoundError if the instance is unknown
"""


path = self._get_instance_path(instance_id)
try:
instance_json, stat = self.kazoo.get(path)
except NoNodeException:
raise NotFoundError()

self.kazoo.delete(path)

def set_instance_heartbeat_time(self, instance_id, time):
"""Store a new instance heartbeat
"""
Expand Down
26 changes: 25 additions & 1 deletion epu/epumanagement/test/mocks.py
Expand Up @@ -3,11 +3,35 @@
import copy

from epu.decisionengine.engineapi import Engine
from epu.epumanagement.core import CoreInstance
from epu.epumanagement.store import LocalDomainStore
from epu.states import InstanceState, InstanceHealthState


log = logging.getLogger(__name__)


class FakeDomainStore(LocalDomainStore):
def new_fake_instance_state(self, instance_id, state, state_time,
health=None, errors=None):
instance = self.get_instance(instance_id)
if health is None:
if instance:
health = instance.health
else:
health = InstanceHealthState.UNKNOWN

if errors is None and instance and instance.errors:
errors = instance.errors

newinstance = CoreInstance(instance_id=instance_id, launch_id="thelaunch",
site="chicago", allocation="big", state=state,
state_time=state_time, health=health, errors=errors)
if instance:
self.update_instance(newinstance, previous=instance)
else:
self.add_instance(newinstance)


class MockDomain(object):

def __init__(self, owner):
Expand Down

0 comments on commit a13cd1a

Please sign in to comment.