From f1d9c89ae38be26cfcaa8b58d7b066648476a8e7 Mon Sep 17 00:00:00 2001 From: Joerg Eschmann Date: Mon, 25 Nov 2019 17:19:07 +0100 Subject: [PATCH] extended ServiceDiscovery to provide more flexibility --- README.md | 245 +++++++++++++----- setup.py | 2 +- src/counselor/client.py | 46 +--- src/counselor/config.py | 20 +- src/counselor/discovery.py | 163 ++++++++---- .../endpoint/{agent.py => agent_endpoint.py} | 20 +- .../endpoint/{check.py => check_endpoint.py} | 11 +- src/counselor/endpoint/common.py | 3 + src/counselor/endpoint/entity.py | 26 +- src/counselor/endpoint/http_endpoint.py | 56 +++- .../endpoint/{keyvalue.py => kv_endpoint.py} | 45 ++-- .../{service.py => service_endpoint.py} | 16 +- src/counselor/trigger.py | 5 + src/counselor/watcher.py | 40 +-- test/agent_test.py | 5 +- test/discovery_test.py | 100 ++++--- test/keyvalue_test.py | 42 ++- test/service_test.py | 51 ++++ test/trigger_test.py | 2 +- test/watcher_test.py | 29 ++- 20 files changed, 646 insertions(+), 281 deletions(-) rename src/counselor/endpoint/{agent.py => agent_endpoint.py} (76%) rename src/counselor/endpoint/{check.py => check_endpoint.py} (80%) rename src/counselor/endpoint/{keyvalue.py => kv_endpoint.py} (60%) rename src/counselor/endpoint/{service.py => service_endpoint.py} (72%) create mode 100644 test/service_test.py diff --git a/README.md b/README.md index 18fd866..c3698a4 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,11 @@ This lib provides functionality to interact with Consul from HashiCorp. It is still in work and you should not use it in production. -The main use case for this lib is, you have a service that you want to register in Consul and automatically reconfigure when the configuration for that service changed. Instead of having a local Consul agent running that executes a shell script or calls an http endpoint, Counselor uses an interface your service can implement, to notify it of changes to the service. The configuration of the service is stored in the meta field in Consul. To check for updates, a trigger periodically fetches the service definition and check it for changes. +The main use case for this lib is, you have a service that you want to register in Consul and automatically +reconfigure when the configuration for that service changed. Instead of having a local Consul agent running that +executes a shell script or calls an http endpoint, Counselor uses an interface your service can implement, +to notify it of changes to the service. The configuration of the service is stored in the KV store of Consul. +To check for updates, a trigger periodically fetches the service definition and check it for changes. ## Setup You can use the Makefile to install the lib locally @@ -23,105 +27,212 @@ python -m pip install counselor ``` ## Usage +Here are some examples executed in the python console to show you how to use the library. + +### KV Store +```python +import logging + +from counselor.config import KVConfigPath +from counselor.discovery import ServiceDiscovery +from counselor.endpoint.http_endpoint import EndpointConfig + +logging.basicConfig(level=logging.DEBUG) + +# Create a ServiceDiscovery instance to interact with consul +consul_config = EndpointConfig(host="127.0.0.1", port=8500, version="v1") +service_discovery = ServiceDiscovery.new_service_discovery_with_consul_config(consul_config) + +# Create a key value config path +kv_config_path = KVConfigPath("test-project", "test-domain", "test-service", "test-config", "test-env") +config_path = kv_config_path.compose_path() + +# Check whether there is already a config stored in that config path. +# You get two objects back, one for the response, that lets you know whether the request was successul of not. +# The other is the config itself. If the response is successful, the config instance is filled. +response, found_config = service_discovery.fetch_config_by_path(kv_config_path.compose_path()) +response.as_string() + +# Create a config for your service +test_service_config = { + "foo": "bar", + "number": 3.1415, + "active": True, + "list": ["one", "two", "three"], + "map": {"a": 1, "b": 2, "c": 3} +} + +# Store the config in the Consul KV store +response = service_discovery.store_config(config_path, test_service_config) +response.as_string() + +# Now you should find the config +response, found_config = service_discovery.fetch_config_by_path(config_path) +response.as_string() +found_config + +# To update the config, change the config and send it to Consul. Keep in mind that the +# config will be overwritten. That means any field that is not in the config anymore will be deleted in the KV store. +test_service_config["active"] = False +response = service_discovery.update_config(config_path, test_service_config) +response.as_string() +``` + +### Service registry +```python +from counselor.discovery import ServiceDiscovery +from counselor.endpoint.http_endpoint import EndpointConfig +from counselor.filter import KeyValuePair + + +# Create a ServiceDiscovery instance to interact with consul +consul_config = EndpointConfig(host="127.0.0.1", port=8500, version="v1") +service_discovery = ServiceDiscovery.new_service_discovery_with_consul_config(consul_config) + +# To register a service you need at least a unique key. This key is used to identify your service. Consul has only +# this level of identification. So if you track multiple instance of the same service, you might add a number to +# differentiate between the instances. +service_key = "test-service" + +# You can group your service with tags. For example, you could tag all your db services with the tag "db". +# A dash in the tag name can cause errors. You should use an underscore _ instead. +service_tags = ["test"] + +# The meta field allows you to define arbitrary characteristics of your service. In this example we have the version, +# the status and the base_time stored. The only limitation is that all keys and values have to be strings. +service_meta = { + "version": "1.0", + "status": "active", + "base_time": "1573639530", +} + +# Register the service +response = service_discovery.register_service(service_key=service_key, tags=service_tags, meta=service_meta) +response.as_string() + +# Fetch the service definition. +response, found_service = service_discovery.get_service_details(service_key) +response.as_string() +found_service.as_json() + +# To update the service modify the tag or meta field and send it to Consul. +service_tags = service_tags + ["additional_tag"] +service_meta = { + "status": "inactive" +} +response = service_discovery.update_service(service_key=service_key, tags=service_tags, meta=service_meta) + +# You are able to use the tags and meta map to search and filter the services. +response, found_services = service_discovery.search_for_services(tags=["additional_tag"], meta=[KeyValuePair('status', 'inactive')]) +response.as_string() +found_services[0].as_json() + +# At the end you can deregister your service by key +response = service_discovery.deregister_service(service_key) +response.as_string() +``` + +### Watch for config changes ```python import logging from datetime import timedelta -import uuid +from threading import Event -from counselor.client import ConsulConfig from counselor.config import KVConfigPath from counselor.discovery import ServiceDiscovery from counselor.endpoint.common import Response -from counselor.filter import KeyValuePair +from counselor.endpoint.http_endpoint import EndpointConfig from counselor.watcher import ReconfigurableService logging.basicConfig(level=logging.DEBUG) LOGGER = logging.getLogger(__name__) -# Define a service that get automatically reconfigured -# -> this is where you can encapsulate all the functionality to reconfigure/reload your service +# Create a ServiceDiscovery instance to interact with consul +consul_config = EndpointConfig(host="127.0.0.1", port=8500, version="v1") +service_discovery = ServiceDiscovery.new_service_discovery_with_consul_config(consul_config) + +# To check for config updates in Consul, there is a Trigger that periodically fetches the config from Consul. +# It then compares the received config with the last know version. If there is a difference, it will notify you. +# We have an interface for that, called ReconfigurableService. You have to extend that class to provide the +# necessary functionality. In the followed example, the TestService simply logs the events. +# +# notify_failed_service_check() is called when Consul is not reachable or does not return the config. +# configure() is called the first time it fetches the config +# reconfigure() is called whenever the modification_index is increased and an update available class TestService(ReconfigurableService): - def __init__(self, service_key: str, kv_config_path: KVConfigPath, current_config: dict = None): - super().__init__(service_key, kv_config_path, current_config) + def __init__(self, service_key: str, config_path: KVConfigPath, current_config: dict = None): + self.service_key = service_key + self.config_path = config_path + self.last_config = current_config self.failed_service_check = False self.updated = False + def get_service_key(self) -> str: + return self.service_key + + def get_current_config(self) -> dict: + return self.last_config + + def get_config_path(self) -> str: + return self.config_path.compose_path() + def notify_failed_service_check(self, response: Response): LOGGER.info("Failed service check: {}".format(response.as_string())) self.failed_service_check = True def configure(self, config=dict) -> bool: - LOGGER.info("Configuring service") - self.current_config = config + LOGGER.info("Configured: {}".format(config)) + self.last_config = config self.failed_service_check = False - self.updated = True return True def reconfigure(self, new_config=dict) -> bool: LOGGER.info("New configuration received: {}".format(new_config)) self.configure(new_config) + self.updated = True return True -# Create a ServiceDiscovery instance to interact with consul -consul_config = ConsulConfig(host="127.0.0.1", port=8500) -service_discovery = ServiceDiscovery(consul_config) - -# Define the path in the KV store in Consul -> v1/kv/test/env/feature/service/config -kv_config_path = KVConfigPath("test", "feature", "service", "config", "env") - -# Your current service config, that you want to store in Consul -current_config = { - "foo": "bar", - "number": 3.1415, - "active": True, - "list": ["one", "two", "three"], - "map": {"a": 1, "b": 2, "c": 3} -} - # Create an instance of your service facade, that lets the watcher notify your service of changes -test_service = TestService(uuid.uuid4().hex, kv_config_path, current_config) - -# Check whether there is already a config stored in Consul -service_discovery.fetch_config(test_service) - -# If not store it -> you should see it in the UI: http://localhost:8500/ui/dc1/kv/test/env/feature/service/config/edit -config_store_response = service_discovery.store_config(test_service) -config_store_response.as_string() - -# Register a service in Consul -> you should see the registered service -register_response = service_discovery.register_service(service_key=test_service.service_key, tags=["test"], meta={ - "version": "1.0", - "status": "active", - "base_time": "1573639530", -}) -register_response.as_string() - -# Start the watcher with an interval of 3 seconds -> you should see log messages that the watcher is active -interval = timedelta(seconds=3) -watch_response = service_discovery.start_config_watch(test_service, interval) -watch_response.as_string() - -# Change the configuration, you can also do it via Consul ui -> you should see a log message that your service received a new config -test_service.current_config["reconfigure_action"] = "restart" -update_response = service_discovery.store_config(test_service) -update_response.as_string() - -# Your service instance should have set the updated flag by now +test_service = TestService(service_key="test-service", + config_path=KVConfigPath("test", "feature", "service", "config", "env"), + current_config={ + "foo": "bar", + "number": 3.1415, + "active": True, + "list": ["one", "two", "three"], + "map": {"a": 1, "b": 2, "c": 3} + }) + +# The service definition and the config in the KV store are seperate. You can store a config and watch for updates, +# without having the service registered. The method register_service_and_store_config will do both in one call. +service_tags = ["test"] +service_meta = None +response = service_discovery.register_service_and_store_config(reconfigurable_service=test_service, tags=service_tags, + meta=service_meta) +response.as_string() + +# You can add one or multiple config watches and start the trigger. +# With the stop you have the ability to stop the watcher by setting the event. This is helpful if you have other +# resources and you want to have a graceful shut down. +stop_event = Event() +service_discovery.add_config_watch(service=test_service, check_interval=timedelta(seconds=3), stop_event=stop_event) +response = service_discovery.start_config_watch() + +# Once the watcher is started, you should see log messages that Consul is checked for updates. +# You can now either change the service either via Consul UI or with the service_discovery instance. +test_service.last_config["reconfigure_action"] = "reload" +response = service_discovery.update_config_by_service(test_service) + +# You should then see that a new config is recieved and the update flag is set. test_service.updated -# Stop the watcher +# To stop the watcher you can either set the event, +stop_event.set() +# stop the trigger directly, service_discovery.stop_config_watch() - -# Deregister the service -deregister_response = service_discovery.deregister_service() -deregister_response.as_string() - -# You should not have any registered services left -search_response, found_services = service_discovery.search_for_services(tags=["test"], - meta=[KeyValuePair('status', 'active')]) -search_response.as_string() -found_services -``` +# or clear the watchers +service_discovery.clear_watchers() +``` For other examples, please have a look at the test folder. diff --git a/setup.py b/setup.py index 5fcff88..15941d0 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ long_description = fh.read() setup(name='counselor', - version='0.2.0', + version='0.2.1', description='Package to interact with HashiCorp Consul', long_description=long_description, long_description_content_type="text/markdown", diff --git a/src/counselor/client.py b/src/counselor/client.py index 3cbaca0..1f3eead 100644 --- a/src/counselor/client.py +++ b/src/counselor/client.py @@ -1,55 +1,25 @@ -from .endpoint.agent import Agent -from .endpoint.http_client import HttpRequest -from .endpoint.keyvalue import KV - - -class ConsulConfig: - """Config to connect to Consul. - """ - - def __init__(self, - host="127.0.0.1", - port=8500, - version='v1', - datacenter=None, - token=None, - scheme='http', - transport=HttpRequest()): - self.host = host - self.port = port - self.version = version - self.datacenter = datacenter - self.token = token - self.scheme = scheme - self.transport = transport - - def compose_base_uri(self) -> str: - """Return the base URI for API requests. - """ - - if self.port: - return '{0}://{1}:{2}/{3}'.format(self.scheme, self.host, self.port, self.version) - return '{0}://{1}/{2}'.format(self.scheme, self.host, self.version) +from .endpoint.agent_endpoint import AgentEndpoint +from .endpoint.http_endpoint import EndpointConfig +from .endpoint.kv_endpoint import KVEndpoint class Consul(object): """Client to use the API. """ - def __init__(self, config=ConsulConfig()): + def __init__(self, config=EndpointConfig()): self.config = config - base_uri = config.compose_base_uri() - self._agent = Agent(base_uri, config.transport, config.datacenter, config.token) - self._kv = KV(base_uri, config.transport, config.datacenter, config.token) + self._agent = AgentEndpoint(endpoint_config=config, url_parts=["agent"]) + self._kv = KVEndpoint(endpoint_config=config, url_parts=["kv"]) @property - def agent(self) -> Agent: + def agent(self) -> AgentEndpoint: """Get the agent service instance. """ return self._agent @property - def kv(self) -> KV: + def kv(self) -> KVEndpoint: """Get the key value service instance. """ return self._kv diff --git a/src/counselor/config.py b/src/counselor/config.py index e7e3fee..3001696 100644 --- a/src/counselor/config.py +++ b/src/counselor/config.py @@ -18,17 +18,19 @@ class ReconfigurableService: Extend this class to encapsulate all the logic to reconfigure your service. """ - def __init__(self, service_key: str, config_path: KVConfigPath, current_config: dict): - if service_key == "": - raise ValueError("Please define a unique service key") - if config_path is None: - raise ValueError("Please define a key value path for the service config") + def get_service_key(self) -> str: + """Return a unique service key to identify your service + """ + pass - # TODO: url encode the values to be safe because it will be part of the request url + def get_config_path(self) -> str: + """Return the config path where the config is stored, for example via KVConfigPath.compose_path() + """ + pass - self.service_key = service_key - self.config_path = config_path - self.current_config = current_config + def get_current_config(self) -> dict: + """Return the current config as a dictionary""" + pass def notify_failed_service_check(self, response: Response): """If the service fails to fetch the ServiceDefinition from Consul, this method is called. diff --git a/src/counselor/discovery.py b/src/counselor/discovery.py index deafa19..d2c8dc4 100644 --- a/src/counselor/discovery.py +++ b/src/counselor/discovery.py @@ -3,11 +3,12 @@ from threading import Event from typing import List -from .client import Consul, ConsulConfig +from .client import Consul from .config import ReconfigurableService from .endpoint.common import Response -from .endpoint.service import ServiceDefinition -from .filter import Filter, KeyValuePair, Operators +from .endpoint.entity import ServiceDefinition +from .endpoint.http_endpoint import EndpointConfig +from .filter import KeyValuePair, Filter, Operators from .trigger import Trigger from .watcher import KVConfigWatcherTask @@ -16,35 +17,101 @@ class ServiceDiscovery: """Facade to interact with Consul. The use case is that you have a service you want to register in Consul. - The configuration for the service is stored in the meta field. After your service is registered, - you can start a config watcher, that periodically fetches the entry from Consul. If there is a change in the - configuration, the service is notified via ReconfigurableService interface to reconfigure itself. + The service definition is stored to the service module, whereas the configuration for the service is persisted in + the Consul KV store. After your service is registered, you can start a config watcher, that periodically + fetches the config from Consul KV store. If there is a change in the configuration, the service is notified + via ReconfigurableService interface to reconfigure itself. """ - def __init__(self, consul_config=ConsulConfig()): - self._consul = Consul(consul_config) - self._service_definition = None - self._trigger = None - # TODO: set the stop event from arguments - self._stop_event = Event() + def __init__(self, client: Consul): + self._consul = client + self._trigger = Trigger() @staticmethod - def new_service_discovery_with_default_consul_client(): - return ServiceDiscovery(ConsulConfig()) + def new_service_discovery_with_defaults() -> 'ServiceDiscovery': + return ServiceDiscovery.new_service_discovery_with_consul_config(EndpointConfig()) - def fetch_config(self, service: ReconfigurableService) -> bool: - """Fetch the config from consul to initially configure your service + @staticmethod + def new_service_discovery_with_consul_config(config: EndpointConfig) -> 'ServiceDiscovery': + return ServiceDiscovery.new_service_discovery_with_consul_client(Consul(config)) + + @staticmethod + def new_service_discovery_with_consul_client(client: Consul) -> 'ServiceDiscovery': + return ServiceDiscovery(client) + + @staticmethod + def new_service_discovery_with_config_details(consul_ip: str = "127.0.0.1", + consul_port: int = 8500) -> 'ServiceDiscovery': + return ServiceDiscovery.new_service_discovery_with_consul_config( + EndpointConfig(host=consul_ip, port=consul_port)) + + def fetch_config_by_path(self, path: str) -> (Response, dict): + return self._consul.kv.get_raw(path) + + def store_config(self, path: str, config: dict) -> Response: + """Store the config in Consul. + """ + return self._consul.kv.set(path, config) + + def update_config(self, path: str, config: dict) -> Response: + """Update the config in Consul. + """ + return self._consul.kv.set(path, config) + + def update_config_by_service(self, service: ReconfigurableService) -> Response: + return self.update_config(service.get_config_path(), service.get_current_config()) + + def register_service(self, service_key: str, tags=None, meta=None) -> Response: + """Register a ServiceDefinition in Consul. """ - response, found_config = self._consul.kv.get_raw(service.config_path.compose_path()) - if not response.successful or found_config is None: - return False - service.reconfigure(found_config) + LOGGER.info("Registering service definition {}".format(service_key)) + + service_definition = ServiceDefinition(key=service_key, tags=tags, meta=meta) + + return self._update_service_definition(service_definition) + + def get_service_details(self, service_key) -> (Response, ServiceDefinition): + return self._consul.agent.service.get_details(service_key) - def store_config(self, service: ReconfigurableService) -> Response: - """Store the config in Consul + def register_service_and_store_config(self, reconfigurable_service: ReconfigurableService, tags: List[str], + meta: dict) -> Response: + register_response = self.register_service(reconfigurable_service.get_service_key(), tags, meta) + if not register_response.successful: + return register_response + + return self.store_config(reconfigurable_service.get_config_path(), reconfigurable_service.get_current_config()) + + def update_service(self, service_key: str, tags: List[str] = None, meta: dict = None) -> Response: + """Update the ServiceDefinition in Consul. + Update is the same as a registration. + """ + LOGGER.info("Updating service definition {}".format(service_key)) + + service_definition = ServiceDefinition(key=service_key, tags=tags, meta=meta) + + return self._update_service_definition(service_definition) + + def _update_service_definition(self, service_definition: ServiceDefinition) -> Response: + return self._consul.agent.service.register(service_definition) + + def deregister_service(self, service_key: str) -> Response: + """Delete the ServiceDefinition in Consul. Also stops the watcher if still active. """ - return self._consul.kv.set(service.config_path.compose_path(), service.current_config) + + if service_key is None or service_key == "": + return Response.create_successful_result() + + LOGGER.info("Deregistering service {}".format(service_key)) + + try: + if self._trigger is not None and self._trigger.running: + LOGGER.info("Stopping config watch first") + self.stop_config_watch() + + return self._consul.agent.service.deregister(service_key) + except Exception as exc: + return Response.create_error_result_with_message_only("{}".format(exc)) def search_for_services(self, tags: List[str] = None, meta: List[KeyValuePair] = None) -> ( Response, List[ServiceDefinition]): @@ -71,45 +138,40 @@ def search_for_services(self, tags: List[str] = None, meta: List[KeyValuePair] = return self._consul.agent.services(filter_tuples) - def register_service(self, service_key: str, tags=None, meta=None) -> Response: - """Register a ServiceDefinition in Consul. + def add_multiple_config_watches(self, services: List[ReconfigurableService], check_interval: timedelta, + stop_event=Event()): + """Add a list of config watchers """ - self._service_definition = ServiceDefinition.new_simple_service_definition(service_key, tags, meta) - return self.update_service_definition() + if services is None or len(services) == 0: + return - def update_service_definition(self) -> Response: - """Update the ServiceDefinition in Consul. - """ + for service in services: + if service is None: + continue - LOGGER.info("Updating definition {}".format(self._service_definition.key)) - return self._consul.agent.service.register(self._service_definition) + LOGGER.info("Adding config watch for {}".format(service.get_service_key())) + watcher_task = KVConfigWatcherTask(service, self._consul, check_interval, stop_event) + self._trigger.add_task(watcher_task) - def deregister_service(self) -> Response: - """Delete the ServiceDefinition in Consul. Also stops the watcher if still active. + def add_config_watch(self, service: ReconfigurableService, check_interval: timedelta, + stop_event=Event()): + """Create a watcher that periodically checks for config changes. """ - LOGGER.info("Deregistering service {}".format(self._service_definition.key)) + self.add_multiple_config_watches(services=[service], check_interval=check_interval, stop_event=stop_event) - try: - if self._trigger is not None and self._trigger.running: - LOGGER.info("Stopping config watch first") - self.stop_config_watch() - - return self._consul.agent.service.deregister(self._service_definition.key) - except Exception as exc: - return Response.create_error_result_with_message_only("{}".format(exc)) + def clear_watchers(self): + """Remove all the watchers""" + self._trigger.clear() - def start_config_watch(self, service: ReconfigurableService, check_interval: timedelta) -> Response: - """Create a watcher that periodically checks for config changes. + def start_config_watch(self) -> Response: + """Start the config watcher tasks """ - LOGGER.info("Starting config watch for {}".format(service.service_key)) + LOGGER.info("Starting config watches") try: - watcher_task = KVConfigWatcherTask(service, self._consul, check_interval, self._stop_event) - self._trigger = Trigger() - self._trigger.add_task(watcher_task) self._trigger.run_nonblocking() except Exception as exc: return Response.create_error_result_with_message_only("{}".format(exc)) @@ -120,9 +182,8 @@ def stop_config_watch(self): """Stop the watcher. """ - LOGGER.info("Stopping config watch for {}".format(self._service_definition.key)) + LOGGER.info("Stopping config watches") try: - self._stop_event.set() self._trigger.stop_tasks() except Exception as exc: LOGGER.info("Error when stopping watcher: {}".format(exc)) diff --git a/src/counselor/endpoint/agent.py b/src/counselor/endpoint/agent_endpoint.py similarity index 76% rename from src/counselor/endpoint/agent.py rename to src/counselor/endpoint/agent_endpoint.py index fb30a10..ed41503 100644 --- a/src/counselor/endpoint/agent.py +++ b/src/counselor/endpoint/agent_endpoint.py @@ -1,24 +1,28 @@ from typing import List -from .check import Check +from .check_endpoint import CheckEndpoint from .common import Response from .decoder import ServiceDefinitionListDecoder, JsonDecoder, Decoder from .entity import ServiceDefinition -from .http_endpoint import HttpEndpoint -from .service import Service +from .http_endpoint import HttpEndpoint, EndpointConfig +from .service_endpoint import ServiceEndpoint -class Agent(HttpEndpoint): +class AgentEndpoint(HttpEndpoint): """The Consul agent registers, deregisters and checks services. """ - def __init__(self, uri, request, datacenter=None, token=None): + def __init__(self, endpoint_config: EndpointConfig, url_parts: List[str]): """Create a new instance of the Agent. """ - super(Agent, self).__init__(uri, request, datacenter, token) - self.check = Check(self._base_uri, request, datacenter, token) - self.service = Service(self._base_uri, request, datacenter, token) + if url_parts is None: + url_parts = ["agent"] + + super(AgentEndpoint, self).__init__(endpoint_config, url_parts) + + self.check = CheckEndpoint(endpoint_config, url_parts + ["check"]) + self.service = ServiceEndpoint(endpoint_config, url_parts + ["service"]) def checks(self) -> (Response, List[str]): """Return all the checks that are registered with the local agent. diff --git a/src/counselor/endpoint/check.py b/src/counselor/endpoint/check_endpoint.py similarity index 80% rename from src/counselor/endpoint/check.py rename to src/counselor/endpoint/check_endpoint.py index 488c0c8..90a6508 100644 --- a/src/counselor/endpoint/check.py +++ b/src/counselor/endpoint/check_endpoint.py @@ -1,14 +1,21 @@ +from typing import List + from .common import Response -from .http_endpoint import HttpEndpoint +from .http_endpoint import HttpEndpoint, EndpointConfig -class Check(HttpEndpoint): +class CheckEndpoint(HttpEndpoint): """ At the moment checks are implemented as periodic http requests with our own watcher implementation. The next step is to involve the consul agent. TODO: implement """ + def __init__(self, endpoint_config: EndpointConfig, url_parts: List[str]): + if url_parts is None: + url_parts = ["agent", "check"] + super().__init__(endpoint_config, url_parts) + def register(self, name, script=None, check_id=None, interval=None, ttl=None, notes=None, http=None): response = self.put_response(url_parts=['register'], query=None, payload={ 'ID': check_id, diff --git a/src/counselor/endpoint/common.py b/src/counselor/endpoint/common.py index f913d85..729c303 100644 --- a/src/counselor/endpoint/common.py +++ b/src/counselor/endpoint/common.py @@ -44,3 +44,6 @@ def update_by_decode_result(self, decoder: Decoder): def as_string(self) -> str: return "successful: {} \nkind: {} \nmessage: {} \nexc: {}".format(self.successful, self.kind, self.message, self.exc) + + class ErrorTypes: + NotDefined = "NotDefined" diff --git a/src/counselor/endpoint/entity.py b/src/counselor/endpoint/entity.py index aca5fca..9a3a14f 100644 --- a/src/counselor/endpoint/entity.py +++ b/src/counselor/endpoint/entity.py @@ -7,16 +7,16 @@ class ServiceDefinition: """ def __init__(self, key: str, address=None, port=0, tags=None, meta=None, content_hash=None): - if tags is None: - tags = [] - if meta is None: - meta = {} - self.key = key self.address = address self.port = port - self.tags = tags - self.meta = meta + + self.tags: List[str] = [] + self.set_tags(tags) + + self.meta: dict = {} + self.set_meta(meta) + self.content_hash = content_hash self.check = None self.interval = None @@ -27,6 +27,18 @@ def __init__(self, key: str, address=None, port=0, tags=None, meta=None, content def new_simple_service_definition(key: str, tags=None, meta=None): return ServiceDefinition(key=key, tags=tags, meta=meta) + def set_tags(self, tags: List[str] = None): + if tags is None: + tags = [] + + self.tags = tags + + def set_meta(self, meta: dict = None): + if meta is None: + meta = {} + + self.meta = meta + def validate(self): if self.port and not isinstance(self.port, int): raise ValueError('Port must be an integer') diff --git a/src/counselor/endpoint/http_endpoint.py b/src/counselor/endpoint/http_endpoint.py index a16547e..c3ef877 100644 --- a/src/counselor/endpoint/http_endpoint.py +++ b/src/counselor/endpoint/http_endpoint.py @@ -1,4 +1,5 @@ import logging +from typing import List from urllib.parse import urlencode from .common import Response @@ -8,26 +9,55 @@ LOGGER = logging.getLogger(__name__) +class EndpointConfig: + """Config to connect to Consul. + """ + + def __init__(self, + host="127.0.0.1", + port=8500, + version='v1', + datacenter=None, + token=None, + scheme='http', + transport=HttpRequest()): + self.host = host + self.port = port + self.version = version + self.datacenter = datacenter + self.token = token + self.scheme = scheme + self.transport = transport + + def compose_base_uri(self) -> str: + """Return the base URI for API requests. + """ + + if self.port: + return '{0}://{1}:{2}/{3}'.format(self.scheme, self.host, self.port, self.version) + return '{0}://{1}/{2}'.format(self.scheme, self.host, self.version) + + class HttpEndpoint(object): """Base class for API endpoints""" - def __init__(self, uri, request=HttpRequest(), datacenter=None, token=None): + def __init__(self, endpoint_config: EndpointConfig, url_parts: List[str]): """Create a new instance of the Endpoint class """ - self._request = request - self._base_uri = '{0}/{1}'.format(uri, self.__class__.__name__.lower()) - self._dc = datacenter - self._token = token + self._endpoint_config = endpoint_config + self._base_uri = endpoint_config.compose_base_uri() + if url_parts is not None and len(url_parts) > 0: + self._base_uri = '{0}/{1}'.format(self._base_uri, '/'.join(url_parts)) def build_uri(self, params, query_params=None): """Build the request URI """ if not query_params: query_params = dict() - if self._dc: - query_params['dc'] = self._dc - if self._token: - query_params['token'] = self._token + if self._endpoint_config.datacenter: + query_params['dc'] = self._endpoint_config.datacenter + if self._endpoint_config.token: + query_params['token'] = self._endpoint_config.token path = '/'.join(params) if query_params: return '{0}/{1}?{2}'.format(self._base_uri, path, @@ -39,25 +69,25 @@ def get_response(self, url_parts=None, query=None) -> HttpResponse: url_parts = [] uri = self.build_uri(url_parts, query) - return self._request.get(uri) + return self._endpoint_config.transport.get(uri) def post_response(self, url_parts, query=None, payload=None) -> HttpResponse: if url_parts is None: url_parts = [] - return self._request.post(self.build_uri(url_parts, query), payload) + return self._endpoint_config.transport.post(self.build_uri(url_parts, query), payload) def put_response(self, url_parts, query=None, payload=None) -> HttpResponse: if url_parts is None: url_parts = [] - return self._request.put(self.build_uri(url_parts, query), payload) + return self._endpoint_config.transport.put(self.build_uri(url_parts, query), payload) def delete_response(self, url_parts, query=None) -> HttpResponse: if url_parts is None: url_parts = [] - return self._request.delete(self.build_uri(url_parts, query)) + return self._endpoint_config.transport.delete(self.build_uri(url_parts, query)) @staticmethod def decode_response(response: HttpResponse, decoder: Decoder): diff --git a/src/counselor/endpoint/keyvalue.py b/src/counselor/endpoint/kv_endpoint.py similarity index 60% rename from src/counselor/endpoint/keyvalue.py rename to src/counselor/endpoint/kv_endpoint.py index 9014a9f..3eb8ab7 100644 --- a/src/counselor/endpoint/keyvalue.py +++ b/src/counselor/endpoint/kv_endpoint.py @@ -1,23 +1,29 @@ import logging +from typing import List from .common import Response from .decoder import JsonDecoder, ConsulKVDecoder from .entity import ConsulKeyValue from .http_client import HttpResponse -from .http_endpoint import HttpEndpoint +from .http_endpoint import HttpEndpoint, EndpointConfig LOGGER = logging.getLogger(__name__) -class KV(HttpEndpoint): +class KVEndpoint(HttpEndpoint): """Key value store interface to consul. This class is meant to store dicts as values. TODO: use StatusResponse as returned value """ - def get_raw(self, item) -> (Response, dict): + def __init__(self, endpoint_config: EndpointConfig, url_parts: List[str]): + if url_parts is None: + url_parts = ["kv"] + super().__init__(endpoint_config, url_parts) + + def get_raw(self, path) -> (Response, dict): query_params = {'raw': True} - response = self._get(item=item, query_params=query_params) + response = self._get(path=path, query_params=query_params) endpoint_response = Response.create_from_http_response(response) if not endpoint_response.successful: @@ -30,12 +36,12 @@ def get_raw(self, item) -> (Response, dict): return endpoint_response, result - def get(self, item) -> (Response, ConsulKeyValue): + def get(self, path) -> (Response, ConsulKeyValue): """Get a value. Raw means without the Consul metadata like CreateIndex and ModifyIndex. """ - response = self._get(item=item) + response = self._get(path=path) endpoint_response = Response.create_from_http_response(response) if not endpoint_response.successful: @@ -48,42 +54,45 @@ def get(self, item) -> (Response, ConsulKeyValue): return endpoint_response, consul_kv - def _get(self, item: str, query_params=None) -> HttpResponse: + def _get(self, path: str, query_params=None) -> HttpResponse: + if path is None or path == "": + return HttpResponse(status_code=500, body="Path can not be empty", headers=None) + if query_params is None: query_params = {} - item = item.lstrip('/') - return self.get_response(url_parts=[item], query=query_params) + path = path.lstrip('/') + return self.get_response(url_parts=[path], query=query_params) - def set(self, item: str, value, flags=None) -> Response: + def set(self, path: str, value, flags=None) -> Response: """Set a value. """ - item = item.rstrip('/') + path = path.rstrip('/') query_params = {} if flags is not None: query_params['flags'] = flags - response = self.put_response(url_parts=[item], query=query_params, payload=value) + response = self.put_response(url_parts=[path], query=query_params, payload=value) return Response.create_from_http_response(response) - def delete(self, item, recurse=False) -> Response: + def delete(self, path, recurse=False) -> Response: """Remove an item. """ query_params = {'recurse': True} if recurse else {} - response = self.delete_response(url_parts=[item], query=query_params) + response = self.delete_response(url_parts=[path], query=query_params) return Response.create_from_http_response(response) - def acquire_lock(self, item, session) -> Response: + def acquire_lock(self, path, session) -> Response: """Set a lock. """ - response = self.put_response(url_parts=[item], query=None, payload={'acquire': session}) + response = self.put_response(url_parts=[path], query=None, payload={'acquire': session}) return Response.create_from_http_response(response) - def release_lock(self, item, session) -> Response: + def release_lock(self, path, session) -> Response: """Release a lock. """ - response = self.put_response(url_parts=[item], query=None, payload={'release': session}) + response = self.put_response(url_parts=[path], query=None, payload={'release': session}) return Response.create_from_http_response(response) diff --git a/src/counselor/endpoint/service.py b/src/counselor/endpoint/service_endpoint.py similarity index 72% rename from src/counselor/endpoint/service.py rename to src/counselor/endpoint/service_endpoint.py index 1653f43..823ba09 100644 --- a/src/counselor/endpoint/service.py +++ b/src/counselor/endpoint/service_endpoint.py @@ -1,19 +1,23 @@ import logging +from typing import List from .common import Response from .decoder import ServiceDefinitionDecoder from .encoder import Encoder from .entity import ServiceDefinition -from .http_endpoint import HttpEndpoint +from .http_endpoint import HttpEndpoint, EndpointConfig LOGGER = logging.getLogger(__name__) -class Service(HttpEndpoint): +class ServiceEndpoint(HttpEndpoint): """Service endpoint for Consul. """ - CHECK_EXCEPTION = 'check must be a tuple of script, interval, and ttl' + def __init__(self, endpoint_config: EndpointConfig, url_parts: List[str] = None): + if url_parts is None: + url_parts = ["agent", "service"] + super().__init__(endpoint_config, url_parts) def register(self, service_definition: ServiceDefinition) -> Response: """Register a service. @@ -41,6 +45,12 @@ def get_details(self, service_key) -> (Response, ServiceDefinition): return endpoint_response, service_definition + def update(self, service_definition: ServiceDefinition) -> Response: + """Update is the same as registering - the values are simply overwritten + """ + + return self.register(service_definition) + def deregister(self, service_key) -> Response: """Deregister a service. """ diff --git a/src/counselor/trigger.py b/src/counselor/trigger.py index 8d493cd..13ab1a1 100644 --- a/src/counselor/trigger.py +++ b/src/counselor/trigger.py @@ -13,6 +13,10 @@ def __init__(self): self.tasks = [] self.running = False + def clear(self): + self.stop_tasks() + self.tasks.clear() + def add_task(self, task: Thread): self.tasks.append(task) @@ -39,4 +43,5 @@ def stop_tasks(self): LOGGER.info("Stopping task {}".format(t.name)) t.stop() + self.running = False LOGGER.info("Trigger exited.") diff --git a/src/counselor/watcher.py b/src/counselor/watcher.py index 2199937..95ee887 100644 --- a/src/counselor/watcher.py +++ b/src/counselor/watcher.py @@ -48,13 +48,14 @@ def __init__(self, service: ReconfigurableService, consul_client: Consul, interv self.consul_client = consul_client def get_name(self) -> str: - return self.service.service_key + return self.service.get_service_key() def check(self): - LOGGER.info("Checking service: {}".format(self.service.service_key)) + LOGGER.info("Checking service: {}".format(self.service.get_service_key())) try: - response, service_definition = self.consul_client.agent.service.get_details(self.service.service_key) + response, new_service_definition = self.consul_client.agent.service.get_details( + self.service.get_service_key()) except Exception as exc: return self.service.notify_failed_service_check(Response.create_error_result_with_exception_only(exc)) @@ -63,13 +64,13 @@ def check(self): return if self.last_service_config_hash == "": - self.last_service_config_hash = service_definition.content_hash + self.last_service_config_hash = new_service_definition.content_hash return - if self.last_service_config_hash != service_definition.content_hash: - successful = self.service.reconfigure(service_definition.meta) + if self.last_service_config_hash != new_service_definition.content_hash: + successful = self.service.reconfigure(new_service_definition.meta) if successful: - self.last_service_config_hash = service_definition.content_hash + self.last_service_config_hash = new_service_definition.content_hash else: LOGGER.error("Reconfiguration was not successful") @@ -85,13 +86,13 @@ def __init__(self, service: ReconfigurableService, consul_client: Consul, interv self.consul_client = consul_client def get_name(self) -> str: - return self.service.service_key + return self.service.get_service_key() def check(self): - LOGGER.info("Checking service config: {}".format(self.service.service_key)) + LOGGER.info("Checking service config: {}".format(self.service.get_service_key())) try: - response, consul_config = self.consul_client.kv.get(self.service.config_path.compose_path()) + response, new_config = self.consul_client.kv.get(self.service.get_config_path()) except Exception as exc: return self.service.notify_failed_service_check(Response.create_error_result_with_exception_only(exc)) @@ -99,9 +100,16 @@ def check(self): self.service.notify_failed_service_check(response) return - if self.last_modify_index < consul_config.modify_index: - successful = self.service.reconfigure(consul_config.value) - if successful: - self.last_modify_index = consul_config.modify_index - else: - LOGGER.error("Reconfiguration was not successful") + successful = False + if self.last_modify_index == 0: + successful = self.service.configure(new_config.value) + elif self.last_modify_index < new_config.modify_index: + successful = self.service.reconfigure(new_config.value) + else: + LOGGER.debug("Config still up to date: {}".format(self.last_modify_index)) + return + + if successful: + self.last_modify_index = new_config.modify_index + else: + LOGGER.error("Reconfiguration was not successful") diff --git a/test/agent_test.py b/test/agent_test.py index 8b1525c..7a31dc1 100644 --- a/test/agent_test.py +++ b/test/agent_test.py @@ -3,6 +3,7 @@ from src.counselor import client from src.counselor.endpoint.encoder import ServiceDefinition +from src.counselor.endpoint.http_endpoint import EndpointConfig logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) @@ -12,7 +13,7 @@ class AgentTests(unittest.TestCase): def setUp(self): LOGGER.info("Setting up") self.test_service_key = "unit-test-service" - self.consul_config = client.ConsulConfig() + self.consul_config = EndpointConfig() self.consul = client.Consul(config=self.consul_config) def tearDown(self): @@ -40,7 +41,7 @@ def test_services_registration(self): LOGGER.info(found_service_definition.as_json()) service_definition.meta["version"] = "v1.1" - update_status = self.consul.agent.service.register(service_definition) + update_status = self.consul.agent.service.update(service_definition) LOGGER.info("Update status: {}".format(update_status.as_string())) search_status, found_services = self.consul.agent.services() diff --git a/test/discovery_test.py b/test/discovery_test.py index 060b9ac..9e78b5e 100644 --- a/test/discovery_test.py +++ b/test/discovery_test.py @@ -4,9 +4,11 @@ import uuid from datetime import timedelta -from counselor.config import KVConfigPath +from src.counselor import client +from src.counselor.config import KVConfigPath from src.counselor.discovery import ServiceDiscovery from src.counselor.endpoint.common import Response +from src.counselor.endpoint.http_endpoint import EndpointConfig from src.counselor.filter import KeyValuePair from src.counselor.watcher import ReconfigurableService @@ -15,17 +17,28 @@ class TestService(ReconfigurableService): - def __init__(self, service_key: str, kv_config_path: KVConfigPath, current_config: dict = None): - super().__init__(service_key, kv_config_path, current_config) + def __init__(self, service_key: str, kv_config_path: KVConfigPath, current_config: dict): + self.service_key = service_key + self.kv_config_path = kv_config_path + self.current_config = current_config self.failed_service_check = False self.updated = False + def get_config_path(self) -> str: + return self.kv_config_path.compose_path() + + def get_current_config(self) -> dict: + return self.current_config + + def get_service_key(self) -> str: + return self.service_key + def notify_failed_service_check(self, response: Response): LOGGER.info("Failed service check: {}".format(response.as_string())) self.failed_service_check = True def configure(self, config=dict) -> bool: - LOGGER.info("Configuring service") + LOGGER.info("Configuring service: {}".format(config)) self.current_config = config self.failed_service_check = False self.updated = True @@ -37,43 +50,54 @@ def reconfigure(self, new_config=dict) -> bool: return True -class TestServiceDiscovery(unittest.TestCase): +class ServiceDiscoveryTests(unittest.TestCase): - def test_filter_services(self): - service_discovery = ServiceDiscovery.new_service_discovery_with_default_consul_client() + def setUp(self): + LOGGER.info("Setting up") + self.test_key_prefix = "test" + self.service_key = uuid.uuid4().hex + self.consul_config = EndpointConfig() + self.consul = client.Consul(config=self.consul_config) + self.service_discovery = ServiceDiscovery.new_service_discovery_with_consul_client(self.consul) - register_status = service_discovery.register_service(service_key=uuid.uuid4().hex, tags=["test", "ts"], meta={ - "version": "1.0", - "foo": "bar", - }) + def tearDown(self): + LOGGER.info("Cleaning up") + self.consul.kv.delete(self.test_key_prefix, recurse=True) + self.consul.agent.service.deregister(self.service_key) + + def test_filter_services(self): + service_key = uuid.uuid4().hex + register_status = self.service_discovery.register_service(service_key=service_key, tags=["test", "ts"], + meta={ + "version": "1.0", + "foo": "bar", + }) self.assertTrue(register_status.successful, "Could not register service: {}".format(register_status.as_string())) - search_status, found_services = service_discovery.search_for_services(tags=["ts"], - meta=[KeyValuePair('foo', 'bar')]) + search_status, found_services = self.service_discovery.search_for_services(tags=["ts"], + meta=[KeyValuePair('foo', 'bar')]) self.assertTrue(search_status.successful, "Could not filter for services: {}".format(search_status.as_string())) self.assertEqual(1, len(found_services)) self.assertEqual(found_services[0].meta['foo'], 'bar', "Meta value does not match") - deregister_status = service_discovery.deregister_service() + deregister_status = self.service_discovery.deregister_service(service_key) self.assertTrue(deregister_status.successful, "Service deregistration is not successful: {}".format(deregister_status.as_string())) - search_status, found_services = service_discovery.search_for_services(tags=["ts"], - meta=[KeyValuePair('foo', 'bar')]) + search_status, found_services = self.service_discovery.search_for_services(tags=["ts"], + meta=[KeyValuePair('foo', 'bar')]) self.assertTrue(search_status.successful, "Search was not successful: {}".format(search_status.as_string())) self.assertEqual(0, len(found_services)) def test_kv_config_watch(self): - service_discovery = ServiceDiscovery.new_service_discovery_with_default_consul_client() - - kv_config_path = KVConfigPath("test", "feature", "service", "detail", "env") + kv_config_path = KVConfigPath(self.test_key_prefix, "feature", "service", "detail", "env") current_config = { "foo": "bar", "number": 3.1415, @@ -81,19 +105,26 @@ def test_kv_config_watch(self): "list": ["one", "two", "three"], "map": {"a": 1, "b": 2, "c": 3} } - test_service = TestService(uuid.uuid4().hex, kv_config_path, current_config) - self.assertFalse(service_discovery.fetch_config(test_service)) + test_service = TestService(self.service_key, kv_config_path, current_config) + response, found_config = self.service_discovery.fetch_config_by_path(test_service.get_config_path()) + self.assertFalse(response.successful) + self.assertIsNone(found_config) - config_store_response = service_discovery.store_config(test_service) + config_store_response = self.service_discovery.store_config(test_service.get_config_path(), current_config) self.assertTrue(config_store_response.successful, "Could not store config: {}".format(config_store_response.as_string())) - register_status = service_discovery.register_service(service_key=test_service.service_key, tags=["test"], meta={ - "version": "1.0", - "status": "active", - "base_time": "1573639530", - }) + response, found_config = self.service_discovery.fetch_config_by_path(test_service.get_config_path()) + self.assertTrue(response.successful) + self.assertEqual(current_config, found_config) + + register_status = self.service_discovery.register_service(service_key=test_service.service_key, tags=["test"], + meta={ + "version": "1.0", + "status": "active", + "base_time": "1573639530", + }) self.assertTrue(register_status.successful, "Could not register service: {}".format(register_status.as_string())) @@ -102,26 +133,27 @@ def test_kv_config_watch(self): self.assertIsNotNone(test_service.current_config, "Config should be set") interval = timedelta(seconds=1) - watch_status = service_discovery.start_config_watch(test_service, interval) + self.service_discovery.add_config_watch(test_service, interval) + watch_status = self.service_discovery.start_config_watch() self.assertTrue(watch_status.successful, "Could not start watcher: {}".format(watch_status.as_string())) time.sleep(1.0) test_service.current_config["reconfigure_action"] = "restart" - update_status = service_discovery.store_config(test_service) + update_status = self.service_discovery.update_config(test_service.get_config_path(), + test_service.get_current_config()) self.assertTrue(update_status.successful, "Update was not successful {}".format(update_status.as_string())) time.sleep(5.0) - service_discovery.stop_config_watch() + self.service_discovery.stop_config_watch() self.assertFalse(test_service.failed_service_check, "Check should not fail") self.assertIsNotNone(test_service.current_config, "Last config should should be set by the watcher update") self.assertTrue(test_service.updated, "Updated flag in service should be true") - deregister_status = service_discovery.deregister_service() + deregister_status = self.service_discovery.deregister_service(self.service_key) self.assertTrue(deregister_status.successful, "Could not deregister service: {}".format(deregister_status.as_string())) - -if __name__ == '__main__': - unittest.main() + if __name__ == '__main__': + unittest.main() diff --git a/test/keyvalue_test.py b/test/keyvalue_test.py index 70677d2..d0134e4 100644 --- a/test/keyvalue_test.py +++ b/test/keyvalue_test.py @@ -1,7 +1,9 @@ +import json import logging import unittest from src.counselor import client +from src.counselor.endpoint.http_endpoint import EndpointConfig logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) @@ -10,8 +12,8 @@ class KeyValueTests(unittest.TestCase): def setUp(self): LOGGER.info("Setting up") - self.test_key_prefix = "test/unit" - self.consul_config = client.ConsulConfig() + self.test_key_prefix = "test" + self.consul_config = EndpointConfig() self.consul = client.Consul(config=self.consul_config) def tearDown(self): @@ -60,6 +62,42 @@ def test_kv_consul_entry(self): self.assertTrue(get_response.successful) self.assertEqual(test_config, found_entry.value, "Configs do not match") + def test_recursive_kv(self): + service_config_path = key = self.test_key_prefix + "/service" + service_config = { + "env": "test", + "pairs": ["btc", "etc", "ren"], + "strategy": { + "goal": 42, + "risk": 3.1415, + } + } + response = self.consul.kv.set(service_config_path, service_config) + self.assertTrue(response.successful, response.as_string()) + + s1_config_path = service_config_path + "/s1" + s1_config = { + "name": "service-1", + "current_rate": 1.8 + } + response = self.consul.kv.set(s1_config_path, s1_config) + self.assertTrue(response.successful, response.as_string()) + + s2_config_path = service_config_path + "/s2" + s2_config = { + "name": "service-2", + "current_rate": 1.4 + } + response = self.consul.kv.set(s2_config_path, s2_config) + self.assertTrue(response.successful, response.as_string()) + + query_params = {"recurse": True} + http_response = self.consul.kv._get(path=service_config_path, query_params=query_params) + self.assertTrue(http_response.is_successful()) + + parsed_config = json.loads(http_response.payload) + self.assertEqual(3, len(parsed_config)) + if __name__ == '__main__': unittest.main() diff --git a/test/service_test.py b/test/service_test.py new file mode 100644 index 0000000..0491a49 --- /dev/null +++ b/test/service_test.py @@ -0,0 +1,51 @@ +import logging +import unittest + +from src.counselor.endpoint.entity import ServiceDefinition +from src.counselor.endpoint.http_endpoint import EndpointConfig +from src.counselor.endpoint.service_endpoint import ServiceEndpoint + +logging.basicConfig(level=logging.INFO) +LOGGER = logging.getLogger(__name__) + + +class ServiceTests(unittest.TestCase): + def setUp(self): + LOGGER.info("Setting up") + self.test_service_key = "unit-test-service" + self.consul_config = EndpointConfig(host="127.0.0.1", port=8500, version="v1") + self.service_endpoint = ServiceEndpoint(self.consul_config) + + def tearDown(self): + LOGGER.info("Cleaning up") + self.service_endpoint.deregister(self.test_service_key) + + def test_services_registration(self): + service_definition = ServiceDefinition( + key=self.test_service_key, + address="127.0.0.1", + port=61123, + tags=["unit", "test", "v1"], + meta={ + "version": "1.0", + "status": "active", + "base_time": "1573639530", + } + ) + + register_status = self.service_endpoint.register(service_definition) + self.assertTrue(register_status.successful) + + get_status, found_service_definition = self.service_endpoint.get_details(service_definition.key) + self.assertTrue(get_status.successful) + self.assertEqual(service_definition.key, found_service_definition.key) + self.assertEqual(service_definition.port, found_service_definition.port) + self.assertEqual(service_definition.meta["base_time"], found_service_definition.meta["base_time"]) + + service_definition.meta["version"] = "v1.1" + update_status = self.service_endpoint.update(service_definition) + self.assertTrue(update_status.successful) + + search_status, found_service_definition = self.service_endpoint.get_details(service_definition.key) + self.assertTrue(search_status.successful) + self.assertEqual(service_definition.meta["version"], found_service_definition.meta["version"]) diff --git a/test/trigger_test.py b/test/trigger_test.py index c78c6ce..987937f 100644 --- a/test/trigger_test.py +++ b/test/trigger_test.py @@ -21,7 +21,7 @@ def check(self): print("{}s task's current time : {}".format(self.interval.total_seconds(), time.ctime())) -class TestTrigger(unittest.TestCase): +class TriggerTests(unittest.TestCase): def test_triggers(self): trigger = Trigger() diff --git a/test/watcher_test.py b/test/watcher_test.py index 7d73674..42a1e70 100644 --- a/test/watcher_test.py +++ b/test/watcher_test.py @@ -4,10 +4,11 @@ from datetime import timedelta from threading import Event -from counselor.client import ConsulConfig, Consul -from counselor.config import KVConfigPath -from counselor.endpoint.common import Response -from counselor.watcher import ReconfigurableService, KVConfigWatcherTask +from src.counselor.client import Consul +from src.counselor.config import KVConfigPath +from src.counselor.endpoint.common import Response +from src.counselor.endpoint.http_endpoint import EndpointConfig +from src.counselor.watcher import ReconfigurableService, KVConfigWatcherTask logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) @@ -15,10 +16,20 @@ class TestService(ReconfigurableService): def __init__(self, service_key: str, config_path: KVConfigPath, current_config: dict = None): - super().__init__(service_key, config_path, current_config) - self.last_config = None + self.service_key = service_key + self.config_path = config_path + self.last_config = current_config self.failed_service_check = False + def get_service_key(self) -> str: + return self.service_key + + def get_current_config(self) -> dict: + return self.last_config + + def get_config_path(self) -> str: + return self.config_path.compose_path() + def notify_failed_service_check(self, response: Response): LOGGER.info("Failed service check: {}".format(response.as_string())) self.failed_service_check = True @@ -35,10 +46,10 @@ def reconfigure(self, new_config=dict) -> bool: return True -class KVConfigWatcherTaskTest(unittest.TestCase): +class KVConfigWatcherTaskTests(unittest.TestCase): def setUp(self): LOGGER.info("Setting up") - self.consul_config = ConsulConfig() + self.consul_config = EndpointConfig() self.consul = Consul(config=self.consul_config) self.kv_config_path = KVConfigPath(project="project", feature="feature", service="service", detail="config", env="dev") @@ -59,7 +70,7 @@ def test_config_watcher_task(self): set_response = self.consul.kv.set(self.kv_config_path.compose_path(), test_config) self.assertTrue(set_response.successful) - test_service = TestService(uuid.uuid4().hex, self.kv_config_path) + test_service = TestService(uuid.uuid4().hex, self.kv_config_path, test_config) interval = timedelta(seconds=1) stop_event = Event()