Skip to content

Commit

Permalink
feat(milvus): add Milvus instrumentation (#1068)
Browse files Browse the repository at this point in the history
Co-authored-by: Jose Cespedes <josecespedes@ibm.com>
Co-authored-by: Nir Gazit <nirga@users.noreply.github.com>
  • Loading branch information
3 people committed May 20, 2024
1 parent 3427f38 commit f6ea8ce
Show file tree
Hide file tree
Showing 17 changed files with 1,552 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .cz.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ version_files = [
"packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/version.py",
"packages/opentelemetry-instrumentation-langchain/pyproject.toml:^version",
"packages/opentelemetry-instrumentation-langchain/opentelemetry/instrumentation/langchain/version.py",
"packages/opentelemetry-instrumentation-milvus/pyproject.toml:^version",
"packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/version.py",
"packages/opentelemetry-instrumentation-openai/pyproject.toml:^version",
"packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/version.py",
"packages/opentelemetry-instrumentation-pinecone/pyproject.toml:^version",
Expand Down
11 changes: 11 additions & 0 deletions packages/opentelemetry-instrumentation-milvus/.flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[flake8]
exclude =
.git,
__pycache__,
build,
dist,
.tox,
venv,
.venv,
.pytest_cache
max-line-length = 120
1 change: 1 addition & 0 deletions packages/opentelemetry-instrumentation-milvus/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.db
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.9.5
21 changes: 21 additions & 0 deletions packages/opentelemetry-instrumentation-milvus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# OpenTelemetry Milvus Instrumentation

<a href="https://pypi.org/project/opentelemetry-instrumentation-milvus/">
<img src="https://badge.fury.io/py/opentelemetry-instrumentation-milvus.svg">
</a>

This library allows tracing client-side calls to the Milvus vector DB sent with the official [Milvus Python SDK (pymilvus)](https://github.com/milvus-io/pymilvus).

## Installation

```bash
pip install opentelemetry-instrumentation-milvus
```

## Example usage

```python
from opentelemetry.instrumentation.milvus import MilvusInstrumentor

MilvusInstrumentor().instrument()
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""OpenTelemetry Milvus DB instrumentation"""

import logging
import pymilvus

from typing import Collection

from opentelemetry.instrumentation.milvus.config import Config
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap

from opentelemetry.instrumentation.milvus.wrapper import _wrap
from opentelemetry.instrumentation.milvus.version import __version__

logger = logging.getLogger(__name__)

# Requirement string reported by MilvusInstrumentor.instrumentation_dependencies().
_instruments = ("pymilvus >= 2.4.1",)

# One entry per MilvusClient method that gets a tracing wrapper. Each entry
# names the package and class to patch, the method on that class, and the
# span name used when the method is called.
WRAPPED_METHODS = [
    {
        "package": pymilvus,
        "object": "MilvusClient",
        "method": method,
        "span_name": span_name,
    }
    for method, span_name in (
        ("insert", "milvus.insert"),
        ("upsert", "milvus.upsert"),
        ("delete", "milvus.delete"),
        ("search", "milvus.search"),
        ("get", "milvus.get"),
        ("query", "milvus.query"),
    )
]


class MilvusInstrumentor(BaseInstrumentor):
    """Instrumentor that wraps the Milvus client methods listed in WRAPPED_METHODS."""

    def __init__(self, exception_logger=None):
        """Store ``exception_logger`` on ``Config`` for the instrumentation helpers to use."""
        super().__init__()
        Config.exception_logger = exception_logger

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the package requirements this instrumentation applies to."""
        return _instruments

    def _instrument(self, **kwargs):
        """Wrap every configured method with a tracing wrapper.

        ``tracer_provider`` may be supplied as a keyword argument; it is handed
        to ``get_tracer`` as-is (``None`` when absent).
        """
        tracer = get_tracer(__name__, __version__, kwargs.get("tracer_provider"))
        for spec in WRAPPED_METHODS:
            package = spec.get("package")
            object_name = spec.get("object")
            method_name = spec.get("method")
            # Skip entries whose target class is not exposed by the package.
            if not getattr(package, object_name, None):
                continue
            wrap_function_wrapper(
                package,
                f"{object_name}.{method_name}",
                _wrap(tracer, spec),
            )

    def _uninstrument(self, **kwargs):
        """Remove the wrappers from every configured method."""
        for spec in WRAPPED_METHODS:
            target = getattr(spec.get("package"), spec.get("object"), None)
            if target:
                unwrap(target, spec.get("method"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class Config:
    """Settings shared by the Milvus instrumentation modules."""

    # Optional callable; when set, it receives each exception that the
    # instrumentation helpers catch. None means no callback is invoked.
    exception_logger = None
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging
from opentelemetry.instrumentation.milvus.config import Config


def dont_throw(func):
    """
    A decorator that wraps the passed in function and logs exceptions instead of throwing them.

    The wrapper returns whatever ``func`` returns. When ``func`` raises an ``Exception``,
    the error is logged as a warning, passed to ``Config.exception_logger`` if one is set,
    and the wrapper returns ``None``.

    @param func: The function to wrap
    @return: The wrapper function, carrying the name and docstring of ``func``
    """
    # Imported here so this block does not rely on a module-level import being added.
    import functools

    # Obtain a logger specific to the function's module
    logger = logging.getLogger(func.__module__)

    # functools.wraps keeps __name__/__doc__ of the decorated function, so
    # introspection and tracebacks do not show every helper as "wrapper".
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.warning("Failed to execute %s, error: %s", func.__name__, str(e))
            if Config.exception_logger:
                Config.exception_logger(e)

    return wrapper
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Version string of this instrumentation; the instrumentor passes it to get_tracer().
__version__ = "0.18.2"
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
from opentelemetry.instrumentation.milvus.utils import dont_throw
from opentelemetry.semconv.trace import SpanAttributes

from opentelemetry import context as context_api
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
)
from opentelemetry.semconv.ai import Events


def _with_tracer_wrapper(func):
"""Helper for providing tracer for wrapper functions."""

def _with_tracer(tracer, to_wrap):
def wrapper(wrapped, instance, args, kwargs):
return func(tracer, to_wrap, wrapped, instance, args, kwargs)

return wrapper

return _with_tracer


def _set_span_attribute(span, name, value):
if value is not None:
if value != "":
span.set_attribute(name, value)
return


@_with_tracer_wrapper
def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
    """Call the wrapped Milvus method inside a span describing the operation.

    When instrumentation is suppressed in the current context the wrapped
    method is called directly. Otherwise a span named after the entry's
    ``span_name`` is started, the request attributes for the entry's ``method``
    are recorded, and the wrapped method's return value is passed through.
    Only ``kwargs`` is handed to the attribute helpers; positional ``args``
    are forwarded to the call but not inspected.
    """
    if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
        return wrapped(*args, **kwargs)

    method = to_wrap.get("method")
    # Maps each instrumented method to the helper that records its request attributes.
    attribute_setters = {
        "insert": _set_insert_attributes,
        "upsert": _set_upsert_attributes,
        "delete": _set_delete_attributes,
        "search": _set_search_attributes,
        "get": _set_get_attributes,
        "query": _set_query_attributes,
    }

    with tracer.start_as_current_span(to_wrap.get("span_name")) as span:
        span.set_attribute(SpanAttributes.DB_SYSTEM, "milvus")
        span.set_attribute(SpanAttributes.DB_OPERATION, method)

        set_attributes = attribute_setters.get(method)
        if set_attributes is not None:
            set_attributes(span, kwargs)

        result = wrapped(*args, **kwargs)
        # Query results are additionally attached to the span as events.
        if method == "query":
            _add_query_result_events(span, result)

        return result


def _encode_filter(_filter):
_filter_str = None
if _filter:
_filter_str = str(_filter)

return _filter_str


def _encode_partition_name(partition_name):
partition_name_str = None
if partition_name:
partition_name_str = str(partition_name)

return partition_name_str


def _encode_include(include):
include_str = None
if include:
include_str = str(include)

return include_str


def count_or_none(obj):
    """Return ``len(obj)`` for a truthy object, otherwise ``None``."""
    return len(obj) if obj else None


@dont_throw
def _set_insert_attributes(span, kwargs):
_set_span_attribute(
span, "db.milvus.insert.collection_name", kwargs.get("collection_name")
)
_set_span_attribute(
span, "db.milvus.insert.data_count", count_or_none(kwargs.get("data"))
)
_set_span_attribute(
span, "db.milvus.insert.timeout", kwargs.get("timeout")
)
_set_span_attribute(
span, "db.milvus.insert.partition_name", _encode_partition_name(kwargs.get("partition_name"))
)


@dont_throw
def _set_get_attributes(span, kwargs):
_set_span_attribute(
span, "db.milvus.get.collection_name", kwargs.get("collection_name")
)
_set_span_attribute(
span, "db.milvus.query.ids_count", count_or_none(kwargs.get("ids"))
)
_set_span_attribute(
span, "db.milvus.search.output_fields_count", count_or_none(kwargs.get("output_fields"))
)
_set_span_attribute(
span, "db.milvus.insert.timeout", kwargs.get("timeout")
)
_set_span_attribute(
span, "db.milvus.get.partition_names_count", count_or_none(kwargs.get("partition_names"))
)


@dont_throw
def _set_search_attributes(span, kwargs):
_set_span_attribute(
span, "db.milvus.search.collection_name", kwargs.get("collection_name")
)
_set_span_attribute(
span, "db.milvus.search.data_count", count_or_none(kwargs.get("data"))
)
_set_span_attribute(
span, "db.milvus.search.filter", kwargs.get("filter")
)
_set_span_attribute(
span, "db.milvus.search.limit", kwargs.get("limit")
)
_set_span_attribute(
span, "db.milvus.search.output_fields_count", count_or_none(kwargs.get("output_fields"))
)
_set_span_attribute(
span, "db.milvus.search.search_params", kwargs.get("search_params")
)
_set_span_attribute(
span, "db.milvus.search.timeout", kwargs.get("timeout")
)
_set_span_attribute(
span, "db.milvus.search.partition_names_count", kwargs.get("partition_name")
)
_set_span_attribute(
span, "db.milvus.search.anns_field", kwargs.get("anns_field")
)


@dont_throw
def _set_query_attributes(span, kwargs):
_set_span_attribute(
span, "db.milvus.query.collection_name", kwargs.get("collection_name")
)
_set_span_attribute(
span, "db.milvus.query.filter", _encode_filter(kwargs.get("filter"))
)
_set_span_attribute(
span, "db.milvus.query.output_fields_count", count_or_none(kwargs.get("output_fields"))
)
_set_span_attribute(
span, "db.milvus.query.timeout", kwargs.get("timeout")
)
_set_span_attribute(
span, "db.milvus.query.ids_count", count_or_none(kwargs.get("ids"))
)
_set_span_attribute(
span, "db.milvus.query.partition_names_count", count_or_none(kwargs.get("partition_names"))
)
_set_span_attribute(
span, "db.milvus.query.limit", kwargs.get("limit")
)


@dont_throw
def _add_query_result_events(span, kwargs):
for element in kwargs:
span.add_event(name=Events.DB_QUERY_RESULT.value, attributes=element)


@dont_throw
def _set_upsert_attributes(span, kwargs):
_set_span_attribute(
span, "db.milvus.upsert.collection_name", kwargs.get("collection_name")
)
_set_span_attribute(
span, "db.milvus.upsert.data_count", count_or_none(kwargs.get("data"))
)
_set_span_attribute(
span, "db.milvus.upsert.timeout_count", count_or_none(kwargs.get("timeout"))
)
_set_span_attribute(
span, "db.milvus.upsert.partition_name", _encode_partition_name(kwargs.get("partition_name"))
)


@dont_throw
def _set_delete_attributes(span, kwargs):
_set_span_attribute(
span, "db.milvus.delete.collection_name", kwargs.get("collection_name")
)
_set_span_attribute(
span, "db.milvus.delete.timeout_count", count_or_none(kwargs.get("timeout"))
)
_set_span_attribute(
span, "db.milvus.delete.partition_name", _encode_partition_name(kwargs.get("partition_name"))
)
_set_span_attribute(
span, "db.milvus.delete.ids_count", count_or_none(kwargs.get("ids"))
)
_set_span_attribute(
span, "db.milvus.delete.filter", _encode_filter(kwargs.get("filter"))
)
Loading

0 comments on commit f6ea8ce

Please sign in to comment.