Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Truncate request and response payloads to avoid sending too much data #6

Merged
merged 6 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions nameko_grpc_opentelemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from nameko_opentelemetry import active_tracer
from nameko_opentelemetry.entrypoints import EntrypointAdapter
from nameko_opentelemetry.scrubbers import scrub
from nameko_opentelemetry.utils import serialise_to_string
from nameko_opentelemetry.utils import serialise_to_string, truncate
from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
Expand Down Expand Up @@ -69,7 +69,15 @@ def get_call_args_attributes(self, worker_ctx, call_args, redacted):
scrub(MessageToDict(request), self.config)
)

return {"rpc.grpc.request": request_string}
request_truncated, truncated = truncate(
request_string,
max_len=self.config.get("truncate_max_length"),
)

return {
"rpc.grpc.request": request_truncated,
"rpc.grpc.request_truncated": str(truncated),
}

def get_result_attributes(self, worker_ctx, result):
attributes = {}
Expand Down Expand Up @@ -100,7 +108,17 @@ def get_result_attributes(self, worker_ctx, result):
else:
response_string = ""

attributes.update({"rpc.grpc.response": response_string})
response_truncated, truncated = truncate(
response_string,
max_len=self.config.get("truncate_max_length"),
)

attributes.update(
{
"rpc.grpc.response": response_truncated,
"rpc.grpc.response_truncated": str(truncated),
}
)

return attributes

Expand Down
99 changes: 99 additions & 0 deletions tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ def send_request_payloads(self, request):
def config(self, config, send_request_payloads):
# disable request payloads based on param
config["send_request_payloads"] = send_request_payloads
# override default truncation length
config["truncate_max_length"] = 50
return config

@pytest.fixture
Expand Down Expand Up @@ -472,8 +474,28 @@ def test_unary_request(

if send_request_payloads:
assert attributes["rpc.grpc.request"] == "{'value': 'A'}"
assert attributes["rpc.grpc.request_truncated"] == "False"
else:
assert "rpc.grpc.request" not in attributes
assert "rpc.grpc.request_truncated" not in attributes

def test_unary_request_truncated(
self, protos, client, container, memory_exporter, send_request_payloads
):
with entrypoint_waiter(container, "unary_unary"):
response = client.unary_unary(protos.ExampleRequest(value="A" * 100))
assert len(response.message) == 100

spans = memory_exporter.get_finished_spans()
assert len(spans) == 2

server_span = list(filter(lambda span: span.kind == SpanKind.SERVER, spans))[0]

attributes = server_span.attributes

if send_request_payloads:
assert len(attributes["rpc.grpc.request"]) == 50
assert attributes["rpc.grpc.request_truncated"] == "True"

def test_streaming_request(
self, protos, client, container, memory_exporter, send_request_payloads
Expand All @@ -495,8 +517,32 @@ def generate_requests():

if send_request_payloads:
assert attributes["rpc.grpc.request"] == "{'value': 'A'} | {'value': 'B'}"
assert attributes["rpc.grpc.request_truncated"] == "False"
else:
assert "rpc.grpc.request" not in attributes
assert "rpc.grpc.request_truncated" not in attributes

def test_streaming_request_truncated(
self, protos, client, container, memory_exporter, send_request_payloads
):
def generate_requests():
for index, value in enumerate(["A"] * 100):
yield protos.ExampleRequest(value=value + str(index))

with entrypoint_waiter(container, "stream_unary"):
response = client.stream_unary(generate_requests())
assert len(response.message.split(",")) == 100

spans = memory_exporter.get_finished_spans()
assert len(spans) == 2

server_span = list(filter(lambda span: span.kind == SpanKind.SERVER, spans))[0]

attributes = server_span.attributes

if send_request_payloads:
assert len(attributes["rpc.grpc.request"]) == 50
assert attributes["rpc.grpc.request_truncated"] == "True"

def test_different_argument_name(
self, protos, client, container, memory_exporter, send_request_payloads
Expand All @@ -522,6 +568,7 @@ def test_different_argument_name(
)
else:
assert "rpc.grpc.request" not in attributes
assert "rpc.grpc.request_truncated" not in attributes


class TestResultAttributes:
Expand All @@ -535,6 +582,8 @@ def send_response_payloads(self, request):
def config(self, config, send_response_payloads):
# disable request payloads based on param
config["send_response_payloads"] = send_response_payloads
# override default truncation length
config["truncate_max_length"] = 200
return config

@pytest.fixture
Expand Down Expand Up @@ -598,8 +647,31 @@ def test_unary_response(
attributes = server_span.attributes
if send_response_payloads:
assert attributes["rpc.grpc.response"] == "{'message': 'A'}"
assert attributes["rpc.grpc.response_truncated"] == "False"
else:
assert "rpc.grpc.response" not in attributes
assert "rpc.grpc.response_truncated" not in attributes

def test_unary_response_truncated(
self, container, client, protos, memory_exporter, send_response_payloads
):
multiplier = 1000
with entrypoint_waiter(container, "unary_unary"):
response = client.unary_unary(
protos.ExampleRequest(value="A", multiplier=multiplier)
)
assert response.message == "A" * multiplier

spans = memory_exporter.get_finished_spans()
assert len(spans) == 2

server_span = list(filter(lambda span: span.kind == SpanKind.SERVER, spans))[0]

attributes = server_span.attributes

if send_response_payloads:
assert len(attributes["rpc.grpc.response"]) == 200
assert attributes["rpc.grpc.response_truncated"] == "True"

def test_stream_response(
self, container, client, protos, memory_exporter, send_response_payloads
Expand All @@ -625,8 +697,33 @@ def test_stream_response(
attributes["rpc.grpc.response"]
== "{'message': 'A', 'seqno': 1} | {'message': 'A', 'seqno': 2}"
)
assert attributes["rpc.grpc.response_truncated"] == "False"
else:
assert "rpc.grpc.response" not in attributes
assert "rpc.grpc.response_truncated" not in attributes

def test_stream_response_truncated(
self, container, client, protos, memory_exporter, send_response_payloads
):
with entrypoint_waiter(container, "unary_stream"):
responses = client.unary_stream(
protos.ExampleRequest(value="A", response_count=100)
)
assert (
len([(response.message, response.seqno) for response in responses])
== 100
)

spans = memory_exporter.get_finished_spans()
assert len(spans) == 2

server_span = list(filter(lambda span: span.kind == SpanKind.SERVER, spans))[0]

attributes = server_span.attributes

if send_response_payloads:
assert len(attributes["rpc.grpc.response"]) == 200
assert attributes["rpc.grpc.response_truncated"] == "True"

def test_error_in_stream(
self, container, client, protos, memory_exporter, send_response_payloads
Expand All @@ -650,8 +747,10 @@ def test_error_in_stream(
attributes["rpc.grpc.response"]
== "{'message': 'A', 'seqno': 1} | Error: boom"
)
assert attributes["rpc.grpc.response_truncated"] == "False"
else:
assert "rpc.grpc.response" not in attributes
assert "rpc.grpc.response_truncated" not in attributes


class TestNoTracer:
Expand Down