Skip to content
This repository has been archived by the owner on May 6, 2023. It is now read-only.

Commit

Permalink
extended ServiceDiscovery to provide more flexibility
Browse files Browse the repository at this point in the history
  • Loading branch information
joergeschmann committed Nov 25, 2019
1 parent 6da7104 commit f1d9c89
Show file tree
Hide file tree
Showing 20 changed files with 646 additions and 281 deletions.
245 changes: 178 additions & 67 deletions README.md
Expand Up @@ -3,7 +3,11 @@ This lib provides functionality to interact with Consul from HashiCorp.

It is still in work and you should not use it in production.

The main use case for this lib is, you have a service that you want to register in Consul and automatically reconfigure when the configuration for that service changed. Instead of having a local Consul agent running that executes a shell script or calls an http endpoint, Counselor uses an interface your service can implement, to notify it of changes to the service. The configuration of the service is stored in the meta field in Consul. To check for updates, a trigger periodically fetches the service definition and check it for changes.
The main use case for this lib is, you have a service that you want to register in Consul and automatically
reconfigure when the configuration for that service changed. Instead of having a local Consul agent running that
executes a shell script or calls an http endpoint, Counselor uses an interface your service can implement,
to notify it of changes to the service. The configuration of the service is stored in the KV store of Consul.
To check for updates, a trigger periodically fetches the service definition and check it for changes.

## Setup
You can use the Makefile to install the lib locally
Expand All @@ -23,105 +27,212 @@ python -m pip install counselor
```

## Usage
Here are some examples executed in the python console to show you how to use the library.

### KV Store
```python
import logging

from counselor.config import KVConfigPath
from counselor.discovery import ServiceDiscovery
from counselor.endpoint.http_endpoint import EndpointConfig

logging.basicConfig(level=logging.DEBUG)

# Create a ServiceDiscovery instance to interact with consul
consul_config = EndpointConfig(host="127.0.0.1", port=8500, version="v1")
service_discovery = ServiceDiscovery.new_service_discovery_with_consul_config(consul_config)

# Create a key value config path
kv_config_path = KVConfigPath("test-project", "test-domain", "test-service", "test-config", "test-env")
config_path = kv_config_path.compose_path()

# Check whether there is already a config stored in that config path.
# You get two objects back, one for the response, that lets you know whether the request was successul of not.
# The other is the config itself. If the response is successful, the config instance is filled.
response, found_config = service_discovery.fetch_config_by_path(kv_config_path.compose_path())
response.as_string()

# Create a config for your service
test_service_config = {
"foo": "bar",
"number": 3.1415,
"active": True,
"list": ["one", "two", "three"],
"map": {"a": 1, "b": 2, "c": 3}
}

# Store the config in the Consul KV store
response = service_discovery.store_config(config_path, test_service_config)
response.as_string()

# Now you should find the config
response, found_config = service_discovery.fetch_config_by_path(config_path)
response.as_string()
found_config

# To update the config, change the config and send it to Consul. Keep in mind that the
# config will be overwritten. That means any field that is not in the config anymore will be deleted in the KV store.
test_service_config["active"] = False
response = service_discovery.update_config(config_path, test_service_config)
response.as_string()
```

### Service registry
```python
from counselor.discovery import ServiceDiscovery
from counselor.endpoint.http_endpoint import EndpointConfig
from counselor.filter import KeyValuePair


# Create a ServiceDiscovery instance to interact with consul
consul_config = EndpointConfig(host="127.0.0.1", port=8500, version="v1")
service_discovery = ServiceDiscovery.new_service_discovery_with_consul_config(consul_config)

# To register a service you need at least a unique key. This key is used to identify your service. Consul has only
# this level of identification. So if you track multiple instance of the same service, you might add a number to
# differentiate between the instances.
service_key = "test-service"

# You can group your service with tags. For example, you could tag all your db services with the tag "db".
# A dash in the tag name can cause errors. You should use an underscore _ instead.
service_tags = ["test"]

# The meta field allows you to define arbitrary characteristics of your service. In this example we have the version,
# the status and the base_time stored. The only limitation is that all keys and values have to be strings.
service_meta = {
"version": "1.0",
"status": "active",
"base_time": "1573639530",
}

# Register the service
response = service_discovery.register_service(service_key=service_key, tags=service_tags, meta=service_meta)
response.as_string()

# Fetch the service definition.
response, found_service = service_discovery.get_service_details(service_key)
response.as_string()
found_service.as_json()

# To update the service modify the tag or meta field and send it to Consul.
service_tags = service_tags + ["additional_tag"]
service_meta = {
"status": "inactive"
}
response = service_discovery.update_service(service_key=service_key, tags=service_tags, meta=service_meta)

# You are able to use the tags and meta map to search and filter the services.
response, found_services = service_discovery.search_for_services(tags=["additional_tag"], meta=[KeyValuePair('status', 'inactive')])
response.as_string()
found_services[0].as_json()

# At the end you can deregister your service by key
response = service_discovery.deregister_service(service_key)
response.as_string()
```

### Watch for config changes
```python
import logging
from datetime import timedelta
import uuid
from threading import Event

from counselor.client import ConsulConfig
from counselor.config import KVConfigPath
from counselor.discovery import ServiceDiscovery
from counselor.endpoint.common import Response
from counselor.filter import KeyValuePair
from counselor.endpoint.http_endpoint import EndpointConfig
from counselor.watcher import ReconfigurableService

logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)

# Define a service that get automatically reconfigured
# -> this is where you can encapsulate all the functionality to reconfigure/reload your service
# Create a ServiceDiscovery instance to interact with consul
consul_config = EndpointConfig(host="127.0.0.1", port=8500, version="v1")
service_discovery = ServiceDiscovery.new_service_discovery_with_consul_config(consul_config)

# To check for config updates in Consul, there is a Trigger that periodically fetches the config from Consul.
# It then compares the received config with the last know version. If there is a difference, it will notify you.
# We have an interface for that, called ReconfigurableService. You have to extend that class to provide the
# necessary functionality. In the followed example, the TestService simply logs the events.
#
# notify_failed_service_check() is called when Consul is not reachable or does not return the config.
# configure() is called the first time it fetches the config
# reconfigure() is called whenever the modification_index is increased and an update available
class TestService(ReconfigurableService):
def __init__(self, service_key: str, kv_config_path: KVConfigPath, current_config: dict = None):
super().__init__(service_key, kv_config_path, current_config)
def __init__(self, service_key: str, config_path: KVConfigPath, current_config: dict = None):
self.service_key = service_key
self.config_path = config_path
self.last_config = current_config
self.failed_service_check = False
self.updated = False

def get_service_key(self) -> str:
return self.service_key

def get_current_config(self) -> dict:
return self.last_config

def get_config_path(self) -> str:
return self.config_path.compose_path()

def notify_failed_service_check(self, response: Response):
LOGGER.info("Failed service check: {}".format(response.as_string()))
self.failed_service_check = True

def configure(self, config=dict) -> bool:
LOGGER.info("Configuring service")
self.current_config = config
LOGGER.info("Configured: {}".format(config))
self.last_config = config
self.failed_service_check = False
self.updated = True
return True

def reconfigure(self, new_config=dict) -> bool:
LOGGER.info("New configuration received: {}".format(new_config))
self.configure(new_config)
self.updated = True
return True

# Create a ServiceDiscovery instance to interact with consul
consul_config = ConsulConfig(host="127.0.0.1", port=8500)
service_discovery = ServiceDiscovery(consul_config)

# Define the path in the KV store in Consul -> v1/kv/test/env/feature/service/config
kv_config_path = KVConfigPath("test", "feature", "service", "config", "env")

# Your current service config, that you want to store in Consul
current_config = {
"foo": "bar",
"number": 3.1415,
"active": True,
"list": ["one", "two", "three"],
"map": {"a": 1, "b": 2, "c": 3}
}

# Create an instance of your service facade, that lets the watcher notify your service of changes
test_service = TestService(uuid.uuid4().hex, kv_config_path, current_config)

# Check whether there is already a config stored in Consul
service_discovery.fetch_config(test_service)

# If not store it -> you should see it in the UI: http://localhost:8500/ui/dc1/kv/test/env/feature/service/config/edit
config_store_response = service_discovery.store_config(test_service)
config_store_response.as_string()

# Register a service in Consul -> you should see the registered service
register_response = service_discovery.register_service(service_key=test_service.service_key, tags=["test"], meta={
"version": "1.0",
"status": "active",
"base_time": "1573639530",
})
register_response.as_string()

# Start the watcher with an interval of 3 seconds -> you should see log messages that the watcher is active
interval = timedelta(seconds=3)
watch_response = service_discovery.start_config_watch(test_service, interval)
watch_response.as_string()

# Change the configuration, you can also do it via Consul ui -> you should see a log message that your service received a new config
test_service.current_config["reconfigure_action"] = "restart"
update_response = service_discovery.store_config(test_service)
update_response.as_string()

# Your service instance should have set the updated flag by now
test_service = TestService(service_key="test-service",
config_path=KVConfigPath("test", "feature", "service", "config", "env"),
current_config={
"foo": "bar",
"number": 3.1415,
"active": True,
"list": ["one", "two", "three"],
"map": {"a": 1, "b": 2, "c": 3}
})

# The service definition and the config in the KV store are seperate. You can store a config and watch for updates,
# without having the service registered. The method register_service_and_store_config will do both in one call.
service_tags = ["test"]
service_meta = None
response = service_discovery.register_service_and_store_config(reconfigurable_service=test_service, tags=service_tags,
meta=service_meta)
response.as_string()

# You can add one or multiple config watches and start the trigger.
# With the stop you have the ability to stop the watcher by setting the event. This is helpful if you have other
# resources and you want to have a graceful shut down.
stop_event = Event()
service_discovery.add_config_watch(service=test_service, check_interval=timedelta(seconds=3), stop_event=stop_event)
response = service_discovery.start_config_watch()

# Once the watcher is started, you should see log messages that Consul is checked for updates.
# You can now either change the service either via Consul UI or with the service_discovery instance.
test_service.last_config["reconfigure_action"] = "reload"
response = service_discovery.update_config_by_service(test_service)

# You should then see that a new config is recieved and the update flag is set.
test_service.updated

# Stop the watcher
# To stop the watcher you can either set the event,
stop_event.set()
# stop the trigger directly,
service_discovery.stop_config_watch()

# Deregister the service
deregister_response = service_discovery.deregister_service()
deregister_response.as_string()

# You should not have any registered services left
search_response, found_services = service_discovery.search_for_services(tags=["test"],
meta=[KeyValuePair('status', 'active')])
search_response.as_string()
found_services
```
# or clear the watchers
service_discovery.clear_watchers()
```

For other examples, please have a look at the test folder.

2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -4,7 +4,7 @@
long_description = fh.read()

setup(name='counselor',
version='0.2.0',
version='0.2.1',
description='Package to interact with HashiCorp Consul',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
46 changes: 8 additions & 38 deletions src/counselor/client.py
@@ -1,55 +1,25 @@
from .endpoint.agent import Agent
from .endpoint.http_client import HttpRequest
from .endpoint.keyvalue import KV


class ConsulConfig:
"""Config to connect to Consul.
"""

def __init__(self,
host="127.0.0.1",
port=8500,
version='v1',
datacenter=None,
token=None,
scheme='http',
transport=HttpRequest()):
self.host = host
self.port = port
self.version = version
self.datacenter = datacenter
self.token = token
self.scheme = scheme
self.transport = transport

def compose_base_uri(self) -> str:
"""Return the base URI for API requests.
"""

if self.port:
return '{0}://{1}:{2}/{3}'.format(self.scheme, self.host, self.port, self.version)
return '{0}://{1}/{2}'.format(self.scheme, self.host, self.version)
from .endpoint.agent_endpoint import AgentEndpoint
from .endpoint.http_endpoint import EndpointConfig
from .endpoint.kv_endpoint import KVEndpoint


class Consul(object):
"""Client to use the API.
"""

def __init__(self, config=ConsulConfig()):
def __init__(self, config=EndpointConfig()):
self.config = config
base_uri = config.compose_base_uri()
self._agent = Agent(base_uri, config.transport, config.datacenter, config.token)
self._kv = KV(base_uri, config.transport, config.datacenter, config.token)
self._agent = AgentEndpoint(endpoint_config=config, url_parts=["agent"])
self._kv = KVEndpoint(endpoint_config=config, url_parts=["kv"])

@property
def agent(self) -> Agent:
def agent(self) -> AgentEndpoint:
"""Get the agent service instance.
"""
return self._agent

@property
def kv(self) -> KV:
def kv(self) -> KVEndpoint:
"""Get the key value service instance.
"""
return self._kv
20 changes: 11 additions & 9 deletions src/counselor/config.py
Expand Up @@ -18,17 +18,19 @@ class ReconfigurableService:
Extend this class to encapsulate all the logic to reconfigure your service.
"""

def __init__(self, service_key: str, config_path: KVConfigPath, current_config: dict):
if service_key == "":
raise ValueError("Please define a unique service key")
if config_path is None:
raise ValueError("Please define a key value path for the service config")
def get_service_key(self) -> str:
"""Return a unique service key to identify your service
"""
pass

# TODO: url encode the values to be safe because it will be part of the request url
def get_config_path(self) -> str:
"""Return the config path where the config is stored, for example via KVConfigPath.compose_path()
"""
pass

self.service_key = service_key
self.config_path = config_path
self.current_config = current_config
def get_current_config(self) -> dict:
"""Return the current config as a dictionary"""
pass

def notify_failed_service_check(self, response: Response):
"""If the service fails to fetch the ServiceDefinition from Consul, this method is called.
Expand Down

0 comments on commit f1d9c89

Please sign in to comment.