Skip to content

Commit

Permalink
Patch sentinel bug (#997)
Browse files Browse the repository at this point in the history
Co-authored-by: Timothy Pansino <TimPansino@users.noreply.github.com>
Co-authored-by: Hannah Stepanek <hmstepanek@users.noreply.github.com>
Co-authored-by: Uma Annamalai <umaannamalai@users.noreply.github.com>
  • Loading branch information
4 people committed Dec 13, 2023
1 parent 030cfc9 commit cc4c4e1
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 8 deletions.
8 changes: 5 additions & 3 deletions newrelic/hooks/messagebroker_confluentkafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs):
destination_type="Topic",
destination_name=topic or "Default",
source=wrapped,
) as trace:
dt_headers = {k: v.encode("utf-8") for k, v in trace.generate_request_headers(transaction)}
):
dt_headers = {k: v.encode("utf-8") for k, v in MessageTrace.generate_request_headers(transaction)}
# headers can be a list of tuples or a dict so convert to dict for consistency.
dt_headers.update(dict(headers) if headers else {})
if headers:
dt_headers.update(dict(headers))

try:
return wrapped(topic, headers=dt_headers, *args, **kwargs)
except Exception as error:
Expand Down
13 changes: 9 additions & 4 deletions newrelic/hooks/messagebroker_kafkapython.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,16 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
destination_name=topic or "Default",
source=wrapped,
terminal=False,
) as trace:
dt_headers = [(k, v.encode("utf-8")) for k, v in trace.generate_request_headers(transaction)]
headers.extend(dt_headers)
):
dt_headers = [(k, v.encode("utf-8")) for k, v in MessageTrace.generate_request_headers(transaction)]
# headers can be a list of tuples or a dict so convert to dict for consistency.
if headers:
dt_headers.extend(headers)

try:
return wrapped(topic, value=value, key=key, headers=headers, partition=partition, timestamp_ms=timestamp_ms)
return wrapped(
topic, value=value, key=key, headers=dt_headers, partition=partition, timestamp_ms=timestamp_ms
)
except Exception:
notice_error()
raise
Expand Down
22 changes: 21 additions & 1 deletion tests/messagebroker_confluentkafka/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import threading
import time

import pytest
from conftest import cache_kafka_producer_headers
Expand All @@ -28,6 +28,7 @@
)

from newrelic.api.background_task import background_task
from newrelic.api.function_trace import FunctionTrace
from newrelic.common.object_names import callable_name
from newrelic.packages import six

Expand Down Expand Up @@ -137,6 +138,25 @@ def test():
test()


def test_distributed_tracing_headers_under_terminal(topic, send_producer_message):
@validate_transaction_metrics(
"test_distributed_tracing_headers_under_terminal",
rollup_metrics=[
("Supportability/TraceContext/Create/Success", 1),
("Supportability/DistributedTrace/CreatePayload/Success", 1),
],
background_task=True,
)
@background_task(name="test_distributed_tracing_headers_under_terminal")
@cache_kafka_producer_headers()
@validate_messagebroker_headers
def test():
with FunctionTrace(name="terminal_trace", terminal=True):
send_producer_message()

test()


def test_producer_errors(topic, producer, monkeypatch):
if hasattr(producer, "_value_serializer"):
# Remove serializer to intentionally cause a type error in underlying producer implementation
Expand Down
20 changes: 20 additions & 0 deletions tests/messagebroker_kafkapython/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)

from newrelic.api.background_task import background_task
from newrelic.api.function_trace import FunctionTrace
from newrelic.common.object_names import callable_name
from newrelic.packages import six

Expand Down Expand Up @@ -70,6 +71,25 @@ def test():
test()


def test_distributed_tracing_headers_under_terminal(topic, send_producer_message):
@validate_transaction_metrics(
"test_distributed_tracing_headers_under_terminal",
rollup_metrics=[
("Supportability/TraceContext/Create/Success", 1),
("Supportability/DistributedTrace/CreatePayload/Success", 1),
],
background_task=True,
)
@background_task(name="test_distributed_tracing_headers_under_terminal")
@cache_kafka_producer_headers
@validate_messagebroker_headers
def test():
with FunctionTrace(name="terminal_trace", terminal=True):
send_producer_message()

test()


def test_producer_errors(topic, producer, monkeypatch):
monkeypatch.setitem(producer.config, "value_serializer", None)
monkeypatch.setitem(producer.config, "key_serializer", None)
Expand Down

0 comments on commit cc4c4e1

Please sign in to comment.