Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep client interceptors in sync with grpc client interceptors #442

Merged
merged 2 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))
- `opentelemetry-instrumenation-flask` now supports trace response headers.
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))

- `opentelemetry-instrumentation-grpc` Keep client interceptor in sync with grpc client interceptors.
([#442](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/442))

### Removed
- Remove `http.status_text` from span attributes
([#406](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/406))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
# limitations under the License.

# pylint:disable=relative-beyond-top-level
# pylint:disable=arguments-differ
# pylint:disable=no-member
# pylint:disable=signature-differs

"""Implementation of gRPC Python interceptors."""

Expand Down Expand Up @@ -48,32 +46,71 @@ def __init__(self, method, base_callable, interceptor):
self._base_callable = base_callable
self._interceptor = interceptor

def __call__(self, request, timeout=None, metadata=None, credentials=None):
def __call__(
self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None,
):
def invoker(request, metadata):
return self._base_callable(request, timeout, metadata, credentials)
return self._base_callable(
request,
timeout,
metadata,
credentials,
wait_for_ready,
compression,
)

client_info = _UnaryClientInfo(self._method, timeout)
return self._interceptor.intercept_unary(
request, metadata, client_info, invoker
)

def with_call(
self, request, timeout=None, metadata=None, credentials=None
self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None,
):
def invoker(request, metadata):
return self._base_callable.with_call(
request, timeout, metadata, credentials
request,
timeout,
metadata,
credentials,
wait_for_ready,
compression,
)

client_info = _UnaryClientInfo(self._method, timeout)
return self._interceptor.intercept_unary(
request, metadata, client_info, invoker
)

def future(self, request, timeout=None, metadata=None, credentials=None):
def future(
self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None,
):
def invoker(request, metadata):
return self._base_callable.future(
request, timeout, metadata, credentials
request,
timeout,
metadata,
credentials,
wait_for_ready,
compression,
)

client_info = _UnaryClientInfo(self._method, timeout)
Expand All @@ -88,9 +125,24 @@ def __init__(self, method, base_callable, interceptor):
self._base_callable = base_callable
self._interceptor = interceptor

def __call__(self, request, timeout=None, metadata=None, credentials=None):
def __call__(
self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None,
):
def invoker(request, metadata):
return self._base_callable(request, timeout, metadata, credentials)
return self._base_callable(
request,
timeout,
metadata,
credentials,
wait_for_ready,
compression,
)

client_info = _StreamClientInfo(self._method, False, True, timeout)
return self._interceptor.intercept_stream(
Expand All @@ -105,11 +157,22 @@ def __init__(self, method, base_callable, interceptor):
self._interceptor = interceptor

def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None
self,
request_iterator,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None,
):
def invoker(request_iterator, metadata):
return self._base_callable(
request_iterator, timeout, metadata, credentials
request_iterator,
timeout,
metadata,
credentials,
wait_for_ready,
compression,
)

client_info = _StreamClientInfo(self._method, True, False, timeout)
Expand All @@ -118,11 +181,22 @@ def invoker(request_iterator, metadata):
)

def with_call(
self, request_iterator, timeout=None, metadata=None, credentials=None
self,
request_iterator,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None,
):
def invoker(request_iterator, metadata):
return self._base_callable.with_call(
request_iterator, timeout, metadata, credentials
request_iterator,
timeout,
metadata,
credentials,
wait_for_ready,
compression,
)

client_info = _StreamClientInfo(self._method, True, False, timeout)
Expand All @@ -131,11 +205,22 @@ def invoker(request_iterator, metadata):
)

def future(
self, request_iterator, timeout=None, metadata=None, credentials=None
self,
request_iterator,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None,
):
def invoker(request_iterator, metadata):
return self._base_callable.future(
request_iterator, timeout, metadata, credentials
request_iterator,
timeout,
metadata,
credentials,
wait_for_ready,
compression,
)

client_info = _StreamClientInfo(self._method, True, False, timeout)
Expand All @@ -151,11 +236,22 @@ def __init__(self, method, base_callable, interceptor):
self._interceptor = interceptor

def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None
self,
request_iterator,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None,
):
def invoker(request_iterator, metadata):
return self._base_callable(
request_iterator, timeout, metadata, credentials
request_iterator,
timeout,
metadata,
credentials,
wait_for_ready,
compression,
)

client_info = _StreamClientInfo(self._method, True, True, timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,57 @@
from .protobuf.test_server_pb2 import Request


# User defined interceptor. Is used in the tests along with the opentelemetry client interceptor.
class Interceptor(
mihirg marked this conversation as resolved.
Show resolved Hide resolved
grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor,
):
def __init__(self):
pass

def intercept_unary_unary(
self, continuation, client_call_details, request
):
return self._intercept_call(continuation, client_call_details, request)

def intercept_unary_stream(
self, continuation, client_call_details, request
):
return self._intercept_call(continuation, client_call_details, request)

def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
):
return self._intercept_call(
continuation, client_call_details, request_iterator
)

def intercept_stream_stream(
self, continuation, client_call_details, request_iterator
):
return self._intercept_call(
continuation, client_call_details, request_iterator
)

@staticmethod
def _intercept_call(
continuation, client_call_details, request_or_iterator
):
return continuation(client_call_details, request_or_iterator)


class TestClientProto(TestBase):
def setUp(self):
super().setUp()
GrpcInstrumentorClient().instrument()
self.server = create_test_server(25565)
self.server.start()
# use a user defined interceptor along with the opentelemetry client interceptor
interceptors = [Interceptor()]
self.channel = grpc.insecure_channel("localhost:25565")
self.channel = grpc.intercept_channel(self.channel, *interceptors)
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)

def tearDown(self):
Expand Down