Skip to content

Commit

Permalink
fix: disable async background flush of traces (#29)
Browse files Browse the repository at this point in the history
When running in complex test suite that mocks many things, trying to
flush in the background traces does not work. For example network
requests might be mocked, or time might be mocked, making SSL connection
impossible.

This stores the span in memory until a flush is requested at the end of
pytest.

This also means there is no way to capture the opentelemetry log in the
background are we can now intercept any error raised by the code while
flushing the traces.
  • Loading branch information
jd authored Dec 20, 2024
1 parent 7140d19 commit fd8cdb0
Showing 3 changed files with 35 additions and 34 deletions.
22 changes: 14 additions & 8 deletions pytest_mergify/__init__.py
Original file line number Diff line number Diff line change
@@ -37,14 +37,20 @@ def pytest_terminal_summary(
# sure that we capture the possible error logs, otherwise they are
# emitted on exit (atexit()).
if self.mergify_tracer.tracer_provider is not None:
self.mergify_tracer.tracer_provider.shutdown() # type: ignore[no-untyped-call]

if self.mergify_tracer.log_handler.log_list:
terminalreporter.write_line(
"There are been some errors reported by the tracer:", red=True
)
for line in self.mergify_tracer.log_handler.log_list:
terminalreporter.write_line(line)
try:
self.mergify_tracer.tracer_provider.force_flush()
except Exception as e:
terminalreporter.write_line(
f"Error while exporting traces: {e}",
red=True,
)
try:
self.mergify_tracer.tracer_provider.shutdown() # type: ignore[no-untyped-call]
except Exception as e:
terminalreporter.write_line(
f"Error while shutting down the tracer: {e}",
red=True,
)

if self.mergify_tracer.token is None:
terminalreporter.write_line(
40 changes: 15 additions & 25 deletions pytest_mergify/tracer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import dataclasses
import logging
import os

import opentelemetry.sdk.resources
from opentelemetry.sdk.trace import export
from opentelemetry import context
from opentelemetry.sdk.trace import TracerProvider, SpanProcessor, Span
from opentelemetry.sdk.trace import TracerProvider, SpanProcessor, Span, ReadableSpan
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
@@ -33,15 +32,21 @@ def on_start(
self.trace_id = span.context.trace_id


class ListLogHandler(logging.Handler):
"""Custom logging handler to capture log messages into a list."""
class SynchronousBatchSpanProcessor(export.SimpleSpanProcessor):
def __init__(self, exporter: export.SpanExporter) -> None:
super().__init__(exporter)
self.queue: list[ReadableSpan] = []

def __init__(self) -> None:
super().__init__()
self.log_list: list[str] = []
def force_flush(self, timeout_millis: int = 30_000) -> bool:
self.span_exporter.export(self.queue)
self.queue.clear()
return True

def on_end(self, span: ReadableSpan) -> None:
if not span.context.trace_flags.sampled:
return

def emit(self, record: logging.LogRecord) -> None:
self.log_list.append(self.format(record))
self.queue.append(span)


@dataclasses.dataclass
@@ -63,25 +68,10 @@ class MergifyTracer:
tracer_provider: opentelemetry.sdk.trace.TracerProvider | None = dataclasses.field(
init=False, default=None
)
log_handler: ListLogHandler = dataclasses.field(
init=False, default_factory=ListLogHandler
)

def __post_init__(self) -> None:
span_processor: SpanProcessor

# Set up the logger
self.log_handler.setLevel(logging.ERROR) # Capture ERROR logs by default
self.log_handler.setFormatter(
logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
)

logger = logging.getLogger("opentelemetry")
# FIXME: we should remove the handler when this tracer is not used
# (e.g., reconfigure() is called) Since reconfigure() is unlikely to be
# called outside our testing things, it's not a big deal to leak it.
logger.addHandler(self.log_handler)

if os.environ.get("PYTEST_MERGIFY_DEBUG"):
self.exporter = export.ConsoleSpanExporter()
span_processor = export.SimpleSpanProcessor(self.exporter)
@@ -101,7 +91,7 @@ def __post_init__(self) -> None:
headers={"Authorization": f"Bearer {self.token}"},
compression=Compression.Gzip,
)
span_processor = export.BatchSpanProcessor(self.exporter)
span_processor = SynchronousBatchSpanProcessor(self.exporter)
else:
return

7 changes: 6 additions & 1 deletion tests/test_plugin.py
Original file line number Diff line number Diff line change
@@ -116,4 +116,9 @@ def test_span(pytestconfig):
)
result = pytester.runpytest_subprocess()
result.assert_outcomes(passed=1)
assert "There are been some errors reported by the tracer:" in result.stdout.lines
assert any(
line.startswith(
"Error while exporting traces: HTTPConnectionPool(host='localhost', port=9999): Max retries exceeded with url"
)
for line in result.stdout.lines
)

0 comments on commit fd8cdb0

Please sign in to comment.