Add rediscluster support (#573)
FranGM committed May 19, 2021
1 parent 4381dec commit fa3487b
Showing 10 changed files with 869 additions and 1 deletion.
450 changes: 450 additions & 0 deletions baseplate/clients/redis_cluster.py

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ services:
- "memcached"
- "redis"
- "zookeeper"

- "redis-cluster-node"
cassandra:
image: "cassandra:3.11"
environment:
@@ -25,3 +25,5 @@ services:
image: "redis:4.0.9"
zookeeper:
image: "zookeeper:3.4.10"
redis-cluster-node:
image: docker.io/grokzen/redis-cluster:6.2.0
1 change: 1 addition & 0 deletions docs/api/baseplate/clients/index.rst
@@ -13,6 +13,7 @@ Instrumented Client Libraries
baseplate.clients.kombu: Client for publishing to queues <kombu>
baseplate.clients.memcache: Memcached Client <memcache>
baseplate.clients.redis: Redis Client <redis>
baseplate.clients.redis_cluster: Redis Cluster Client <redis_cluster>
baseplate.clients.requests: Requests (HTTP) Client <requests>
baseplate.clients.sqlalchemy: SQL Client for relational databases (e.g. PostgreSQL) <sqlalchemy>
baseplate.clients.thrift: Thrift client for RPC to other backend services <thrift>
145 changes: 145 additions & 0 deletions docs/api/baseplate/clients/redis_cluster.rst
@@ -0,0 +1,145 @@
``baseplate.clients.redis_cluster``
===================================

`Redis`_ is an in-memory data structure store used where speed is necessary but
complexity is beyond simple key-value operations. (If you're just doing
caching, prefer :doc:`memcached <memcache>`). `Redis-py-cluster`_ is a Python
client library that supports interacting with Redis when operating in cluster mode.

.. _`Redis`: https://redis.io/
.. _`redis-py-cluster`: https://github.com/Grokzen/redis-py-cluster

.. automodule:: baseplate.clients.redis_cluster

.. versionadded:: 2.1

Example
-------

To integrate redis-py-cluster with your application, add the appropriate client
declaration to your context configuration::

baseplate.configure_context(
app_config,
{
...
"foo": ClusterRedisClient(),
...
}
)

and then configure it in your application's configuration file:

.. code-block:: ini

   [app:main]
   ...
   # required: what redis instance to connect to
   foo.url = redis://localhost:6379/0
   # optional: the maximum size of the connection pool
   foo.max_connections = 99
   # optional: how long to wait for a connection to establish
   foo.timeout = 3 seconds
   # optional: whether read requests may be directed to replicas
   # instead of just the primary
   foo.read_from_replicas = true
   ...

and then use the attached :py:class:`~redis.Redis`-like object in
request::

def my_method(request):
request.foo.ping()

Configuration
-------------

.. autoclass:: ClusterRedisClient

.. autofunction:: cluster_pool_from_config

Classes
-------

.. autoclass:: ClusterRedisContextFactory
:members:

.. autoclass:: MonitoredClusterRedisConnection
:members:

Runtime Metrics
---------------

In addition to request-level metrics reported through spans, this wrapper
reports connection pool statistics periodically via the :ref:`runtime-metrics`
system. All metrics are tagged with ``client``, the name given to
:py:meth:`~baseplate.Baseplate.configure_context` when registering this context
factory.

The following metrics are reported:

``runtime.pool.size``
The size limit for the connection pool.
``runtime.pool.in_use``
How many connections have been established and are currently checked out and
being used.
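
As an illustration of what these two gauges measure, here is a minimal sketch.
Note that ``StubPool`` and ``pool_gauges`` are made up for this example (loosely
mirroring redis-py's ``ConnectionPool`` attributes) and are not baseplate's
actual reporter:

```python
# A minimal sketch of what the two gauges measure; StubPool and pool_gauges
# are illustrative only, loosely mirroring redis-py's ConnectionPool, which
# exposes max_connections and tracks in-use connections internally.

class StubPool:
    """Stand-in for a connection pool with a size limit."""

    def __init__(self, max_connections: int) -> None:
        self.max_connections = max_connections
        self._in_use_connections: set = set()

    def get_connection(self) -> object:
        conn = object()
        self._in_use_connections.add(conn)
        return conn


def pool_gauges(pool: StubPool) -> dict:
    """Values that would be reported for this pool."""
    return {
        "runtime.pool.size": pool.max_connections,
        "runtime.pool.in_use": len(pool._in_use_connections),
    }


pool = StubPool(max_connections=4)
pool.get_connection()
pool.get_connection()
print(pool_gauges(pool))  # {'runtime.pool.size': 4, 'runtime.pool.in_use': 2}
```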


Hot Key Tracking
----------------

Optionally, the client can help track key usage across the Redis cluster to
help you identify if you have "hot" keys (keys that are read from or
written to much more frequently than other keys). This is particularly useful
in clusters with ``noeviction`` set as the eviction policy, since Redis
lacks a built-in mechanism to help you track hot keys in this case.

Since tracking every single key used is expensive, the tracker works by
sampling a small percentage of reads and/or writes, which can be configured
on your client:

.. code-block:: ini

   [app:main]
   ...
   # Note that by default the sample rate is zero for both reads and writes
   # optional: sample keys for 1% of read operations
   foo.track_key_reads_sample_rate = 0.01
   # optional: sample keys for 10% of write operations
   foo.track_key_writes_sample_rate = 0.1
   ...

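
The sampling decision itself is just a probabilistic gate evaluated once per
operation. A minimal sketch of the idea, assuming nothing beyond the standard
library (``should_track`` is a made-up name, not baseplate's API):

```python
import random

def should_track(sample_rate: float) -> bool:
    """Return True for roughly `sample_rate` of calls; 0.0 disables sampling."""
    return random.random() < sample_rate

# A rate of 0.0 never samples and 1.0 always samples; anything in between
# samples that fraction of operations on average.
```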
The keys tracked will be written to a sorted set in the Redis cluster itself,
which we can query at any time to see what keys are read from or written to
more often than others. Keys used for writes will be stored in
``baseplate-hot-key-tracker-writes`` and keys used for reads will be stored in
``baseplate-hot-key-tracker-reads``. Here's an example of how you can query the
top 10 keys in each category with their associated scores:

.. code-block:: console

   > ZREVRANGEBYSCORE baseplate-hot-key-tracker-reads +inf 0 WITHSCORES LIMIT 0 10
   > ZREVRANGEBYSCORE baseplate-hot-key-tracker-writes +inf 0 WITHSCORES LIMIT 0 10

Note that, because of how the sampling works, the scores are only meaningful
in a relative sense (comparing one key's access frequency to others in the
list); they cannot be used to infer absolute access rates.

Both tracker sets have a default TTL of 24 hours, so once they stop being
written to (for instance, if key tracking is disabled) they will clean up
after themselves in 24 hours, allowing us to start fresh the next time we
want to enable key tracking.
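
The console queries above can also be issued from Python. Here is a sketch
using redis-py's ``zrevrangebyscore`` method; the ``top_hot_keys`` helper and
the in-memory stub client are illustrative only, not part of baseplate:

```python
def top_hot_keys(client, tracker_set: str, count: int = 10):
    """Return up to `count` (key, score) pairs, most frequently sampled first.

    `client` can be any redis-py-style client exposing zrevrangebyscore().
    """
    return client.zrevrangebyscore(
        tracker_set, "+inf", 0, withscores=True, start=0, num=count
    )


class StubClient:
    """In-memory stand-in for the Redis client, for demonstration only."""

    def __init__(self, scores: dict) -> None:
        self._scores = scores

    def zrevrangebyscore(self, name, max, min, withscores=False, start=None, num=None):
        items = sorted(self._scores.items(), key=lambda kv: kv[1], reverse=True)
        if num is not None:
            items = items[start or 0 : (start or 0) + num]
        return items if withscores else [key for key, _ in items]


stub = StubClient({"post:1": 40.0, "post:2": 12.0, "user:9": 3.0})
print(top_hot_keys(stub, "baseplate-hot-key-tracker-reads", count=2))
# [('post:1', 40.0), ('post:2', 12.0)]
```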
1 change: 1 addition & 0 deletions requirements-transitive.txt
@@ -61,6 +61,7 @@ pyramid==1.10.5
python-json-logger==2.0.1
reddit-cqlmapper==0.3.0
redis==3.5.3
redis-py-cluster==2.1.2
regex==2020.11.13
requests==2.25.1
sentry-sdk==0.20.1
1 change: 1 addition & 0 deletions requirements.txt
@@ -20,3 +20,4 @@ sphinx-autodoc-typehints==1.11.1
sphinxcontrib-spelling==7.1.0
webtest==2.0.35
wheel==0.36.2
fakeredis==1.5.0
3 changes: 3 additions & 0 deletions setup.cfg
@@ -102,6 +102,9 @@ ignore_missing_imports = True
[mypy-pythonjsonlogger.*]
ignore_missing_imports = True

[mypy-rediscluster.*]
ignore_missing_imports = True

[mypy-sqlalchemy.*]
ignore_missing_imports = True

1 change: 1 addition & 0 deletions setup.py
@@ -12,6 +12,7 @@
"memcache": ["pymemcache>=1.3.0,<1.4.4"],
"pyramid": ["pyramid>=1.9.0,<2.0"],
"redis": ["redis>=2.10.0,<4.0.0"],
"redis-py-cluster": ["redis-py-cluster>=2.1.2,<3.0.0"],
"refcycle": ["objgraph>=3.0,<4.0"],
"requests": ["advocate>=1.0.0,<2.0"],
"sentry": ["sentry-sdk>=0.19,<1.0"],
158 changes: 158 additions & 0 deletions tests/integration/redis_cluster_tests.py
@@ -0,0 +1,158 @@
import unittest

try:
import rediscluster
except ImportError:
raise unittest.SkipTest("redis-py-cluster is not installed")

from baseplate.lib.config import ConfigurationError
from baseplate.clients.redis_cluster import cluster_pool_from_config

from baseplate.clients.redis_cluster import ClusterRedisClient
from baseplate import Baseplate
from . import TestBaseplateObserver, get_endpoint_or_skip_container

redis_endpoint = get_endpoint_or_skip_container("redis-cluster-node", 7000)


# This belongs in the unit tests section, but the client class attempts to
# initialise its list of nodes when instantiated, so it's simpler to test it
# here with a real Redis cluster available.
class ClusterPoolFromConfigTests(unittest.TestCase):
def test_empty_config(self):
with self.assertRaises(ConfigurationError):
cluster_pool_from_config({})

def test_basic_url(self):
pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

self.assertEqual(pool.nodes.startup_nodes[0]["host"], "redis-cluster-node")
self.assertEqual(pool.nodes.startup_nodes[0]["port"], "7000")

def test_timeouts(self):
pool = cluster_pool_from_config(
{
"rediscluster.url": f"redis://{redis_endpoint}/0",
"rediscluster.timeout": "30 seconds",
}
)

self.assertEqual(pool.timeout, 30)

def test_max_connections(self):
pool = cluster_pool_from_config(
{
"rediscluster.url": f"redis://{redis_endpoint}/0",
"rediscluster.max_connections": "300",
}
)

self.assertEqual(pool.max_connections, 300)

def test_max_connections_default(self):
# https://github.com/Grokzen/redis-py-cluster/issues/435
pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

self.assertEqual(pool.max_connections, 50)

def test_kwargs_passthrough(self):
pool = cluster_pool_from_config(
{"rediscluster.url": f"redis://{redis_endpoint}/0"}, example="present"
)

self.assertEqual(pool.connection_kwargs["example"], "present")

def test_alternate_prefix(self):
pool = cluster_pool_from_config(
{"noodle.url": f"redis://{redis_endpoint}/0"}, prefix="noodle."
)
self.assertEqual(pool.nodes.startup_nodes[0]["host"], "redis-cluster-node")
self.assertEqual(pool.nodes.startup_nodes[0]["port"], "7000")

def test_only_primary_available(self):
pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})
node_list = [pool.get_node_by_slot(slot=1, read_command=False) for _ in range(0, 100)]

# The primary is on port 7000 so that's the only port we expect to see
self.assertTrue(all(node["port"] == 7000 for node in node_list))

def test_read_from_replicas(self):
pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

node_list = [pool.get_node_by_slot(slot=1, read_command=True) for _ in range(0, 100)]

# Both replicas and primary are available, so we expect to see some non-primaries here
self.assertTrue(any(node["port"] != 7000 for node in node_list))


class RedisClusterIntegrationTests(unittest.TestCase):
def setUp(self):
self.baseplate_observer = TestBaseplateObserver()

baseplate = Baseplate(
{
"rediscluster.url": f"redis://{redis_endpoint}/0",
"rediscluster.timeout": "1 second",
"rediscluster.max_connections": "4",
}
)
baseplate.register(self.baseplate_observer)
baseplate.configure_context({"rediscluster": ClusterRedisClient()})

self.context = baseplate.make_context_object()
self.server_span = baseplate.make_server_span(self.context, "test")

def test_simple_command(self):
with self.server_span:
result = self.context.rediscluster.ping()

self.assertTrue(result)

server_span_observer = self.baseplate_observer.get_only_child()
span_observer = server_span_observer.get_only_child()
self.assertEqual(span_observer.span.name, "rediscluster.PING")
self.assertTrue(span_observer.on_start_called)
self.assertTrue(span_observer.on_finish_called)
self.assertIsNone(span_observer.on_finish_exc_info)

def test_error(self):
with self.server_span:
with self.assertRaises(rediscluster.RedisClusterException):
self.context.rediscluster.execute_command("crazycommand")

server_span_observer = self.baseplate_observer.get_only_child()
span_observer = server_span_observer.get_only_child()
self.assertTrue(span_observer.on_start_called)
self.assertTrue(span_observer.on_finish_called)
self.assertIsNotNone(span_observer.on_finish_exc_info)

def test_lock(self):
with self.server_span:
with self.context.rediscluster.lock("foo-lock"):
pass

server_span_observer = self.baseplate_observer.get_only_child()

self.assertGreater(len(server_span_observer.children), 0)
for span_observer in server_span_observer.children:
self.assertTrue(span_observer.on_start_called)
self.assertTrue(span_observer.on_finish_called)

def test_pipeline(self):
with self.server_span:
with self.context.rediscluster.pipeline("foo") as pipeline:
pipeline.set("foo", "bar")
pipeline.get("foo")
pipeline.get("foo")
pipeline.get("foo")
pipeline.get("foo")
pipeline.get("foo")
pipeline.delete("foo")
pipeline.execute()

server_span_observer = self.baseplate_observer.get_only_child()
span_observer = server_span_observer.get_only_child()
self.assertEqual(span_observer.span.name, "rediscluster.pipeline_foo")
self.assertTrue(span_observer.on_start_called)
self.assertTrue(span_observer.on_finish_called)
self.assertIsNone(span_observer.on_finish_exc_info)