Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument redis.asyncio clients #1076

Merged
merged 4 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `opentelemetry-instrument` and `opentelemetry-bootstrap` now include a `--version` flag
([#1065](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1065))
- `opentelemetry-instrumentation-redis` now instruments asynchronous Redis clients, if the installed redis-py includes async support (>=4.2.0).
([#1076](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1076))

## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@
client = redis.StrictRedis(host="localhost", port=6379)
client.get("my-key")

Async Redis clients (i.e. redis.asyncio.Redis) are also instrumented in the same way:

.. code:: python

from opentelemetry.instrumentation.redis import RedisInstrumentor
import redis.asyncio


# Instrument redis
RedisInstrumentor().instrument()

# This will report a span with the default settings
async def redis_get():
client = redis.asyncio.Redis(host="localhost", port=6379)
await client.get("my-key")

The `instrument` method accepts the following keyword args:

tracer_provider (TracerProvider) - an optional tracer provider
Expand Down Expand Up @@ -102,6 +118,10 @@ def response_hook(span, instance, response):
typing.Callable[[Span, redis.connection.Connection, Any], None]
]

sd2k marked this conversation as resolved.
Show resolved Hide resolved
_REDIS_ASYNCIO_VERSION = (4, 2, 0)
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
import redis.asyncio


def _set_connection_attributes(span, conn):
if not span.is_recording():
Expand Down Expand Up @@ -176,6 +196,22 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
wrap_function_wrapper(
"redis.asyncio",
f"{redis_class}.execute_command",
_traced_execute_command,
)
wrap_function_wrapper(
"redis.asyncio.client",
f"{pipeline_class}.execute",
_traced_execute_pipeline,
)
wrap_function_wrapper(
"redis.asyncio.client",
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)


class RedisInstrumentor(BaseInstrumentor):
Expand Down Expand Up @@ -222,3 +258,8 @@ def _uninstrument(self, **kwargs):
unwrap(redis.Redis, "pipeline")
unwrap(redis.client.Pipeline, "execute")
unwrap(redis.client.Pipeline, "immediate_execute_command")
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
unwrap(redis.asyncio.Redis, "execute_command")
unwrap(redis.asyncio.Redis, "pipeline")
unwrap(redis.asyncio.client.Pipeline, "execute")
unwrap(redis.asyncio.client.Pipeline, "immediate_execute_command")
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio

import redis
import redis.asyncio

from opentelemetry import trace
from opentelemetry.instrumentation.redis import RedisInstrumentor
Expand Down Expand Up @@ -121,6 +124,120 @@ def test_parent(self):
self.assertEqual(child_span.name, "GET")


def async_call(coro):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)


class TestAsyncRedisInstrument(TestBase):
def setUp(self):
super().setUp()
self.redis_client = redis.asyncio.Redis(port=6379)
async_call(self.redis_client.flushall())
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)

def tearDown(self):
super().tearDown()
RedisInstrumentor().uninstrument()

def _check_span(self, span, name):
self.assertEqual(span.name, name)
self.assertIs(span.status.status_code, trace.StatusCode.UNSET)
self.assertEqual(span.attributes.get(SpanAttributes.DB_NAME), 0)
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_NAME], "localhost"
)
self.assertEqual(span.attributes[SpanAttributes.NET_PEER_PORT], 6379)

def test_long_command(self):
async_call(self.redis_client.mget(*range(1000)))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "MGET")
self.assertTrue(
span.attributes.get(SpanAttributes.DB_STATEMENT).startswith(
"MGET 0 1 2 3"
)
)
self.assertTrue(
span.attributes.get(SpanAttributes.DB_STATEMENT).endswith("...")
)

def test_basics(self):
self.assertIsNone(async_call(self.redis_client.get("cheese")))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "GET")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese"
)
self.assertEqual(span.attributes.get("db.redis.args_length"), 2)

def test_pipeline_traced(self):
async def pipeline_simple():
async with self.redis_client.pipeline(
transaction=False
) as pipeline:
pipeline.set("blah", 32)
pipeline.rpush("foo", "éé")
pipeline.hgetall("xxx")
await pipeline.execute()

async_call(pipeline_simple())

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "SET RPUSH HGETALL")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT),
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
)
self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3)

def test_pipeline_immediate(self):
async def pipeline_immediate():
async with self.redis_client.pipeline() as pipeline:
pipeline.set("a", 1)
await pipeline.immediate_execute_command("SET", "b", 2)
await pipeline.execute()

async_call(pipeline_immediate())

spans = self.memory_exporter.get_finished_spans()
# expecting two separate spans here, rather than a
# single span for the whole pipeline
self.assertEqual(len(spans), 2)
span = spans[0]
self._check_span(span, "SET")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT), "SET b 2"
)

def test_parent(self):
"""Ensure OpenTelemetry works with redis."""
ot_tracer = trace.get_tracer("redis_svc")

with ot_tracer.start_as_current_span("redis_get"):
self.assertIsNone(async_call(self.redis_client.get("cheese")))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans[0], spans[1]

# confirm the parenting
self.assertIsNone(parent_span.parent)
self.assertIs(child_span.parent, parent_span.get_span_context())

self.assertEqual(parent_span.name, "redis_get")
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")

self.assertEqual(child_span.name, "GET")


class TestRedisDBIndexInstrument(TestBase):
def setUp(self):
super().setUp()
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ deps =
psycopg2 ~= 2.8.4
aiopg >= 0.13.0, < 1.3.0
sqlalchemy ~= 1.4
redis ~= 3.5
redis ~= 4.2
celery[pytest] >= 4.0, < 6.0
protobuf>=3.13.0
requests==2.25.0
Expand Down