Skip to content

Commit

Permalink
Keep client interceptors in sync with grpc client interceptors
Browse files Browse the repository at this point in the history
  • Loading branch information
Mihir Gore committed May 5, 2021
1 parent 01db88b commit 7a3cb42
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 19 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))
- `opentelemetry-instrumenation-flask` now supports trace response headers.
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))

- `opentelemetry-instrumentation-grpc` Keep client interceptor in sync with grpc client interceptors.
([#442](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/442))

### Removed
- Remove `http.status_text` from span attributes
([#406](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/406))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
# limitations under the License.

# pylint:disable=relative-beyond-top-level
# pylint:disable=arguments-differ
# pylint:disable=no-member
# pylint:disable=signature-differs

"""Implementation of gRPC Python interceptors."""

Expand All @@ -41,41 +39,67 @@ class _StreamClientInfo(
):
pass

def _get_metadata_timeout(**kwargs):
metadata = kwargs.get("metadata")
timeout = kwargs.get("timeout")
return metadata, timeout


class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def __init__(self, method, base_callable, interceptor):
self._method = method
self._base_callable = base_callable
self._interceptor = interceptor

def __call__(self, request, timeout=None, metadata=None, credentials=None):
def __call__(
self,
request,
**kwargs,
):
def invoker(request, metadata):
return self._base_callable(request, timeout, metadata, credentials)
kwargs["metadata"] = metadata
return self._base_callable(
request,
**kwargs,
)

metadata, timeout = _get_metadata_timeout(**kwargs)
client_info = _UnaryClientInfo(self._method, timeout)
return self._interceptor.intercept_unary(
request, metadata, client_info, invoker
)

def with_call(
self, request, timeout=None, metadata=None, credentials=None
self,
request,
**kwargs,
):
def invoker(request, metadata):
kwargs["metadata"] = metadata
return self._base_callable.with_call(
request, timeout, metadata, credentials
request,
**kwargs,
)

metadata, timeout = _get_metadata_timeout(**kwargs)
client_info = _UnaryClientInfo(self._method, timeout)
return self._interceptor.intercept_unary(
request, metadata, client_info, invoker
)

def future(self, request, timeout=None, metadata=None, credentials=None):
def future(
self,
request,
**kwargs,
):
def invoker(request, metadata):
kwargs["metadata"] = metadata
return self._base_callable.future(
request, timeout, metadata, credentials
request,
**kwargs,
)

metadata, timeout = _get_metadata_timeout(**kwargs)
client_info = _UnaryClientInfo(self._method, timeout)
return self._interceptor.intercept_unary(
request, metadata, client_info, invoker
Expand All @@ -88,10 +112,19 @@ def __init__(self, method, base_callable, interceptor):
self._base_callable = base_callable
self._interceptor = interceptor

def __call__(self, request, timeout=None, metadata=None, credentials=None):
def __call__(
self,
request,
**kwargs,
):
def invoker(request, metadata):
return self._base_callable(request, timeout, metadata, credentials)
kwargs["metadata"] = metadata
return self._base_callable(
request,
**kwargs,
)

metadata, timeout = _get_metadata_timeout(**kwargs)
client_info = _StreamClientInfo(self._method, False, True, timeout)
return self._interceptor.intercept_stream(
request, metadata, client_info, invoker
Expand All @@ -105,39 +138,54 @@ def __init__(self, method, base_callable, interceptor):
self._interceptor = interceptor

def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None
self,
request_iterator,
**kwargs,
):
def invoker(request_iterator, metadata):
kwargs["metadata"] = metadata
return self._base_callable(
request_iterator, timeout, metadata, credentials
request_iterator,
**kwargs,
)

metadata, timeout = _get_metadata_timeout(**kwargs)
client_info = _StreamClientInfo(self._method, True, False, timeout)
return self._interceptor.intercept_stream(
request_iterator, metadata, client_info, invoker
)

def with_call(
self, request_iterator, timeout=None, metadata=None, credentials=None
self,
request_iterator,
**kwargs,
):
def invoker(request_iterator, metadata):
kwargs["metadata"] = metadata
return self._base_callable.with_call(
request_iterator, timeout, metadata, credentials
request_iterator,
**kwargs
)

metadata, timeout = _get_metadata_timeout(**kwargs)
client_info = _StreamClientInfo(self._method, True, False, timeout)
return self._interceptor.intercept_stream(
request_iterator, metadata, client_info, invoker
)

def future(
self, request_iterator, timeout=None, metadata=None, credentials=None
self,
request_iterator,
**kwargs
):
def invoker(request_iterator, metadata):
kwargs["metadata"] = metadata
return self._base_callable.future(
request_iterator, timeout, metadata, credentials
request_iterator,
**kwargs,
)

metadata, timeout = _get_metadata_timeout(**kwargs)
client_info = _StreamClientInfo(self._method, True, False, timeout)
return self._interceptor.intercept_stream(
request_iterator, metadata, client_info, invoker
Expand All @@ -151,13 +199,18 @@ def __init__(self, method, base_callable, interceptor):
self._interceptor = interceptor

def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None
self,
request_iterator,
**kwargs,
):
def invoker(request_iterator, metadata):
kwargs["metadata"] = metadata
return self._base_callable(
request_iterator, timeout, metadata, credentials
request_iterator,
**kwargs,
)

metadata, timeout = _get_metadata_timeout(**kwargs)
client_info = _StreamClientInfo(self._method, True, True, timeout)
return self._interceptor.intercept_stream(
request_iterator, metadata, client_info, invoker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,57 @@
from ._server import create_test_server
from .protobuf.test_server_pb2 import Request

# User defined interceptor. Is used in the tests along with the opentelemetry client interceptor.
class Interceptor(
grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor,
):
def __init__(self):
pass

def intercept_unary_unary(
self, continuation, client_call_details, request
):
return self._intercept_call(continuation, client_call_details, request)

def intercept_unary_stream(
self, continuation, client_call_details, request
):
return self._intercept_call(continuation, client_call_details, request)

def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
):
return self._intercept_call(
continuation, client_call_details, request_iterator
)

def intercept_stream_stream(
self, continuation, client_call_details, request_iterator
):
return self._intercept_call(
continuation, client_call_details, request_iterator
)

@staticmethod
def _intercept_call(
continuation, client_call_details, request_or_iterator
):
return continuation(client_call_details, request_or_iterator)


class TestClientProto(TestBase):
def setUp(self):
super().setUp()
GrpcInstrumentorClient().instrument()
self.server = create_test_server(25565)
self.server.start()
# use a user defined interceptor along with the opentelemetry client interceptor
interceptors = [Interceptor()]
self.channel = grpc.insecure_channel("localhost:25565")
self.channel = grpc.intercept_channel(self.channel, *interceptors)
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)

def tearDown(self):
Expand Down

0 comments on commit 7a3cb42

Please sign in to comment.