Skip to content

Commit

Permalink
Merge branch 'improvement/2266-metalk8s-drain-tests' into q/2.4
Browse files Browse the repository at this point in the history
  • Loading branch information
bert-e committed Aug 12, 2020
2 parents e869013 + fb8257e commit 84701ed
Show file tree
Hide file tree
Showing 15 changed files with 1,302 additions and 123 deletions.
13 changes: 10 additions & 3 deletions salt/_modules/metalk8s_drain.py
Expand Up @@ -64,7 +64,8 @@ def _mirrorpod_filter(pod):
Args:
- pod: kubernetes pod object
Returns: True if the pod has the mirror annotation, False if not
Returns: (False, "") if the pod has the mirror annotation,
(True, "") if not
'''
mirror_annotation = "kubernetes.io/config.mirror"

Expand Down Expand Up @@ -248,8 +249,11 @@ def get_pod_controller(self, pod):
pod['metadata']['namespace'], controller_ref
)
if not response:
meta = pod['metadata']
raise DrainException(
"Missing pod controller for '{0}'".format(controller_ref.name)
"Missing controller for pod '{}/{}'".format(
meta['namespace'], meta['name']
)
)
return controller_ref

Expand Down Expand Up @@ -284,7 +288,9 @@ def daemonset_filter(self, pod):
)

raise DrainException(
"Missing pod controller for '{0}'".format(controller_ref.name)
"Missing controller for pod '{}/{}'".format(
pod['metadata']['namespace'], pod['metadata']['name']
)
)

if not self.ignore_daemonset:
Expand Down Expand Up @@ -324,6 +330,7 @@ def get_pods_for_eviction(self):
warnings.setdefault(
warning, []).append(pod['metadata']['name'])
is_deletable &= filter_deletable

if is_deletable:
pods.append(pod)

Expand Down
73 changes: 73 additions & 0 deletions salt/tests/unit/log_utils.py
@@ -0,0 +1,73 @@
"""Copied and inspired from `unittest._log`, added in Python 3.4+"""
import collections
import contextlib
from io import StringIO
import logging


LoggingWatcher = collections.namedtuple(
"LoggingWatcher", ["records", "output"]
)


class CapturingHandler(logging.Handler):
def __init__(self):
super(CapturingHandler, self).__init__()
self.watcher = LoggingWatcher([], [])

def flush(self):
pass

def emit(self, record):
self.watcher.records.append(record)
msg = self.format(record)
self.watcher.output.append(msg)


LOGGING_FORMAT = "%(levelname)s | %(message)s"


@contextlib.contextmanager
def capture_logs(logger, level=logging.DEBUG, fmt=LOGGING_FORMAT):
formatter = logging.Formatter(fmt=fmt)
handler = CapturingHandler()
handler.setFormatter(formatter)

old_handlers = logger.handlers[:]
old_level = logger.level
old_propagate = logger.propagate
logger.handlers = [handler]
logger.setLevel(level)
logger.propagate = False

try:
yield handler.watcher
finally:
logger.handlers = old_handlers
logger.setLevel(old_level)
logger.propagate = old_propagate


def check_captured_logs(watcher, expected_records):
if not expected_records:
assert watcher.records == [], \
"Expected no logs, got:\n{}".format(
'\n'.join(msg for msg in watcher.output)
)
else:
assert len(watcher.records) == len(expected_records), \
"Expected {} log lines, got {}. Received:\n{}".format(
len(expected_records),
len(watcher.records),
'\n'.join(msg for msg in watcher.output)
)

for expected, actual in zip(expected_records, watcher.records):
assert expected['level'] == actual.levelname, \
"Invalid log level, got '{}', expected '{}'\n{}".format(
actual.levelname, expected['level'], actual.message
)
assert expected['contains'] in actual.message, \
"Log message '{}' does not contain '{}'".format(
actual.message, expected['contains']
)
Empty file.
242 changes: 242 additions & 0 deletions salt/tests/unit/mocks/kubernetes.py
@@ -0,0 +1,242 @@
"""Mocked implementations of Kubernetes API for use in unit tests.
Aims to provide a simple yet configurable implementation of CRUD methods,
allowing to keep track of actions in an in-memory "database" or trigger
arbitrary problems for specific test needs.
"""
import contextlib
import copy

from salt.utils import dictupdate
from salttesting.mock import MagicMock, patch

from tests.unit import utils


class ResourceFilter:
"""Helper object for filtering a list of resource instances."""

NAMED_FILTERS = {
'name': lambda i, v: i['metadata']['name'] == v,
'namespace': lambda i, v: i['metadata']['namespace'] == v,
}

def __init__(self, instances):
self.instances = instances

def filter(self, name, value):
if name in ResourceFilter.NAMED_FILTERS:
return [
i for i in self.instances
if ResourceFilter.NAMED_FILTERS[name](i, value)
]

if callable(value):
return [i for i in self.instances if value(i)]

return [
i for i in self.instances
if utils.get_dict_element(i, name) == value
]

def filter_update(self, name, value):
self.instances = self.filter(name, value)


class APIMock:
"""Mock a K8s-style REST API over a set of resources stored in a dict."""

def __init__(self, database=None):
self.database = database or {}

@property
def api_resources(self):
return list(self.database.keys())

def filter(self, instances, filters):
filtered = ResourceFilter(instances)

for name, value in filters.items():
filtered.filter_update(name, value)

return filtered.instances

def retrieve(self, resource, **filters):
instances = self.database.get(resource, None)

assert instances is not None, \
"Resource '{}' unknown (available: {})".format(
resource, ', '.join(self.api_resources)
)

return self.filter(instances, filters)

def get_instance(self, resource, instance):
filters = {
'name': instance['metadata']['name'],
}
namespace = instance['metadata'].get('namespace')
if namespace is not None:
filters['namespace'] = namespace

candidates = self.retrieve(resource, **filters)
return candidates[0] if candidates else None

def create(self, resource, instance):
existing = self.get_instance(resource, instance)
assert existing is None, \
"Cannot create '{}/{}': already exists.".format(
resource, instance['metadata']['name']
)

self.database.setdefault(resource, []).append(instance)

def update(self, resource, instance):
existing = self.get_instance(resource, instance)
assert existing is not None, \
"Cannot update '{}/{}': not found.".format(
resource, instance['metadata']['name']
)

self.database[resource].remove(existing)
self.database[resource].append(instance)

def delete(self, resource, **filters):
for to_remove in self.retrieve(resource, **filters):
self.database[resource].remove(to_remove)

def patch(self, resource, name, patch, **filters):
candidates = self.retrieve(resource, name=name, **filters)
assert len(candidates) == 1, \
"Found more than one instance of '{}/{}' to patch".format(
resource, name
)

updated = dictupdate.update(candidates[0], patch)
self.update(resource, updated)


class KubernetesAPIMock:
"""Add simple helpers to mock `metalk8s_kubernetes` methods in tests.
Manages a set of resources (e.g. "pods") and the corresponding instances,
and provides mocked equivalents for `metalk8s_kubernetes` methods relying
on the managed resources.
TODO:
- add mock implementation for all methods from metalk8s_kubernetes
- add helpers for manipulating the database
- add helpers for populating the database
"""

def __init__(self, database=None, resources=None):
self.api = APIMock(database or {})

# resources contains the mapping between short resource names, used as
# keys in the DATABASE, and (kind / apiVersion) pairs used when
# interacting with the real K8s API
self.resources = resources or {}

def seed(self, database=None):
self.api.database = copy.deepcopy(database or {})

def time_mock_from_events(self, events):
return TimedEventsMock(self.api, events)

def get_resource(self, kind, apiVersion):
resource = self.resources.get((apiVersion, kind), None)
assert resource is not None, \
"'{}/{}' is not a known resource ({})".format(
apiVersion, kind,
', '.join(
'{vk[0]}/{vk[1]}: {r}'.format(r=resource, vk=versionkind)
for versionkind, resource in self.resources.items()
)
)
return resource

def get_object(self, name, kind, apiVersion, **kwargs):
resource = self.get_resource(kind, apiVersion)
objects = self.api.retrieve(resource, name=name, **kwargs)
res = objects[0] if objects else None
print("Called get_object %s/%s name=%s kwargs=%r - %r" % (
apiVersion, kind, name, kwargs, res
))
return res

def list_objects(self, kind, apiVersion, all_namespaces=False,
field_selector=None, **kwargs):
resource = self.get_resource(kind, apiVersion)

# If namespace isn't in kwargs, then all members of the matching
# resource (after other filters were applied) will get returned
assert all_namespaces or 'namespace' in kwargs, \
"Must either enable `all_namespaces` or pass a `namespace` kwarg"

if field_selector is not None:
# Naive re-implem
key, _, value = field_selector.partition('=')
if value is None:
value = ''
kwargs[key] = value

res = self.api.retrieve(resource, **kwargs)
print("Called get_object %s/%s kwargs=%r - %r" % (
apiVersion, kind, kwargs, res
))
return res


class TimedEventsMock:
"""Store timed events to affect an APIMock and mock the `time` module."""

def __init__(self, api, events):
self.api = api
self.events = events
self._timer = 0
self._time_mock = MagicMock(side_effect=self.get_time)
self._sleep_mock = MagicMock(side_effect=self.fake_sleep)
self._initialized = False

@property
def time(self):
return self._time_mock

@property
def sleep(self):
return self._sleep_mock

def get_time(self, *_a, **_k):
if not self._initialized:
eventlist = self.events.get(0, [])
for event in eventlist:
self.handle_event(event)
self._initialized = True

return self._timer

def fake_sleep(self, duration, *_a, **_k):
print("Called time.sleep(%d) - now at %d" % (duration, self._timer))
self.process_events(duration)
self._timer += duration

def process_events(self, duration):
for timestep, eventlist in self.events.items():
if self._timer < timestep <= self._timer + duration:
for event in eventlist:
self.handle_event(event)

def handle_event(self, event):
print("Processing event %r" % event)
kwargs = copy.deepcopy(event)
resource = kwargs.pop('resource')
verb = kwargs.pop('verb')

method = getattr(self.api, verb)
method(resource, **kwargs)
print("Done")

@contextlib.contextmanager
def patch(self):
with patch('time.time', self.time), patch('time.sleep', self.sleep):
yield

0 comments on commit 84701ed

Please sign in to comment.