Skip to content
This repository has been archived by the owner on May 6, 2023. It is now read-only.

Commit

Permalink
Merge branch 'kv-configs'
Browse files Browse the repository at this point in the history
  • Loading branch information
joergeschmann committed Nov 21, 2019
2 parents 8c96f8c + 0464b39 commit 6da7104
Show file tree
Hide file tree
Showing 39 changed files with 973 additions and 662 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -6,6 +6,9 @@ dist

# python
venv
*.pyc
__pycache__/


# jetbrains ide folders
.idea
102 changes: 63 additions & 39 deletions README.md
Expand Up @@ -25,78 +25,102 @@ python -m pip install counselor
## Usage
```python
import logging
from datetime import timedelta
import uuid

from counselor.watcher import ReconfigurableService
from counselor.client import ConsulConfig
from counselor.config import KVConfigPath
from counselor.discovery import ServiceDiscovery

from datetime import timedelta
from counselor.endpoint.common import Response
from counselor.filter import KeyValuePair
from counselor.watcher import ReconfigurableService

logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)

# Define a service that get automatically reconfigured
# -> this is where you can encapsulate all the functionality to reconfigure/reload your service
class TestService(ReconfigurableService):
def __init__(self):
super().__init__()
self.last_config = None
def __init__(self, service_key: str, kv_config_path: KVConfigPath, current_config: dict = None):
super().__init__(service_key, kv_config_path, current_config)
self.failed_service_check = False
def notify_failed_service_check(self):
LOGGER.info("Failed service check")
self.updated = False

def notify_failed_service_check(self, response: Response):
LOGGER.info("Failed service check: {}".format(response.as_string()))
self.failed_service_check = True

def configure(self, config=dict) -> bool:
LOGGER.info("Configuring service")
self.current_config = config
self.failed_service_check = False
self.updated = True
return True

def reconfigure(self, new_config=dict) -> bool:
LOGGER.info("New configuration received: {}".format(new_config))
self.last_config = new_config
self.failed_service_check = False
self.configure(new_config)
return True

# Create a ServiceDiscovery instance to interact with consul
service_discovery = ServiceDiscovery.new_service_discovery_with_default_consul_client()
consul_config = ConsulConfig(host="127.0.0.1", port=8500)
service_discovery = ServiceDiscovery(consul_config)

# Register a service in Consul
register_status = service_discovery.register_service(service_key="my-test-service", tags=["test"], meta={
"version": "1.0",
"status": "active",
"base_time": "1573639530",
})
# Define the path in the KV store in Consul -> v1/kv/test/env/feature/service/config
kv_config_path = KVConfigPath("test", "feature", "service", "config", "env")

# Print the response
register_status.as_string()
# Your current service config, that you want to store in Consul
current_config = {
"foo": "bar",
"number": 3.1415,
"active": True,
"list": ["one", "two", "three"],
"map": {"a": 1, "b": 2, "c": 3}
}

# Create an instance of your service facade, that lets the watcher notify your service of changes
test_service = TestService()
test_service = TestService(uuid.uuid4().hex, kv_config_path, current_config)

# Start the watcher with an interval of 3 seconds -> you should see log messages that the watcher is active
watch_status = service_discovery.start_config_watch(test_service, timedelta(seconds=3))
# Check whether there is already a config stored in Consul
service_discovery.fetch_config(test_service)

# Print the repsonse
watch_status.as_string()
# If not store it -> you should see it in the UI: http://localhost:8500/ui/dc1/kv/test/env/feature/service/config/edit
config_store_response = service_discovery.store_config(test_service)
config_store_response.as_string()

# Change the configuration
service_discovery.service_definition.meta["addition"] = "True"
# Register a service in Consul -> you should see the registered service
register_response = service_discovery.register_service(service_key=test_service.service_key, tags=["test"], meta={
"version": "1.0",
"status": "active",
"base_time": "1573639530",
})
register_response.as_string()

# Update the service definition -> you should see a log message that your service received a new config
update_status = service_discovery.update_service_definition()
# Start the watcher with an interval of 3 seconds -> you should see log messages that the watcher is active
interval = timedelta(seconds=3)
watch_response = service_discovery.start_config_watch(test_service, interval)
watch_response.as_string()

# Print the response
update_status.as_string()
# Change the configuration, you can also do it via Consul ui -> you should see a log message that your service received a new config
test_service.current_config["reconfigure_action"] = "restart"
update_response = service_discovery.store_config(test_service)
update_response.as_string()

# Your service instance should contain the new config
test_service.last_config
# Your service instance should have set the updated flag by now
test_service.updated

# Stop the watcher
service_discovery.stop_config_watch()

# Deregister the service
deregister_status = service_discovery.deregister_service()

# Print the resopnse
deregister_status.as_string()
deregister_response = service_discovery.deregister_service()
deregister_response.as_string()

# You should not have any registered services left
response, services = service_discovery.consul.agent.services()
response.as_string()
services
search_response, found_services = service_discovery.search_for_services(tags=["test"],
meta=[KeyValuePair('status', 'active')])
search_response.as_string()
found_services
```

For other examples, please have a look at the test folder.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -4,7 +4,7 @@
long_description = fh.read()

setup(name='counselor',
version='0.1.3',
version='0.2.0',
description='Package to interact with HashiCorp Consul',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
Binary file removed src/counselor/__pycache__/client.cpython-36.pyc
Binary file not shown.
Binary file not shown.
Binary file removed src/counselor/__pycache__/filter.cpython-36.pyc
Binary file not shown.
Binary file removed src/counselor/__pycache__/signal.cpython-36.pyc
Binary file not shown.
Binary file removed src/counselor/__pycache__/trigger.cpython-36.pyc
Binary file not shown.
Binary file removed src/counselor/__pycache__/watcher.cpython-36.pyc
Binary file not shown.
4 changes: 2 additions & 2 deletions src/counselor/client.py
@@ -1,5 +1,5 @@
from .endpoint.agent import Agent
from .endpoint.http import Request
from .endpoint.http_client import HttpRequest
from .endpoint.keyvalue import KV


Expand All @@ -14,7 +14,7 @@ def __init__(self,
datacenter=None,
token=None,
scheme='http',
transport=Request()):
transport=HttpRequest()):
self.host = host
self.port = port
self.version = version
Expand Down
47 changes: 47 additions & 0 deletions src/counselor/config.py
@@ -0,0 +1,47 @@
from .endpoint.common import Response


class KVConfigPath:
def __init__(self, project: str, feature: str, service: str, detail: str = "config", env: str = "dev"):
self.project = project
self.feature = feature
self.service = service
self.detail = detail
self.env = env

def compose_path(self) -> str:
return "{}/{}/{}/{}/{}".format(self.project, self.env, self.feature, self.service, self.detail)


class ReconfigurableService:
"""Interface for a service to be notified by the watcher.
Extend this class to encapsulate all the logic to reconfigure your service.
"""

def __init__(self, service_key: str, config_path: KVConfigPath, current_config: dict):
if service_key == "":
raise ValueError("Please define a unique service key")
if config_path is None:
raise ValueError("Please define a key value path for the service config")

# TODO: url encode the values to be safe because it will be part of the request url

self.service_key = service_key
self.config_path = config_path
self.current_config = current_config

def notify_failed_service_check(self, response: Response):
"""If the service fails to fetch the ServiceDefinition from Consul, this method is called.
"""
pass

def configure(self, config=dict) -> bool:
"""This method will be called when the config is fetched from Consul outside of a watcher,
like for the initial configuration.
"""
pass

def reconfigure(self, new_config=dict) -> bool:
"""If the Watcher notices a change, this method is called to give the new configuration.
"""
pass
86 changes: 43 additions & 43 deletions src/counselor/discovery.py
@@ -1,12 +1,15 @@
import logging
from datetime import timedelta
from threading import Event
from typing import List

from .client import Consul, ConsulConfig
from .endpoint.encoding import StatusResponse
from .config import ReconfigurableService
from .endpoint.common import Response
from .endpoint.service import ServiceDefinition
from .filter import Filter, KeyValuePair, Operators
from .watcher import ServiceWatcher, ReconfigurableService
from .trigger import Trigger
from .watcher import KVConfigWatcherTask

LOGGER = logging.getLogger(__name__)

Expand All @@ -19,35 +22,32 @@ class ServiceDiscovery:
"""

def __init__(self, consul_config=ConsulConfig()):
self.consul = Consul(consul_config)
self.service_definition = None
self.config_watch = None
self._consul = Consul(consul_config)
self._service_definition = None
self._trigger = None
# TODO: set the stop event from arguments
self._stop_event = Event()

@staticmethod
def new_service_discovery_with_default_consul_client():
return ServiceDiscovery(ConsulConfig())

def load_service(self, key: str) -> StatusResponse:
"""Activats an existing ServiceDefinition from Consul.
def fetch_config(self, service: ReconfigurableService) -> bool:
"""Fetch the config from consul to initially configure your service
"""
response, found_config = self._consul.kv.get_raw(service.config_path.compose_path())
if not response.successful or found_config is None:
return False

status_response, service_definition = self.consul.agent.service.get_details(key)
if status_response.successful:
self.service_definition = service_definition
service.reconfigure(found_config)

return status_response

def get_active_service_key(self) -> str:
"""Return the key of the active ServiceDefinition.
def store_config(self, service: ReconfigurableService) -> Response:
"""Store the config in Consul
"""

if self.service_definition is None:
return ""
else:
return self.service_definition.key
return self._consul.kv.set(service.config_path.compose_path(), service.current_config)

def search_for_services(self, tags: List[str] = None, meta: List[KeyValuePair] = None) -> (
StatusResponse, List[ServiceDefinition]):
Response, List[ServiceDefinition]):
"""Search for active ServiceDefinitions.
"""

Expand All @@ -69,60 +69,60 @@ def search_for_services(self, tags: List[str] = None, meta: List[KeyValuePair] =
query_tuple = ('filter', filter_expression)
filter_tuples.append(query_tuple)

return self.consul.agent.services(filter_tuples)
return self._consul.agent.services(filter_tuples)

def register_service(self, service_key: str, tags=None, meta=None) -> StatusResponse:
def register_service(self, service_key: str, tags=None, meta=None) -> Response:
"""Register a ServiceDefinition in Consul.
"""

self.service_definition = ServiceDefinition.new_simple_service_definition(service_key, tags, meta)
self._service_definition = ServiceDefinition.new_simple_service_definition(service_key, tags, meta)
return self.update_service_definition()

def update_service_definition(self) -> StatusResponse:
"""Update the ServiceDefinition in Consul. For example if you want to update the configuration.
def update_service_definition(self) -> Response:
"""Update the ServiceDefinition in Consul.
"""

LOGGER.info("Updating definition {}".format(self.service_definition.key))
return self.consul.agent.service.register(self.service_definition)
LOGGER.info("Updating definition {}".format(self._service_definition.key))
return self._consul.agent.service.register(self._service_definition)

def deregister_service(self) -> StatusResponse:
def deregister_service(self) -> Response:
"""Delete the ServiceDefinition in Consul. Also stops the watcher if still active.
"""

LOGGER.info("Deregistering service {}".format(self.service_definition.key))
LOGGER.info("Deregistering service {}".format(self._service_definition.key))

try:
if self.config_watch is not None and self.config_watch.running:
if self._trigger is not None and self._trigger.running:
LOGGER.info("Stopping config watch first")
self.stop_config_watch()

return self.consul.agent.service.deregister(self.service_definition.key)
return self._consul.agent.service.deregister(self._service_definition.key)
except Exception as exc:
return StatusResponse.new_error_result_with_message_only("{}".format(exc))
return Response.create_error_result_with_message_only("{}".format(exc))

def start_config_watch(self, service: ReconfigurableService, check_interval: timedelta) -> StatusResponse:
def start_config_watch(self, service: ReconfigurableService, check_interval: timedelta) -> Response:
"""Create a watcher that periodically checks for config changes.
"""

LOGGER.info("Starting config watch for {}".format(self.service_definition.key))
if self.service_definition is None:
return StatusResponse.new_error_result_with_message_only("No service registered")
LOGGER.info("Starting config watch for {}".format(service.service_key))

try:
self.config_watch = ServiceWatcher.new_service_watcher_from_task_details(service, self.service_definition,
self.consul, check_interval)
self.config_watch.start_nonblocking()
watcher_task = KVConfigWatcherTask(service, self._consul, check_interval, self._stop_event)
self._trigger = Trigger()
self._trigger.add_task(watcher_task)
self._trigger.run_nonblocking()
except Exception as exc:
return StatusResponse.new_error_result_with_message_only("{}".format(exc))
return Response.create_error_result_with_message_only("{}".format(exc))

return StatusResponse.new_successful_result()
return Response.create_successful_result()

def stop_config_watch(self):
"""Stop the watcher.
"""

LOGGER.info("Stopping config watch for {}".format(self.service_definition.key))
LOGGER.info("Stopping config watch for {}".format(self._service_definition.key))
try:
self.config_watch.stop()
self._stop_event.set()
self._trigger.stop_tasks()
except Exception as exc:
LOGGER.info("Error when stopping watcher: {}".format(exc))
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit 6da7104

Please sign in to comment.